# Source code for debusine.server.workflows.workflow_utils

# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Utility functions for workflows."""

import functools
from collections.abc import Collection as AbcCollection
from collections.abc import Sequence
from operator import attrgetter
from typing import Any, TYPE_CHECKING, cast, get_args

from django.db.models import QuerySet

from debusine.artifacts import SourcePackage
from debusine.artifacts.models import (
    ArtifactCategory,
    ArtifactDataWithArch,
    BareDataCategory,
    CollectionCategory,
    DebianBinaryPackage,
    DebianPackageBuildLog,
    DebianSourcePackage,
    DebianSystemTarball,
    DebianUpload,
    DebusinePromise,
    get_architectures,
    get_source_package_name,
)
from debusine.client.models import LookupChildType, LookupResultType
from debusine.db.models import Artifact, ArtifactRelation
from debusine.db.models.lookups import LookupResult
from debusine.server.collections.lookup import reconstruct_lookup
from debusine.tasks import TaskConfigError, ensure_artifact_categories
from debusine.tasks.models import (
    BackendType,
    ExtraExternalRepository,
    ExtraRepository,
    LookupMultiple,
    LookupSingle,
)

if TYPE_CHECKING:
    from debusine.server.workflows import Workflow


@functools.lru_cache(maxsize=100)
def source_package(
    workflow: "Workflow[Any, Any]",
    *,
    configuration_key: str = "source_artifact",
) -> Artifact:
    """
    Retrieve the source package artifact from ``configuration_key``.

    If the source artifact is a :artifact:`debian:upload`, returns its
    :artifact:`debian:source-package`.
    """
    # attrgetter supports dotted configuration keys, unlike plain getattr.
    lookup = attrgetter(configuration_key)(workflow.data)
    result = workflow.work_request.lookup_single(
        lookup, expect_type=LookupChildType.ARTIFACT
    )
    return locate_debian_source_package(configuration_key, result.artifact)
@functools.lru_cache(maxsize=100)
def source_package_data(
    workflow: "Workflow[Any, Any]",
    *,
    configuration_key: str = "source_artifact",
) -> DebianSourcePackage:
    """Return source package artifact data for the workflow."""
    artifact = source_package(workflow, configuration_key=configuration_key)
    return SourcePackage.create_data(artifact.data)
[docs] def lookup_result_artifact_category(result: LookupResult) -> str: """ Get artifact category from result of looking up an artifact. The result may be either an artifact or a promise. """ if ( result.result_type == LookupResultType.ARTIFACT and result.artifact is not None ): return result.artifact.category elif ( result.result_type == LookupResultType.BARE and result.collection_item is not None and result.collection_item.category == BareDataCategory.PROMISE ): return DebusinePromise(**result.collection_item.data).promise_category else: raise ValueError( f"Cannot determine artifact category for lookup result: {result}" )
def lookup_result_architectures(result: "LookupResult") -> set[str]:
    """
    Get binary package architectures present within LookupResult.

    :param result: a lookup result holding either an artifact or a
        collection item (e.g. a promise).
    :return: the set of architectures found in the result.
    :raises ValueError: if no architecture information can be extracted.
    """
    if result.artifact is not None:
        artifact_data = result.artifact.create_data()
        # Only artifact data models that declare architecture information
        # are supported.
        if not isinstance(artifact_data, get_args(ArtifactDataWithArch)):
            raise ValueError(f"Unexpected type: {type(artifact_data).__name__}")
        return get_architectures(artifact_data)
    elif result.collection_item is not None:
        architecture = result.collection_item.data.get("architecture")
        # isinstance rather than an exact type() comparison: str subclasses
        # are valid architecture names too.
        if not isinstance(architecture, str):
            raise ValueError(
                f"Cannot determine architecture for lookup result: {result}"
            )
        return {architecture}
    else:
        raise ValueError(
            "Unexpected result: must have collection_item or artifact"
        )
class ArtifactHasNoBinaryPackageName(Exception):
    """
    The binary package name of an artifact could not be determined.

    Raised when an artifact's data model does not carry a binary
    package name.
    """
