# Source code for debusine.server.workflows.base

# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""
Base Workflow infrastructure.

See :ref:`explanation-workflows` for a high level explanation of concepts used
here.
"""

import logging
from abc import ABCMeta, abstractmethod
from collections.abc import Callable
from functools import partial
from typing import Any, Literal, cast, override

import debusine.tasks.tags as ttags
import debusine.worker.tags as wtags
from debusine.artifacts.models import (
    ArtifactCategory,
    BareDataCategory,
    CollectionCategory,
    TaskTypes,
    WorkRequestResults,
)
from debusine.client.models import LookupChildType, RuntimeParameter
from debusine.db.models import (
    Collection,
    CollectionItem,
    WorkRequest,
    Workspace,
)
from debusine.db.models.work_requests import (
    InternalTaskError,
    StatusChangeError,
)
from debusine.db.tasks import DBTask
from debusine.server.workflows.models import (
    BaseWorkflowData,
    WorkRequestWorkflowData,
)
from debusine.tasks import TaskConfigError, inputs
from debusine.tasks.models import (
    ActionUpdateCollectionWithArtifacts,
    ActionUpdateCollectionWithData,
    BaseDynamicTaskData,
    BaseTaskData,
    InputArtifact,
    LookupMultiple,
    LookupSingle,
    OutputData,
    OutputDataError,
)

logger = logging.getLogger(__name__)


class WorkflowValidationError(Exception):
    """Raised if a workflow fails to validate its inputs."""
class WorkflowRunError(Exception):
    """Running a workflow orchestrator or callback failed."""

    def __init__(
        self, work_request: WorkRequest, message: str, code: str
    ) -> None:
        """
        Construct the exception.

        :param work_request: the work request whose orchestration failed
        :param message: human-readable description of the failure
        :param code: machine-readable error code (e.g. used in
          :py:class:`OutputDataError`)
        """
        # Attributes are read by _log_workflow_run_error and
        # orchestrate_workflow's error handling.
        self.work_request = work_request
        self.message = message
        self.code = code
class Workflow[
    WD: BaseWorkflowData,
    DTD: BaseDynamicTaskData = BaseDynamicTaskData,
](DBTask[WD, DTD], metaclass=ABCMeta):
    """
    Base class for workflow orchestrators.

    This is the base API for running :py:class:`WorkflowInstance` logic.
    """

    TASK_TYPE = TaskTypes.WORKFLOW

    # Internal work request when instantiated from a workflow callback
    work_request_callback: WorkRequest | None

    @override
    def __init__(self, work_request: WorkRequest) -> None:
        """Instantiate the orchestrator for the given workflow work request."""
        super().__init__(work_request)
        self.work_request_callback = None

    @classmethod
    def from_name(cls, name: str) -> type['Workflow[Any, Any]']:
        """Instantiate a workflow by name."""
        res = super().class_from_name(TaskTypes.WORKFLOW, name)
        return cast(type[Workflow[Any, Any]], res)

    @classmethod
    def validate_static_parameters(cls, data: dict[str, Any]) -> None:
        """Validate WorkflowTemplate static_parameters."""
        # By default nothing is enforced

    @classmethod
    def validate_runtime_parameters(
        cls, data: dict[str, Any] | Literal[RuntimeParameter.ANY]
    ) -> None:
        """
        Validate WorkflowTemplate runtime_parameters.

        :raise WorkflowValidationError: if a key is not a task_data field of
          this workflow, or if a value is neither ``RuntimeParameter.ANY``
          nor a list of strings.
        """
        if data == RuntimeParameter.ANY:
            return
        assert isinstance(data, dict)
        for key, value in data.items():
            if key not in cls.task_data_type.model_fields:
                raise WorkflowValidationError(
                    f"{key} is not a task_data parameter for {cls.__name__}"
                )
            if value == RuntimeParameter.ANY:
                continue
            if isinstance(value, str):
                # A bare string is rejected explicitly: it would otherwise be
                # ambiguous with a list of single-character values.
                raise WorkflowValidationError(
                    f"{key} is set to a string ({value!r}), not a list of "
                    f"valid values."
                )
            if isinstance(value, list) and all(
                isinstance(x, str) for x in value
            ):
                continue
            raise WorkflowValidationError(
                f"{key} is set to {value!r}, not a list of valid values."
            )

    def validate_input(self) -> None:
        """
        Thorough validation of input data.

        This is run only once at workflow instantiation time, and can do
        slower things like database lookups to validate artifact or
        collection types.
        """
        # Do nothing by default

    @override
    def compute_scheduler_tags(
        self,
    ) -> tuple[ttags.TaskProvidedTags, wtags.TaskRequiredTags]:
        """Return scheduler tags, adding a source-package provenance tag."""
        provided, required = super().compute_scheduler_tags()
        # Compute dynamic task to extract source package
        # TODO: these values can be computed in a separate methods if the rest
        # TODO: of compute_dynamic_data becomes too expensive to run twice
        dynamic_data = self.compute_dynamic_data()
        if package_name := dynamic_data.get_source_package_name():
            provided.add(
                ttags.ProvenanceProvided.SYSTEM,
                {ttags.SOURCE_PACKAGE_PREFIX + package_name},
            )
        return provided, required

    def ensure_dynamic_data(self) -> None:
        """Ensure that this workflow's dynamic task data has been computed."""
        if self.work_request.dynamic_task_data is None:
            # TODO: this could be moved to when the workflow is marked
            # running, to match the invariant for other task types that once
            # a task is running then their dynamic data is populated
            self.resolve_inputs(inputs.Stage.RUNNING)
            dynamic_task_data = self.compute_dynamic_data()
            self.work_request.dynamic_task_data = dynamic_task_data.model_dump(
                mode="json", exclude_unset=True
            )
            self.work_request.save()
        # Always (re)hydrate the typed dynamic data from the stored JSON.
        self.dynamic_data = self.dynamic_task_data_type(
            **self.work_request.dynamic_task_data
        )

    @override
    def get_label(self) -> str:
        """Return a short human-readable label for the task."""
        name: str
        if self.work_request_callback is not None:
            # Prefer callback-specific naming, falling back to the parent
            # workflow's template name and finally the task name.
            workflow_data = self.work_request_callback.workflow_data
            name = (
                workflow_data.display_name
                or workflow_data.step
                or self.work_request.workflow_data.workflow_template_name
                or self.name
            )
        else:
            name = (
                self.work_request.workflow_data.workflow_template_name
                or self.name
            )
        if self.dynamic_data is None:
            return name
        if self.dynamic_data.parameter_summary is None:
            return name
        return f"{name}: {self.dynamic_data.parameter_summary}"

    @override
    def get_description(self) -> str:
        """Return a description; for callbacks, use the step's docstring."""
        if self.work_request_callback is not None:
            if step := self.work_request_callback.workflow_data.step:
                try:
                    cb = self.get_callback(step)
                except NotImplementedError:
                    pass
                else:
                    if cb.__doc__:
                        return cb.__doc__
        return super().get_description()

    @abstractmethod
    def populate(self) -> None:
        """
        Create the initial WorkRequest structure.

        This is called once, when the workflow first becomes runnable.
        :py:meth:`validate_input` will already have been called.

        This method is required to be idempotent: calling it multiple times
        with the same argument MUST result in the same
        :py:class:`WorkRequest` structure as calling it once.
        """

    def get_callback(self, step: str) -> Callable[[], bool]:
        """
        Get the callback implementation for a given workflow step name.

        By default, subclasses may provide
        ``callback_{step.replace('-', '_')}`` methods to implement workflow
        callbacks with the corresponding ``step`` set in their workflow
        data.  Alternatively, they may override this method to provide a
        more complex mapping from work request to callback implementation.

        :raise NotImplementedError: if no matching callback method exists.
        """
        callback_method = getattr(
            self, f"callback_{step.replace('-', '_')}", None
        )
        if callback_method is None:
            raise NotImplementedError(
                f"Unhandled workflow callback step: {step}"
            )
        assert callable(callback_method)
        # https://github.com/python/mypy/issues/20748
        return callback_method  # type: ignore[no-any-return]

    def callback(self, work_request: "WorkRequest") -> bool:
        """
        Perform an orchestration step.

        Called with the workflow callback work request (note that this is
        not the same as ``self.work_request``) when the workflow node
        becomes ready to execute.

        Called with a :py:class:`WorkRequest` of type internal/workflow to
        perform an orchestration step triggered by a workflow callback.

        This method is required to be idempotent: calling it multiple times
        with the same argument MUST result in the same
        :py:class:`WorkRequest` structure as calling it once.

        Subclasses may provide ``callback_{step.replace('-', '_')}`` methods
        to implement workflow callbacks with the corresponding ``step`` set
        in their workflow data, or override ``get_callback``.

        :return: True if the callback succeeded, otherwise False.
        """
        step = work_request.workflow_data.step
        if step is None:
            raise NotImplementedError(
                "Workflow callback called without setting `step`"
            )
        succeeded = self.get_callback(step)()
        assert isinstance(succeeded, bool)
        return succeeded

    def orchestrate_child(self, work_request: WorkRequest) -> None:
        """
        Orchestrate a child work request in whatever way is appropriate.

        Silently skips non-pending/running work requests.
        """
        if (
            work_request.status == WorkRequest.Statuses.BLOCKED
            and work_request.can_be_automatically_unblocked()
        ):
            work_request.mark_pending()
        if work_request.status in {
            WorkRequest.Statuses.PENDING,
            WorkRequest.Statuses.RUNNING,
        }:
            # This may result in orchestrator errors, but if so the errors
            # will be logged.  Workflows should not immediately propagate
            # the error; they will eventually be marked as COMPLETED/ERROR
            # once all their children have completed or aborted.
            orchestrate_workflow(work_request)

    @staticmethod
    def provides_artifact(
        work_request: WorkRequest,
        category: ArtifactCategory,
        name: str,
        *,
        data: dict[str, Any] | None = None,
        artifact_filters: dict[str, Any] | None = None,  # noqa: U100
    ) -> None:
        """
        Indicate work_request will provide an artifact.

        :param work_request: work request that will provide the artifact
        :param category: category of the artifact that will be provided
        :param name: name of this item in the workflow’s internal collection
        :param data: add it to the data dictionary for the event reaction
        :param artifact_filters: for the
          :ref:`action-update-collection-with-artifacts` action, to allow
          workflows to add filtering
        :raise ValueError: if ``name`` contains ``/`` or a key in "data"
          starts with ``promise_``

        Create an event reaction for ``on_creation`` adding a promise: this
        work request will create an artifact.

        Create an event reaction for ``on_success`` to update the
        collection with the relevant artifact.
        """
        if "/" in name:
            # This wouldn't work well, because "/" is used to separate
            # lookup string segments.
            raise ValueError('Collection item name may not contain "/".')
        if data is not None:
            for key, value in data.items():
                # promise_* keys are reserved for the promise bookkeeping
                # fields set below.
                if key.startswith("promise_"):
                    raise ValueError(
                        f'Field name "{key}" starting with '
                        f'"promise_" is not allowed.'
                    )
        # work_request is part of a workflow
        assert work_request.parent is not None
        try:
            # Only add a promise if one (or the artifact) isn't already
            # present under this name.
            work_request.lookup_single(
                f"internal@collections/name:{name}",
                expect_type=LookupChildType.ARTIFACT_OR_PROMISE,
            )
        except KeyError:
            work_request.process_update_collection_with_data(
                [
                    ActionUpdateCollectionWithData(
                        collection="internal@collections",
                        name_template=name,
                        category=BareDataCategory.PROMISE,
                        data={
                            "promise_work_request_id": work_request.id,
                            "promise_workflow_id": work_request.parent.id,
                            "promise_category": category,
                            **(data or {}),
                        },
                    )
                ]
            )
        artifact_filters = artifact_filters or {}
        work_request.add_event_reaction(
            "on_success",
            ActionUpdateCollectionWithArtifacts(
                collection="internal@collections",
                name_template=name,
                variables=data,
                artifact_filters={**artifact_filters, "category": category},
            ),
        )

    @staticmethod
    def requires_artifact(
        work_request: WorkRequest, lookup: LookupSingle | LookupMultiple
    ) -> None:
        """
        Indicate that work_request requires input (lookup).

        :param work_request: for each lookup result call
          ``work_request.add_dependency(promise["promise_work_request_id"])``
        :param lookup: resolve the lookup and iterate over the results (for
          PROMISES only)
        """
        if isinstance(lookup, LookupMultiple):
            results = work_request.lookup_multiple(
                lookup, expect_type=LookupChildType.ARTIFACT_OR_PROMISE
            )
        else:
            results = [
                work_request.lookup_single(
                    lookup, expect_type=LookupChildType.ARTIFACT_OR_PROMISE
                )
            ]
        for result in results:
            collection_item = result.collection_item
            # Only promises create dependencies: a real artifact already
            # exists and needs no ordering constraint.
            if (
                collection_item is not None
                and collection_item.child_type == CollectionItem.Types.BARE
                and collection_item.category == BareDataCategory.PROMISE
            ):
                work_request_dependency = WorkRequest.objects.get(
                    id=collection_item.data["promise_work_request_id"]
                )
                work_request.add_dependency(work_request_dependency)

    def work_request_ensure_child_worker(
        self,
        *,
        task_name: str,
        task_data: BaseTaskData | None = None,
        workflow_data: WorkRequestWorkflowData,
        relative_priority: int = 0,
    ) -> tuple[WorkRequest, bool]:
        """
        Create a child WORKER work request if one does not already exist.

        This assumes that ``workflow_data.step`` is unique among work
        request children with this task type and name, and that it is
        stable even if the workflow is orchestrated multiple times.

        :param task_name: the task name for the child work request.
        :param task_data: the task data for the child work request.
        :param workflow_data: the workflow data for the child work request.
        :param relative_priority: if creating a new work request, set the
          base priority of the child to the effective priority of this
          workflow plus this relative priority.
        :return: tuple of ``(object, created)``, where ``object`` is the
          new or existing :py:class:`WorkRequest` and ``created`` is a
          boolean specifying whether a new object was created.
        """
        try:
            # Index: db_workrequest_wf_child_idx.
            return (
                self.work_request.children.unsuperseded().get(
                    task_type=TaskTypes.WORKER,
                    task_name=task_name,
                    workflow_data_json__step=workflow_data.step,
                ),
                False,
            )
        except WorkRequest.DoesNotExist:
            return (
                self.work_request.create_child_worker(
                    task_name=task_name,
                    task_data=task_data,
                    workflow_data=workflow_data,
                    relative_priority=relative_priority,
                ),
                True,
            )

    def work_request_ensure_child_server(
        self,
        *,
        task_name: str,
        task_data: BaseTaskData | None = None,
        workflow_data: WorkRequestWorkflowData,
    ) -> tuple[WorkRequest, bool]:
        """
        Create a child SERVER work request if one does not already exist.

        This assumes that ``workflow_data.step`` is unique among work
        request children with this task type and name, and that it is
        stable even if the workflow is orchestrated multiple times.

        :param task_name: the task name for the child work request.
        :param task_data: the task data for the child work request.
        :param workflow_data: the workflow data for the child work request.
        :return: tuple of ``(object, created)``, where ``object`` is the
          new or existing :py:class:`WorkRequest` and ``created`` is a
          boolean specifying whether a new object was created.
        """
        try:
            # Index: db_workrequest_wf_child_idx.
            return (
                self.work_request.children.unsuperseded().get(
                    task_type=TaskTypes.SERVER,
                    task_name=task_name,
                    workflow_data_json__step=workflow_data.step,
                ),
                False,
            )
        except WorkRequest.DoesNotExist:
            return (
                self.work_request.create_child_server(
                    task_name=task_name,
                    task_data=task_data,
                    workflow_data=workflow_data,
                ),
                True,
            )

    def work_request_ensure_child_internal(
        self,
        *,
        task_name: str,
        workflow_data: WorkRequestWorkflowData,
        unblock_strategy: WorkRequest.UnblockStrategy = (
            WorkRequest.UnblockStrategy.DEPS
        ),
        relative_priority: int = 0,
    ) -> tuple[WorkRequest, bool]:
        """
        Create a child INTERNAL work request if one does not already exist.

        This assumes that ``workflow_data.step`` is unique among work
        request children with this task type and name, and that it is
        stable even if the workflow is orchestrated multiple times.

        :param task_name: the task name for the child work request.
        :param workflow_data: the workflow data for the child work request.
        :param unblock_strategy: the unblock strategy for the child work
          request.
        :param relative_priority: if creating a new work request, set the
          base priority of the child to the effective priority of this
          workflow plus this relative priority.
        :return: tuple of ``(object, created)``, where ``object`` is the
          new or existing :py:class:`WorkRequest` and ``created`` is a
          boolean specifying whether a new object was created.
        """
        try:
            # Index: db_workrequest_wf_child_idx.
            return (
                self.work_request.children.unsuperseded().get(
                    task_type=TaskTypes.INTERNAL,
                    task_name=task_name,
                    workflow_data_json__step=workflow_data.step,
                ),
                False,
            )
        except WorkRequest.DoesNotExist:
            return (
                self.work_request.create_child_internal(
                    task_name=task_name,
                    workflow_data=workflow_data,
                    unblock_strategy=unblock_strategy,
                    relative_priority=relative_priority,
                ),
                True,
            )

    def work_request_ensure_child_workflow(
        self,
        *,
        task_name: str,
        task_data: BaseTaskData | None = None,
        workflow_data: WorkRequestWorkflowData,
        relative_priority: int = 0,
    ) -> tuple[WorkRequest, bool]:
        """
        Create a child WORKFLOW work request if one does not already exist.

        This assumes that ``workflow_data.step`` is unique among work
        request children with this task type and name, and that it is
        stable even if the workflow is orchestrated multiple times.

        :param task_name: the task name for the child work request.
        :param task_data: the task data for the child work request.
        :param workflow_data: the workflow data for the child work request.
        :param relative_priority: if creating a new work request, set the
          base priority of the child to the effective priority of this
          workflow plus this relative priority.
        :return: tuple of ``(object, created)``, where ``object`` is the
          new or existing :py:class:`WorkRequest` and ``created`` is a
          boolean specifying whether a new object was created.
        """
        try:
            # Index: db_workrequest_wf_child_idx.
            return (
                self.work_request.children.unsuperseded().get(
                    task_type=TaskTypes.WORKFLOW,
                    task_name=task_name,
                    workflow_data_json__step=workflow_data.step,
                ),
                False,
            )
        except WorkRequest.DoesNotExist:
            return (
                self.work_request.create_child_workflow(
                    task_name=task_name,
                    task_data=task_data,
                    workflow_data=workflow_data,
                    relative_priority=relative_priority,
                ),
                True,
            )

    def work_request_ensure_child_signing(
        self,
        *,
        task_name: str,
        task_data: BaseTaskData | None = None,
        workflow_data: WorkRequestWorkflowData,
    ) -> tuple[WorkRequest, bool]:
        """
        Create a child SIGNING work request if one does not already exist.

        This assumes that ``workflow_data.step`` is unique among work
        request children with this task type and name, and that it is
        stable even if the workflow is orchestrated multiple times.

        :param task_name: the task name for the child work request.
        :param task_data: the task data for the child work request.
        :param workflow_data: the workflow data for the child work request.
        :return: tuple of ``(object, created)``, where ``object`` is the
          new or existing :py:class:`WorkRequest` and ``created`` is a
          boolean specifying whether a new object was created.
        """
        try:
            # Index: db_workrequest_wf_child_idx.
            return (
                self.work_request.children.unsuperseded().get(
                    task_type=TaskTypes.SIGNING,
                    task_name=task_name,
                    workflow_data_json__step=workflow_data.step,
                ),
                False,
            )
        except WorkRequest.DoesNotExist:
            return (
                self.work_request.create_child_signing(
                    task_name=task_name,
                    task_data=task_data,
                    workflow_data=workflow_data,
                ),
                True,
            )

    def work_request_ensure_child_wait(
        self,
        *,
        task_name: str,
        task_data: BaseTaskData | None = None,
        workflow_data: WorkRequestWorkflowData,
    ) -> tuple[WorkRequest, bool]:
        """
        Create a child WAIT work request if one does not already exist.

        This assumes that ``workflow_data.step`` is unique among work
        request children with this task type and name, and that it is
        stable even if the workflow is orchestrated multiple times.

        :param task_name: the task name for the child work request.
        :param task_data: the task data for the child work request.
        :param workflow_data: the workflow data for the child work request.
        :return: tuple of ``(object, created)``, where ``object`` is the
          new or existing :py:class:`WorkRequest` and ``created`` is a
          boolean specifying whether a new object was created.
        """
        try:
            # Index: db_workrequest_wf_child_idx.
            return (
                self.work_request.children.unsuperseded().get(
                    task_type=TaskTypes.WAIT,
                    task_name=task_name,
                    workflow_data_json__step=workflow_data.step,
                ),
                False,
            )
        except WorkRequest.DoesNotExist:
            return (
                self.work_request.create_child_wait(
                    task_name=task_name,
                    task_data=task_data,
                    workflow_data=workflow_data,
                ),
                True,
            )

    def lookup_singleton_collection(
        self,
        category: CollectionCategory,
        *,
        workspace: Workspace | None = None,
    ) -> Collection:
        """Look up a singleton collection related to this workflow."""
        return self.work_request.lookup_single(
            f"_@{category}",
            workspace=workspace or self.workspace,
            expect_type=LookupChildType.COLLECTION,
        ).collection

    def get_input_artifacts(self) -> list[InputArtifact]:
        """Return the list of input artifacts used by this workflow."""
        return []

    @override
    def get_input_artifacts_ids(self) -> list[int]:
        """
        Return the list of artifact ids used by this workflow.

        .. deprecated:: 2025-12

           Use get_input_artifacts()
        """
        return [
            artifact_id
            for input_artifact in self.get_input_artifacts()
            for artifact_id in input_artifact.resolved_ids()
        ]

    @override
    def get_task_configuration_subject_context(
        self,
    ) -> tuple[str | None, str | None]:
        """Return (subject, configuration_context) from dynamic data."""
        # Compute dynamic task to extract subject and configuration_context
        # TODO: these values can be computed in a separate methods if the rest
        # TODO: of compute_dynamic_data becomes too expensive to run twice
        early_dynamic_task_data = self.compute_dynamic_data()
        return (
            early_dynamic_task_data.subject,
            early_dynamic_task_data.configuration_context,
        )

    @override
    def execute(self) -> WorkRequest.Results:
        """Workflows are orchestrated, never executed directly."""
        raise AssertionError("execute called on a Workflow task")

    @override
    def _execute(self) -> WorkRequestResults:
        # Workflows are orchestrated, never executed directly.
        raise AssertionError("_execute called on a Workflow task")
