import logging
import gc
import re
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any
from shutil import rmtree
import yaml
from git import Repo
from pyam import IamDataFrame
from pyam.str import escape_regexp
from pydantic import (
BaseModel,
ConfigDict,
Field,
ValidationInfo,
field_validator,
model_validator,
)
from nomenclature.exceptions import TimeDomainError, TimeDomainErrorGroup
from nomenclature.utils import handle_remove_readonly
logger = logging.getLogger(__name__)
class CodeListFromRepository(BaseModel):
    """Reference to an external repository used as a source for a codelist.

    The `include` and `exclude` filters select which of the repository's
    definitions are imported.
    """

    # Name of the referenced repository.
    name: str

    # Filters for definitions to import; the default is a single filter with
    # a wildcard on `name`.
    include: list[dict[str, Any]] = [{"name": "*"}]

    # Filters for definitions to leave out; empty by default.
    exclude: list[dict[str, Any]] = Field(default_factory=list)
class CodeListConfig(BaseModel):
    """Configuration for a dimension's codelist.

    This class lists external repositories for codelists, importing definitions
    from remote sources.

    The repositories can be given under the field name ``repositories`` or the
    alias ``repository``, either as a single entry or as a list of entries.
    Each entry is a repository name (string) or a mapping that is validated as
    a :class:`CodeListFromRepository`. Unknown keys are rejected
    (``extra="forbid"``).
    """

    # Name of the dimension this codelist belongs to (may be unset).
    dimension: str | None = None
    repositories: list[CodeListFromRepository] = Field(
        default_factory=list, alias="repository"
    )
    model_config = ConfigDict(
        extra="forbid", validate_by_name=True, validate_by_alias=True
    )

    @field_validator("repositories", mode="before")
    @classmethod
    def add_name_if_necessary(cls, v):
        """Turn bare repository names into ``{"name": <name>}`` mappings.

        A single (non-list) entry is wrapped in a list first, so the result
        does not depend on the order in which the two "before" validators of
        this field are applied. Non-string entries are passed through as-is.
        """
        entries = v if isinstance(v, list) else [v]
        return [
            {"name": entry} if isinstance(entry, str) else entry
            for entry in entries
        ]

    @field_validator("repositories", mode="before")
    @classmethod
    def convert_to_list_of_repos(cls, v):
        """Wrap a single repository entry in a list; return lists unchanged."""
        return v if isinstance(v, list) else [v]

    @property
    def repository_dimension_path(self) -> str:
        """Relative path ``definitions/<dimension>`` for this dimension."""
        return f"definitions/{self.dimension}"
class RegionCodeListConfig(CodeListConfig):
    """Codelist configuration for the region dimension.

    In addition to selecting regions from external repositories, this class
    allows importing the definitions for ISO3 countries (`country`) and NUTS
    regions (`nuts`).
    """

    country: bool = False

    # Optional mapping whose keys must be 'nuts-1', 'nuts-2' or 'nuts-3'.
    nuts: dict[str, str | list[str] | bool] | None = None

    @field_validator("nuts")
    @classmethod
    def check_nuts(
        cls, v: dict[str, str | list[str] | bool] | None
    ) -> dict[str, str | list[str] | bool] | None:
        """Reject `nuts` mappings with keys other than the three NUTS levels.

        ``None`` and empty mappings are returned unchanged.
        """
        if not v:
            return v

        permitted_levels = ("nuts-1", "nuts-2", "nuts-3")
        for level in v:
            if level not in permitted_levels:
                raise ValueError(
                    "Invalid fields for `nuts` in configuration. "
                    "Allowed values are: 'nuts-1', 'nuts-2' and 'nuts-3'."
                )
        return v
