Source code for trailpack.validation.standard_validator

"""
Standard validator for Trailpack data packages.

Validates metadata, resources, fields, and data quality against
the Trailpack standard specification.
"""

import csv
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
import yaml

from trailpack.validation import get_standard_path


class ValidationResult:
    """
    Result of a validation check.

    Contains three types of messages:

    - errors: Type consistency violations that fail validation
    - warnings: Recommended fields or practices that should be addressed
    - info: Data quality metrics and informational messages

    A validation is considered valid (passed) if there are no errors,
    regardless of warnings or info messages.

    **Data Inconsistency Tracking:**

    When type inconsistencies are detected (e.g., mixed types in a column),
    each inconsistent value is tracked in the `inconsistencies` list. This
    list is automatically exported to 'data_inconsistencies.csv' when the
    result is printed or converted to string. The CSV file contains the row,
    column, value, actual type, and expected type for each inconsistency.

    Attributes:
        errors: List of error messages (type consistency violations)
        warnings: List of warning messages (recommended practices)
        info: List of informational messages (data quality metrics)
        level: Validation compliance level (if assigned)
        inconsistencies: List of dicts with inconsistent value details
    """

    def __init__(self):
        self.errors: List[str] = []
        self.warnings: List[str] = []
        self.info: List[str] = []
        self.level: Optional[str] = None
        # One dict per inconsistent value; see add_inconsistency() for keys.
        self.inconsistencies: List[Dict[str, Any]] = []

    @staticmethod
    def _with_field(message: str, field: Optional[str]) -> str:
        """Prefix *message* with ``[field]`` when a field name is given."""
        return f"[{field}] {message}" if field else message

    @property
    def is_valid(self) -> bool:
        """Check if validation passed (no errors)."""
        return len(self.errors) == 0

    @property
    def has_warnings(self) -> bool:
        """Check if there are any warnings."""
        return len(self.warnings) > 0

    def add_error(self, message: str, field: Optional[str] = None):
        """Add an error message, optionally tagged with a field name."""
        self.errors.append(self._with_field(message, field))

    def add_warning(self, message: str, field: Optional[str] = None):
        """Add a warning message, optionally tagged with a field name."""
        self.warnings.append(self._with_field(message, field))

    def add_info(self, message: str, field: Optional[str] = None):
        """Add an info message, optionally tagged with a field name."""
        self.info.append(self._with_field(message, field))

    def add_inconsistency(
        self, row: int, column: str, value: Any, actual_type: str, expected_type: str
    ):
        """
        Track a data type inconsistency for later export.

        Each recorded entry can later be written out with
        export_inconsistencies_to_csv(), which is also attempted
        automatically when the result is converted to a string.

        Args:
            row: Row index of the inconsistent value
            column: Column name where the inconsistency was found
            value: The actual inconsistent value (stored as ``str(value)``)
            actual_type: Python type name of the value (e.g., 'int', 'str')
            expected_type: Expected type based on most common type in column
        """
        self.inconsistencies.append(
            {
                "row": row,
                "column": column,
                "value": str(value),
                "actual_type": actual_type,
                "expected_type": expected_type,
            }
        )

    def export_inconsistencies_to_csv(
        self, output_path: str = "data_inconsistencies.csv"
    ) -> Optional[str]:
        """
        Export data type inconsistencies to a CSV file for analysis.

        The CSV includes columns: row, column, value, actual_type,
        expected_type.

        Args:
            output_path: Path to the output CSV file. Defaults to
                "data_inconsistencies.csv" in the current working directory.

        Returns:
            Path to the created CSV file (str), or None if there are no
            inconsistencies to export.

        Raises:
            OSError: If the output file cannot be created or written.

        Example:
            >>> result = validator.validate_data_quality(df, schema)
            >>> if result.inconsistencies:
            ...     csv_path = result.export_inconsistencies_to_csv("issues.csv")
            ...     print(f"Found {len(result.inconsistencies)} issues in {csv_path}")
        """
        if not self.inconsistencies:
            return None

        fieldnames = ["row", "column", "value", "actual_type", "expected_type"]
        with open(output_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.inconsistencies)
        return output_path

    def get_summary(self) -> str:
        """Get a one-line summary of the validation result."""
        counts = f"{len(self.errors)} errors, {len(self.warnings)} warnings"
        if self.level:
            return f"{self.level}: {counts}"
        return counts

    def __str__(self) -> str:
        """
        Build the human-readable report.

        When inconsistencies were recorded, this also attempts to write them
        to 'data_inconsistencies.csv' in the current working directory. A
        failure to write the file never breaks string conversion; the report
        then points to the manual export method instead.
        """
        lines = []

        if self.level:
            lines.append(f"\n{self.level}\n{'=' * len(self.level)}")

        if self.errors:
            lines.append(f"\n❌ ERRORS ({len(self.errors)}):")
            lines.extend(f" • {error}" for error in self.errors)

        if self.warnings:
            lines.append(f"\n⚠️ WARNINGS ({len(self.warnings)}):")
            lines.extend(f" • {warning}" for warning in self.warnings)

        if self.info:
            lines.append(f"\nℹ️ INFO ({len(self.info)}):")
            lines.extend(f" • {info}" for info in self.info)

        if self.inconsistencies:
            lines.append(
                f"\n📋 DATA INCONSISTENCIES ({len(self.inconsistencies)} values):"
            )
            # Auto-export is best effort: printing a result must not raise
            # just because the working directory is not writable.
            try:
                csv_path = self.export_inconsistencies_to_csv()
            except OSError:
                csv_path = None
            if csv_path:
                lines.append(f" → Exported to: {csv_path}")
            else:
                lines.append(
                    " Use result.export_inconsistencies_to_csv() to export details"
                )

        if not self.errors and not self.warnings:
            lines.append("\n✅ All checks passed!")

        return "\n".join(lines)
