"""
Export service for converting UI data to Frictionless Data Package in Parquet.
"""
from typing import Any, Dict, List, Tuple, Optional
from pathlib import Path
import pandas as pd
import re
from trailpack.packing.datapackage_schema import (
Field,
Unit,
Resource,
MetaDataBuilder,
DataPackageSchema,
)
from trailpack.packing.packing import Packing
from trailpack.validation.standard_validator import StandardValidator
class DataPackageExporter:
    """Service for exporting UI data to Frictionless Data Package in Parquet.

    The exporter keeps the DataFrame and the metadata collected in the UI,
    derives field and resource definitions from them, optionally validates
    the result with ``StandardValidator`` and hands data plus metadata to
    ``Packing.write_parquet``.
    """

    # Characters outside this set are dropped from resource names.
    _INVALID_RESOURCE_CHARS = re.compile(r"[^a-z0-9\-_.]")
    # Horizontal rule used by the plain-text validation report.
    _REPORT_RULE = "=" * 80

    def __init__(
        self,
        df: pd.DataFrame,
        column_mappings: Dict[str, str],
        general_details: Dict[str, Any],
        sheet_name: str,
        file_name: str,
        suggestions_cache: Optional[Dict[str, List]] = None,
        column_descriptions: Optional[Dict[str, str]] = None,
        standard_version: str = "1.0.0",
    ):
        """
        Initialize with UI session state data.

        Args:
            df: Pandas DataFrame with the actual data
            column_mappings: Mapping of column names to PyST concept IDs;
                unit mappings are stored under the key ``"<column>_unit"``
            general_details: Metadata from the general details form
            sheet_name: Name of the Excel sheet
            file_name: Original file name
            suggestions_cache: Cache of PyST suggestions with id and label
            column_descriptions: User-provided descriptions/comments for columns
            standard_version: Trailpack standard version to validate against
        """
        # Every other method reads self.df, so the DataFrame must be stored.
        self.df = df
        self.column_mappings = column_mappings
        self.general_details = general_details
        self.sheet_name = sheet_name
        self.file_name = file_name
        # None means "nothing provided"; normalise to empty dicts so lookups
        # never need a None check.
        self.suggestions_cache = suggestions_cache or {}
        self.column_descriptions = column_descriptions or {}
        self.schema = DataPackageSchema()
        self.validator = StandardValidator(standard_version)

    def validate(self) -> Tuple[bool, List[str]]:
        """Validate all inputs before processing.

        Returns:
            Tuple ``(is_valid, errors)`` where ``errors`` lists every problem
            found; ``is_valid`` is True only when that list is empty.
        """
        errors: List[str] = []

        if self.df is None or self.df.empty:
            errors.append("DataFrame is empty or None")

        name = self.general_details.get("name")
        if not name:
            # A missing or empty name is reported once and is not forwarded
            # to the schema validator.
            errors.append("Package name is required")
        else:
            is_valid, error_msg = self.schema.validate_package_name(name)
            if not is_valid:
                errors.append(f"Invalid package name: {error_msg}")

        return len(errors) == 0, errors

    def _sanitize_resource_name(self, name: str) -> str:
        """
        Sanitize resource name to match the pattern ^[a-z0-9\\-_\\.]+$.

        The resource name must only contain:
        - Lowercase letters (a-z)
        - Numbers (0-9)
        - Hyphens (-)
        - Underscores (_)
        - Dots (.)

        Args:
            name: Raw name string to sanitize

        Returns:
            Sanitized name matching the required pattern; ``"resource"`` when
            nothing valid is left after sanitization
        """
        # Lowercase first so uppercase letters survive the character filter,
        # and turn spaces into underscores instead of dropping them.
        candidate = name.lower().replace(" ", "_")
        candidate = self._INVALID_RESOURCE_CHARS.sub("", candidate)
        # Leading/trailing dots are removed.
        candidate = candidate.strip(".")
        return candidate or "resource"

    def build_fields(self) -> List["Field"]:
        """Convert column mappings to Field definitions.

        Returns:
            One ``Field`` per DataFrame column, in column order.
        """
        fields = []
        # items() yields one Series per column, even for duplicate labels.
        for column, series in self.df.items():
            field_type = self._infer_field_type(series)
            ontology_id = self.column_mappings.get(column)

            # A mapped unit is only attached to numeric columns.
            unit = None
            if pd.api.types.is_numeric_dtype(series):
                unit_id = self.column_mappings.get(f"{column}_unit")
                if unit_id:
                    unit_label = self._find_label_for_id(unit_id)
                    unit = Unit(
                        # Fall back to the last path segment of the unit id
                        # when no label is cached for it.
                        name=unit_label or unit_id.split("/")[-1],
                        long_name=unit_label,
                        path=unit_id,
                    )

            # Numeric fields without a mapped unit are marked dimensionless.
            if unit is None and field_type in ("number", "integer"):
                unit = Unit(
                    name="NUM",
                    long_name="dimensionless number",
                    path="https://vocab.sentier.dev/units/unit/NUM",
                )

            # User-provided description, or a default naming the sheet.
            description = self.column_descriptions.get(
                column, f"Column from {self.sheet_name}"
            )

            fields.append(
                Field(
                    name=column,
                    type=field_type,
                    description=description,
                    unit=unit,
                    rdf_type=ontology_id,
                    taxonomy_url=ontology_id if ontology_id else None,
                )
            )
        return fields

    def build_resource(self, fields: List["Field"]) -> "Resource":
        """Create Resource definition with fields.

        The resource name is the sanitized ``"<file stem>_<sheet name>"`` and
        the resource path is that name with a ``.parquet`` suffix.

        Args:
            fields: Field definitions to attach to the resource

        Returns:
            The ``Resource`` describing the exported Parquet file
        """
        resource_name = self._sanitize_resource_name(
            f"{Path(self.file_name).stem}_{self.sheet_name}"
        )
        return Resource(
            name=resource_name,
            path=f"{resource_name}.parquet",
            title=self.general_details.get("title", self.file_name),
            description=self.general_details.get(
                "description", f"Data from {self.sheet_name}"
            ),
            format="parquet",
            mediatype="application/vnd.apache.parquet",
            encoding="utf-8",
            profile="tabular-data-resource",
            fields=fields,
        )

    def export(
        self, output_path: str, validate_standard: bool = True
    ) -> Tuple[str, Optional[str], Optional[Any]]:
        """
        Execute full export workflow and write Parquet.

        Args:
            output_path: Path where Parquet file will be written
            validate_standard: Whether to validate against Trailpack standard (default: True)

        Returns:
            Tuple of (output_path, quality_level, validation_result)
            - output_path: Path to exported Parquet file
            - quality_level: Validation level ("STRICT", "STANDARD", "BASIC", "INVALID") or None if validation skipped
            - validation_result: Full ValidationResult object for report generation, or None if validation skipped

        Raises:
            ValueError: If validation fails or data quality issues found
        """
        # Basic input checks come first so later steps can rely on them.
        is_valid, errors = self.validate()
        if not is_valid:
            raise ValueError(f"Validation failed: {', '.join(errors)}")

        # Raises ValueError on columns that mix value types.
        self._validate_dataframe_for_parquet(self.df)

        fields = self.build_fields()
        resource = self.build_resource(fields)
        # NOTE(review): build_metadata is not defined in the visible part of
        # this class - confirm it exists elsewhere.
        metadata = self.build_metadata(resource)

        quality_level = None
        validation_result = None
        if validate_standard:
            validation_result = self.validator.validate_all(
                metadata=metadata, df=self.df, mappings=self.column_mappings
            )
            if not validation_result.is_valid:
                # NOTE(review): _format_validation_errors is not defined in
                # the visible part of this class - confirm it exists elsewhere.
                raise ValueError(self._format_validation_errors(validation_result))
            # "STRICT", "STANDARD", "BASIC", or "INVALID"
            quality_level = validation_result.level

        packer = Packing(data=self.df, meta_data=metadata)
        packer.write_parquet(output_path)
        return output_path, quality_level, validation_result

    def _validate_dataframe_for_parquet(self, df: pd.DataFrame) -> None:
        """Validate DataFrame is compatible with Arrow/Parquet format.

        Only object-dtype columns are inspected; a column fails when its
        non-null values are of more than one Python type.

        Args:
            df: DataFrame to check

        Raises:
            ValueError: If data quality issues are found (e.g., mixed types in columns)
        """
        problems = []
        for column, series in df.items():
            if not (series.dtype == "object"):
                continue
            present = series.dropna()
            if present.empty:
                continue

            # Compute the per-value types once, then keep the first
            # occurrence of each type (order of first appearance).
            value_types = present.apply(type)
            first_seen = (~value_types.duplicated()).to_numpy()
            distinct_types = value_types[first_seen].tolist()
            if len(distinct_types) <= 1:
                continue

            samples = present[first_seen].tolist()
            type_names = [t.__name__ for t in distinct_types]
            sample_values = [
                f"{t.__name__}: {repr(sample)}"
                for t, sample in zip(distinct_types, samples)
            ]
            problems.append(
                f"Column '{column}' contains mixed data types: {', '.join(type_names)}.\n"
                f" Examples: {' | '.join(sample_values)}\n"
                f" Please ensure all values in this column are of the same type."
            )

        if problems:
            error_message = (
                "Data quality issues found that prevent Parquet conversion:\n\n"
            )
            error_message += "\n\n".join(
                f"{i}. {problem}" for i, problem in enumerate(problems, 1)
            )
            error_message += "\n\nPlease clean your data and try again."
            raise ValueError(error_message)

    def _infer_field_type(self, series: pd.Series) -> str:
        """Infer Frictionless field type from pandas Series.

        Returns:
            ``"integer"``, ``"number"``, ``"boolean"`` or ``"datetime"`` for the
            matching pandas dtypes, and ``"string"`` for everything else.
        """
        if pd.api.types.is_integer_dtype(series):
            return "integer"
        if pd.api.types.is_float_dtype(series):
            return "number"
        if pd.api.types.is_bool_dtype(series):
            return "boolean"
        if pd.api.types.is_datetime64_any_dtype(series):
            return "datetime"
        return "string"

    def _find_label_for_id(self, concept_id: str) -> Optional[str]:
        """Find label for a PyST concept ID from suggestions cache.

        Suggestions may be dicts or objects; the id is read from ``id``,
        ``id_`` or ``uri`` and the label from ``label`` or ``name``.

        Args:
            concept_id: Concept ID to look up

        Returns:
            The label of the first suggestion whose id matches, or None when
            there is no match or the match has no label
        """
        target = str(concept_id)
        for suggestions in self.suggestions_cache.values():
            # A cache entry may be None/empty; treat it as "no suggestions".
            for suggestion in suggestions or ():
                try:
                    if isinstance(suggestion, dict):
                        s_id = (
                            suggestion.get("id")
                            or suggestion.get("id_")
                            or suggestion.get("uri")
                        )
                        s_label = suggestion.get("label") or suggestion.get("name")
                    else:
                        s_id = (
                            getattr(suggestion, "id", None)
                            or getattr(suggestion, "id_", None)
                            or getattr(suggestion, "uri", None)
                        )
                        s_label = getattr(suggestion, "label", None) or getattr(
                            suggestion, "name", None
                        )
                    # Without the None guard a suggestion lacking an id
                    # would match the literal concept id "None".
                    if s_id is not None and str(s_id) == target:
                        return str(s_label) if s_label else None
                except Exception:
                    # Best-effort lookup: a malformed suggestion is skipped
                    # rather than aborting the search.
                    continue
        return None

    def generate_validation_report(self, validation_result) -> str:
        """
        Generate a complete validation report for download.

        Includes errors, warnings, and info (data quality metrics).

        Args:
            validation_result: ValidationResult object from validation; its
                ``level``, ``is_valid``, ``errors``, ``warnings`` and ``info``
                attributes are read

        Returns:
            Formatted report as string
        """
        from datetime import datetime

        rule = self._REPORT_RULE

        def banner(title: str) -> List[str]:
            # Section header: blank line, rule, title, rule.
            return ["\n" + rule, title, rule]

        def numbered(messages) -> List[str]:
            return [f"{i}. {message}" for i, message in enumerate(messages, 1)]

        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        lines = [
            rule,
            "TRAILPACK VALIDATION REPORT",
            rule,
            f"\nGenerated: {timestamp}",
            f"Dataset: {self.file_name} - {self.sheet_name}",
            f"Package Name: {self.general_details.get('name', 'N/A')}",
        ]
        if validation_result.level:
            lines.append(f"\nValidation Level: {validation_result.level}")
        status = "PASSED" if validation_result.is_valid else "FAILED"
        lines.append(f"\nValidation Status: {status}")

        lines.extend(banner("SUMMARY"))
        lines.append(f"Errors: {len(validation_result.errors)}")
        lines.append(f"Warnings: {len(validation_result.warnings)}")
        lines.append(f"Info Messages: {len(validation_result.info)}")

        # Message sections are only printed when they have content.
        message_sections = (
            ("ERRORS", validation_result.errors),
            ("WARNINGS", validation_result.warnings),
            ("DATA QUALITY METRICS", validation_result.info),
        )
        for title, messages in message_sections:
            if messages:
                lines.extend(banner(title))
                lines.extend(numbered(messages))

        # column_mappings also holds "<column>_unit" entries, so count the
        # DataFrame columns that have a mapping instead of the dict size.
        mapped_columns = sum(
            1 for col in self.df.columns if self.column_mappings.get(col)
        )
        lines.extend(banner("DATASET INFORMATION"))
        lines.append(f"Rows: {len(self.df)}")
        lines.append(f"Columns: {len(self.df.columns)}")
        lines.append(f"Columns mapped: {mapped_columns}")

        lines.extend(banner("COLUMN MAPPINGS"))
        for col in self.df.columns:
            mapping = self.column_mappings.get(col, "Not mapped")
            unit = self.column_mappings.get(f"{col}_unit", "")
            if unit:
                lines.append(f"- {col}: {mapping} (unit: {unit})")
            else:
                lines.append(f"- {col}: {mapping}")

        lines.extend(banner("END OF REPORT"))
        return "\n".join(lines)