Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
2491946
Initial implementation for Dimensionality on Data Quality Tests
IceS2 Sep 3, 2025
a6c5db0
Merge branch 'main' into feature/dimensionality-for-data-quality
IceS2 Sep 3, 2025
7285db4
Fix ColumnValuesToBeUnique and create TestCaseResult API
IceS2 Sep 4, 2025
64cbe81
Refactor dimension result
IceS2 Sep 4, 2025
f6e6c6a
Merge branch 'main' into feature/dimensionality-for-data-quality
IceS2 Sep 15, 2025
f3bab32
Initial E2E Implementation without Impact Score
IceS2 Sep 16, 2025
3883047
Merge branch 'main' into feature/dimensionality-for-data-quality
IceS2 Sep 18, 2025
c403feb
Dimensionality Thin Slice
IceS2 Sep 23, 2025
afbd8da
Update generated TypeScript types
github-actions[bot] Sep 23, 2025
604870e
Merge branch 'main' into feature/dimensionality-for-data-quality
IceS2 Sep 25, 2025
60321af
Update generated TypeScript types
github-actions[bot] Sep 25, 2025
03ede24
Removed useless method to use the one we already had
IceS2 Sep 25, 2025
6ea9c2d
Fix Pandas Dimensionality checks
IceS2 Sep 25, 2025
b94c1d3
Remove useless comments
IceS2 Sep 25, 2025
9d811ba
Merge branch 'main' into feature/dimensionality-for-data-quality
IceS2 Sep 26, 2025
69c28fa
Merge branch 'main' into feature/dimensionality-for-data-quality
IceS2 Oct 2, 2025
926f845
Implement PR comments, fix Tests
IceS2 Oct 14, 2025
15a33b7
Merge branch 'feature/dimensionality-for-data-quality' of github.com:…
IceS2 Oct 14, 2025
00776b0
Merge branch 'main' into feature/dimensionality-for-data-quality
IceS2 Oct 14, 2025
3eb5e91
Improve the code a bit
IceS2 Oct 15, 2025
dbae901
Merge branch 'main' into feature/dimensionality-for-data-quality
IceS2 Oct 15, 2025
bfbac4d
Fix imports
IceS2 Oct 15, 2025
517b53c
Merge branch 'main' into feature/dimensionality-for-data-quality
IceS2 Oct 16, 2025
0b33546
Implement Dimensionality for ColumnMeanToBeBetween
IceS2 Oct 22, 2025
25f96f6
Merge branch 'main' into feature/dimensionality-column-mean-to-be-bet…
IceS2 Oct 22, 2025
aa257fd
Removed useless comments and improved minor things
IceS2 Oct 23, 2025
812cb57
Implement UnitTests
IceS2 Oct 23, 2025
b4fffc4
Merge branch 'main' into feature/dimensionality-column-mean-to-be-bet…
IceS2 Oct 23, 2025
8487b56
Fixes
IceS2 Oct 24, 2025
89e0e28
Merge branch 'feature/dimensionality-column-mean-to-be-between' of gi…
IceS2 Oct 24, 2025
5fac5e9
Moved import pandas to type checking
IceS2 Oct 24, 2025
70108e5
Fix Min/Max being optional
IceS2 Oct 24, 2025
e8922e1
Fix Unittests
IceS2 Oct 24, 2025
8dcf12c
small fixes
IceS2 Oct 27, 2025
5e0a7ad
Fix Unittests
IceS2 Oct 28, 2025
426e32e
Fix Issue with counting total rows on mean
IceS2 Oct 28, 2025
dc0b48f
Improve code
IceS2 Oct 28, 2025
0f2bf63
Merge branch 'main' into feature/dimensionality-column-max-to-be-between
IceS2 Oct 28, 2025
2a221a2
Fix Merge
IceS2 Oct 28, 2025
80d4f4e
Removed unused type
IceS2 Oct 28, 2025
1bebe26
Merge branch 'main' into feature/dimensionality-column-max-to-be-between
IceS2 Oct 28, 2025
284fb45
Refactor to reduce code repetition and complexity
IceS2 Oct 29, 2025
6fdcfdb
Merge branch 'main' into feature/dimensionality-column-min-to-be-between
IceS2 Oct 30, 2025
91bae70
Fix conflict
IceS2 Oct 30, 2025
7a1be5e
Rename method
IceS2 Oct 30, 2025
2ff148b
Refactor some metrics
IceS2 Oct 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 101 additions & 8 deletions ingestion/src/metadata/data_quality/validations/base_test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@
from metadata.generated.schema.type.basic import Timestamp
from metadata.profiler.processor.runner import QueryRunner
from metadata.utils.logger import test_suite_logger
from metadata.utils.sqa_like_column import SQALikeColumn

