import logging
import textwrap
from enum import IntEnum
from pathlib import Path
import pandas as pd
import yaml
from pyam import IamDataFrame
from pyam.utils import adjust_log_level
from pydantic import (
BaseModel,
ConfigDict,
Field,
computed_field,
field_validator,
model_validator,
)
from nomenclature.codelist import VariableCodeList
from nomenclature.definition import DataStructureDefinition
from nomenclature.exceptions import DataValidationError, NoTracebackExceptionGroup
from nomenclature.processor import Processor
from nomenclature.processor.iamc import IamcDataFilter
from nomenclature.utils import get_relative_path
logger = logging.getLogger(__name__)
class WarningEnum(IntEnum):
error = 50
high = 40
medium = 30
low = 20
class DataValidationCriteria(BaseModel):
warning_level: WarningEnum = WarningEnum.error
model_config = ConfigDict(extra="forbid")
@field_validator("warning_level", mode="before")
@classmethod
def validate_warning_level(cls, value):
if isinstance(value, str):
try:
return WarningEnum[value]
except KeyError:
raise ValueError(
f"Invalid warning level: {value}. Expected one of:"
f" {', '.join(level.name for level in WarningEnum)}"
)
return value
@property
def criteria(self):
pass
def __str__(self):
return ", ".join([f"{key}: {value}" for key, value in self.criteria.items()])
class DataValidationValue(DataValidationCriteria):
value: float
rtol: float = 0.0
atol: float = 0.0
@property
def tolerance(self) -> float:
return self.value * self.rtol + self.atol
@computed_field
def upper_bound(self) -> float:
return self.value + self.tolerance
@computed_field
def lower_bound(self) -> float:
return self.value - self.tolerance
@property
def validation_args(self):
"""Attributes used for validation (as bounds)."""
return self.model_dump(
exclude_none=True,
exclude_unset=True,
exclude=["warning_level", "value", "rtol", "atol"],
)
@property
def criteria(self):
"""Attributes used for validation (as specified in the file)."""
return self.model_dump(
exclude_none=True,
exclude_unset=True,
exclude=["warning_level", "lower_bound", "upper_bound"],
)
class DataValidationBounds(DataValidationCriteria):
upper_bound: float | None = None
lower_bound: float | None = None
# Allow extra but raise error to guard against multiple criteria
model_config = ConfigDict(extra="allow")
@model_validator(mode="after")
def check_validation_criteria_exist(self):
if self.upper_bound is None and self.lower_bound is None:
raise ValueError("No validation criteria provided: " + str(self.criteria))
return self
@model_validator(mode="after")
def check_validation_multiple_criteria(self):
if self.model_extra:
raise ValueError(
"Must use either bounds, range or value, found: " + str(self.criteria)
)
return self
@property
def validation_args(self):
return self.criteria
@property
def criteria(self):
return self.model_dump(
exclude_none=True, exclude_unset=True, exclude=["warning_level"]
)
class DataValidationRange(DataValidationCriteria):
range: list[float] = Field(..., min_length=2, max_length=2)
@field_validator("range", mode="after")
@classmethod
def check_range_is_valid(cls, value: list[float]):
if value[0] > value[1]:
raise ValueError(
"Validation 'range' must be given as `(lower_bound, upper_bound)`, "
"found: " + str(value)
)
return value
@computed_field
def upper_bound(self) -> float:
return self.range[1]
@computed_field
def lower_bound(self) -> float:
return self.range[0]
@property
def validation_args(self):
"""Attributes used for validation (as bounds)."""
return self.model_dump(
exclude_none=True,
exclude_unset=True,
exclude=["warning_level", "range"],
)
@property
def criteria(self):
return self.model_dump(
exclude_none=True,
exclude_unset=True,
exclude=["warning_level", "lower_bound", "upper_bound"],
)
class DataValidationItem(IamcDataFilter):
name: str | None = None
validation: list[DataValidationValue | DataValidationRange | DataValidationBounds]
@model_validator(mode="after")
def check_warnings_order(self):
"""Check if warnings are set in descending order of severity."""
if self.validation != sorted(
self.validation, key=lambda c: c.warning_level, reverse=True
):
raise ValueError(
f"Validation criteria for {self.criteria} not sorted"
" in descending order of severity."
)
else:
return self
@property
def filter_args(self):
"""Attributes used for validation (as specified in the file)."""
return self.model_dump(
exclude_none=True, exclude_unset=True, exclude=["validation", "name"]
)
def __str__(self):
return ", ".join([f"{key}: {value}" for key, value in self.filter_args.items()])
def apply(
self, df: IamDataFrame, fail_list: list, output_list: list
) -> tuple[bool, list, list]:
error = False
per_item_df = df.filter(**self.filter_args)
# If name is given, set a meta indicator for the item being processed
if self.name is not None:
meta_index = per_item_df.index.copy()
df.set_meta(name=self.name, meta="ok", index=meta_index)
for criterion in self.validation:
failed_validation = per_item_df.validate(**criterion.validation_args)
if failed_validation is not None:
per_item_df = IamDataFrame(
pd.concat([per_item_df.data, failed_validation]).drop_duplicates(
keep=False
)
)
# Mark failing scenarios with a meta indicator and warning level
failed_index = failed_validation.set_index(
["model", "scenario"]
).index.drop_duplicates()
if self.name is not None:
df.set_meta(
name=self.name,
meta=criterion.warning_level.name,
index=meta_index.intersection(failed_index),
)
# Remove failed scenarios from the meta index to avoid
# lower warnings overriding higher warnings in meta indicators
meta_index = meta_index.difference(failed_index)
failed_validation["warning_level"] = criterion.warning_level.name
failed_validation["criteria"] = str(criterion)
output_list.append(failed_validation)
if criterion.warning_level == WarningEnum.error:
error = True
fail_list.append(" Criteria: " + str(self) + ", " + str(criterion))
fail_list.append(
textwrap.indent(
failed_validation.iloc[:, :-1].to_string(), prefix=" "
)
+ "\n"
)
return error, fail_list, output_list
[docs]
class DataValidator(Processor):
"""Processor for validating IAMC datapoints"""
criteria_items: list[DataValidationItem]
file: Path | str
output_path: Path | None = None
[docs]
@classmethod
def from_file(
cls, file: Path | str, output_path: Path | str | None = None
) -> "DataValidator":
"""Create a :class:`DataValidator` from a YAML file.
Parameters
----------
file : :class:`pathlib.Path` or str
Path to the YAML file containing the validation criteria.
output_path : :class:`pathlib.Path` or str, optional
Path to write an Excel file with all flagged datapoints.
Returns
-------
DataValidator
"""
with open(file, "r", encoding="utf-8") as f:
content = yaml.safe_load(f)
criteria_items = []
for item in content:
# Simple case where filter and criteria args are all given at top level
if "validation" not in item:
item["validation"] = [dict()]
# If some criteria args are given at top-level, add to "validation" list
criteria = [
criterion
for criterion in item
if criterion
not in list(IamcDataFilter.model_fields) + ["validation", "name"]
]
for criterion in criteria:
value = item.pop(criterion)
for criteria_item in item["validation"]:
criteria_item[criterion] = value
criteria_items.append(item)
return cls(file=file, criteria_items=criteria_items, output_path=output_path) # type: ignore
[docs]
@classmethod
def from_codelist(
cls, codelist: VariableCodeList, output_path: Path | None = None
) -> "DataValidator":
"""Create a :class:`DataValidator` from a :class:`~nomenclature.codelist.VariableCodeList`.
Extracts validation criteria from variables in the codelist that define
bounds or tolerance ranges.
Parameters
----------
codelist : VariableCodeList
Variable codelist containing validation arguments.
output_path : :class:`pathlib.Path`, optional
Path to write an Excel file with all flagged datapoints.
Returns
-------
DataValidator
"""
criteria_items = [
{
"variable": variable.name,
"validation": [variable.validation_args],
}
for variable in codelist.values()
if variable.has_validation_args
]
return cls(
file="definitions", criteria_items=criteria_items, output_path=output_path
)
[docs]
def apply(self, df: IamDataFrame) -> IamDataFrame:
"""Validates data in IAMC format according to specified criteria.
Logs warning/error messages for each criterion that is not met.
Parameters
----------
df : pyam.IamDataFrame
Data in IAMC format to be validated
Returns
-------
pyam.IamDataFrame
Raises
------
:exc:`ValueError` if any criterion has a warning level of ``error``
"""
error_list: list[bool] = []
fail_list: list[str] = []
output_list: list[pd.DataFrame] = []
with adjust_log_level():
for item in self.criteria_items:
error, fail_list, output_list = item.apply(df, fail_list, output_list)
error_list.append(error)
if self.output_path:
pd.concat(output_list).to_excel(self.output_path, index=False)
fail_msg = "(file %s):\n" % get_relative_path(self.file)
if any(error_list):
raise DataValidationError(fail_list, self.file)
if fail_list:
fail_msg = (
"Data validation with warning(s) " + fail_msg + "\n".join(fail_list)
)
logger.warning(fail_msg)
return df
[docs]
def validate_with_definition(self, dsd: DataStructureDefinition) -> None:
"""Validate the criteria items against a :class:`DataStructureDefinition`.
Checks that all variables and regions referenced in the criteria
exist in the provided definition.
Parameters
----------
dsd : DataStructureDefinition
Data structure definition to validate against.
Raises
------
ExceptionGroup
If any criteria item references unknown variables or regions.
"""
errors: list[Exception] = []
for criterion in self.criteria_items:
try:
criterion.validate_with_definition(dsd)
except NoTracebackExceptionGroup as exception:
errors.extend(exception.exceptions)
if errors:
raise NoTracebackExceptionGroup(
f"Error in DataValidator (file {get_relative_path(self.file)})",
errors,
)