# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Data models for db scopes."""
import enum
import re
from typing import Any, Self, TYPE_CHECKING, TypeAlias, cast
import pgtrigger
from django.core.exceptions import ImproperlyConfigured, ValidationError
from django.db import models
from django.db.models import CheckConstraint, F, Q, QuerySet, UniqueConstraint
from debusine.db.models import permissions
from debusine.db.models.files import File, FileStore
from debusine.db.models.permissions import (
Allow,
PermissionUser,
permission_check,
permission_filter,
)
if TYPE_CHECKING:
from django.http import HttpRequest
from django_stubs_ext.db.models import TypedModelMeta
from debusine.db.models.auth import User
from debusine.server.file_backend.interface import FileBackendInterface
from debusine.web.views.ui.scopes import ScopeUI
else:
TypedModelMeta = object
#: Scope names reserved for use in toplevel URL path components
RESERVED_SCOPE_NAMES = frozenset(
    (
        "accounts",
        "admin",
        "api",
        "api-auth",
        "artifact",
        "task-status",
        "user",
        "workers",
        "work-request",
        "workspace",
    )
)

#: Regexp matching the structure of scope names: an ASCII letter followed
#: by any number of letters, digits, or the characters ``+``, ``.``, ``_``,
#: ``-``
scope_name_regex = re.compile(r"^[A-Za-z][A-Za-z0-9+._-]*$")
def is_valid_scope_name(value: str) -> bool:
    """Check if value is a valid scope name."""
    # A name is valid when it is not reserved for debusine's own URL
    # routing and it matches the scope name grammar.
    return (
        value not in RESERVED_SCOPE_NAMES
        and scope_name_regex.match(value) is not None
    )
def validate_scope_name(value: str) -> None:
    """Validate scope names."""
    if is_valid_scope_name(value):
        return
    raise ValidationError(
        "%(value)r is not a valid scope name", params={"value": value}
    )
class ScopeRoleBase(permissions.RoleBase):
    """Scope role implementation."""

    implied_by_scope_roles: frozenset["ScopeRoles"]

    def _setup(self) -> None:
        """Set up implications for a newly constructed role."""
        # Scope roles cannot be implied by roles on other resources:
        # reject any configured implication outright.
        for i in self.implied_by:
            raise ImproperlyConfigured(
                f"Scope roles do not support implications by {type(i)}"
            )
        # Each scope role is implied only by itself.
        self.implied_by_scope_roles = frozenset((cast(ScopeRoles, self),))

    def q(self, user: "User") -> Q:
        """Return a Q expression to select scopes with this role."""
        implying = self.implied_by_scope_roles
        return Q(roles__group__users=user, roles__role__in=implying)

    def implies(self, role: "ScopeRoles") -> bool:
        """Check if this role implies the given one."""
        return self in role.implied_by_scope_roles
class ScopeRoles(permissions.Roles, ScopeRoleBase, enum.ReprEnum):
    """Available roles for a Scope."""

    # Scope owners can, e.g., create workspaces in the scope
    # (see Scope.can_create_workspace).
    OWNER = "owner"
# Finalize the role enum after definition; presumably this runs
# ScopeRoleBase._setup on each member to build its implication set
# (setup() is defined in permissions.Roles — verify there).
ScopeRoles.setup()
class ScopeQuerySet[A](QuerySet["Scope", A]):
"""Custom QuerySet for Scope."""
def with_role(self, user: PermissionUser, role: ScopeRoles) -> Self:
"""Keep only resources where the user has the given role."""
if not user.is_authenticated:
return self.none()
if user.is_system:
return self
return self.filter(
pk__in=self.model.objects.filter(role.q(user)).distinct()
)
@permission_filter(workers=Allow.ALWAYS, anonymous=Allow.ALWAYS)
def can_display(self, user: PermissionUser) -> Self: # noqa: ARG002, U100
"""Keep only Scopes that can be displayed."""
return self
@permission_filter()
def can_create_workspace(self, user: PermissionUser) -> Self:
"""Keep only Scopes where the user can create workspaces."""
return self.with_role(user, Scope.Roles.OWNER)
class ScopeManager(models.Manager["Scope"]):
    """Manager for Scope model."""

    def get_queryset(self) -> ScopeQuerySet[Any]:
        """Return the custom QuerySet for Scope."""
        return ScopeQuerySet(self.model, using=self._db)

    def get_roles_model(self) -> type["ScopeRole"]:
        """Return the model used for role assignment."""
        return ScopeRole