if TYPE_CHECKING:
from pandas import DataFrame
from sqlalchemy import Column

logger = test_suite_logger()

Expand Down Expand Up @@ -195,21 +197,55 @@ def _run_validation(self) -> TestCaseResult:
def _run_dimensional_validation(self) -> List[DimensionResult]:
"""Execute dimensional validation for this test

This method should implement the dimensional logic specific to each test type.
It will be called automatically by the template method when dimensionColumns
are configured in the test case.
Default implementation that delegates to _execute_dimensional_validation
for each dimension column. This provides a common pattern across validators.

The new approach runs separate queries for each dimension column instead of
combining them with GROUP BY. For example, if dimensionColumns = ["country", "age"],
this method will:
1. Run one query: GROUP BY country -> {"Spain": result1, "Argentina": result2}
2. Run another query: GROUP BY age -> {"10": result3, "12": result4}

Override this method only if you need completely different dimensional logic.
Most validators should just implement _execute_dimensional_validation instead.

Returns:
List[DimensionResult]: Empty list by default. Override in child classes
to implement dimensional validation support.
List[DimensionResult]: List of dimension-specific test results
"""
return []
try:
dimension_columns = self.test_case.dimensionColumns or []
if not dimension_columns:
return []

column: Union[SQALikeColumn, Column] = self.get_column()

test_params = self._get_test_parameters()
metrics_to_compute = self._get_metrics_to_compute(test_params)

dimension_results = []
for dimension_column in dimension_columns:
try:
dimension_col = self.get_column(dimension_column)

single_dimension_results = self._execute_dimensional_validation(
column, dimension_col, metrics_to_compute, test_params
)

dimension_results.extend(single_dimension_results)

except Exception as exc:
logger.warning(
f"Error executing dimensional query for column {dimension_column}: {exc}"
)
logger.debug(traceback.format_exc())
continue

return dimension_results

except Exception as exc:
logger.warning(f"Error executing dimensional validation: {exc}")
logger.debug(traceback.format_exc())
return []

def _get_test_parameters(self) -> Optional[dict]:
"""Get test-specific parameters from test case
Expand All @@ -222,6 +258,63 @@ def _get_test_parameters(self) -> Optional[dict]:
"""
return None

def _get_metrics_to_compute(self, test_params: Optional[dict] = None) -> dict:
"""Get metrics that need to be computed for this test

Default implementation returns empty dict. Override in child classes
that implement dimensional validation.

Args:
test_params: Optional test parameters (may affect which metrics to compute)

Returns:
dict: Dictionary mapping metric names to Metrics enum values
e.g., {"MIN": Metrics.MIN} or {"COUNT": Metrics.COUNT, "UNIQUE_COUNT": Metrics.UNIQUE_COUNT}
"""
return {}

def get_column(
    self, column_name: Optional[str] = None
) -> Union[SQALikeColumn, Column]:
    """Resolve a column object by name.

    When ``column_name`` is omitted, the main column for the test is
    returned.

    This method relies on cooperative multiple inheritance: the actual
    lookup is provided further along the MRO chain by SQAValidatorMixin or
    PandasValidatorMixin, so concrete validators must inherit from one of
    these mixins.

    Args:
        column_name: Name of the column to resolve, or None for the main
            test column.

    Returns:
        Column object (sqlalchemy.Column or SQALikeColumn depending on mixin)
    """
    resolved_column = super().get_column(column_name)  # type: ignore
    return resolved_column

def _execute_dimensional_validation(
self,
column: Union[SQALikeColumn, Column],
dimension_col: Union[SQALikeColumn, Column],
metrics_to_compute: dict,
test_params: Optional[dict],
) -> List[DimensionResult]:
"""Execute dimensional validation query for a single dimension column

Must be implemented by child classes to support dimensional validation.

Args:
column: The column being tested (e.g., revenue)
dimension_col: The dimension column to group by (e.g., region)
metrics_to_compute: Dict mapping metric names to Metrics enum values
test_params: Test parameters including bounds, allowed values, etc.

Returns:
List[DimensionResult]: List of dimension results for each dimension value

Raises:
NotImplementedError: If child class doesn't override this method
"""
raise NotImplementedError(
f"{self.__class__.__name__} must implement _execute_dimensional_validation() for dimensional validation"
)

def _evaluate_test_condition(
self, metric_values: dict, test_params: Optional[dict] = None
) -> TestEvaluation:
Expand Down Expand Up @@ -481,15 +574,15 @@ def are_dimension_columns_valid(self) -> bool:
if not self.is_dimensional_test():
return False # No dimensions to validate

if not hasattr(self, "_get_column_name"):
if not hasattr(self, "get_column"):
logger.warning("Validator does not support dimensional column validation")
return False

