Source code for app.core.artifacts

"""Artifacts API router - full CRUD for Kumiho artifacts."""

from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status

from app.dependencies import get_kumiho_client
from app.models.artifact import ArtifactCreate, ArtifactUpdate, ArtifactResponse
from app.models.common import parse_kref_params
from app.core.kumiho_http import translate_kumiho_exception
from typing import Any
import kumiho

router = APIRouter()


# GET "" - list every artifact attached to one revision.
# ``client`` is never referenced in the body; it is only resolved through
# ``Depends(get_kumiho_client)`` (presumably so the kumiho session is set up
# before the module-level ``kumiho`` calls run - TODO confirm).
@router.get("", response_model=List[ArtifactResponse])
async def list_artifacts(
    revision_kref: str = Query(..., description="Revision kref (e.g., 'MyProject/Assets/Hero.model?r=1')"),
    r: Optional[int] = Query(None, description="Revision number"),
    client: Any = Depends(get_kumiho_client),
):
    """List all artifacts for a revision."""
    try:
        # Combine the kref string and the optional revision number into the
        # URI that kumiho.get_revision() is called with.
        parsed = parse_kref_params(revision_kref, r=r)
        responses = []
        for item in kumiho.get_revision(parsed.kref_uri).get_artifacts():
            responses.append(ArtifactResponse.from_domain(item))
        return responses
    except Exception as exc:
        # Raise whatever translate_kumiho_exception builds from the failure.
        raise translate_kumiho_exception(exc)
# GET /by-location - look artifacts up by their location string.
# ``client`` is injected via Depends() but not used directly in the body.
@router.get("/by-location", response_model=List[ArtifactResponse])
async def get_artifacts_by_location(
    location: str = Query(..., description="Artifact location/path"),
    client: Any = Depends(get_kumiho_client),
):
    """Find all artifacts at a specific location."""
    try:
        matches = kumiho.get_artifacts_by_location(location=location)
        # Convert each domain object into its API response model.
        return list(map(ArtifactResponse.from_domain, matches))
    except Exception as exc:
        # Raise whatever translate_kumiho_exception builds from the failure.
        raise translate_kumiho_exception(exc)
# POST "" - create an artifact under the revision named in the request body.
# Responds with 201 on success (see ``status_code`` in the decorator).
# ``client`` is injected via Depends() but not used directly in the body.
@router.post("", response_model=ArtifactResponse, status_code=status.HTTP_201_CREATED)
async def create_artifact(
    request: ArtifactCreate,
    client: Any = Depends(get_kumiho_client),
):
    """Create a new artifact for a revision."""
    try:
        parsed = parse_kref_params(request.revision_kref)
        parent_revision = kumiho.get_revision(parsed.kref_uri)
        artifact_name = request.name
        artifact_location = request.location
        # Falsy metadata (e.g. an empty value) is forwarded as None.
        artifact_metadata = request.metadata or None
        created = parent_revision.create_artifact(
            name=artifact_name,
            location=artifact_location,
            metadata=artifact_metadata,
        )
        return ArtifactResponse.from_domain(created)
    except Exception as exc:
        # Raise whatever translate_kumiho_exception builds from the failure.
        raise translate_kumiho_exception(exc)
