Source code for nomenclature.processor.nuts

import logging
import pyam
import pandas as pd

from collections import defaultdict
from pathlib import Path
from pyam import IamDataFrame
from pyam.utils import adjust_log_level
from pydantic import ConfigDict

from nomenclature.codelist import VariableCodeList, RegionCodeList
from nomenclature.definition import DataStructureDefinition
from nomenclature.processor import Processor
from nomenclature.processor.region import (
    aggregate_region_with_variable_rules,
    merge_with_preaggregated_data,
)
from nomenclature.exceptions import UnknownRegionError
from nomenclature.countries import countries
from nomenclature.nuts import nuts

# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# Absolute path of the directory that contains this module
here = Path(__file__).parent.absolute()

# EU27 member states alpha-2 codes (ISO 3166-1), membership as of 2026
# The codes are resolved to country names via `countries.get(alpha_2=...)`
# before being compared against the regions of a dataset.
EU27_ALPHA2: frozenset[str] = frozenset(
    {
        "AT",  # Austria
        "BE",  # Belgium
        "BG",  # Bulgaria
        "CY",  # Cyprus
        "CZ",  # Czechia
        "DE",  # Germany
        "DK",  # Denmark
        "EE",  # Estonia
        "ES",  # Spain
        "FI",  # Finland
        "FR",  # France
        "GR",  # Greece
        "HR",  # Croatia
        "HU",  # Hungary
        "IE",  # Ireland
        "IT",  # Italy
        "LT",  # Lithuania
        "LU",  # Luxembourg
        "LV",  # Latvia
        "MT",  # Malta
        "NL",  # Netherlands
        "PL",  # Poland
        "PT",  # Portugal
        "RO",  # Romania
        "SE",  # Sweden
        "SI",  # Slovenia
        "SK",  # Slovakia
    }
)
# Minimum number of EU27 member countries required to aggregate to "European Union"
# (datasets with fewer member states present are not aggregated to EU level)
EU27_MIN_COUNTRIES: int = 23
# UK alpha-2 code for "European Union and United Kingdom" aggregation
# NOTE(review): ISO 3166-1 assigns "GB" to the United Kingdom; "UK" is the
# prefix used in NUTS codes. Presumably `countries.get(alpha_2="UK")` resolves
# this alternative code - confirm against `nomenclature.countries`.
UK_ALPHA2: str = "UK"