class Scope(models.Model):
    """
    Scope model.

    This is used to create different distinct sets of groups and workspaces
    """

    Roles: TypeAlias = ScopeRoles

    #: Internal name, validated against RESERVED_SCOPE_NAMES and the scope
    #: name grammar so it can be used as a toplevel URL path component.
    name = models.CharField(
        max_length=255,
        unique=True,
        validators=[validate_scope_name],
        help_text="internal name for the scope",
    )
    label = models.CharField(
        max_length=255,
        unique=True,
        help_text="User-visible name for the scope",
    )
    icon = models.CharField(
        max_length=255,
        default="",
        blank=True,
        help_text=(
            "Optional user-visible icon,"
            " resolved via ``{% static %}`` in templates"
        ),
    )
    #: File stores usable by this scope, with per-scope policy stored on
    #: the FileStoreInScope through model.
    file_stores = models.ManyToManyField(
        FileStore, related_name="scopes", through="db.FileStoreInScope"
    )

    objects = ScopeManager.from_queryset(ScopeQuerySet)()

    class Meta(TypedModelMeta):
        base_manager_name = "objects"

    def __str__(self) -> str:
        """Return basic information of Scope."""
        return self.name

    def ui(self, request: "HttpRequest") -> "ScopeUI":
        """Return a UI helper for this instance."""
        # Imported locally to avoid a circular import with the web views.
        from debusine.web.views.ui.scopes import ScopeUI

        return ScopeUI.get(request, self)

    @permission_check(
        "{user} cannot display scope {resource}",
        workers=Allow.ALWAYS,
        anonymous=Allow.ALWAYS,
    )
    def can_display(self, user: PermissionUser) -> bool:  # noqa: ARG002, U100
        """Check if the scope can be displayed."""
        # Scopes are always visible, to any user.
        return True

    @permission_check(
        "{user} cannot create workspaces in {resource}",
    )
    def can_create_workspace(self, user: PermissionUser) -> bool:
        """Check if the user can create workspaces in this scope."""
        return self.has_role(user, Scope.Roles.OWNER)

    def has_role(self, user: PermissionUser, role: ScopeRoles) -> bool:
        """Check if the user has the given role on this Scope."""
        from debusine.db.context import context

        if not user.is_authenticated:
            return False
        if user.is_system:
            return True
        if context.user == user and context.scope == self:
            # Fast path: the application context already carries the
            # user's roles for the current scope, so avoid a query.
            return any(r.implies(role) for r in context.scope_roles)
        return Scope.objects.with_role(user, role).filter(pk=self.pk).exists()

    # See https://github.com/typeddjango/django-stubs/issues/1047 for the
    # typing
    def get_group_roles(
        self, user: PermissionUser
    ) -> QuerySet["ScopeRole", str]:
        """Get the roles of the user on this scope."""
        if not user.is_authenticated:
            return ScopeRole.objects.none().values_list("role", flat=True)
        else:
            return (
                ScopeRole.objects.filter(resource=self, group__users=user)
                .values_list("role", flat=True)
                .distinct()
            )

    def upload_file_stores(
        self,
        fileobj: File,
        *,
        enforce_soft_limits: bool = False,
        include_write_only: bool = False,
    ) -> QuerySet[FileStore]:
        """
        Find the file stores in this scope where `fileobj` can be uploaded.

        The returned query set is in descending order of upload priority,
        breaking ties by ascending file store IDs.

        :param enforce_soft_limits: Enforce `soft_max_size` policies as well
          as `max_size`.
        :param include_write_only: Uploading a file to a write-only store
          will mean that debusine won't download it from there again, so
          that only makes sense in specialized situations such as populating
          a backup store. Set this to True to include write-only stores in
          the returned query set.
        """
        file_stores = (
            # Skip read-only stores and stores where the file would not
            # fit within the store's hard size limit.
            self.file_stores.exclude(filestoreinscope__read_only=True)
            .exclude(max_size__lt=F("total_size") + fileobj.size)
            .order_by(
                F("filestoreinscope__upload_priority").desc(nulls_last=True),
                "id",
            )
        )
        if enforce_soft_limits:
            # TODO: We should also enforce FileStoreInScope.soft_max_size.
            file_stores = file_stores.exclude(
                soft_max_size__lt=F("total_size") + fileobj.size
            )
        if not include_write_only:
            file_stores = file_stores.exclude(filestoreinscope__write_only=True)
        return file_stores

    def upload_file_backend(self, fileobj: File) -> "FileBackendInterface[Any]":
        """
        Find the best file backend for uploading `fileobj`.

        :raises IndexError: if there is no such file backend.
        """
        return self.upload_file_stores(fileobj)[0].get_backend_object()

    def download_file_stores(self, fileobj: File) -> QuerySet[FileStore]:
        """
        Find the file stores in this scope that have `fileobj`, if any.

        The returned query set is in descending order of download priority,
        breaking ties in descending order of upload priority and then by
        ascending file store IDs.
        """
        return (
            # Write-only stores are never used to serve files.
            self.file_stores.exclude(filestoreinscope__write_only=True)
            .filter(files=fileobj)
            .order_by(
                F("filestoreinscope__download_priority").desc(nulls_last=True),
                F("filestoreinscope__upload_priority").desc(nulls_last=True),
                "id",
            )
        )

    def download_file_backend(
        self, fileobj: File
    ) -> "FileBackendInterface[Any]":
        """
        Find the best file backend for downloading `fileobj`.

        :raises IndexError: if there is no such file backend.
        """
        return self.download_file_stores(fileobj)[0].get_backend_object()
