Source code for pymagicc.core

import shutil
import subprocess  # nosec # have to use subprocess
import warnings
from collections import Counter
from copy import deepcopy
from os import listdir, makedirs
from os.path import abspath, basename, dirname, exists, isfile, join
from subprocess import PIPE  # nosec # have to use subprocess
from tempfile import mkdtemp

import f90nml
import numpy as np
import pandas as pd
from dateutil.relativedelta import relativedelta
from openscm_units import unit_registry
from scmdata import run_append

from .config import _wine_installed, config
from .errors import InvalidTemporalResError, NoReaderWriterError
from .io import MAGICCData, read_cfg_file
from .io.utils import _get_openscm_var_from_filepath
from .scenarios import zero_emissions
from .utils import get_date_time_string

IS_WINDOWS = config["is_windows"]


[docs]class WineNotInstalledError(Exception): """Exception raised if wine is not installed but is required"""
def _copy_files(source, target, recursive=False): """ Copy all the files in source directory to target. If ``recursive``, include subdirectories, otherwise ignores subdirectories. """ if recursive: shutil.copytree(source, target) return source_files = listdir(source) if not exists(target): makedirs(target) for filename in source_files: full_filename = join(source, filename) if isfile(full_filename): shutil.copy(full_filename, target) def _clean_value(v): if isinstance(v, str): return v.strip() elif isinstance(v, list): if isinstance(v[0], str): return [i.replace("\0", "").strip().replace("\n", "") for i in v] return v
[docs]class MAGICCBase(object): """ Provides access to the MAGICC binary and configuration. To enable multiple MAGICC 'setups' to be configured independently, the MAGICC directory containing the input files, configuration and binary is copied to a new folder. The configuration in this MAGICC copy can then be edited without impacting other instances or your original MAGICC distribution. A ``MAGICC`` instance first has to be setup by calling ``create_copy``. If many model runs are being performed this step only has to be performed once. The ``run`` method can then be called many times without re-copying the files each time. Between each call to ``run``, the configuration files can be updated to perform runs with different configurations. Parameters ---------- root_dir : str If ``root_dir`` is supplied, an existing MAGICC 'setup' is used. """ version = None _scen_file_name = "SCENARIO.SCEN7" def __init__(self, root_dir=None, strict=True): """ Initialise Parameters ---------- root_dir : str Root directory of the MAGICC package. If ``None``, a temporary copy of MAGICC is made based on the result of ` `self.get_exectuable()``. strict: bool If True, enforce the configuration checks, otherwise a warning is raised if any invalid configuration is found and the run is continued. Setting ``strict=False`` is only recommended for experienced users of MAGICC. """ self.root_dir = root_dir self.config = None self.executable = self.get_executable() self.strict = strict if root_dir is not None: self.is_temp = False else: # Create a temp directory self.is_temp = True def __enter__(self): if self.is_temp and self.run_dir is None: self.create_copy() return self def __exit__(self, *args, **kwargs): self.remove_temp_copy()
[docs] def create_copy(self): """ Initialises a temporary directory structure and copy of MAGICC configuration files and binary. The root folder and ``bin`` folders are copied (not recursively). The ``run`` folder is copied recursively. """ if self.executable is None or not isfile(self.executable): raise FileNotFoundError( "Could not find MAGICC{} executable: {}".format( self.version, self.executable ) ) if self.is_temp: if self.root_dir is not None: raise AssertionError( "A temp copy for this instance has already been created" ) self.root_dir = mkdtemp(prefix="pymagicc-") if exists(self.run_dir): raise Exception("A copy of MAGICC has already been created.") if not exists(self.root_dir): makedirs(self.root_dir) exec_dir = basename(self.original_dir) # Copy a subset of folders from the MAGICC `original_dir` # Also copy anything which is in the root of the MAGICC distribution # Assumes that the MAGICC binary is in a folder one level below the root # of the MAGICC distribution. i.e. /run/magicc.exe or /bin/magicc dirs_to_copy = [".", "bin"] dirs_to_copy_recursive = ["run"] # Check that the executable is in a valid sub directory if exec_dir not in dirs_to_copy + dirs_to_copy_recursive: raise AssertionError("binary must be in bin/ or run/ directory") for d in dirs_to_copy + dirs_to_copy_recursive: source_dir = abspath(join(self.original_dir, "..", d)) if exists(source_dir): _copy_files( source_dir, join(self.root_dir, d), recursive=d in dirs_to_copy_recursive, ) # Create an empty out dir # MAGICC assumes that the 'out' directory already exists makedirs(join(self.root_dir, "out")) # Create basic configuration files so magicc can run self.set_years() self.set_config()
@property def binary_name(self): """ Name of the MAGICC binary file Returns ------- str Name of the binary file """ return basename(self.executable) @property def original_dir(self): """ Directory of the MAGICC package. This is the directory which contains the ``run`` and ``out`` folders. Returns ------- str Path of the MAGICC package """ return dirname(self.executable) @property def run_dir(self): """ Run directory of the MAGICC package. This path always ends in ``run``. Returns ------- str Path of the run directory """ if self.root_dir is None: return None return join(self.root_dir, "run") @property def out_dir(self): """ Output directory of the MAGICC package. This path always ends in ``out``. Returns ------- str Path of the output directory """ if self.root_dir is None: return None return join(self.root_dir, "out") @property def default_config(self): """ Default configuration for a run Returns ------- :obj:`f90nml.Namelist` Namelist object containing the default configuration """ base = f90nml.read(join(self.run_dir, "MAGCFG_DEFAULTALL.CFG")) user = f90nml.read(join(self.run_dir, "MAGCFG_USER.CFG")) self._default_config = deepcopy(base) def _deep_update(b, o): for k, v in o.items(): if isinstance(v, dict): _deep_update(b[k], v) else: b.update(o) _deep_update(self._default_config, user) return self._default_config
[docs] def run(self, scenario=None, only=None, debug=False, **kwargs): """ Run MAGICC and parse the output. As a reminder, putting ``out_parameters=1`` will cause MAGICC to write out its parameters into ``out/PARAMETERS.OUT`` and they will then be read into ``output.metadata["parameters"]`` where ``output`` is the returned object. Any logged output from running magicc will be in``output.metadata["stderr"]``. For MAGICC7 and above, The level of logging can be controlled with the ``debug`` argument. Any subannual files output by MAGICC will be ignored by this function. These files can be read in manually using :class:`pymagicc.io.MAGICCData` directly. Parameters ---------- scenario : :obj:`pymagicc.io.MAGICCData` Scenario to run. If None MAGICC will simply run with whatever config has already been set. only : list of str If not None, only extract variables in this list. debug: {True, False, "verbose"} If true, MAGICC will run in debug mode with the maximum amount of logging. If "verbose", MAGICC will be run in verbose mode. kwargs Other config values to pass to MAGICC for the run Returns ------- :obj:`pymagicc.io.MAGICCData` MAGICCData object containing that data in its ``df`` attribute and metadata and parameters (depending on the value of ``include_parameters``) in its ``metadata`` attribute. Raises ------ ValueError If no output is found which matches the list specified in ``only``. subprocess.CalledProcessError If MAGICC fails to run. Check the 'stderr' key of the result's `metadata` attribute to inspect the results output from MAGICC. ValueError The user attempts to use ``debug`` with MAGICC6 """ if not exists(self.root_dir): raise FileNotFoundError(self.root_dir) if self.executable is None: raise ValueError( "MAGICC executable not found, try setting an environment variable `MAGICC_EXECUTABLE_{}=/path/to/binary`".format( self.version ) ) if scenario is not None: kwargs = self.set_emission_scenario_setup(scenario, kwargs) yr_config = {} if "startyear" in kwargs: yr_config["startyear"] = kwargs.pop("startyear") if "endyear" in kwargs: yr_config["endyear"] = kwargs.pop("endyear") if yr_config: self.set_years(**yr_config) # should be able to do some other nice metadata stuff re how magicc was run # etc. here kwargs.setdefault("rundate", get_date_time_string()) self.update_config(**kwargs) self.check_config() exec_dir = basename(self.original_dir) command = [join(self.root_dir, exec_dir, self.binary_name)] if self.version >= 7: if debug == "verbose": command.append("--verbose") elif debug: command.append("--debug") elif debug: raise ValueError("MAGICC6 has no debug capability") if not IS_WINDOWS and self.binary_name.endswith(".exe"): # pragma: no cover if not _wine_installed: raise WineNotInstalledError( "Wine is not installed but is required to run `.exe` binaries" ) command.insert(0, "wine") try: res = subprocess.run( # nosec # on Windows shell=True is required command, check=True, # thank you https://stackoverflow.com/a/53209196 for Python 3.6 hack stdout=PIPE, stderr=PIPE, cwd=self.run_dir, shell=IS_WINDOWS, ) except subprocess.CalledProcessError as exc: print("stderr:\n{}".format(exc.stderr.decode())) raise exc outfiles = self._get_output_filenames() read_cols = {"climate_model": ["MAGICC{}".format(self.version)]} if scenario is not None: read_cols["model"] = scenario["model"].unique().tolist() read_cols["scenario"] = scenario["scenario"].unique().tolist() else: read_cols.setdefault("model", ["unspecified"]) read_cols.setdefault("scenario", ["unspecified"]) mdata = [] for filepath in outfiles: if filepath.startswith("DAT_VOLCANIC_RF.") or "SUBANN" in filepath: warnings.warn( "Not reading file: {}. Monthly data are not read in automatically by `run`. " "Use `MAGICCData` instead.".format(filepath) ) continue try: openscm_var = _get_openscm_var_from_filepath(filepath) if only is None or openscm_var in only: tempdata = MAGICCData( join(self.out_dir, filepath), columns=deepcopy(read_cols) ) mdata.append(tempdata) except (NoReaderWriterError, InvalidTemporalResError): # TODO: something like warnings.warn("Could not read {}".format(filepath)) continue if not mdata and only is not None: raise ValueError("No output found for only={}".format(only)) if not mdata: if self.strict: raise ValueError("No output found. Check configuration") else: # No data was loaded return an empty MAGICCData object mdata = MAGICCData( data={}, columns={ "model": [], "unit": [], "variable": [], "region": [], "scenario": [], }, ) else: mdata = run_append(mdata) try: run_paras = self.read_parameters() self.config = run_paras mdata.metadata["parameters"] = run_paras except FileNotFoundError: pass mdata.metadata["stderr"] = res.stderr.decode("ascii") levels_to_warn = ["WARNING", "ERROR", "FATAL"] for level in levels_to_warn: if "<{}>".format(level) in mdata.metadata["stderr"]: warnings.warn( "magicc logged a {} message. Check the 'stderr' key of the " "result's `metadata` attribute.".format(level) ) return mdata
def _get_output_filenames(self): outfiles = [f for f in listdir(self.out_dir) if f != "PARAMETERS.OUT"] bin_out = [ f.split(".")[0] for f in outfiles if f.startswith("DAT_") and f.endswith(".BINOUT") ] extras = [] for f in outfiles: var_name, ext = f.split(".") if ext != "BINOUT" and var_name not in bin_out: extras.append(f) return [f + ".BINOUT" for f in bin_out] + extras def _check_failed(self, msg): if self.strict: raise ValueError(msg) else: warnings.warn(msg)
[docs] def check_config(self): """Check that our MAGICC ``.CFG`` files are set to safely work with PYMAGICC For further detail about why this is required, please see :ref:`MAGICC flags`. Raises ------ ValueError If we are not certain that the config written by PYMAGICC will overwrite all other config i.e. that there will be no unexpected behaviour. A ValueError will also be raised if the user tries to use more than one scenario file. """ cfg_error_msg = ( "PYMAGICC is not the only tuning model that will be used by " "`MAGCFG_USER.CFG`: your run is likely to fail/do odd things" ) emisscen_error_msg = ( "You have more than one `FILE_EMISSCEN_X` flag set. Using more than " "one emissions scenario is hard to debug and unnecessary with " "Pymagicc's Dataframe scenario input. Please combine all your " "scenarios into one Dataframe with Pymagicc and Pandas, then feed " "this single Dataframe into Pymagicc's run API." ) nml_to_check = "nml_allcfgs" usr_cfg = read_cfg_file(join(self.run_dir, "MAGCFG_USER.CFG")) for k in usr_cfg[nml_to_check]: if k.startswith("file_tuningmodel"): first_tuningmodel = k in ["file_tuningmodel", "file_tuningmodel_1"] if first_tuningmodel: if usr_cfg[nml_to_check][k] != "PYMAGICC": self._check_failed(cfg_error_msg) elif usr_cfg[nml_to_check][k] not in ["USER", ""]: self._check_failed(cfg_error_msg) elif k.startswith("file_emisscen_"): if usr_cfg[nml_to_check][k] not in ["NONE", ""]: self._check_failed(emisscen_error_msg) self._check_config()
[docs] def write(self, mdata, name): """Write an input file to disk Parameters ---------- mdata : :obj:`pymagicc.io.MAGICCData` A MAGICCData instance with the data to write name : str The name of the file to write. The file will be written to the MAGICC instance's run directory i.e. ``self.run_dir`` """ mdata.write(join(self.run_dir, name), self.version)
[docs] def read_parameters(self): """ Read a parameters.out file Returns ------- dict A dictionary containing all the configuration used by MAGICC """ param_fname = join(self.out_dir, "PARAMETERS.OUT") if not exists(param_fname): raise FileNotFoundError("No PARAMETERS.OUT found") with open(param_fname) as nml_file: parameters = dict(f90nml.read(nml_file)) for group in ["nml_years", "nml_allcfgs", "nml_outputcfgs"]: parameters[group] = dict(parameters[group]) for k, v in parameters[group].items(): parameters[group][k] = _clean_value(v) parameters[group.replace("nml_", "")] = parameters.pop(group) self.config = parameters return parameters
[docs] def remove_temp_copy(self): """ Removes a temporary copy of the MAGICC version shipped with Pymagicc. """ if self.is_temp and self.root_dir is not None: shutil.rmtree(self.root_dir) self.root_dir = None
[docs] def set_config( self, filename="MAGTUNE_PYMAGICC.CFG", top_level_key="nml_allcfgs", conflict="warn", **kwargs, ): """ Create a configuration file for MAGICC. Writes a fortran namelist in run_dir. Parameters ---------- filename : str Name of configuration file to write top_level_key : str Name of namelist to be written in the configuration file conflict : {'warn', 'ignore'} If 'warn', when a flag needs to be replaced by a different name (because, for example, the flag name changed between MAGICC versions), a warning is raised. If 'ignore', no warning is raised when a replacement is required. kwargs Other parameters to pass to the configuration file. No validation on the parameters is performed. Returns ------- dict The contents of the namelist which was written to file Warning ------- If a key is renamed, a warning is raised Raises ------ ValueError An invalid value for ``conflict`` is supplied """ kwargs = self._check_and_format_config(kwargs) fname = join(self.run_dir, filename) conf = {top_level_key: kwargs} conf = self._fix_legacy_keys(conf, conflict=conflict) f90nml.write(conf, fname, force=True) return conf
[docs] def update_config( self, filename="MAGTUNE_PYMAGICC.CFG", top_level_key="nml_allcfgs", conflict="warn", **kwargs, ): """Updates a configuration file for MAGICC Updates the contents of a fortran namelist in the run directory, creating a new namelist if none exists. Parameters ---------- filename : str Name of configuration file to write top_level_key : str Name of namelist to be written in the configuration file conflict : {'warn', 'ignore'} If 'warn', when a flag needs to be replaced by a different name (because, for example, the flag name changed between MAGICC versions), a warning is raised. If 'ignore', no warning is raised when a replacement is required. kwargs Other parameters to pass to the configuration file. No validation on the parameters is performed. Returns ------- dict The contents of the namelist which was written to file Warning ------- If a key is renamed, a warning is raised Raises ------ ValueError An invalid value for ``conflict`` is supplied """ kwargs = self._check_and_format_config(kwargs) fname = join(self.run_dir, filename) if exists(fname): conf = f90nml.read(fname) else: conf = {top_level_key: {}} conf[top_level_key].update(kwargs) conf = self._fix_legacy_keys(conf, conflict=conflict) f90nml.write(conf, fname, force=True) return conf
def _fix_legacy_keys(self, conf, conflict="warn"): """ Go through config and fix any keys which are misnamed. For example, fix any keys which have been renamed between MAGICC versions to match the new names. Parameters ---------- conf :obj:`f90nml.Namelist` Configuration to check conflict : {'warn', 'ignore'} If 'warn', when a conflict is found, a warning is raised. If 'ignore', no warning is raised when a conflict is found. Returns ------- :obj:`f90nml.Namelist` Configuration with updated keys Warning ------- If a key is renamed, a warning is raised Raises ------ ValueError An invalid value for ``conflict`` is supplied """ valid_conflicts = ["warn", "ignore"] if conflict not in valid_conflicts: raise ValueError("`conflict` must be one of: {}".format(valid_conflicts)) cfg_key = "nml_allcfgs" if cfg_key not in conf: return conf new_conf = deepcopy(conf) for wrong_key, right_key in self._config_renamings.items(): if wrong_key in new_conf[cfg_key]: new_conf[cfg_key][right_key] = new_conf[cfg_key].pop(wrong_key) if conflict == "warn": warnings.warn( "Altering config flag {} to {}".format(wrong_key, right_key) ) return new_conf
[docs] def set_zero_config(self): """Set config such that radiative forcing and temperature output will be zero This method is intended as a convenience only, it does not handle everything in an obvious way. Adjusting the parameter settings still requires great care and may behave unepexctedly. """ # zero_emissions is imported from scenarios module # TODO: setup MAGICC6 so it puts extra variables in right place and hence # warning about ignoring some data disappears zero_emissions.write(join(self.run_dir, self._scen_file_name), self.version) time = zero_emissions.filter(variable="Emissions|CH4", region="World")[ "time" ].values no_timesteps = len(time) # value doesn't actually matter as calculations are done from difference but # chose sensible value nonetheless co2_conc_pi = 722 co2_conc = co2_conc_pi * np.ones(no_timesteps) co2_conc_df = pd.DataFrame( { "time": time, "scenario": "idealised", "model": "unspecified", "climate_model": "unspecified", "variable": "Atmospheric Concentrations|CO2", "unit": "ppm", "todo": "SET", "region": "World", "value": co2_conc, } ) co2_conc_writer = MAGICCData(co2_conc_df) co2_conc_filename = "HIST_CONSTANT_CO2_CONC.IN" co2_conc_writer.metadata = { "header": "Constant pre-industrial CO2 concentrations" } co2_conc_writer.write(join(self.run_dir, co2_conc_filename), self.version) ch4_conc_pi = 722 ch4_conc = ch4_conc_pi * np.ones(no_timesteps) ch4_conc_df = pd.DataFrame( { "time": time, "scenario": "idealised", "model": "unspecified", "climate_model": "unspecified", "variable": "Atmospheric Concentrations|CH4", "unit": "ppb", "todo": "SET", "region": "World", "value": ch4_conc, } ) ch4_conc_writer = MAGICCData(ch4_conc_df) ch4_conc_filename = "HIST_CONSTANT_CH4_CONC.IN" ch4_conc_writer.metadata = { "header": "Constant pre-industrial CH4 concentrations" } ch4_conc_writer.write(join(self.run_dir, ch4_conc_filename), self.version) fgas_conc_pi = 0 fgas_conc = fgas_conc_pi * np.ones(no_timesteps) varname = "FGAS_CONC" fgas_conc_df = pd.DataFrame( { "time": time, "scenario": "idealised", "model": "unspecified", "climate_model": "unspecified", "variable": varname, "unit": "ppt", "todo": "SET", "region": "World", "value": fgas_conc, } ) fgas_conc_writer = MAGICCData(fgas_conc_df) fgas_conc_filename = "HIST_ZERO_{}.IN".format(varname) fgas_conc_writer.metadata = {"header": "Zero concentrations"} fgas_conc_writer.write(join(self.run_dir, fgas_conc_filename), self.version) def_config = self.default_config tmp_nml = f90nml.Namelist({"nml_allcfgs": {"fgas_files_conc": 1}}) fgas_files_conc_flag = list( self._fix_legacy_keys(tmp_nml, conflict="ignore")["nml_allcfgs"].keys() )[0] fgas_conc_files = [fgas_conc_filename] * len( def_config["nml_allcfgs"][fgas_files_conc_flag] ) self.set_config( conflict="ignore", file_emisscen=self._scen_file_name, rf_initialization_method="ZEROSTARTSHIFT", rf_total_constantafteryr=10000, file_co2i_emis="", file_co2b_emis="", file_co2_conc=co2_conc_filename, co2_switchfromconc2emis_year=10000, file_ch4i_emis="", file_ch4b_emis="", file_ch4n_emis="", file_ch4_conc=ch4_conc_filename, ch4_switchfromconc2emis_year=10000, file_n2oi_emis="", file_n2ob_emis="", file_n2on_emis="", file_n2o_conc="", n2o_switchfromconc2emis_year=1750, file_noxi_emis="", file_noxb_emis="", file_noxi_ot="", file_noxb_ot="", file_noxt_rf="", file_soxnb_ot="", file_soxi_ot="", file_soxt_rf="", file_soxi_emis="", file_soxb_emis="", file_soxn_emis="", file_oci_emis="", file_ocb_emis="", file_oci_ot="", file_ocb_ot="", file_oci_rf="", file_ocb_rf="", file_bci_emis="", file_bcb_emis="", file_bci_ot="", file_bcb_ot="", file_bci_rf="", file_bcb_rf="", bcoc_switchfromrf2emis_year=1750, file_nh3i_emis="", file_nh3b_emis="", file_nmvoci_emis="", file_nmvocb_emis="", file_coi_emis="", file_cob_emis="", file_mineraldust_rf="", file_landuse_rf="", file_bcsnow_rf="", # rf_fgassum_scale=0, # this appears to do nothing, hence the next two lines fgas_switchfromconc2emis_year=10000, rf_mhalosum_scale=0, stratoz_o3scale=0, rf_volcanic_scale=0, rf_solar_scale=0, mhalo_switchfromconc2emis_year=1750, fgas_files_conc=fgas_conc_files, )
def _check_and_format_config(self, config_dict): self._check_for_duplicate_keys(config_dict) config_dict = self._convert_out_config_flags_to_integers(config_dict) return config_dict @staticmethod def _check_for_duplicate_keys(config_dict): keys_lower = [v.lower() for v in config_dict.keys()] counts = Counter(keys_lower) if any([v > 1 for v in counts.values()]): duplicate_keys = [ [ck for ck in config_dict.keys() if ck.lower() == k.lower()] for k, v in counts.items() if v > 1 ] error_msg = ( "The following configuration keys clash because configs are " "case insensitive: {}".format( ", ".join([str(v) for v in duplicate_keys]) ) ) raise ValueError(error_msg) @staticmethod def _convert_out_config_flags_to_integers(config_dict): valid_out_flags = [ "out_emissions", "out_gwpemissions", "out_sum_gwpemissions", "out_concentrations", "out_carboncycle", "out_forcing", "out_forcing_subannual", "out_temperature", "out_temperature_subannual", "out_sealevel", "out_parameters", "out_misc", "out_lifetimes", "out_timeseriesmix", "out_rcpdata", "out_summaryidx", "out_tempoceanlayers", "out_oceanarea", "out_heatuptake", "out_warnings", "out_precipinput", "out_aogcmtuning", "out_ccycletuning", "out_observationaltuning", "out_keydata_1", "out_keydata_2", "out_inverseemis", "out_surfaceforcing", "out_permafrost", "out_allowanydynamicvars", ] for key in valid_out_flags: if key in config_dict: # MAGICC expects 1 and 0 instead of True/False config_dict[key] = 1 if config_dict[key] else 0 return config_dict
[docs] def set_years(self, startyear=1765, endyear=2100): """ Set the start and end dates of the simulations. Parameters ---------- startyear : int Start year of the simulation endyear : int End year of the simulation Returns ------- dict The contents of the namelist """ # TODO: test altering stepsperyear, I think 1, 2 and 24 should all work return self.set_config( "MAGCFG_NMLYEARS.CFG", "nml_years", endyear=endyear, startyear=startyear, stepsperyear=12, )
[docs] def set_output_variables(self, write_ascii=True, write_binary=False, **kwargs): """Set the output configuration, minimising output as much as possible There are a number of configuration parameters which control which variables are written to file and in which format. Limiting the variables that are written to file can greatly speed up the running of MAGICC. By default, calling this function without specifying any variables will disable all output by setting all of MAGICC's ``out_xx`` flags to ``0``. This convenience function should not be confused with ``set_config`` or ``update_config`` which allow the user to set/update the configuration flags directly, without the more convenient syntax and default behaviour provided by this function. Parameters ---------- write_ascii : bool If true, MAGICC is configured to write output files as human readable ascii files. write_binary : bool If true, MAGICC is configured to write binary output files. These files are much faster to process and write, but are not human readable. **kwargs: List of variables to write out. A list of possible options are as follows. This may not be a complete list. 'emissions', 'gwpemissions', 'sum_gwpemissions', 'concentrations', 'carboncycle', 'forcing', 'surfaceforcing', 'permafrost', 'temperature', 'sealevel', 'parameters', 'misc', 'lifetimes', 'timeseriesmix', 'rcpdata', 'summaryidx', 'inverseemis', 'tempoceanlayers', 'oceanarea', 'heatuptake', 'warnings', 'precipinput', 'aogcmtuning', 'ccycletuning', 'observationaltuning', 'keydata_1', 'keydata_2' """ if not (write_ascii or write_binary): raise AssertionError("write_binary and/or write_ascii must be configured") if write_binary and write_ascii: ascii_binary = "BOTH" elif write_ascii: ascii_binary = "ASCII" else: ascii_binary = "BINARY" # defaults outconfig = { "out_emissions": 0, "out_gwpemissions": 0, "out_sum_gwpemissions": 0, "out_concentrations": 0, "out_carboncycle": 0, "out_forcing": 0, "out_surfaceforcing": 0, "out_permafrost": 0, "out_temperature": 0, "out_sealevel": 0, "out_parameters": 0, "out_misc": 0, "out_timeseriesmix": 0, "out_rcpdata": 0, "out_summaryidx": 0, "out_inverseemis": 0, "out_tempoceanlayers": 0, "out_heatuptake": 0, "out_ascii_binary": ascii_binary, "out_warnings": 0, "out_precipinput": 0, "out_aogcmtuning": 0, "out_ccycletuning": 0, "out_observationaltuning": 0, "out_keydata_1": 0, "out_keydata_2": 0, } if self.version == 7: outconfig["out_oceanarea"] = 0 outconfig["out_lifetimes"] = 0 for kw in kwargs: val = 1 if kwargs[kw] else 0 # convert values to 0/1 instead of booleans outconfig["out_" + kw.lower()] = val self.update_config(**outconfig)
[docs] def get_executable(self): """ Get path to MAGICC executable being used Returns ------- str Path to MAGICC executable being used """ return config["executable_{}".format(self.version)]
[docs] def diagnose_tcr_ecs_tcre(self, **kwargs): """ Diagnose TCR, ECS and TCRE The transient climate response (TCR), is the global-mean temperature response per unit cumulative |CO2| emissions at the time at which atmospheric |CO2| concentrations double in an experiment where atmospheric |CO2| concentrations are increased at 1% per year from pre-industrial levels (1pctCO2 experiment). The equilibrium climate sensitivity (ECS), is the equilibrium global-mean temperature response to an instantaneous doubling of atmospheric |CO2| concentrations (abrupt-2xCO2 experiment). The transient climate response to emissions (TCRE), is the global-mean temperature response per unit cumulative |CO2| emissions at the time at which atmospheric |CO2| concentrations double in the 1pctCO2 experiment. Please note that sometimes the run length won't be long enough to allow MAGICC's oceans to fully equilibrate and hence the ECS value might not be what you expect (it should match the value of ``core_climatesensitivity``). Parameters ---------- **kwargs parameter values to use in the diagnosis e.g. ``core_climatesensitivity=4`` Returns ------- dict Dictionary with keys: "ecs" - the diagnosed ECS; "tcr" - the diagnosed TCR; "tcre" - the diagnosed TCRE; "timeseries" - the relevant model input and output timeseries used in the experiment i.e. atmospheric |CO2| concentrations, inverse |CO2| emissions, total radiative forcing and global-mean surface temperature """ ecs_res = self.diagnose_ecs(**kwargs) tcr_tcre_res = self.diagnose_tcr_tcre(**kwargs) out = {**ecs_res, **tcr_tcre_res} out["timeseries"] = run_append( [ecs_res["timeseries"], tcr_tcre_res["timeseries"]] ) return out
[docs] def diagnose_ecs(self, **kwargs): """ Diagnose ECS The equilibrium climate sensitivity (ECS), is the equilibrium global-mean temperature response to an instantaneous doubling of atmospheric |CO2| concentrations (abrupt-2xCO2 experiment). Please note that sometimes the run length won't be long enough to allow MAGICC's oceans to fully equilibrate and hence the ECS value might not be what you expect (it should match the value of ``core_climatesensitivity``). Parameters ---------- **kwargs parameter values to use in the diagnosis e.g. ``core_climatesensitivity=4`` Returns ------- dict Dictionary with keys: "ecs" - the diagnosed ECS; "timeseries" - the relevant model input and output timeseries used in the experiment i.e. atmospheric |CO2| concentrations, inverse |CO2| emissions, total radiative forcing and global-mean surface temperature """ self._diagnose_ecs_config_setup(**kwargs) timeseries = self.run( scenario=None, only=[ "Atmospheric Concentrations|CO2", "Radiative Forcing", "Surface Temperature", ], ) timeseries["scenario"] = "abrupt-2xCO2" ecs = self.get_ecs_from_diagnosis_results(timeseries) return {"ecs": ecs, "timeseries": timeseries}
[docs] def diagnose_tcr_tcre(self, **kwargs): """ Diagnose TCR and TCRE The transient climate response (TCR), is the global-mean temperature response per unit cumulative |CO2| emissions at the time at which atmospheric |CO2| concentrations double in an experiment where atmospheric |CO2| concentrations are increased at 1% per year from pre-industrial levels (1pctCO2 experiment). The transient climate response to emissions (TCRE), is the global-mean temperature response per unit cumulative |CO2| emissions at the time at which atmospheric |CO2| concentrations double in the 1pctCO2 experiment. Parameters ---------- **kwargs parameter values to use in the diagnosis e.g. ``core_climatesensitivity=4`` Returns ------- dict Dictionary with keys: "tcr" - the diagnosed TCR; "tcre" - the diagnosed TCRE; "timeseries" - the relevant model input and output timeseries used in the experiment i.e. atmospheric |CO2| concentrations, inverse |CO2| emissions, total radiative forcing and global-mean surface temperature """ self._diagnose_tcr_tcre_config_setup(**kwargs) timeseries = self.run( scenario=None, only=[ "Atmospheric Concentrations|CO2", "INVERSEEMIS", "Radiative Forcing", "Surface Temperature", ], ) # drop all the irrelevant inverse emissions timeseries = timeseries.filter( variable="Inverse Emissions*", level=1, keep=False ) # drop the final year as concs stay constant from some reason, # MAGICC bug... timeseries = timeseries.filter(time=timeseries["time"].max(), keep=False) timeseries["scenario"] = "1pctCO2" tcr, tcre = self.get_tcr_tcre_from_diagnosis_results(timeseries) return {"tcr": tcr, "tcre": tcre, "timeseries": timeseries}
def _diagnose_ecs_config_setup(self, **kwargs): self.set_years( startyear=1750, endyear=4200 ) # 4200 seems to be the max I can push too without an error self.update_config( FILE_CO2_CONC="ABRUPT2XCO2_CO2_CONC.IN", CO2_SWITCHFROMCONC2EMIS_YEAR=30000, RF_TOTAL_RUNMODUS="CO2", RF_TOTAL_CONSTANTAFTERYR=2000, **kwargs, ) def _diagnose_tcr_tcre_config_setup(self, **kwargs): self.set_years(startyear=1750, endyear=2020) self.update_config( FILE_CO2_CONC="1PCTCO2_CO2_CONC.IN", CO2_SWITCHFROMCONC2EMIS_YEAR=30000, RF_TOTAL_RUNMODUS="CO2", RF_TOTAL_CONSTANTAFTERYR=3000, OUT_INVERSEEMIS=1, **kwargs, )
[docs] def get_ecs_from_diagnosis_results(self, results_ecs_run): """ Diagnose ECS from the results of the abrupt-2xCO2 experiment Parameters ---------- results_ecs_run : :obj:`ScmRun` Results of the abrupt-2xCO2 experiment, must contain atmospheric |CO2| concentrations, total radiative forcing and surface temperature. Returns ------- ecs : :obj:`pint.quantity.Quantity` ECS diagnosed from ``results_ecs_run`` """ global_co2_concs = results_ecs_run.filter( variable="Atmospheric Concentrations|CO2", region="World" ) ecs_time, ecs_start_time = self._get_ecs_ecs_start_yr_from_CO2_concs( global_co2_concs ) global_total_rf = results_ecs_run.filter( variable="Radiative Forcing", region="World" ) self._check_ecs_total_RF(global_total_rf, jump_time=ecs_start_time) global_temp = results_ecs_run.filter( variable="Surface Temperature", region="World" ) self._check_ecs_temp(global_temp) ecs = float(global_temp.filter(time=ecs_time).values.squeeze()) unit = global_temp.get_unique_meta("unit", no_duplicates=True) ecs = ecs * unit_registry(unit) return ecs
[docs] def get_tcr_tcre_from_diagnosis_results(self, results_tcr_tcre_run): """ Diagnose TCR and TCRE from the results of the 1pctCO2 experiment Parameters ---------- results_tcr_tcre_run : :obj:`ScmRun` Results of the 1pctCO2 experiment, must contain atmospheric |CO2| concentrations, inverse |CO2| emissions, total radiative forcing and surface temperature. Returns ------- tcr, tcre : :obj:`pint.quantity.Quantity`, :obj:`pint.quantity.Quantity` TCR and TCRE diagnosed from ``results_tcr_tcre_run`` """ global_co2_concs = results_tcr_tcre_run.filter( variable="Atmospheric Concentrations|CO2", region="World" ) (tcr_time, tcr_start_time,) = self._get_tcr_tcr_start_yr_from_CO2_concs( global_co2_concs ) if tcr_time.year != tcr_start_time.year + 70: # pragma: no cover # emergency raise AssertionError("Has the definition of TCR and TCRE changed?") global_inverse_co2_emms = results_tcr_tcre_run.filter( variable="Inverse Emissions|CO2|MAGICC Fossil and Industrial", region="World", ) global_total_rf = results_tcr_tcre_run.filter( variable="Radiative Forcing", region="World" ) self._check_tcr_tcre_total_RF(global_total_rf, tcr_time=tcr_time) global_temp = results_tcr_tcre_run.filter( variable="Surface Temperature", region="World" ) self._check_tcr_tcre_temp(global_temp) tcr = float(global_temp.filter(time=tcr_time).values.squeeze()) tcr_unit = global_temp.get_unique_meta("unit", no_duplicates=True) tcr = tcr * unit_registry(tcr_unit) tcre_cumulative_emms = float( global_inverse_co2_emms.filter( year=range(tcr_start_time.year, tcr_time.year) ).values.sum() ) emms_unit = global_inverse_co2_emms.get_unique_meta("unit", no_duplicates=True) years = global_inverse_co2_emms["year"].values.squeeze() if not np.all((years[1:] - years[:-1]) == 1): # pragma: no cover raise AssertionError( "TCR/TCRE diagnosis assumed to be on annual timestep. Please " "raise an issue at " "https://github.com/openscm/pymagicc/issues to discuss " "your use case" ) # can now safely assume that our simple sum has done the right thing tcre_cumulative_emms_unit = unit_registry(emms_unit) * unit_registry("yr") tcre_cumulative_emms = tcre_cumulative_emms * tcre_cumulative_emms_unit tcre = tcr / tcre_cumulative_emms return tcr, tcre
def _get_ecs_ecs_start_yr_from_CO2_concs(self, df_co2_concs): co2_concs = df_co2_concs.timeseries() co2_conc_0 = co2_concs.iloc[0, 0] t_start = co2_concs.columns.min() t_end = co2_concs.columns.max() ecs_start_time = co2_concs.iloc[ :, co2_concs.values.squeeze() > co2_conc_0 ].columns[0] spin_up_co2_concs = ( _filter_time_range(df_co2_concs, lambda x: t_start <= x < ecs_start_time) .timeseries() .values.squeeze() ) if not (spin_up_co2_concs == co2_conc_0).all(): raise ValueError( "The ECS CO2 concs look wrong, they are not constant before they start rising" ) co2_conc_final = 2 * co2_conc_0 eqm_co2_concs = ( _filter_time_range(df_co2_concs, lambda x: ecs_start_time <= x <= t_end) .timeseries() .values.squeeze() ) if not np.isclose(eqm_co2_concs, co2_conc_final).all(): raise ValueError( "The ECS CO2 concs look wrong, they are not constant after doubling" ) ecs_time = df_co2_concs["time"].iloc[-1] return ecs_time, ecs_start_time def _get_tcr_tcr_start_yr_from_CO2_concs(self, df_co2_concs): co2_concs = df_co2_concs.timeseries() co2_conc_0 = co2_concs.iloc[0, 0] t_start = co2_concs.columns.min() t_end = co2_concs.columns.max() tcr_start_time = co2_concs.iloc[ :, co2_concs.values.squeeze() > co2_conc_0 ].columns[0] - relativedelta(years=1) tcr_time = tcr_start_time + relativedelta(years=70) spin_up_co2_concs = ( _filter_time_range(df_co2_concs, lambda x: t_start <= x <= tcr_start_time) .timeseries() .values.squeeze() ) if not (spin_up_co2_concs == co2_conc_0).all(): raise ValueError( "The TCR/TCRE CO2 concs look wrong, they are not constant before they start rising" ) actual_rise_co2_concs = ( _filter_time_range(df_co2_concs, lambda x: tcr_start_time <= x <= t_end) .timeseries() .values.squeeze() ) # this will blow up if we switch to diagnose tcr/ecs with a monthly run... expected_rise_co2_concs = co2_conc_0 * 1.01 ** np.arange( len(actual_rise_co2_concs) ) rise_co2_concs_correct = np.isclose( actual_rise_co2_concs, expected_rise_co2_concs ).all() if not rise_co2_concs_correct: raise ValueError("The TCR/TCRE CO2 concs look wrong during the rise period") return tcr_time, tcr_start_time def _check_ecs_total_RF(self, df_total_rf, jump_time): total_rf = df_total_rf.timeseries() total_rf_max = total_rf.values.squeeze().max() t_start = total_rf.columns.min() t_end = total_rf.columns.max() spin_up_rf = ( _filter_time_range(df_total_rf, lambda x: t_start <= x < jump_time) .timeseries() .values.squeeze() ) if not (spin_up_rf == 0).all(): raise ValueError( "The ECS total radiative forcing looks wrong, it is not all zero before concentrations start rising" ) eqm_rf = ( _filter_time_range(df_total_rf, lambda x: jump_time <= x <= t_end) .timeseries() .values.squeeze() ) if not (eqm_rf == total_rf_max).all(): raise ValueError( "The ECS total radiative forcing looks wrong, it is not constant after concentrations double" ) def _check_tcr_tcre_total_RF(self, df_total_rf, tcr_time): total_rf = df_total_rf.timeseries() t_start = total_rf.columns.min() tcr_start_time = tcr_time - relativedelta(years=70) spin_up_rf = ( _filter_time_range(df_total_rf, lambda x: t_start <= x <= tcr_start_time) .timeseries() .values.squeeze() ) if not (spin_up_rf == 0).all(): raise ValueError( "The TCR/TCRE total radiative forcing looks wrong, it is not all zero before concentrations start rising" ) rf_vls = total_rf.values.squeeze() rf_minus_previous_yr = rf_vls[1:] - rf_vls[:-1] if not np.all(rf_minus_previous_yr >= 0): raise ValueError( "The TCR/TCRE total radiative forcing looks wrong, it is not rising after concentrations start rising" ) def _check_ecs_temp(self, df_temp): self._check_tcr_ecs_tcre_temp( df_temp, "The ECS surface temperature looks wrong, it decreases" ) def _check_tcr_tcre_temp(self, df_temp): self._check_tcr_ecs_tcre_temp( df_temp, "The TCR/TCRE surface temperature looks wrong, it decreases" ) def _check_tcr_ecs_tcre_temp(self, df_temp, message): tmp_vls = df_temp.timeseries().values.squeeze() tmp_minus_previous_yr = tmp_vls[1:] - tmp_vls[:-1] if not np.all(tmp_minus_previous_yr >= 0): raise ValueError(message)
[docs] def set_emission_scenario_setup(self, scenario, config_dict): """Set the emissions flags correctly. Parameters ---------- scenario : :obj:`pymagicc.io.MAGICCData` Scenario to run. config_dict : dict Dictionary with current input configurations which is to be validated and updated where necessary. Returns ------- dict Updated configuration """ self.write(scenario, self._scen_file_name) emis_flag = list( self._fix_legacy_keys( f90nml.Namelist({"nml_allcfgs": {"file_emisscen": "junk"}}), conflict="ignore", )["nml_allcfgs"].keys() )[0] config_dict[emis_flag] = self._scen_file_name return config_dict
def _check_config(self): """ Check config above and beyond those checked by ``self.check_config`` """ pass
[docs]class MAGICC6(MAGICCBase): version = 6 _scen_file_name = "SCENARIO.SCEN" _config_renamings = { "file_emisscen": "file_emissionscenario", "fgas_files_conc": "file_fgas_conc", "mhalo_switchfromconc2emis_year": "mhalo_switch_conc2emis_yr", } @property def default_config(self): """ Default configuration to use in a run """ base = f90nml.read(join(self.run_dir, "MAGCFG_DEFAULTALL_69.CFG")) user = f90nml.read(join(self.run_dir, "MAGCFG_USER.CFG")) self._default_config = deepcopy(base) self._default_config.update(user) return self._default_config def _check_tcr_ecs_tcre_total_RF(self, df_total_rf, tcr_time, ecs_time): super()._check_tcr_ecs_tcre_total_RF(df_total_rf, tcr_time, ecs_time) # can be more careful with checks MAGICC6 only has logarithmic CO2 forcing # i.e. linear rise in forcing total_rf = df_total_rf.timeseries() total_rf_max = total_rf.values.squeeze().max() tcre_start_time = tcr_time - relativedelta(years=70) actual_rise_rf = ( _filter_time_range(df_total_rf, lambda x: tcre_start_time <= x <= tcr_time) .timeseries() .values.squeeze() ) # this will blow up if we switch to diagnose tcr/ecs with a monthly run... expected_rise_rf = total_rf_max / 70.0 * np.arange(71) rise_rf_correct = np.isclose(actual_rise_rf, expected_rise_rf).all() if not rise_rf_correct: raise ValueError( "The TCR/ECS/TCRE total radiative forcing looks wrong during the rise period" ) def _check_config(self): cfg = self.update_config() if "file_emissionscenario" in cfg["nml_allcfgs"]: if cfg["nml_allcfgs"]["file_emissionscenario"].endswith("SCEN7"): self._check_failed("MAGICC6 cannot run SCEN7 files")
[docs]class MAGICC7(MAGICCBase): version = 7 _config_renamings = { "file_emissionscenario": "file_emisscen", "file_fgas_conc": "fgas_files_conc", "mhalo_switch_conc2emis_yr": "mhalo_switchfromconc2emis_year", }
[docs] def create_copy(self): """ Initialises a temporary directory structure and copy of MAGICC configuration files and binary. This will also overwrite the value of all ``file_tuningmodel_x`` flags to ensure that Pymagicc's configurations will be read. If ``self.strict``, this will also overwrite the value of all ``file_emisscen_x`` flags to ensure that only Pymagicc's scenario input is used. This overwrite behaviour can be removed once the MAGICC7 binary is publicly released as we can then create a Pymagicc specific MAGCFG_USER.CFG rather than relying on whatever is in the user's current copy. """ super(MAGICC7, self).create_copy() self.update_config( "MAGCFG_USER.CFG", **{ "file_tuningmodel_1": "PYMAGICC", "file_tuningmodel_2": "USER", "file_tuningmodel_3": "USER", "file_tuningmodel_4": "USER", "file_tuningmodel_5": "USER", "file_tuningmodel_6": "USER", "file_tuningmodel_7": "USER", "file_tuningmodel_8": "USER", "file_tuningmodel_9": "USER", "file_tuningmodel_10": "USER", }, ) if self.strict: self.update_config( "MAGCFG_USER.CFG", **{ "file_emisscen_2": "NONE", "file_emisscen_3": "NONE", "file_emisscen_4": "NONE", "file_emisscen_5": "NONE", "file_emisscen_6": "NONE", "file_emisscen_7": "NONE", "file_emisscen_8": "NONE", }, )
def _diagnose_tcr_ecs_tcre_config_setup(self, **kwargs): super()._diagnose_tcr_ecs_tcre_config_setup(**kwargs) # also need to lock CH4 and N2O in case OLBL forcing mode is being used self.update_config( FILE_CH4_CONC="TCRECS_CH4_CONC.IN", CH4_SWITCHFROMCONC2EMIS_YEAR=30000, FILE_N2O_CONC="TCRECS_N2O_CONC.IN", N2O_SWITCHFROMCONC2EMIS_YEAR=30000, ) def _check_config(self): pass
def _filter_time_range(scmdf, filter_func): # TODO: move into openscm tdf = scmdf.timeseries() tdf = tdf.iloc[:, tdf.columns.map(filter_func)] return MAGICCData(tdf)