# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Data models for assets."""
import enum
from collections.abc import Generator, Iterable
from typing import Any, Self, TYPE_CHECKING, TypeAlias, cast
from django.core.exceptions import ImproperlyConfigured, ValidationError
from django.db import models
from django.db.models import Q
from django.db.models.constraints import CheckConstraint, UniqueConstraint
from debusine.assets import (
AssetCategory,
BaseAssetDataModel,
BasicAPTAuthenticationData,
SigningKeyData,
asset_data_model,
)
from debusine.db.constraints import JsonDataUniqueConstraint
from debusine.db.models import permissions
from debusine.db.models.auth import Token
from debusine.db.models.permissions import (
Allow,
PermissionUser,
Role,
enforce,
permission_check,
permission_filter,
)
from debusine.db.models.scopes import Scope
from debusine.db.models.workspaces import Workspace
from debusine.tasks.models import WorkerType
from debusine.utils.typing_utils import copy_signature_from
# Import the typed Meta base only while type checking; at runtime the name is
# bound to plain ``object`` so ``class Meta(TypedModelMeta)`` still works.
if TYPE_CHECKING:
    from django_stubs_ext.db.models import TypedModelMeta
else:
    TypedModelMeta = object
class AssetRoles(permissions.Roles, permissions.RoleBase, enum.ReprEnum):
    """Available roles for an Asset."""

    # The only role defined for assets; the Asset edit and
    # manage-permissions checks in this module test for it.
    OWNER = "owner"


# Invoked once, immediately after the class statement.
# NOTE(review): presumably finalizes the role definitions -- confirm against
# permissions.Roles.setup.
AssetRoles.setup()
class AssetUsageRoleBase(permissions.RoleBase):
    """Behaviour shared by every member of the AssetUsage role enum."""

    implied_by_scope_roles: frozenset[Scope.Roles]
    implied_by_workspace_roles: frozenset[Workspace.Roles]
    implied_by_asset_usage_roles: frozenset["AssetUsageRoles"]

    def _setup(self) -> None:
        """Compute which scope, workspace and usage roles imply this role."""
        scope_sources: set[Scope.Roles] = set()
        workspace_sources: set[Workspace.Roles] = set()
        # Every role is implied by itself.
        usage_sources: set[AssetUsageRoles] = {cast(AssetUsageRoles, self)}

        for i in self.implied_by:
            if isinstance(i, Workspace.Roles):
                # A workspace role brings along everything that implies it.
                scope_sources |= i.implied_by_scope_roles
                workspace_sources |= i.implied_by_workspace_roles
            elif isinstance(i, Role):
                # Roles named during class definition are plain Role objects;
                # look up the enum member that was built from the same value.
                member = self.__class__(i.value)
                scope_sources |= member.implied_by_scope_roles
                workspace_sources |= member.implied_by_workspace_roles
                usage_sources |= member.implied_by_asset_usage_roles
            else:
                raise ImproperlyConfigured(
                    f"AssetUsage roles do not support implications by {i!r}"
                )

        self.implied_by_scope_roles = frozenset(scope_sources)
        self.implied_by_workspace_roles = frozenset(workspace_sources)
        self.implied_by_asset_usage_roles = frozenset(usage_sources)

    def q(self, user: PermissionUser) -> Q:
        """Build the Q expression matching asset usages with this role."""
        # Role granted directly on the asset usage through a group.
        direct = Q(
            roles__group__users=user,
            roles__role__in=self.implied_by_asset_usage_roles,
        )
        if not self.implied_by_workspace_roles:
            return direct

        # Role inherited from the usage's workspace, either through a
        # workspace role or through a role on the workspace's scope.
        scopes = Scope.objects.filter(
            roles__group__users=user,
            roles__role__in=self.implied_by_scope_roles,
        )
        by_workspace_role = Q(
            roles__group__users=user,
            roles__role__in=self.implied_by_workspace_roles,
        )
        workspaces = Workspace.objects.filter(
            by_workspace_role | Q(scope__in=scopes)
        )
        return direct | Q(workspace__in=workspaces)

    def implies(self, role: "AssetUsageRoles") -> bool:
        """Tell whether holding this role also grants the given role."""
        # This role implies ``role`` when everything that implies this role
        # is also listed as implying ``role``, for each of the three kinds.
        if not self.implied_by_scope_roles <= role.implied_by_scope_roles:
            return False
        if (
            not self.implied_by_workspace_roles
            <= role.implied_by_workspace_roles
        ):
            return False
        return (
            self.implied_by_asset_usage_roles
            <= role.implied_by_asset_usage_roles
        )