class ScopeRole(models.Model):
    """Role assignments for scopes."""

    Roles: TypeAlias = ScopeRoles

    #: Scope on which the role is granted.
    resource = models.ForeignKey(
        Scope,
        on_delete=models.CASCADE,
        related_name="roles",
    )
    #: Group of users to which the role is granted.
    group = models.ForeignKey(
        "Group",
        on_delete=models.CASCADE,
        related_name="scope_roles",
    )
    #: The granted role (one of ScopeRoles).
    role = models.CharField(max_length=16, choices=Roles.choices)

    class Meta(TypedModelMeta):
        constraints = [
            # Each (resource, group, role) grant can exist only once.
            UniqueConstraint(
                fields=["resource", "group", "role"],
                name="%(app_label)s_%(class)s_unique_resource_group_role",
            ),
        ]

    def __str__(self) -> str:
        """Return a description of the role assignment."""
        return f"{self.group}─{self.role}⟶{self.resource}"
class FileStoreInScope(models.Model):
    """Database model used for extra data on Scope/FileStore relations."""

    scope = models.ForeignKey(Scope, on_delete=models.PROTECT)
    file_store = models.ForeignKey(FileStore, on_delete=models.PROTECT)
    #: Denormalized copy of FileStore.instance_wide, kept in sync by the
    #: database trigger below so that the uniqueness constraint on
    #: non-instance-wide stores can be expressed at the database level.
    _file_store_instance_wide = models.BooleanField(
        default=True,
        editable=False,
        db_column="file_store_instance_wide",
        help_text="Synced from FileStore.instance_wide; do not update directly",
    )
    #: The priority of this store for the purpose of storing new files.
    #: When adding a new file, debusine tries stores whose policies allow
    #: adding new files in descending order of upload priority, counting
    #: null as the lowest.
    upload_priority = models.IntegerField(blank=True, null=True)
    #: The priority of this store for the purpose of serving files to
    #: clients. When downloading a file, debusine tries stores in
    #: descending order of download priority, counting null as the lowest;
    #: it breaks ties in descending order of upload priority, again counting
    #: null as the lowest. If there is still a tie, it picks one of the
    #: possibilities arbitrarily.
    download_priority = models.IntegerField(blank=True, null=True)
    #: If True, the storage maintenance job ensures that this store has a
    #: copy of all files in the scope.
    populate = models.BooleanField(default=False)
    #: If True, the storage maintenance job moves all files in this scope to
    #: some other store in the same scope, following the same rules for
    #: finding a target store as for uploads of new files. It does not move
    #: into a store if that would take its total size over `soft_max_size`
    #: (either for the scope or the file store), and it logs an error if it
    #: cannot find any eligible target store.
    drain = models.BooleanField(default=False)
    #: If this field is set, then constrain `drain` to use the store with
    #: the given name in this scope.
    #
    # TODO: ruff is correct that we shouldn't use null=True on a TextField,
    # but fixing that retroactively is non-trivial.
    drain_to = models.TextField(blank=True, null=True)  # noqa: DJ001
    #: If True, debusine will not add new files to this store. Use this in
    #: combination with `drain` to prepare for removing the file store.
    read_only = models.BooleanField(default=False)
    #: If True, debusine will not read files from this store. This is
    #: suitable for provider storage classes that are designed for long-term
    #: archival rather than routine retrieval, such as S3 Glacier Deep
    #: Archive.
    write_only = models.BooleanField(default=False)
    #: An integer specifying the number of bytes that the file store can
    #: hold for this scope (accounting files that are in multiple scopes to
    #: all of the scopes in question). This limit may be exceeded
    #: temporarily during uploads; the storage maintenance job will move the
    #: least-recently-used files to another file store to get back below the
    #: limit.
    soft_max_size = models.IntegerField(blank=True, null=True)

    class Meta(TypedModelMeta):
        triggers = [
            # Keep _file_store_instance_wide synced with the referenced
            # FileStore row on every insert/update.
            pgtrigger.Trigger(
                name="db_filestoreinscope_sync_instance_wide",
                operation=pgtrigger.Insert | pgtrigger.Update,
                when=pgtrigger.Before,
                func=" ".join(
                    """
                    NEW.file_store_instance_wide =
                        (SELECT db_filestore.instance_wide
                         FROM db_filestore
                         WHERE db_filestore.id = NEW.file_store_id);
                    RETURN NEW;
                    """.split()
                ),
            )
        ]
        constraints = [
            UniqueConstraint(
                fields=["scope", "file_store"],
                name="%(app_label)s_%(class)s_unique_scope_file_store",
            ),
            # A store that is not instance-wide may belong to at most one
            # scope.
            UniqueConstraint(
                fields=["file_store"],
                name=(
                    "%(app_label)s_%(class)s_"
                    "unique_file_store_not_instance_wide"
                ),
                condition=Q(_file_store_instance_wide=False),
            ),
            # It does not make sense to request a store to be populated
            # while also requesting it to be either drained or read-only.
            CheckConstraint(
                name="%(app_label)s_%(class)s_consistent_populate",
                check=Q(populate=False) | Q(drain=False, read_only=False),
            ),
        ]

    def __str__(self) -> str:
        """Return basic information of FileStoreInScope."""
        return f"{self.scope}/{self.file_store.name}"