class Repository(BaseModel):
    """Configuration for an external codelist repository.

    A repository is identified by its git `url` and optionally pinned to a
    commit `hash` or a `release`; at most one of the two may be given. Without
    a pin, the revision ``"main"`` is used.
    """

    url: str
    hash: str | None = None
    release: str | None = None
    # Assigned by `fetch_repo`; must not be provided in the configuration,
    # which `check_path_empty` enforces.
    local_path: Path | None = Field(default=None, validate_default=True)

    @model_validator(mode="after")
    def check_hash_and_release(self) -> "Repository":
        """Ensure that `hash` and `release` are not both set.

        Written as an instance method, which is the form pydantic documents
        for "after" model validators.
        """
        if self.hash and self.release:
            raise ValueError("Either `hash` or `release` can be provided, not both.")
        return self

    @field_validator("local_path")
    @classmethod
    def check_path_empty(cls, v):
        """Reject any `local_path` provided as part of the configuration."""
        if v is not None:
            raise ValueError("The `local_path` must not be set as part of the config.")
        return v

    @property
    def revision(self):
        """The git revision to check out: `hash`, else `release`, else 'main'."""
        return self.hash or self.release or "main"

    @property
    def has_auto_update(self) -> bool:
        """Whether the repository is unpinned (neither `hash` nor `release`)."""
        return self.hash is None and self.release is None

    def fetch_repo(self, to_path):
        """Clone or update the repository at `to_path` and check out `revision`.

        Parameters
        ----------
        to_path : :class:`pathlib.Path` or path-like
            Directory of the local clone; it is stored as `local_path`.

        Raises
        ------
        ValueError
            If the fetched repository itself refers to external repositories,
            see `check_external_repo_double_stacking`.
        """
        to_path = to_path if isinstance(to_path, Path) else Path(to_path)

        if not to_path.is_dir():
            repo = Repo.clone_from(self.url, to_path)
        else:
            repo = Repo(to_path)
            current_url = repo.remotes.origin.url
            # If the URL has changed, remove existing directory and re-clone
            if current_url != self.url:
                logger.warning(
                    "Repository URL changed from '%s' to '%s'. "
                    "Re-cloning repository to '%s'...",
                    current_url,
                    self.url,
                    to_path,
                )
                repo.close()  # Close repo before removing directory
                del repo  # Delete reference to allow garbage collection
                gc.collect()  # Force garbage collection to release file handles
                rmtree(to_path, onerror=handle_remove_readonly)
                repo = Repo.clone_from(self.url, to_path)
            else:
                repo.remotes.origin.fetch()

        self.local_path = to_path

        # Discard local modifications before and after switching revision,
        # then remove untracked and ignored files.
        repo.git.reset("--hard")
        repo.git.checkout(self.revision)
        repo.git.reset("--hard")
        repo.git.clean("-xdf")
        if self.revision == "main":
            repo.remotes.origin.pull()

        self.check_external_repo_double_stacking()

    def check_external_repo_double_stacking(self):
        """Raise if the fetched repository refers to external repositories.

        Reads ``nomenclature.yaml`` at the root of `local_path`, if present.
        A missing, empty or non-mapping file is treated as having no
        ``repositories`` entry.

        Raises
        ------
        ValueError
            If the file has a non-empty ``repositories`` entry.
        """
        nomenclature_config = self.local_path / "nomenclature.yaml"
        if not nomenclature_config.is_file():
            return

        with open(nomenclature_config, "r", encoding="utf-8") as f:
            config = yaml.safe_load(f)

        # `safe_load` returns None for an empty file, and the top-level YAML
        # node is not necessarily a mapping.
        if isinstance(config, dict) and config.get("repositories"):
            raise ValueError(
                (
                    "External repos cannot again refer to external repos, "
                    f"found in nomenclature.yaml in '{self.url}'"
                )
            )
[docs]
class DataStructureConfig(BaseModel):
    """
    Configuration class for the data structure definition.

    This class defines the configuration for the main IAMC dimensions:

    - scenario
    - region
    - variable

    Each dimension can be configured with its own code list and repository sources.
    """

    scenario: CodeListConfig = Field(default_factory=CodeListConfig)
    region: RegionCodeListConfig = Field(default_factory=RegionCodeListConfig)
    variable: CodeListConfig = Field(default_factory=CodeListConfig)

    @field_validator("scenario", "region", "variable", mode="before")
    @classmethod
    def add_dimension(cls, v, info: ValidationInfo):
        """Inject the field name as `dimension` into the raw dimension config.

        A `dimension` key already present in `v` takes precedence. An empty
        section (``None``) is treated as an empty mapping. Input that is not a
        mapping is passed on unchanged, so that pydantic validates it (or
        reports it) instead of this validator failing with a ``TypeError``.
        """
        if v is None:
            v = {}
        try:
            return {"dimension": info.field_name, **v}
        except TypeError:
            # `v` is not a mapping (e.g. an already-built model instance)
            return v

    @property
    def repos(self) -> dict[str, list[CodeListFromRepository]]:
        """Repository references per dimension, omitting dimensions with none."""
        by_dimension = {
            dimension: getattr(self, dimension).repositories
            for dimension in ("scenario", "region", "variable")
        }
        return {
            dimension: repositories
            for dimension, repositories in by_dimension.items()
            if repositories
        }