class AssetUsageRoles(permissions.Roles, AssetUsageRoleBase, enum.ReprEnum):
    """Available roles for an AssetUsage."""

    # Declared first so that the role below can name it in ``implied_by``.
    SIGNER = Role("signer")
    # Implied by SIGNER and by the workspace OWNER role.
    REPOSITORY_SIGNER = Role(
        "repository_signer",
        label="Repository signer",
        implied_by=[SIGNER, Workspace.Roles.OWNER],
    )
    # Implied by the workspace OWNER role.
    APT_AUTHENTICATOR = Role(
        "apt_authenticator",
        label="Can use APT authentication",
        implied_by=[Workspace.Roles.OWNER],
    )


# Invoked once, immediately after the class statement.
# NOTE(review): presumably finalizes the role definitions -- confirm against
# permissions.Roles.setup.
AssetUsageRoles.setup()
class AssetQuerySet[A](models.QuerySet["Asset", A]):
"""Custom QuerySet for Asset."""
def in_current_scope(self) -> "AssetQuerySet[A]":
"""Filter to assets in the current scope."""
from debusine.db.context import context
return self.filter(workspace__scope=context.require_scope())
@permission_filter(workers=Allow.PASS, anonymous=Allow.PASS)
def can_display(self, user: PermissionUser) -> "AssetQuerySet[A]":
"""Keep only Assets that can be displayed."""
# Delegate to workspace can_display check
return self.filter(
workspace__in=Workspace.objects.can_display(user)
).exclude(category=AssetCategory.CLOUD_PROVIDER_ACCOUNT)
@permission_filter()
def can_manage_permissions(
self, user: PermissionUser
) -> "AssetQuerySet[A]":
"""Filter to Assets that can be managed by user."""
assert user.is_authenticated
return self.filter(
roles__group__users=user, roles__role=AssetRoles.OWNER
)
class AssetManager(models.Manager["Asset"]):
"""Manager for the Asset model."""
def get_roles_model(self) -> type["AssetRole"]:
"""Get the model used for role assignment."""
return AssetRole
def get_queryset(self) -> AssetQuerySet[Any]:
"""Use the custom QuerySet."""
return AssetQuerySet(self.model, using=self._db)
def get_by_slug(
self, category: str, slug: str, workspace: Workspace | None = None
) -> "Asset":
"""Return an asset with a matching slug."""
match category:
case AssetCategory.SIGNING_KEY:
purpose, fingerprint = slug.split(":", 1)
return self.get(
category=category,
data__purpose=purpose,
data__fingerprint=fingerprint,
)
case AssetCategory.APT_AUTHENTICATION:
assert workspace is not None
assets = self.filter(category=category)
if ":" in slug:
workspace_name, name = slug.split(":", 1)
return assets.get(
workspace__scope=workspace.scope,
workspace__name=workspace_name,
data__name=name,
)
else:
return assets.get(workspace=workspace, data__name=slug)
case _:
raise ValueError(f"No slug defined for category '{category}'")
class AssetUsageQuerySet[A](models.QuerySet["AssetUsage", A]):
"""Custom QuerySet for AssetUsage."""
def with_role(self, user: PermissionUser, role: AssetUsageRoles) -> Self:
"""Keep only resources where the user has the given role."""
if not user.is_authenticated:
return self.none()
return self.filter(
pk__in=self.model.objects.filter(role.q(user)).distinct()
)
@permission_filter(workers=Allow.PASS)
def can_sign_with(self, user: PermissionUser) -> "AssetUsageQuerySet[A]":
"""Keep only AssetUsages that the user can sign with."""
return self.with_role(user, AssetUsageRoles.SIGNER)
@permission_filter(workers=Allow.PASS)
def can_sign_repository_with(
self, user: PermissionUser
) -> "AssetUsageQuerySet[A]":
"""Keep only AssetUsages that the user can sign a repository with."""
return self.with_role(user, AssetUsageRoles.REPOSITORY_SIGNER)
@permission_filter(workers=Allow.PASS)
def can_use_apt_authentication_with(
self, user: PermissionUser
) -> "AssetUsageQuerySet[A]":
"""Keep only AssetUsages that the user can use for APT auth."""
return self.with_role(user, AssetUsageRoles.APT_AUTHENTICATOR)
class AssetUsageManager(models.Manager["AssetUsage"]):
    """Manager for the AssetUsage model."""

    def get_roles_model(self) -> type["AssetUsageRole"]:
        """
        Get the model used for role assignment.

        :return: the AssetUsageRole model class, which links an AssetUsage
          to a group and a role.
        """
        return AssetUsageRole