class NutsProcessor(Processor):
    """Processor that aggregates NUTS-level scenario data to coarser regions.

    For each model listed in ``models``, data reported at NUTS 3, NUTS 2 or
    NUTS 1 level is aggregated step by step up to country level and, where the
    region codelist defines the target regions, further to "European Union"
    and "European Union and United Kingdom". Data of models that are not
    listed is passed through unchanged.
    """

    # Variable definitions, handed to the region-aggregation helpers
    variable_codelist: VariableCodeList
    # Region definitions, used to find NUTS regions and to validate the result
    region_codelist: RegionCodeList
    # Names of the models whose data is subject to NUTS aggregation
    models: list[str]
    model_config = ConfigDict(hide_input_in_errors=True)

    @classmethod
    def from_definition(
        cls, dsd: DataStructureDefinition, models: list[str] | None = None
    ):
        """Create a processor from a :class:`DataStructureDefinition`.

        Parameters
        ----------
        dsd : DataStructureDefinition
            Project definition providing the variable and region codelists.
        models : list[str], optional
            Models to which NUTS aggregation is applied. If not given (or
            empty), ``dsd.config.processor.nuts`` is used instead.

        Returns
        -------
        NutsProcessor

        Raises
        ------
        ValueError
            If neither *models* nor the configuration names any model.
        """
        selected_models = models if models else dsd.config.processor.nuts
        if selected_models:
            return cls(
                variable_codelist=dsd.variable,
                region_codelist=dsd.region,
                models=selected_models,
            )
        raise ValueError("No models configured for NUTS processor")

    @property
    def nuts_codelist(self):
        """RegionCodeList named "NUTS" holding the NUTS regions of the project.

        A region is included if its ``hierarchy`` starts with ``"NUTS "``.
        """
        nuts_codes = {}
        for code in self.region_codelist.mapping.values():
            if code.hierarchy.startswith("NUTS "):
                nuts_codes[code.name] = code
        return RegionCodeList(name="NUTS", mapping=nuts_codes)

    def apply(self, df: IamDataFrame):
        """Run NUTS region aggregation on *df*.

        Parameters
        ----------
        df : IamDataFrame
            Scenario data, possibly covering several models.

        Returns
        -------
        IamDataFrame
            Concatenation of the processed data of the listed models and the
            untouched data of all other models.

        Raises
        ------
        ValueError
            If validating the NUTS regions found in *df* against
            :attr:`nuts_codelist` reports any region.
        UnknownRegionError
            If the result contains regions that are not defined in the region
            codelist.
        """
        # Every NUTS code in *df* must be part of the project's NUTS codelist
        all_nuts = {r.code for r in nuts.get(level=[1, 2, 3])}
        configured_nuts = self.nuts_codelist
        nuts_in_df = df.filter(region=all_nuts)
        unaccounted_nuts = configured_nuts.validate_df(nuts_in_df, "region")
        if unaccounted_nuts:
            raise ValueError(
                f"Did not find NUTS region(s) {unaccounted_nuts} in 'region.nuts' configuration."
            )

        processed_dfs: list[IamDataFrame] = []
        for model in df.model:
            model_df = df.filter(model=model)
            if model in self.models:
                logger.info(f"Applying NUTS processing for model '{model}'")
                # Only the processed data is needed here, not the difference
                processed_dfs.append(self._apply_nuts_processing(model_df)[0])
            else:
                # Models without NUTS aggregation are passed through as-is
                logger.info(
                    f"Skipping NUTS region aggregation for model '{model}' (no NUTS aggregation mapping)"
                )
                processed_dfs.append(model_df)

        res = pyam.concat(processed_dfs)
        not_defined_regions = self.region_codelist.validate_items(res.region)
        if not_defined_regions:
            raise UnknownRegionError(not_defined_regions)
        return res

    def _aggregate_nuts_level(
        self,
        model_df: IamDataFrame,
        source_regions: list[str],
        parent_prefix_length: int,
    ) -> IamDataFrame:
        """Aggregate NUTS regions to the parent given by a code prefix.

        Parameters
        ----------
        model_df : IamDataFrame
            Data containing the source regions.
        source_regions : list[str]
            NUTS region codes to be aggregated.
        parent_prefix_length : int
            Number of leading characters of a source code that identify its
            parent (4 for NUTS2, 3 for NUTS1, 2 for country).

        Returns
        -------
        IamDataFrame
            Data of the parent regions, carrying the meta of *model_df*.
        """
        # Collect the source regions that share the same parent prefix
        constituents_by_parent = defaultdict(list)
        for region in source_regions:
            constituents_by_parent[region[:parent_prefix_length]].append(region)

        series_list = []
        for parent_code, constituents in constituents_by_parent.items():
            if len(parent_code) > 2:
                # Parent is itself a NUTS region and keeps its code
                target = parent_code
            else:
                # Two-letter prefix: parent is a country, identified by name
                target = countries.get(alpha_2=parent_code).name
            series_list.extend(
                aggregate_region_with_variable_rules(
                    model_df,
                    target,
                    constituents,
                    self.variable_codelist,
                )
            )

        return IamDataFrame(pd.concat(series_list), meta=model_df.meta)

    def _aggregate_to_eu27(self, df: IamDataFrame) -> list[pd.Series]:
        """Aggregate country data to "European Union" (and United Kingdom).

        Nothing is aggregated unless at least ``EU27_MIN_COUNTRIES`` of the
        EU27 member states are regions of *df*. Each of the two targets is
        only computed if it is defined in the region codelist; the target
        "European Union and United Kingdom" additionally requires the United
        Kingdom to be a region of *df*. Targets that do not qualify are
        skipped without raising an error.

        Parameters
        ----------
        df : IamDataFrame
            Country-level data (after NUTS aggregation).

        Returns
        -------
        list[pd.Series]
            Aggregated data series; empty if no target qualifies.
        """
        eu27_names = {countries.get(alpha_2=alpha2).name for alpha2 in EU27_ALPHA2}
        uk_name = countries.get(alpha_2=UK_ALPHA2).name
        regions_in_df = set(df.region)
        available_eu27 = eu27_names & regions_in_df

        # Too few member states present: no EU-level aggregation at all
        if len(available_eu27) < EU27_MIN_COUNTRIES:
            return []

        defined_regions = self.region_codelist.mapping
        eu_members = sorted(available_eu27)
        result: list[pd.Series] = []

        if "European Union" in defined_regions:
            logger.info(
                f"Aggregating {len(available_eu27)} EU27 member countries "
                "to 'European Union'"
            )
            result.extend(
                aggregate_region_with_variable_rules(
                    df,
                    "European Union",
                    eu_members,
                    self.variable_codelist,
                )
            )

        if (
            "European Union and United Kingdom" in defined_regions
            and uk_name in regions_in_df
        ):
            logger.info(
                "Aggregating EU27 countries + United Kingdom to 'European Union and United Kingdom'"
            )
            result.extend(
                aggregate_region_with_variable_rules(
                    df,
                    "European Union and United Kingdom",
                    eu_members + [uk_name],
                    self.variable_codelist,
                )
            )

        return result

    def _apply_nuts_processing(
        self,
        model_df: IamDataFrame,
        return_aggregation_difference: bool = False,
        rtol_difference: float = 0.01,
    ):
        """Run the complete NUTS aggregation chain for the data of one model.

        Parameters
        ----------
        model_df : IamDataFrame
            Data of a single model.
        return_aggregation_difference : bool, optional
            Passed on to ``merge_with_preaggregated_data``.
        rtol_difference : float, optional
            Relative tolerance passed on to ``merge_with_preaggregated_data``
            for comparing pre-aggregated with freshly aggregated values.

        Returns
        -------
        tuple
            The processed :class:`IamDataFrame` and the difference object
            returned by the country-level merge (the difference of the
            EU-level merge is discarded).
        """
        model = model_df.model[0]
        _df = model_df.copy()

        # Silence pyam's empty filter warnings
        # NOTE(review): the extent of this block was reconstructed from a
        # flattened source; confirm which statements belong inside it.
        with adjust_log_level(logger="pyam", level="ERROR"):
            # NUTS3 > NUTS2, then NUTS2 > NUTS1: source regions are kept and
            # the aggregated parents are added, so each step also sees the
            # parents produced by the previous one
            for level, parent_prefix_length in ((3, 4), (2, 3)):
                level_in_data = _df.filter(
                    region={r.code for r in nuts.get(level=level)}
                )
                if level_in_data:
                    _df = pyam.concat(
                        [
                            _df,
                            self._aggregate_nuts_level(
                                _df, level_in_data.region, parent_prefix_length
                            ),
                        ]
                    )

            # NUTS1 > Country aggregation (uses original NUTS1 + aggregated NUTS1)
            nuts1_in_data = _df.filter(region={r.code for r in nuts.get(level=1)})
            country_series = []
            if nuts1_in_data:
                country_series = [
                    self._aggregate_nuts_level(_df, nuts1_in_data.region, 2)._data
                ]

            # Compare & merge the aggregated country data with country data
            # that is already part of the original model input
            _data, difference = merge_with_preaggregated_data(
                model_df,
                country_series,
                countries.names,
                self.variable_codelist,
                rtol_difference,
                return_aggregation_difference,
                model,
            )

            # EU27(+UK) aggregation from country-level data
            _country_df = IamDataFrame(_data, meta=model_df.meta)
            eu_target_regions = [
                r
                for r in ("European Union", "European Union and United Kingdom")
                if r in self.region_codelist.mapping
            ]
            _eu_aggregated = (
                self._aggregate_to_eu27(_country_df) if eu_target_regions else []
            )
            if _eu_aggregated:
                _eu_data, _ = merge_with_preaggregated_data(
                    model_df,
                    _eu_aggregated,
                    eu_target_regions,
                    self.variable_codelist,
                    rtol_difference,
                    return_aggregation_difference,
                    model,
                )
                _data = pd.concat([_data, _eu_data])

            # Keep all NUTS regions (source + intermediate aggregated levels)
            # that are part of the configured NUTS codelist
            nuts_to_keep = set(_df.region) & set(self.nuts_codelist.mapping)
            if nuts_to_keep:
                _data = pd.concat(
                    [_data, _df.filter(region=list(nuts_to_keep))._data]
                )

        return IamDataFrame(_data, meta=model_df.meta), difference