API Reference

Validator

Validator class for validating DataFrames against a defined schema.

Source code in src/dataguard/validator/validator.py
class Validator:
    """Validator class for validating DataFrames against a defined schema."""

    error_collector = ErrorCollector()

    @classmethod
    def config_from_mapping(
        cls,
        config: Mapping[str, str | Sequence | Mapping],
        collect_exceptions: bool = True,
        logger: logging.Logger = logger,
    ) -> Validator:
        """Creates a Validator instance from a configuration mapping.

        Args:
            config (Mapping[str, str | Sequence | Mapping]): Configuration
                mapping for the DataFrame schema.
            collect_exceptions (bool, optional): Whether to collect exceptions
                during the schema creation. Defaults to True.
            logger (logging.Logger, optional): Logger instance for logging.
                Defaults to the module logger.

        Examples:
            The command is either a user-defined function or a string that
            maps to a function that will be used to validate the DataFrame.

            The following commands are available:

                'is_equal_to',
                'is_equal_to_or_both_missing',
                'is_greater_than_or_equal_to',
                'is_greater_than',
                'is_less_than_or_equal_to',
                'is_less_than',
                'is_not_equal_to',
                'is_not_equal_to_and_not_both_missing',
                'is_unique',
                'is_duplicated',
                'is_in',
                'is_null',
                'is_not_null'

            >>> config_input = {
                    "name": "example_schema",
                    "columns": [
                        {
                            "id": "column1",
                            "data_type": "integer",
                            "nullable": False,
                            "unique": True,
                            "required": True,
                            "checks": [
                                {
                                    "command": "is_equal_to",
                                    "subject": ["column2"]
                                }
                            ]
                        },
                    ],
                    "ids": ["column1"],
                    "metadata": {"description": "Example DataFrame schema"},
                    "checks": [
                        {
                            'name': 'example_check',
                            'error_level': 'warning',
                            'error_msg': 'This is an example check',
                            'command': 'is_in',
                            'subject': ['column1'],
                            'arg_values': [1, 2]
                        }
                    ]
                }

        Returns:
            Validator: An instance of the Validator class with the schema
                created from the provided configuration mapping.

        """
        validator = cls()
        try:
            validator.df_schema = get_df_schema(config)
            logger.info('DFSchema created successfully')

        except KeyError as err:
            error_handler(
                err=err,
                err_level='critical',
                message=f'Missing the following key in config input: {err.args[0]}',  # noqa: E501
                lazy=collect_exceptions,
                logger=logger,
            )

        except (AttributeError, TypeError) as err:
            error_handler(
                err=err,
                err_level='critical',
                message=f'Invalid config type: {err.args[0]}',
                lazy=collect_exceptions,
                logger=logger,
            )

        except ValidationError as err:
            error_handler(
                err=err,
                err_level='critical',
                message=f'Invalid config type: {[error["loc"] for error in err.errors()]}',  # noqa: E501
                lazy=collect_exceptions,
                logger=logger,
            )

        except Exception as err:
            exception_handler(
                err=err,
                err_level='critical',
                lazy=collect_exceptions,
                logger=logger,
            )

            logger.error('Failed to create DFSchema from configuration')

        return validator

    def validate(
        self,
        dataframe: Mapping[str, list] | pl.DataFrame,
        lazy_validation: bool = True,
        collect_exceptions: bool = True,
        logger: logging.Logger = logger,
    ) -> None:
        """Validates a DataFrame against the defined schema.

        Args:
            dataframe (Mapping[str, list] | pl.DataFrame): The input data
                as a mapping or a Polars DataFrame.
            lazy_validation (bool, optional): Whether to perform lazy validation.
                Defaults to True.
            collect_exceptions (bool, optional): Whether to collect exceptions
                during validation. Defaults to True.
            logger (logging.Logger, optional): Logger instance for logging.
                Defaults to the module logger.

        Raises:
            Exception: If an error occurs during validation and
                collect_exceptions is False.

        """  # noqa: E501
        try:
            if not getattr(self, 'df_schema', None):
                logger.error('DataFrame schema is not defined')
                return

            logger.info('Starting DataFrame validation')
            if isinstance(dataframe, Mapping):
                dataframe = convert_mapping_to_dataframe(
                    dataframe=dataframe,
                    collect_exceptions=collect_exceptions,
                    logger=logger,
                )

            if not getattr(dataframe, 'shape', None):
                logger.error('DataFrame is not valid')
                return

            logger.info(f'Building DataFrame schema {self.df_schema.name =}')
            df_schema = self.df_schema.build()

            try:
                logger.info('Casting DataFrame Types')
                dataframe = dataframe.cast({
                    col.id: validation_type_mapper[col.data_type]
                    for col in self.df_schema.columns
                    if col.id in dataframe.columns
                })

                logger.info('Starting DataFrame validation')
                dataframe.pipe(df_schema.validate, lazy=lazy_validation)

            except pl.exceptions.PolarsError as err:
                error_handler(
                    err=err,
                    err_level='critical',
                    message=str(err),
                    lazy=collect_exceptions,
                    logger=logger,
                )

            except (pa.errors.SchemaErrors, pa.errors.SchemaError) as err:
                pandera_schema_errors_handler(
                    err=err,
                    lazy=collect_exceptions,
                    logger=logger,
                )
                logger.info('Collecting validation errors')
            # Pandera has not implemented some lazy validation checks for Polars.
            # Run validation again in eager mode to catch the error.
            # This is a workaround for that limitation.
            except NotImplementedError:
                try:
                    logger.warning('Trying eager validation')
                    dataframe.pipe(df_schema.validate)

                except pl.exceptions.PolarsError as err:
                    error_handler(
                        err=err,
                        err_level='critical',
                        message=str(err),
                        lazy=collect_exceptions,
                        logger=logger,
                    )

                except pa.errors.SchemaError as err:
                    pandera_schema_errors_handler(
                        err=err,
                        lazy=collect_exceptions,
                        logger=logger,
                    )

                logger.info('Collecting eager validation errors')

        except Exception as err:
            exception_handler(
                err=err,
                err_level='critical',
                lazy=collect_exceptions,
                logger=logger,
            )

        logger.info('DataFrame validation completed')
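
A minimal end-to-end sketch of the typical workflow. The import path, and the assumption that the column-level "checks" key may be omitted, are not confirmed by this page, so adjust them to your installation:

import polars as pl

# Import path is an assumption based on the source location shown above.
from dataguard.validator.validator import Validator

config_input = {
    "name": "example_schema",
    "columns": [
        {
            "id": "column1",
            "data_type": "integer",
            "nullable": False,
            "unique": True,
            "required": True,
        },
    ],
    "ids": ["column1"],
    "metadata": {"description": "Example DataFrame schema"},
    "checks": [
        {
            "name": "example_check",
            "error_level": "warning",
            "error_msg": "This is an example check",
            "command": "is_in",
            "subject": ["column1"],
            "arg_values": [1, 2],
        }
    ],
}

validator = Validator.config_from_mapping(config_input)
validator.validate(pl.DataFrame({"column1": [1, 2, 3]}))

# Collected errors and exceptions are available on the shared error collector.
print(Validator.error_collector.get_errors())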

config_from_mapping classmethod

config_from_mapping(
    config: Mapping[str, str | Sequence | Mapping],
    collect_exceptions: bool = True,
    logger: logging.Logger = logger,
) -> Validator

Creates a Validator instance from a configuration mapping.

Parameters:

    config (Mapping[str, str | Sequence | Mapping]): Configuration mapping for the DataFrame schema. Required.
    collect_exceptions (bool): Whether to collect exceptions during the schema creation. Default: True.
    logger (logging.Logger): Logger instance for logging. Default: the module logger.

Examples:

The command is either a user-defined function or a string that maps to a function that will be used to validate the DataFrame.

The following commands are available:

'is_equal_to',
'is_equal_to_or_both_missing',
'is_greater_than_or_equal_to',
'is_greater_than',
'is_less_than_or_equal_to',
'is_less_than',
'is_not_equal_to',
'is_not_equal_to_and_not_both_missing',
'is_unique',
'is_duplicated',
'is_in',
'is_null',
'is_not_null'
>>> config_input = {
        "name": "example_schema",
        "columns": [
            {
                "id": "column1",
                "data_type": "integer",
                "nullable": False,
                "unique": True,
                "required": True,
                "checks": [
                    {
                        "command": "is_equal_to",
                        "subject": ["column2"]
                    }
                ]
            },
        ],
        "ids": ["column1"],
        "metadata": {"description": "Example DataFrame schema"},
        "checks": [
            {
                'name': 'example_check',
                'error_level': 'warning',
                'error_msg': 'This is an example check',
                'command': 'is_in',
                'subject': ['column1'],
                'arg_values': [1, 2]
            }
        ]
    }

Returns:

    Validator: An instance of the Validator class with the schema created from the provided configuration mapping.
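
A minimal call sketch, reusing the config_input from the example above (and assuming Validator has already been imported):

import logging

# Both keyword arguments are optional; a custom logger is shown for illustration.
my_logger = logging.getLogger("dataguard.example")

validator = Validator.config_from_mapping(
    config_input,
    collect_exceptions=True,
    logger=my_logger,
)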

Source code in src/dataguard/validator/validator.py
@classmethod
def config_from_mapping(
    cls,
    config: Mapping[str, str | Sequence | Mapping],
    collect_exceptions: bool = True,
    logger: logging.Logger = logger,
) -> Validator:
    """Creates a Validator instance from a configuration mapping.

    Args:
        config (Mapping[str, str | Sequence | Mapping]): Configuration
            mapping for the DataFrame schema.
        collect_exceptions (bool, optional): Whether to collect exceptions
            during the schema creation. Defaults to True.
        logger (logging.Logger, optional): Logger instance for logging.
            Defaults to the module logger.

    Examples:
        The command is either a user-defined function or a string that
        maps to a function that will be used to validate the DataFrame.

        The following commands are available:

            'is_equal_to',
            'is_equal_to_or_both_missing',
            'is_greater_than_or_equal_to',
            'is_greater_than',
            'is_less_than_or_equal_to',
            'is_less_than',
            'is_not_equal_to',
            'is_not_equal_to_and_not_both_missing',
            'is_unique',
            'is_duplicated',
            'is_in',
            'is_null',
            'is_not_null'

        >>> config_input = {
                "name": "example_schema",
                "columns": [
                    {
                        "id": "column1",
                        "data_type": "integer",
                        "nullable": False,
                        "unique": True,
                        "required": True,
                        "checks": [
                            {
                                "command": "is_equal_to",
                                "subject": ["column2"]
                            }
                        ]
                    },
                ],
                "ids": ["column1"],
                "metadata": {"description": "Example DataFrame schema"},
                "checks": [
                    {
                        'name': 'example_check',
                        'error_level': 'warning',
                        'error_msg': 'This is an example check',
                        'command': 'is_in',
                        'subject': ['column1'],
                        'arg_values': [1, 2]
                    }
                ]
            }

    Returns:
        Validator: An instance of the Validator class with the schema
            created from the provided configuration mapping.

    """
    validator = cls()
    try:
        validator.df_schema = get_df_schema(config)
        logger.info('DFSchema created successfully')

    except KeyError as err:
        error_handler(
            err=err,
            err_level='critical',
            message=f'Missing the following key in config input: {err.args[0]}',  # noqa: E501
            lazy=collect_exceptions,
            logger=logger,
        )

    except (AttributeError, TypeError) as err:
        error_handler(
            err=err,
            err_level='critical',
            message=f'Invalid config type: {err.args[0]}',
            lazy=collect_exceptions,
            logger=logger,
        )

    except ValidationError as err:
        error_handler(
            err=err,
            err_level='critical',
            message=f'Invalid config type: {[error["loc"] for error in err.errors()]}',  # noqa: E501
            lazy=collect_exceptions,
            logger=logger,
        )

    except Exception as err:
        exception_handler(
            err=err,
            err_level='critical',
            lazy=collect_exceptions,
            logger=logger,
        )

        logger.error('Failed to create DFSchema from configuration')

    return validator

validate

validate(
    dataframe: Mapping[str, list] | pl.DataFrame,
    lazy_validation: bool = True,
    collect_exceptions: bool = True,
    logger: logging.Logger = logger,
) -> None

Validates a DataFrame against the defined schema.

Parameters:

    dataframe (Mapping[str, list] | pl.DataFrame): The input data as a mapping or a Polars DataFrame. Required.
    lazy_validation (bool): Whether to perform lazy validation. Default: True.
    collect_exceptions (bool): Whether to collect exceptions during validation. Default: True.
    logger (logging.Logger): Logger instance for logging. Default: the module logger.

Raises:

    Exception: If an error occurs during validation and collect_exceptions is False.
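
A short usage sketch, assuming validator was created as shown above and that polars is installed:

import polars as pl

# The input may be a plain mapping of column name to values...
validator.validate({"column1": [1, 2, 3], "column2": [1, 2, 3]})

# ...or a Polars DataFrame. lazy_validation=False switches to eager validation,
# which (in pandera) stops at the first failing check instead of collecting all failures.
validator.validate(
    pl.DataFrame({"column1": [1, 2, 3], "column2": [1, 2, 3]}),
    lazy_validation=False,
)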

Source code in src/dataguard/validator/validator.py
def validate(
    self,
    dataframe: Mapping[str, list] | pl.DataFrame,
    lazy_validation: bool = True,
    collect_exceptions: bool = True,
    logger: logging.Logger = logger,
) -> None:
    """Validates a DataFrame against the defined schema.

    Args:
        dataframe (Mapping[str, list] | pl.DataFrame): The input data
            as a mapping or a Polars DataFrame.
        lazy_validation (bool, optional): Whether to perform lazy validation.
            Defaults to True.
        collect_exceptions (bool, optional): Whether to collect exceptions
            during validation. Defaults to True.
        logger (logging.Logger, optional): Logger instance for logging.
            Defaults to the module logger.

    Raises:
        Exception: If an error occurs during validation and
            collect_exceptions is False.

    """  # noqa: E501
    try:
        if not getattr(self, 'df_schema', None):
            logger.error('DataFrame schema is not defined')
            return

        logger.info('Starting DataFrame validation')
        if isinstance(dataframe, Mapping):
            dataframe = convert_mapping_to_dataframe(
                dataframe=dataframe,
                collect_exceptions=collect_exceptions,
                logger=logger,
            )

        if not getattr(dataframe, 'shape', None):
            logger.error('DataFrame is not valid')
            return

        logger.info(f'Building DataFrame schema {self.df_schema.name =}')
        df_schema = self.df_schema.build()

        try:
            logger.info('Casting DataFrame Types')
            dataframe = dataframe.cast({
                col.id: validation_type_mapper[col.data_type]
                for col in self.df_schema.columns
                if col.id in dataframe.columns
            })

            logger.info('Starting DataFrame validation')
            dataframe.pipe(df_schema.validate, lazy=lazy_validation)

        except pl.exceptions.PolarsError as err:
            error_handler(
                err=err,
                err_level='critical',
                message=str(err),
                lazy=collect_exceptions,
                logger=logger,
            )

        except (pa.errors.SchemaErrors, pa.errors.SchemaError) as err:
            pandera_schema_errors_handler(
                err=err,
                lazy=collect_exceptions,
                logger=logger,
            )
            logger.info('Collecting validation errors')
        # Pandera has not implemented some lazy validation checks for Polars.
        # Run validation again in eager mode to catch the error.
        # This is a workaround for that limitation.
        except NotImplementedError:
            try:
                logger.warning('Trying eager validation')
                dataframe.pipe(df_schema.validate)

            except pl.exceptions.PolarsError as err:
                error_handler(
                    err=err,
                    err_level='critical',
                    message=str(err),
                    lazy=collect_exceptions,
                    logger=logger,
                )

            except pa.errors.SchemaError as err:
                pandera_schema_errors_handler(
                    err=err,
                    lazy=collect_exceptions,
                    logger=logger,
                )

            logger.info('Collecting eager validation errors')

    except Exception as err:
        exception_handler(
            err=err,
            err_level='critical',
            lazy=collect_exceptions,
            logger=logger,
        )

    logger.info('DataFrame validation completed')

Enums used in the validation library. These enums define various constants used throughout the library for error levels, validation types, and check cases.

Values supplied in config_input must use the naming conventions defined by the enums below (see the sketch after the enum listings).

CheckCases

Bases: Enum

Enum representing different types of check cases.

Source code in src/dataguard/core/utils/enums.py
class CheckCases(Enum):
    """Enum representing different types of check cases."""

    CONDITION = 'condition'
    CONJUNCTION = 'conjunction'
    DISJUNCTION = 'disjunction'

ErrorLevel

Bases: Enum

Enum representing different levels of error severity.

Source code in src/dataguard/core/utils/enums.py
class ErrorLevel(Enum):
    """Enum representing different levels of error severity."""

    WARNING = 'warning'
    ERROR = 'error'
    CRITICAL = 'critical'

ValidationType

Bases: Enum

Enum representing different validation types for DataFrame columns.

Source code in src/dataguard/core/utils/enums.py
class ValidationType(Enum):
    """Enum representing different validation types for DataFrame columns."""

    DATE = 'date'
    DATETIME = 'datetime'
    BOOL = 'boolean'
    FLOAT = 'float'
    INT = 'integer'
    STR = 'string'
    CAT = 'categorical'
    DECIMAL = 'decimal'
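
These string values are what config_input carries; enum members can be looked up from them directly. A small illustrative sketch (the module path is inferred from the source location above and may differ in your installation):

from dataguard.core.utils.enums import CheckCases, ErrorLevel, ValidationType

# Members are resolved by value, e.g. a column declared with "data_type": "integer"
# corresponds to ValidationType.INT, and "error_level": "warning" to ErrorLevel.WARNING.
assert ValidationType("integer") is ValidationType.INT
assert ErrorLevel("warning") is ErrorLevel.WARNING
assert CheckCases("condition") is CheckCases.CONDITION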

ErrorCollector cached

ErrorCollector class for collecting errors during validation.

Source code in src/dataguard/error_report/error_collector.py
@cache
class ErrorCollector:
    """ErrorCollector class for collecting errors during validation."""

    COUNTER = 0

    def __init__(self):
        self.__errors = []
        self.__exceptions = []

    def add_unknown_exception(
        self,
        exception: ExceptionSchema,
    ) -> None:
        """Adds an unknown exception to the collector.

        Args:
            exception (ExceptionSchema): The exception to add.

        Returns:
            None

        """
        self.__exceptions.append(exception)

    def add_error_report(
        self,
        error_report: ErrorReportSchema,
    ) -> None:
        """Adds an error report to the collector.

        Args:
            error_report (ErrorReportSchema): The error report to add.

        Returns:
            None

        """
        self.__errors.append(error_report)
        self.COUNTER += error_report.total_errors

    def get_errors(self) -> ErrorCollectorSchema:
        """Returns the collected errors and exceptions.

        Returns:
            ErrorCollectorSchema: A schema containing the collected errors and exceptions.

        """  # noqa: E501
        return ErrorCollectorSchema(
            error_reports=self.__errors, exceptions=self.__exceptions
        )

    def clear_errors(self) -> None:
        """Clears the collected errors and exceptions."""
        self.__errors.clear()
        self.__exceptions.clear()
        self.COUNTER = 0

add_error_report

add_error_report(error_report: ErrorReportSchema) -> None

Adds an error report to the collector.

Parameters:

    error_report (ErrorReportSchema): The error report to add. Required.

Returns:

    None

Source code in src/dataguard/error_report/error_collector.py
def add_error_report(
    self,
    error_report: ErrorReportSchema,
) -> None:
    """Adds an error report to the collector.

    Args:
        error_report (ErrorReportSchema): The error report to add.

    Returns:
        None

    """
    self.__errors.append(error_report)
    self.COUNTER += error_report.total_errors

add_unknown_exception

add_unknown_exception(exception: ExceptionSchema) -> None

Adds an unknown exception to the collector.

Parameters:

    exception (ExceptionSchema): The exception to add. Required.

Returns:

    None

Source code in src/dataguard/error_report/error_collector.py
def add_unknown_exception(
    self,
    exception: ExceptionSchema,
) -> None:
    """Adds an unknown exception to the collector.

    Args:
        exception (ExceptionSchema): The exception to add.

    Returns:
        None

    """
    self.__exceptions.append(exception)
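
A brief sketch of recording an exception by hand (purely illustrative; in normal use the error handlers populate the collector, and the import paths are inferred from the source locations above):

from dataguard.core.utils.enums import ErrorLevel
from dataguard.error_report.error_collector import ErrorCollector
from dataguard.error_report.error_schemas import ExceptionSchema

ErrorCollector().add_unknown_exception(
    ExceptionSchema(
        type="RuntimeError",
        message="unexpected failure while loading data",
        level=ErrorLevel.CRITICAL,
        traceback="",
    )
)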

clear_errors

clear_errors() -> None

Clears the collected errors and exceptions.

Source code in src/dataguard/error_report/error_collector.py
def clear_errors(self) -> None:
    """Clears the collected errors and exceptions."""
    self.__errors.clear()
    self.__exceptions.clear()
    self.COUNTER = 0

get_errors

get_errors() -> ErrorCollectorSchema

Returns the collected errors and exceptions.

Returns:

    ErrorCollectorSchema: A schema containing the collected errors and exceptions.

Source code in src/dataguard/error_report/error_collector.py
def get_errors(self) -> ErrorCollectorSchema:
    """Returns the collected errors and exceptions.

    Returns:
        ErrorCollectorSchema: A schema containing the collected errors and exceptions.

    """  # noqa: E501
    return ErrorCollectorSchema(
        error_reports=self.__errors, exceptions=self.__exceptions
    )
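
Because the class is wrapped in functools.cache, calling ErrorCollector() with no arguments appears to return the same cached instance each time, so Validator.error_collector and a locally constructed collector share state. A short sketch of reading and resetting the collected results (import path inferred from the source location above):

from dataguard.error_report.error_collector import ErrorCollector

collector = ErrorCollector()      # same cached instance on every call
report = collector.get_errors()   # ErrorCollectorSchema with error_reports and exceptions

for error_report in report.error_reports:
    print(error_report.name, error_report.total_errors)

collector.clear_errors()          # reset between validation runs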

BasicExceptionSchema

Bases: BaseModel

Basic schema for exceptions.

Attributes:

    type (str): Type of the error.
    message (str): Message describing the error.
    level (ErrorLevel): Level of the error.

Source code in src/dataguard/error_report/error_schemas.py
class BasicExceptionSchema(BaseModel):
    """Basic schema for exceptions.

    Attributes:
        type (str): Type of the error.
        message (str): Message describing the error.
        level (ErrorLevel): Level of the error.

    """

    type: str
    message: str
    level: ErrorLevel

DFErrorSchema

Bases: ErrorSchema

Schema for errors that occur during DataFrame validation.

Attributes:

    column_names (list[str]): Names of the columns where the error occurred.
    row_ids (list[int]): IDs of the rows where the error occurred.
    idx_columns (list[str]): Index columns used for identifying errors.
    level (str): Level of the error, e.g., 'error', 'warning'.
    message (str): Message describing the error.
    title (str): Title of the error.
    traceback (str | None): Traceback of the error, if available.

Source code in src/dataguard/error_report/error_schemas.py
class DFErrorSchema(ErrorSchema):
    """Schema for errors that occur during DataFrame validation.

    Attributes:
        column_names (list[str]): Names of the columns where the error occurred.
        row_ids (list[int]): IDs of the rows where the error occurred.
        idx_columns (list[str]): Index columns used for identifying errors.
        level (str): Level of the error, e.g., 'error', 'warning'.
        message (str): Message describing the error.
        title (str): Title of the error.
        traceback (str | None): Traceback of the error, if available.

    """  # noqa: E501

    column_names: list[str]
    row_ids: list[int]
    idx_columns: list[str]
    title: str
    traceback: str | None = None

ErrorCollectorSchema

Bases: BaseModel

Schema for collecting errors and exceptions during validation.

Attributes:

    error_reports (list[ErrorReportSchema]): List of error reports.
    exceptions (list[ExceptionSchema]): List of exceptions that occurred during validation.

Source code in src/dataguard/error_report/error_schemas.py
class ErrorCollectorSchema(BaseModel):
    """Schema for collecting errors and exceptions during validation.

    Attributes:
        error_reports (list[ErrorReportSchema]): List of error reports.
        exceptions (list[ExceptionSchema]): List of exceptions that occurred during validation.

    """  # noqa: E501

    error_reports: list[ErrorReportSchema] = []
    exceptions: list[ExceptionSchema] = []

ErrorReportSchema

Bases: BaseModel

Schema for error reports generated during validation.

Attributes:

    name (str): Name of the error report.
    errors (list[ErrorSchema]): List of errors found in the DataFrame.
    total_errors (int): Total number of errors in the report.
    id (str): Unique identifier for the error report.

Source code in src/dataguard/error_report/error_schemas.py
class ErrorReportSchema(BaseModel):
    """Schema for error reports generated during validation.

    Attributes:
        name (str): Name of the error report.
        errors (list[ErrorSchema]): List of errors found in the DataFrame.
        total_errors (int): Total number of errors in the report.
        id (str): Unique identifier for the error report.

    """

    name: str
    errors: list[ErrorSchema]
    total_errors: int
    id: str

    model_config = ConfigDict(arbitrary_types_allowed=True)
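
For reference, a hand-built report consistent with the fields above (purely illustrative; reports are normally produced by the error handlers, and passing the ErrorLevel member for level is an assumption about the model's validation behavior):

from dataguard.core.utils.enums import ErrorLevel
from dataguard.error_report.error_schemas import ErrorReportSchema, ErrorSchema

report = ErrorReportSchema(
    name="example_schema",
    errors=[
        ErrorSchema(
            type="SchemaError",
            message="column1 contains null values",
            level=ErrorLevel.ERROR,
            title="Null check failed",
            traceback="",
        )
    ],
    total_errors=1,
    id="report-0001",
)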

ErrorSchema

Bases: BasicExceptionSchema

Schema for errors that occur during validation.

Attributes:

    type (str): Type of the error.
    message (str): Message describing the error.
    title (str): Title of the error.
    traceback (str): Traceback of the error.

Source code in src/dataguard/error_report/error_schemas.py
class ErrorSchema(BasicExceptionSchema):
    """Schema for errors that occur during validation.

    Attributes:
        type (str): Type of the error.
        message (str): Message describing the error.
        title (str): Title of the error.
        traceback (str): Traceback of the error.
    """

    title: str
    traceback: str

ExceptionSchema

Bases: BasicExceptionSchema

Schema for unknown exceptions that occur during validation.

Attributes:

    type (str): Type of the error.
    message (str): Message describing the error.
    level (ErrorLevel): Level of the error.
    traceback (str): Traceback of the error.

Source code in src/dataguard/error_report/error_schemas.py
class ExceptionSchema(BasicExceptionSchema):
    """Schema for unknown exceptions that occur during validation.

    Attributes:
        type (str): Type of the error.
        message (str): Message describing the error.
        level (ErrorLevel): Level of the error.
        traceback (str): Traceback of the error.

    """

    traceback: str