def lookup_result_binary_package_name(result: LookupResult) -> str:
    """Get binary package name from result of looking up an artifact."""
    binary_package_name: str | None
    if result.artifact is not None:
        artifact_data = result.artifact.create_data()
        # Only debian:binary-package artifacts carry a "Package" field.
        if not isinstance(artifact_data, DebianBinaryPackage):
            raise ArtifactHasNoBinaryPackageName(f"{type(artifact_data)}")
        binary_package_name = artifact_data.deb_fields.get("Package")
    elif result.collection_item is not None:
        binary_package_name = result.collection_item.data.get(
            "binary_package_name"
        )
    else:
        raise ValueError(
            "Unexpected result: must have collection_item or artifact"
        )
    # Either source may legitimately lack the name; validate once here.
    if not isinstance(binary_package_name, str):
        raise ValueError(
            f"Cannot determine binary package name for lookup result: {result}"
        )
    return binary_package_name
def filter_artifact_lookup_by_arch(
    workflow: "Workflow[Any, Any]",
    lookup: LookupMultiple,
    architectures: set[str],
) -> LookupMultiple:
    """
    Filter an artifact lookup by architecture.

    Resolve ``lookup`` (artifacts or promises) and keep only entries
    whose architectures all fall within ``architectures``.  Entries with
    a partial overlap must be :artifact:`debian:upload` artifacts; for
    those, the matching :artifact:`debian:binary-package` artifacts they
    extend are added individually instead of the whole upload.
    """
    results = workflow.work_request.lookup_multiple(
        lookup, expect_type=LookupChildType.ARTIFACT_OR_PROMISE
    )
    relevant: list[str] = []
    for result in results:
        arch_in_lookup = lookup_result_architectures(result)
        if arch_in_lookup.issubset(architectures):
            # Entirely within the requested set: keep the whole entry as a
            # lookup reconstructed from the result.
            relevant.append(
                reconstruct_lookup(
                    result, workflow_root=workflow.work_request.workflow_root
                )
            )
        elif arch_in_lookup.intersection(architectures):
            # Uploads are the only artifacts with multiple architectures at
            # this time.
            assert result.artifact is not None
            assert result.artifact.category == ArtifactCategory.UPLOAD
            # Pick apart the upload: keep only the binary packages whose
            # architectures fit the requested set.
            for binary_package in follow_artifact_relations(
                result.artifact,
                ArtifactRelation.Relations.EXTENDS,
                ArtifactCategory.BINARY_PACKAGE,
            ):
                pkg_data = cast(
                    DebianBinaryPackage, binary_package.create_data()
                )
                pkg_architectures = get_architectures(pkg_data)
                if pkg_architectures.issubset(architectures):
                    relevant.append(f"{binary_package.id}@artifacts")
    # Sorting keeps the resulting lookup deterministic.
    return LookupMultiple.model_validate(sorted(relevant))
def get_lookup_architectures(
    workflow: "Workflow[Any, Any]", lookup: LookupMultiple
) -> set[str]:
    """
    Return set with all the architectures in the artifacts from the lookup.

    The architectures are extracted from each lookup result using
    :py:func:`lookup_result_architectures`.
    """
    results = workflow.work_request.lookup_multiple(
        lookup, expect_type=LookupChildType.ARTIFACT_OR_PROMISE
    )
    # union() with no arguments yields an empty set for empty results.
    return set().union(
        *(lookup_result_architectures(result) for result in results)
    )
def follow_artifact_relations(
    artifact: Artifact,
    relation_type: ArtifactRelation.Relations,
    category: ArtifactCategory,
) -> QuerySet[Artifact]:
    """Follow relations from artifact to find artifacts of category."""
    # Artifacts of the wanted category that are the target of a relation
    # of the given type originating from `artifact`.
    matching = Artifact.objects.filter(
        category=category,
        targeted_by__artifact=artifact,
        targeted_by__type=relation_type,
    )
    # Deterministic ordering for callers that iterate the result.
    return matching.order_by("id")
