Source code for trailpack.pyst.api.client

"""PyST API client for suggest endpoint.

Based on pyst_client.simple.client pattern:
https://github.com/cauldron/pyst-client/blob/main/pyst_client/simple/client.py
"""

import httpx
from typing import Optional, Any

from trailpack.pyst.api.config import config
from trailpack.pyst.api.requests.suggest import SuggestRequest


class PystSuggestClient:
    """Singleton client for the PyST concept endpoints.

    The client owns a single ``httpx.AsyncClient`` configured from
    ``trailpack.pyst.api.config.config`` and exposes coroutine helpers for
    the suggest and concept-detail endpoints.

    Example:
        >>> client = PystSuggestClient.get_instance()
        >>> results = await client.suggest("carbon", "en")
    """

    _instance: Optional["PystSuggestClient"] = None
    _api_client: Optional["httpx.AsyncClient"] = None
    # Event loop the current ``_api_client`` was last validated on. The loop
    # object itself is kept (not ``id(loop)``): an id can be reused by a new
    # loop once the old one is garbage-collected, which would hide a change.
    _loop: Optional[Any] = None
    # Characters left untouched when an IRI is embedded in a request path:
    # everything that is a valid path character plus "%" so callers that
    # already percent-encoded their IRI are not double-encoded. Notably "#"
    # and "?" are NOT in this set, so they get escaped.
    _IRI_PATH_SAFE = "/:@!$&'()*+,;=%"

    def __new__(cls):
        """Return the one shared instance, creating it on first use."""
        if cls._instance is None:
            instance = super().__new__(cls)
            # Strong references to fire-and-forget close tasks so they are
            # not garbage-collected before they finish.
            instance._closing_tasks = set()
            cls._instance = instance
        return cls._instance

    def __init__(self):
        """Create the HTTP client unless the shared instance already has one."""
        # __init__ runs on every ``PystSuggestClient()`` call, so guard it.
        if self._api_client is None:
            self._initialize_client()

    def _initialize_client(self) -> None:
        """Build a fresh ``httpx.AsyncClient`` from the module configuration."""
        headers = {
            "Content-Type": "application/json",
        }

        # PyST authenticates with an "x-pyst-auth-token" header, not a
        # Bearer token; only send it when a token is configured.
        if config.auth_token:
            headers["x-pyst-auth-token"] = config.auth_token

        self._api_client = httpx.AsyncClient(
            base_url=config.host.rstrip('/'),
            timeout=config.timeout,
            headers=headers,
            follow_redirects=True,
        )

    def _forget_closing_task(self, task) -> None:
        """Done-callback for a background close: drop it and swallow its error."""
        self._closing_tasks.discard(task)
        if not task.cancelled():
            # Retrieving the exception marks it as handled. Closing a client
            # that belonged to another event loop is best-effort and may fail.
            task.exception()

    def _schedule_stale_close(self, stale) -> None:
        """Close ``stale`` in the background on the running loop (best-effort)."""
        import asyncio

        closing = stale.aclose()
        try:
            task = asyncio.create_task(closing)
        except RuntimeError:
            # No running loop to schedule on; discard the coroutine quietly.
            closing.close()
            return
        self._closing_tasks.add(task)
        task.add_done_callback(self._forget_closing_task)

    def _ensure_client_valid(self) -> None:
        """Make sure ``_api_client`` is usable on the current event loop.

        The client is rebuilt when it is missing, closed, or was last used on
        a different running event loop. The loop check exists for Streamlit
        compatibility, to avoid "bound to different event loop" errors.
        """
        import asyncio

        try:
            current_loop = asyncio.get_running_loop()
        except RuntimeError:
            current_loop = None

        if self._api_client is None or self._api_client.is_closed:
            self._initialize_client()
            # Record the loop of the NEW client so the next call on the same
            # loop does not rebuild it again.
            self._loop = current_loop
            return

        if current_loop is None:
            # No running loop to compare against; keep the client as is.
            return

        if self._loop is None:
            # First use inside a loop: remember it.
            self._loop = current_loop
            return

        if self._loop is not current_loop:
            stale = self._api_client
            self._api_client = None
            self._schedule_stale_close(stale)
            self._initialize_client()
            self._loop = current_loop

    @classmethod
    def get_instance(cls) -> "PystSuggestClient":
        """Get the singleton instance of the client.

        Returns:
            PystSuggestClient instance
        """
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    async def suggest(
        self,
        query: str,
        language: str
    ) -> list[dict[str, Any]]:
        """Get concept suggestions from the PyST API.

        Args:
            query: Search query string
            language: ISO 639-1 language code (en, de, es, fr, pt, it, da)

        Returns:
            The decoded JSON body of the response.

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
            ValueError: Presumably raised by ``SuggestRequest`` when the
                parameters are invalid (defined outside this module).

        Example:
            >>> client = PystSuggestClient.get_instance()
            >>> results = await client.suggest("carbon", "en")
            >>> for concept in results:
            ...     print(concept["label"])
        """
        # Validate first so bad input fails before the client is touched.
        request = SuggestRequest(query=query, language=language)
        params = request.to_query_params()

        self._ensure_client_valid()

        response = await self._api_client.get(
            "/concepts/suggest/",
            params=params,
        )
        response.raise_for_status()
        return response.json()

    async def get_concept(self, iri: str) -> dict[str, Any]:
        """Get concept details from the PyST API by IRI.

        Args:
            iri: The concept IRI
                (e.g., "http://data.europa.eu/xsp/cn2024/010021000090")

        Returns:
            The decoded JSON body of the response.

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
            ValueError: If ``iri`` is empty or only whitespace.

        Example:
            >>> client = PystSuggestClient.get_instance()
            >>> concept = await client.get_concept("http://example.com/concept")
            >>> print(concept.get("http://www.w3.org/2004/02/skos/core#definition"))
        """
        from urllib.parse import quote

        if not iri or not iri.strip():
            raise ValueError("IRI cannot be empty")

        self._ensure_client_valid()

        # Escape characters such as "#" and "?" that would otherwise be read
        # as a URL fragment/query and silently cut the IRI short. Plain
        # "scheme://host/path" IRIs produce the same path as before.
        encoded_iri = quote(iri, safe=self._IRI_PATH_SAFE)

        # Path is relative to the configured base URL: <host>/concepts/{iri}
        response = await self._api_client.get(
            f"/concepts/{encoded_iri}"
        )
        response.raise_for_status()
        return response.json()

    async def close(self) -> None:
        """Close the API client connection, if one is open."""
        # Detach before awaiting so a failing aclose() cannot leave a
        # half-closed client attached to the instance.
        client, self._api_client = self._api_client, None
        self._loop = None
        if client is not None:
            await client.aclose()

    async def __aenter__(self):
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.close()
# Convenience function to get client instance
def get_suggest_client() -> PystSuggestClient:
    """Return the shared PyST suggest client.

    Thin module-level shortcut for ``PystSuggestClient.get_instance()``.

    Returns:
        PystSuggestClient instance

    Example:
        >>> from trailpack.pyst.api.client import get_suggest_client
        >>> client = get_suggest_client()
        >>> results = await client.suggest("sustainability", "en")
    """
    shared_client = PystSuggestClient.get_instance()
    return shared_client