def describe_exc(exception: Exception) -> str:
    """User-facing description of exception."""
    # Exceptions constructed without arguments stringify to "", which would
    # be useless in a log or error report; fall back to their repr().
    return str(exception) or repr(exception)
def _log_workflow_run_error(exc: WorkflowRunError) -> None:
    """
    Log a WorkflowRunError.

    This is in a separate function to make it easier to mock.
    """
    logger.warning(
        "Error running work request %s/%s (%s): %s",
        exc.work_request.task_type,
        exc.work_request.task_name,
        exc.work_request.id,
        exc.message,
        exc_info=True,
    )


def _finish_workflow_callback(
    work_request: WorkRequest, succeeded: bool
) -> None:
    """
    Handle the results of a workflow callback.

    Incremental workflow callbacks are set back to ``BLOCKED``.  Others are
    completed with a result depending on the value of ``succeeded``.
    """
    if (
        work_request.unblock_strategy
        == WorkRequest.UnblockStrategy.INCREMENTAL
        and work_request.dependencies.unterminated().exists()
    ):
        # Incremental callbacks with outstanding dependencies go back to
        # BLOCKED so they can run again when more dependencies finish;
        # reset the timestamps so the next run looks like a fresh one.
        work_request.pending_at = None
        work_request.started_at = None
        work_request.status = WorkRequest.Statuses.BLOCKED
        work_request.save()
    else:
        work_request.mark_completed(
            WorkRequest.Results.SUCCESS
            if succeeded
            else WorkRequest.Results.FAILURE
        )
