Source code for trailpack.ui.streamlit_app

"""Streamlit UI application for trailpack - Excel to PyST mapper."""

import sys
from pathlib import Path

# Add parent directory to path for Streamlit Cloud deployment
# This ensures that trailpack modules can be imported even when the package isn't installed
[docs] _current_dir = Path(__file__).resolve().parent
[docs] _repo_root = _current_dir.parent.parent
if str(_repo_root) not in sys.path: sys.path.insert(0, str(_repo_root)) # Load .env file before importing any trailpack modules try: from dotenv import load_dotenv
[docs] env_path = _repo_root / ".env"
if env_path.exists(): load_dotenv(env_path) print(f"Loaded .env from: {env_path}") except ImportError: print("python-dotenv not installed, skipping .env loading") import asyncio import base64 import tempfile import json from typing import Dict, List, Optional, Any from datetime import datetime from urllib.parse import quote import streamlit as st import pandas as pd import openpyxl from trailpack.excel import ExcelReader from trailpack.io.smart_reader import SmartDataReader from trailpack.pyst.api.requests.suggest import SUPPORTED_LANGUAGES from trailpack.pyst.api.client import get_suggest_client from trailpack.packing.datapackage_schema import DataPackageSchema, COMMON_LICENSES from trailpack.validation import StandardValidator from trailpack.config import ( build_mapping_config, build_metadata_config, export_mapping_json, export_metadata_json, generate_config_filename, )
[docs] ICON_PATH = Path(__file__).parent / "icon.svg"
[docs] PAGE_ICON = str(ICON_PATH) if ICON_PATH.is_file() else "📦"
[docs] LOGO_BASE64 = ( base64.b64encode(ICON_PATH.read_bytes()).decode("utf-8") if ICON_PATH.is_file() else None )
[docs] def iri_to_web_url(iri: str, language: str = "en") -> str: """ Convert an IRI to a vocab.sentier.dev web page URL. Args: iri: The IRI (e.g., "https://vocab.sentier.dev/Geonames/A") language: Language code (default: "en") Returns: Web page URL (e.g., "https://vocab.sentier.dev/web/concept/...?concept_scheme=...&language=en") Example: >>> iri_to_web_url("https://vocab.sentier.dev/Geonames/A", "en") 'https://vocab.sentier.dev/web/concept/https%3A%2F%2Fvocab.sentier.dev%2FGeonames%2FA?concept_scheme=https%3A%2F%2Fvocab.sentier.dev%2FGeonames&language=en' """ # Extract the concept scheme from the IRI # For IRIs like "https://vocab.sentier.dev/{namespace}/{concept}", # the scheme is "https://vocab.sentier.dev/{namespace}/" # For IRIs like "https://vocab.sentier.dev/{namespace}/{type}/{term}", # the scheme is still "https://vocab.sentier.dev/{namespace}/" parts = iri.split("/") if len(parts) >= 5 and parts[2] == "vocab.sentier.dev": # Scheme is base_url + namespace (first path segment) + trailing slash concept_scheme = "/".join(parts[:4]) + "/" else: # Fallback: use base URL as concept scheme for non-standard IRIs concept_scheme = "/".join(parts[:3]) + "/" if len(parts) >= 3 else iri # URL encode the IRI and concept scheme encoded_iri = quote(iri, safe="") encoded_scheme = quote(concept_scheme, safe="") # Construct the web URL web_url = f"https://vocab.sentier.dev/web/concept/{encoded_iri}" return web_url
# Page configuration st.set_page_config( page_title="Trailpack - Excel to PyST Mapper", page_icon=PAGE_ICON, layout="wide", initial_sidebar_state="expanded", ) # Custom CSS for consistent typography st.markdown( """ <style> @import url('https://fonts.googleapis.com/css2?family=Montserrat:wght@300;400;500;600;700&display=swap'); @import url('https://fonts.googleapis.com/css2?family=Material+Symbols+Outlined:opsz,wght,FILL,GRAD@20..48,100..700,0..1,-50..200'); /* Apply Montserrat to all text, but exclude Material Icons */ html, body, [class*="css"], [class*="st-"], h1, h2, h3, h4, h5, h6, p, label, input, textarea, select, .stMarkdown, .stText, .stButton, .stTextInput, .stSelectbox, .stTextArea, .stRadio, .stCheckbox, .stMetric, .stDataFrame, .stCaption { font-family: 'Montserrat', sans-serif !important; } /* Ensure Material Icons elements use the correct font */ span[data-testid*="stIcon"], button span[class*="material"], [class*="material-icons"], [data-testid*="collapsedControl"] span, [data-testid="baseButton-header"] span { font-family: 'Material Symbols Outlined' !important; } </style> """, unsafe_allow_html=True, ) # Initialize session state if "page" not in st.session_state: st.session_state.page = 1 if "file_bytes" not in st.session_state: st.session_state.file_bytes = None if "file_name" not in st.session_state: st.session_state.file_name = None if "language" not in st.session_state: st.session_state.language = "en" if "temp_path" not in st.session_state: st.session_state.temp_path = None if "reader" not in st.session_state: st.session_state.reader = None if "selected_sheet" not in st.session_state: st.session_state.selected_sheet = None if "df" not in st.session_state: st.session_state.df = None if "column_mappings" not in st.session_state: st.session_state.column_mappings = {} if "column_descriptions" not in st.session_state: st.session_state.column_descriptions = {} if "concept_definitions" not in st.session_state: st.session_state.concept_definitions = {} if "suggestions_cache" not in st.session_state: st.session_state.suggestions_cache = {} if "view_object" not in st.session_state: st.session_state.view_object = {} if "general_details" not in st.session_state: st.session_state.general_details = {} if "resource_name" not in st.session_state: st.session_state.resource_name = None if "resource_name_confirmed" not in st.session_state: st.session_state.resource_name_confirmed = False if "resource_name_accepted" not in st.session_state: st.session_state.resource_name_accepted = False if "resource_name_editing" not in st.session_state: st.session_state.resource_name_editing = False
[docs] def render_sidebar_header(): """Render the Trailpack branding block in the sidebar.""" if LOGO_BASE64: st.markdown( f""" <div style="display:flex;align-items:center;gap:0.75rem;margin-bottom:1.5rem;"> <img src="data:image/svg+xml;base64,{LOGO_BASE64}" alt="Trailpack logo" style="width:56px;height:auto;" /> <div style="display:flex;flex-direction:column;"> <span style="font-size:1.3rem;font-weight:600;line-height:1;">Trailpack</span> <span style="font-size:0.95rem;color:#6b7280;line-height:1.2;"> Excel to PyST Mapper </span> </div> </div> """, unsafe_allow_html=True, ) else: st.title("Trailpack") st.markdown("### Excel to PyST Mapper")
[docs] def on_sheet_change(): """Callback when sheet selection changes. Clears cached data.""" # This runs BEFORE the page renders, so sidebar will show updated sheet name selected = st.session_state.get("sheet_radio") if selected and selected != st.session_state.selected_sheet: old_sheet = st.session_state.selected_sheet st.session_state.selected_sheet = selected st.session_state.suggestions_cache = {} st.session_state.column_mappings = {} st.session_state.view_object = {} # Clear search queries initialized flag for the old sheet only # This allows returning to this sheet to re-fetch suggestions if "search_queries_initialized" in st.session_state and old_sheet: st.session_state.search_queries_initialized.pop(old_sheet, None)
[docs] def sanitize_search_query(query: str) -> str: """ Sanitize search query for safe API calls. Replaces special characters that might cause issues with the PyST API. Converts problematic characters to spaces and cleans up the result. Args: query: The original search query string Returns: Sanitized query string safe for API calls """ import re # Replace forward slashes, backslashes, and other special characters with spaces # Keep alphanumeric, spaces, hyphens, underscores, and periods sanitized = re.sub(r"[^\w\s\-.]", " ", query) # Collapse multiple spaces into single space sanitized = re.sub(r"\s+", " ", sanitized) # Strip leading/trailing whitespace sanitized = sanitized.strip() return sanitized
[docs] def extract_first_word(query: str) -> str: """ Extract the first word from a string, stopping at the first space. This is used to populate search fields with just the first word of a column name instead of the entire name, making searches more focused. Args: query: The input string Returns: The first word (substring up to first space), or empty string if input is empty """ if not query: return "" # Split at first space and take the first part parts = query.split(" ", 1) return parts[0] if parts else ""
[docs] def clear_column_cache_entries(column: str, prefix: str = "") -> None: """ Clear all cache entries for a column from suggestions cache. Args: column: Column name to clear cache for prefix: Optional prefix to append (e.g., "unit_" for unit caches) """ cache_prefix = f"{column}_{prefix}" if prefix else f"{column}_" suggestions_keys = st.session_state.suggestions_cache.keys() cache_keys_to_remove = [k for k in suggestions_keys if k.startswith(cache_prefix)] for cache_key in cache_keys_to_remove: st.session_state.suggestions_cache.pop(cache_key, None)
[docs] def load_excel_data(sheet_name: str) -> pd.DataFrame: """Load Excel data into a pandas DataFrame using SmartDataReader.""" if st.session_state.temp_path is None: return None try: # Use SmartDataReader for optimized reading smart_reader = SmartDataReader(st.session_state.temp_path) # Store engine info in session state for display st.session_state.reader_engine = smart_reader.engine st.session_state.estimated_memory = smart_reader.estimate_memory() # Read data with optimal engine df = smart_reader.read(sheet_name=sheet_name) return df except Exception as e: st.error(f"Error loading Excel data: {e}") return None
[docs] async def fetch_suggestions_async( column_name: str, language: str ) -> List[Dict[str, str]]: """Fetch PyST suggestions for a column name.""" try: # Sanitize the search query to prevent API errors from special characters sanitized_query = sanitize_search_query(column_name) # Skip if sanitization resulted in empty string if not sanitized_query: st.warning(f"Column name '{column_name}' could not be sanitized for search") return [] client = get_suggest_client() suggestions = await client.suggest(sanitized_query, language) # Debug: Log first suggestion structure to understand response format if suggestions and len(suggestions) > 0: import sys print( f"DEBUG - First suggestion keys: {suggestions[0].keys() if isinstance(suggestions[0], dict) else dir(suggestions[0])}", file=sys.stderr, ) print(f"DEBUG - First suggestion: {suggestions[0]}", file=sys.stderr) return suggestions[:5] # Limit to top 5 except Exception as e: st.warning(f"Could not fetch suggestions for '{column_name}': {e}") return []
[docs] def fetch_suggestions_sync(column_name: str, language: str) -> List[Dict[str, str]]: """ Synchronous wrapper for fetching suggestions. Handles event loop management for Streamlit compatibility. Creates a new event loop if needed to avoid "Event loop is closed" errors. """ try: # Try to get the current event loop try: loop = asyncio.get_event_loop() if loop.is_closed(): # Loop is closed, create a new one loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) except RuntimeError: # No event loop exists, create a new one loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Run the async function return loop.run_until_complete(fetch_suggestions_async(column_name, language)) except Exception as e: st.warning(f"Could not fetch suggestions for '{column_name}': {e}") return []
[docs] async def fetch_concept_async(iri: str, language: str) -> Optional[str]: """Fetch concept definition from PyST API.""" try: client = get_suggest_client() concept = await client.get_concept(iri) # Extract SKOS definition # The response format is: "http://www.w3.org/2004/02/skos/core#definition": [{"@language": "en", "@value": "..."}] definitions = concept.get("http://www.w3.org/2004/02/skos/core#definition", []) if not definitions: return None # Try to find definition in the requested language for definition in definitions: if isinstance(definition, dict) and definition.get("@language") == language: return definition.get("@value") # Fallback: return first available definition if definitions and isinstance(definitions[0], dict): return definitions[0].get("@value") return None except Exception as e: import sys print(f"DEBUG - Error fetching concept {iri}: {e}", file=sys.stderr) return None
[docs] def fetch_concept_sync(iri: str, language: str) -> Optional[str]: """ Synchronous wrapper for fetching concept definition. Handles event loop management for Streamlit compatibility. """ try: # Try to get the current event loop try: loop = asyncio.get_event_loop() if loop.is_closed(): # Loop is closed, create a new one loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) except RuntimeError: # No event loop exists, create a new one loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Run the async function return loop.run_until_complete(fetch_concept_async(iri, language)) except Exception as e: import sys print(f"DEBUG - Error in fetch_concept_sync for {iri}: {e}", file=sys.stderr) return None
[docs] def generate_view_object() -> Dict[str, Any]: """Generate the internal view object with all mappings.""" if not st.session_state.selected_sheet or st.session_state.df is None: return {} # Create dataset name dataset_name = f"{Path(st.session_state.file_name).stem}_{st.session_state.selected_sheet.replace(' ', '_')}" # Build columns dict columns_dict = {} # Get column names from ExcelReader (source of truth) columns = st.session_state.reader.columns(st.session_state.selected_sheet) for column in columns: # Get sample values (first 10 non-null values) sample_values = ( st.session_state.df[column].dropna().head(10).astype(str).tolist() ) # Get suggestions from cache suggestions = st.session_state.suggestions_cache.get(column, []) # Normalize suggestions to ensure they have id and label keys normalized_suggestions = [] for s in suggestions: try: if isinstance(s, dict): s_id = ( s.get("id") or s.get("id_") or s.get("uri") or s.get("concept_id") ) s_label = s.get("label") or s.get("name") or s.get("title") else: s_id = ( getattr(s, "id", None) or getattr(s, "id_", None) or getattr(s, "uri", None) ) s_label = getattr(s, "label", None) or getattr(s, "name", None) if s_id and s_label: normalized_suggestions.append( {"id": str(s_id), "label": str(s_label)} ) except Exception: continue # Get selected mapping selected_id = st.session_state.column_mappings.get(column) selected_suggestion = None if selected_id: for s in normalized_suggestions: if s["id"] == selected_id: selected_suggestion = {"label": s["label"], "id": s["id"]} break columns_dict[column] = { "values": sample_values, "mapping_to_pyst": { "suggestions": normalized_suggestions, "selected": selected_suggestion if selected_suggestion else selected_id, }, } # Build final view object view_object = { "sheet_name": st.session_state.selected_sheet, "dataset_name": dataset_name, "columns": columns_dict, } return view_object
# ===== SIDEBAR ===== with st.sidebar: render_sidebar_header() st.markdown("---") st.markdown("### Steps:") # Step indicators with icons if st.session_state.page >= 1: st.markdown( "**1. Upload & Select Language**" if st.session_state.page > 1 else "> **1. Upload & Select Language**" ) else: st.markdown("1. Upload & Select Language") if st.session_state.page >= 2: st.markdown( "**2. Select Sheet**" if st.session_state.page > 2 else "> **2. Select Sheet**" ) else: st.markdown("2. Select Sheet") if st.session_state.page >= 3: st.markdown( "**3. Map Columns**" if st.session_state.page > 3 else "> **3. Map Columns**" ) else: st.markdown("3. Map Columns") if st.session_state.page >= 4: st.markdown( "**4. General Details**" if st.session_state.page > 4 else "> **4. General Details**" ) else: st.markdown("4. General Details") if st.session_state.page >= 5: st.markdown("> **5. Review Parquet File**") else: st.markdown("5. Review Parquet File") st.markdown("---") # Show current file info if available if st.session_state.file_name: st.markdown("### Current File") st.info(f" {st.session_state.file_name}") if st.session_state.selected_sheet: st.info(f"****{st.session_state.selected_sheet}") # ===== MAIN CONTENT ===== # Page 1: File Upload and Language Selection if st.session_state.page == 1: st.title("Step 1: Upload File and Select Language") st.markdown( "Upload an Excel file and select the language for PyST concept mapping." ) # Show current file if already uploaded if st.session_state.file_name: st.success(f"Current file: **{st.session_state.file_name}**")
[docs] change_file = st.checkbox("Upload a different file", value=False)
else: change_file = True # File upload uploaded_file = None if change_file or not st.session_state.file_name: uploaded_file = st.file_uploader( "Choose an Excel file", type=["xlsx", "xlsm", "xltx", "xltm"], help="Upload an Excel file to map its columns to PyST concepts", ) # Language selection language = st.selectbox( "Select Language", options=sorted(list(SUPPORTED_LANGUAGES)), index=( sorted(list(SUPPORTED_LANGUAGES)).index("en") if "en" in SUPPORTED_LANGUAGES else 0 ), help="Select the language for PyST concept suggestions", ) st.session_state.language = language # Show file info if file exists if st.session_state.temp_path and st.session_state.temp_path.exists(): file_size_mb = st.session_state.temp_path.stat().st_size / (1024 * 1024) st.info( f"**File:** {st.session_state.file_name} | **Size:** {file_size_mb:.2f} MB" ) # Navigation col1, col2, col3 = st.columns([1, 1, 1]) with col3: # Enable Next button if file exists has_file = uploaded_file is not None or st.session_state.file_name is not None if has_file: if st.button("Next ", type="primary", use_container_width=True): # Save file only if newly uploaded if uploaded_file is not None: st.session_state.file_bytes = uploaded_file.getvalue() st.session_state.file_name = uploaded_file.name # Save to temp file with tempfile.NamedTemporaryFile( delete=False, suffix=".xlsx" ) as tmp: tmp.write(st.session_state.file_bytes) st.session_state.temp_path = Path(tmp.name) # Load Excel reader try: st.session_state.reader = ExcelReader( st.session_state.temp_path ) except Exception as e: st.error(f"Error loading Excel file: {e}") st.stop() navigate_to(2) else: st.button( "Next ", type="primary", disabled=True, use_container_width=True, help="Please upload a file first", ) # Page 2: Sheet Selection elif st.session_state.page == 2: st.title("Step 2: Select Sheet") st.markdown(f"**File:** {st.session_state.file_name}") if st.session_state.reader: sheets = st.session_state.reader.sheets() st.markdown("Select the sheet you want to process:") # Get current index for default selection current_sheet = st.session_state.selected_sheet default_index = 0 if current_sheet and current_sheet in sheets: default_index = sheets.index(current_sheet) selected_sheet = st.radio( "Available Sheets", options=sheets, index=default_index, key="sheet_radio", on_change=on_sheet_change, label_visibility="collapsed", ) # Update session state if not already updated by callback if st.session_state.selected_sheet != selected_sheet: st.session_state.selected_sheet = selected_sheet # Show preview of the selected sheet if selected_sheet: st.markdown("### Data Preview") with st.spinner("Loading data preview..."): df = load_excel_data(selected_sheet) if df is not None: st.session_state.df = df # Show basic info col1, col2, col3 = st.columns(3) with col1: st.metric("Rows", len(df)) with col2: # Use ExcelReader for column count (consistent source) column_count = len( st.session_state.reader.columns(selected_sheet) ) st.metric("Columns", column_count) with col3: st.metric("Non-empty cells", df.notna().sum().sum()) # Show SmartDataReader engine info if hasattr(st.session_state, "reader_engine") and hasattr( st.session_state, "estimated_memory" ): st.caption( f"**Engine:** {st.session_state.reader_engine} | " f"**Est. Memory:** {st.session_state.estimated_memory}" ) # Show first few rows st.markdown("**First 10 rows:**") st.dataframe(df.head(10), use_container_width=True) # Navigation col1, col2, col3 = st.columns([1, 1, 1]) with col1: if st.button("Back", use_container_width=True): navigate_to(1) with col3: if st.session_state.selected_sheet: if st.button("Next ", type="primary", use_container_width=True): navigate_to(3) else: st.button("Next ", type="primary", disabled=True, use_container_width=True) # Page 3: Column Mapping elif st.session_state.page == 3: st.title("Step 3: Map Columns to PyST Concepts") st.markdown( f"**File:** {st.session_state.file_name} | **Sheet:** {st.session_state.selected_sheet}" ) if st.session_state.df is not None: # Show data preview at the top with st.expander("📊 View Data Preview", expanded=False): st.dataframe(st.session_state.df.head(20), use_container_width=True) st.markdown("### Column Mappings") st.markdown("Select a PyST concept for each column.") # Get column names from ExcelReader (consistent with sheet selection on Page 2) columns = st.session_state.reader.columns(st.session_state.selected_sheet) # Display column mappings in a clean table-like format st.markdown("---") # Pre-populate search queries with column names on first load of the sheet if "search_queries_initialized" not in st.session_state: st.session_state.search_queries_initialized = {} # Initialize search queries for this sheet if not already done sheet_key = st.session_state.selected_sheet if sheet_key not in st.session_state.search_queries_initialized: # Show a brief loading message while pre-fetching with st.spinner("Pre-loading ontology suggestions for columns..."): for column in columns: # Initialize search query with first word of sanitized column name # This makes search more focused than using the entire column name search_key = f"search_{column}" if search_key not in st.session_state: sanitized_column = sanitize_search_query(column) first_word = extract_first_word(sanitized_column) st.session_state[search_key] = first_word # Pre-fetch suggestions for the first word # Use explicit cache key format for pre-populated suggestions first_word = st.session_state[search_key] cache_key = f"{column}_{first_word}" # {column}_{search_query} where search_query == first word if cache_key not in st.session_state.suggestions_cache: suggestions = fetch_suggestions_sync( first_word, st.session_state.language ) st.session_state.suggestions_cache[cache_key] = suggestions[:5] # Mark this sheet as initialized st.session_state.search_queries_initialized[sheet_key] = True for column in columns: with st.container(): col1, col2 = st.columns([1, 2]) with col1: st.markdown(f"**{column}**") # Show sample values sample_values = ( st.session_state.df[column] .dropna() .head(3) .astype(str) .tolist() ) if sample_values: st.caption(f"Sample: {', '.join(sample_values[:3])}") with col2: # Check if column is numeric is_numeric = pd.api.types.is_numeric_dtype( st.session_state.df[column] ) # Ontology search field (for all columns) # Use the value from session state without fallback to avoid re-populating after clear search_key = f"search_{column}" default_search_value = st.session_state.get(search_key, "") search_query = st.text_input( "Search for ontology", value=default_search_value, key=f"search_input_{column}", placeholder="Type and press Enter to search...", label_visibility="visible", ) # Update session state with current search query st.session_state[search_key] = search_query # Fetch and display ontology suggestions if search_query and len(search_query) >= 2: cache_key = f"{column}_{search_query}" if cache_key not in st.session_state.suggestions_cache: suggestions = fetch_suggestions_sync( search_query, st.session_state.language ) st.session_state.suggestions_cache[cache_key] = suggestions[ :5 ] # Limit to 5 # Show suggestions dropdown suggestions = st.session_state.suggestions_cache.get( cache_key, [] ) if suggestions: valid_suggestions = [] for s in suggestions: try: if isinstance(s, dict): s_id = ( s.get("id") or s.get("id_") or s.get("uri") or s.get("concept_id") ) s_label = ( s.get("label") or s.get("name") or s.get("title") ) else: s_id = ( getattr(s, "id", None) or getattr(s, "id_", None) or getattr(s, "uri", None) ) s_label = getattr(s, "label", None) or getattr( s, "name", None ) if s_id and s_label: valid_suggestions.append( {"id": s_id, "label": s_label} ) except Exception: continue if valid_suggestions: options = [s["label"] for s in valid_suggestions] option_ids = [s["id"] for s in valid_suggestions] # Get current selection index current_mapping = st.session_state.column_mappings.get( column ) default_idx = 0 if current_mapping in option_ids: default_idx = option_ids.index(current_mapping) selected = st.selectbox( "Select from results", options=options, index=default_idx, key=f"select_{column}", label_visibility="visible", ) # Store selection selected_idx = options.index(selected) selected_id = option_ids[selected_idx] selected_label = options[selected_idx] st.session_state.column_mappings[column] = selected_id # Fetch concept definition from API if not already cached concept_cache_key = f"concept_{selected_id}" if ( concept_cache_key not in st.session_state.concept_definitions ): concept_definition = fetch_concept_sync( selected_id, st.session_state.language ) if concept_definition: st.session_state.concept_definitions[ concept_cache_key ] = concept_definition # Display selected concept with link web_url = iri_to_web_url( selected_id, st.session_state.language ) # Get the definition to display concept_definition = ( st.session_state.concept_definitions.get( concept_cache_key ) ) col_info, col_clear = st.columns([4, 1]) with col_info: if concept_definition: st.info( f"**Selected:** {selected_label}\n\n" f"**Description:** {concept_definition}\n\n" f"[🔗 View on vocab.sentier.dev]({web_url})" ) else: st.info( f"**Selected:** {selected_label}\n\n" f"[🔗 View on vocab.sentier.dev]({web_url})" ) with col_clear: if st.button( "Clear", key=f"clear_{column}", help="Remove ontology selection", ): # Clear the ontology mapping st.session_state.column_mappings.pop( column, None ) # Clear concept definition cache st.session_state.concept_definitions.pop( concept_cache_key, None ) # Clear search field text search_key = f"search_{column}" search_input_key = f"search_input_{column}" if search_key in st.session_state: del st.session_state[search_key] if search_input_key in st.session_state: del st.session_state[search_input_key] # Clear all cache entries for this column clear_column_cache_entries(column) st.rerun() # Description/Comment field - directly underneath ontology results # Show for all columns but with different labels and requirements has_ontology = ( st.session_state.column_mappings.get(column) is not None ) # Check if we have a concept definition from the API concept_cache_key = ( f"concept_{st.session_state.column_mappings.get(column)}" if has_ontology else None ) concept_definition = ( st.session_state.concept_definitions.get(concept_cache_key) if concept_cache_key else None ) # Determine label and help text based on ontology status if has_ontology and concept_definition: # Ontology selected with API definition description_label = "Comment (optional)" description_help = ( "Optional: Add additional comments or " "notes about this column." ) is_required = False elif has_ontology and not concept_definition: # Ontology selected but no API definition description_label = "Column Description (optional)" description_help = ( "Optional: Add a custom description for this column. " "The selected ontology has no description available " "from the API." ) is_required = False else: # No ontology selected - description is required description_label = "Column Description *" description_help = ( "Required: No ontology mapping selected. " "Please provide a description for this column." ) is_required = True column_description = st.text_area( description_label, value=st.session_state.column_descriptions.get(column, ""), placeholder=( "Describe what this column represents..." if is_required else "Add optional comments or notes..." ), help=description_help, key=f"description_{column}", height=80, ) # Store description in session state if column_description: st.session_state.column_descriptions[column] = ( column_description ) else: st.session_state.column_descriptions.pop(column, None) # Show warning only if description is required and missing if is_required and not column_description: st.warning( "Please provide either an ontology mapping or a " "description for this column." ) # If numeric, show unit search field below ontology if is_numeric: # Unit search field # Pre-populate with "unit" as default search term unit_search_key = f"search_unit_{column}" default_unit_search_value = st.session_state.get( unit_search_key, "" ) unit_search_query = st.text_input( "Search for unit", value=default_unit_search_value, key=f"search_unit_input_{column}", placeholder="Type and press Enter to search...", label_visibility="visible", ) # Show warning if no search has been made yet or no results show_warning = True # Update session state with current unit search query st.session_state[unit_search_key] = unit_search_query # Fetch and display unit suggestions if unit_search_query and len(unit_search_query) >= 2: cache_key = f"{column}_unit_{unit_search_query}" if cache_key not in st.session_state.suggestions_cache: suggestions = fetch_suggestions_sync( unit_search_query, st.session_state.language ) st.session_state.suggestions_cache[cache_key] = ( suggestions[:5] ) # Show unit suggestions dropdown suggestions = st.session_state.suggestions_cache.get( cache_key, [] ) if suggestions: valid_suggestions = [] for s in suggestions: try: if isinstance(s, dict): s_id = ( s.get("id") or s.get("id_") or s.get("uri") or s.get("concept_id") ) s_label = ( s.get("label") or s.get("name") or s.get("title") ) else: s_id = ( getattr(s, "id", None) or getattr(s, "id_", None) or getattr(s, "uri", None) ) s_label = getattr( s, "label", None ) or getattr(s, "name", None) if s_id and s_label: valid_suggestions.append( {"id": s_id, "label": s_label} ) except Exception: continue if valid_suggestions: # Hide warning when selectbox is shown show_warning = False options = [s["label"] for s in valid_suggestions] option_ids = [s["id"] for s in valid_suggestions] # Get current selection index for unit current_unit_mapping = ( st.session_state.column_mappings.get( f"{column}_unit" ) ) default_idx = 0 if current_unit_mapping in option_ids: default_idx = option_ids.index( current_unit_mapping ) selected = st.selectbox( "Select unit from results", options=options, index=default_idx, key=f"select_unit_{column}", label_visibility="visible", ) # Store unit selection selected_idx = options.index(selected) selected_unit_id = option_ids[selected_idx] selected_unit_label = options[selected_idx] st.session_state.column_mappings[ f"{column}_unit" ] = selected_unit_id # Fetch concept definition for unit from API if not already cached unit_concept_cache_key = ( f"concept_{selected_unit_id}" ) if ( unit_concept_cache_key not in st.session_state.concept_definitions ): unit_concept_definition = fetch_concept_sync( selected_unit_id, st.session_state.language ) if unit_concept_definition: st.session_state.concept_definitions[ unit_concept_cache_key ] = unit_concept_definition # Display selected unit with clickable link to web page web_url = iri_to_web_url( selected_unit_id, st.session_state.language ) # Get the definition to display unit_concept_definition = ( st.session_state.concept_definitions.get( unit_concept_cache_key ) ) col_unit_info, col_unit_clear = st.columns([4, 1]) with col_unit_info: if unit_concept_definition: st.info( f"**Selected unit:** {selected_unit_label}\n\n" f"**Description:** {unit_concept_definition}\n\n" f"[🔗 View on vocab.sentier.dev]({web_url})" ) else: st.info( f"**Selected unit:** {selected_unit_label}\n\n[🔗 View on vocab.sentier.dev]({web_url})" ) with col_unit_clear: if st.button( "Clear", key=f"clear_unit_{column}", help="Remove unit selection", ): # Clear the unit mapping st.session_state.column_mappings.pop( f"{column}_unit", None ) # Clear concept definition cache st.session_state.concept_definitions.pop( unit_concept_cache_key, None ) # Clear unit search field text by deleting the widget state unit_search_key = f"search_unit_{column}" unit_search_input_key = ( f"search_unit_input_{column}" ) if unit_search_key in st.session_state: del st.session_state[unit_search_key] if ( unit_search_input_key in st.session_state ): del st.session_state[ unit_search_input_key ] # Clear unit cache entries clear_column_cache_entries( column, prefix="unit_" ) st.rerun() # Show warning if no selectbox was displayed if show_warning: st.warning( "This column contains numerical data and requires a unit " "to be selected." ) st.markdown("---") # Generate view object internally (not displayed) st.session_state.view_object = generate_view_object() # Navigation and actions col1, col2, col3 = st.columns([1, 1, 1]) with col1: if st.button("Back", use_container_width=True): navigate_to(2) with col3: # Check if all columns have either ontology or description columns = st.session_state.reader.columns(st.session_state.selected_sheet) missing_info = [] missing_units = [] for column in columns: has_ontology = st.session_state.column_mappings.get(column) is not None has_description = bool(st.session_state.column_descriptions.get(column)) # Check if we have a concept definition from the API concept_cache_key = ( f"concept_{st.session_state.column_mappings.get(column)}" if has_ontology else None ) has_concept_definition = ( bool(st.session_state.concept_definitions.get(concept_cache_key)) if concept_cache_key else False ) # Column is valid if: # 1. Has ontology with API definition (description not needed), OR # 2. Has ontology without API definition (description optional per requirements), OR # 3. Has no ontology but has manual description # Only invalid if missing BOTH ontology AND description if not has_ontology and not has_description: # Missing both ontology and description missing_info.append(column) # Check if numerical columns have units is_numeric = pd.api.types.is_numeric_dtype(st.session_state.df[column]) if is_numeric: has_unit = ( st.session_state.column_mappings.get(f"{column}_unit") is not None ) if not has_unit: missing_units.append(column) can_proceed = len(missing_info) == 0 and len(missing_units) == 0 if can_proceed: if st.button("Next ", type="primary", use_container_width=True): # Generate view object internally (not displayed) st.session_state.view_object = generate_view_object() navigate_to(4) else: st.button( "Next ", type="primary", disabled=True, use_container_width=True ) error_messages = [] if missing_info: error_messages.append( "The following columns need either an ontology mapping " f"or a description: {', '.join(missing_info)}" ) if missing_units: error_messages.append( "The following numerical columns require a unit to be " f"selected: {', '.join(missing_units)}" ) for error_msg in error_messages: st.error(error_msg) # Page 4: General Details elif st.session_state.page == 4: st.title("Step 4: General Details") st.markdown("Provide metadata for your data package.") # Initialize schema schema = DataPackageSchema() # Get field definitions for the form field_defs = schema.field_definitions st.markdown("### Basic Information") # Package Name (required) name_field = field_defs.get("name", {}) package_name = st.text_input( name_field.get("label", "Package Name") + " *", value=st.session_state.general_details.get("name", ""), placeholder=name_field.get("placeholder", ""), help=name_field.get("help", name_field.get("description", "")), key="input_name", ) # Validate package name in real-time if not empty if package_name: is_valid, error_msg = schema.validate_package_name(package_name) if not is_valid: st.error(f"{error_msg}") else: st.session_state.general_details["name"] = package_name elif package_name == "": # Clear from session state if empty st.session_state.general_details.pop("name", None) # Title (required) title_field = field_defs.get("title", {}) title = st.text_input( title_field.get("label", "Title") + " *", value=st.session_state.general_details.get("title", ""), placeholder=title_field.get("placeholder", ""), help=title_field.get("description", "") + " (Required)", key="input_title", ) if title: st.session_state.general_details["title"] = title elif title == "": st.session_state.general_details.pop("title", None) # Description (optional) desc_field = field_defs.get("description", {}) description = st.text_area( desc_field.get("label", "Description"), value=st.session_state.general_details.get("description", ""), placeholder=desc_field.get("placeholder", ""), help=desc_field.get("description", ""), key="input_description", ) if description: st.session_state.general_details["description"] = description elif description == "": st.session_state.general_details.pop("description", None) # Version (optional) version_field = field_defs.get("version", {}) version = st.text_input( version_field.get("label", "Version"), value=st.session_state.general_details.get("version", ""), placeholder=version_field.get("placeholder", ""), help=version_field.get("description", ""), key="input_version", ) # Validate version if not empty if version: is_valid, error_msg = schema.validate_version(version) if not is_valid: st.error(f"{error_msg}") else: st.session_state.general_details["version"] = version elif version == "": st.session_state.general_details.pop("version", None) st.markdown("---") st.markdown("### 📝 Resource Name Configuration") st.markdown( """ The resource name identifies your data file in the package. It must follow specific naming rules: - Only **lowercase letters** (a-z) - **Numbers** (0-9) - **Hyphens** (-), **underscores** (_), and **dots** (.) - No spaces or special characters **Example:** `solar-panel-data`, `emissions_2024`, `my.dataset.v1` """ ) # Initialize validator validator = StandardValidator() # Get original filename + sheet name and check validity if st.session_state.file_name and st.session_state.selected_sheet: # Combine filename and sheet name file_stem = Path(st.session_state.file_name).stem sheet_name = st.session_state.selected_sheet.replace(" ", "_") original_name = f"{file_stem}_{sheet_name}" is_valid_original, _, suggested_name = ( validator.validate_and_sanitize_resource_name(original_name) ) # Only show the error/suggestion if not yet accepted if not st.session_state.resource_name_accepted: # Show source file and sheet info st.info( f" **Source:** `{file_stem}` (file) + `{st.session_state.selected_sheet}` (sheet)" ) # Show original combined name with validation status if is_valid_original: st.success(f"**Combined name is valid:** `{original_name}`") # If valid and not set, use it if not st.session_state.resource_name: st.session_state.resource_name = original_name st.session_state.resource_name_accepted = True else: # Show the problem prominently st.error(f"**Combined name has issues:** `{original_name}`") st.warning( """ **Issues found:** - Uppercase letters → converted to lowercase - Spaces → replaced with underscores - Special characters → removed """ ) # Check if we're in edit mode or display mode if not st.session_state.resource_name_editing: # Display mode: show suggestion with Accept/buttons st.markdown(f"**Suggested sanitized name:** `{suggested_name}`") col1, col2, col3 = st.columns([1, 1, 2]) with col1: if st.button( "Accept", use_container_width=True, type="primary", key="btn_accept_suggestion", ): st.session_state.resource_name = suggested_name st.session_state.resource_name_confirmed = True st.session_state.resource_name_accepted = True st.session_state.resource_name_editing = False st.session_state.general_details["resource_name"] = ( suggested_name ) st.rerun() with col2: if st.button( "Edit", use_container_width=True, key="btn_edit_suggestion" ): st.session_state.resource_name = suggested_name st.session_state.resource_name_editing = True st.rerun() else: # mode: show text input with validation resource_name_edit = st.text_input( "Resource Name", value=st.session_state.resource_name or suggested_name, placeholder="my-data-resource", help="the resource name. Must contain only lowercase letters, numbers, hyphens, underscores, and dots.", key="resource_name_edit_suggestion", ) if resource_name_edit: is_valid_edit, _, suggestion_edit = ( validator.validate_and_sanitize_resource_name( resource_name_edit ) ) if is_valid_edit: st.success(f"**`{resource_name_edit}`** is valid!") else: st.error( f"**`{resource_name_edit}`** contains invalid characters." ) st.markdown(f"**Suggested fix:** `{suggestion_edit}`") # Show buttons for editing col1, col2, col3 = st.columns([1, 1, 2]) with col1: if st.button( "Accept", use_container_width=True, type="primary", key="btn_accept_edit", disabled=not is_valid_edit, ): st.session_state.resource_name = resource_name_edit st.session_state.resource_name_confirmed = True st.session_state.resource_name_accepted = True st.session_state.resource_name_editing = False st.session_state.general_details["resource_name"] = ( resource_name_edit ) st.rerun() with col2: if st.button( "Cancel", use_container_width=True, key="btn_cancel_edit", ): st.session_state.resource_name_editing = False st.rerun() # Show resource name input (either already accepted or for manual editing) # Only show input section if name has been accepted or is being manually entered if st.session_state.resource_name_accepted or st.session_state.resource_name: st.markdown("---") # If accepted, show as info with option to edit if ( st.session_state.resource_name_accepted and st.session_state.resource_name_confirmed ): st.success(f"**Resource name:** `{st.session_state.resource_name}`") if st.button("Resource Name", key="btn_edit_resource_name"): st.session_state.resource_name_accepted = False st.session_state.resource_name_editing = False # Reset editing flag st.rerun() else: # Resource name input with real-time validation resource_name_input = st.text_input( "Resource Name *", value=st.session_state.resource_name or "", placeholder="my-data-resource", help="Enter or edit the resource name. Must follow the naming rules above. (Required)", key="resource_name_input_meta", ) # Validate the entered/edited name in real-time if resource_name_input: is_valid_input, _, suggestion = ( validator.validate_and_sanitize_resource_name(resource_name_input) ) if is_valid_input: st.success(f"**`{resource_name_input}`** is a valid resource name!") # Show accept button for valid name col_btn1, col_btn2, col_btn3 = st.columns([1, 1, 2]) with col_btn1: if st.button( "Accept", use_container_width=True, type="primary", key="btn_accept_manual", ): st.session_state.resource_name = resource_name_input st.session_state.resource_name_confirmed = True st.session_state.resource_name_accepted = True st.session_state.general_details["resource_name"] = ( resource_name_input ) st.rerun() with col_btn2: if st.button( "Reset", help="Reset to sanitized filename + sheet", use_container_width=True, key="btn_reset", ): if ( st.session_state.file_name and st.session_state.selected_sheet ): file_stem = Path(st.session_state.file_name).stem sheet_name = st.session_state.selected_sheet.replace( " ", "_" ) original_name = f"{file_stem}_{sheet_name}" st.session_state.resource_name = ( validator.sanitize_resource_name(original_name) ) st.session_state.resource_name_accepted = False st.rerun() else: st.error( f"**`{resource_name_input}`** contains invalid characters." ) st.markdown(f"**Suggested fix:** `{suggestion}`") # Show buttons for invalid name col_btn1, col_btn2, col_btn3 = st.columns([1, 1, 2]) with col_btn1: if st.button( "Use Suggestion", use_container_width=True, type="primary", key="btn_use_suggestion", ): st.session_state.resource_name = suggestion st.session_state.resource_name_accepted = True st.session_state.resource_name_confirmed = True st.session_state.general_details["resource_name"] = ( suggestion ) st.rerun() with col_btn2: if st.button( "Reset", help="Reset to sanitized filename + sheet", use_container_width=True, key="btn_reset_invalid", ): if ( st.session_state.file_name and st.session_state.selected_sheet ): file_stem = Path(st.session_state.file_name).stem sheet_name = st.session_state.selected_sheet.replace( " ", "_" ) original_name = f"{file_stem}_{sheet_name}" st.session_state.resource_name = ( validator.sanitize_resource_name(original_name) ) st.session_state.resource_name_accepted = False st.rerun() st.session_state.resource_name_confirmed = False st.session_state.general_details.pop("resource_name", None) else: st.session_state.resource_name_confirmed = False st.session_state.general_details.pop("resource_name", None) st.markdown("### Additional Information") # Profile (optional) profile_field = field_defs.get("profile", {}) profile_options = profile_field.get("options", []) profile_labels = [opt["label"] for opt in profile_options] profile_values = [opt["value"] for opt in profile_options] current_profile = st.session_state.general_details.get( "profile", profile_field.get("default", "") ) default_index = 0 if current_profile in profile_values: default_index = profile_values.index(current_profile) profile_label = st.selectbox( profile_field.get("label", "Profile"), options=profile_labels, index=default_index, help=profile_field.get("description", ""), key="input_profile", ) profile = profile_values[profile_labels.index(profile_label)] st.session_state.general_details["profile"] = profile # Keywords (optional) keywords_field = field_defs.get("keywords", {}) keywords_str = st.text_input( keywords_field.get("label", "Keywords"), value=", ".join(st.session_state.general_details.get("keywords", [])), placeholder=keywords_field.get("placeholder", ""), help=(keywords_field.get("description") or "") + " (comma-separated)", key="input_keywords", ) if keywords_str: keywords = [k.strip() for k in keywords_str.split(",") if k.strip()] st.session_state.general_details["keywords"] = keywords elif keywords_str == "": st.session_state.general_details.pop("keywords", None) # Homepage (optional) homepage_field = field_defs.get("homepage", {}) homepage = st.text_input( homepage_field.get("label", "Homepage"), value=st.session_state.general_details.get("homepage", ""), placeholder=homepage_field.get("placeholder", ""), help=homepage_field.get("description", ""), key="input_homepage", ) # Validate homepage if not empty if homepage: is_valid, error_msg = schema.validate_url(homepage) if not is_valid: st.error(f"{error_msg}") else: st.session_state.general_details["homepage"] = homepage elif homepage == "": st.session_state.general_details.pop("homepage", None) # Repository (optional) repository_field = field_defs.get("repository", {}) repository = st.text_input( repository_field.get("label", "Repository"), value=st.session_state.general_details.get("repository", ""), placeholder=repository_field.get("placeholder", ""), help=repository_field.get("description", ""), key="input_repository", ) # Validate repository if not empty if repository: is_valid, error_msg = schema.validate_url(repository) if not is_valid: st.error(f"{error_msg}") else: st.session_state.general_details["repository"] = repository elif repository == "": st.session_state.general_details.pop("repository", None) # Created date (optional, pre-filled with current date) created_field = field_defs.get("created", {}) default_created = st.session_state.general_details.get( "created", datetime.now().strftime("%Y-%m-%d") ) created = st.date_input( created_field.get("label", "Created Date"), value=( datetime.strptime(default_created, "%Y-%m-%d").date() if default_created else datetime.now().date() ), help=created_field.get("description", ""), key="input_created", ) if created: st.session_state.general_details["created"] = created.strftime("%Y-%m-%d") # Modified date (optional) modified_field = field_defs.get("modified", {}) # Get the stored value or None stored_modified = st.session_state.general_details.get("modified") modified_value = None if stored_modified: try: modified_value = datetime.strptime(stored_modified, "%Y-%m-%d").date() except: modified_value = None modified = st.date_input( modified_field.get("label", "Modified Date"), value=modified_value, help=modified_field.get("description", ""), key="input_modified", ) if modified: st.session_state.general_details["modified"] = modified.strftime("%Y-%m-%d") elif not modified: st.session_state.general_details.pop("modified", None) st.markdown("### Licenses *") st.caption("At least one license is required") # License selection license_options = ["None"] + list(COMMON_LICENSES.keys()) + ["Custom"] current_licenses = st.session_state.general_details.get("licenses", []) # Display existing licenses if current_licenses: st.markdown("**Current Licenses:**") for idx, lic in enumerate(current_licenses): col1, col2 = st.columns([4, 1]) with col1: st.text( f"{lic.get('name', 'Unknown')} - {lic.get('title', 'No title')}" ) with col2: if st.button("🗑️", key=f"delete_license_{idx}"): current_licenses.pop(idx) st.session_state.general_details["licenses"] = current_licenses st.rerun() # Add new license st.markdown("**Add License:**") license_choice = st.selectbox( "Select a license", options=license_options, key="license_select", label_visibility="collapsed", ) if license_choice != "None": if license_choice == "Custom": col1, col2 = st.columns(2) with col1: custom_license_name = st.text_input( "License Name", key="custom_license_name", placeholder="MIT" ) with col2: custom_license_title = st.text_input( "License Title", key="custom_license_title", placeholder="MIT License", ) custom_license_url = st.text_input( "License URL", key="custom_license_url", placeholder="https://opensource.org/licenses/MIT", ) if st.button("Add Custom License"): if custom_license_name: new_license = { "name": custom_license_name, "title": ( custom_license_title if custom_license_title else custom_license_name ), "path": custom_license_url if custom_license_url else None, } if "licenses" not in st.session_state.general_details: st.session_state.general_details["licenses"] = [] st.session_state.general_details["licenses"].append(new_license) st.rerun() else: if st.button(f"Add {license_choice}"): license_info = COMMON_LICENSES[license_choice] if "licenses" not in st.session_state.general_details: st.session_state.general_details["licenses"] = [] st.session_state.general_details["licenses"].append(license_info.copy()) st.rerun() st.markdown("### Contributors *") st.caption("At least one contributor is required") # Display existing contributors current_contributors = st.session_state.general_details.get("contributors", []) if current_contributors: st.markdown("**Current Contributors:**") for idx, contrib in enumerate(current_contributors): col1, col2 = st.columns([4, 1]) with col1: contrib_text = f"{contrib.get('name', 'Unknown')} ({contrib.get('role', 'contributor')})" if contrib.get("email"): contrib_text += f" - {contrib['email']}" st.text(contrib_text) with col2: if st.button("🗑️", key=f"delete_contributor_{idx}"): current_contributors.pop(idx) st.session_state.general_details["contributors"] = ( current_contributors ) st.rerun() # Add new contributor st.markdown("**Add Contributor:**") col1, col2 = st.columns(2) with col1: contrib_name = st.text_input("Name", key="contrib_name", placeholder="Jane Doe") with col2: contrib_role = st.selectbox( "Role", options=["author", "contributor", "maintainer", "publisher", "wrangler"], key="contrib_role", ) col1, col2 = st.columns(2) with col1: contrib_email = st.text_input( "Email (optional)", key="contrib_email", placeholder="jane@example.com" ) with col2: contrib_org = st.text_input( "Organization (optional)", key="contrib_org", placeholder="Example Org" ) if st.button("Add Contributor"): if contrib_name: new_contributor = {"name": contrib_name, "role": contrib_role} if contrib_email: new_contributor["email"] = contrib_email if contrib_org: new_contributor["organization"] = contrib_org if "contributors" not in st.session_state.general_details: st.session_state.general_details["contributors"] = [] st.session_state.general_details["contributors"].append(new_contributor) st.rerun() st.markdown("### Sources *") st.caption("At least one source is required") # Display existing sources current_sources = st.session_state.general_details.get("sources", []) if current_sources: st.markdown("**Current Sources:**") for idx, source in enumerate(current_sources): col1, col2 = st.columns([4, 1]) with col1: source_text = f"{source.get('title', 'Unknown')}" if source.get("path"): source_text += f" - {source['path']}" st.text(source_text) with col2: if st.button("🗑️", key=f"delete_source_{idx}"): current_sources.pop(idx) st.session_state.general_details["sources"] = current_sources st.rerun() # Add new source st.markdown("**Add Source:**") source_title = st.text_input( "Source Title", key="source_title", placeholder="Original Dataset" ) col1, col2 = st.columns(2) with col1: source_path = st.text_input( "Source URL (optional)", key="source_path", placeholder="https://example.com/data", ) with col2: source_desc = st.text_input( "Source Description (optional)", key="source_desc", placeholder="Description of the source", ) if st.button("Add Source"): if source_title: new_source = {"title": source_title} if source_path: new_source["path"] = source_path if source_desc: new_source["description"] = source_desc if "sources" not in st.session_state.general_details: st.session_state.general_details["sources"] = [] st.session_state.general_details["sources"].append(new_source) st.rerun() # Check if required fields are filled (per DataPackageSchema.REQUIRED_FIELDS) # Required: name, title, resources (auto), licenses, created (auto), contributors, sources missing_fields = [] if "name" not in st.session_state.general_details: missing_fields.append("Package Name") if "title" not in st.session_state.general_details: missing_fields.append("Title") if not st.session_state.general_details.get("licenses"): missing_fields.append("At least one License") if not st.session_state.general_details.get("contributors"): missing_fields.append("At least one Contributor") if not st.session_state.general_details.get("sources"): missing_fields.append("At least one Source") has_required_fields = len(missing_fields) == 0 # Check if all filled fields are valid all_valid = True if "name" in st.session_state.general_details: is_valid, _ = schema.validate_package_name( st.session_state.general_details["name"] ) all_valid = all_valid and is_valid if "version" in st.session_state.general_details: is_valid, _ = schema.validate_version( st.session_state.general_details["version"] ) all_valid = all_valid and is_valid if "homepage" in st.session_state.general_details: is_valid, _ = schema.validate_url(st.session_state.general_details["homepage"]) all_valid = all_valid and is_valid if "repository" in st.session_state.general_details: is_valid, _ = schema.validate_url( st.session_state.general_details["repository"] ) all_valid = all_valid and is_valid # Navigation col1, col2, col3 = st.columns([1, 1, 1]) with col1: if st.button("Back", use_container_width=True): navigate_to(3) with col3: if has_required_fields and all_valid: # Export section on page 4 export_name = st.text_input( "Export file name", value=f"{st.session_state.general_details['name']}.parquet", help="Name for the exported Parquet file", key="export_filename", ) if st.button( "📦 Generate Parquet File", type="primary", use_container_width=True ): with st.spinner("Building data package..."): try: from trailpack.packing.export_service import DataPackageExporter from trailpack.packing.packing import read_parquet exporter = DataPackageExporter( df=st.session_state.df, column_mappings=st.session_state.column_mappings, general_details=st.session_state.general_details, sheet_name=st.session_state.selected_sheet, file_name=st.session_state.file_name, suggestions_cache=st.session_state.suggestions_cache, column_descriptions=st.session_state.column_descriptions, ) with tempfile.NamedTemporaryFile( delete=False, suffix=".parquet" ) as tmp: output_path, quality_level, validation_result = ( exporter.export(tmp.name) ) # Store in session state for display st.session_state.output_path = output_path st.session_state.quality_level = quality_level st.session_state.validation_result = validation_result st.session_state.exporter = exporter st.session_state.export_complete = True st.session_state.export_name = export_name # Navigate to page 5 to show results navigate_to(5) except Exception as e: st.error(f"Export failed: {e}") st.session_state.export_complete = False else: st.button( "📦 Generate Parquet File", type="primary", disabled=True, use_container_width=True, ) if not has_required_fields: st.caption( f"Please fill in the required fields: {', '.join(missing_fields)}" ) elif not all_valid: st.caption("Please fix validation errors in the form") # Page 5: Review Parquet File elif st.session_state.page == 5: st.title("Step 5: Review Parquet File") # Only show results if export is complete if st.session_state.get("export_complete", False): st.balloons() from trailpack.packing.packing import read_parquet # Read back the exported file exported_df, exported_metadata = read_parquet(st.session_state.output_path) # Display success message with quality level quality_level = st.session_state.get("quality_level", "VALID") st.success( f"Data package created successfully!\n\n**Validation Level:** {quality_level}" ) # Display metadata in JSON format FIRST st.markdown("### Embedded Metadata") st.json(exported_metadata) # Display data sample SECOND st.markdown("### 📊 Data Sample (first 10 rows)") st.dataframe(exported_df.head(10), use_container_width=True) # Get export name from session state export_name = st.session_state.get( "export_name", f"{st.session_state.general_details['name']}.parquet" ) # Offer download st.markdown("### Downloads") with open(st.session_state.output_path, "rb") as f: parquet_data = f.read() st.download_button( label="⬇️ Download Parquet File", data=parquet_data, file_name=export_name, mime="application/vnd.apache.parquet", use_container_width=True, ) # Validation report download if st.session_state.get("validation_result") and st.session_state.get( "exporter" ): validation_report = st.session_state.exporter.generate_validation_report( st.session_state.validation_result ) report_filename = ( f"{export_name.replace('.parquet', '')}_validation_report.txt" ) st.download_button( label="Download Validation Report", data=validation_report, file_name=report_filename, mime="text/plain", use_container_width=True, ) # Config downloads st.markdown("### Configuration Files") st.markdown("Download reusable configuration files for reproducible processing") # Build configs from session state mapping_config = build_mapping_config( column_mappings=st.session_state.column_mappings, file_name=st.session_state.file_name, sheet_name=st.session_state.selected_sheet, language=st.session_state.language, ) metadata_config = build_metadata_config( general_details=st.session_state.general_details ) # Generate filenames package_name = st.session_state.general_details.get("name") mapping_filename = generate_config_filename( config_type="mapping", package_name=package_name, file_name=st.session_state.file_name, sheet_name=st.session_state.selected_sheet, ) metadata_filename = generate_config_filename( config_type="metadata", package_name=package_name, file_name=st.session_state.file_name, ) # Download buttons in two columns col1, col2 = st.columns(2) with col1: st.download_button( label="Download Mapping Config", data=export_mapping_json(mapping_config), file_name=mapping_filename, mime="application/json", use_container_width=True, help="Column-to-ontology mappings for reuse with CLI or other datasets", ) with col2: st.download_button( label="Download Metadata Config", data=export_metadata_json(metadata_config), file_name=metadata_filename, mime="application/json", use_container_width=True, help="Package metadata configuration for reproducible exports", ) else: # If no export has been completed, show message and back button st.info( "No parquet file has been generated yet. Please go back to page 4 and click 'Generate Parquet File'." ) # Navigation st.markdown("---") col1, col2, col3 = st.columns([1, 1, 1]) with col1: if st.button("Back", use_container_width=True): navigate_to(4) # Footer st.markdown("---") st.markdown( '<div style="text-align: center; color: #888;">Trailpack - Excel to PyST Mapper</div>', unsafe_allow_html=True, )