class MappingRepository(BaseModel):
    """Configuration for a mapping repository.

    `include` holds patterns for the model names to which the mappings of the
    repository `name` apply; the default is the single pattern ``"*"``.
    """

    name: str
    include: list[str] = ["*"]

    @property
    def regex_include_patterns(self):
        """Compiled regular expressions for the `include` patterns.

        Each pattern is converted with `pyam.str.escape_regexp` and anchored
        at the end with ``$``.
        NOTE(review): presumably `escape_regexp` escapes regex metacharacters
        and turns ``*`` into a wildcard - confirm against pyam.
        """
        return [re.compile(escape_regexp(pattern) + "$") for pattern in self.include]

    def match_models(self, models: list[str]) -> list[str]:
        """Return the models that match at least one `include` pattern.

        The order of `models` is preserved and each entry is returned at most
        once per occurrence in `models`, even if several patterns match it.
        """
        # Compile once instead of once per model
        patterns = self.regex_include_patterns
        return [
            model
            for model in models
            if any(pattern.match(model) is not None for pattern in patterns)
        ]
class RegionMappingConfig(BaseModel):
    """Configuration for region mapping/aggregation external repositories.

    The repositories can be given under the field name ``repositories`` or the
    alias ``repository``, either as a single entry or as a list of entries.
    Each entry is a repository name (string) or a mapping that is validated as
    a :class:`MappingRepository`.
    """

    repositories: list[MappingRepository] = Field(
        default_factory=list, alias="repository"
    )
    model_config = ConfigDict(validate_by_name=True, validate_by_alias=True)

    @field_validator("repositories", mode="before")
    @classmethod
    def add_name_if_necessary(cls, v):
        """Turn bare repository names into ``{"name": <name>}`` mappings.

        A single (non-list) entry is wrapped in a list first, so the result
        does not depend on the order in which the two "before" validators of
        this field are applied. Non-string entries are passed through as-is.
        """
        entries = v if isinstance(v, list) else [v]
        return [
            {"name": entry} if isinstance(entry, str) else entry
            for entry in entries
        ]

    @field_validator("repositories", mode="before")
    @classmethod
    def convert_to_set_of_repos(cls, v):
        """Wrap a single repository entry in a list; return lists unchanged.

        Despite its name, this validator produces a list, not a set.
        """
        return v if isinstance(v, list) else [v]
class ProcessorConfig(BaseModel):
    """Settings for the region processors.

    Fields can be populated by field name or by their hyphenated alias;
    unknown keys are rejected.
    """

    model_config = ConfigDict(
        extra="forbid", validate_by_alias=True, validate_by_name=True
    )

    # Entries for the NUTS processor (alias `nuts-processor`); empty by default.
    nuts: list[str] = Field(default_factory=list, alias="nuts-processor")

    # Switch for the region processor (alias `region-processor`).
    region_processor: bool = Field(default=False, alias="region-processor")
