Source code for nomenclature.processor.region

import logging
from collections import Counter
from pathlib import Path

import numpy as np
import pandas as pd
import pyam
import yaml
from pyam import IamDataFrame
from pyam.utils import adjust_log_level
from pydantic import (
    AfterValidator,
    BaseModel,
    ConfigDict,
    Field,
    ValidationInfo,
    field_serializer,
    field_validator,
    model_validator,
    validate_call,
)
from pydantic.types import DirectoryPath, FilePath
from typing_extensions import Annotated

from nomenclature.codelist import RegionCodeList, VariableCodeList
from nomenclature.definition import DataStructureDefinition
from nomenclature.exceptions import (
    ConstituentsNotNativeError,
    ExcludeRegionOverlapError,
    RegionNameCollisionError,
    RegionNotDefinedError,
    UnknownRegionError,
)
from nomenclature.processor import Processor
from nomenclature.utils import get_relative_path

logger = logging.getLogger(__name__)

here = Path(__file__).parent.absolute()


class NativeRegion(BaseModel):
    """Define a model native region.

    Can optionally have a renaming attribute which is applied in the region processing.

    Attributes
    ----------
    name : str
        Name of the model native region.
    rename: str, optional
        Optional second name that the region will be renamed to.
    """

    name: str
    rename: str | None = None

    @property
    def target_native_region(self) -> str:
        """Return the resulting name, i.e. either rename or, if not given, name.

        Returns
        -------
        str
            Resulting name.
        """
        return self.rename if self.rename is not None else self.name


class CommonRegion(BaseModel):
    """Common region used for model intercomparison.

    Attributes
    ----------
    name : str
        Name of the common region.
    constituent_regions:
        List of strings which refer to the original (not renamed, see
        :class:`NativeRegion`) names of model native regions.
    """

    name: str
    constituent_regions: list[str]

    @property
    def is_single_constituent_region(self):
        return len(self.constituent_regions) == 1

    @property
    def rename_dict(self):
        if self.is_single_constituent_region:
            return {self.constituent_regions[0]: self.name}
        raise AttributeError(
            "rename_dict is only available for single constituent regions"
        )


