
# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Models used by debusine tasks."""

import datetime as dt
import re
from abc import ABC
from collections.abc import Collection, Iterator, Sequence
from enum import StrEnum
from functools import partial
from itertools import chain, groupby
from pathlib import Path
from typing import (
    Annotated,
    Any,
    ClassVar,
    Literal,
    Self,
    TypeAlias,
    Union,
    override,
)

import debian.deb822 as deb822
import pydantic

from debusine.artifacts.models import (
    ARTIFACT_DATA_MODELS_BY_CATEGORY,
    ArtifactCategory,
    ArtifactData,
    BareDataCategory,
    CollectionCategory,
    DebianPackageBuildLog,
    RuntimeStatistics,
    WorkRequestResults,
)
from debusine.client.models import LookupChildType
from debusine.utils import DjangoChoicesEnum


class WorkerType(DjangoChoicesEnum):
    """The type of a Worker."""

    SCHEDULER = "scheduler"
    EXTERNAL = "external"
    CELERY = "celery"
    SIGNING = "signing"


class BackendType(StrEnum):
    """Possible values for backend."""

    AUTO = "auto"
    UNSHARE = "unshare"
    INCUS_LXC = "incus-lxc"
    INCUS_VM = "incus-vm"
    QEMU = "qemu"


class AutopkgtestNeedsInternet(StrEnum):
    """Possible values for needs_internet."""

    RUN = "run"
    TRY = "try"
    SKIP = "skip"


class LintianFailOnSeverity(StrEnum):
    """Possible values for fail_on_severity."""

    ERROR = "error"
    WARNING = "warning"
    INFO = "info"
    PEDANTIC = "pedantic"
    EXPERIMENTAL = "experimental"
    OVERRIDDEN = "overridden"
    NONE = "none"


class BaseTaskDataModel(pydantic.BaseModel):
    """Stricter pydantic defaults for task data models."""

    model_config = pydantic.ConfigDict(validate_assignment=True, extra="forbid")

    # Replace with serialize_by_alias=True in model_config once we can
    # assume pydantic >= 2.11.
    def model_dump(self, **kwargs: Any) -> dict[str, Any]:
        """Use aliases by default when serializing."""
        kwargs.setdefault("by_alias", True)
        return super().model_dump(**kwargs)


# https://github.com/pydantic/pydantic-core/issues/296
def require_unique(v: list[Any]) -> list[Any]:
    """Field validator to require that list items are unique."""
    if sorted(v) != sorted(set(v)):
        raise ValueError(f"{v} contains duplicate items")
    return v


# Lookups resulting in exactly one collection item.
LookupSingle: TypeAlias = int | str


def _build_key_value_string(data: dict[str, Any]) -> str:
    # TODO: Implement URL-quoting to support special characters
    return ":".join(f"{key}={value}" for key, value in data.items())


def build_lookup_string(
    *segments: str, options: dict[str, Any] | None = None
) -> str:
    """Build a string lookup from segments and options."""
    options_prefix = ""
    if options:
        inside = _build_key_value_string(options)
        options_prefix = f"//{inside}/"
    return f"{options_prefix}{'/'.join(segments)}"


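# A minimal usage sketch (the segment and option values here are invented
# for illustration):
#
#     >>> build_lookup_string("debian", "bookworm", options={"variant": "apt"})
#     '//variant=apt/debian/bookworm'
#     >>> build_lookup_string("debian", "bookworm")
#     'debian/bookworm'

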
def _parse_key_value_string(input_string: str) -> dict[str, str]:
    data = {}
    for item in input_string.split(":"):
        # TODO: Implement URL-unquoting to support special characters
        name, value = item.split("=", 1)
        data[name] = value
    return data


def parse_lookup_string(lookup: str) -> tuple[list[str], dict[str, str] | None]:
    """Parse a string lookup into options and segments."""
    options = None
    # Extract and parse options
    if m := re.match(r"//([^/]*)/", lookup):
        lookup_options = m.group(1)
        lookup = lookup[len(lookup_options) + 3 :]
        options = _parse_key_value_string(lookup_options)
    return (lookup.split("/"), options)


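# A minimal sketch of the inverse operation, using the same invented values
# as above:
#
#     >>> parse_lookup_string("//variant=apt/debian/bookworm")
#     (['debian', 'bookworm'], {'variant': 'apt'})
#     >>> parse_lookup_string("debian/bookworm")
#     (['debian', 'bookworm'], None)

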
def build_key_value_lookup_segment(
    lookup_type: str, filters: dict[str, str]
) -> str:
    """Build a lookup segment consisting of a series of `key=value` filters."""
    filters_string = _build_key_value_string(filters)
    return f"{lookup_type}:{filters_string}"


def parse_key_value_lookup_segment(segment: str) -> tuple[str, dict[str, str]]:
    """Parse a lookup segment consisting of a series of `key=value` filters."""
    lookup_type, remaining = segment.split(":", 1)
    return lookup_type, _parse_key_value_string(remaining)


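# A minimal round-trip sketch (the lookup type and filter values are
# invented for illustration):
#
#     >>> segment = build_key_value_lookup_segment(
#     ...     "binary", {"package": "hello", "architecture": "amd64"}
#     ... )
#     >>> segment
#     'binary:package=hello:architecture=amd64'
#     >>> parse_key_value_lookup_segment(segment)
#     ('binary', {'package': 'hello', 'architecture': 'amd64'})

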
class CollectionItemMatcherKind(StrEnum):
    """Possible values for CollectionItemMatcher.kind."""

    CONTAINS = "contains"
    ENDSWITH = "endswith"
    EXACT = "exact"
    STARTSWITH = "startswith"


class CollectionItemMatcher(BaseTaskDataModel):
    """A matcher for collection item name or per-item data fields."""

    model_config = BaseTaskDataModel.model_config | pydantic.ConfigDict(
        frozen=True
    )

    kind: CollectionItemMatcherKind
    value: Any


class BaseExtraRepository(BaseTaskDataModel, ABC):
    """Generic extra repositories for Debian tasks."""

    components: (
        list[Annotated[str, pydantic.Field(pattern=r"^(\w|-)+$")]] | None
    ) = None
    architectures: (
        list[Annotated[str, pydantic.Field(pattern=r"^(\w|-)+$")]] | None
    ) = None

    @pydantic.field_validator("components", mode="after")
    @classmethod
    def convert_empty_components_to_none(
        cls, value: list[str] | None
    ) -> list[str] | None:
        """Rewrite an empty components list to None."""
        return value or None

    @pydantic.field_validator("architectures", mode="after")
    @classmethod
    def convert_empty_architectures_to_none(
        cls, value: list[str] | None
    ) -> list[str] | None:
        """Rewrite an empty architectures list to None."""
        return value or None


class ExtraDebusineRepository(BaseExtraRepository):
    """An extra Debusine-hosted repository for Debian tasks."""

    suite: LookupSingle


class ExtraExternalRepository(BaseExtraRepository):
    """An extra external repository for Debian tasks."""

    url: Annotated[pydantic.AnyUrl, pydantic.UrlConstraints(host_required=True)]
    suite: Annotated[str, pydantic.Field(pattern=r"^(\w|[./ -])+$")]
    signing_key: str | None = None

    @pydantic.model_validator(mode="after")
    def flat_repository(self) -> Self:
        """Check if suite is a flat repository."""
        if self.suite.endswith("/"):
            if self.components is not None:
                raise ValueError(
                    "Components cannot be specified for a flat "
                    "repository (where the suite ends with /)"
                )
        elif not self.components:
            raise ValueError("Components must be specified")
        return self

    def as_deb822_source(self, signed_by_filename: str | None = None) -> str:
        """Render a Deb822 sources.list.d entry."""
        source = deb822.Deb822()
        source["Types"] = "deb"
        source["URIs"] = str(self.url)
        source["Suites"] = self.suite
        if self.components:
            source["Components"] = " ".join(self.components)
        if self.signing_key:
            if signed_by_filename:
                source["Signed-By"] = signed_by_filename
            else:
                source["Signed-By"] = "\n" + "\n".join(
                    " " + (line or ".")
                    for line in self.signing_key.splitlines()
                )
        return source.dump()


ExtraRepository: TypeAlias = ExtraDebusineRepository | ExtraExternalRepository


class LookupDict(BaseTaskDataModel):
    """Dictionary lookups for collection items."""

    model_config = BaseTaskDataModel.model_config | pydantic.ConfigDict(
        frozen=True
    )

    collection: LookupSingle
    child_type: LookupChildType = LookupChildType.ARTIFACT
    category: str | None = None
    name_matcher: CollectionItemMatcher | None = None
    # Logically a mapping, but we turn it into a tuple so that lookups are
    # hashable.
    data_matchers: tuple[tuple[str, CollectionItemMatcher], ...] = ()
    # Logically a mapping, but we turn it into a tuple so that lookups are
    # hashable.  In future some filters may want other value types, but they
    # need to be hashable and our current use cases involve subordinate
    # lookups, so just declare them to be lookups for now.
    lookup_filters: tuple[
        tuple[str, Union[LookupSingle, "LookupMultiple"]], ...
    ] = ()

    @pydantic.model_validator(mode="before")
    @classmethod
    def normalize_matchers(cls, data: Any) -> dict[str, Any]:  # noqa: C901
        """
        Transform the lookup syntax into a form more convenient for pydantic.

        `name`, `name__*`, `data__KEY`, and `data__KEY__*` keys in `data`
        are transformed into :py:class:`CollectionItemMatcher` instances
        stored in `name_matcher` and `data_matchers`.  `lookup__KEY` keys
        are transformed into entries in `lookup_filters`.  Conflicting
        lookup suffixes are rejected.
        """

        def split_words(s: str) -> list[str]:
            return s.split("__")

        def matcher_prefix(s: str) -> tuple[str, ...]:
            match split_words(s):
                case ["name", *_]:
                    return ("name",)
                case ["data", data_key, *_]:
                    return ("data", data_key)
                case ["lookup", lookup_key]:
                    return ("lookup", lookup_key)
                case _:
                    return ()

        if not isinstance(data, dict):
            raise ValueError(f"Expected dict, got {type(data)!r}")
        data = data.copy()
        assert isinstance(data, dict)
        data_matchers: dict[str, CollectionItemMatcher] = dict(
            data.get("data_matchers", ())
        )
        lookup_filters: dict[str, LookupSingle | LookupMultiple] = dict(
            data.get("lookup_filters", ())
        )

        for key, group in groupby(
            sorted(data, key=matcher_prefix), matcher_prefix
        ):
            if not key:
                # Only keys with `name`, `data`, or `lookup` as their first
                # segment need transformation.
                continue
            matcher_names = list(group)
            if len(matcher_names) > 1:
                raise ValueError(
                    f"Conflicting matchers: {sorted(matcher_names)}"
                )
            assert len(matcher_names) == 1
            matcher_name = matcher_names[0]
            data_key: str | None = None
            match words := split_words(matcher_name):
                case ["name"]:
                    kind = "exact"
                case ["name", kind] if kind != "exact":
                    pass
                case ["data", data_key]:
                    kind = "exact"
                case ["data", data_key, kind] if kind != "exact":
                    pass
                case ["lookup", lookup_key]:
                    match data[matcher_name]:
                        case int() | str():
                            lookup_filters[lookup_key] = data[matcher_name]
                        case _:
                            lookup_filters[lookup_key] = (
                                LookupMultiple.model_validate(
                                    data[matcher_name]
                                )
                            )
                    for k in matcher_names:
                        del data[k]
                    continue
                case _:
                    # We filter out keys with first segments other than
                    # `name`, `data`, or `lookup` earlier, so anything that
                    # reaches here must be a malformed lookup.
                    raise ValueError(f"Unrecognized matcher: {matcher_name}")
            matcher = CollectionItemMatcher(
                kind=CollectionItemMatcherKind(kind), value=data[matcher_name]
            )
            if words[0] == "name":
                data["name_matcher"] = matcher
            else:
                assert words[0] == "data"
                assert data_key is not None
                data_matchers[data_key] = matcher
            for k in matcher_names:
                del data[k]

        if data_matchers:
            data["data_matchers"] = tuple(sorted(data_matchers.items()))
        if lookup_filters:
            data["lookup_filters"] = tuple(sorted(lookup_filters.items()))
        return data

    def export(self) -> dict[str, Any]:
        """
        Export the usual input representation of this lookup.

        This reverses the transformations applied by
        :py:meth:`normalize_matchers`.
        """
        value: dict[str, Any] = {"collection": self.collection}
        if self.child_type != LookupChildType.ARTIFACT:
            value["child_type"] = str(self.child_type)
        if self.category is not None:
            value["category"] = self.category
        if self.name_matcher is not None:
            if self.name_matcher.kind == CollectionItemMatcherKind.EXACT:
                value["name"] = self.name_matcher.value
            else:
                value[f"name__{self.name_matcher.kind}"] = (
                    self.name_matcher.value
                )
        for key, matcher in self.data_matchers:
            if matcher.kind == CollectionItemMatcherKind.EXACT:
                value[f"data__{key}"] = matcher.value
            else:
                value[f"data__{key}__{matcher.kind}"] = matcher.value
        for key, filter_value in self.lookup_filters:
            if isinstance(filter_value, LookupMultiple):
                value[f"lookup__{key}"] = filter_value.export()
            else:
                value[f"lookup__{key}"] = filter_value
        return value


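# A minimal sketch of the normalization round trip (the collection name and
# matcher value are invented for illustration):
#
#     >>> lookup = LookupDict.model_validate(
#     ...     {"collection": "debian", "name__startswith": "hello"}
#     ... )
#     >>> lookup.name_matcher.kind
#     <CollectionItemMatcherKind.STARTSWITH: 'startswith'>
#     >>> lookup.export()
#     {'collection': 'debian', 'name__startswith': 'hello'}

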
class LookupMultiple(pydantic.RootModel[tuple[LookupSingle | LookupDict, ...]]):
    """Lookups resulting in multiple collection items."""

    model_config = pydantic.ConfigDict(validate_assignment=True, frozen=True)

    @pydantic.model_validator(mode="before")
    @classmethod
    def normalize(cls, data: Any) -> tuple[Any, ...]:
        """Normalize into a tuple of multiple matchers."""
        if isinstance(data, dict):
            return (data,)
        elif isinstance(data, (tuple, list)):
            return tuple(data)
        else:
            raise ValueError(
                "Lookup of multiple collection items must be a dictionary or "
                "a list"
            )

    def __iter__(  # type: ignore[override]
        self,
    ) -> Iterator[LookupSingle | LookupDict]:
        """Iterate over individual lookups."""
        return iter(self.root)

    def __bool__(self) -> bool:
        """Return True if and only if this lookup is non-empty."""
        return bool(self.root)

    def export(self) -> dict[str, Any] | list[int | str | dict[str, Any]]:
        """
        Export the usual input representation of this lookup.

        This reverses the transformations applied by :py:meth:`normalize`.
        """
        if len(self.root) == 1 and isinstance(self.root[0], LookupDict):
            return self.root[0].export()
        else:
            return [
                lookup.export() if isinstance(lookup, LookupDict) else lookup
                for lookup in self
            ]


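# A minimal sketch: a bare dictionary is wrapped into a one-element tuple,
# while lists pass through (the values are invented for illustration):
#
#     >>> LookupMultiple.model_validate([1, "debian/bookworm"]).export()
#     [1, 'debian/bookworm']
#     >>> LookupMultiple.model_validate({"collection": "debian"}).export()
#     {'collection': 'debian'}

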
# Resolve circular reference to LookupMultiple.
LookupDict.model_rebuild()


class NotificationDataEmail(BaseTaskDataModel):
    """Channel data for email notifications."""

    from_: pydantic.EmailStr | None = pydantic.Field(default=None, alias="from")
    to: list[pydantic.EmailStr] | None = None
    cc: list[pydantic.EmailStr] = pydantic.Field(default_factory=list)
    subject: str | None = None


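# A minimal sketch of the "from" alias handling inherited from
# BaseTaskDataModel.model_dump (the addresses are invented for
# illustration):
#
#     >>> NotificationDataEmail(
#     ...     **{"from": "debusine@example.org", "subject": "Build finished"}
#     ... ).model_dump(exclude_none=True)
#     {'from': 'debusine@example.org', 'cc': [], 'subject': 'Build finished'}

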
class ActionTypes(StrEnum):
    """Possible values for EventReaction actions."""

    SKIP_IF_LOOKUP_RESULT_CHANGED = "skip-if-lookup-result-changed"
    SEND_NOTIFICATION = "send-notification"
    UPDATE_COLLECTION_WITH_ARTIFACTS = "update-collection-with-artifacts"
    UPDATE_COLLECTION_WITH_DATA = "update-collection-with-data"
    RETRY_WITH_DELAYS = "retry-with-delays"
    RECORD_IN_TASK_HISTORY = "record-in-task-history"


class ActionSkipIfLookupResultChanged(BaseTaskDataModel):
    """Action for skipping a work request if a lookup result changed."""

    action: Literal[ActionTypes.SKIP_IF_LOOKUP_RESULT_CHANGED] = (
        ActionTypes.SKIP_IF_LOOKUP_RESULT_CHANGED
    )
    # Deliberately not LookupSingle; this must be able to return a
    # collection item, so integer lookups are never valid.
    lookup: str
    collection_item_id: int | None
    promise_name: str | None = None


class ActionSendNotification(BaseTaskDataModel):
    """Action for sending a notification."""

    action: Literal[ActionTypes.SEND_NOTIFICATION] = (
        ActionTypes.SEND_NOTIFICATION
    )
    channel: str
    data: NotificationDataEmail | None = None


class ActionUpdateCollectionWithArtifacts(BaseTaskDataModel):
    """Action for updating a collection with artifacts."""

    action: Literal[ActionTypes.UPDATE_COLLECTION_WITH_ARTIFACTS] = (
        ActionTypes.UPDATE_COLLECTION_WITH_ARTIFACTS
    )
    collection: LookupSingle
    name_template: str | None = None
    variables: dict[str, Any] | None = None
    artifact_filters: dict[str, Any]
    created_at: dt.datetime | None = None


class ActionUpdateCollectionWithData(BaseTaskDataModel):
    """Action for updating a collection with bare data."""

    action: Literal[ActionTypes.UPDATE_COLLECTION_WITH_DATA] = (
        ActionTypes.UPDATE_COLLECTION_WITH_DATA
    )
    collection: LookupSingle
    category: BareDataCategory
    name_template: str | None = None
    data: dict[str, Any] | None = None
    created_at: dt.datetime | None = None


class ActionRetryWithDelays(BaseTaskDataModel):
    """Action for retrying a work request with delays."""

    _delay_re: ClassVar[re.Pattern[str]] = re.compile(r"^([0-9]+)([mhdw])$")

    action: Literal[ActionTypes.RETRY_WITH_DELAYS] = (
        ActionTypes.RETRY_WITH_DELAYS
    )
    delays: Annotated[
        list[Annotated[str, pydantic.Field(pattern=_delay_re)]],
        pydantic.Field(min_length=1),
    ]


class ActionRecordInTaskHistory(BaseTaskDataModel):
    """Action for recording the task run in a task-history collection."""

    action: Literal[ActionTypes.RECORD_IN_TASK_HISTORY] = (
        ActionTypes.RECORD_IN_TASK_HISTORY
    )
    subject: str | None = None
    context: str | None = None


EventReaction = Annotated[
    Union[
        ActionSkipIfLookupResultChanged,
        ActionSendNotification,
        ActionUpdateCollectionWithArtifacts,
        ActionUpdateCollectionWithData,
        ActionRetryWithDelays,
        ActionRecordInTaskHistory,
    ],
    pydantic.Field(discriminator="action"),
]


class EventReactions(BaseTaskDataModel):
    """Structure for event reactions."""

    on_creation: list[EventReaction] = []
    on_unblock: list[EventReaction] = []
    on_assignment: list[EventReaction] = []
    on_success: list[EventReaction] = []
    on_failure: list[EventReaction] = []

    def model_dump(self, **kwargs: Any) -> dict[str, Any]:
        """Wrap pydantic to ensure discriminators are serialized."""
        # TODO: from python 3.5, docstrings are inherited from the parent
        # class, which would be the right thing here.  However, flake8
        # doesn't like it.
        # Mark discriminators as always set.
        # See: https://github.com/pydantic/pydantic/issues/6465
        for name in (
            "on_creation",
            "on_unblock",
            "on_assignment",
            "on_success",
            "on_failure",
        ):
            for item in getattr(self, name):
                item.model_fields_set.add("action")
        return super().model_dump(**kwargs)

    def add_reaction(
        self,
        name: Literal[
            "on_creation",
            "on_unblock",
            "on_assignment",
            "on_success",
            "on_failure",
        ],
        action: EventReaction,
    ) -> None:
        """
        Add an EventReaction to the named event.

        This is a no-op if the named event already contains this
        EventReaction.
        """
        current = getattr(self, name)
        if action not in current:
            current.append(action)
            # Let pydantic know we modified the field.
            # See: https://github.com/pydantic/pydantic/issues/9866
            self.model_fields_set.add(name)


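# A minimal sketch of the action-discriminated union (the channel name is
# invented for illustration):
#
#     >>> reactions = EventReactions.model_validate(
#     ...     {"on_failure": [
#     ...         {"action": "send-notification", "channel": "admins"},
#     ...     ]}
#     ... )
#     >>> type(reactions.on_failure[0]).__name__
#     'ActionSendNotification'
#     >>> reactions.add_reaction(
#     ...     "on_failure",
#     ...     ActionSendNotification(channel="admins"),
#     ... )  # no-op: an equal reaction is already present
#     >>> len(reactions.on_failure)
#     1

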
class BaseTaskData(BaseTaskDataModel):
    """
    Base class for task data.

    Task data is encoded as JSON in the database and in the API, and it is
    modeled as a pydantic data structure in memory for both ease of access
    and validation.
    """

    DEFAULT_TASK_CONFIGURATION_LOOKUP: ClassVar[str] = (
        "default@debusine:task-configuration"
    )

    #: debusine:task-configuration collection to use to configure the task.
    #: This is resolved to an ID when configuration is applied.
    task_configuration: LookupSingle | None = DEFAULT_TASK_CONFIGURATION_LOOKUP


class BaseTaskDataWithExecutor(BaseTaskData):
    """Base task data with fields used to configure executors."""

    backend: BackendType = BackendType.AUTO
    environment: LookupSingle | None = None
    # Not all executor-based tasks strictly require this, but it can
    # nevertheless be useful to constrain a task to run on an architecture
    # for which we know that a suitable environment exists.
    build_architecture: str | None = None


class BaseTaskDataWithExtraRepositories(BaseTaskData):
    """Base task data with fields used to configure extra_repositories."""

    extra_repositories: list[ExtraRepository] | None = None
    # BaseTaskDataWithExecutor declares this as optional, but it's required
    # here.
    build_architecture: str


class BaseDynamicTaskData(BaseTaskDataModel):
    """
    Base class for dynamic task data.

    This is computed by the scheduler when dispatching a task to a worker.
    It may involve resolving artifact lookups.
    """

    #: Brief human-readable summary of the most important parameters to this
    #: work request.
    parameter_summary: str | None = None

    #: An abstract string value representing the subject of the task, for
    #: the purpose of recording statistics.  It is meant to group possible
    #: inputs into groups that we expect to behave similarly.
    subject: str | None = None

    #: An abstract string value representing the runtime context in which
    #: the task is executed, for the purpose of recording statistics.  It is
    #: meant to represent some of the task parameters that can significantly
    #: alter the runtime behaviour of the task.
    runtime_context: str | None = None

    #: Name of the configuration context.
    # This is a stable string representation of the main input artifacts and
    # parameters that does not change across similar future invocations of
    # the same task, and is used to look up configuration parameters.
    configuration_context: str | None = None

    #: Storage for resolved task input fields
    task_input_fields: dict[str, Any] = pydantic.Field(default_factory=dict)

    def get_source_package_name(self) -> str | None:
        """
        Extract the source package name.

        :returns: the source package name, if applicable, and None otherwise.
        """
        return None

    def get_input_artifacts_ids(self) -> list[int]:
        """
        Return the list of input artifact IDs used by this task.

        This is used by views to show what artifacts were used by a task.
        """
        # BaseExternalTask.get_input_artifacts_ids() augments this with
        # artifact IDs extracted from resolved task input fields, which are
        # not serialized in old dynamic_data records.
        return []


class BaseDynamicTaskDataWithExecutor(BaseDynamicTaskData):
    """Dynamic task data for executors."""

    environment_id: int | None = None


class BaseDynamicTaskDataWithExtraRepositories(BaseDynamicTaskData):
    """Dynamic task data for extra repositories."""

    extra_repositories: list[ExtraExternalRepository] | None = None


class OutputDataError(BaseTaskDataModel):
    """An error encountered when running a task."""

    message: str
    code: str


class RegressionAnalysisStatus(StrEnum):
    """Possible values for ``RegressionAnalysis.status``."""

    NO_RESULT = "no-result"
    ERROR = "error"
    IMPROVEMENT = "improvement"
    STABLE = "stable"
    REGRESSION = "regression"


class RegressionAnalysis(BaseTaskDataModel):
    """The result of a regression analysis."""

    original_source_version: str | None = None
    original_artifact_id: int | None = None
    new_source_version: str | None = None
    new_artifact_id: int | None = None
    status: RegressionAnalysisStatus
    details: dict[str, Any] | list[Any] | None = None
    # Obsolete, but may be present in old rows.
    original_url: str | None = None
    new_url: str | None = None


class Confirmation(BaseTaskDataModel):
    """A record that a work request was confirmed or denied by a user."""

    confirmed: bool
    comment: str | None = None


class OutputData(BaseTaskDataModel):
    """Data produced when a task is completed."""

    runtime_statistics: RuntimeStatistics | None = None
    errors: list[OutputDataError] | None = None
    skip_reason: str | None = None
    regression_analysis: dict[str, RegressionAnalysis] | None = None
    confirmation: Confirmation | None = None

    def merge(self, other: "OutputData") -> "OutputData":
        """
        Return the combination of two ``OutputData`` objects.

        Merge all the fields that were explicitly set on each model.  If a
        field is set on both models, the values in ``other`` take
        precedence.
        """
        merged = self.model_copy()
        for name in other.model_fields_set:
            setattr(merged, name, getattr(other, name))
        return merged


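# A minimal sketch of field-wise merging (the values are invented for
# illustration):
#
#     >>> first = OutputData(skip_reason="superseded")
#     >>> second = OutputData(
#     ...     skip_reason=None,
#     ...     errors=[OutputDataError(message="boom", code="failed")],
#     ... )
#     >>> merged = first.merge(second)
#     >>> merged.skip_reason is None  # set on both models: `other` wins
#     True
#     >>> len(merged.errors)          # only set on `second`: carried over
#     1

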
class NoopData(BaseTaskDataWithExecutor):
    """In-memory task data for the Noop task."""

    result: WorkRequestResults = WorkRequestResults.SUCCESS


class AutopkgtestInput(BaseTaskDataModel):
    """Input for an autopkgtest task."""

    source_artifact: LookupSingle
    binary_artifacts: LookupMultiple
    context_artifacts: LookupMultiple = pydantic.Field(
        default_factory=partial(LookupMultiple, ())
    )


class AutopkgtestFailOn(BaseTaskDataModel):
    """Possible values for fail_on."""

    failed_test: bool = True
    flaky_test: bool = False
    skipped_test: bool = False


class AutopkgtestTimeout(BaseTaskDataModel):
    """Timeout specifications for an autopkgtest task."""

    global_: Annotated[int | None, pydantic.Field(ge=0)] = pydantic.Field(
        default=None, alias="global"
    )
    factor: Annotated[int | None, pydantic.Field(ge=0)] = None
    short: Annotated[int | None, pydantic.Field(ge=0)] = None
    install: Annotated[int | None, pydantic.Field(ge=0)] = None
    test: Annotated[int | None, pydantic.Field(ge=0)] = None
    copy_: Annotated[int | None, pydantic.Field(ge=0)] = pydantic.Field(
        default=None, alias="copy"
    )


class AutopkgtestData(
    BaseTaskDataWithExecutor, BaseTaskDataWithExtraRepositories
):
    """In-memory task data for the Autopkgtest task."""

    input: AutopkgtestInput
    include_tests: list[str] = pydantic.Field(default_factory=list)
    exclude_tests: list[str] = pydantic.Field(default_factory=list)
    debug_level: Annotated[int, pydantic.Field(ge=0, le=3)] = 0
    use_packages_from_base_repository: bool = False
    extra_environment: dict[str, str] = pydantic.Field(default_factory=dict)
    needs_internet: AutopkgtestNeedsInternet = AutopkgtestNeedsInternet.RUN
    fail_on: AutopkgtestFailOn = pydantic.Field(
        default_factory=AutopkgtestFailOn
    )
    timeout: AutopkgtestTimeout | None = None
    # BaseTaskDataWithExecutor declares this as optional, but it's required
    # here.
    environment: LookupSingle


class AutopkgtestDynamicData(
    BaseDynamicTaskDataWithExecutor, BaseDynamicTaskDataWithExtraRepositories
):
    """Dynamic data for the Autopkgtest task."""

    # BaseDynamicTaskDataWithExecutor declares this as optional, but it's
    # required here.
    environment_id: int

    input_source_artifact_id: int
    input_binary_artifacts_ids: list[int]
    input_context_artifacts_ids: list[int] = pydantic.Field(
        default_factory=list
    )

    @override
    def get_source_package_name(self) -> str | None:
        return self.subject

    @override
    def get_input_artifacts_ids(self) -> list[int]:
        return [
            self.environment_id,
            self.input_source_artifact_id,
            *self.input_binary_artifacts_ids,
            *self.input_context_artifacts_ids,
        ]


class LintianInput(BaseTaskDataModel):
    """Input for a lintian task."""

    source_artifact: LookupSingle | None = None
    binary_artifacts: LookupMultiple = pydantic.Field(
        default_factory=partial(LookupMultiple, ())
    )

    @pydantic.model_validator(mode="after")
    def check_one_of_source_or_binary(self) -> Self:
        """Ensure a source or binary artifact is present."""
        if self.source_artifact is None and not self.binary_artifacts:
            raise ValueError(
                'One of source_artifact or binary_artifacts must be set'
            )
        return self


class LintianOutput(BaseTaskDataModel):
    """Output configuration for a Lintian task."""

    source_analysis: bool = True
    binary_all_analysis: bool = True
    binary_any_analysis: bool = True


class LintianData(BaseTaskDataWithExecutor):
    """In-memory task data for the Lintian task."""

    input: LintianInput
    output: LintianOutput = pydantic.Field(default_factory=LintianOutput)
    target_distribution: str = "debian:unstable"
    # Passed to lintian's --tags option
    include_tags: list[str] = pydantic.Field(default_factory=list)
    # Passed to lintian's --suppress-tags option
    exclude_tags: list[str] = pydantic.Field(default_factory=list)
    # If the analysis emits tags of this severity or higher, the task will
    # return 'failure' instead of 'success'
    fail_on_severity: LintianFailOnSeverity = LintianFailOnSeverity.ERROR


class LintianDynamicData(BaseDynamicTaskDataWithExecutor):
    """Dynamic data for the Lintian task."""

    input_source_artifact_id: int | None = None
    input_binary_artifacts_ids: list[int] = pydantic.Field(
        default_factory=list
    )

    @override
    def get_source_package_name(self) -> str | None:
        return self.subject

    @override
    def get_input_artifacts_ids(self) -> list[int]:
        result = []
        if val := self.input_source_artifact_id:
            result.append(val)
        if val := self.environment_id:
            result.append(val)
        result += self.input_binary_artifacts_ids
        return result


class BlhcInput(BaseTaskDataModel):
    """Input for a blhc task."""

    artifact: LookupSingle


class BlhcFlags(StrEnum):
    """Possible values for extra_flags."""

    ALL = "--all"
    BINDNOW = "--bindnow"
    BUILDD = "--buildd"
    COLOR = "--color"
    DEBIAN = "--debian"
    LINE_NUMBERS = "--line-numbers"
    PIE = "--pie"


class BlhcData(BaseTaskDataWithExecutor):
    """In-memory task data for the Blhc task."""

    input: BlhcInput
    # Passed to blhc
    extra_flags: list[BlhcFlags] = pydantic.Field(default_factory=list)
    # BaseTaskDataWithExecutor declares this as optional, but it's required
    # here.
    build_architecture: str
    # BaseTaskDataWithExecutor declares this as optional, but it's required
    # here.
    environment: LookupSingle


class BlhcDynamicData(BaseDynamicTaskDataWithExecutor):
    """Dynamic data for the Blhc task."""

    input_artifact_id: int
    # TODO: Once old tasks created without it have expired, this should
    # become required.
    input_artifact_data: DebianPackageBuildLog | None = None
    # BaseDynamicTaskDataWithExecutor declares this as optional, but it's
    # required here.
    environment_id: int

    @override
    def get_source_package_name(self) -> str | None:
        return self.subject

    @override
    def get_input_artifacts_ids(self) -> list[int]:
        return [self.input_artifact_id, self.environment_id]


class DebDiffInput(BaseTaskDataModel):
    """Input for a debdiff task."""

    source_artifacts: Annotated[
        list[LookupSingle] | None, pydantic.Field(min_length=2, max_length=2)
    ] = None
    binary_artifacts: Annotated[
        list[LookupMultiple] | None, pydantic.Field(min_length=2, max_length=2)
    ] = None

    @pydantic.model_validator(mode="after")
    def check_one_of_source_or_binary(self) -> Self:
        """Ensure that exactly one of source or binary artifacts is set."""
        if (self.source_artifacts is None) == (self.binary_artifacts is None):
            raise ValueError(
                "Exactly one of source_artifacts"
                " or binary_artifacts must be set"
            )
        return self


class DebDiffFlags(StrEnum):
    """Possible values for extra_flags."""

    DIRS = "--dirs"
    NOCONTROL = "--nocontrol"
    WDIFF = "--wdiff-source-control"
    SHOW_MOVED = "--show-moved"
    DIFFSTAT = "--diffstat"
    PATCHES = "--apply-patches"
    IGNORE_SPACE = "--ignore-space"


class DebDiffData(BaseTaskDataWithExecutor):
    """In-memory task data for the DebDiff task."""

    input: DebDiffInput
    # Passed to debdiff
    extra_flags: list[DebDiffFlags] = pydantic.Field(default_factory=list)
    # BaseTaskDataWithExecutor declares this as optional, but it's required
    # here.
    build_architecture: str
    # BaseTaskDataWithExecutor declares this as optional, but it's required
    # here.
    environment: LookupSingle


class DebDiffDynamicData(BaseDynamicTaskDataWithExecutor):
    """Dynamic data for the DebDiff task."""

    # BaseDynamicTaskDataWithExecutor declares this as optional, but it's
    # required here.
    environment_id: int

    input_source_artifacts_ids: Annotated[
        list[int] | None, pydantic.Field(min_length=2, max_length=2)
    ] = None
    input_binary_artifacts_ids: Annotated[
        list[list[int]] | None, pydantic.Field(min_length=2, max_length=2)
    ] = None

    @pydantic.model_validator(mode="after")
    def validate_source_or_binary_exclusive(self) -> Self:
        """
        Validate mutually exclusive input artifacts.

        Ensure input_source_artifacts_ids and input_binary_artifacts_ids
        are not both set.
        """
        # Both fields may still be None at the same time, for backwards
        # compatibility with older data.
        if (self.input_source_artifacts_ids is not None) and (
            self.input_binary_artifacts_ids is not None
        ):
            raise ValueError(
                "Only one of 'input_source_artifacts_ids' or "
                "'input_binary_artifacts_ids' may be set (not both)."
            )
        return self

    @override
    def get_source_package_name(self) -> str | None:
        if self.subject and self.subject.startswith("source:"):
            return self.subject.removeprefix("source:")
        return None

    @override
    def get_input_artifacts_ids(self) -> list[int]:
        input_ids = self.input_source_artifacts_ids or []
        binary_ids = self.input_binary_artifacts_ids or []
        return list(chain(input_ids, *binary_ids))


class MmDebstrapVariant(StrEnum):
    """Variants supported by `mmdebstrap`."""

    BUILDD = "buildd"
    MINBASE = "minbase"
    DASH = "-"
    APT = "apt"
    CUSTOM = "custom"
    DEBOOTSTRAP = "debootstrap"
    ESSENTIAL = "essential"
    EXTRACT = "extract"
    IMPORTANT = "important"
    REQUIRED = "required"
    STANDARD = "standard"


class DebootstrapVariant(StrEnum):
    """Variants supported by `debootstrap`."""

    BUILDD = "buildd"
    MINBASE = "minbase"


class SystemBootstrapOptions(BaseTaskDataModel):
    """Structure of SystemBootstrap options."""

    architecture: str
    # Specializations of this model for individual tasks should restrict
    # this to particular values.
    variant: str | None = None
    extra_packages: list[str] = pydantic.Field(default_factory=list)
    use_signed_by: bool = True


class SystemBootstrapRepositoryType(StrEnum):
    """Possible values for repository types."""

    DEB = "deb"
    DEB_SRC = "deb-src"


class SystemBootstrapRepositoryCheckSignatureWith(StrEnum):
    """Possible values for check_signature_with."""

    SYSTEM = "system"
    EXTERNAL = "external"
    NO_CHECK = "no-check"


class SystemBootstrapRepositoryKeyring(BaseTaskDataModel):
    """Description of a repository keyring."""

    url: pydantic.AnyUrl
    sha256sum: str = ""
    install: bool = False

    @pydantic.field_validator("url", mode="after")
    @classmethod
    def validate_url(cls, url: pydantic.AnyUrl) -> pydantic.AnyUrl:
        """
        Reject file:// URLs outside /usr/(local/)share/keyrings/.

        We don't want to allow reading arbitrary paths.
        """
        if (
            url.scheme == "file"
            and url.path is not None
            and not (
                Path(url.path).resolve().is_relative_to("/usr/share/keyrings")
            )
            and not (
                Path(url.path)
                .resolve()
                .is_relative_to("/usr/local/share/keyrings")
            )
        ):
            raise ValueError(
                "file:// URLs for keyrings must be under /usr/share/keyrings/ "
                "or /usr/local/share/keyrings/"
            )
        return url


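# A minimal sketch of the keyring path restriction (the paths are invented
# for illustration):
#
#     >>> SystemBootstrapRepositoryKeyring(
#     ...     url="file:///usr/share/keyrings/example-archive-keyring.gpg"
#     ... ).install
#     False
#     >>> SystemBootstrapRepositoryKeyring(url="file:///etc/passwd")
#     Traceback (most recent call last):
#     ...
#     pydantic_core._pydantic_core.ValidationError: ...

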
class SystemBootstrapRepository(BaseTaskDataModel):
    """Description of one repository in SystemBootstrapData."""

    mirror: str
    suite: str
    types: Annotated[
        list[SystemBootstrapRepositoryType],
        pydantic.Field(min_length=1),
        pydantic.AfterValidator(require_unique),
    ] = pydantic.Field(
        default_factory=lambda: [SystemBootstrapRepositoryType.DEB]
    )
    components: (
        Annotated[list[str], pydantic.AfterValidator(require_unique)] | None
    ) = None
    check_signature_with: SystemBootstrapRepositoryCheckSignatureWith = (
        SystemBootstrapRepositoryCheckSignatureWith.SYSTEM
    )
    keyring_package: str | None = None
    keyring: SystemBootstrapRepositoryKeyring | None = None

    @pydantic.model_validator(mode="after")
    def _check_external_keyring(self) -> Self:
        """Require keyring if check_signature_with is external."""
        if (
            self.check_signature_with
            == SystemBootstrapRepositoryCheckSignatureWith.EXTERNAL
        ):
            if self.keyring is None:
                raise ValueError(
                    "repository requires 'keyring': "
                    "'check_signature_with' is set to 'external'"
                )
        return self


class SystemBootstrapData(BaseTaskData):
    """Base for in-memory task data for SystemBootstrap tasks."""

    bootstrap_options: SystemBootstrapOptions
    bootstrap_repositories: Annotated[
        list[SystemBootstrapRepository], pydantic.Field(min_length=1)
    ]
    customization_script: str | None = None


class MmDebstrapBootstrapOptions(SystemBootstrapOptions):
    """Structure of MmDebstrap options."""

    variant: MmDebstrapVariant | None = None
    use_signed_by: bool = True


class MmDebstrapData(SystemBootstrapData):
    """In-memory task data for the MmDebstrap task."""

    bootstrap_options: MmDebstrapBootstrapOptions


class DebootstrapBootstrapOptions(SystemBootstrapOptions):
    """Structure of debootstrap options."""

    variant: DebootstrapVariant | None = None


class DiskImageFormat(StrEnum):
    """Possible disk image formats."""

    RAW = "raw"
    QCOW2 = "qcow2"


class Partition(BaseTaskDataModel):
    """Partition definition."""

    size: int
    filesystem: str
    mountpoint: str = "none"


class DiskImage(BaseTaskDataModel):
    """Disk image definition."""

    format: DiskImageFormat
    filename: str = "image"
    kernel_package: str | None = None
    bootloader: str | None = None
    partitions: Annotated[list[Partition], pydantic.Field(min_length=1)]


class SystemImageBuildData(SystemBootstrapData):
    """Base for in-memory task data for SystemImageBuild tasks."""

    bootstrap_options: DebootstrapBootstrapOptions
    disk_image: DiskImage


class PiupartsDataInput(BaseTaskDataModel):
    """Input for a piuparts task."""

    binary_artifacts: LookupMultiple


class PiupartsData(BaseTaskDataWithExecutor, BaseTaskDataWithExtraRepositories):
    """In-memory task data for the Piuparts task."""

    input: PiupartsDataInput
    base_tgz: LookupSingle
    # BaseTaskDataWithExecutor declares this as optional, but it's required
    # here.
    environment: LookupSingle


class PiupartsDynamicData(
    BaseDynamicTaskDataWithExecutor, BaseDynamicTaskDataWithExtraRepositories
):
    """Dynamic data for the Piuparts task."""

    # BaseDynamicTaskDataWithExecutor declares this as optional, but it's
    # required here.
    environment_id: int

    input_binary_artifacts_ids: list[int]
    base_tgz_id: int

    @override
    def get_input_artifacts_ids(self) -> list[int]:
        return [
            self.environment_id,
            self.base_tgz_id,
            *self.input_binary_artifacts_ids,
        ]


class SbuildInput(BaseTaskDataModel):
    """Input for an sbuild task."""

    source_artifact: LookupSingle
    extra_binary_artifacts: LookupMultiple = pydantic.Field(
        default_factory=partial(LookupMultiple, ())
    )


class SbuildBinNMU(BaseTaskDataModel):
    """binNMU settings for an sbuild task."""

    # --make-binNMU
    changelog: str
    # --append-to-version
    suffix: str
    # --binNMU-timestamp, defaults to now
    timestamp: dt.datetime | None = None
    # --maintainer, defaults to uploader
    maintainer: pydantic.NameEmail | None = None


class SbuildBuildComponent(StrEnum):
    """Possible values for build_components."""

    ANY = "any"
    ALL = "all"
    SOURCE = "source"


class SbuildBuildDepResolver(StrEnum):
    """Possible values for build_dep_resolver."""

    APT = "apt"
    APTITUDE = "aptitude"
    ASPCUD = "aspcud"
    XAPT = "xapt"


class SbuildData(BaseTaskDataWithExecutor, BaseTaskDataWithExtraRepositories):
    """In-memory task data for the Sbuild task."""

    input: SbuildInput
    # BaseTaskDataWithExecutor declares this as optional, but it's required
    # here.
    build_architecture: str
    build_components: list[SbuildBuildComponent] = pydantic.Field(
        default_factory=lambda: [SbuildBuildComponent.ANY]
    )
    binnmu: SbuildBinNMU | None = None
    build_profiles: list[str] | None = None
    # BaseTaskDataWithExecutor declares this as optional, but it's required
    # here.
    environment: LookupSingle
    build_dep_resolver: SbuildBuildDepResolver | None = None
    aspcud_criteria: str | None = None
    resolve_alternatives: bool = False

    @pydantic.model_validator(mode="after")
    def check_binnmu_against_components(self) -> Self:
        """binNMUs are incompatible with architecture-independent builds."""
        if self.binnmu is not None:
            if SbuildBuildComponent.ALL in self.build_components:
                raise ValueError(
                    "Cannot build architecture-independent packages in a "
                    "binNMU"
                )
        return self


class SbuildDynamicData(
    BaseDynamicTaskDataWithExecutor, BaseDynamicTaskDataWithExtraRepositories
):
    """Dynamic data for the Sbuild task."""

    input_source_artifact_id: int
    input_extra_binary_artifacts_ids: list[int] = pydantic.Field(
        default_factory=list
    )
    binnmu_maintainer: str | None = None

    @override
    def get_source_package_name(self) -> str | None:
        return self.subject

    @override
    def get_input_artifacts_ids(self) -> list[int]:
        result: list[int] = []
        if val := self.environment_id:
            result.append(val)
        result.append(self.input_source_artifact_id)
        result.extend(self.input_extra_binary_artifacts_ids)
        return result


class ImageCacheUsageLogEntry(BaseTaskDataModel):
    """Entry in ImageCacheUsageLog for cached executor images."""

    filename: str
    backend: BackendType | None = None
    timestamp: dt.datetime

    @pydantic.field_validator("timestamp", mode="after")
    @classmethod
    def timestamp_is_aware(cls, timestamp: dt.datetime) -> dt.datetime:
        """Ensure that the timestamp is TZ-aware."""
        tzinfo = timestamp.tzinfo
        if tzinfo is None or tzinfo.utcoffset(timestamp) is None:
            raise ValueError("timestamp is TZ-naive")
        return timestamp


class ImageCacheUsageLog(BaseTaskDataModel):
    """Usage log for cached executor images."""

    version: int = 1
    backends: set[BackendType] = pydantic.Field(default_factory=set)
    usage: list[ImageCacheUsageLogEntry] = pydantic.Field(default_factory=list)

    @pydantic.field_validator("version", mode="after")
    @classmethod
    def version_is_known(cls, version: int) -> int:
        """Ensure that the version is known."""
        if version != 1:
            raise ValueError(f"Unknown usage log version {version}")
        return version


class ExtractForSigningInput(BaseTaskDataModel):
    """Input for the ExtractForSigning task."""

    template_artifact: LookupSingle
    binary_artifacts: LookupMultiple


class ExtractForSigningData(BaseTaskDataWithExecutor):
    """In-memory task data for the ExtractForSigning task."""

    # BaseTaskDataWithExecutor declares this as optional, but it's required
    # here.
    environment: LookupSingle
    input: ExtractForSigningInput

    @pydantic.field_validator("backend", mode="after")
    @classmethod
    def backend_is_auto(cls, backend: str) -> str:
        """Ensure that the backend is "auto"."""
        if backend != BackendType.AUTO:
            raise ValueError(
                f'ExtractForSigning only accepts backend '
                f'"{BackendType.AUTO}", not "{backend}"'
            )
        return backend


class ExtractForSigningDynamicData(BaseDynamicTaskDataWithExecutor):
    """Dynamic data for the ExtractForSigning task."""

    # BaseDynamicTaskDataWithExecutor declares this as optional, but it's
    # required here.
    environment_id: int

    input_template_artifact_id: int
    input_binary_artifacts_ids: list[int]

    @override
    def get_input_artifacts_ids(self) -> list[int]:
        return [
            self.environment_id,
            self.input_template_artifact_id,
            *self.input_binary_artifacts_ids,
        ]


class AssembleSignedSourceData(BaseTaskDataWithExecutor):
    """In-memory task data for the AssembleSignedSource task."""

    # BaseTaskDataWithExecutor declares this as optional, but it's required
    # here.
    environment: LookupSingle
    template: LookupSingle
    signed: LookupMultiple

    @pydantic.field_validator("backend", mode="after")
    @classmethod
    def backend_is_auto(cls, backend: str) -> str:
        """Ensure that the backend is "auto"."""
        if backend != BackendType.AUTO:
            raise ValueError(
                f'AssembleSignedSource only accepts backend '
                f'"{BackendType.AUTO}", not "{backend}"'
            )
        return backend


class AssembleSignedSourceDynamicData(BaseDynamicTaskDataWithExecutor):
    """Dynamic data for the AssembleSignedSource task."""

    # BaseDynamicTaskDataWithExecutor declares this as optional, but it's
    # required here.
    environment_id: int

    template_id: int
    signed_ids: list[int]
    unsigned_ids: list[int] = pydantic.Field(default_factory=list)

    @override
    def get_input_artifacts_ids(self) -> list[int]:
        return [
            self.environment_id,
            self.template_id,
            *self.signed_ids,
            *self.unsigned_ids,
        ]


class MakeSourcePackageUploadInput(BaseTaskDataModel):
    """Input for a MakeSourcePackageUpload task."""

    source_artifact: LookupSingle


class MakeSourcePackageUploadData(BaseTaskDataWithExecutor):
    """In-memory task data for the MakeSourcePackageUpload task."""

    # BaseTaskDataWithExecutor declares this as optional, but it's required
    # here.
    environment: LookupSingle
    input: MakeSourcePackageUploadInput
    since_version: str | None = None
    target_distribution: str | None = None


class MakeSourcePackageUploadDynamicData(BaseDynamicTaskDataWithExecutor):
    """Expanded dynamic data for the MakeSourcePackageUpload task."""

    # BaseDynamicTaskDataWithExecutor declares this as optional, but it's
    # required here.
    environment_id: int

    input_source_artifact_id: int

    @override
    def get_source_package_name(self) -> str | None:
        return self.subject

    @override
    def get_input_artifacts_ids(self) -> list[int]:
        return [self.environment_id, self.input_source_artifact_id]


class MergeUploadsInput(BaseTaskDataModel):
    """Input for a MergeUploads task."""

    uploads: LookupMultiple


class MergeUploadsData(BaseTaskData):
    """In-memory task data for the MergeUploads task."""

    # No longer used.
    backend: BackendType = BackendType.AUTO
    # No longer used.
    environment: LookupSingle | None = None
    input: MergeUploadsInput


class MergeUploadsDynamicData(BaseDynamicTaskData):
    """Expanded dynamic data for the MergeUploads task."""

    # No longer used.
    environment_id: int | None = None
    input_uploads_ids: list[int]

    @override
    def get_source_package_name(self) -> str | None:
        return self.subject

    @override
    def get_input_artifacts_ids(self) -> list[int]:
        return self.input_uploads_ids


class BaseTaskInput(pydantic.BaseModel):
    """Base for structures defining task input artifacts and collections."""

    model_config = pydantic.ConfigDict(
        validate_assignment=True, extra="forbid", frozen=True
    )


class BaseLabeledTaskInput(BaseTaskInput):
    """Task input annotated with a task data field name."""

    #: the task data label.
    label: str


class BaseTaskInputArtifact(BaseTaskInput):
    """Represent an artifact as task input."""

    #: Artifact ID
    artifact_id: int | None = None
    #: Artifact category
    category: ArtifactCategory | None = None
    #: Artifact data
    data: ArtifactData | None = None

    @pydantic.field_serializer("data")
    def serialize_data(
        self, data: ArtifactData | None
    ) -> dict[str, Any] | None:
        """Serialize the data field."""
        if data is None:
            return None
        return data.model_dump()

    @pydantic.field_validator("category", mode="before")
    @classmethod
    def _validate_category(cls, category: Any) -> Any:
        """Validate the category field."""
        if category is None:
            return None
        if category not in ARTIFACT_DATA_MODELS_BY_CATEGORY:
            raise ValueError(f"Category {category!r} is not supported")
        return category

    @pydantic.field_validator("data", mode="before")
    @classmethod
    def _validate_data(cls, data: Any, info: pydantic.ValidationInfo) -> Any:
        """Validate and deserialize the data field."""
        if data is None:
            return None
        if (category := info.data.get("category")) is None:
            raise ValueError("Category must be present if data is supplied")
        artifact_cls = ARTIFACT_DATA_MODELS_BY_CATEGORY[category]
        return artifact_cls.model_validate(data)


class BaseInputArtifactSingle(BaseTaskInputArtifact, BaseLabeledTaskInput):
    """Single task input artifact."""

    #: Lookup used to obtain the artifact.  Usually from the task data but
    #: could be generated by the task logic.
    lookup: LookupSingle


class InputArtifactSingle(BaseInputArtifactSingle):
    """Represent a declared single input artifact for a task."""

    def resolved_ids(self) -> list[int]:
        """Return [artifact_id] or []."""
        return [] if self.artifact_id is None else [self.artifact_id]


# TODO: merge into InputArtifactSingle when we have a better way to fill the
# required fields of ArtifactInfo also for workflows
class ArtifactInfo(InputArtifactSingle):
    """Information about an artifact."""

    # Redefined as not None
    artifact_id: int
    category: ArtifactCategory
    data: ArtifactData

    @property
    def id(self) -> int:
        """Compatibility alias for artifact_id."""
        return self.artifact_id


class InputArtifactMultiple(BaseLabeledTaskInput):
    """Represent declared multiple input artifacts for a task."""

    #: Lookup used to obtain the artifacts.
    lookup: LookupMultiple
    #: Artifact data if it's already known
    artifacts: Sequence[BaseTaskInputArtifact] = pydantic.Field(
        default_factory=list
    )

    def resolved_ids(self) -> list[int]:
        """Return artifact_ids or []."""
        return [
            a.artifact_id for a in self.artifacts if a.artifact_id is not None
        ]

    @classmethod
    def from_ids(
        cls,
        lookup: LookupMultiple,
        label: str,
        artifact_ids: Collection[int] | None,
    ) -> Self:
        """Create an InputArtifactMultiple from a list of artifact IDs."""
        return cls(
            lookup=lookup,
            label=label,
            artifacts=[
                BaseTaskInputArtifact(artifact_id=artifact_id)
                for artifact_id in artifact_ids or ()
            ],
        )


# TODO: serialize collection_id as id for shorter JSON in the database?
class InputCollectionSingle(BaseLabeledTaskInput):
    """Information about a collection."""

    #: Lookup used to obtain the collection
    lookup: LookupSingle
    #: Collection ID
    collection_id: int
    #: Name of the scope that contains the collection
    scope_name: str
    #: Name of the workspace that contains the collection
    workspace_name: str
    #: Category of the collection
    category: CollectionCategory
    #: Collection name
    name: str
    #: Collection data
    data: dict[str, Any]


InputArtifact: TypeAlias = InputArtifactSingle | InputArtifactMultiple