def orchestrate_workflow(work_request: WorkRequest) -> bool:
    """
    Orchestrate a workflow in whatever way is appropriate.

    For a workflow callback, run ``callback`` and mark the work request as
    completed.  For a workflow, run ``populate`` and unblock workflow
    children, but leave the workflow running until all its children have
    finished.  For any other work request, log an error.

    Return False if any errors were logged, otherwise True.
    """

    def run_or_raise_workflow_run_error[R: Any](
        func: Callable[[], R], message_template: str, code: str
    ) -> R:
        # Wrap any exception from func in a WorkflowRunError so the outer
        # handler can log it and complete the work request with ERROR.
        try:
            return func()
        except Exception as exc:
            message = describe_exc(exc)
            raise WorkflowRunError(
                work_request, message_template.format(message=message), code
            ) from exc

    try:
        run_or_raise_workflow_run_error(
            work_request.mark_running,
            "Failed to mark work request as running",
            "status-change-failed",
        )

        # First pass: resolve the orchestrator (for both workflow callbacks
        # and workflows) or bail out for anything else.
        match (work_request.task_type, work_request.task_name):
            case (TaskTypes.INTERNAL, "workflow") | (TaskTypes.WORKFLOW, _):
                try:
                    orchestrator = work_request.get_task()
                except InternalTaskError as exc:
                    raise WorkflowRunError(
                        work_request, str(exc), "configure-failed"
                    ) from exc
                except TaskConfigError as exc:
                    raise WorkflowRunError(
                        work_request,
                        f"Failed to configure: {exc}",
                        "configure-failed",
                    ) from exc
                else:
                    assert isinstance(orchestrator, Workflow)
            case _:
                raise WorkflowRunError(
                    work_request,
                    "Does not have a workflow orchestrator",
                    "configure-failed",
                )

        # Second pass: actually run the orchestration step.
        match (work_request.task_type, work_request.task_name):
            case (TaskTypes.INTERNAL, "workflow"):
                parent = orchestrator.work_request
                # Workflow callbacks compute dynamic data for their parent
                # workflow.  If this fails, it's probably least confusing to
                # log information about both the workflow callback and the
                # workflow.
                run_or_raise_workflow_run_error(
                    orchestrator.ensure_dynamic_data,
                    f"Failed to compute dynamic data for "
                    f"{parent.task_type}/{parent.task_name} ({parent.id}): "
                    f"{{message}}",
                    "dynamic-data-failed",
                )
                succeeded = run_or_raise_workflow_run_error(
                    partial(orchestrator.callback, work_request),
                    "Orchestrator failed: {message}",
                    "orchestrator-failed",
                )
                _finish_workflow_callback(work_request, succeeded)
            case (TaskTypes.WORKFLOW, _):
                with work_request.running_workflow_orchestrator():
                    run_or_raise_workflow_run_error(
                        orchestrator.ensure_dynamic_data,
                        "Failed to compute dynamic data: {message}",
                        "dynamic-data-failed",
                    )
                    run_or_raise_workflow_run_error(
                        orchestrator.populate,
                        "Orchestrator failed: {message}",
                        "orchestrator-failed",
                    )
                if work_request.children.exists():
                    orchestrator.work_request.unblock_workflow_children()
                work_request.maybe_finish_workflow()
                # The workflow is left running until all its children have
                # finished.
            case _:  # pragma: no cover
                # Already checked above.
                raise AssertionError(
                    f"Unexpected work request: "
                    f"{work_request.task_type}/{work_request.task_name}"
                )

        return True
    except WorkflowRunError as exc:
        _log_workflow_run_error(exc)
        try:
            work_request.mark_completed(
                WorkRequest.Results.ERROR,
                output_data=OutputData(
                    errors=[
                        OutputDataError(message=exc.message, code=exc.code)
                    ]
                ),
            )
        except StatusChangeError:
            # Best effort: the work request may already be in a terminal
            # state; the error has been logged regardless.
            pass
        return False