def locate_debian_source_package(
    configuration_key: str, artifact: Artifact
) -> Artifact:
    """
    Find a :artifact:`debian:source-package` artifact for a workflow.

    :param configuration_key: The key in the workflow's task data from
        which the given artifact was looked up.
    :param artifact: A :artifact:`debian:source-package` or
        :artifact:`debian:upload` artifact.
    :return: ``artifact`` itself if it is a
        :artifact:`debian:source-package`; the related
        :artifact:`debian:source-package` if it is a
        :artifact:`debian:upload`.
    :raises TaskConfigError: if an upload has no (or more than one)
        related source package.
    """
    ensure_artifact_categories(
        configuration_key=configuration_key,
        category=artifact.category,
        expected=[ArtifactCategory.SOURCE_PACKAGE, ArtifactCategory.UPLOAD],
    )
    if artifact.category == ArtifactCategory.SOURCE_PACKAGE:
        return artifact
    elif artifact.category == ArtifactCategory.UPLOAD:
        source_packages = follow_artifact_relations(
            artifact,
            ArtifactRelation.Relations.EXTENDS,
            ArtifactCategory.SOURCE_PACKAGE,
        )
        # Exactly one related source package is expected.
        try:
            return source_packages.get()
        except Artifact.DoesNotExist:
            raise TaskConfigError(
                f"Unable to find an artifact of category "
                f"{ArtifactCategory.SOURCE_PACKAGE} with a relationship "
                f"of type {ArtifactRelation.Relations.EXTENDS} from "
                f'"{artifact}"'
            )
        except Artifact.MultipleObjectsReturned:
            raise TaskConfigError(
                f"Multiple artifacts of category "
                f"{ArtifactCategory.SOURCE_PACKAGE} with a relationship "
                f"of type {ArtifactRelation.Relations.EXTENDS} from "
                f'"{artifact}" found'
            )
    else:  # pragma: no cover
        # ensure_artifact_categories above makes this unreachable.
        raise AssertionError(
            f"Unexpected artifact category: {artifact.category}"
        )
def locate_debian_source_package_lookup(
    workflow: "Workflow[Any, Any]", configuration_key: str, lookup: LookupSingle
) -> LookupSingle:
    """
    Return a lookup to a :artifact:`debian:source-package`.

    If the specified lookup returns a :artifact:`debian:source-package`,
    return it.  If it returns a :artifact:`debian:upload`, find the
    related :artifact:`debian:source-package` and return a lookup to it.
    """
    artifact = workflow.work_request.lookup_single(
        lookup, expect_type=LookupChildType.ARTIFACT
    ).artifact
    if artifact.category != ArtifactCategory.UPLOAD:
        # Anything other than an upload must already be a source package;
        # the original lookup can be reused unchanged.
        ensure_artifact_categories(
            configuration_key=configuration_key,
            category=artifact.category,
            expected=[ArtifactCategory.SOURCE_PACKAGE],
        )
        return lookup
    located = locate_debian_source_package(configuration_key, artifact)
    return f"{located.id}@artifacts"
def locate_debian_binary_packages(
    configuration_key: str, artifacts: Sequence[Artifact]
) -> list[Artifact]:
    """
    Find :artifact:`debian:binary-package` artifacts for a workflow.

    :param configuration_key: The key in the workflow's task data from
        which the given artifacts were looked up.
    :param artifacts: A sequence of :artifact:`debian:binary-package` or
        :artifact:`debian:upload` artifacts.
    :return: A list of artifacts: each :artifact:`debian:binary-package`
        element of ``artifacts`` as-is, and for each
        :artifact:`debian:upload` element all the related
        :artifact:`debian:binary-package` artifacts.
    """
    found: list[Artifact] = []
    for index, artifact in enumerate(artifacts):
        # Index the configuration key so error messages point at the
        # offending element.
        ensure_artifact_categories(
            configuration_key=f"{configuration_key}[{index}]",
            category=artifact.category,
            expected=[ArtifactCategory.BINARY_PACKAGE, ArtifactCategory.UPLOAD],
        )
        if artifact.category == ArtifactCategory.BINARY_PACKAGE:
            found.append(artifact)
        elif artifact.category == ArtifactCategory.UPLOAD:
            found.extend(
                follow_artifact_relations(
                    artifact,
                    ArtifactRelation.Relations.EXTENDS,
                    ArtifactCategory.BINARY_PACKAGE,
                )
            )
        else:  # pragma: no cover
            # ensure_artifact_categories above makes this unreachable.
            raise AssertionError(
                f"Unexpected artifact category: {artifact.category}"
            )
    return found