class TimeDomainConfig(BaseModel):
    """Configuration for time domain validation settings.

    `year_allowed` (alias ``year``) and `datetime_allowed` (alias ``datetime``)
    state which time domains are accepted. `timezone` optionally requires a
    fixed UTC offset, written as ``UTC+HH:MM`` or ``UTC-HH:MM`` with hours
    from 0 to 14 (a single-digit hour is accepted); it can only be set if
    datetime is allowed.
    """

    year_allowed: bool = Field(default=True, alias="year")
    datetime_allowed: bool = Field(default=False, alias="datetime")
    timezone: str | None = Field(
        default=None,
        pattern=r"^UTC([+-])(1[0-4]|0?[0-9]):([0-5][0-9])$",
    )
    model_config = ConfigDict(validate_by_name=True, validate_by_alias=True)

    @model_validator(mode="after")
    def validate_datetime_and_timezone(self) -> "TimeDomainConfig":
        """Ensure that `timezone` is only set when datetime is allowed.

        Written as an instance method, which is the form pydantic documents
        for "after" model validators.
        """
        if self.timezone is not None and not self.datetime_allowed:
            raise ValueError("'timezone' is set but 'datetime' is not allowed")
        return self

    @property
    def mixed_allowed(self) -> bool:
        """Whether a mix of year and datetime values is allowed."""
        return self.year_allowed and self.datetime_allowed

    @property
    def datetime_format(self) -> str | None:
        """The expected datetime format (without UTC offset).

        Returns ``None`` if datetime is not allowed.
        """
        return "%Y-%m-%d %H:%M:%S" if self.datetime_allowed else None

    def _required_utc_offset(self) -> int | None:
        """Return the configured `timezone` as an offset from UTC in seconds.

        Returns ``None`` if no timezone is configured or if it cannot be parsed.
        """
        if not self.timezone:
            return None
        parts = re.match(r"^UTC([+-])(\d{1,2}):(\d{2})$", self.timezone)
        if parts is None:
            return None
        sign, hours, minutes = parts.groups()
        seconds = int(hours) * 3600 + int(minutes) * 60
        return -seconds if sign == "-" else seconds

    def check_datetime_format(self, df: IamDataFrame) -> None:
        """Validate that datetime values conform to configured format and timezone.

        Only entries of `df.time` that are `datetime` instances are checked.
        Expects datetime to be allowed, as ensured by `validate_datetime`.

        Raises
        ------
        TimeDomainErrorGroup
            If any value cannot be parsed with `datetime_format` followed by
            a UTC offset, or does not have the required UTC offset.
        """
        datetimes = [d for d in df.time if isinstance(d, datetime)]
        if not datetimes:
            return

        expected_format = self.datetime_format + "%z"
        required_offset = self._required_utc_offset()

        errors = []
        for d in datetimes:
            try:
                parsed = datetime.strptime(str(d), expected_format)
            except ValueError:
                errors.append(TimeDomainError(f"{d} - missing timezone"))
                continue

            offset = parsed.utcoffset()
            if offset is None:
                errors.append(TimeDomainError(f"{d} - missing timezone"))
            # Only check timezone if a specific timezone is required.
            # Compare UTC offsets in addition to names: `tzname()` is "UTC"
            # for a zero offset and always zero-pads the hours, so it never
            # equals valid settings such as "UTC+00:00" or "UTC+1:00".
            elif self.timezone and not (
                parsed.tzname() == self.timezone
                or offset.total_seconds() == required_offset
            ):
                errors.append(TimeDomainError(f"{d} - invalid timezone"))

        if errors:
            raise TimeDomainErrorGroup(
                "The following datetime values are invalid:", errors
            )

    def validate_datetime(self, df: IamDataFrame) -> None:
        """Validate datetime coordinates against allowed format and/or timezone.

        Raises
        ------
        TimeDomainError
            If `df.time_domain` is not allowed by this configuration or is
            not one of 'year', 'mixed' and 'datetime'.
        TimeDomainErrorGroup
            If datetime values are invalid, see `check_datetime_format`.
        """
        if df.time_domain == "year":
            if not self.year_allowed:
                raise TimeDomainError(
                    "Invalid time domain - `year` found, but not allowed."
                )
        elif df.time_domain == "mixed":
            if not self.mixed_allowed:
                raise TimeDomainError(
                    "Invalid time domain - `mixed` found, but not allowed."
                )
            self.check_datetime_format(df)
        elif df.time_domain == "datetime":
            if not self.datetime_allowed:
                raise TimeDomainError(
                    "Invalid time domain - `datetime` found, but not allowed."
                )
            self.check_datetime_format(df)
        else:
            raise TimeDomainError(
                "IamDataFrame.time_domain must be one of ['year', 'mixed', "
                f"'datetime'], found '{df.time_domain}'"
            )