# GET /by-kref - fetch a single artifact.
# Two lookup modes are supported:
#   1. ``kref`` given: resolve the artifact directly (``revision_kref``,
#      ``name`` and ``r`` are not consulted in this mode).
#   2. otherwise: both ``revision_kref`` and ``name`` must be supplied; the
#      revision is resolved (optionally with ``r``) and the artifact is looked
#      up by name on it.
# ``client`` is injected via Depends() but not used directly in the body.
@router.get("/by-kref", response_model=ArtifactResponse)
async def get_artifact(
    revision_kref: Optional[str] = Query(None, description="Revision kref"),
    name: Optional[str] = Query(None, description="Artifact name"),
    kref: Optional[str] = Query(None, description="Artifact kref"),
    r: Optional[int] = Query(None, description="Revision number"),
    client: Any = Depends(get_kumiho_client),
):
    """Get a specific artifact by kref or by revision kref + name."""
    try:
        if kref:
            parsed = parse_kref_params(kref)
            found = kumiho.get_artifact(parsed.kref_uri)
        elif revision_kref and name:
            parsed = parse_kref_params(revision_kref, r=r)
            owner = kumiho.get_revision(parsed.kref_uri)
            found = owner.get_artifact(name=name)
        else:
            # Neither lookup mode is fully specified.
            raise HTTPException(status_code=400, detail="revision_kref and name are required")
        return ArtifactResponse.from_domain(found)
    except HTTPException:
        # Let the 400 above (and any other HTTPException) through untouched.
        raise
    except Exception as exc:
        # Raise whatever translate_kumiho_exception builds from the failure.
        raise translate_kumiho_exception(exc)
@router.patch("/by-kref", response_model=ArtifactResponse)
async def update_artifact_metadata(
    kref: str = Query(..., description="Artifact kref"),
    request: Optional[ArtifactUpdate] = None,
    client: Any = Depends(get_kumiho_client)
):
    """Update an artifact's metadata.

    The artifact is resolved from ``kref`` and its metadata is replaced with
    ``request.metadata`` via ``artifact.set_metadata``; the value returned by
    that call is serialized as the response.

    Responds with 400 when no request body is supplied.
    """
    # The body is declared optional, so a PATCH without one arrives here as
    # None. Reject it explicitly instead of failing on ``request.metadata``
    # with an AttributeError inside the generic handler below.
    if request is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Request body with metadata is required",
        )

    # ``client`` is injected via Depends() but not used directly in the body.
    try:
        params = parse_kref_params(kref)
        artifact = kumiho.get_artifact(params.kref_uri)
        updated = artifact.set_metadata(request.metadata)
        return ArtifactResponse.from_domain(updated)
    except Exception as e:
        # Raise whatever translate_kumiho_exception builds from the failure.
        raise translate_kumiho_exception(e)
# DELETE /by-kref - remove one artifact.
# Responds with 204 and no body on success (nothing is returned).
# ``client`` is injected via Depends() but not used directly in the body.
@router.delete("/by-kref", status_code=status.HTTP_204_NO_CONTENT)
async def delete_artifact(
    kref: str = Query(..., description="Artifact kref"),
    force: bool = Query(False, description="Force deletion"),
    client: Any = Depends(get_kumiho_client),
):
    """Delete an artifact."""
    try:
        target_uri = parse_kref_params(kref).kref_uri
        # ``force`` is handed straight to the artifact's delete() call.
        kumiho.get_artifact(target_uri).delete(force=force)
    except Exception as exc:
        # Raise whatever translate_kumiho_exception builds from the failure.
        raise translate_kumiho_exception(exc)
# POST /deprecate - set or clear the deprecated flag on one artifact.
# ``deprecated`` defaults to True; passing False restores the artifact.
# ``client`` is injected via Depends() but not used directly in the body.
@router.post("/deprecate", response_model=ArtifactResponse)
async def deprecate_artifact(
    kref: str = Query(..., description="Artifact kref"),
    deprecated: bool = Query(True, description="Deprecated status"),
    client: Any = Depends(get_kumiho_client),
):
    """Deprecate or restore an artifact."""
    try:
        target_uri = parse_kref_params(kref).kref_uri
        target = kumiho.get_artifact(target_uri)
        target.set_deprecated(deprecated)
        # The response is built from the same object set_deprecated() was
        # called on; the call's return value is not used.
        return ArtifactResponse.from_domain(target)
    except Exception as exc:
        # Raise whatever translate_kumiho_exception builds from the failure.
        raise translate_kumiho_exception(exc)