Skip to content

Watcher Unit Test

Quick and dirty auto exec when editing it

sh
python -m watchdog[watchmedo]
py
from dataclasses import dataclass
from dataclasses import field as dataclass_field
from functools import partialmethod
from typing import Type
from typing import List

from django.db import IntegrityError
from django.db.models import (
    BigIntegerField,
    FloatField,
    ForeignKey,
    IntegerField,
    Model,
    SmallIntegerField,
)
from django.forms import CharField
from factory.django import DjangoModelFactory


@dataclass
class ModelSpecsMetaOptions:
    model: Type[Model]
    factory: Type[DjangoModelFactory]
    unique_for: List = dataclass_field(default_factory=lambda: [["code"]])


class IntegrityMeta(type):
    def __new__(cls, name, bases, attrs, **kwargs):
        super_new = super().__new__
        parents = [b for b in bases if isinstance(b, cls)]
        if not parents:
            return super_new(cls, name, bases, attrs)

        meta = attrs["Meta"]
        attrs["Meta"] = ModelSpecsMetaOptions(
            model=meta.object_type, factory=meta.factory
        )

        if hasattr(meta, "unique_for"):
            attrs["Meta"].unique_for = getattr(meta, "unique_for")
        meta = attrs["Meta"]
        obj = super_new(cls, name, bases, attrs)

        check_integrity = getattr(obj, "check_integrity")
        for fields in meta.unique_for:
            fields_name = "_".join(fields)
            setattr(
                obj,
                f"test_integrity_{fields_name}",
                partialmethod(check_integrity, specs=meta, fields=fields),
            ),
        return obj


class ModelIntegrityTestCase(metaclass=IntegrityMeta):
    """Base class for model integrity tests."""

    _meta: ModelSpecsMetaOptions = None

    def check_integrity(self, specs: ModelSpecsMetaOptions, fields: List[str]):
        """Check the integrity of the model with the given fields."""
        fields_by_name = {
            field.name: field for field in specs.model._meta.get_fields()
        }
        params = {}
        if not fields:
            return

        for field_name in fields:
            field = fields_by_name[field_name]
            if field_name in specs.field_factory_registry:
                params[field_name] = specs.field_factory_registry[
                    field_name
                ].create()
            elif isinstance(field, ForeignKey):
                if field_name not in specs.field_factory_registry:
                    raise ValueError(
                        f"Integrity check with FK, you must provide a factory for {field}."
                    )
                params[field_name] = specs.field_factory_registry[
                    field_name
                ].create()
            elif isinstance(field, CharField):
                params[field_name] = "dummy"
            elif isinstance(
                field,
                BigIntegerField,
                IntegerField,
                SmallIntegerField,
                FloatField,
            ):
                params[field_name] = 1
            else:
                raise NotImplementedError(f"Integrity check on field: {field}")

        with self.assertRaises(IntegrityError):
            specs.factory.create_batch(2, **params)