class DimensionEnum(str, Enum):
    """Names of the dimensions that can be listed in the configuration.

    Members are also `str` instances, so they compare equal to their value.
    """

    model = "model"
    scenario = "scenario"
    variable = "variable"
    region = "region"
    subannual = "subannual"
class NomenclatureConfig(BaseModel):
    """Top-level configuration, usually read from a YAML file via `from_file`.

    It combines the external `repositories` (by name), the `definitions` and
    `mappings` that refer to them, the `processor` settings, the list of
    `illegal_characters` and the `time_domain` settings. Hyphenated aliases
    and field names are both accepted as input keys. Entries of `dimensions`
    are stored as plain strings (``use_enum_values``).
    """

    dimensions: None | list[DimensionEnum] = None
    repositories: dict[str, Repository] = Field(default_factory=dict)
    definitions: DataStructureConfig = Field(default_factory=DataStructureConfig)
    mappings: RegionMappingConfig = Field(default_factory=RegionMappingConfig)
    processor: ProcessorConfig = Field(
        default_factory=ProcessorConfig, alias="processors"
    )
    illegal_characters: list[str] = Field(
        default=[":", ";", '"'], alias="illegal-characters"
    )
    time_domain: TimeDomainConfig = Field(
        default_factory=TimeDomainConfig, alias="time-domain"
    )
    model_config = ConfigDict(
        use_enum_values=True, validate_by_name=True, validate_by_alias=True
    )

    @field_validator("illegal_characters", mode="before")
    @classmethod
    def check_illegal_chars(cls, v: str | list[str]) -> list[str]:
        """Wrap a single illegal character in a list; return lists unchanged."""
        return v if isinstance(v, list) else [v]

    @model_validator(mode="after")
    def check_definitions_repository(self) -> "NomenclatureConfig":
        """Check that all repositories referenced in definitions and mappings exist.

        Raises
        ------
        ValueError
            If a definition or mapping refers to a repository name that is
            not a key of `repositories`.
        """
        known_repos = set(self.repositories)
        repos: dict[str, list[CodeListFromRepository] | list[MappingRepository]] = {
            **self.definitions.repos
        }
        if self.mappings:
            repos["mappings"] = self.mappings.repositories

        for use, repositories in repos.items():
            referenced = {repository.name for repository in repositories}
            if unknown_repos := referenced - known_repos:
                raise ValueError((f"Unknown repository {unknown_repos} in '{use}'."))
        return self

    @model_validator(mode="after")
    def check_nuts_consistency(self) -> "NomenclatureConfig":
        """Require NUTS region definitions if the NUTS processor is configured."""
        if self.processor.nuts and not self.definitions.region.nuts:
            raise ValueError(
                "`nuts` region processor set but no NUTS regions in `definitions`. "
                "To fix, set `definitions.region.nuts` to True."
            )
        return self

    def fetch_repos(self, target_folder: Path):
        """Fetch every configured repository into `target_folder/<name>`.

        Parameters
        ----------
        target_folder : :class:`pathlib.Path` or path-like
            Folder in which one sub-folder per repository name is used
        """
        target_folder = Path(target_folder)
        for repo_name, repo in self.repositories.items():
            repo.fetch_repo(target_folder / repo_name)

    @classmethod
    def from_file(cls, file: Path, dry_run: bool = False):
        """Read a NomenclatureConfig from a file

        Parameters
        ----------
        file : :class:`pathlib.Path` or path-like
            Path to config file
        dry_run : bool, optional
            If True, do not fetch the repositories after reading the file

        Returns
        -------
        NomenclatureConfig
            The validated configuration; unless `dry_run` is set, the
            repositories have been fetched into the folder containing `file`.
        """
        file = Path(file)
        with open(file, "r", encoding="utf-8") as stream:
            # An empty file parses to None; treat it as an empty configuration
            config = yaml.safe_load(stream) or {}
        instance = cls(**config)
        if not dry_run:
            instance.fetch_repos(file.parent)
        return instance