[docs]
class Asset(models.Model):
"""Asset model."""
category = models.CharField(max_length=255, choices=AssetCategory.choices)
workspace = models.ForeignKey(
Workspace, on_delete=models.PROTECT, blank=True, null=True
)
data = models.JSONField(default=dict)
created_at = models.DateTimeField(auto_now_add=True)
created_by = models.ForeignKey(
"User", blank=True, null=True, on_delete=models.PROTECT
)
created_by_work_request = models.ForeignKey(
"WorkRequest", blank=True, null=True, on_delete=models.SET_NULL
)
Roles: TypeAlias = AssetRoles
objects = AssetManager.from_queryset(AssetQuerySet)()
class Meta(TypedModelMeta):
base_manager_name = "objects"
constraints = [
JsonDataUniqueConstraint(
fields=["data->>'name'"],
condition=models.Q(
category=AssetCategory.CLOUD_PROVIDER_ACCOUNT
),
nulls_distinct=False,
name="%(app_label)s_%(class)s_unique_cloud_provider_acct_name",
),
JsonDataUniqueConstraint(
fields=["data->>'fingerprint'"],
condition=models.Q(category=AssetCategory.SIGNING_KEY),
nulls_distinct=False,
name="%(app_label)s_%(class)s_unique_signing_key_fingerprints",
),
CheckConstraint(
check=(
~models.Q(category=AssetCategory.APT_AUTHENTICATION)
| models.Q(data__name__regex=r"^[A-Za-z][A-Za-z0-9+._-]*$")
),
name="%(app_label)s_%(class)s_apt_auth_name",
),
JsonDataUniqueConstraint(
fields=["workspace", "data->>'name'"],
condition=models.Q(category=AssetCategory.APT_AUTHENTICATION),
nulls_distinct=False,
name="%(app_label)s_%(class)s_unique_apt_auth_workspace_name",
),
# Some categories of asset can have null workspaces, but not
# signing keys or APT authentication.
CheckConstraint(
check=~models.Q(
category__in={
AssetCategory.SIGNING_KEY,
AssetCategory.APT_AUTHENTICATION,
}
)
| models.Q(workspace__isnull=False),
name="%(app_label)s_%(class)s_workspace_not_null",
),
]
def __str__(self) -> str:
"""Return basic information of Asset."""
return (
f"Id: {self.id} "
f"Category: {self.category} "
f"Workspace: {self.workspace}"
)
[docs]
@copy_signature_from(models.Model.save)
def save(self, **kwargs: Any) -> None:
"""Wrap save with permission checks."""
from debusine.db.context import context
if context.permission_checks_disabled:
pass
elif self._state.adding:
enforce(self.can_create)
else:
enforce(self.can_edit)
return super().save(**kwargs)
[docs]
def clean(self) -> None:
"""
Ensure that data is valid for this asset category.
:raise ValidationError: for invalid data.
"""
self.data_model
@property
def slug(self) -> str:
"""Return a string slug that uniquely identifies the asset."""
match self.category:
case AssetCategory.SIGNING_KEY:
data_model = self.data_model
assert isinstance(data_model, SigningKeyData)
return f"{data_model.purpose}:{data_model.fingerprint}"
case AssetCategory.APT_AUTHENTICATION:
assert self.workspace is not None
data_model = self.data_model
assert isinstance(data_model, BasicAPTAuthenticationData)
return f"{self.workspace.name}:{data_model.name}"
case _:
raise NotImplementedError(
f"No slug defined for category '{self.category}'"
)
[docs]
@permission_check("{user} cannot edit asset {resource}")
def can_edit(self, user: PermissionUser) -> bool:
"""Can user edit this asset."""
assert user.is_authenticated
return self.roles.filter(
group__users=user, role=AssetRoles.OWNER
).exists()
[docs]
@permission_check(
"{user} cannot create assets in {resource.workspace.scope}",
workers=Allow.PASS,
anonymous=Allow.PASS,
)
def can_create(self, user: PermissionUser) -> bool:
"""Can user create this asset."""
from debusine.db.context import context
# Allow signing workers to create Assets until we have delegated work
# request permissions (#634)
if (
context.worker_token
and context.worker_token.token_type == Token.Types.WORKER
):
if context.worker_token.worker.worker_type == WorkerType.SIGNING:
return True
if not user.is_authenticated:
return False
if not self.workspace:
return False
if self.category == AssetCategory.CLOUD_PROVIDER_ACCOUNT:
# Not currently creatable through the API
return False
return any(
r.implies(Workspace.Roles.OWNER)
for r in Workspace.Roles.from_iterable(
self.workspace.scope.get_group_roles(user)
)
)
@property
def data_model(self) -> BaseAssetDataModel:
"""Instantiate AssetData from data."""
if not isinstance(self.data, dict):
raise ValidationError({"data": "data must be a dictionary"})
try:
return asset_data_model(self.category, self.data)
except ValueError as e:
raise ValidationError(
{
"category": (
f"{self.category}: invalid asset category or data: {e}"
),
},
) from e
class AssetRole(models.Model):
    """Role assignment for assets."""

    # Asset the role applies to; reachable from the asset as ``roles``.
    resource = models.ForeignKey(
        Asset,
        on_delete=models.CASCADE,
        related_name="roles",
    )
    # Group whose users receive the role.
    group = models.ForeignKey(
        "Group",
        on_delete=models.CASCADE,
        related_name="asset_roles",
    )
    # One of the AssetRoles values.
    role = models.CharField(max_length=16, choices=AssetRoles.choices)