class StandardValidator:
    """
    Validate data packages against Trailpack standards.

    The StandardValidator checks data packages for:

    - Metadata completeness (required and recommended fields)
    - Resource definitions (proper schema, formats, name sanitization)
    - Field definitions (types, units, constraints)
    - Data quality (missing values, duplicates, type consistency)
    - Schema matching (column types match field definitions)

    All numeric fields must have units specified, even for dimensionless
    quantities:

    - Measurements: Use appropriate SI or domain units (kg, m, °C, etc.)
    - Counts/IDs: Use dimensionless unit (http://qudt.org/vocab/unit/NUM)
    - Percentages: Use percent or dimensionless unit

    **Resource Name Sanitization:**

    Resource names must match ^[a-z0-9\\-_.]+$. The validator automatically:

    - Detects invalid resource names
    - Suggests sanitized alternatives
    - Can auto-sanitize names with sanitize_resource_name()

    **Automatic Inconsistency Export:**

    When type inconsistencies are detected during validation (e.g., mixed
    types in a column), each inconsistent value is tracked and automatically
    exported to 'data_inconsistencies.csv' when the ValidationResult is
    printed.

    Example:
        >>> validator = StandardValidator("1.0.0")
        >>> result = validator.validate_metadata(metadata)
        >>> if result.is_valid:
        ...     print("✅ Valid!")
        ... else:
        ...     print(result)
        >>> # Validate with schema (auto-exports inconsistencies.csv if errors found)
        >>> result = validator.validate_data_quality(df, schema=schema)
        >>> print(result)  # Shows errors and exports CSV automatically
        >>> # Sanitize resource names
        >>> clean_name = validator.sanitize_resource_name("My File!")
        >>> print(clean_name)  # "my_file"
    """

    # Used with fullmatch(), so no anchors are needed and a trailing newline
    # cannot slip through the way it does with ``re.match(r"^...$")``.
    _RESOURCE_NAME_RE = re.compile(r"[a-z0-9\-_.]+")
    _INVALID_NAME_CHARS_RE = re.compile(r"[^a-z0-9\-_.]")

    def __init__(self, version: str = "1.0.0"):
        """
        Initialize validator with a specific standard version.

        Args:
            version: Standard version to validate against (default: "1.0.0")
        """
        self.version = version
        self.standard = self._load_standard(version)

    def _load_standard(self, version: str) -> Dict[str, Any]:
        """Load the standard specification from YAML."""
        standard_path = get_standard_path(version)
        # Explicit encoding: the platform default is not UTF-8 everywhere.
        with open(standard_path, encoding="utf-8") as f:
            return yaml.safe_load(f)

    # ------------------------------------------------------------------
    # Small helpers for walking the standard's spec lists
    # ------------------------------------------------------------------

    @staticmethod
    def _iter_field_specs(specs: Any) -> List[Tuple[str, Dict[str, Any]]]:
        """Flatten a list of ``{field_name: field_def}`` dicts into pairs."""
        pairs: List[Tuple[str, Dict[str, Any]]] = []
        for spec in specs or []:
            pairs.extend(spec.items())
        return pairs

    @classmethod
    def _find_field_def(cls, specs: Any, name: str) -> Dict[str, Any]:
        """Return the definition for *name* in a spec list, or ``{}``."""
        for field_name, field_def in cls._iter_field_specs(specs):
            if field_name == name:
                return field_def or {}
        return {}

    @staticmethod
    def _row_number(label: Any, position: int) -> int:
        """
        Choose the row number to report for an inconsistent value.

        Numeric index labels are reported as-is (truncated to int). Any other
        label (strings, timestamps, tuples, NaN) falls back to the value's
        positional row in the DataFrame.
        """
        if isinstance(label, (int, float)):
            try:
                return int(label)
            except (ValueError, OverflowError):  # NaN / infinite labels
                return position
        return position

    # ------------------------------------------------------------------
    # Public validation entry points
    # ------------------------------------------------------------------

    def validate_all(
        self,
        metadata: Dict[str, Any],
        df: Optional[pd.DataFrame] = None,
        mappings: Optional[Dict[str, Any]] = None,
    ) -> "ValidationResult":
        """
        Validate everything: metadata, resources, and data quality.

        Args:
            metadata: Data package metadata dictionary
            df: Optional DataFrame to validate data quality
            mappings: Optional field mappings (accepted for API
                compatibility; not inspected by this method)

        Returns:
            ValidationResult aggregating errors, warnings, info messages and
            tracked inconsistencies, with ``level`` set.
        """
        result = ValidationResult()

        # 1. Validate metadata structure
        meta_result = self.validate_metadata(metadata)
        result.errors.extend(meta_result.errors)
        result.warnings.extend(meta_result.warnings)
        result.info.extend(meta_result.info)

        # 2. Validate resources if present
        resources = metadata.get("resources") or []
        for idx, resource in enumerate(resources):
            res_result = self.validate_resource(resource)
            # Prefix errors/warnings with the resource name
            label = f"Resource '{resource.get('name', f'resource_{idx}')}'"
            for error in res_result.errors:
                result.add_error(error, label)
            for warning in res_result.warnings:
                result.add_warning(warning, label)

        # 3. Validate data quality if a DataFrame was provided
        if df is not None:
            # For now the first resource's schema is used.
            schema = resources[0].get("schema") if resources else None
            quality_result = self.validate_data_quality(df, schema=schema)
            result.errors.extend(quality_result.errors)
            result.warnings.extend(quality_result.warnings)
            # Carry over metrics and tracked values too; otherwise the
            # inconsistency CSV export never triggers for validate_all().
            result.info.extend(quality_result.info)
            result.inconsistencies.extend(quality_result.inconsistencies)

        # 4. Determine validation level
        result.level = self._determine_level(result)
        return result

    def validate_metadata(self, metadata: Dict[str, Any]) -> "ValidationResult":
        """
        Validate metadata against required and recommended fields.

        Missing or invalid required fields produce errors. Missing
        recommended fields, and recommended fields whose value fails
        validation, produce warnings.

        Args:
            metadata: Data package metadata dictionary

        Returns:
            ValidationResult with validation errors and warnings
        """
        result = ValidationResult()
        metadata_spec = self.standard["metadata"]

        # 1. Required fields
        for field_name, field_def in self._iter_field_specs(metadata_spec["required"]):
            if field_name not in metadata:
                msg = field_def.get(
                    "validation_message", f"Required field '{field_name}' is missing"
                )
                result.add_error(msg, field_name)
                continue
            field_result = self._validate_field_value(
                field_name, metadata[field_name], field_def
            )
            result.errors.extend(field_result.errors)
            result.warnings.extend(field_result.warnings)

        # 2. Recommended fields
        for field_name, field_def in self._iter_field_specs(
            metadata_spec.get("recommended")
        ):
            if field_name not in metadata:
                msg = field_def.get(
                    "validation_message",
                    f"Recommended field '{field_name}' is missing",
                )
                result.add_warning(msg, field_name)
                continue
            field_result = self._validate_field_value(
                field_name, metadata[field_name], field_def
            )
            # A bad value in a recommended field must not fail validation,
            # but it should not vanish either: report it as a warning.
            result.warnings.extend(field_result.errors)
            result.warnings.extend(field_result.warnings)

        # 3. Contributors must include at least one author
        if "contributors" in metadata:
            contributors = metadata["contributors"]
            has_author = isinstance(contributors, (list, tuple)) and any(
                isinstance(c, dict) and c.get("role") == "author"
                for c in contributors
            )
            if not has_author:
                result.add_error(
                    "At least one contributor with role 'author' is required",
                    "contributors",
                )

        return result

    def validate_resource(self, resource: Dict[str, Any]) -> "ValidationResult":
        """
        Validate a resource (data file) definition.

        Also suggests a sanitized name when the resource name is invalid.

        Args:
            resource: Resource dictionary from metadata

        Returns:
            ValidationResult with validation errors and warnings
        """
        result = ValidationResult()
        resource_spec = self.standard["resources"]
        required_specs = resource_spec["required"]

        # 1. Required fields
        for field_name, field_def in self._iter_field_specs(required_specs):
            if field_name not in resource:
                msg = field_def.get(
                    "validation_message", f"Required field '{field_name}' is missing"
                )
                result.add_error(msg, field_name)
                continue

            field_result = self._validate_field_value(
                field_name, resource[field_name], field_def
            )
            result.errors.extend(field_result.errors)

            # Special handling for the resource name: suggest a fixed version
            if field_name == "name":
                is_valid, _, suggestion = self.validate_and_sanitize_resource_name(
                    resource[field_name], auto_fix=False
                )
                if not is_valid and suggestion:
                    result.add_warning(
                        f"Resource name '{resource[field_name]}' contains invalid characters. "
                        f"Suggested name: '{suggestion}'",
                        "name",
                    )

        # 2. Format preference. The spec is looked up by name, not by its
        #    position in the list, so reordering the standard cannot break it.
        if "format" in resource:
            preferred_format = self._find_field_def(required_specs, "format").get(
                "preferred_format"
            )
            if preferred_format is not None and resource["format"] != preferred_format:
                result.add_warning(
                    f"Format '{resource['format']}' is acceptable, but '{preferred_format}' is preferred",
                    "format",
                )

        # 3. Recommended fields
        for field_name, field_def in self._iter_field_specs(
            resource_spec.get("recommended")
        ):
            if field_name not in resource:
                msg = field_def.get(
                    "validation_message",
                    f"Recommended field '{field_name}' is missing",
                )
                result.add_warning(msg, field_name)

        # 4. Validate schema if present
        if "schema" in resource and "fields" in resource["schema"]:
            for field in resource["schema"]["fields"]:
                field_result = self.validate_field_definition(field)
                result.errors.extend(field_result.errors)
                result.warnings.extend(field_result.warnings)

        return result

    def validate_field_definition(self, field: Dict[str, Any]) -> "ValidationResult":
        """
        Validate a field (column) definition.

        Args:
            field: Field dictionary from schema

        Returns:
            ValidationResult with validation errors and warnings
        """
        result = ValidationResult()
        field_name = field.get("name", "unknown")
        fields_spec = self.standard["fields"]

        # 1. Required attributes of a field definition
        for req_name, req_def in self._iter_field_specs(fields_spec["required"]):
            if req_name not in field:
                msg = req_def.get(
                    "validation_message", f"Required field '{req_name}' is missing"
                )
                result.add_error(msg, field_name)
            elif req_name == "type":
                allowed_types = req_def.get("allowed_types", [])
                if field["type"] not in allowed_types:
                    result.add_error(
                        f"Invalid type '{field['type']}'. Must be one of: {', '.join(allowed_types)}",
                        field_name,
                    )

        # 2. Numeric fields must declare a unit
        if field.get("type") in ("number", "integer") and "unit" not in field:
            unit_def = self._find_field_def(
                fields_spec.get("recommended_for_numeric"), "unit"
            )
            msg = unit_def.get(
                "validation_message", "Numeric fields must have a unit specified"
            )
            result.add_error(msg, field_name)

        # 3. Recommended attributes
        if "description" not in field:
            result.add_warning(
                "Field description improves dataset usability", field_name
            )

        return result

    def validate_data_quality(
        self, df: pd.DataFrame, schema: Optional[Dict[str, Any]] = None
    ) -> "ValidationResult":
        """
        Validate data quality of a DataFrame.

        Data quality checks are logged as informational messages, not errors:

        - Missing data: Percentage of nulls per column
        - Duplicates: Percentage of duplicate rows

        Type consistency checks RAISE ERRORS (not just logged):

        - Mixed types: Object columns holding more than one Python type
        - Schema matching: Column types must match field definitions
        - Unit requirements: Numeric fields must have units

        Every value whose type differs from the most common type in its
        column is tracked in ``result.inconsistencies`` (row, column, value,
        actual type, expected type). The row is the index label when it is
        numeric, otherwise the positional row number.

        Args:
            df: DataFrame to validate
            schema: Optional schema with a 'fields' list of field
                definitions (name, type, unit, description) to validate
                the columns against.

        Returns:
            ValidationResult with:

            - errors: Type consistency violations
            - warnings: Schema/data column mismatches
            - info: Data quality metrics (nulls, duplicates, shape)
            - inconsistencies: Details about each inconsistent value

        Example:
            >>> schema = {
            ...     "fields": [
            ...         {
            ...             "name": "id",
            ...             "type": "integer",
            ...             "description": "Unique identifier",
            ...             "unit": {"name": "dimensionless", "path": "http://qudt.org/vocab/unit/NUM"}
            ...         },
            ...         {
            ...             "name": "mass",
            ...             "type": "number",
            ...             "description": "Mass measurement",
            ...             "unit": {"name": "kg", "path": "http://qudt.org/vocab/unit/KiloGM"}
            ...         }
            ...     ]
            ... }
            >>> result = validator.validate_data_quality(df, schema=schema)
        """
        result = ValidationResult()
        quality_spec = self.standard["data_quality"]
        row_count = len(df)

        # 1. Missing data thresholds (info only)
        max_null_pct = quality_spec["missing_data"]["max_null_percentage"]
        critical_threshold = quality_spec["missing_data"]["critical_threshold"]
        for col in df.columns:
            null_count = df[col].isnull().sum()
            null_pct = null_count / row_count if row_count > 0 else 0
            if null_pct > 0:
                if null_pct > max_null_pct:
                    result.add_info(
                        f"Column '{col}' has {null_pct:.1%} missing values (exceeds recommended max: {max_null_pct:.1%})",
                        "data_quality",
                    )
                elif null_pct > critical_threshold:
                    result.add_info(
                        f"Column '{col}' has {null_pct:.1%} missing values (approaching threshold: {critical_threshold:.1%})",
                        "data_quality",
                    )
                else:
                    result.add_info(
                        f"Column '{col}' has {null_pct:.1%} missing values",
                        "data_quality",
                    )

        # 2. Mixed Python types inside object columns (errors)
        if not quality_spec["type_consistency"]["allow_mixed_types"]:
            for col in df.columns:
                if df[col].dtype == "object":
                    self._check_mixed_types(df[col], col, result)

        # 3. Schema-based type consistency (if schema provided)
        if schema and quality_spec["type_consistency"].get(
            "check_against_schema", False
        ):
            schema_result = self._validate_data_against_schema(df, schema, quality_spec)
            result.errors.extend(schema_result.errors)
            result.warnings.extend(schema_result.warnings)

        # 4. Duplicates (info only)
        if quality_spec["duplicates"]["check_duplicates"]:
            dup_count = df.duplicated().sum()
            if dup_count > 0:
                dup_pct = dup_count / row_count
                max_dup_pct = quality_spec["duplicates"]["max_duplicate_percentage"]
                if dup_pct > max_dup_pct:
                    result.add_info(
                        f"{dup_count} duplicate rows ({dup_pct:.1%}) exceeds recommended threshold ({max_dup_pct:.1%})",
                        "data_quality",
                    )
                else:
                    result.add_info(
                        f"{dup_count} duplicate rows found ({dup_pct:.1%})",
                        "data_quality",
                    )

        # 5. Dataset shape
        result.add_info(f"Dataset has {len(df)} rows and {len(df.columns)} columns")

        # 6. Point the user at the CSV export if anything was tracked
        if result.inconsistencies:
            result.add_info(
                f"Found {len(result.inconsistencies)} type inconsistencies. "
                "CSV export will be created automatically when result is printed, "
                "or call result.export_inconsistencies_to_csv() manually"
            )

        return result

    def _check_mixed_types(
        self, series: pd.Series, col: Any, result: "ValidationResult"
    ) -> None:
        """
        Record an error and per-value inconsistencies for a mixed-type column.

        The most frequent type among non-null values is treated as expected
        (ties go to the type seen first). Nothing is recorded when all
        non-null values share one type.
        """
        present = series.notna().to_numpy()
        type_counts: Dict[type, int] = {}
        observed = []  # (position, index label, value, type name)
        for position, (label, value) in enumerate(series.items()):
            if not present[position]:
                continue
            value_type = type(value)
            type_counts[value_type] = type_counts.get(value_type, 0) + 1
            observed.append((position, label, value, value_type.__name__))

        if len(type_counts) <= 1:
            return

        # Dict order is first-seen order, matching the reported type list.
        type_names = [t.__name__ for t in type_counts]
        expected_type = max(type_counts, key=lambda k: type_counts[k]).__name__

        inconsistent_count = 0
        for position, label, value, actual_type in observed:
            if actual_type != expected_type:
                result.add_inconsistency(
                    row=self._row_number(label, position),
                    column=col,
                    value=value,
                    actual_type=actual_type,
                    expected_type=expected_type,
                )
                inconsistent_count += 1

        result.add_error(
            f"Column '{col}' has mixed types: {', '.join(type_names)} "
            f"({inconsistent_count} inconsistent values tracked)",
            "type_consistency",
        )

    def _validate_data_against_schema(
        self, df: pd.DataFrame, schema: Dict[str, Any], quality_spec: Dict[str, Any]
    ) -> "ValidationResult":
        """
        Validate DataFrame against field schema definitions.

        Checks that actual column types match declared field types, and that
        numeric fields have proper units defined.

        Args:
            df: DataFrame to validate
            schema: Schema dictionary with field definitions
            quality_spec: Quality specification from standard

        Returns:
            ValidationResult with schema validation errors and warnings
        """
        result = ValidationResult()

        type_consistency = quality_spec["type_consistency"]
        schema_matching = type_consistency.get("schema_matching", {})
        type_mapping = schema_matching.get("type_mapping", {})

        # Index field definitions by name. Definitions without a name cannot
        # be matched to a column, so they are skipped instead of raising.
        field_dict = {
            f["name"]: f
            for f in (schema.get("fields") or [])
            if isinstance(f, dict) and "name" in f
        }

        for col in df.columns:
            if col not in field_dict:
                result.add_warning(
                    f"Column '{col}' in data but not in schema definition",
                    "schema_matching",
                )
                continue

            field_def = field_dict[col]
            declared_type = field_def.get("type")
            if not declared_type:
                continue

            expected_types = type_mapping.get(declared_type, [])
            dtype = df[col].dtype
            actual_dtype = str(dtype)

            if dtype == "object":
                # Object columns: inspect the Python types of a sample.
                non_null = df[col].dropna()
                if len(non_null) > 0:
                    actual_python_types = {
                        type(v).__name__ for v in non_null.head(100)
                    }
                    if not any(t in expected_types for t in actual_python_types):
                        result.add_error(
                            f"Column '{col}' declared as '{declared_type}' but contains "
                            f"{', '.join(sorted(actual_python_types))}. Expected: {', '.join(expected_types)}",
                            "schema_matching",
                        )

            elif declared_type in ("number", "integer"):
                # Accept every numeric dtype (all int/uint/float widths and
                # pandas nullable numerics); bool is numeric to pandas but is
                # not a valid number column.
                is_numeric = pd.api.types.is_numeric_dtype(
                    dtype
                ) and not pd.api.types.is_bool_dtype(dtype)
                if not is_numeric:
                    result.add_error(
                        f"Column '{col}' declared as '{declared_type}' but has dtype '{actual_dtype}'",
                        "schema_matching",
                    )

                if schema_matching.get("numeric_must_have_unit", False):
                    # A unit is either a dict with a non-empty 'name' or a
                    # non-empty string.
                    field_unit = field_def.get("unit")
                    if isinstance(field_unit, dict):
                        has_unit = bool(field_unit.get("name"))
                    elif isinstance(field_unit, str):
                        has_unit = bool(field_unit)
                    else:
                        has_unit = False
                    if not has_unit:
                        result.add_error(
                            f"Numeric field '{col}' must have a unit specified in the field definition",
                            "schema_matching",
                        )

            elif declared_type == "string":
                if dtype != "object" and not dtype.name.startswith("string"):
                    result.add_error(
                        f"Column '{col}' declared as 'string' but has dtype '{actual_dtype}'",
                        "schema_matching",
                    )

            elif declared_type == "boolean":
                # Accepts both numpy bool and the pandas nullable boolean.
                if not pd.api.types.is_bool_dtype(dtype):
                    result.add_error(
                        f"Column '{col}' declared as 'boolean' but has dtype '{actual_dtype}'",
                        "schema_matching",
                    )

        # Fields declared in the schema but absent from the data
        for field_name in field_dict:
            if field_name not in df.columns:
                result.add_warning(
                    f"Field '{field_name}' defined in schema but not found in data",
                    "schema_matching",
                )

        return result

    def _validate_field_value(
        self, field_name: str, value: Any, field_def: Dict[str, Any]
    ) -> "ValidationResult":
        """
        Validate a specific field value against its definition.

        Supports the 'string' (length and pattern), 'array' (item count) and
        'url' (http/https prefix) types; other types are not checked.

        Args:
            field_name: Name of the field
            value: Value to validate
            field_def: Field definition from standard

        Returns:
            ValidationResult with validation errors
        """
        result = ValidationResult()
        expected_type = field_def.get("type")

        if expected_type == "string":
            if not isinstance(value, str):
                result.add_error(
                    f"Expected string, got {type(value).__name__}", field_name
                )
                return result
            if "min_length" in field_def and len(value) < field_def["min_length"]:
                result.add_error(
                    f"Minimum length is {field_def['min_length']}, got {len(value)}",
                    field_name,
                )
            if "max_length" in field_def and len(value) > field_def["max_length"]:
                result.add_error(
                    f"Maximum length is {field_def['max_length']}, got {len(value)}",
                    field_name,
                )
            if "pattern" in field_def and not re.match(field_def["pattern"], value):
                msg = field_def.get(
                    "validation_message", "Value does not match pattern"
                )
                result.add_error(msg, field_name)

        elif expected_type == "array":
            if not isinstance(value, list):
                result.add_error(
                    f"Expected array, got {type(value).__name__}", field_name
                )
                return result
            if "min_items" in field_def and len(value) < field_def["min_items"]:
                msg = field_def.get(
                    "validation_message",
                    f"Minimum {field_def['min_items']} items required",
                )
                result.add_error(msg, field_name)
            if "max_items" in field_def and len(value) > field_def["max_items"]:
                result.add_error(
                    f"Maximum {field_def['max_items']} items allowed", field_name
                )

        elif expected_type == "url":
            if not isinstance(value, str):
                result.add_error(
                    f"Expected URL string, got {type(value).__name__}", field_name
                )
            elif not re.match(r"^https?://", value):
                result.add_error("URL must start with http:// or https://", field_name)

        return result

    def sanitize_resource_name(self, name: str) -> str:
        r"""
        Sanitize resource name to match the required pattern ^[a-z0-9\-_.]+$.

        The resource name must only contain lowercase letters (a-z), numbers
        (0-9), hyphens (-), underscores (_) and dots (.). Spaces become
        underscores, other invalid characters are removed, and leading or
        trailing dots are stripped.

        Args:
            name: Raw name string to sanitize

        Returns:
            Sanitized name matching the required pattern; "resource" when
            nothing valid remains.

        Example:
            >>> validator = StandardValidator()
            >>> validator.sanitize_resource_name("My Resource Name!")
            'my_resource_name'
            >>> validator.sanitize_resource_name("Test@123")
            'test123'
        """
        if not name:
            return "resource"

        cleaned = str(name).lower().replace(" ", "_")
        cleaned = self._INVALID_NAME_CHARS_RE.sub("", cleaned)
        cleaned = cleaned.strip(".")
        return cleaned or "resource"

    def validate_and_sanitize_resource_name(
        self, name: str, auto_fix: bool = False
    ) -> Tuple[bool, str, Optional[str]]:
        """
        Validate a resource name and optionally sanitize it.

        Args:
            name: Resource name to validate
            auto_fix: If True, return sanitized name; if False, just validate

        Returns:
            Tuple of (is_valid, original_or_sanitized_name, suggestion)

            - is_valid: Whether the original name is valid
            - original_or_sanitized_name: Original name if valid or not
              auto_fix, sanitized name if auto_fix
            - suggestion: Sanitized name suggestion if the original is
              invalid and auto_fix is False, None otherwise

        Example:
            >>> validator = StandardValidator()
            >>> is_valid, name, suggestion = validator.validate_and_sanitize_resource_name("Invalid Name!")
            >>> print(f"Valid: {is_valid}, Suggestion: {suggestion}")
            Valid: False, Suggestion: invalid_name
            >>> is_valid, name, _ = validator.validate_and_sanitize_resource_name("valid-name")
            >>> print(f"Valid: {is_valid}, Name: {name}")
            Valid: True, Name: valid-name
        """
        if not name:
            # None or empty input: never valid
            original = name if name is not None else ""
        else:
            original = str(name)
            # fullmatch: the whole string must consist of allowed characters
            # (re.match with '$' would accept a trailing newline).
            if self._RESOURCE_NAME_RE.fullmatch(original):
                return True, original, None

        sanitized = self.sanitize_resource_name(name)
        if auto_fix:
            return False, sanitized, None
        return False, original, sanitized

    def _determine_level(self, result: "ValidationResult") -> str:
        """
        Determine validation level based on errors and warnings.

        Args:
            result: ValidationResult to evaluate

        Returns:
            Validation level badge string
        """
        levels = self.standard["validation_levels"]
        error_count = len(result.errors)
        warning_count = len(result.warnings)

        if error_count == 0 and warning_count == 0:
            return levels["strict"]["badge"]
        if error_count == 0 and warning_count <= 5:
            return levels["standard"]["badge"]
        if error_count <= 10:
            return levels["basic"]["badge"]
        return levels["invalid"]["badge"]

    def get_help_url(self, topic: str) -> Optional[str]:
        """
        Get help URL for a specific topic.

        Args:
            topic: Topic name (e.g., 'frictionless_spec', 'qudt_units')

        Returns:
            URL string or None if not found
        """
        return self.standard.get("help_urls", {}).get(topic)