def get_source_package_names(
    results: Sequence[LookupResult],
    *,
    configuration_key: str,
    artifact_expected_categories: AbcCollection[ArtifactCategory],
) -> list[str]:
    """
    Return a sorted list of source package names from results.

    It ensures that:

    - The :py:class:`LookupResult` objects contain either an artifact or
      promise.
    - Artifacts belong to the artifact_expected_categories.
    - If :py:class:`LookupResult` is a promise: extracts the name from
      the promise data ``source_package_name``.

    :param results: A sequence of :py:class:`LookupResult` objects
        representing artifacts to be processed.  Each entry is expected
        to be either an artifact or a promise.
    :param configuration_key: A string used by
        :py:meth:`ensure_artifact_categories` for the exception message.
    :param artifact_expected_categories: valid
        :py:class:`ArtifactCategory` that artifacts must belong to.
    :return: A sorted list of source package names.
    """
    names: set[str] = set()
    for result in results:
        # lookup_multiple expect_type: only artifacts or promises
        if result.result_type == LookupResultType.ARTIFACT:
            assert result.artifact is not None
            ensure_artifact_categories(
                configuration_key=configuration_key,
                category=result.artifact.category,
                expected=artifact_expected_categories,
            )
            artifact_data = result.artifact.create_data()
            # Restrict to data models that expose a source package name.
            assert isinstance(
                artifact_data,
                (
                    DebianSourcePackage,
                    DebianUpload,
                    DebianBinaryPackage,
                    DebianPackageBuildLog,
                ),
            )
            names.add(get_source_package_name(artifact_data))
        else:
            # It's a promise.
            assert result.result_type == LookupResultType.BARE
            assert result.collection_item is not None
            ensure_artifact_categories(
                configuration_key=configuration_key,
                category=result.collection_item.data["promise_category"],
                expected=artifact_expected_categories,
            )
            package_name = result.collection_item.data.get(
                "source_package_name"
            )
            # Promises may not have recorded a name yet; skip those.
            if package_name is not None:
                names.add(package_name)
    return sorted(names)
def get_available_architectures(
    workflow: "Workflow[Any, Any]", *, vendor: str, codename: str
) -> set[str]:
    """Get architectures available for use with this vendor/codename."""
    environments = LookupMultiple.model_validate(
        {"collection": vendor, "data__codename": codename}
    )
    results = workflow.work_request.lookup_multiple(
        environments,
        default_category=CollectionCategory.ENVIRONMENTS,
        expect_type=LookupChildType.ARTIFACT,
    )
    architectures = {
        result.artifact.data.get("architecture") for result in results
    }
    if not architectures:
        raise TaskConfigError(
            f"Unable to find any environments for {vendor}:{codename}"
        )
    # "all" is always made available alongside the concrete architectures.
    architectures.add("all")
    return architectures
def configure_for_overlay_suite(
    workflow: "Workflow[Any, Any]",
    *,
    extra_repositories: Sequence[ExtraRepository] | None,
    vendor: str,
    codename: str,
    environment: LookupSingle,
    backend: BackendType,
    architecture: str,
    try_variant: str,
) -> list[ExtraRepository] | None:
    """Return any needed extra repository to use an overlay suite."""
    if (vendor, codename) != ("debian", "experimental"):
        # Not an overlay suite: hand back the caller's repositories
        # unchanged (copied into a list when present).
        if extra_repositories is None:
            return None
        return list(extra_repositories)
    if extra_repositories is None:
        extra_repositories = []
    # Derive the overlay repository's mirror/components from the build
    # environment's own data.
    environment_artifact = workflow.work_request.lookup_environment(
        environment,
        architecture=architecture,
        backend=backend,
        default_category=CollectionCategory.ENVIRONMENTS,
        try_variant=try_variant,
    ).artifact.create_data()
    assert isinstance(environment_artifact, DebianSystemTarball)
    overlay_repository = ExtraExternalRepository(
        url=environment_artifact.mirror,
        suite=codename,
        components=environment_artifact.components,
    )
    return [*extra_repositories, overlay_repository]