class AssetUsage(models.Model):
    """Usage of an Asset within a workspace."""

    Roles: TypeAlias = AssetUsageRoles

    # Asset being used; reachable from the asset as ``usage``.
    asset = models.ForeignKey(
        Asset,
        on_delete=models.CASCADE,
        related_name="usage",
    )
    # Workspace the asset is used in; reachable from it as ``asset_usage``.
    workspace = models.ForeignKey(
        "Workspace",
        on_delete=models.CASCADE,
        related_name="asset_usage",
    )

    objects = AssetUsageManager.from_queryset(AssetUsageQuerySet)()

    class Meta(TypedModelMeta):
        base_manager_name = "objects"
        constraints = [
            # At most one usage row per (asset, workspace) pair.
            UniqueConstraint(
                fields=["asset", "workspace"],
                name="%(app_label)s_%(class)s_unique_asset_workspace",
            )
        ]

    def has_role(self, user: PermissionUser, role: AssetUsageRoles) -> bool:
        """Check if the user has the given role on this AssetUsage."""
        # Reuse the queryset-level role filter, narrowed to this row.
        return (
            AssetUsage.objects.with_role(user, role).filter(pk=self.pk).exists()
        )

    @permission_check("{user} cannot sign with {resource}", workers=Allow.PASS)
    def can_sign_with(self, user: PermissionUser) -> bool:
        """Check if the user can sign with this resource."""
        assert user.is_authenticated
        return self.has_role(user, AssetUsageRoles.SIGNER)

    @permission_check(
        "{user} cannot sign a repository with {resource}", workers=Allow.PASS
    )
    def can_sign_repository_with(self, user: PermissionUser) -> bool:
        """Check if the user can sign a repository with this resource."""
        assert user.is_authenticated
        return self.has_role(user, AssetUsageRoles.REPOSITORY_SIGNER)

    @permission_check(
        "{user} cannot use {resource} for APT authentication",
        workers=Allow.PASS,
    )
    def can_use_apt_authentication_with(self, user: PermissionUser) -> bool:
        """Check if the user can use APT authentication with this resource."""
        assert user.is_authenticated
        return self.has_role(user, AssetUsageRoles.APT_AUTHENTICATOR)
class AssetUsageRole(models.Model):
    """Role assignment for assets within a workspace."""

    # AssetUsage the role applies to; reachable from it as ``roles``.
    resource = models.ForeignKey(
        AssetUsage,
        on_delete=models.CASCADE,
        related_name="roles",
    )
    # Group whose users receive the role.
    group = models.ForeignKey(
        "Group",
        on_delete=models.CASCADE,
        related_name="asset_usage_roles",
    )
    # One of the AssetUsageRoles values.
    role = models.CharField(max_length=32, choices=AssetUsageRoles.choices)
def get_public_keys(signing_keys: Iterable[Asset]) -> Generator[bytes]:
    """
    Lazily produce the public key of every given signing-key asset.

    Keys are yielded in the order the assets are supplied; each asset's data
    must be signing-key data.
    """
    for asset in signing_keys:
        key_data = asset.data_model
        assert isinstance(key_data, SigningKeyData)
        yield key_data.public_key