# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Data models for db workers."""
import copy
import datetime as dt
import hashlib
from typing import Any, Optional, TYPE_CHECKING, TypedDict, cast, overload

import pgtrigger
from django.db import IntegrityError, models, transaction
from django.db.models import (
CheckConstraint,
Count,
Exists,
F,
JSONField,
Max,
OuterRef,
Q,
QuerySet,
Subquery,
)
from django.db.models.expressions import Case, When
from django.db.models.functions import Extract, Greatest, Now
from django.urls import reverse
from django.utils import timezone
from django.utils.text import slugify

from debusine.db.models import WorkRequest
from debusine.db.models.auth import Token
from debusine.db.models.worker_pools import WorkerPool
from debusine.tasks.models import WorkerType

if TYPE_CHECKING:
from django.http import HttpRequest
from django_stubs_ext import WithAnnotations
from django_stubs_ext.db.models import TypedModelMeta
from debusine.web.views.ui.workers import WorkerUI
else:
    TypedModelMeta = object


class IdleTimeDict(TypedDict):
    """Additional fields returned by WorkerQuerySet.with_idle_time."""

    idle_time: float


class WorkerQuerySet[A](QuerySet["Worker", A]):
    """Custom QuerySet for Worker."""

    def connected(self) -> "WorkerQuerySet[Any]":
        """Return connected workers."""
# WorkerConsumer.REQUEST_DYNAMIC_METADATA_SECONDS = 3600, so this
# seems like a reasonable threshold beyond which to assume that a
# worker is no longer really connected even if it hasn't explicitly
# disconnected.
threshold = Now() - dt.timedelta(hours=1, minutes=5)
return (
self.filter(connected_at__isnull=False)
.filter(
Q(worker_type=WorkerType.CELERY)
| Q(dynamic_metadata_updated_at__gte=threshold)
)
.order_by("-dynamic_metadata_updated_at")
)
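
    # A minimal usage sketch (illustrative, not part of the API): list the
    # names of all currently connected workers. Celery workers only need
    # connected_at set; external workers must also have refreshed their
    # dynamic metadata within the last 65 minutes.
    #
    #   names = Worker.objects.connected().values_list("name", flat=True)
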
def active(self) -> "WorkerQuerySet[Any]":
"""Exclude inactive workers from worker pools."""
return self.exclude(
worker_pool__isnull=False, instance_created_at__isnull=True
)

    def waiting_for_work_request(self) -> "WorkerQuerySet[Any]":
        """
        Return workers that can be assigned a new work request.

        Workers with fewer pending or running work requests assigned to
        them than their concurrency level can take more work right now,
        and are thus waiting for a work request.

        The worker's token must be enabled.
        """
        # Import here to prevent circular imports
        from debusine.db.models.work_requests import WorkRequest

running_work_request_count = Count(
'assigned_work_requests',
filter=Q(
assigned_work_requests__status__in=[
WorkRequest.Statuses.RUNNING,
WorkRequest.Statuses.PENDING,
]
),
)
workers = (
self.connected()
.order_by(F('worker_pool').asc(nulls_first=True), 'connected_at')
.annotate(count_running=running_work_request_count)
.filter(count_running__lt=F("concurrency"))
.filter(Q(worker_type=WorkerType.CELERY) | Q(token__enabled=True))
)
return workers
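
    # Illustrative sketch (hypothetical usage, not part of this module):
    # the count_running annotation added above can be compared against
    # concurrency to see how much spare capacity each worker has.
    #
    #   for worker in Worker.objects.waiting_for_work_request():
    #       spare = worker.concurrency - worker.count_running
    #       print(f"{worker.name}: {spare} slot(s) free")
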
    def with_idle_time(
        self,
    ) -> "WorkerQuerySet[WithAnnotations[Worker, IdleTimeDict]]":
        """
        Annotate workers with `idle_time`.

        `idle_time` will be 0 for workers that currently have a work
        request assigned, or else the number of seconds the worker has
        spent without an assigned work request.
        """
tasks = WorkRequest.objects.filter(worker=OuterRef("pk"))
active_tasks = tasks.filter(
status__in=(
WorkRequest.Statuses.RUNNING,
WorkRequest.Statuses.PENDING,
)
)
last_tasks = tasks.filter(
completed_at__gt=F("worker__instance_created_at")
)
last_completed_at = Subquery(
last_tasks.annotate(last_completed_at=Max("completed_at")).values(
"last_completed_at"
)
)
# Work-around https://code.djangoproject.com/ticket/28296
last_completed_at.query.set_group_by()
last_completed_at.query.clear_ordering()
return self.annotate(
idle_time=Case(
When(
Exists(active_tasks),
then=0,
),
When(
Exists(last_tasks),
then=Extract(Now() - last_completed_at, "epoch"),
),
default=Extract(
Now() - Greatest("registered_at", "instance_created_at"),
"epoch",
),
)
)
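
    # Illustrative sketch: idle_time is expressed in seconds (extracted
    # from a PostgreSQL interval), so workers idle for more than an hour
    # could be found with a filter like this.
    #
    #   idle = Worker.objects.with_idle_time().filter(idle_time__gt=3600)
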
def get_worker_by_token_key_or_none(
self, token_key: str
) -> Optional["Worker"]:
"""Return a Worker identified by its associated secret token."""
try:
token_hash = hashlib.sha256(token_key.encode()).hexdigest()
return cast("Worker", self.get(token__hash=token_hash))
except Worker.DoesNotExist:
return None
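
    # Illustrative sketch: tokens are stored hashed, so the lookup above
    # compares the SHA-256 hex digest of the plaintext key ("secret-key"
    # here is a hypothetical value).
    #
    #   worker = Worker.objects.get_worker_by_token_key_or_none("secret-key")
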
def get_worker_or_none(self, worker_name: str) -> Optional["Worker"]:
"""Return the worker with worker_name or None."""
try:
return cast("Worker", self.get(name=worker_name))
except Worker.DoesNotExist:
return None


class WorkerManager(models.Manager["Worker"]):
    """Manager for Worker model."""

    @staticmethod
    def _generate_unique_name(name: str, counter: int) -> str:
        """Return name slugified, appending "-counter" if counter != 1."""
new_name = slugify(name.replace('.', '-'))
if counter != 1:
new_name += f'-{counter}'
return new_name
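
    # For example (hypothetical values): dots in the FQDN become hyphens,
    # and a counter is appended from the second attempt onwards.
    #
    #   WorkerManager._generate_unique_name("build.example.org", 1)
    #   # -> "build-example-org"
    #   WorkerManager._generate_unique_name("build.example.org", 2)
    #   # -> "build-example-org-2"
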
@overload
def create_with_fqdn(
self,
fqdn: str,
*,
token: Token,
activation_token: None = None,
worker_type: WorkerType = WorkerType.EXTERNAL,
worker_pool: WorkerPool | None = None,
) -> "Worker": ...

    @overload
def create_with_fqdn(
self,
fqdn: str,
*,
token: None = None,
activation_token: Token,
worker_type: WorkerType = WorkerType.EXTERNAL,
worker_pool: WorkerPool | None = None,
) -> "Worker": ...

    def create_with_fqdn(
self,
fqdn: str,
*,
token: Token | None = None,
activation_token: Token | None = None,
worker_type: WorkerType = WorkerType.EXTERNAL,
worker_pool: WorkerPool | None = None,
) -> "Worker":
"""Return a new Worker with its name based on fqdn, with token."""
counter = 1
while True:
name = self._generate_unique_name(fqdn, counter)
try:
with transaction.atomic():
return self.create(
name=name,
token=token,
activation_token=activation_token,
worker_type=worker_type,
registered_at=timezone.now(),
worker_pool=worker_pool,
)
except IntegrityError:
counter += 1
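
    # Illustrative sketch (some_token is a hypothetical worker token):
    # registering two workers with the same FQDN relies on the
    # IntegrityError retry loop above to produce distinct names.
    #
    #   worker = Worker.objects.create_with_fqdn(
    #       "build.example.org", token=some_token
    #   )
    #   # worker.name == "build-example-org"; a second call with a fresh
    #   # token would yield "build-example-org-2".
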
def create_pool_members(
self,
worker_pool: WorkerPool,
count: int,
) -> None:
"""
Create count new External Workers in worker_pool.
Use throw-away activation tokens; when workers start, they'll
exchange their activation tokens for full worker tokens.
"""
created = 0
index = 1
while created < count:
with transaction.atomic():
try:
self.create(
name=f"{worker_pool.name}-{index:03}",
activation_token=(
Token.objects.create_worker_activation()
),
worker_type=WorkerType.EXTERNAL,
worker_pool=worker_pool,
registered_at=timezone.now(),
)
created += 1
except IntegrityError:
pass
index += 1
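
    # Illustrative sketch: members are named after the pool with a
    # zero-padded index, and indexes already taken (IntegrityError) are
    # skipped.
    #
    #   Worker.objects.create_pool_members(worker_pool=pool, count=3)
    #   # creates e.g. "<pool.name>-001" through "<pool.name>-003", each
    #   # with a fresh activation token
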
    def get_or_create_scheduler(self) -> "Worker":
        """
        Get or create the Worker representing the scheduler.

        The scheduler of course can't dispatch tasks to itself, but
        representing it as a worker is convenient because it allows the
        scheduler to call back to the server to emit metrics.
        """
try:
return self.get(name="scheduler", worker_type=WorkerType.SCHEDULER)
except Worker.DoesNotExist:
return self.create(
name="scheduler",
worker_type=WorkerType.SCHEDULER,
registered_at=timezone.now(),
)

    def get_or_create_celery(self) -> "Worker":
        """Get or create the Worker representing the Celery task queue."""
try:
return self.get(name="celery", worker_type=WorkerType.CELERY)
except Worker.DoesNotExist:
return self.create(
name="celery",
worker_type=WorkerType.CELERY,
registered_at=timezone.now(),
)

    def mark_all_disconnected(self) -> None:
        """
        Mark all non-Celery workers as disconnected.

        When debusine-server starts, depending on how it exited, workers
might still be registered as connected in the database. To avoid
confusion, mark them as disconnected until they explicitly connect
again.
"""
Worker.objects.connected().exclude(
worker_type=WorkerType.CELERY
).update(connected_at=None)


class Worker(models.Model):
    """Database model of a worker."""

name = models.SlugField(
unique=True,
help_text='Human readable name of the worker based on the FQDN',
)
registered_at = models.DateTimeField()
connected_at = models.DateTimeField(blank=True, null=True)
instance_created_at = models.DateTimeField(blank=True, null=True)
    # This is the token used by the worker to authenticate itself.
    # Users have their own tokens; this one is specific to a single worker.
token = models.OneToOneField(
Token, null=True, on_delete=models.PROTECT, related_name="worker"
)
# This is used to grant workers the ability to bootstrap authentication
# for themselves, in situations where their provisioning data may be
# revealed to untrusted code later. The activation token will be
# deleted once it has been used.
activation_token = models.OneToOneField(
Token,
null=True,
on_delete=models.PROTECT,
related_name="activating_worker",
)
# Set by debusine-admin worker edit_metadata.
# Note: contents will be displayed to users in the web UI
static_metadata = JSONField(default=dict, blank=True)
# Information about features supported by workers.
# Note: contents will be displayed to users in the web UI
dynamic_metadata = JSONField(default=dict, blank=True)
    dynamic_metadata_updated_at = models.DateTimeField(blank=True, null=True)
    worker_type = models.CharField(
choices=WorkerType.choices, default=WorkerType.EXTERNAL, editable=False
)
# Only Celery workers currently support concurrency levels greater than
# 1.
concurrency = models.PositiveIntegerField(
default=1,
help_text="Number of tasks this worker can run simultaneously",
)
worker_pool = models.ForeignKey(
WorkerPool, blank=True, null=True, on_delete=models.PROTECT
)
    # Information needed to track cloud workers.
    # Note: contents will be displayed to users in the web UI
    worker_pool_data = JSONField(null=True, blank=True)

    objects = WorkerManager.from_queryset(WorkerQuerySet)()

    class Meta(TypedModelMeta):
base_manager_name = "objects"
triggers = [
# Both Worker and Token need check constraints on the foreign
# keys between them, so keep copies in sync using triggers.
*(
pgtrigger.Trigger(
name=f"db_worker_sync_unset_{worker_column}",
operation=(
pgtrigger.UpdateOf(f"{worker_column}_id") # type: ignore[no-untyped-call]
| pgtrigger.Delete
),
when=pgtrigger.Before,
condition=pgtrigger.Q(
**{f"old__{worker_column}__isnull": False}
),
func=" ".join(
f"""
UPDATE db_token
SET {token_column}_id = NULL
WHERE db_token.id = OLD.{worker_column}_id;
IF TG_OP = 'DELETE' THEN
RETURN OLD;
ELSE
RETURN NEW;
END IF;
""".split()
),
)
for worker_column, token_column in (
("token", "worker"),
("activation_token", "activating_worker"),
)
),
*(
pgtrigger.Trigger(
name=f"db_worker_sync_set_{worker_column}",
operation=(
pgtrigger.Insert
| pgtrigger.UpdateOf(f"{worker_column}_id") # type: ignore[no-untyped-call]
),
when=pgtrigger.After,
condition=pgtrigger.Q(
**{f"new__{worker_column}__isnull": False}
),
func=" ".join(
f"""
UPDATE db_token
SET {token_column}_id = NEW.id
WHERE db_token.id = NEW.{worker_column}_id;
RETURN NEW;
""".split()
),
)
for worker_column, token_column in (
("token", "worker"),
("activation_token", "activating_worker"),
)
),
]
constraints = [
# Non-Celery workers must have a token.
CheckConstraint(
name="%(app_label)s_%(class)s_celery_or_token",
check=Q(
worker_type__in={WorkerType.CELERY, WorkerType.SCHEDULER}
)
| Q(activation_token__isnull=False)
| Q(token__isnull=False)
# Exclude worker_pool tombstones
| Q(
worker_pool_id__isnull=False,
instance_created_at__isnull=True,
),
)
]

    def __str__(self) -> str:
"""Return the id and name of the Worker."""
return f"Id: {self.id} Name: {self.name}"

    def get_absolute_url(self) -> str:
"""Return an absolute URL to view this worker."""
return reverse("workers:detail", kwargs={"name": self.name})

    def mark_disconnected(self) -> None:
"""Update and save relevant Worker fields after disconnecting."""
self.connected_at = None
self.save()

    def running_work_requests(self) -> QuerySet["WorkRequest"]:
"""Return queryset of work requests running on this worker."""
return self.assigned_work_requests.filter(
status=WorkRequest.Statuses.RUNNING
).order_by("id")

    def mark_connected(self) -> None:
        """Update and save relevant Worker fields after connecting."""
        from debusine.server.scheduler import schedule_soon

self.connected_at = timezone.now()
self.save()
schedule_soon()

    def connected(self) -> bool:
"""Return True if the Worker is connected."""
return self.connected_at is not None

    def is_busy(self) -> bool:
        """Return True if the Worker has any active work requests."""
        # Import here to prevent circular imports
        from debusine.db.models.work_requests import WorkRequest

return (
WorkRequest.objects.running(worker=self)
| WorkRequest.objects.pending(worker=self)
).exists()

    def is_at_capacity(self) -> bool:
        """
        Return True if the Worker is working at full capacity.

        A Worker is at capacity if it has as many running or pending work
        requests as its concurrency level.
        """
        # Import here to prevent circular imports
        from debusine.db.models.work_requests import WorkRequest

return (
WorkRequest.objects.running(worker=self)
| WorkRequest.objects.pending(worker=self)
).count() >= self.concurrency
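
    # Illustrative sketch: a worker with concurrency=4 and three pending
    # or running work requests is busy but not yet at capacity.
    #
    #   if worker.is_busy() and not worker.is_at_capacity():
    #       ...  # the scheduler could still assign another work request
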
    def ui(self, request: "HttpRequest") -> "WorkerUI":
        """Return a UI helper for this instance."""
        from debusine.web.views.ui.workers import WorkerUI

return WorkerUI.get(request, self)