class RegionAggregationMapping(BaseModel):
    """Hold information for region processing on a per-model basis.

    Region processing is comprised of native region selection and potentially renaming
    as well as aggregation to "common regions" (regions used for reporting and
    comparison by multiple models).

    Attributes
    ----------
    model: str
        Name of the model for which RegionAggregationMapping is defined.
    file: FilePath
        File path of the mapping file. Saved mostly for error reporting purposes.
    native_regions: list[NativeRegion], optional
        Optionally, list of model native regions to select and potentially rename.
    common_regions: list[CommonRegion], optional
        Optionally, list of common regions where aggregation will be performed.
    exclude_regions: list[str], optional
        Optionally, list of model native regions to exclude from processing.
    """

    model: list[str]
    file: FilePath
    native_regions: list[NativeRegion] = Field(default_factory=list)
    common_regions: list[CommonRegion] = Field(default_factory=list)
    exclude_regions: list[str] = Field(default_factory=list)

    model_config = ConfigDict(extra="forbid")

    @field_validator("model", mode="before")
    @classmethod
    def convert_to_list(cls, v):
        return pyam.utils.to_list(v)

    @field_validator("native_regions")
    @classmethod
    def check_native_regions_name(cls, v, info: ValidationInfo):
        """
        Check that each native region source name must appear *at most* once as:
        - A region without renaming (keep original name), and/or
        - A region with renaming (assign new name)
        """
        keep = [nr.name for nr in v if nr.rename is None]
        rename = [nr.name for nr in v if nr.rename is not None]
        keep_dups = [item for item, count in Counter(keep).items() if count > 1]
        rename_dups = [item for item, count in Counter(rename).items() if count > 1]
        if keep_dups or rename_dups:
            raise RegionNameCollisionError(
                {
                    "location": "native regions (names)",
                    "duplicates": list(set(keep_dups + rename_dups)),
                    "file": info.data["file"],
                },
            )
        return v

    @field_validator("native_regions")
    @classmethod
    def check_native_regions_target(cls, v, info: ValidationInfo):
        """Check that target region names (after renaming, if applicable) are unique."""
        target_names = [nr.target_native_region for nr in v]
        duplicates = [
            item for item, count in Counter(target_names).items() if count > 1
        ]
        if duplicates:
            raise RegionNameCollisionError(
                {
                    "location": "native regions (rename-targets)",
                    "duplicates": duplicates,
                    "file": info.data["file"],
                }
            )
        return v

    @field_validator("common_regions")
    @classmethod
    def check_common_regions_unique(cls, v, info: ValidationInfo):
        """Check for duplicate common (target) regions and self-referencing
        (source in target) regions."""
        names = [cr.name for cr in v]
        self_referencing = [cr.name for cr in v if cr.name in cr.constituent_regions]
        duplicates = [item for item, count in Counter(names).items() if count > 1]
        if duplicates or self_referencing:
            raise RegionNameCollisionError(
                {
                    "location": "common regions",
                    "duplicates": list(set(duplicates + self_referencing)),
                    "file": info.data["file"],
                },
            )
        return v

    @model_validator(mode="after")
    @classmethod
    def check_native_or_common_regions(
        cls, v: "RegionAggregationMapping"
    ) -> "RegionAggregationMapping":
        """Check that at least one of the following is provided: native or common regions."""
        if not v.native_regions and not v.common_regions:
            raise ValueError(
                "At least one of 'native_regions' and 'common_regions' must be "
                f"provided in {v.file}"
            )
        return v

    @model_validator(mode="after")
    @classmethod
    def check_native_common_region_no_overlap(
        cls, v: "RegionAggregationMapping"
    ) -> "RegionAggregationMapping":
        """Check that native region target names do not overlap with common region names."""
        native_region_names = {nr.target_native_region for nr in v.native_regions}
        common_region_names = {cr.name for cr in v.common_regions}
        overlap = list(native_region_names & common_region_names)
        if overlap:
            raise RegionNameCollisionError(
                {
                    "location": "native and common regions",
                    "duplicates": overlap,
                    "file": v.file,
                }
            )
        return v

    @model_validator(mode="after")
    @classmethod
    def check_exclude_native_region_overlap(
        cls, v: "RegionAggregationMapping"
    ) -> "RegionAggregationMapping":
        return _check_exclude_region_overlap(v, "native_regions")

    @model_validator(mode="after")
    @classmethod
    def check_exclude_common_region_overlap(
        cls, v: "RegionAggregationMapping"
    ) -> "RegionAggregationMapping":
        return _check_exclude_region_overlap(v, "common_regions")

    @model_validator(mode="after")
    @classmethod
    def check_constituent_regions_in_native_regions(
        cls, v: "RegionAggregationMapping"
    ) -> "RegionAggregationMapping":
        """Check that all constituent regions in common regions are listed as native regions."""
        if v.common_regions and v.native_regions:
            if missing := set(
                [cr for r in v.common_regions for cr in r.constituent_regions]
            ).difference([r.name for r in v.native_regions] + v.exclude_regions):
                raise ConstituentsNotNativeError({"regions": missing, "file": v.file})
        return v

    @classmethod
    def from_file(cls, file: Path | str) -> "RegionAggregationMapping":
        """Initialize a RegionAggregationMapping from a file.

        Parameters
        ----------
        file : Path | str
            Path to a file which contains region aggregation information for one model.

        Returns
        -------
        RegionAggregationMapping
            The resulting region aggregation mapping.

        Notes
        -----

        This function is used to convert a model mapping file into a dictionary
        which is used to initialize a RegionAggregationMapping.
        """

        file = Path(file) if isinstance(file, str) else file
        FILE_PARSERS = {
            ".yaml": cls.from_yaml,
            ".yml": cls.from_yaml,
            ".xlsx": cls.from_excel,
        }
        if file.suffix in FILE_PARSERS:
            return FILE_PARSERS[file.suffix](file)
        raise ValueError(f"No parser implemented for {file.suffix}")

    @classmethod
    def from_yaml(cls, file: Path) -> "RegionAggregationMapping":
        """Initialize a RegionAggregationMapping from a yaml file.

        Parameters
        ----------
        file : Path
            Path to a yaml file which contains region aggregation information for one model.

        Returns
        -------
        RegionAggregationMapping
            The resulting region aggregation mapping.
        """
        try:
            with open(file, "r", encoding="utf-8") as f:
                mapping_input = yaml.safe_load(f)

            # Add the file name to mapping_input
            mapping_input["file"] = get_relative_path(file)

            # Reformat the "native_regions"
            if "native_regions" in mapping_input:
                native_region_list: list[dict] = []
                for native_region in mapping_input["native_regions"]:
                    if isinstance(native_region, str):
                        native_region_list.append({"name": native_region})
                    elif isinstance(native_region, dict):
                        native_region_list.append(
                            {
                                "name": list(native_region)[0],
                                "rename": list(native_region.values())[0],
                            }
                        )
                mapping_input["native_regions"] = native_region_list

            # Reformat the "common_regions"
            if "common_regions" in mapping_input:
                common_region_list: list[dict[str, list[dict[str, str]]]] = []
                for common_region in mapping_input["common_regions"]:
                    common_region_name = list(common_region)[0]
                    common_region_list.append(
                        {
                            "name": common_region_name,
                            "constituent_regions": common_region[common_region_name],
                        }
                    )
                mapping_input["common_regions"] = common_region_list
        except Exception as error:
            raise ValueError(f"{error} in {get_relative_path(file)}") from error
        return cls(**mapping_input)

    @classmethod
    def from_excel(cls, file: Path) -> "RegionAggregationMapping":
        """Initialize a RegionAggregationMapping from a spreadsheet file.

        Parameters
        ----------
        file : Path
            Path to a spreadsheet file which contains region aggregation information for one model.

        Returns
        -------
        RegionAggregationMapping
            The resulting region aggregation mapping.
        """
        try:
            model = pd.read_excel(file, sheet_name="Model", usecols="B", nrows=1).iloc[
                0, 0
            ]

            regions = pd.read_excel(file, sheet_name="Common-Region-Mapping", header=3)
            regions = regions.drop(
                columns=(c for c in regions.columns if c.startswith("Unnamed: "))
            ).drop(index=0)
            # Replace nan with None
            regions = regions.where(pd.notnull(regions), None)
            native = "Native region (as reported by the model)"
            rename = "Native region (after renaming)"
            native_regions = [
                NativeRegion(name=row[native], rename=row[rename])
                for row in regions[[native, rename]].to_dict(orient="records")
            ]
            common_region_groups = [
                r for r in regions.columns if r not in (native, rename)
            ]
            common_regions = [
                CommonRegion(
                    name=common_region,
                    constituent_regions=constituent_regions.split(","),
                )
                for common_region_group in common_region_groups
                for common_region, constituent_regions in regions[
                    [native, common_region_group]
                ]
                .groupby(common_region_group)[native]
                .apply(lambda x: ",".join(x))
                .to_dict()
                .items()
            ]
            common_region_names = [
                common_region.name for common_region in common_regions
            ]

            if "R5" in common_region_groups and "World" not in common_region_names:
                r5_regions = [
                    region for region in common_regions if "(R5)" in region.name
                ]
                # Only add "World" from R5-constituent region if all R5 regions given
                if len(r5_regions) in [5, 6]:
                    constituent_world_regions = sorted(
                        region
                        for r5_regions in r5_regions
                        for region in r5_regions.constituent_regions
                    )
                    common_regions.insert(
                        0,
                        CommonRegion(
                            name="World", constituent_regions=constituent_world_regions
                        ),
                    )
        except Exception as error:
            raise ValueError(f"{error} in {get_relative_path(file)}") from error
        return cls(
            model=model,
            file=file,
            native_regions=native_regions,
            common_regions=common_regions,
        )

    @property
    def all_regions(self) -> list[str]:
        """List of all native and common regions in the mapping."""
        # For the native regions we take the **renamed** (if given) names
        nr_list = [x.target_native_region for x in self.native_regions or []]
        return nr_list + self.common_region_names

    @property
    def model_native_region_names(self) -> list[str]:
        """List of the original model native region names."""
        return [x.name for x in self.native_regions or []]

    @property
    def common_region_names(self) -> list[str]:
        """List of the common region names."""
        return [x.name for x in self.common_regions or []]

    @property
    def rename_mapping(self) -> dict[str, str]:
        """Mapping from original native region names to renamed native region names."""
        return {
            r.name: r.target_native_region
            for r in self.native_regions or []
            if r.rename is not None
        }

    @property
    def reverse_rename_mapping(self) -> dict[str, str]:
        """Mapping from renamed native region names to original native region names."""
        return {renamed: original for original, renamed in self.rename_mapping.items()}

    @property
    def upload_native_regions(self) -> list[str]:
        """List of native region names to be uploaded."""
        return [
            native_region.target_native_region
            for native_region in self.native_regions or []
        ]

    @property
    def models(self) -> list[str]:
        return self.model

    def check_unexpected_regions(self, df: IamDataFrame) -> None:
        """Raise an error if there are regions in the input data that are not in the model mapping."""

        if regions_not_found := set(df.region) - set(
            self.model_native_region_names
            + self.common_region_names
            + [
                constituent_region
                for common_region in self.common_regions or []
                for constituent_region in common_region.constituent_regions
            ]
            + (self.exclude_regions or [])
        ):
            raise ValueError(
                f"Did not find region(s) {regions_not_found} in 'native_regions', "
                "'common_regions' or 'exclude_regions' in model mapping for "
                f"{self.model} in {self.file}. If they are not meant to be included "
                "in the results add to the 'exclude_regions' section in the model "
                "mapping to silence this error."
            )

    def __eq__(self, other: "RegionAggregationMapping") -> bool:
        return self.model_dump(exclude={"file"}) == other.model_dump(exclude={"file"})

    @field_serializer("model", when_used="json")
    def serialize_model(self, model) -> str | list[str]:
        return model[0] if len(model) == 1 else model

    @field_serializer("native_regions", when_used="json")
    def serialize_native_regions(self, native_regions) -> list:
        return [
            (
                {native_region.name: native_region.rename}
                if native_region.rename
                else native_region.name
            )
            for native_region in native_regions
        ]

    @field_serializer("common_regions", when_used="json")
    def serialize_common_regions(self, common_regions) -> list:
        return [
            {common_region.name: common_region.constituent_regions}
            for common_region in common_regions
        ]

    def to_yaml(self, file: Path) -> None:
        """Write the RegionAggregationMapping to a yaml file."""
        with open(file, "w", encoding="utf-8") as f:
            yaml.dump(
                self.model_dump(mode="json", exclude_defaults=True, exclude={"file"}),
                f,
                sort_keys=False,
                allow_unicode=True,
            )


