# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Utility functions for workflows."""
import functools
from collections.abc import Collection as AbcCollection
from collections.abc import Sequence
from operator import attrgetter
from typing import Any, TYPE_CHECKING, cast, get_args
from django.db.models import QuerySet
from debusine.artifacts import SourcePackage
from debusine.artifacts.models import (
ArtifactCategory,
ArtifactDataWithArch,
BareDataCategory,
CollectionCategory,
DebianBinaryPackage,
DebianPackageBuildLog,
DebianSourcePackage,
DebianSystemTarball,
DebianUpload,
DebusinePromise,
get_architectures,
get_source_package_name,
)
from debusine.client.models import LookupChildType, LookupResultType
from debusine.db.models import Artifact, ArtifactRelation
from debusine.db.models.lookups import LookupResult
from debusine.server.collections.lookup import reconstruct_lookup
from debusine.tasks import TaskConfigError, ensure_artifact_categories
from debusine.tasks.models import (
BackendType,
ExtraExternalRepository,
ExtraRepository,
LookupMultiple,
LookupSingle,
)
if TYPE_CHECKING:
from debusine.server.workflows import Workflow
[docs]
@functools.lru_cache(maxsize=100)
def source_package(
workflow: "Workflow[Any, Any]",
*,
configuration_key: str = "source_artifact",
) -> Artifact:
"""
Retrieve the source package artifact from ``configuration_key``.
If the source artifact is a :artifact:`debian:upload`, returns its
:artifact:`debian:source-package`.
"""
artifact = workflow.work_request.lookup_single(
attrgetter(configuration_key)(workflow.data),
expect_type=LookupChildType.ARTIFACT,
).artifact
return locate_debian_source_package(configuration_key, artifact)
[docs]
@functools.lru_cache(maxsize=100)
def source_package_data(
workflow: "Workflow[Any, Any]",
*,
configuration_key: str = "source_artifact",
) -> DebianSourcePackage:
"""Return source package artifact data for the workflow."""
return SourcePackage.create_data(
source_package(workflow, configuration_key=configuration_key).data
)
[docs]
def lookup_result_artifact_category(result: LookupResult) -> str:
"""
Get artifact category from result of looking up an artifact.
The result may be either an artifact or a promise.
"""
if (
result.result_type == LookupResultType.ARTIFACT
and result.artifact is not None
):
return result.artifact.category
elif (
result.result_type == LookupResultType.BARE
and result.collection_item is not None
and result.collection_item.category == BareDataCategory.PROMISE
):
return DebusinePromise(**result.collection_item.data).promise_category
else:
raise ValueError(
f"Cannot determine artifact category for lookup result: {result}"
)
[docs]
def lookup_result_architectures(result: LookupResult) -> set[str]:
"""Get binary package architectures present within LookupResult."""
if result.artifact is not None:
artifact_data = result.artifact.create_data()
if not isinstance(artifact_data, get_args(ArtifactDataWithArch)):
raise ValueError(f"Unexpected type: {type(artifact_data).__name__}")
return get_architectures(artifact_data)
elif result.collection_item is not None:
architecture = result.collection_item.data.get("architecture")
if type(architecture) is not str:
raise ValueError(
f"Cannot determine architecture for lookup result: {result}"
)
return {architecture}
else:
raise ValueError(
"Unexpected result: must have collection_item or artifact"
)
[docs]
class ArtifactHasNoBinaryPackageName(Exception):
"""Raised if it's not possible to determine the artifact's binary name."""
[docs]
def lookup_result_binary_package_name(result: LookupResult) -> str:
"""Get binary package name from result of looking up an artifact."""
binary_package_name: str | None
if result.artifact is not None:
artifact_data = result.artifact.create_data()
match artifact_data:
case DebianBinaryPackage():
binary_package_name = artifact_data.deb_fields.get("Package")
case _:
raise ArtifactHasNoBinaryPackageName(f"{type(artifact_data)}")
elif result.collection_item is not None:
binary_package_name = result.collection_item.data.get(
"binary_package_name"
)
else:
raise ValueError(
"Unexpected result: must have collection_item or artifact"
)
if not isinstance(binary_package_name, str):
raise ValueError(
f"Cannot determine binary package name for lookup result: {result}"
)
return binary_package_name
[docs]
def filter_artifact_lookup_by_arch(
workflow: "Workflow[Any, Any]",
lookup: LookupMultiple,
architectures: set[str],
) -> LookupMultiple:
"""Filter an artifact lookup by architecture."""
results = workflow.work_request.lookup_multiple(
lookup, expect_type=LookupChildType.ARTIFACT_OR_PROMISE
)
relevant: list[str] = []
for result in results:
arch_in_lookup = lookup_result_architectures(result)
if arch_in_lookup.issubset(architectures):
relevant.append(
reconstruct_lookup(
result, workflow_root=workflow.work_request.workflow_root
)
)
elif arch_in_lookup.intersection(architectures):
# Uploads are the only artifacts with multiple architectures at
# this time.
assert result.artifact is not None
assert result.artifact.category == ArtifactCategory.UPLOAD
for binary_package in follow_artifact_relations(
result.artifact,
ArtifactRelation.Relations.EXTENDS,
ArtifactCategory.BINARY_PACKAGE,
):
pkg_data = cast(
DebianBinaryPackage, binary_package.create_data()
)
pkg_architectures = get_architectures(pkg_data)
if pkg_architectures.issubset(architectures):
relevant.append(f"{binary_package.id}@artifacts")
return LookupMultiple.model_validate(sorted(relevant))
[docs]
def get_lookup_architectures(
workflow: "Workflow[Any, Any]", lookup: LookupMultiple
) -> set[str]:
"""
Return set with all the architectures in the artifacts from the lookup.
The architectures are extracted from each lookup result using
:py:func:`lookup_result_architectures`.
"""
results = workflow.work_request.lookup_multiple(
lookup, expect_type=LookupChildType.ARTIFACT_OR_PROMISE
)
architectures: set[str] = set()
for result in results:
architectures |= lookup_result_architectures(result)
return architectures
[docs]
def follow_artifact_relations(
artifact: Artifact,
relation_type: ArtifactRelation.Relations,
category: ArtifactCategory,
) -> QuerySet[Artifact]:
"""Follow relations from artifact to find artifacts of category."""
return Artifact.objects.filter(
targeted_by__artifact=artifact,
targeted_by__type=relation_type,
category=category,
).order_by("id")
[docs]
def locate_debian_source_package(
configuration_key: str, artifact: Artifact
) -> Artifact:
"""
Find a :artifact:`debian:source-package` artifact for a workflow.
:param configuration_key: The key in the workflow's task data from which
the given artifact was looked up.
:param artifact: A :artifact:`debian:source-package` or
:artifact:`debian:upload` artifact.
:return: If ``artifact`` is a
:artifact:`debian:source-package`, return it; if it is a
:artifact:`debian:upload`, return the related
:artifact:`debian:source-package`.
"""
ensure_artifact_categories(
configuration_key=configuration_key,
category=artifact.category,
expected=[ArtifactCategory.SOURCE_PACKAGE, ArtifactCategory.UPLOAD],
)
match artifact.category:
case ArtifactCategory.SOURCE_PACKAGE:
return artifact
case ArtifactCategory.UPLOAD:
source_packages = follow_artifact_relations(
artifact,
ArtifactRelation.Relations.EXTENDS,
ArtifactCategory.SOURCE_PACKAGE,
)
try:
return source_packages.get()
except Artifact.DoesNotExist:
raise TaskConfigError(
f"Unable to find an artifact of category "
f"{ArtifactCategory.SOURCE_PACKAGE} with a relationship "
f"of type {ArtifactRelation.Relations.EXTENDS} from "
f'"{artifact}"'
)
except Artifact.MultipleObjectsReturned:
raise TaskConfigError(
f"Multiple artifacts of category "
f"{ArtifactCategory.SOURCE_PACKAGE} with a relationship "
f"of type {ArtifactRelation.Relations.EXTENDS} from "
f'"{artifact}" found'
)
case _ as unreachable: # pragma: no cover
raise AssertionError(f"Unexpected artifact category: {unreachable}")
[docs]
def locate_debian_source_package_lookup(
workflow: "Workflow[Any, Any]", configuration_key: str, lookup: LookupSingle
) -> LookupSingle:
"""
Return a lookup to a :artifact:`debian:source-package`.
If the specified lookup returns a :artifact:`debian:source-package`,
return it. If it returns a :artifact:`debian:upload`, find the related
:artifact:`debian:source-package` and return a lookup to it.
"""
artifact = workflow.work_request.lookup_single(
lookup, expect_type=LookupChildType.ARTIFACT
).artifact
if artifact.category == ArtifactCategory.UPLOAD:
source_package = locate_debian_source_package(
configuration_key, artifact
)
return f"{source_package.id}@artifacts"
ensure_artifact_categories(
configuration_key=configuration_key,
category=artifact.category,
expected=[ArtifactCategory.SOURCE_PACKAGE],
)
return lookup
[docs]
def locate_debian_binary_packages(
configuration_key: str, artifacts: Sequence[Artifact]
) -> list[Artifact]:
"""
Find :artifact:`debian:binary-package` artifacts for a workflow.
:param configuration_key: The key in the workflow's task data from which
the given artifacts were looked up.
:param artifacts: A sequence of :artifact:`debian:binary-package` or
:artifact:`debian:upload` artifacts.
:return: A list of artifacts: for each element of ``artifacts``, if it
is a :artifact:`debian:binary-package`, return it; if it is a
:artifact:`debian:upload`, return all the related
:artifact:`debian:binary-package` artifacts.
"""
binary_packages: list[Artifact] = []
for i, artifact in enumerate(artifacts):
ensure_artifact_categories(
configuration_key=f"{configuration_key}[{i}]",
category=artifact.category,
expected=[ArtifactCategory.BINARY_PACKAGE, ArtifactCategory.UPLOAD],
)
match artifact.category:
case ArtifactCategory.BINARY_PACKAGE:
binary_packages.append(artifact)
case ArtifactCategory.UPLOAD:
binary_packages += follow_artifact_relations(
artifact,
ArtifactRelation.Relations.EXTENDS,
ArtifactCategory.BINARY_PACKAGE,
)
case _ as unreachable: # pragma: no cover
raise AssertionError(
f"Unexpected artifact category: {unreachable}"
)
return binary_packages
[docs]
def get_source_package_names(
results: Sequence[LookupResult],
*,
configuration_key: str,
artifact_expected_categories: AbcCollection[ArtifactCategory],
) -> list[str]:
"""
Return a sorted list of source package names from results.
It ensures that:
- The :py:class:`LookupResult` objects contain either an artifact or
promise.
- Artifacts belong to the artifact_expected_categories.
- If :py:class:`LookupResult` is a promise: extracts the name from the
promise data ``source_package_name``.
:param results: A sequence of :py:class:`LookupResult` objects
representing artifacts to be processed. Each entry is expected to be
either an artifact or a promise.
:param configuration_key: A string used by
:py:meth:`ensure_artifact_categories` for the exception
message.
:param artifact_expected_categories: valid :py:class:`ArtifactCategory`
that artifacts must belong to.
:return: A sorted list of source package names.
"""
source_package_names = set()
for result in results:
# lookup_multiple expect_type: only artifacts or promises
match result.result_type:
case LookupResultType.ARTIFACT:
assert result.artifact is not None
category = result.artifact.category
ensure_artifact_categories(
configuration_key=configuration_key,
category=category,
expected=artifact_expected_categories,
)
artifact_data = result.artifact.create_data()
assert isinstance(
artifact_data,
(
DebianSourcePackage,
DebianUpload,
DebianBinaryPackage,
DebianPackageBuildLog,
),
)
source_package_names.add(get_source_package_name(artifact_data))
case _:
# Makes coverage happy
# It's a promise.
assert result.result_type == LookupResultType.BARE
assert result.collection_item is not None
ensure_artifact_categories(
configuration_key=configuration_key,
category=result.collection_item.data["promise_category"],
expected=artifact_expected_categories,
)
if (
package_name := result.collection_item.data.get(
"source_package_name"
)
) is not None:
source_package_names.add(package_name)
return sorted(source_package_names)
[docs]
def get_available_architectures(
workflow: "Workflow[Any, Any]", *, vendor: str, codename: str
) -> set[str]:
"""Get architectures available for use with this vendor/codename."""
architectures = set()
for result in workflow.work_request.lookup_multiple(
LookupMultiple.model_validate(
{"collection": vendor, "data__codename": codename}
),
default_category=CollectionCategory.ENVIRONMENTS,
expect_type=LookupChildType.ARTIFACT,
):
architectures.add(result.artifact.data.get("architecture"))
if not architectures:
raise TaskConfigError(
f"Unable to find any environments for {vendor}:{codename}"
)
architectures.add("all")
return architectures