try:
missing_columns = []
for dim_col in self.test_case.dimensionColumns:
try:
self._get_column_name(dim_col) # type: ignore[attr-defined] - Delegates to child class
self.get_column(dim_col) # type: ignore[attr-defined] - Delegates to child class
except ValueError:
missing_columns.append(dim_col)
except NotImplementedError:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Base Checker abstract class.
Should be extended to implement different validation checkers that are used to define if a given data quality test passes or fails.
"""
from abc import ABC, abstractmethod
from typing import Any, Callable


class BaseValidationChecker(ABC):
    """Abstract contract for validation checkers.

    A checker decides whether a given data quality test passes or fails.
    Concrete checkers provide one implementation for the Pandas path and
    one for the SQLAlchemy path.
    """

    @abstractmethod
    def check_pandas(self, value: Any) -> bool:
        """Tell whether ``value`` violates the condition (Pandas Data Quality).

        Args:
            value: The value to check.

        Returns:
            bool: True when the value violates the condition.
        """

    @abstractmethod
    def get_sqa_failed_rows_builder(
        self, metric_col_name: str, total_count_col_name: str
    ) -> Callable:
        """Build the SQLAlchemy failed-rows expression.

        Args:
            metric_col_name: Name of the column holding the metric value.
            total_count_col_name: Name of the column holding the total
                row count.

        Returns:
            Callable: Builder producing the failed-rows expression.
        """
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be in BaseTestValidator or are you thinking to just use it for that new Between object?

Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
BetweenBoundsChecker implements the checker for any metric that should be between two bounds
"""
import math
from typing import Any, Callable

from metadata.data_quality.validations.checkers.base_checker import (
BaseValidationChecker,
)


class BetweenBoundsChecker(BaseValidationChecker):
    """Checker flagging metric values that fall outside [min_bound, max_bound]."""

    def __init__(self, min_bound: float, max_bound: float):
        self.min_bound, self.max_bound = min_bound, max_bound

    def check_pandas(self, value: Any) -> bool:
        """Report whether ``value`` lies outside [min_bound, max_bound].

        Used on Pandas Data Quality. Missing values (None / NaN) are never
        reported as violations.
        """
        import pandas as pd

        if value is None or pd.isna(value):
            return False

        within_bounds = self.min_bound <= value <= self.max_bound
        return not within_bounds

    def get_sqa_failed_rows_builder(
        self, metric_col_name: str, total_count_col_name: str
    ) -> Callable:
        """Return a builder for the SQLAlchemy failed-rows expression.

        The built expression yields the total rows count when the metric is
        outside the bounds, and 0 otherwise (including when the metric is
        NULL). Infinite bounds are treated as "no bound" and produce no
        comparison.
        """
        from sqlalchemy import case, literal, or_

        def _failed_rows_expression(cte):
            metric = getattr(cte.c, metric_col_name)
            total_rows = getattr(cte.c, total_count_col_name)

            # Only finite bounds contribute a comparison.
            out_of_bounds = []
            if not math.isinf(self.min_bound):
                out_of_bounds.append(metric < self.min_bound)
            if not math.isinf(self.max_bound):
                out_of_bounds.append(metric > self.max_bound)

            # Unbounded on both sides: nothing can ever fail.
            if not out_of_bounds:
                return literal(0)

            if len(out_of_bounds) == 1:
                violation = out_of_bounds[0]
            else:
                violation = or_(*out_of_bounds)

            return case(
                (metric.is_(None), literal(0)),
                (violation, total_rows),
                else_=literal(0),
            )

        return _failed_rows_expression
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def _run_validation(self) -> TestCaseResult:
TestCaseResult: The test case result for the overall validation
"""
try:
column: Union[SQALikeColumn, Column] = self._get_column_name()
column: Union[SQALikeColumn, Column] = self.get_column()
max_res = self._run_results(Metrics.MAX_LENGTH, column)
min_res = self._run_results(Metrics.MIN_LENGTH, column)
except (ValueError, RuntimeError) as exc:
Expand Down Expand Up @@ -90,10 +90,6 @@ def _run_validation(self) -> TestCaseResult:
max_bound=max_bound,
)

@abstractmethod
def _get_column_name(self):
raise NotImplementedError

@abstractmethod
def _run_results(self, metric: Metrics, column: Union[SQALikeColumn, Column]):
raise NotImplementedError
Expand Down Expand Up @@ -124,4 +120,4 @@ def get_row_count(self, min_bound, max_bound) -> Tuple[int, int]:
Returns:
Tuple[int, int]:
"""
return self.compute_row_count(self._get_column_name(), min_bound, max_bound)
return self.compute_row_count(self.get_column(), min_bound, max_bound)
Loading
Loading