def validate_with_definition(v: RegionAggregationMapping, info: ValidationInfo):
    """Check if mappings valid with respect to RegionCodeList."""
    if invalid := info.data["region_codelist"].validate_items(v.all_regions):
        raise RegionNotDefinedError({"regions": invalid, "file": v.file})
    return v


[docs] class RegionProcessor(Processor): """Region aggregation mappings for scenario processing""" region_codelist: RegionCodeList variable_codelist: VariableCodeList mappings: dict[ str, Annotated[RegionAggregationMapping, AfterValidator(validate_with_definition)], ] model_config = ConfigDict(hide_input_in_errors=True)
[docs] @classmethod @validate_call(config={"arbitrary_types_allowed": True}) def from_directory(cls, path: DirectoryPath, dsd: DataStructureDefinition): """Initialize a RegionProcessor from a directory of model-aggregation mappings. Parameters ---------- path : DirectoryPath Directory which holds all the mappings. dsd : DataStructureDefinition Instance of DataStructureDefinition used for validation of mappings and region aggregation. Returns ------- RegionProcessor The resulting region processor object. Raises ------ ValueError Raised in case there are multiple mappings defined for the same model or there is an issue with region the RegionAggregationMapping AttributeError Raised if the provided DataStructureDefinition does not contain the dimensions ``region`` and ``variable``. """ mapping_dict: dict[str, RegionAggregationMapping] = {} errors: list[Exception] = [] mapping_files = [mapping_file for mapping_file in path.glob("**/*.y*ml")] # Read model mappings from external repositories for repository in dsd.config.mappings.repositories: repo_mapping_dir = ( dsd.config.repositories[repository.name].local_path / "mappings" ) parsed_repo_mappings: list[RegionAggregationMapping] = [] for mapping_file in repo_mapping_dir.glob("**/*.y*ml"): try: parsed_repo_mappings.append( RegionAggregationMapping.from_file(mapping_file) ) except ValueError as error: errors.append(error) for mapping in parsed_repo_mappings: for model in repository.match_models(mapping.models): if model not in mapping_dict: mapping_dict[model] = mapping else: errors.append( ValueError( "Multiple region aggregation mappings for " f"model {model} in [{mapping.file}, " f"{mapping_dict[model].file}]" ) ) # Read model mappings from the local repository for mapping_file in mapping_files: try: mapping = RegionAggregationMapping.from_file(mapping_file) for model in mapping.models: if model not in mapping_dict: mapping_dict[model] = mapping else: errors.append( ValueError( "Multiple region aggregation mappings for " f"model {model} in [{mapping.file}, " f"{mapping_dict[model].file}]" ) ) except ValueError as error: errors.append(error) if errors: raise ExceptionGroup("Found errors in RegionProcessor", errors) if missing_dims := [ dim for dim in ("region", "variable") if not hasattr(dsd, dim) ]: raise AttributeError( "Provided DataStructureDefinition is missing the following " f"attributes: {missing_dims}" ) return cls( mappings=mapping_dict, region_codelist=dsd.region, variable_codelist=dsd.variable, )
[docs] def apply(self, df: IamDataFrame) -> IamDataFrame: """Apply region processing Parameters ---------- df : IamDataFrame Input data that the region processing is applied to Returns ------- IamDataFrame: Processed data Raises ------ ValueError * If *df* contains regions that are not listed in the model mapping, or * If the region-processing results in an empty **IamDataFrame**. """ processed_dfs: list[IamDataFrame] = [] for model in df.model: model_df = df.filter(model=model) # if no mapping is defined the data frame is returned unchanged if model not in self.mappings: logger.info( f"Skipping region aggregation for model '{model}' (no region processing mapping)" ) processed_dfs.append(model_df) # otherwise we first rename, then aggregate else: file = self.mappings[model].file logger.info( f"Applying region-processing for model '{model}' from '{file}'" ) processed_dfs.append(self._apply_region_processing(model_df)[0]) res = pyam.concat(processed_dfs) if not_defined_regions := self.region_codelist.validate_items(res.region): raise UnknownRegionError(not_defined_regions) return res
[docs] def check_region_aggregation( self, df: IamDataFrame, rtol_difference: float = 0.01 ) -> tuple[IamDataFrame, pd.DataFrame]: """Return region aggregation results and differences between aggregated and model native data Parameters ---------- df : IamDataFrame Input data rtol_difference : float, optional limit on the relative tolerance for differences, by default 0.01 Returns ------- tuple[IamDataFrame, pd.DataFrame] IamDataFrame containing aggregation results and pandas dataframe containing the differences """ region_processing_results = [ self._apply_region_processing( df.filter(model=model), rtol_difference=rtol_difference, return_aggregation_difference=True, ) for model in set(df.model) & set(self.mappings) ] return pyam.concat(res[0] for res in region_processing_results), pd.concat( res[1] for res in region_processing_results )
def _apply_region_processing( self, model_df: IamDataFrame, return_aggregation_difference: bool = False, rtol_difference: float = 0.01, ) -> tuple[IamDataFrame, pd.DataFrame]: """Apply region processing for a single model""" if len(model_df.model) != 1: raise ValueError( f"Must be called for a unique model, found: {model_df.model}" ) model = model_df.model[0] self.mappings[model].check_unexpected_regions(model_df) _processed_data: list[pd.Series] = [] # Silence pyam's empty filter warnings with adjust_log_level(logger="pyam", level="ERROR"): # Native region handling # Unchanged regions are added to processed data directly keep = [ r.name for r in self.mappings[model].native_regions if r.rename is None ] keep_df = model_df.filter(region=keep) if not keep_df.empty: _processed_data.append(keep_df._data) # Renamed regions are added to processed data rename = [ r.name for r in self.mappings[model].native_regions if r.rename is not None ] rename_df = model_df.filter(region=rename) if not rename_df.empty: _processed_data.append( rename_df.rename(region=self.mappings[model].rename_mapping)._data ) # Aggregation for common_region in self.mappings[model].common_regions: # Single constituent common regions are a special rename case # (technically aggregated, so aggregation-skipped variables are excluded) if common_region.is_single_constituent_region: _df = model_df.filter( region=common_region.constituent_regions[0], variable=self.variable_codelist.region_aggregation_variables, ).rename(region=common_region.rename_dict) if not _df.empty: _processed_data.append(_df._data) else: # Use aggregation function aggregated = aggregate_region_with_variable_rules( model_df, common_region.name, common_region.constituent_regions, self.variable_codelist, ) _processed_data.extend(aggregated) # Compare & merge with pre-aggregated data _data, difference = merge_with_preaggregated_data( model_df, _processed_data, self.mappings[model].common_region_names, self.variable_codelist, rtol_difference, return_aggregation_difference, model, ) return IamDataFrame(_data, meta=model_df.meta), difference def revert(self, df: pyam.IamDataFrame) -> pyam.IamDataFrame: """Revert region processing by removing common regions and applying inverse renaming.""" model_dfs = [] for model in df.model: model_df = df.filter(model=model) if mapping := self.mappings.get(model): model_df = model_df.filter( region=mapping.common_region_names, keep=False ).rename(region=mapping.reverse_rename_mapping) model_dfs.append(model_df) return pyam.concat(model_dfs)
[docs] def get_common_region_country_mapping(self, model: str) -> dict[str, list[str]]: """Return a mapping from common region names to constituent countries for a model. Parameters ---------- model : str Name of the model. Returns ------- dict[str, list[str]] Dictionary mapping each common region name to the aggregated list of countries from all of its constituent native regions. """ mapping = self.mappings[model] result: dict[str, list[str]] = {} for common_region in mapping.common_regions: countries: list[str] = [] for constituent in common_region.constituent_regions: # Apply renaming if applicable to get the correct region name target_name = mapping.rename_mapping.get(constituent, constituent) region_code = self.region_codelist[target_name] if region_code.countries: countries.extend(region_code.countries) result[common_region.name] = countries return result
[docs] def get_native_region_country_mapping(self, model: str) -> dict[str, list[str]]: """Return a mapping from (renamed) native region names to countries for a model. Parameters ---------- model : str Name of the model. Returns ------- dict[str, list[str]] Dictionary mapping each native region name (after any renaming) to its list of countries. """ mapping = self.mappings[model] return { nr.target_native_region: self.region_codelist[ nr.target_native_region ].countries or [] for nr in mapping.native_regions }
def aggregate_region_with_variable_rules( df: IamDataFrame, target_region: str, constituent_regions: list[str], variable_codelist: VariableCodeList, ) -> list[pd.Series]: """ Core region aggregation logic with variable-specific rules. This is the shared aggregation engine used by different processors. It handles: - Variables with simple aggregation (sum) - Variables with weighted aggregation - Variables with custom methods - Variables with skip_region_aggregation flag Parameters ---------- df : IamDataFrame Source data target_region : str Name of region to create constituent_regions : list of str Regions to aggregate from variable_codelist : VariableCodeList Variable definitions with aggregation rules Returns ------- list of pd.Series Aggregated data series """ aggregated_data = [] regions = [target_region, constituent_regions] # Simple aggregation (default sum) simple_vars = [var for var in variable_codelist.vars_default_agg_args(df.variable)] if simple_vars: _df = df.aggregate_region(simple_vars, *regions) if _df is not None and not _df.empty: aggregated_data.append(_df._data) # Weighted/special aggregation for var in variable_codelist.vars_special_agg_kwargs(df.variable): if var.region_aggregation is None: # Standard weighted aggregation _df = _aggregate_region(df, var.name, *regions, **var.pyam_agg_kwargs) if _df is not None and not _df.empty: aggregated_data.append(_df._data) else: # Aggregation with variable renaming for rename_var in var.region_aggregation: for _rename, _kwargs in rename_var.items(): _df = _aggregate_region(df, var.name, *regions, **_kwargs) if _df is not None and not _df.empty: aggregated_data.append( _df.rename(variable={var.name: _rename})._data ) return aggregated_data def merge_with_preaggregated_data( model_df: IamDataFrame, aggregated_data: list[pd.Series], target_regions: list[str], variable_codelist: VariableCodeList, rtol_difference: float = 0.01, return_aggregation_difference: bool = False, model_name: str = "", ) -> tuple[pd.Series, pd.DataFrame]: """Merge aggregated data with any pre-aggregated data that exists at target regions. Parameters ---------- model_df : IamDataFrame Original model data aggregated_data : list of pd.Series List of aggregated data series target_regions : list of str Regions to filter for pre-aggregated data variable_codelist : VariableCodeList Variables to include rtol_difference : float Relative tolerance for comparison return_aggregation_difference : bool Whether to return difference dataframe model_name : str Model name for error messages Returns ------- tuple of (pd.Series, pd.DataFrame) Merged data and difference report """ # Filter for pre-aggregated data pre_aggregated_df = model_df.filter( region=target_regions, variable=variable_codelist, ) difference = pd.DataFrame() if aggregated_data: _data = pd.concat(aggregated_data) if not pre_aggregated_df.empty: _data, difference = _compare_and_merge( pre_aggregated_df._data, _data, rtol_difference, return_aggregation_difference, ) elif not pre_aggregated_df.empty: _data = pre_aggregated_df._data else: raise ValueError( f"Region-processing for model '{model_name}' returned an empty dataset" ) return _data, difference def _aggregate_region(df, var, *regions, **kwargs): """Perform region aggregation with kwargs catching inconsistent-index errors""" try: return df.aggregate_region(var, *regions, **kwargs) except ValueError as error: if str(error).startswith("Missing weights for the following data"): logger.warning( f"Could not aggregate '{var}' for region '{regions[0]}' ({kwargs})" ) else: raise error def _compare_and_merge( original: pd.Series, aggregated: pd.Series, rtol: float = 0.01, return_aggregation_difference: bool = False, ) -> tuple[IamDataFrame, pd.DataFrame]: """Compare and merge original and aggregated results""" # Compare aggregated (processed) and original data at the common-region level compare = pd.merge( left=original.rename(index="original"), right=aggregated.rename(index="aggregated"), how="outer", left_index=True, right_index=True, ) # Drop rows that are not in conflict compare = compare.dropna() difference = compare[ ~np.isclose(compare["original"], compare["aggregated"], rtol=rtol) ] difference.insert( len(difference.columns), "difference (%)", 100 * np.abs( (difference["original"] - difference["aggregated"]) / difference["original"] ), ) difference = difference.sort_values("difference (%)", ascending=False) if difference is not None and len(difference): with pd.option_context("display.max_columns", None): logger.warning( f"Difference between original and aggregated data:\n{difference}" ) if not return_aggregation_difference: logger.info( "Please refer to the user guide of the nomenclature package: " "https://nomenclature-iamc.readthedocs.io/en/stable/user_guide" "/model-mapping.html#computing-differences-between-original-and" "-aggregated-data for obtaining the differences as " "dataframe or file." ) # Merge aggregated data onto original common-region data index = aggregated.index.difference(original.index) return pd.concat([original, aggregated[index]]), difference def _check_exclude_region_overlap( region_aggregation_mapping: RegionAggregationMapping, region_type: str ) -> RegionAggregationMapping: if ( region_aggregation_mapping.exclude_regions is None or getattr(region_aggregation_mapping, region_type) is None ): return region_aggregation_mapping if overlap := set(region_aggregation_mapping.exclude_regions) & { r.name for r in getattr(region_aggregation_mapping, region_type) }: raise ExcludeRegionOverlapError( { "region": overlap, "region_type": region_type, "file": region_aggregation_mapping.file, }, ) return region_aggregation_mapping