# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Models used by debusine tasks."""
import datetime as dt
import re
from abc import ABC
from collections.abc import Collection, Iterator, Sequence
from enum import StrEnum
from functools import partial
from itertools import chain, groupby
from pathlib import Path
from typing import (
Annotated,
Any,
ClassVar,
Literal,
Self,
TypeAlias,
Union,
override,
)
import debian.deb822 as deb822
import pydantic
from debusine.artifacts.models import (
ARTIFACT_DATA_MODELS_BY_CATEGORY,
ArtifactCategory,
ArtifactData,
BareDataCategory,
CollectionCategory,
DebianPackageBuildLog,
RuntimeStatistics,
WorkRequestResults,
)
from debusine.client.models import LookupChildType
from debusine.utils import DjangoChoicesEnum
class WorkerType(DjangoChoicesEnum):
"""The type of a Worker."""
SCHEDULER = "scheduler"
EXTERNAL = "external"
CELERY = "celery"
SIGNING = "signing"
class BackendType(StrEnum):
"""Possible values for backend."""
AUTO = "auto"
UNSHARE = "unshare"
INCUS_LXC = "incus-lxc"
INCUS_VM = "incus-vm"
QEMU = "qemu"
class AutopkgtestNeedsInternet(StrEnum):
"""Possible values for needs_internet."""
RUN = "run"
TRY = "try"
SKIP = "skip"
class LintianFailOnSeverity(StrEnum):
"""Possible values for fail_on_severity."""
ERROR = "error"
WARNING = "warning"
INFO = "info"
PEDANTIC = "pedantic"
EXPERIMENTAL = "experimental"
OVERRIDDEN = "overridden"
NONE = "none"
class BaseTaskDataModel(pydantic.BaseModel):
"""Stricter pydantic defaults for task data models."""
model_config = pydantic.ConfigDict(validate_assignment=True, extra="forbid")
# Replace with serialize_by_alias=True in model_config once we can
# assume pydantic >= 2.11.
def model_dump(self, **kwargs: Any) -> dict[str, Any]:
"""Use aliases by default when serializing."""
kwargs.setdefault("by_alias", True)
return super().model_dump(**kwargs)
# https://github.com/pydantic/pydantic-core/issues/296
def require_unique(v: list[Any]) -> list[Any]:
"""Field validator to require that list items are unique."""
if sorted(v) != sorted(set(v)):
raise ValueError(f"{v} contains duplicate items")
return v
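# Illustrative usage (a sketch, not part of the API surface):
#   require_unique([1, 2, 2]) raises ValueError, while
#   require_unique([1, 2, 3]) returns the list unchanged.
# It is meant to be attached to list fields via pydantic.AfterValidator,
# as in SystemBootstrapRepository.types below.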
# Lookups resulting in exactly one collection item.
LookupSingle: TypeAlias = int | str
def _build_key_value_string(data: dict[str, Any]) -> str:
# TODO: Implement URL-quoting to support special characters
return ":".join(f"{key}={value}" for key, value in data.items())
def build_lookup_string(
*segments: str, options: dict[str, Any] | None = None
) -> str:
"""Build a string lookup from segments and options."""
options_prefix = ""
if options:
inside = _build_key_value_string(options)
options_prefix = f"//{inside}/"
return f"{options_prefix}{'/'.join(segments)}"
def _parse_key_value_string(input_string: str) -> dict[str, str]:
data = {}
for item in input_string.split(":"):
# TODO: Implement URL-unquoting to support special characters
name, value = item.split("=", 1)
data[name] = value
return data
def parse_lookup_string(lookup: str) -> tuple[list[str], dict[str, str] | None]:
"""Parse a string lookup into options and segments."""
options = None
# Extract and parse options
if m := re.match(r"//([^/]*)/", lookup):
lookup_options = m.group(1)
lookup = lookup[len(lookup_options) + 3 :]
options = _parse_key_value_string(lookup_options)
return (lookup.split("/"), options)
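# Illustrative round trip (hypothetical segment and option names):
#   build_lookup_string("debian", "bookworm", options={"arch": "amd64"})
#   -> "//arch=amd64/debian/bookworm"
#   parse_lookup_string("//arch=amd64/debian/bookworm")
#   -> (["debian", "bookworm"], {"arch": "amd64"})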
def build_key_value_lookup_segment(
lookup_type: str, filters: dict[str, str]
) -> str:
"""Build a lookup segment consisting of a series of `key=value` filters."""
filters_string = _build_key_value_string(filters)
return f"{lookup_type}:{filters_string}"
def parse_key_value_lookup_segment(segment: str) -> tuple[str, dict[str, str]]:
"""Parse a lookup segment consisting of a series of `key=value` filters."""
lookup_type, remaining = segment.split(":", 1)
return lookup_type, _parse_key_value_string(remaining)
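# Illustrative round trip (hypothetical lookup type and filter names):
#   build_key_value_lookup_segment("binary", {"name": "hello"})
#   -> "binary:name=hello"
#   parse_key_value_lookup_segment("binary:name=hello")
#   -> ("binary", {"name": "hello"})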
class CollectionItemMatcherKind(StrEnum):
"""Possible values for CollectionItemMatcher.kind."""
CONTAINS = "contains"
ENDSWITH = "endswith"
EXACT = "exact"
STARTSWITH = "startswith"
class CollectionItemMatcher(BaseTaskDataModel):
"""A matcher for collection item name or per-item data fields."""
model_config = BaseTaskDataModel.model_config | pydantic.ConfigDict(
frozen=True
)
kind: CollectionItemMatcherKind
value: Any
ExtraRepository: TypeAlias = ExtraDebusineRepository | ExtraExternalRepository
class LookupDict(BaseTaskDataModel):
"""Dictionary lookups for collection items."""
model_config = BaseTaskDataModel.model_config | pydantic.ConfigDict(
frozen=True
)
collection: LookupSingle
child_type: LookupChildType = LookupChildType.ARTIFACT
category: str | None = None
name_matcher: CollectionItemMatcher | None = None
# Logically a mapping, but we turn it into a tuple so that lookups are
# hashable.
data_matchers: tuple[tuple[str, CollectionItemMatcher], ...] = ()
# Logically a mapping, but we turn it into a tuple so that lookups are
# hashable. In future some filters may want other value types, but they
# need to be hashable and our current use cases involve subordinate
# lookups, so just declare them to be lookups for now.
lookup_filters: tuple[
tuple[str, Union[LookupSingle, "LookupMultiple"]], ...
] = ()
@pydantic.model_validator(mode="before")
@classmethod
def normalize_matchers(cls, data: Any) -> dict[str, Any]: # noqa: C901
"""
Transform the lookup syntax into a form more convenient for pydantic.
`name`, `name__*`, `data__KEY`, and `data__KEY__*` keys in `values`
are transformed into :py:class:`CollectionItemMatcher` instances
stored in `name_matcher` and `data_matchers`. `lookup__KEY` keys
are transformed into entries in `lookup_filters`. Conflicting
lookup suffixes are rejected.
"""
def split_words(s: str) -> list[str]:
return s.split("__")
def matcher_prefix(s: str) -> tuple[str, ...]:
match split_words(s):
case ["name", *_]:
return ("name",)
case ["data", data_key, *_]:
return ("data", data_key)
case ["lookup", lookup_key]:
return ("lookup", lookup_key)
case _:
return ()
if not isinstance(data, dict):
raise ValueError(f"Expected dict, got {type(data)!r}")
data = data.copy()
assert isinstance(data, dict)
data_matchers: dict[str, CollectionItemMatcher] = dict(
data.get("data_matchers", ())
)
lookup_filters: dict[str, LookupSingle | LookupMultiple] = dict(
data.get("lookup_filters", ())
)
for key, group in groupby(
sorted(data, key=matcher_prefix), matcher_prefix
):
if not key:
# Only keys with `name`, `data`, or `lookup` as their first
# segment need transformation.
continue
matcher_names = list(group)
if len(matcher_names) > 1:
raise ValueError(
f"Conflicting matchers: {sorted(matcher_names)}"
)
assert len(matcher_names) == 1
matcher_name = matcher_names[0]
data_key: str | None = None
match words := split_words(matcher_name):
case ["name"]:
kind = "exact"
case ["name", kind] if kind != "exact":
pass
case ["data", data_key]:
kind = "exact"
case ["data", data_key, kind] if kind != "exact":
pass
case ["lookup", lookup_key]:
match data[matcher_name]:
case int() | str():
lookup_filters[lookup_key] = data[matcher_name]
case _:
lookup_filters[lookup_key] = (
LookupMultiple.model_validate(
data[matcher_name]
)
)
for k in matcher_names:
del data[k]
continue
case _:
# We filter out keys with first segments other than
# `name`, `data`, or `lookup` earlier, so anything that
# reaches here must be a malformed lookup.
raise ValueError(f"Unrecognized matcher: {matcher_name}")
matcher = CollectionItemMatcher(
kind=CollectionItemMatcherKind(kind), value=data[matcher_name]
)
if words[0] == "name":
data["name_matcher"] = matcher
else:
assert words[0] == "data"
assert data_key is not None
data_matchers[data_key] = matcher
for k in matcher_names:
del data[k]
if data_matchers:
data["data_matchers"] = tuple(sorted(data_matchers.items()))
if lookup_filters:
data["lookup_filters"] = tuple(sorted(lookup_filters.items()))
return data
def export(self) -> dict[str, Any]:
"""
Export the usual input representation of this lookup.
This reverses the transformations applied by
:py:meth:`normalize_matchers`.
"""
value: dict[str, Any] = {"collection": self.collection}
if self.child_type != LookupChildType.ARTIFACT:
value["child_type"] = str(self.child_type)
if self.category is not None:
value["category"] = self.category
if self.name_matcher is not None:
if self.name_matcher.kind == CollectionItemMatcherKind.EXACT:
value["name"] = self.name_matcher.value
else:
value[f"name__{self.name_matcher.kind}"] = (
self.name_matcher.value
)
for key, matcher in self.data_matchers:
if matcher.kind == CollectionItemMatcherKind.EXACT:
value[f"data__{key}"] = matcher.value
else:
value[f"data__{key}__{matcher.kind}"] = matcher.value
for key, filter_value in self.lookup_filters:
if isinstance(filter_value, LookupMultiple):
value[f"lookup__{key}"] = filter_value.export()
else:
value[f"lookup__{key}"] = filter_value
return value
class LookupMultiple(pydantic.RootModel[tuple[LookupSingle | LookupDict, ...]]):
"""Lookups resulting in multiple collection items."""
model_config = pydantic.ConfigDict(validate_assignment=True, frozen=True)
@pydantic.model_validator(mode="before")
@classmethod
def normalize(cls, data: Any) -> tuple[Any, ...]:
"""Normalize into a tuple of multiple matchers."""
if isinstance(data, dict):
return (data,)
elif isinstance(data, (tuple, list)):
return tuple(data)
else:
raise ValueError(
"Lookup of multiple collection items must be a dictionary or "
"a list"
)
def __iter__( # type: ignore[override]
self,
) -> Iterator[LookupSingle | LookupDict]:
"""Iterate over individual lookups."""
return iter(self.root)
def __bool__(self) -> bool:
"""Return True if and only if this lookup is non-empty."""
return bool(self.root)
def export(self) -> dict[str, Any] | list[int | str | dict[str, Any]]:
"""
Export the usual input representation of this lookup.
This reverses the transformations applied by :py:meth:`normalize`.
"""
if len(self.root) == 1 and isinstance(self.root[0], LookupDict):
return self.root[0].export()
else:
return [
lookup.export() if isinstance(lookup, LookupDict) else lookup
for lookup in self
]
# Resolve circular reference to LookupMultiple.
LookupDict.model_rebuild()
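# Illustrative behaviour of the normalize/export pair (hypothetical values):
# a bare dict is wrapped in a one-element tuple, and export() collapses it
# back, while plain int/str lookups pass through unchanged:
#   LookupMultiple.model_validate({"collection": "env"}).export()
#   -> {"collection": "env"}
#   LookupMultiple.model_validate([1, "bookworm"]).export()
#   -> [1, "bookworm"]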
class NotificationDataEmail(BaseTaskDataModel):
"""Channel data for email notifications."""
from_: pydantic.EmailStr | None = pydantic.Field(default=None, alias="from")
to: list[pydantic.EmailStr] | None = None
cc: list[pydantic.EmailStr] = pydantic.Field(default_factory=list)
subject: str | None = None
class ActionTypes(StrEnum):
"""Possible values for EventReaction actions."""
SKIP_IF_LOOKUP_RESULT_CHANGED = "skip-if-lookup-result-changed"
SEND_NOTIFICATION = "send-notification"
UPDATE_COLLECTION_WITH_ARTIFACTS = "update-collection-with-artifacts"
UPDATE_COLLECTION_WITH_DATA = "update-collection-with-data"
RETRY_WITH_DELAYS = "retry-with-delays"
RECORD_IN_TASK_HISTORY = "record-in-task-history"
class ActionSkipIfLookupResultChanged(BaseTaskDataModel):
"""Action for skipping a work request if a lookup result changed."""
action: Literal[ActionTypes.SKIP_IF_LOOKUP_RESULT_CHANGED] = (
ActionTypes.SKIP_IF_LOOKUP_RESULT_CHANGED
)
# Deliberately not LookupSingle; this must be able to return a
# collection item, so integer lookups are never valid.
lookup: str
collection_item_id: int | None
promise_name: str | None = None
class ActionSendNotification(BaseTaskDataModel):
"""Action for sending a notification."""
action: Literal[ActionTypes.SEND_NOTIFICATION] = (
ActionTypes.SEND_NOTIFICATION
)
channel: str
data: NotificationDataEmail | None = None
class ActionUpdateCollectionWithArtifacts(BaseTaskDataModel):
"""Action for updating a collection with artifacts."""
action: Literal[ActionTypes.UPDATE_COLLECTION_WITH_ARTIFACTS] = (
ActionTypes.UPDATE_COLLECTION_WITH_ARTIFACTS
)
collection: LookupSingle
name_template: str | None = None
variables: dict[str, Any] | None = None
artifact_filters: dict[str, Any]
created_at: dt.datetime | None = None
class ActionUpdateCollectionWithData(BaseTaskDataModel):
"""Action for updating a collection with bare data."""
action: Literal[ActionTypes.UPDATE_COLLECTION_WITH_DATA] = (
ActionTypes.UPDATE_COLLECTION_WITH_DATA
)
collection: LookupSingle
category: BareDataCategory
name_template: str | None = None
data: dict[str, Any] | None = None
created_at: dt.datetime | None = None
class ActionRetryWithDelays(BaseTaskDataModel):
"""Action for retrying a work request with delays."""
_delay_re: ClassVar[re.Pattern[str]] = re.compile(r"^([0-9]+)([mhdw])$")
action: Literal[ActionTypes.RETRY_WITH_DELAYS] = (
ActionTypes.RETRY_WITH_DELAYS
)
delays: Annotated[
list[Annotated[str, pydantic.Field(pattern=_delay_re)]],
pydantic.Field(min_length=1),
]
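# Illustrative (a sketch of the accepted values): each entry in delays must
# match _delay_re, i.e. an integer followed by a unit of minutes, hours,
# days, or weeks:
#   ActionRetryWithDelays(delays=["30m", "1h", "2d", "1w"])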
class ActionRecordInTaskHistory(BaseTaskDataModel):
"""Action for recording the task run in a task-history collection."""
action: Literal[ActionTypes.RECORD_IN_TASK_HISTORY] = (
ActionTypes.RECORD_IN_TASK_HISTORY
)
subject: str | None = None
context: str | None = None
EventReaction = Annotated[
Union[
ActionSkipIfLookupResultChanged,
ActionSendNotification,
ActionUpdateCollectionWithArtifacts,
ActionUpdateCollectionWithData,
ActionRetryWithDelays,
ActionRecordInTaskHistory,
],
pydantic.Field(discriminator="action"),
]
class EventReactions(BaseTaskDataModel):
"""Structure for event reactions."""
on_creation: list[EventReaction] = []
on_unblock: list[EventReaction] = []
on_assignment: list[EventReaction] = []
on_success: list[EventReaction] = []
on_failure: list[EventReaction] = []
def model_dump(self, **kwargs: Any) -> dict[str, Any]:
"""Wrap pydantic to ensure discriminators are serialized."""
# TODO: from Python 3.5 onwards, docstrings are inherited from the
# parent class, which would be the right thing here. However, flake8
# doesn't like it.
# Mark discriminators as always set.
# See: https://github.com/pydantic/pydantic/issues/6465
for name in (
"on_creation",
"on_unblock",
"on_assignment",
"on_success",
"on_failure",
):
for item in getattr(self, name):
item.model_fields_set.add("action")
return super().model_dump(**kwargs)
def add_reaction(
self,
name: Literal[
"on_creation",
"on_unblock",
"on_assignment",
"on_success",
"on_failure",
],
action: EventReaction,
) -> None:
"""
Add an EventReaction to the named event.
This is a no-op if the named event already contains this EventReaction.
"""
current = getattr(self, name)
if action not in current:
current.append(action)
# Let pydantic know we modified the field.
# See: https://github.com/pydantic/pydantic/issues/9866
self.model_fields_set.add(name)
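# Illustrative usage sketch (the channel name is hypothetical):
#   reactions = EventReactions()
#   reactions.add_reaction(
#       "on_failure", ActionSendNotification(channel="admins")
#   )
#   reactions.add_reaction(
#       "on_failure", ActionSendNotification(channel="admins")
#   )  # second call is a no-op: the reaction is already present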
class BaseTaskData(BaseTaskDataModel):
"""
Base class for task data.
Task data is encoded as JSON in the database and in the API, and it is
modeled as a pydantic data structure in memory for both ease of access and
validation.
"""
DEFAULT_TASK_CONFIGURATION_LOOKUP: ClassVar[str] = (
"default@debusine:task-configuration"
)
#: debusine:task-configuration collection to use to configure the task.
#: This is resolved to an ID when configuration is applied.
task_configuration: LookupSingle | None = DEFAULT_TASK_CONFIGURATION_LOOKUP
class BaseTaskDataWithExecutor(BaseTaskData):
"""Base task data with fields used to configure executors."""
backend: BackendType = BackendType.AUTO
environment: LookupSingle | None = None
# Not all executor-based tasks strictly require this, but it can
# nevertheless be useful to constrain a task to run on an architecture
# for which we know that a suitable environment exists.
build_architecture: str | None = None
class BaseDynamicTaskData(BaseTaskDataModel):
"""
Base class for dynamic task data.
This is computed by the scheduler when dispatching a task to a worker.
It may involve resolving artifact lookups.
"""
#: Brief human-readable summary of the most important parameters to this
#: work request.
parameter_summary: str | None = None
#: An abstract string value representing the subject of the task, for
#: the purpose of recording statistics. It is meant to group possible
#: inputs into groups that we expect to behave similarly.
subject: str | None = None
#: An abstract string value representing the runtime context in which
#: the task is executed, for the purpose of recording statistics. It is
#: meant to represent some of the task parameters that can significantly
#: alter the runtime behaviour of the task.
runtime_context: str | None = None
#: Name of the configuration context.
# This is a stable string representation of the main input artifacts and
# parameters that does not change across similar future invocations of the
# same task, and is used to look up configuration parameters.
configuration_context: str | None = None
#: Storage for resolved task input fields
task_input_fields: dict[str, Any] = pydantic.Field(default_factory=dict)
def get_source_package_name(self) -> str | None:
"""
Extract the source package name.
:returns: the source package name, if applicable, and None otherwise.
"""
return None
class BaseDynamicTaskDataWithExecutor(BaseDynamicTaskData):
"""Dynamic task data for executors."""
environment_id: int | None = None
class OutputDataError(BaseTaskDataModel):
"""An error encountered when running a task."""
message: str
code: str
class RegressionAnalysisStatus(StrEnum):
"""Possible values for ``RegressionAnalysis.status``."""
NO_RESULT = "no-result"
ERROR = "error"
IMPROVEMENT = "improvement"
STABLE = "stable"
REGRESSION = "regression"
class RegressionAnalysis(BaseTaskDataModel):
"""The result of a regression analysis."""
original_source_version: str | None = None
original_artifact_id: int | None = None
new_source_version: str | None = None
new_artifact_id: int | None = None
status: RegressionAnalysisStatus
details: dict[str, Any] | list[Any] | None = None
# Obsolete, but may be present in old rows.
original_url: str | None = None
new_url: str | None = None
class Confirmation(BaseTaskDataModel):
"""A record that a work request was confirmed or denied by a user."""
confirmed: bool
comment: str | None = None
class OutputData(BaseTaskDataModel):
"""Data produced when a task is completed."""
runtime_statistics: RuntimeStatistics | None = None
errors: list[OutputDataError] | None = None
skip_reason: str | None = None
regression_analysis: dict[str, RegressionAnalysis] | None = None
confirmation: Confirmation | None = None
def merge(self, other: "OutputData") -> "OutputData":
"""
Return the combination of two ``OutputData`` objects.
Merge all the fields that were explicitly set on each model. If a
field is set on both models, the values in ``other`` take
precedence.
"""
merged = self.model_copy()
for name in other.model_fields_set:
setattr(merged, name, getattr(other, name))
return merged
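# Illustrative merge semantics (hypothetical values): fields set on `other`
# win, fields only set on `self` survive:
#   a = OutputData(skip_reason="superseded")
#   b = OutputData(errors=[OutputDataError(message="boom", code="failed")])
#   a.merge(b)  # keeps skip_reason from a and takes errors from b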
class NoopData(BaseTaskDataWithExecutor):
"""In-memory task data for the Noop task."""
result: WorkRequestResults = WorkRequestResults.SUCCESS
class AutopkgtestFailOn(BaseTaskDataModel):
"""Possible values for fail_on."""
failed_test: bool = True
flaky_test: bool = False
skipped_test: bool = False
class AutopkgtestTimeout(BaseTaskDataModel):
"""Timeout specifications for an autopkgtest task."""
global_: Annotated[int | None, pydantic.Field(ge=0)] = pydantic.Field(
default=None, alias="global"
)
factor: Annotated[int | None, pydantic.Field(ge=0)] = None
short: Annotated[int | None, pydantic.Field(ge=0)] = None
install: Annotated[int | None, pydantic.Field(ge=0)] = None
test: Annotated[int | None, pydantic.Field(ge=0)] = None
copy_: Annotated[int | None, pydantic.Field(ge=0)] = pydantic.Field(
default=None, alias="copy"
)
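# Illustrative (a sketch of the alias behaviour): the "global" and "copy"
# keys are handled via field aliases, so they validate and serialize under
# their autopkgtest names:
#   AutopkgtestTimeout(**{"global": 600}).model_dump(exclude_none=True)
#   -> {"global": 600}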
class AutopkgtestData(
BaseTaskDataWithExecutor, BaseTaskDataWithExtraRepositories
):
"""In-memory task data for the Autopkgtest task."""
input: AutopkgtestInput
include_tests: list[str] = pydantic.Field(default_factory=list)
exclude_tests: list[str] = pydantic.Field(default_factory=list)
debug_level: Annotated[int, pydantic.Field(ge=0, le=3)] = 0
use_packages_from_base_repository: bool = False
extra_environment: dict[str, str] = pydantic.Field(default_factory=dict)
needs_internet: AutopkgtestNeedsInternet = AutopkgtestNeedsInternet.RUN
fail_on: AutopkgtestFailOn = pydantic.Field(
default_factory=AutopkgtestFailOn
)
timeout: AutopkgtestTimeout | None = None
# BaseTaskDataWithExecutor declares this as optional, but it's required
# here.
environment: LookupSingle
class AutopkgtestDynamicData(
BaseDynamicTaskDataWithExecutor, BaseDynamicTaskDataWithExtraRepositories
):
"""Dynamic data for the Autopkgtest task."""
# BaseDynamicTaskDataWithExecutor declares this as optional, but it's
# required here.
environment_id: int
input_source_artifact_id: int
input_binary_artifacts_ids: list[int]
input_context_artifacts_ids: list[int] = pydantic.Field(
default_factory=list
)
@override
def get_source_package_name(self) -> str | None:
return self.subject
class LintianOutput(BaseTaskDataModel):
"""Output configuration for a Lintian task."""
source_analysis: bool = True
binary_all_analysis: bool = True
binary_any_analysis: bool = True
class LintianData(BaseTaskDataWithExecutor):
"""In-memory task data for the Lintian task."""
input: LintianInput
output: LintianOutput = pydantic.Field(default_factory=LintianOutput)
target_distribution: str = "debian:unstable"
# Passed to lintian's --tags option
include_tags: list[str] = pydantic.Field(default_factory=list)
# Passed to lintian's --suppress-tags option
exclude_tags: list[str] = pydantic.Field(default_factory=list)
# If the analysis emits tags of this severity or higher, the task will
# return 'failure' instead of 'success'
fail_on_severity: LintianFailOnSeverity = LintianFailOnSeverity.ERROR
class LintianDynamicData(BaseDynamicTaskDataWithExecutor):
"""Dynamic data for the Lintian task."""
input_source_artifact_id: int | None = None
input_binary_artifacts_ids: list[int] = pydantic.Field(default_factory=list)
@override
def get_source_package_name(self) -> str | None:
return self.subject
class BlhcFlags(StrEnum):
"""Possible values for extra_flags."""
ALL = "--all"
BINDNOW = "--bindnow"
BUILDD = "--buildd"
COLOR = "--color"
DEBIAN = "--debian"
LINE_NUMBERS = "--line-numbers"
PIE = "--pie"
class BlhcData(BaseTaskDataWithExecutor):
"""In-memory task data for the Blhc task."""
input: BlhcInput
# Passed to blhc
extra_flags: list[BlhcFlags] = pydantic.Field(default_factory=list)
# BaseTaskDataWithExecutor declares this as optional, but it's required
# here.
build_architecture: str
# BaseTaskDataWithExecutor declares this as optional, but it's required
# here.
environment: LookupSingle
class BlhcDynamicData(BaseDynamicTaskDataWithExecutor):
"""Dynamic data for the Blhc task."""
input_artifact_id: int
# TODO: Once old tasks created without it have expired, this should
# become required.
input_artifact_data: DebianPackageBuildLog | None = None
# BaseTaskDynamicDataWithExecutor declares this as optional, but it's
# required here.
environment_id: int
@override
def get_source_package_name(self) -> str | None:
return self.subject
class DebDiffFlags(StrEnum):
"""Possible values for extra_flags."""
DIRS = "--dirs"
NOCONTROL = "--nocontrol"
WDIFF = "--wdiff-source-control"
SHOW_MOVED = "--show-moved"
DIFFSTAT = "--diffstat"
PATCHES = "--apply-patches"
IGNORE_SPACE = "--ignore-space"
class DebDiffData(BaseTaskDataWithExecutor):
"""In-memory task data for the DebDiff task."""
input: DebDiffInput
# Passed to debdiff
extra_flags: list[DebDiffFlags] = pydantic.Field(default_factory=list)
# BaseTaskDataWithExecutor declares this as optional, but it's required
# here.
build_architecture: str
# BaseTaskDataWithExecutor declares this as optional, but it's required
# here.
environment: LookupSingle
class DebDiffDynamicData(BaseDynamicTaskDataWithExecutor):
"""Dynamic data for the DebDiff task."""
# BaseTaskDynamicDataWithExecutor declares this as optional, but it's
# required here.
environment_id: int
input_source_artifacts_ids: Annotated[
list[int] | None, pydantic.Field(min_length=2, max_length=2)
] = None
input_binary_artifacts_ids: Annotated[
list[list[int]] | None, pydantic.Field(min_length=2, max_length=2)
] = None
@pydantic.model_validator(mode="after")
def validate_source_or_binary_exclusive(self) -> Self:
"""
Validate mutually exclusive input artifacts.
Ensure input_source_artifacts_ids and input_binary_artifacts_ids
are not both set.
"""
# Both "input_source_artifacts_ids" and "input_binary_artifacts_ids" may
# still be None at the same time, for backwards compatibility with older
# data.
if (self.input_source_artifacts_ids is not None) and (
self.input_binary_artifacts_ids is not None
):
raise ValueError(
"Only one of 'input_source_artifacts_ids' or "
"'input_binary_artifacts_ids' may be set (not both)."
)
return self
@override
def get_source_package_name(self) -> str | None:
if self.subject and self.subject.startswith("source:"):
return self.subject.removeprefix("source:")
return None
class MmDebstrapVariant(StrEnum):
"""Variants supported by `mmdebstrap`."""
BUILDD = "buildd"
MINBASE = "minbase"
DASH = "-"
APT = "apt"
CUSTOM = "custom"
DEBOOTSTRAP = "debootstrap"
ESSENTIAL = "essential"
EXTRACT = "extract"
IMPORTANT = "important"
REQUIRED = "required"
STANDARD = "standard"
class DebootstrapVariant(StrEnum):
"""Variants supported by `debootstrap`."""
BUILDD = "buildd"
MINBASE = "minbase"
class SystemBootstrapOptions(BaseTaskDataModel):
"""Structure of SystemBootstrap options."""
architecture: str
# Specializations of this model for individual tasks should restrict
# this to particular values.
variant: str | None = None
extra_packages: list[str] = pydantic.Field(default_factory=list)
use_signed_by: bool = True
class SystemBootstrapRepositoryType(StrEnum):
"""Possible values for repository types."""
DEB = "deb"
DEB_SRC = "deb-src"
class SystemBootstrapRepositoryCheckSignatureWith(StrEnum):
"""Possible values for check_signature_with."""
SYSTEM = "system"
EXTERNAL = "external"
NO_CHECK = "no-check"
class SystemBootstrapRepositoryKeyring(BaseTaskDataModel):
"""Description of a repository keyring."""
url: pydantic.AnyUrl
sha256sum: str = ""
install: bool = False
@pydantic.field_validator("url", mode="after")
@classmethod
def validate_url(cls, url: pydantic.AnyUrl) -> pydantic.AnyUrl:
"""
Reject file:// URLs outside /usr/(local/)share/keyrings/.
We don't want to allow reading arbitrary paths.
"""
if (
url.scheme == "file"
and url.path is not None
and not (
Path(url.path).resolve().is_relative_to("/usr/share/keyrings")
)
and not (
Path(url.path)
.resolve()
.is_relative_to("/usr/local/share/keyrings")
)
):
raise ValueError(
"file:// URLs for keyrings must be under /usr/share/keyrings/ "
"or /usr/local/share/keyrings/"
)
return url
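# Illustrative (a sketch of the validator's effect): a keyring URL such as
# "file:///usr/share/keyrings/debian-archive-keyring.gpg" is accepted, while
# "file:///etc/passwd" is rejected by validate_url.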
class SystemBootstrapRepository(BaseTaskDataModel):
"""Description of one repository in SystemBootstrapData."""
mirror: str
suite: str
types: Annotated[
list[SystemBootstrapRepositoryType],
pydantic.Field(min_length=1),
pydantic.AfterValidator(require_unique),
] = pydantic.Field(
default_factory=lambda: [SystemBootstrapRepositoryType.DEB]
)
components: (
Annotated[list[str], pydantic.AfterValidator(require_unique)] | None
) = None
check_signature_with: SystemBootstrapRepositoryCheckSignatureWith = (
SystemBootstrapRepositoryCheckSignatureWith.SYSTEM
)
keyring_package: str | None = None
keyring: SystemBootstrapRepositoryKeyring | None = None
@pydantic.model_validator(mode="after")
def _check_external_keyring(self) -> Self:
"""Require keyring if check_signature_with is external."""
if (
self.check_signature_with
== SystemBootstrapRepositoryCheckSignatureWith.EXTERNAL
):
if self.keyring is None:
raise ValueError(
"repository requires 'keyring': "
"'check_signature_with' is set to 'external'"
)
return self
class SystemBootstrapData(BaseTaskData):
"""Base in-memory task data for SystemBootstrap tasks."""
bootstrap_options: SystemBootstrapOptions
bootstrap_repositories: Annotated[
list[SystemBootstrapRepository], pydantic.Field(min_length=1)
]
customization_script: str | None = None
class MmDebstrapBootstrapOptions(SystemBootstrapOptions):
"""Structure of MmDebstrap options."""
variant: MmDebstrapVariant | None = None
use_signed_by: bool = True
class MmDebstrapData(SystemBootstrapData):
"""In-memory task data for the MmDebstrap task."""
bootstrap_options: MmDebstrapBootstrapOptions
class DebootstrapBootstrapOptions(SystemBootstrapOptions):
"""Structure of debootstrap options."""
variant: DebootstrapVariant | None = None
class Partition(BaseTaskDataModel):
"""Partition definition."""
size: int
filesystem: str
mountpoint: str = "none"
class DiskImage(BaseTaskDataModel):
"""Disk image definition."""
format: DiskImageFormat
filename: str = "image"
kernel_package: str | None = None
bootloader: str | None = None
partitions: Annotated[list[Partition], pydantic.Field(min_length=1)]
class SystemImageBuildData(SystemBootstrapData):
"""Base in-memory task data for SystemImageBuild tasks."""
bootstrap_options: DebootstrapBootstrapOptions
disk_image: DiskImage
class PiupartsData(BaseTaskDataWithExecutor, BaseTaskDataWithExtraRepositories):
"""In-memory task data for the Piuparts task."""
input: PiupartsDataInput
base_tgz: LookupSingle
# BaseTaskDataWithExecutor declares this as optional, but it's required
# here.
environment: LookupSingle
class PiupartsDynamicData(
BaseDynamicTaskDataWithExecutor, BaseDynamicTaskDataWithExtraRepositories
):
"""Dynamic data for the Piuparts task."""
# BaseTaskDynamicDataWithExecutor declares this as optional, but it's
# required here.
environment_id: int
input_binary_artifacts_ids: list[int]
base_tgz_id: int
class SbuildBinNMU(BaseTaskDataModel):
"""binNMU parameters for an sbuild task."""
# --make-binNMU
changelog: str
# --append-to-version
suffix: str
# --binNMU-timestamp, defaults to now
timestamp: dt.datetime | None = None
# --maintainer, defaults to uploader
maintainer: pydantic.NameEmail | None = None
class SbuildBuildComponent(StrEnum):
"""Possible values for build_components."""
ANY = "any"
ALL = "all"
SOURCE = "source"
class SbuildBuildDepResolver(StrEnum):
"""Possible values for build_dep_resolver."""
APT = "apt"
APTITUDE = "aptitude"
ASPCUD = "aspcud"
XAPT = "xapt"
class SbuildData(BaseTaskDataWithExecutor, BaseTaskDataWithExtraRepositories):
"""In-memory task data for the Sbuild task."""
input: SbuildInput
# BaseTaskDataWithExecutor declares this as optional, but it's required
# here.
build_architecture: str
build_components: list[SbuildBuildComponent] = pydantic.Field(
default_factory=lambda: [SbuildBuildComponent.ANY]
)
binnmu: SbuildBinNMU | None = None
build_profiles: list[str] | None = None
# BaseTaskDataWithExecutor declares this as optional, but it's required
# here.
environment: LookupSingle
build_dep_resolver: SbuildBuildDepResolver | None = None
aspcud_criteria: str | None = None
resolve_alternatives: bool = False
@pydantic.model_validator(mode="after")
def check_binnmu_against_components(self) -> Self:
"""BinNMUs are incompatible with architecture-independent builds."""
if self.binnmu is not None:
if SbuildBuildComponent.ALL in self.build_components:
raise ValueError(
"Cannot build architecture-independent packages in a binNMU"
)
return self
class SbuildDynamicData(
BaseDynamicTaskDataWithExecutor, BaseDynamicTaskDataWithExtraRepositories
):
"""Dynamic data for the Sbuild task."""
input_source_artifact_id: int
input_extra_binary_artifacts_ids: list[int] = pydantic.Field(
default_factory=list
)
binnmu_maintainer: str | None = None
@override
def get_source_package_name(self) -> str | None:
return self.subject
class ImageCacheUsageLogEntry(BaseTaskDataModel):
"""Entry in ImageCacheUsageLog for cached executor images."""
filename: str
backend: BackendType | None = None
timestamp: dt.datetime
@pydantic.field_validator("timestamp", mode="after")
@classmethod
def timestamp_is_aware(cls, timestamp: dt.datetime) -> dt.datetime:
"""Ensure that the timestamp is TZ-aware."""
tzinfo = timestamp.tzinfo
if tzinfo is None or tzinfo.utcoffset(timestamp) is None:
raise ValueError("timestamp is TZ-naive")
return timestamp
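# Illustrative (a sketch of the validator's effect):
# dt.datetime.now(dt.timezone.utc) is accepted as a timestamp, while the
# TZ-naive dt.datetime.now() is rejected.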
class ImageCacheUsageLog(BaseTaskDataModel):
"""Usage log for cached executor images."""
version: int = 1
backends: set[BackendType] = pydantic.Field(default_factory=set)
usage: list[ImageCacheUsageLogEntry] = pydantic.Field(default_factory=list)
@pydantic.field_validator("version", mode="after")
@classmethod
def version_is_known(cls, version: int) -> int:
"""Ensure that the version is known."""
if version != 1:
raise ValueError(f"Unknown usage log version {version}")
return version
class AssembleSignedSourceData(BaseTaskDataWithExecutor):
"""In-memory task data for the AssembleSignedSource task."""
# BaseTaskDataWithExecutor declares this as optional, but it's required
# here.
environment: LookupSingle
template: LookupSingle
signed: LookupMultiple
@pydantic.field_validator("backend", mode="after")
@classmethod
def backend_is_auto(cls, backend: str) -> str:
"""Ensure that the backend is "auto"."""
if backend != BackendType.AUTO:
raise ValueError(
f'AssembleSignedSource only accepts backend '
f'"{BackendType.AUTO}", not "{backend}"'
)
return backend
class AssembleSignedSourceDynamicData(BaseDynamicTaskDataWithExecutor):
"""Dynamic data for the AssembleSignedSource task."""
# BaseDynamicTaskDataWithExecutor declares this as optional, but it's
# required here.
environment_id: int
template_id: int
signed_ids: list[int]
unsigned_ids: list[int] = pydantic.Field(default_factory=list)
class MakeSourcePackageUploadData(BaseTaskDataWithExecutor):
"""In-memory task data for the MakeSourcePackageUpload task."""
# BaseTaskDataWithExecutor declares this as optional, but it's required
# here.
environment: LookupSingle
input: MakeSourcePackageUploadInput
since_version: str | None = None
target_distribution: str | None = None
class MakeSourcePackageUploadDynamicData(BaseDynamicTaskDataWithExecutor):
"""Expanded dynamic data for the MakeSourcePackageUpload task."""
# BaseDynamicTaskDataWithExecutor declares this as optional, but it's
# required here.
environment_id: int
input_source_artifact_id: int
@override
def get_source_package_name(self) -> str | None:
return self.subject
class MergeUploadsData(BaseTaskData):
"""In-memory task data for the MergeUploads task."""
# No longer used.
backend: BackendType = BackendType.AUTO
# No longer used.
environment: LookupSingle | None = None
input: MergeUploadsInput
class MergeUploadsDynamicData(BaseDynamicTaskData):
"""Expanded dynamic data for the MergeUploads task."""
# No longer used.
environment_id: int | None = None
input_uploads_ids: list[int]
@override
def get_source_package_name(self) -> str | None:
return self.subject
# TODO: merge into InputArtifactSingle once we have a better way to fill
# the required fields of ArtifactInfo for workflows as well.
class ArtifactInfo(InputArtifactSingle):
"""Information about an artifact."""
# Redefined as not None
artifact_id: int
category: ArtifactCategory
data: ArtifactData
@property
def id(self) -> int:
"""Compatibility alias for artifact_id."""
return self.artifact_id
# TODO: serialize collection_id as id for shorter JSON in the database?
InputArtifact: TypeAlias = InputArtifactSingle | InputArtifactMultiple