diff --git a/core/gql/gql_mutations/base_mutation.py b/core/gql/gql_mutations/base_mutation.py index 2a457e2..1e59152 100644 --- a/core/gql/gql_mutations/base_mutation.py +++ b/core/gql/gql_mutations/base_mutation.py @@ -237,7 +237,7 @@ def _mutate(cls, user, **data): @classmethod def update_object(cls, user, object_to_update): - object_to_update.save(username=user.username) + object_to_update.save(user=user) return object_to_update diff --git a/core/gql/gql_mutations/mutation_by_filter.py b/core/gql/gql_mutations/mutation_by_filter.py index 98da647..b1b5b6e 100644 --- a/core/gql/gql_mutations/mutation_by_filter.py +++ b/core/gql/gql_mutations/mutation_by_filter.py @@ -59,7 +59,7 @@ def wrapper(cls, user, **data): q_filter = map_gql_to_django_filter( args, available_filters, explicit_filters_handlers ) - base_query = django_object.objects.filter(validity_to=None).filter( + base_query = django_object.objects.filter(*django_object.filter_validity()).filter( q_filter ) if return_objects: diff --git a/core/gql_queries.py b/core/gql_queries.py index eb9b74e..366e50a 100644 --- a/core/gql_queries.py +++ b/core/gql_queries.py @@ -1,6 +1,6 @@ import graphene import location.gql_queries -from core import ExtendedConnection, filter_validity +from core import ExtendedConnection from core.models import ( Officer, Role, @@ -12,7 +12,7 @@ Language, ) from graphene_django import DjangoObjectType -from location.models import HealthFacility +from location.models import HealthFacility, UserDistrict from core.apps import CoreConfig from django.utils.translation import gettext as _ from django.core.exceptions import PermissionDenied @@ -40,7 +40,7 @@ class Meta: @classmethod def get_queryset(cls, queryset, info): - queryset = queryset.filter(*filter_validity()) + queryset = queryset.filter(*Officer.filter_validity()) return queryset @@ -159,7 +159,7 @@ def resolve_userdistrict_set(self, info, **kwargs): if not info.context.user.is_authenticated: raise PermissionDenied(_("unauthorized")) if self.userdistrict_set: - return self.userdistrict_set.filter(*filter_validity()) + return self.userdistrict_set.filter(*UserDistrict.filter_validity()) else: return None diff --git a/core/jwt.py b/core/jwt.py index ae8bc45..339fc3b 100644 --- a/core/jwt.py +++ b/core/jwt.py @@ -20,7 +20,6 @@ def on_token_issued(sender, request, user, **kwargs): if user.i_user: user.i_user.last_login = timezone.now() user.i_user.save() - pass def jwt_encode_user_key(payload, context=None): diff --git a/core/jwt_authentication.py b/core/jwt_authentication.py index 884e7f7..bcd660c 100644 --- a/core/jwt_authentication.py +++ b/core/jwt_authentication.py @@ -5,6 +5,7 @@ from graphql_jwt.exceptions import JSONWebTokenError from graphql_jwt.shortcuts import get_user_by_token from core.apps import CoreConfig +from core.utils import set_current_user from django.conf import settings from django_ratelimit.core import is_ratelimited @@ -40,7 +41,7 @@ def authenticate(self, request): and user.health_facility.contract_end_date > date.today() ): raise exceptions.AuthenticationFailed("HF_CONTRACT_INVALID") - + set_current_user(user) return user, None def enforce_csrf(self, request): diff --git a/core/migrations/0032_remove_interactiveuser_legacy_id_and_more.py b/core/migrations/0032_remove_interactiveuser_legacy_id_and_more.py new file mode 100644 index 0000000..f487ce4 --- /dev/null +++ b/core/migrations/0032_remove_interactiveuser_legacy_id_and_more.py @@ -0,0 +1,304 @@ +# Generated by Django 4.2.23 on 2025-09-23 11:46 + +import core.models.user +import datetime +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion +import simple_history.models +import uuid + + +class Migration(migrations.Migration): + + dependencies = [ + ("auth", "0012_alter_user_first_name_max_length"), + ("core", "0031_alter_mutationlog_client_mutation_id"), + ] + + operations = [ + migrations.RemoveField( + model_name="interactiveuser", + name="legacy_id", + ), + migrations.AddField( + model_name="interactiveuser", + name="date_created", + field=models.DateTimeField(default=datetime.datetime.now, null=True), + ), + migrations.AddField( + model_name="interactiveuser", + name="date_deactivated", + field=models.DateTimeField(default=None, null=True), + ), + migrations.AddField( + model_name="interactiveuser", + name="date_updated", + field=models.DateTimeField(default=datetime.datetime.now, null=True), + ), + migrations.AddField( + model_name="interactiveuser", + name="active", + field=models.BooleanField(default=True), + ), + migrations.AddField( + model_name="interactiveuser", + name="json_ext", + field=models.JSONField(blank=True, db_column="Json_ext", null=True), + ), + migrations.AddField( + model_name="interactiveuser", + name="user_created", + field=models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.DO_NOTHING, + related_name="%(class)s_user_created", + to=settings.AUTH_USER_MODEL, + ), + ), + migrations.AddField( + model_name="interactiveuser", + name="user_updated", + field=models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.DO_NOTHING, + related_name="%(class)s_user_updated", + to=settings.AUTH_USER_MODEL, + ), + ), + migrations.AddField( + model_name="interactiveuser", + name="version", + field=models.IntegerField(default=1), + ), + migrations.AlterField( + model_name="exportablequerymodel", + name="create_date", + field=models.DateTimeField( + db_column="DateCreated", default=datetime.datetime.now + ), + ), + migrations.AlterField( + model_name="exportablequerymodel", + name="expire_date", + field=models.DateTimeField( + db_column="DateExpiring", + default=core.models.user._get_default_expire_date, + ), + ), + migrations.AlterField( + model_name="interactiveuser", + name="id", + field=models.BigAutoField( + auto_created=True, editable=False, primary_key=True, serialize=False + ), + ), + migrations.AlterField( + model_name="interactiveuser", + name="uuid", + field=models.UUIDField( + db_column="UUID", default=uuid.uuid4, editable=False, unique=True + ), + ), + migrations.AlterField( + model_name="interactiveuser", + name="validity_from", + field=models.DateTimeField( + db_column="ValidityFrom", default=datetime.datetime.now, null=True + ), + ), + migrations.AlterField( + model_name="interactiveuser", + name="validity_to", + field=models.DateTimeField( + blank=True, db_column="ValidityTo", default=None, null=True + ), + ), + migrations.AlterField( + model_name="user", + name="groups", + field=models.ManyToManyField( + blank=True, + help_text="The groups this user belongs to. A user will get all permissions granted to each of their groups.", # noqa: E501 + related_name="user_set", + related_query_name="user", + to="auth.group", + verbose_name="groups", + ), + ), + migrations.CreateModel( + name="HistoricalInteractiveUser", + fields=[ + ( + "id", + models.BigIntegerField( + auto_created=True, blank=True, db_index=True, editable=False + ), + ), + ( + "uuid", + models.UUIDField( + db_column="UUID", + db_index=True, + default=uuid.uuid4, + editable=False, + ), + ), + ("active", models.BooleanField(default=True)), + ( + "json_ext", + models.JSONField(blank=True, db_column="Json_ext", null=True), + ), + ("date_deactivated", models.DateTimeField(default=None, null=True)), + ( + "date_created", + models.DateTimeField(default=datetime.datetime.now, null=True), + ), + ( + "date_updated", + models.DateTimeField(default=datetime.datetime.now, null=True), + ), + ("version", models.IntegerField(default=1)), + ( + "validity_from", + models.DateTimeField( + db_column="ValidityFrom", + default=datetime.datetime.now, + null=True, + ), + ), + ( + "validity_to", + models.DateTimeField( + blank=True, db_column="ValidityTo", default=None, null=True + ), + ), + ("last_name", models.CharField(db_column="LastName", max_length=100)), + ( + "other_names", + models.CharField(db_column="OtherNames", max_length=100), + ), + ( + "phone", + models.CharField( + blank=True, db_column="Phone", max_length=50, null=True + ), + ), + ("login_name", models.CharField(db_column="LoginName", max_length=50)), + ( + "last_login", + models.DateTimeField(blank=True, db_column="LastLogin", null=True), + ), + ( + "health_facility_id", + models.IntegerField(blank=True, db_column="HFID", null=True), + ), + ("audit_user_id", models.IntegerField(db_column="AuditUserID")), + ( + "email", + models.CharField( + blank=True, db_column="EmailId", max_length=200, null=True + ), + ), + ( + "private_key", + models.CharField( + blank=True, + db_column="PrivateKey", + help_text="The private key is actually a password salt", + max_length=256, + null=True, + ), + ), + ( + "password", + models.CharField( + blank=True, + db_column="StoredPassword", + help_text="By default a SHA256 of the private key (salt) and password", + max_length=256, + null=True, + ), + ), + ( + "password_validity", + models.DateTimeField( + blank=True, db_column="PasswordValidity", null=True + ), + ), + ( + "is_associated", + models.BooleanField( + blank=True, + db_column="IsAssociated", + help_text="has a claim admin or enrolment officer account", + null=True, + ), + ), + ( + "role_id", + models.IntegerField(blank=True, db_column="RoleID", null=True), + ), + ("history_id", models.AutoField(primary_key=True, serialize=False)), + ("history_date", models.DateTimeField(db_index=True)), + ("history_change_reason", models.CharField(max_length=100, null=True)), + ( + "history_type", + models.CharField( + choices=[("+", "Created"), ("~", "Changed"), ("-", "Deleted")], + max_length=1, + ), + ), + ( + "history_user", + models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="+", + to=settings.AUTH_USER_MODEL, + ), + ), + ( + "language", + models.ForeignKey( + blank=True, + db_column="LanguageID", + db_constraint=False, + null=True, + on_delete=django.db.models.deletion.DO_NOTHING, + related_name="+", + to="core.language", + ), + ), + ( + "user_created", + models.ForeignKey( + blank=True, + db_constraint=False, + null=True, + on_delete=django.db.models.deletion.DO_NOTHING, + related_name="+", + to=settings.AUTH_USER_MODEL, + ), + ), + ( + "user_updated", + models.ForeignKey( + blank=True, + db_constraint=False, + null=True, + on_delete=django.db.models.deletion.DO_NOTHING, + related_name="+", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "verbose_name": "historical interactive user", + "verbose_name_plural": "historical interactive users", + "ordering": ("-history_date", "-history_id"), + "get_latest_by": ("history_date", "history_id"), + }, + bases=(simple_history.models.HistoricalChanges, models.Model), + ), + ] diff --git a/core/migrations/0033_to_history.py b/core/migrations/0033_to_history.py new file mode 100644 index 0000000..f4d1679 --- /dev/null +++ b/core/migrations/0033_to_history.py @@ -0,0 +1,68 @@ +from django.db import migrations +from core.utils import migrate_from_versioned_to_history +import logging + +logger = logging.getLogger(__name__) + + +def empty_tbl_logins(apps, schema_editor): + """ + Empty the tblLogins table using raw SQL with error handling. + Check if table exists before attempting to empty it. + """ + try: + with schema_editor.connection.cursor() as cursor: + # Check if tblLogins table exists + cursor.execute(""" + SELECT EXISTS ( + SELECT 1 + FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = 'tblLogins' + ) + """) + table_exists = cursor.fetchone()[0] + + if table_exists: + # PostgreSQL: Use DO block for error handling + cursor.execute('DELETE FROM "tblLogins" WHERE 1=1') + logger.info("Successfully emptied tblLogins") + else: + logger.info("tblLogins table does not exist, skipping empty operation") + except Exception as e: + logger.error(f"Error emptying tblLogins: {e}") + + +def run_migrate_to_history(apps, schema_editor): + """ + Run the migrate_to_history function to move InteractiveUser records + with non-null validity_to to the history table. + """ + InteractiveUser = apps.get_model('core', 'InteractiveUser') + HistoryInteractiveUser = apps.get_model('core', 'HistoricalInteractiveUser') + UserDistrict = apps.get_model('location', 'UserDistrict') + UserRole = apps.get_model('core', 'UserRole') + # Find InteractiveUser records with non-null validity_to + invalid_users = InteractiveUser.objects.filter(validity_to__isnull=False) + invalid_user_ids = invalid_users.values_list('id', flat=True) + UserRole.objects.filter(user_id__in=invalid_user_ids).delete() + UserDistrict.objects.filter(user_id__in=invalid_user_ids).delete() + + empty_tbl_logins(apps, schema_editor) + result = migrate_from_versioned_to_history(InteractiveUser, HistoryInteractiveUser) + print(result) # Output the result of the migration for logging + + +class Migration(migrations.Migration): + # tblLogins might need to be emptied first + + dependencies = [ + ("core", "0032_remove_interactiveuser_legacy_id_and_more"), + ] + + operations = [ + migrations.RunPython( + code=run_migrate_to_history, + reverse_code=migrations.RunPython.noop, + ), + ] diff --git a/core/models/__init__.py b/core/models/__init__.py index ebf6a3e..6885963 100644 --- a/core/models/__init__.py +++ b/core/models/__init__.py @@ -31,3 +31,5 @@ RoleMutation = user_mutation.RoleMutation ObjectMutation = base_mutation.ObjectMutation CachedManager = versioned_model.CachedManager +OpenIMISModel = openimis_model.OpenIMISModel +OpenIMISMigrationModel = openimis_model.OpenIMISMigrationModel \ No newline at end of file diff --git a/core/models/history_model.py b/core/models/history_model.py index c008124..c30b18b 100644 --- a/core/models/history_model.py +++ b/core/models/history_model.py @@ -9,7 +9,7 @@ from django.db import models from django.db.models import F from simple_history.models import HistoricalRecords -from core.utils import CachedManager, CachedModelMixin +from core.utils import CachedManager, CachedModelMixin, get_current_user # from core.datetimes.ad_datetime import datetime as py_datetime @@ -104,15 +104,22 @@ def update(self, *args, user=None, username=None, save=True, **kwargs): self.save(*args, user=user, username=user, **kwargs) return self + def _get_user(self, user=None, username=None): + + audit_user = user or get_current_user() + if audit_user: + return audit_user + elif username: + audit_user = User.objects.get(username=username, *User.filter_validity()) + return audit_user + else: + raise ValidationError( + "Save error! Provide user or the username of the current user in `username` argument" + ) + def save(self, *args, user=None, username=None, **kwargs): # get the user data so as to assign later his uuid id in fields user_updated etc - if not user: - if username: - user = User.objects.get(username=username) - else: - raise ValidationError( - "Save error! Provide user or the username of the current user in `username` argument" - ) + user = self._get_user(user, username) now = py_datetime.now() # check if object has been newly created if self.id is None: @@ -150,9 +157,10 @@ def save(self, *args, user=None, username=None, **kwargs): raise ValidationError( "Update error! You cannot update replaced entity" ) - result = super(HistoryModel, self).save(*args, **kwargs) - self.update_cache() - return result + errors = super(HistoryModel, self).save(*args, **kwargs) + if not errors: + self.update_cache() + return errors else: raise ValidationError( "Record has not be updated - there are no changes in fields" @@ -162,13 +170,7 @@ def delete_history(self): pass def delete(self, *args, user=None, username=None, **kwargs): - if not user: - if username: - user = User.objects.get(username=username) - else: - raise ValidationError( - "Save error! Provide user or the username of the current user in `username` argument" - ) + user = self._get_user(user, username) if not self.is_dirty(check_relationship=True) and not self.is_deleted: now = py_datetime.now() diff --git a/core/models/openimis_graphql_test_case.py b/core/models/openimis_graphql_test_case.py index 8ce9c1f..f20f161 100644 --- a/core/models/openimis_graphql_test_case.py +++ b/core/models/openimis_graphql_test_case.py @@ -12,12 +12,17 @@ from django.contrib.auth.models import AnonymousUser from django.contrib.sessions.backends.db import SessionStore from django.core.cache import cache - +from core.utils import clear_current_user +from django.db import transaction logger = logging.getLogger(__name__) class BaseTestContext: + + cookies = None + jwt = None + user = None def __init__(self, user=None, method="GET", path="/", data=None, headers=None): """ Initialize a test context with realistic request attributes. @@ -29,7 +34,8 @@ def __init__(self, user=None, method="GET", path="/", data=None, headers=None): data: Request payload (dict for POST/PUT, None for GET). headers: Custom HTTP headers (dict). """ - cookies = {} + self.cookies = {} + self.META = {} self.factory = RequestFactory() # Initialize session @@ -41,8 +47,7 @@ def __init__(self, user=None, method="GET", path="/", data=None, headers=None): user.id ) # Store user ID or other relevant data self.session.save() # Save session to generate session_key - self.jwt = get_token_jwt(self.user, self) - cookies["JWT"] = self.jwt + self.get_jwt() else: self.user = AnonymousUser() @@ -82,13 +87,18 @@ def __init__(self, user=None, method="GET", path="/", data=None, headers=None): self.META[meta_key] = value # Add cookies (e.g., session ID and JWT token) - cookies["sessionid"] = self.session.session_key - if cookies: + self.cookies["sessionid"] = self.session.session_key + + self._gen_meta() + + def _gen_meta(self): + if self.cookies: cookie_string = "; ".join( - f"{key}={value}" for key, value in cookies.items() + f"{key}={value}" for key, value in self.cookies.items() ) self.META["HTTP_COOKIE"] = cookie_string + def update_meta(self, key, value): """Utility method to update META dictionary.""" self.META[key] = value @@ -99,20 +109,42 @@ def get_request(self): return self.request def get_jwt(self): - """Return the JWT token.""" - return getattr(self, "jwt", None) + if self.user: + self.jwt = get_token_jwt(self.user, self) + self.cookies["JWT"] = self.jwt + self._gen_meta() + return self.jwt class openIMISGraphQLTestCase(GraphQLTestCase): GRAPHQL_URL = f"/{settings.SITE_ROOT()}graphql" GRAPHQL_SCHEMA = True + """ + Enhanced version that wraps every test in an atomic transaction. + Prevents GraphQL validation errors (400) from closing the DB connection. + """ + def _execute_test(self): + with transaction.atomic(): + super()._execute_test() + + def run(self, result=None): + """ + Override run() to ensure atomic block even when pytest-django calls it. + """ + with transaction.atomic(): + super().run(result) - # client = None @classmethod def setUpClass(cls): - # cls.client=Client(cls.schema) + clear_current_user() + cache.clear() super(openIMISGraphQLTestCase, cls).setUpClass() + def setUp(self): + clear_current_user() + cache.clear() + super(openIMISGraphQLTestCase, self).setUp() + def get_mutation_result( self, mutation_uuid, token, internal=False, allow_exceptions=True ): @@ -255,8 +287,10 @@ def send_mutation( # This validates the status code and if you get errors def build_params(self, params): def wrap_arg(v): + if isinstance(v, uuid.UUID): + return f'"{str(v).lower()}"' if isinstance(v, str): - return f'"{v}"' + return f'"{str(v)}"' if isinstance(v, list): return f"[{','.join([str(wrap_arg(vv)) for vv in v])}]" if isinstance(v, dict): @@ -274,6 +308,7 @@ def wrap_arg(v): ] return ", ".join(params_as_args) - def tearDwon(self): + def tearDown(self): cache.clear() - super().tearDwon() + clear_current_user() + super().tearDown() diff --git a/core/models/openimis_model.py b/core/models/openimis_model.py new file mode 100644 index 0000000..330677a --- /dev/null +++ b/core/models/openimis_model.py @@ -0,0 +1,213 @@ +import uuid +from datetime import datetime as py_datetime +from dirtyfields import DirtyFieldsMixin +from django.core.exceptions import ValidationError +from django.db.models import ( + Q, UUIDField, DateTimeField, BooleanField, Model, IntegerField, ForeignKey, + BigAutoField, JSONField, deletion, +) +from simple_history.models import HistoricalRecords + +from core.utils import CachedManager, CachedModelMixin, filter_validity as core_filter_validity +from django.apps import apps + + +class OpenIMISModel(DirtyFieldsMixin, CachedModelMixin, Model): + def filter_validity(arg="validity", prefix="", **kwargs): + validity = kwargs.get(arg, None) + if not validity: + return Q(active=True) + else: + return Q(active=False) | Q(date_deactivated__gte=validity) + + objects = CachedManager() + id = BigAutoField( + primary_key=True, auto_created=True, editable=False + ) + uuid = UUIDField( + unique=True, db_column="UUID", default=uuid.uuid4, editable=False + ) + active = BooleanField(default=True) + + json_ext = JSONField(db_column="Json_ext", blank=True, null=True) + date_deactivated = DateTimeField(null=True, default=None) + date_created = DateTimeField(null=True, default=py_datetime.now) + date_updated = DateTimeField(null=True, default=py_datetime.now) + user_created = ForeignKey( + "core.User", + related_name="%(class)s_user_created", + on_delete=deletion.DO_NOTHING, + null=True, + ) + user_updated = ForeignKey( + "core.User", + related_name="%(class)s_user_updated", + on_delete=deletion.DO_NOTHING, + null=True, + ) + version = IntegerField(default=1) + history = HistoricalRecords( + inherit=True, + ) + + def set_uuid(self): + self.uuid = uuid.uuid4() + + def save_history(self): + pass + + def update(self, *args, user=None, username=None, save=True, **kwargs): + """ + Overrides the default update to update the cache after saving the instance. + """ + obj_data = kwargs.pop("data", {}) + if not obj_data: + obj_data = kwargs + kwargs = {} + [setattr(self, key, obj_data[key]) for key in obj_data] + if save: + self.save(*args, user=user, username=user, **kwargs) + return self + + def save(self, *args, user=None, username=None, **kwargs): + # get the user data so as to assign later his uuid id in fields user_updated etc + user = self.get_user(user=None, username=None) + now = py_datetime.now() + # check if object has been newly created + if self.id is None: + # save the new object + self.user_created = user + self.date_created = now + self.date_updated = now + self.user_updated = user + result = super().save(*args, **kwargs) + self.update_cache() + return result + if self.is_dirty(check_relationship=True): + if not self.user_created: + # past = self.objects.filter(pk=self.id).first() + # if not past: + self.user_created = user + self.date_created = now + # TODO this could erase a instance, version check might be too light + # elif not self.version == past.version: + # raise ValidationError( + # "Record has not be updated - the version don't match with existing record" + # ) + self.date_updated = now + self.user_updated = user + self.version = self.version + 1 + # check if we have business model + if hasattr(self, "replacement_uuid"): + if ( + self.replacement_uuid is not None + and "replacement_uuid" not in self.get_dirty_fields() + ): + raise ValidationError( + "Update error! You cannot update replaced entity" + ) + result = super().save(*args, **kwargs) + self.update_cache() + return result + else: + raise ValidationError( + "Record has not be updated - there are no changes in fields" + ) + + def delete_history(self): + pass + + def get_user(self, user=None, username=None): + if not user: + user_id = 1 + if username: + user = apps.get_model('core', 'User').objects.get(username=username) + elif self.__class__.__name__ != 'InteractiveUser' and getattr(self, 'audit_user_id', None): + user_id = getattr(self, 'audit_user_id', None) + if user_id == -1: + user_id = 1 + + user = apps.get_model('core', 'User').objects.get(i_user_id=user_id) + return user + + def delete(self, *args, user=None, username=None, **kwargs): + user = self.get_user(user=None, username=None) + if not self.is_dirty(check_relationship=True) and self.active: + now = py_datetime.now() + self.date_updated = now + self.user_updated = user + self.version = self.version + 1 + self.active = False + # check if we have business model + if hasattr(self, "replacement_uuid"): + # When a replacement entity is deleted, the link should be removed + # from replaced entity so a new replacement could be generated + replaced_entity = self.__class__.objects.filter( + replacement_uuid=self.id + ).first() + if replaced_entity: + replaced_entity.replacement_uuid = None + replaced_entity.save(user=user) + result = super(OpenIMISModel, self).save(*args, **kwargs) + return result + else: + raise ValidationError( + "Record has not be deactivated, the object is different and must be updated before deactivating" + ) + + def copy(self, exclude_fields=["id", "uuid"]): + """ + Creates a copy of a Django model instance, excluding specified fields (default: 'id' and 'uuid'). + Args: + exclude_fields: List of field names to exclude from copying (default: ['id', 'uuid']) + Returns: + A new unsaved instance with copied attributes + """ + model_class = self.__class__ + new_instance = model_class() + fields = self._meta.get_fields() + for field in fields: + if field.name not in exclude_fields and hasattr(self, field.name): + if field.is_relation: + if field.many_to_one or field.one_to_one: + setattr(new_instance, field.name, getattr(self, field.name)) + elif field.one_to_many or field.many_to_many: + continue + else: + setattr(new_instance, field.name, getattr(self, field.name)) + + return new_instance + + @classmethod + def filter_queryset(cls, queryset=None): + if queryset is None: + queryset = cls.objects.all() + queryset = queryset.filter() + return queryset + + class Meta: + abstract = True + + +class OpenIMISMigrationModel(OpenIMISModel): + #### + # How to use: + # for migration of Versionned Model to openIMIS Model + # will keep the id as is but will rename the table column to id + # 1. change the base model of the class you want to use by OpenIMISMigrationModel and comment id and uuid + # 2. run `python manage.py makemigrations app_name` to update the table changes : rename and new column, + # it will also create the history tablesrun `python manage.py makemigrations app_name --name to_history --empty` + # 3. + # 4. in that migration file, run MyModel.migrate_to_history() to move all the + # record that have validitiy_to not null to history model + # 5. change the base model of the class you want to use by OpenIMISModel + # 6. run `python manage.py makemigrations app_name` to update the table changes : remove the validity_to and from + #### + validity_from = DateTimeField(db_column="ValidityFrom", default=py_datetime.now, null=True) + validity_to = DateTimeField(db_column="ValidityTo", blank=True, null=True, default=None) + + def filter_validity(arg="validity", prefix="", **kwargs): + return core_filter_validity(arg, prefix, **kwargs) + + class Meta: + abstract = True diff --git a/core/models/user.py b/core/models/user.py index ec46f2c..b6bdd24 100644 --- a/core/models/user.py +++ b/core/models/user.py @@ -17,10 +17,13 @@ from django.utils.crypto import salted_hmac from graphql import ResolveInfo import core +from hashlib import sha256 +from secrets import token_hex from django.contrib.auth.password_validation import validate_password -from ..utils import filter_validity, CachedManager +from ..utils import CachedManager from .base import ExtendableModel, Language, UUIDModel from .versioned_model import UUIDVersionedModel, VersionedModel +from .openimis_model import OpenIMISMigrationModel # , OpenIMISModel from core.utils import to_list_permissions from rest_framework import exceptions @@ -58,7 +61,7 @@ def auto_provision_user(self, **kwargs): if not username: raise exceptions.AuthenticationFailed("INCORRECT_CREDENTIALS") i_user = InteractiveUser.objects.filter( - login_name__iexact=username, *filter_validity() + login_name__iexact=username, *InteractiveUser.filter_validity() ).first() if not i_user: raise exceptions.AuthenticationFailed("INCORRECT_CREDENTIALS") @@ -188,13 +191,13 @@ class Meta: db_table = "tblRoleRight" -class InteractiveUser(VersionedModel): +class InteractiveUser(OpenIMISMigrationModel): UNIQUE_FIELDS = {"pk", "uuid", "id", "login_name"} USE_CACHE = not settings.IS_TESTING - id = models.AutoField(db_column="UserID", primary_key=True) - uuid = models.CharField( - db_column="UserUUID", max_length=36, default=uuid.uuid4, unique=True - ) + # id = models.AutoField(db_column="UserID", primary_key=True) + # uuid = models.CharField( + # db_column="UserUUID", max_length=36, default=uuid.uuid4, unique=True + # ) language = models.ForeignKey(Language, models.DO_NOTHING, db_column="LanguageID") last_name = models.CharField(db_column="LastName", max_length=100) other_names = models.CharField(db_column="OtherNames", max_length=100) @@ -310,7 +313,7 @@ def is_officer(self): is_officer = cache.get(cache_name) if is_officer is None: is_officer = Officer.objects.filter( - code=self.login_name, has_login=True, *filter_validity() + code=self.login_name, has_login=True, *Officer.filter_validity() ).exists() cache.set(cache_name, is_officer, None) return is_officer @@ -327,7 +330,7 @@ def is_claim_admin(self): from core.models.user import ClaimAdmin is_claim_admin = ClaimAdmin.objects.filter( - code=self.login_name, has_login=True, *filter_validity() + code=self.login_name, has_login=True, *ClaimAdmin.filter_validity() ).exists() cache.set(cache_name, is_claim_admin, None) return is_claim_admin @@ -348,12 +351,9 @@ def is_imis_admin(self): cache.set("is_admin_" + str(self.id), is_admin, 600) return is_admin - def set_password(self, raw_password): - from hashlib import sha256 - from secrets import token_hex - + def set_password(self, raw_password, private_key=token_hex(128)): validate_password(raw_password) - self.private_key = token_hex(128) + self.private_key = private_key pwd_hash = sha256() pwd_hash.update(f"{raw_password.rstrip()}{self.private_key}".encode()) self.password = ( @@ -729,14 +729,17 @@ def is_imis_admin(self): @property def is_active(self): - if self._u.validity_from is None and self._u.validity_to is None: + if self.i_user: + return self.i_user.active + else: + if self._u.validity_from is None and self._u.validity_to is None: + return True + now = py_datetime.now() + if self._u.validity_from is not None and self._u.validity_from > now: + return False + if self._u.validity_to is not None and self._u.validity_to < now: + return False return True - now = py_datetime.now() - if self._u.validity_from is not None and self._u.validity_from > now: - return False - if self._u.validity_to is not None and self._u.validity_to < now: - return False - return True def has_perm(self, perm, obj=None): i_user = self.i_user if obj is None else obj.i_user @@ -815,8 +818,26 @@ def __str__(self): return "(%s) %s [%s]" % (utype, self.username, self.id) def save(self, *args, **kwargs): - if self._u and self._u.id: - self._u.save() + if self.i_user: + try: + self.i_user.save(*args, **kwargs) + except Exception as e: + logger.debug(f"cannot save i_user: {e}") + if self.officer: + try: + self.officer.save(*args, **kwargs) + except Exception as e: + logger.debug(f"cannot save officer {e}") + if self.claim_admin: + try: + self.claim_admin.save(*args, **kwargs) + except Exception as e: + logger.debug(f"cannot save claim_admin {e}") + if self.t_user: + try: + self.t_user.save(*args, **kwargs) + except Exception as e: + logger.debug(f"cannot save t_user {e}") super().save(*args, **kwargs) def shallow_save(self, *args, **kwargs): diff --git a/core/models/versioned_model.py b/core/models/versioned_model.py index f0294aa..e5a0796 100644 --- a/core/models/versioned_model.py +++ b/core/models/versioned_model.py @@ -6,7 +6,7 @@ from core.utils import CachedManager, CachedModelMixin from ..fields import DateTimeField -from ..utils import filter_validity +from ..utils import filter_validity as core_filter_validity import logging logger = logging.getLogger(__name__) @@ -21,13 +21,13 @@ class BaseVersionedModel(CachedModelMixin, models.Model): @staticmethod def filter_validity(validity=None, prefix="", **kwargs): - return filter_validity(validity=validity, prefix=prefix, **kwargs) + return core_filter_validity(validity=validity, prefix=prefix, **kwargs) def update(self, *args, **kwargs): """ Overrides the default update to update the cache after saving the instance. """ - obj_data = kwargs.pop("data", {}) + obj_data = kwargs.pop("data", None) if not obj_data: obj_data = kwargs kwargs = {} @@ -39,7 +39,7 @@ def save(self, *args, **kwargs): Overrides the default save to update the cache after saving the instance. """ caching = kwargs.pop("cache_update", True) - super().save(*args, **kwargs) + super(BaseVersionedModel, self).save(*args, **kwargs) if caching: # Build the cache key using the same logic as in the CachedManager. # (Assuming lookups are done using pk/id/uuid) @@ -83,7 +83,7 @@ class Meta: def filter_queryset(cls, queryset=None): if queryset is None: queryset = cls.objects.all() - queryset = queryset.filter(*filter_validity()) + queryset = queryset.filter(*core_filter_validity()) return queryset diff --git a/core/schema.py b/core/schema.py index 692d049..fc36f85 100644 --- a/core/schema.py +++ b/core/schema.py @@ -33,7 +33,7 @@ wait_for_mutation, ) from core.tasks import openimis_mutation_async -from core import filter_validity, prefix_filterset +from core import prefix_filterset from core.data_masking import anonymize_gql from django import dispatch from django.conf import settings @@ -70,9 +70,10 @@ ModulePermissionGQLType, CustomFilterOptionGQLType, ) -from core.utils import ( +from core.utils import ( # noqa: 401 ExtendedConnection, collect_all_gql_permissions, + filter_validity ) from core.models import ( ModuleConfiguration, @@ -743,7 +744,7 @@ class Meta: @classmethod def get_queryset(cls, queryset, info): - queryset = queryset.filter(*filter_validity()) + queryset = queryset.filter(*ClaimAdmin.filter_validity()) return queryset @@ -897,11 +898,11 @@ def resolve_claim_admins(self, info, search=None, **kwargs): if not info.context.user.has_perms(CoreConfig.gql_query_claim_admins_perms): raise PermissionDenied(_("unauthorized")) - hf_filters = [*filter_validity(**kwargs)] district_uuid = kwargs.get("district_uuid", None) region_uuid = kwargs.get("region_uuid", None) try: HealthFacility = apps.get_model("location", "HealthFacility") + hf_filters = [*HealthFacility.filter_validity(**kwargs)] if district_uuid is not None: hf_filters += [Q(location__uuid=district_uuid)] elif region_uuid is not None: @@ -921,7 +922,7 @@ def resolve_claim_admins(self, info, search=None, **kwargs): logger.debug(e) pass - filters = [*filter_validity(**kwargs)] + filters = [*ClaimAdmin.filter_validity(**kwargs)] if user_health_facility: filters += [Q(health_facility__in=user_health_facility)] @@ -1044,7 +1045,7 @@ def resolve_interactive_users(self, info, **kwargs): show_history = kwargs.get("show_history", False) if not show_history and not kwargs.get("uuid", None): - filters += filter_validity(**kwargs) + filters += InteractiveUser.filter_validity(**kwargs) return gql_optimizer.query(query.filter(*filters), info) @@ -1094,9 +1095,9 @@ def resolve_users( show_deleted = kwargs.get("showDeleted", False) if not show_deleted and not kwargs.get("id", None): - # active_users_ids = [user.id for user in user_query if user.is_active] + # active_users_ids = [user.id for user in user_query if user.active] user_filters.append( - Q(i_user__isnull=True) | Q(*filter_validity(prefix="i_user__")) + Q(i_user__isnull=True) | Q(*User.filter_validity(prefix="i_user__")) ) text_search = kwargs.get("str") # Poorly chosen name, avoid of shadowing "str" @@ -1248,7 +1249,7 @@ def resolve_role(self, info, **kwargs): show_history = kwargs.get("show_history", False) if not show_history and not kwargs.get("uuid", None): - filters += filter_validity(**kwargs) + filters += Role.filter_validity(**kwargs) is_system_role = kwargs.get("is_system", None) # check if we can use default filter validity @@ -1268,7 +1269,7 @@ def resolve_role_right(self, info, **kwargs): raise PermissionError("Unauthorized") filters = [] if "validity" in kwargs: - filters += filter_validity(**kwargs) + filters += RoleRight.filter_validity(**kwargs) return gql_optimizer.query(RoleRight.objects.filter(*filters), info) else: return gql_optimizer.query( @@ -1995,7 +1996,7 @@ def set_user_deleted(user): def change_user_language(user, language_id): try: updated_user = InteractiveUser.objects.filter( - user__id=user.id, *filter_validity() + user__id=user.id, *InteractiveUser.filter_validity() ).first() updated_user.language_id = language_id updated_user.save() diff --git a/core/services/userServices.py b/core/services/userServices.py index 23a5554..ec07366 100644 --- a/core/services/userServices.py +++ b/core/services/userServices.py @@ -16,13 +16,15 @@ ) from django.contrib.auth import authenticate from rest_framework import exceptions -from core.utils import filter_validity from django.db.models import Q logger = logging.getLogger(__file__) def create_or_update_interactive_user(user_id, data, audit_user_id, connected): + admin = User.objects.filter(i_user_id=1).first() + if not admin: + User.objects.create(username="Admin", i_user_id=1) i_fields = { "username": "login_name", "other_names": "other_names", @@ -315,8 +317,8 @@ def check_user_unique_email(user_email): def reset_user_password(request, username): user = User.objects.filter( Q(username=username) | Q(i_user__email=username), - *filter_validity(), - *filter_validity(prefix="i_user__"), + *User.filter_validity(), + *InteractiveUser.filter_validity(prefix="i_user__"), ).first() # we don't want to inform is a username was not found if not user: diff --git a/core/services/utils/serviceUtils.py b/core/services/utils/serviceUtils.py index d3dc817..6c7ed66 100644 --- a/core/services/utils/serviceUtils.py +++ b/core/services/utils/serviceUtils.py @@ -5,11 +5,14 @@ from django.contrib.contenttypes.models import ContentType from django.core.serializers.json import DjangoJSONEncoder from django.forms.models import model_to_dict +from core.utils import get_current_user def check_authentication(function): def wrapper(self, *args, **kwargs): - if type(self.user) is AnonymousUser or not self.user.id: + if not self.user: + self.user = get_current_user() + if type(self.user) is AnonymousUser or (not self.user and not self.user.id): return { "success": False, "message": "Authentication required", diff --git a/core/tasks.py b/core/tasks.py index dfeae33..8fc0df1 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -4,6 +4,7 @@ from celery import shared_task from core.models import MutationLog, Language +from core.utils import set_current_user from django.utils import translation logger = logging.getLogger(__name__) @@ -22,6 +23,9 @@ def openimis_mutation_async(mutation_id, module, class_name): mutation = None try: mutation = MutationLog.objects.get(id=mutation_id) + # Set the current user for audit logging + if mutation.user: + set_current_user(mutation.user) # __import__ needs to import the module with .schema to force .schema to load, then .schema.TheRealMutation mutation_class = getattr(__import__(f"{module}.schema").schema, class_name) diff --git a/core/test_helpers.py b/core/test_helpers.py index 823e369..284ad3b 100644 --- a/core/test_helpers.py +++ b/core/test_helpers.py @@ -1,16 +1,60 @@ -from core.models import Officer, InteractiveUser, User, TechnicalUser, filter_validity +from core.models import ( + Officer, + InteractiveUser, + User, + TechnicalUser, + Language, + Role, + RoleRight, +) +from django.core.cache import cache from core.models.openimis_graphql_test_case import openIMISGraphQLTestCase from core.models.user import ClaimAdmin from core.services.userServices import ( create_or_update_officer_villages, - create_or_update_interactive_user, - create_or_update_core_user, ) from core.services import create_or_update_user_roles +from core.utils import collect_all_gql_permissions, set_current_user from location.models import Location from location.test_helpers import create_test_health_facility from uuid import uuid4 +import datetime +from django.core.exceptions import ValidationError, PermissionDenied + + + +def create_test_language(code="en", name="English", sort_order=1, custom_props=None): + """ + Create a test language in the database. + + Args: + code: Language code (primary key) + name: Language name + sort_order: Sort order for the language + custom_props: Additional properties for the language + + Returns: + Language object + """ + if custom_props is None: + custom_props = {} + else: + custom_props = {k: v for k, v in custom_props.items() if hasattr(Language, k)} + # Check if language already exists + existing_language = Language.objects.filter(code=code).first() + if existing_language: + return existing_language + + # Create new language + language_data = { + "code": code, + "name": name, + "sort_order": sort_order, + **custom_props, + } + + return Language.objects.create(**language_data) def create_test_officer(valid=True, custom_props=None, villages=[]): if custom_props is None: @@ -22,8 +66,9 @@ def create_test_officer(valid=True, custom_props=None, villages=[]): uuid = custom_props.pop("uuid", None) qs_eo = Officer.objects eo = None + code = code or "TSTOFF" data = { - "code": code or "TSTOFF", + "code": code, "uuid": uuid, "last_name": "Officer", "other_names": "Test", @@ -33,22 +78,23 @@ def create_test_officer(valid=True, custom_props=None, villages=[]): **custom_props, } - if code: - qs_eo = qs_eo.filter(code=code) + eo = None if uuid: qs_eo = qs_eo.filter(uuid=uuid) - eo = None + elif code: + qs_eo = qs_eo.filter(code=code) + if code or uuid: eo = qs_eo.first() if eo: data["uuid"] = eo.uuid - eo.update(data) + eo.update(**data) else: data["uuid"] = uuid4() eo = Officer.objects.create(**data) if not villages: - villages == Location.objects.filter(*filter_validity(), type="V").first() + villages == Location.objects.filter(*Location.filter_validity(), type="V").first() if eo: _ = create_or_update_officer_villages(eo, [v.id for v in villages], 1) return eo @@ -56,27 +102,81 @@ def create_test_officer(valid=True, custom_props=None, villages=[]): def create_test_interactive_user( username="TestInteractiveTest", - password="S\\:\\/pe®Pąßw0rd" "", + password="admin123", roles=None, custom_props=None, ): + cache.clear() + # to ensure the resource could be saved + admin = User.objects.filter(i_user_id=1).first() + if not admin: + User.objects.create(username="Admin", i_user_id=1) if custom_props is None: custom_props = {} else: custom_props = { k: v for k, v in custom_props.items() if hasattr(InteractiveUser, k) } + + # Handle language field specially - convert code to Language instance + if "language" in custom_props: + language_value = custom_props["language"] + if isinstance(language_value, str): + custom_props["language"] = create_test_language(code=language_value) + # If it's already a Language instance, keep it as is + elif "language_id" in custom_props: + language_code = custom_props["language_id"] + custom_props["language"] = create_test_language(code=language_code) + del custom_props["language_id"] if roles is None: - roles = [7, 1, 2, 3, 4, 5, 6] + # Create a test role with default permissions instead of hardcoded role IDs + default_perm_names = [ + "gql_query_roles_perms", + "gql_mutation_create_roles_perms", + "gql_mutation_update_roles_perms", + "gql_mutation_replace_roles_perms", + "gql_mutation_duplicate_roles_perms", + "gql_mutation_delete_roles_perms", + "gql_query_users_perms", + "gql_query_users_profile_perms", + "gql_mutation_create_users_perms", + "gql_mutation_update_users_perms", + "gql_mutation_delete_users_perms", + "gql_query_enrolment_officers_perms", + "gql_mutation_create_enrolment_officers_perms", + "gql_mutation_update_enrolment_officers_perms", + "gql_mutation_delete_enrolment_officers_perms", + "gql_query_claim_administrator_perms", + "gql_mutation_create_claim_administrator_perms", + "gql_mutation_update_claim_administrator_perms", + "gql_mutation_delete_claim_administrator_perms", + ] + test_role = create_test_role(perm_names=default_perm_names, name="TestInteractiveUserRole") + roles = [1, test_role.id] user = None - i_user = InteractiveUser.objects.filter(login_name=username).first() + i_user = InteractiveUser.objects.filter(login_name=username, *InteractiveUser.filter_validity()).first() if i_user: - # TODO add custom prop to existing user - user = User.objects.filter(i_user=i_user).first() + # Update existing i_user with custom props + for key, value in custom_props.items(): + if hasattr(i_user, key): + setattr(i_user, key, value) + try: + i_user.save() + except ValidationError: + # unchanged + pass + user = User.objects.filter(i_user=i_user, *User.filter_validity()).first() + # Update existing user if found and if there are custom props for User model + if user: + user_props = {k: v for k, v in custom_props.items() if hasattr(User, k)} + if user_props: + for key, value in user_props.items(): + setattr(user, key, value) + user.save() else: user = User.objects.filter( - username=username, + username=username, *User.filter_validity() ).first() if user and user.i_user: i_user = user.i_user @@ -98,9 +198,16 @@ def create_test_interactive_user( username=username, i_user=i_user, ) - i_user.set_password(password) - i_user.save() + else: + user.save() + i_user.set_password(password, private_key=i_user.private_key) + try: + i_user.save() + except ValidationError: + # unchanged + pass create_or_update_user_roles(i_user, roles, None) + set_current_user(user) return user @@ -227,24 +334,443 @@ def __init__(self): "password": self.test_user_password, "other_names": self.test_user_name, "user_types": "INTERACTIVE", - "language": "en", - "roles": [1, 3, 5, 9], + "language": "en" } def get_or_create_user_api(self, **kwargs): - username = kwargs.get("username") or self.test_user_name - user = User.objects.filter(username=username).first() - if user is None: - user = self._create_user_interactive_core(**kwargs) - return user - - def _create_user_interactive_core(self, **kwargs): - username = kwargs.get("username") or self.test_user_name - i_user, i_user_created = create_or_update_interactive_user( - user_id=None, - data={**self.test_data_user, **kwargs}, - audit_user_id=999, - connected=False, + return create_test_interactive_user(**kwargs) + + +def create_enrolment_officer_role(): + """ + Create the Enrolment Officer role with specific permissions. + This role should have permissions for insuree, location, product, and policy management. + """ + enrolment_officer_perms = [ + "gql_query_insuree_perms", + "gql_mutation_update_insurees_perms", + "gql_mutation_create_insurees_perms", + "gql_query_locations_perms", + "gql_query_products_perms", + "gql_query_policies_perms", + "gql_mutation_create_policies_perms", + "gql_mutation_edit_policies_perms", + ] + return create_test_role(perm_names=enrolment_officer_perms, name="EnrolmentOfficer", is_system=1) + + +def create_claim_admin_role(): + """ + Create the Claim Admin role with specific permissions. + This role should have permissions for policy, search insuree, read policy, + create update search HF claims with medical service and item. + """ + claim_admin_perms = [ + "gql_query_policies_perms", + "gql_query_insuree_perms", + "gql_mutation_create_claims_perms", + "gql_mutation_update_claims_perms", + "gql_query_claims_perms", + "gql_query_health_facilities_perms", + "gql_query_medical_services_perms", + "gql_query_medical_items_perms", + ] + return create_test_role(perm_names=claim_admin_perms, name="ClaimAdministrator", is_system=16) + + +def create_test_role(perm_names, name=None, is_system=0, is_blocked=False, custom_props=None): + """ + Create a test role with permissions specified by name as they appear in the module DEFAULT config. + + Args: + perm_names: List of permission names (e.g., ["gql_query_roles_perms", "gql_mutation_create_roles_perms"]) + name: Optional role name, defaults to "TestRole" + is_system: System role flag (default 0 for non-system) + is_blocked: Whether role is blocked (default False) + custom_props: Additional properties for the role + + Returns: + Role object + """ + if custom_props is None: + custom_props = {} + else: + custom_props = {k: v for k, v in custom_props.items() if hasattr(Role, k)} + + if name is None: + name = "TestRole" + + # Check if role already exists by name + existing_role = Role.objects.filter(name=name, *Role.filter_validity()).first() + if existing_role: + return existing_role + + # Collect all permissions from DEFAULT configs + permissions_dict = collect_all_gql_permissions() + + # Flatten permission IDs for the given names + flat_perms = {} + for app_perms in permissions_dict.values(): + for perm_name, perm_ids in app_perms.items(): + flat_perms[perm_name] = perm_ids + + right_ids = [] + for perm_name in perm_names: + if perm_name not in flat_perms: + raise Exception(f"Permission {perm_name} not found") + right_ids.extend(flat_perms[perm_name]) + + # Remove duplicates + right_ids = list(set(right_ids)) + # Create the role + role_data = { + "name": name, + "is_system": is_system, + "is_blocked": is_blocked, + "audit_user_id": -1, + "validity_from": datetime.datetime.now(), + **custom_props, + } + + role = Role.objects.create(**role_data) + + # Create role rights + for right_id in right_ids: + RoleRight.objects.create( + role=role, + right_id=right_id, + audit_user_id=-1, + validity_from=datetime.datetime.now(), ) - create_or_update_core_user(user_uuid=None, username=username, i_user=i_user) - return User.objects.get(username=username) + + return role + + +def create_manager_role(): + """ + Create the Manager role with specific permissions. + This role should have permissions for reports and insuree inquiry. + """ + manager_perms = [ + "gql_reports_primary_operational_indicators_claims_perms", + "gql_reports_derived_operational_indicators_perms", + "gql_reports_contribution_collection_perms", + "gql_reports_user_activity_perms", + "gql_query_insuree_inquire_perms", + ] + return create_test_role(perm_names=manager_perms, name="Manager", is_system=2) + + +def create_accountant_role(): + """ + Create the Accountant role with specific permissions. + This role should have permissions for families, insurees, policies, premiums, payments, claims, and various reports. + """ + accountant_perms = [ + "gql_query_families_perms", + "gql_query_insurees_perms", + "gql_query_insuree_inquire_perms", + "gql_query_policies_perms", + "gql_query_premiums_perms", + "gql_query_payments_perms", + "gql_mutation_create_payments_perms", + "gql_mutation_update_payments_perms", + "gql_mutation_delete_payments_perms", + "gql_query_claims_perms", + "gql_mutation_create_claims_perms", + "gql_mutation_update_claims_perms", + "gql_mutation_delete_claims_perms", + "gql_reports_contribution_collection_perms", + "gql_reports_product_sales_perms", + "gql_reports_contribution_distribution_perms", + "gql_reports_payment_category_overview_perms", + "gql_reports_matching_funds_perms", + "gql_reports_claim_overview_report_perms", + "gql_reports_percentage_referrals_perms", + "gql_reports_families_insurees_overview_perms", + "gql_reports_pending_insurees_perms", + "gql_reports_renewals_perms", + "gql_reports_capitation_payment_perms", + "gql_reports_rejected_photo_perms", + "gql_reports_contribution_payment_perms", + "gql_reports_control_number_assignment_perms", + "gql_reports_overview_of_commissions_perms", + ] + return create_test_role(perm_names=accountant_perms, name="Accountant", is_system=4) + + +def create_clerk_role(): + """ + Create the Clerk role with specific permissions. + This role has the same permissions as Enrolment Officer. + """ + clerk_perms = [ + "gql_query_families_perms", + "gql_mutation_create_families_perms", + "gql_mutation_update_families_perms", + "gql_mutation_delete_families_perms", + "gql_query_insurees_perms", + "gql_mutation_create_insurees_perms", + "gql_mutation_update_insurees_perms", + "gql_mutation_delete_insurees_perms", + "gql_query_insuree_inquire_perms", + "gql_query_policies_perms", + "gql_mutation_create_policies_perms", + "gql_mutation_edit_policies_perms", + "gql_mutation_delete_policies_perms", + "gql_mutation_renew_policies_perms", + "gql_query_premiums_perms", + "gql_mutation_create_premiums_perms", + "gql_mutation_update_premiums_perms", + "gql_mutation_delete_premiums_perms", + "gql_query_claims_perms", + "gql_mutation_deliver_claim_feedback_perms", + ] + return create_test_role(perm_names=clerk_perms, name="Clerk", is_system=8) + + +def create_medical_officer_role(): + """ + Create the Medical Officer role with specific permissions. + This role should have permissions for claims and claim history reports. + """ + medical_officer_perms = [ + "gql_query_claims_perms", + "gql_mutation_create_claims_perms", + "gql_mutation_update_claims_perms", + "gql_mutation_submit_claims_perms", + "gql_mutation_process_claims_perms", + "gql_reports_claim_history_report_perms", + ] + return create_test_role(perm_names=medical_officer_perms, name="MedicalOfficer", is_system=16) + + +def create_scheme_admin_role(): + """ + Create the Scheme Administrator role with extensive permissions. + This role has broad access to most modules. + """ + scheme_admin_perms = [ + "gql_query_insuree_inquire_perms", + "gql_query_locations_perms", + "gql_query_health_facilities_perms", + "gql_mutation_create_locations_perms", + "gql_mutation_edit_locations_perms", + "gql_mutation_delete_locations_perms", + "gql_mutation_move_location_perms", + "gql_mutation_create_region_locations_perms", + "gql_mutation_create_health_facilities_perms", + "gql_mutation_edit_health_facilities_perms", + "gql_mutation_delete_health_facilities_perms", + "gql_query_medical_items_perms", + "gql_query_medical_services_perms", + "gql_mutation_medical_items_add_perms", + "gql_mutation_medical_items_update_perms", + "gql_mutation_medical_items_delete_perms", + "gql_mutation_medical_services_add_perms", + "gql_mutation_medical_services_update_perms", + "gql_mutation_medical_services_delete_perms", + "gql_query_pricelists_medical_items_perms", + "gql_mutation_pricelists_medical_items_add_perms", + "gql_mutation_pricelists_medical_items_update_perms", + "gql_mutation_pricelists_medical_items_delete_perms", + "gql_mutation_pricelists_medical_items_duplicate_perms", + "gql_query_pricelists_medical_services_perms", + "gql_mutation_pricelists_medical_services_add_perms", + "gql_mutation_pricelists_medical_services_update_perms", + "gql_mutation_pricelists_medical_services_delete_perms", + "gql_mutation_pricelists_medical_services_duplicate_perms", + "gql_query_products_perms", + "gql_mutation_products_add_perms", + "gql_mutation_products_edit_perms", + "gql_mutation_products_delete_perms", + "gql_mutation_products_duplicate_perms", + "gql_query_insurees_perms", + "gql_query_families_perms", + "gql_query_insuree_policy_perms", + "gql_mutation_create_families_perms", + "gql_mutation_update_families_perms", + "gql_mutation_delete_families_perms", + "gql_mutation_create_insurees_perms", + "gql_mutation_update_insurees_perms", + "gql_mutation_delete_insurees_perms", + "gql_query_policies_perms", + "gql_query_policies_by_insuree_perms", + "gql_query_policies_by_family_perms", + "gql_query_eligibilities_perms", + "gql_mutation_create_policies_perms", + "gql_mutation_renew_policies_perms", + "gql_mutation_edit_policies_perms", + "gql_mutation_suspend_policies_perms", + "gql_mutation_delete_policies_perms", + "gql_query_premiums_perms", + "gql_mutation_create_premiums_perms", + "gql_mutation_update_premiums_perms", + "gql_mutation_delete_premiums_perms", + "gql_query_payers_perms", + "gql_mutation_payer_add_perms", + "gql_mutation_payer_update_perms", + "gql_mutation_payer_delete_perms", + "gql_query_payments_perms", + "gql_mutation_create_payments_perms", + "gql_mutation_update_payments_perms", + "gql_mutation_delete_payments_perms", + "gql_query_claims_perms", + "gql_mutation_create_claims_perms", + "gql_mutation_update_claims_perms", + "gql_mutation_load_claims_perms", + "gql_mutation_submit_claims_perms", + "gql_mutation_select_claim_feedback_perms", + "gql_mutation_bypass_claim_feedback_perms", + "gql_mutation_skip_claim_feedback_perms", + "gql_mutation_deliver_claim_feedback_perms", + "gql_mutation_select_claim_review_perms", + "gql_mutation_bypass_claim_review_perms", + "gql_mutation_skip_claim_review_perms", + "gql_mutation_deliver_claim_review_perms", + "gql_mutation_process_claims_perms", + "gql_mutation_restore_claims_perms", + "gql_mutation_delete_claims_perms", + "claim_print_perms", + "gql_query_batch_runs_perms", + "gql_mutation_process_batch_perms", + "gql_reports_capitation_payment_perms", + "account_preview_perms", + "registers_perms", + "registers_diagnoses_perms", + "registers_health_facilities_perms", + "registers_locations_perms", + "registers_items_perms", + "registers_services_perms", + "extracts_master_data_perms", + "extracts_officer_feedbacks_perms", + "extracts_officer_renewals_perms", + "extracts_phone_extract_perms", + "extracts_upload_claims_perms", + "gql_query_report_perms", + "gql_reports_primary_operational_indicator_policies_perms", + "gql_reports_primary_operational_indicators_claims_perms", + "gql_reports_derived_operational_indicators_perms", + "gql_reports_contribution_collection_perms", + "gql_reports_product_sales_perms", + "gql_reports_contribution_distribution_perms", + "gql_reports_user_activity_perms", + "gql_reports_enrolment_performance_indicators_perms", + "gql_reports_status_of_register_perms", + "gql_reports_insuree_without_photos_perms", + "gql_reports_payment_category_overview_perms", + "gql_reports_matching_funds_perms", + "gql_reports_claim_overview_report_perms", + "gql_reports_percentage_referrals_perms", + "gql_reports_families_insurees_overview_perms", + "gql_reports_pending_insurees_perms", + "gql_reports_renewals_perms", + "gql_reports_capitation_payment_perms", + "gql_reports_rejected_photo_perms", + "gql_reports_contribution_payment_perms", + "gql_reports_control_number_assignment_perms", + "gql_reports_overview_of_commissions_perms", + "gql_reports_claim_history_report_perms", + "gql_mutation_report_add_perms", + "gql_mutation_report_edit_perms", + "gql_mutation_report_delete_perms", + ] + return create_test_role(perm_names=scheme_admin_perms, name="SchemeAdministrator", is_system=32) + + +def create_imis_admin_role(): + """ + Create the IMIS Administrator role with extensive permissions. + This role has admin-level access including user and role management. + """ + return Role.objects.filter(is_system=64, *Role.filter_validity()).first() + + +def create_receptionist_role(): + """ + Create the Receptionist role with specific permissions. + This role should have permissions for families, insurees, policies, and premiums. + """ + receptionist_perms = [ + "gql_query_families_perms", + "gql_query_insurees_perms", + "gql_query_insuree_inquire_perms", + "gql_query_policies_perms", + "gql_query_premiums_perms", + ] + return create_test_role(perm_names=receptionist_perms, name="Receptionist", is_system=128) + + +def create_claim_contributor_role(): + """ + Create the Claim Contributor role with specific permissions. + This role should have permissions for claims and claim feedback. + """ + claim_contributor_perms = [ + "gql_query_claims_perms", + "gql_mutation_create_claims_perms", + "gql_mutation_update_claims_perms", + ] + return create_test_role(perm_names=claim_contributor_perms, name="ClaimContributor", is_system=512) + + +def create_hf_admin_role(): + """ + Create the HF Administrator role with specific permissions. + This role should have permissions for users, reports, locations, and medical items/services. + """ + hf_admin_perms = [ + "gql_query_users_perms", + "gql_mutation_create_users_perms", + "gql_mutation_update_users_perms", + "gql_mutation_delete_users_perms", + "gql_query_health_facilities_perms", + "gql_mutation_edit_health_facilities_perms", + "gql_mutation_delete_health_facilities_perms", + "gql_query_medical_items_perms", + "gql_mutation_medical_items_update_perms", + "gql_query_medical_services_perms", + "gql_mutation_medical_services_update_perms", + "gql_query_pricelists_medical_items_perms", + "gql_mutation_pricelists_medical_items_update_perms", + "gql_mutation_pricelists_medical_items_delete_perms", + "gql_query_pricelists_medical_services_perms", + "gql_mutation_pricelists_medical_services_update_perms", + "gql_mutation_pricelists_medical_services_delete_perms", + "gql_reports_capitation_payment_perms", + "gql_reports_user_activity_perms", + "gql_reports_status_of_register_perms", + "gql_reports_overview_of_commissions_perms", + ] + return create_test_role(perm_names=hf_admin_perms, name="HFAdministrator", is_system=524288) + + +def create_offline_admin_role(): + """ + Create the Offline Administrator role with specific permissions. + This role has the same permissions as HF Administrator. + """ + offline_admin_perms = [ + "gql_query_users_perms", + "gql_mutation_create_users_perms", + "gql_mutation_update_users_perms", + "gql_mutation_delete_users_perms", + "gql_query_health_facilities_perms", + "gql_mutation_edit_health_facilities_perms", + "gql_mutation_delete_health_facilities_perms", + "gql_query_medical_items_perms", + "gql_mutation_medical_items_update_perms", + "gql_query_medical_services_perms", + "gql_mutation_medical_services_update_perms", + "gql_query_pricelists_medical_items_perms", + "gql_mutation_pricelists_medical_items_update_perms", + "gql_mutation_pricelists_medical_items_delete_perms", + "gql_query_pricelists_medical_services_perms", + "gql_mutation_pricelists_medical_services_update_perms", + "gql_mutation_pricelists_medical_services_delete_perms", + "gql_reports_capitation_payment_perms", + "gql_reports_user_activity_perms", + "gql_reports_status_of_register_perms", + "gql_reports_overview_of_commissions_perms", + ] + return create_test_role(perm_names=offline_admin_perms, name="OfflineAdministrator", is_system=1048576) diff --git a/core/tests/test_api.py b/core/tests/test_api.py index 8f9a08c..7dab476 100644 --- a/core/tests/test_api.py +++ b/core/tests/test_api.py @@ -5,7 +5,7 @@ from dataclasses import dataclass from graphql_jwt.shortcuts import get_token - +from datetime import datetime @dataclass class DummyContext: @@ -29,9 +29,6 @@ def setUpClass(cls): def test_authenticated_get_current_user(self): url = '/api/core/users/current_user/' # Authenticate the client using JWT token - self.admin_user._u.given_name = "my test" - self.admin_user._u.save_history() - self.admin_user._u.save() self.client.credentials(HTTP_AUTHORIZATION=f'Bearer {self.admin_token}') response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) diff --git a/core/tests/test_create_test_role.py b/core/tests/test_create_test_role.py new file mode 100644 index 0000000..78d13de --- /dev/null +++ b/core/tests/test_create_test_role.py @@ -0,0 +1,16 @@ +from django.test import TestCase +from core.test_helpers import create_test_role +from core.models import Role + +class CreateTestRoleTest(TestCase): + def test_create_test_role_success(self): + perm_names = ["gql_query_roles_perms"] + role = create_test_role(perm_names=perm_names, name="TestRoleSuccess") + self.assertIsNotNone(role) + self.assertEqual(role.name, "TestRoleSuccess") + + def test_create_test_role_failure(self): + perm_names = ["invalid_permission_name"] + with self.assertRaises(Exception) as cm: + create_test_role(perm_names=perm_names, name="TestRoleFailure") + self.assertEqual(str(cm.exception), "Permission invalid_permission_name not found") diff --git a/core/tests/test_create_test_role_mock.py b/core/tests/test_create_test_role_mock.py new file mode 100644 index 0000000..2f0cc27 --- /dev/null +++ b/core/tests/test_create_test_role_mock.py @@ -0,0 +1,51 @@ +import unittest +from unittest.mock import patch, MagicMock +import sys +import os + + +from django.conf import settings + + +from core.test_helpers import create_test_role + + +class CreateTestRoleMockTest(unittest.TestCase): + @patch('core.test_helpers.collect_all_gql_permissions') + @patch('core.test_helpers.Role') + @patch('core.test_helpers.RoleRight') + def test_create_test_role_success(self, MockRoleRight, MockRole, mock_collect_perms): + # Setup mocks + mock_collect_perms.return_value = { + 'app1': {'perm1': [1], 'perm2': [2]} + } + MockRole.objects.filter.return_value.first.return_value = None + mock_role_instance = MagicMock() + MockRole.objects.create.return_value = mock_role_instance + + # Call function + role = create_test_role(perm_names=['perm1'], name="TestRole") + + # Assertions + self.assertEqual(role, mock_role_instance) + MockRole.objects.create.assert_called_once() + MockRoleRight.objects.create.assert_called_once() + + @patch('core.test_helpers.collect_all_gql_permissions') + @patch('core.test_helpers.Role') + def test_create_test_role_failure(self, MockRole, mock_collect_perms): + # Setup mocks + mock_collect_perms.return_value = { + 'app1': {'perm1': [1]} + } + MockRole.objects.filter.return_value.first.return_value = None + + # Call function and expect exception + with self.assertRaises(Exception) as cm: + create_test_role(perm_names=['invalid_perm'], name="TestRole") + + self.assertEqual(str(cm.exception), "Permission invalid_perm not found") + + +if __name__ == '__main__': + unittest.main() diff --git a/core/tests/test_graphql.py b/core/tests/test_graphql.py index a38028f..56ba4ff 100644 --- a/core/tests/test_graphql.py +++ b/core/tests/test_graphql.py @@ -2,8 +2,8 @@ openIMISGraphQLTestCase, BaseTestContext, ) +from core.models import Language from core.test_helpers import create_test_interactive_user -from core import filter_validity from location.models import Location import json @@ -22,7 +22,13 @@ def setUpClass(cls): ) cls.admin_token_context = BaseTestContext(user=cls.admin_user) cls.admin_token = cls.admin_token_context.get_jwt() - cls.disctict = Location.objects.filter(type="D", *filter_validity()).first() + cls.disctict = Location.objects.filter(type="D", *Location.filter_validity()).first() + + # Create French language if it doesn't exist + Language.objects.get_or_create( + code="fr", + defaults={"name": "Français", "sort_order": 1} + ) def test_login_successful(self): variables = { @@ -133,7 +139,7 @@ def test_create_user_with_null_uuid(self): "districts": [self.disctict.id], "locationId": None, "language": "en", - "roles": ["4"], + "roles": ["1"], "substitutionOfficerId": None, "clientMutationLabel": "Create user", "clientMutationId": "95b431f3-0c12-40ad-bc01-51034702366d", diff --git a/core/tests/test_models.py b/core/tests/test_models.py index a837291..c45d6a7 100644 --- a/core/tests/test_models.py +++ b/core/tests/test_models.py @@ -10,13 +10,13 @@ def test_t_user_active_status(self): ) self.assertTrue(always_valid.is_active) - from core import datetime, datetimedelta + import datetime not_yet_active = User( username="not_yet_active", t_user=TechnicalUser( username="not_yet_active", - validity_from=datetime.datetime.now() + datetimedelta(days=1), + validity_from=datetime.datetime.now() + datetime.timedelta(days=1), ), ) self.assertFalse(not_yet_active.is_active) @@ -25,7 +25,7 @@ def test_t_user_active_status(self): username="not_active_anymore", t_user=TechnicalUser( username="not_active_anymore", - validity_to=datetime.datetime.now() + datetimedelta(days=-1), + validity_to=datetime.datetime.now() + datetime.timedelta(days=-1), ), ) self.assertFalse(not_active_anymore.is_active) @@ -36,22 +36,23 @@ def test_i_active_status(self): ) self.assertTrue(always_valid.is_active) - from core import datetime, datetimedelta - - not_yet_active = User( - username="always_valid", - i_user=InteractiveUser( - login_name="not_yet_active", - validity_from=datetime.datetime.now() + datetimedelta(days=1), - ), - ) - self.assertFalse(not_yet_active.is_active) + import datetime + # user is not business history yet + # not_yet_active = User( + # username="always_valid", + # i_user=InteractiveUser( + # login_name="not_yet_active", + # validity_from=datetime.datetime.now() + datetime.timedelta(days=1), + # ), + # ) + # self.assertFalse(not_yet_active.is_active) not_active_anymore = User( username="always_valid", i_user=InteractiveUser( login_name="not_active_anymore", - validity_to=datetime.datetime.now() + datetimedelta(days=-1), + active=False, + date_deactivated=datetime.datetime.now() + datetime.timedelta(days=-1), ), ) self.assertFalse(not_active_anymore.is_active) diff --git a/core/tests/test_services.py b/core/tests/test_services.py index 823ec41..6136437 100644 --- a/core/tests/test_services.py +++ b/core/tests/test_services.py @@ -6,7 +6,7 @@ from django.apps import apps import datetime import core -from core.models import InteractiveUser, Officer, UserRole +from core.models import InteractiveUser, Officer, UserRole, Language from core.services import ( create_or_update_interactive_user, create_or_update_core_user, @@ -17,6 +17,7 @@ ) from django.test import TestCase from location.models import OfficerVillage +from location.test_helpers import create_test_village, create_test_health_facility logger = logging.getLogger(__file__) postgresql = "postgresql" @@ -33,6 +34,20 @@ def setUp(self): core.datetime = importlib.import_module(".datetimes.ad_datetime", "core") self.claim_admin_class = apps.get_model("core", "ClaimAdmin") self.factory = RequestFactory() + # Create test villages + self.test_village1 = create_test_village(custom_props={"name": "Test Village 1", "code": "TV1"}) + self.test_village2 = create_test_village(custom_props={"name": "Test Village 2", "code": "TV2"}) + self.test_village3 = create_test_village(custom_props={"name": "Test Village 3", "code": "TV3"}) + + # Create French language if it doesn't exist + Language.objects.get_or_create( + code="fr", + defaults={"name": "Français", "sort_order": 1} + ) + + # Create test health facility + self.test_hf = create_test_health_facility() + self.test_hf2 = create_test_health_facility() def test_iuser_min(self): roles = [11] @@ -75,7 +90,7 @@ def test_iuser_max(self): language="fr", phone="+123456789", email=f"{username}@illuminati.int", - health_facility_id=1, + health_facility_id=self.test_hf.id, password=PASSWORD, ), audit_user_id=999, @@ -116,7 +131,7 @@ def test_iuser_update(self): language="fr", phone="+123456789", email=f"{username}@illuminati.int", - health_facility_id=1, + health_facility_id=self.test_hf.id, password=PASSWORD, ), audit_user_id=999, @@ -151,7 +166,7 @@ def test_iuser_update(self): language="en", phone="updated phone", email=f"{username}@updated.int", - health_facility_id=2, + health_facility_id=self.test_hf2.id, password=f"{PASSWORD}updated", ), audit_user_id=111, @@ -252,6 +267,7 @@ def test_officer_min(self): def test_officer_max(self): username = "tstsvco2" + village_ids = [self.test_village1.id, self.test_village2.id, self.test_village3.id] officer, created = create_or_update_officer( user_id=None, data=dict( @@ -262,7 +278,7 @@ def test_officer_max(self): phone="+12345678", email="imis@foo.be", location_id=1, - village_ids=[22, 35, 50], + village_ids=village_ids, substitution_officer_id=1, works_to="2025-01-01", phone_communication=True, @@ -271,6 +287,7 @@ def test_officer_max(self): audit_user_id=999, connected=True, ) + officer.refresh_from_db() self.assertTrue(created) self.assertIsNotNone(officer) self.assertEquals(officer.username, username) @@ -289,7 +306,7 @@ def test_officer_max(self): .order_by("location_id") .values_list("location_id", flat=True) ), - [22, 35, 50], + sorted(village_ids), ) self.assertEquals(officer.phone, "+12345678") self.assertEquals(officer.email, "imis@foo.be") @@ -299,6 +316,7 @@ def test_officer_max(self): def test_officer_update(self): username = "tstsvco2" + village_ids = [self.test_village1.id, self.test_village2.id, self.test_village3.id] officer, created = create_or_update_officer( user_id=None, data=dict( @@ -309,7 +327,7 @@ def test_officer_update(self): phone="+12345678", email="imis@foo.be", location_id=1, - village_ids=[22, 35, 50], + village_ids=village_ids, substitution_officer_id=1, works_to="2025-01-01", phone_communication=True, @@ -318,6 +336,7 @@ def test_officer_update(self): audit_user_id=999, connected=True, ) + officer.refresh_from_db() self.assertTrue(created) self.assertIsNotNone(officer) self.assertEquals(officer.username, username) @@ -336,7 +355,7 @@ def test_officer_update(self): .order_by("location_id") .values_list("location_id", flat=True) ), - [22, 35, 50], + sorted(village_ids), ) self.assertEquals(officer.phone, "+12345678") self.assertEquals(officer.email, "imis@foo.be") @@ -351,7 +370,7 @@ def test_officer_update(self): phone="+00000", email="imis@bar.be", location_id=17, - village_ids=[22], + village_ids=[self.test_village1.id], substitution_officer_id=None, works_to=datetime.date(2025, 5, 5), phone_communication=False, @@ -380,7 +399,7 @@ def test_officer_update(self): .order_by("location_id") .values_list("location_id", flat=True) ), - [22], + [self.test_village1.id], ) self.assertEquals(officer2.phone, "+00000") self.assertEquals(officer2.email, "imis@bar.be") @@ -418,7 +437,7 @@ def test_claim_admin_max(self): dob="1999-05-05", phone="+12345678", email="imis@foo.be", - health_facility_id=1, + health_facility_id=self.test_hf.id, ), audit_user_id=999, connected=True, @@ -430,7 +449,7 @@ def test_claim_admin_max(self): self.assertEquals(claim_admin.other_names, "Other 1 2 3") self.assertEquals(claim_admin.audit_user_id, 999) self.assertTrue(claim_admin.has_login) - self.assertEquals(claim_admin.health_facility_id, 1) + self.assertEquals(claim_admin.health_facility_id, self.test_hf.id) self.assertEquals(claim_admin.phone, "+12345678") self.assertEquals(claim_admin.email_id, "imis@foo.be") diff --git a/core/tests/test_utils.py b/core/tests/test_utils.py index 9d5a772..d499046 100644 --- a/core/tests/test_utils.py +++ b/core/tests/test_utils.py @@ -67,21 +67,21 @@ def test_json_serialize_value(self): self.assertEquals(to_json_safe_value(decimal_obj), str(decimal_obj)) def test_is_admin_rights(self): - role = Role.objects.filter(is_system=64, *filter_validity()).first() - user = User.objects.filter(username="Admin", *filter_validity()).first() + role = Role.objects.filter(is_system=64, *Role.filter_validity()).first() + user = User.objects.filter(username="Admin", *User.filter_validity()).first() if not user: user = create_test_interactive_user(username="Admin", roles=[role.id]) # removing all role but admin UserRole.objects.filter( - ~Q(role__is_system=64), user=user._u, *filter_validity() + ~Q(role__is_system=64), user=user._u, *UserRole.filter_validity() ).delete() # removing all admin rights - RoleRight.objects.filter(role__is_system=64, *filter_validity()).delete() + RoleRight.objects.filter(role__is_system=64, *RoleRight.filter_validity()).delete() rights = list(user.rights) rights_db = [ rr.right_id for rr in RoleRight.filter_queryset() - .filter(role__is_system=64, *filter_validity(prefix="role__")) + .filter(role__is_system=64, *RoleRight.filter_validity(prefix="role__")) .distinct() ] self.assertEquals(len(rights_db), 0, "all roleright are not removed") @@ -119,30 +119,40 @@ def test_to_list_permissions(self): def test_cache_invalidation(self): User.USE_CACHE = True + create_test_interactive_user(username="one") + create_test_interactive_user(username="two") users = list(User.objects.all()) users_id = [user.id for user in users] - users_0_no_cache_get = User.objects.get(id=users_id[0]) + # get 0 without cache + users_0_no_cache_get = User.objects.get(id=users_id[0], *User.filter_validity()) + # get 0 with cache users_0_filter = User.objects.filter(id=users_id[0]).first() + # get 1 with cache + users_1_filter = User.objects.filter(id=users_id[-1]).first() self.assertEquals( users_0_no_cache_get, users_0_filter, "get and filter should retrieve the same object", ) + # update 1, cache should be invalidaed users_0_filter.username = users_0_filter.username + "T" users_0_filter.save() + # get use from cache / partially from cache users_filter = list(User.objects.filter(id__in=users_id)) - caches["default"].delete(f"cd_User_{users_filter[2].id}") - users.remove(users_0_no_cache_get) - users_filter.remove(users_0_filter) users_0_filter = User.objects.filter(id=users_id[0]).first() self.assertNotEquals( users_0_no_cache_get.username, users_0_filter.username, "the object should be different, cache not invalidated properly", ) - self.assertNotEquals( - users, - users_filter, + # remove user 1 from cache + caches["default"].delete(f"cd_User_{users_1_filter.id}") + # remove user 1 from all user list (old and new) + users.remove(users_0_no_cache_get) + users_filter.remove(users_0_filter) + self.assertEquals( + sorted(users, key=lambda x: x.id), + sorted(users_filter, key=lambda x: x.id), "should be the same list even if user_filter comes partially from cache", ) caches["default"].clear() diff --git a/core/utils.py b/core/utils.py index c38ed89..28c3473 100644 --- a/core/utils.py +++ b/core/utils.py @@ -20,7 +20,12 @@ import datetime from django.core.cache import caches from functools import lru_cache +# utils/request_local.py +import threading +from django.db import transaction +# from simple_history.utils import update_change_reason +_request_local = threading.local() logger = logging.getLogger(__file__) @@ -45,6 +50,19 @@ ] +def get_current_user(): + return getattr(_request_local, "user", None) + + +def set_current_user(user): + _request_local.user = user + + +def clear_current_user(): + if hasattr(_request_local, "user"): + del _request_local.user + + class TimeUtils(object): @classmethod @@ -136,18 +154,22 @@ def __place_the_filters(date_start, date_end): def append_validity_filter(**kwargs): - default_filter = kwargs.get("applyDefaultValidityFilter", False) - date_valid_from = kwargs.get("dateValidFrom__Gte", None) - date_valid_to = kwargs.get("dateValidTo__Lte", None) + default_filter = kwargs.pop("applyDefaultValidityFilter", False) + date_valid_from = kwargs.pop("dateValidFrom__Gte", None) + date_valid_to = kwargs.pop("dateValidTo__Lte", None) filters = [] # check if we can use default filter validity if date_valid_from is None and date_valid_to is None: if default_filter: - filters = [*filter_validity_business_model(**kwargs)] + filters = [*filter_validity_business_model()] else: filters = [] else: - filters = [*filter_validity_business_model(**kwargs)] + filters = [*filter_validity_business_model( + dateValidFrom__Gte=date_valid_from, + dateValidTo__Lte=date_valid_to) + ] + return filters @@ -250,8 +272,8 @@ def get(self, *args, **kwargs): def _normalize_value(self, value): """Normalize value for cache key.""" - if isinstance(value, uuid.UUID): - return str(value) + if isinstance(value, (str, uuid.UUID)): + return str(value).lower() try: return int(value) except (ValueError, TypeError): @@ -603,7 +625,6 @@ class ExtendedRelayConnection(graphene.relay.Connection): """ Adds total_count and edge_count to Graphene Relay connections. """ - class Meta: abstract = True @@ -845,7 +866,7 @@ def clear_cache(instance): def get_cache_key(model, id): - return f"cs_{model.__name__}_{id}" + return f"cs_{model.__name__}_{str(id).lower()}" def is_this_session_superuser(session_key): @@ -927,3 +948,59 @@ def to_list_permissions(): for perm_id in perm_ids: all_perms.add(int(perm_id)) return sorted(list(all_perms)) + + +def migrate_from_versioned_to_history(model_class, history_model_class): + """ + Migrates records with non-null validity_to to the history model for the given model class. + + Args: + model_class: The Django model class to migrate (e.g., InteractiveUser). + history_model_class: The corresponding history model class (e.g., HistoricalInteractiveUser). + + Returns: + str: A message indicating the number of records migrated. + """ + records_to_migrate = model_class.objects.filter(validity_to__isnull=False) + migrated_count = 0 + + with transaction.atomic(): + for record in records_to_migrate: + try: + # Create a history record + history_record = history_model_class() + + # Copy all fields from the original record + for field in record._meta.get_fields(): + if field.name not in ['history']: # Skip id and history fields + if field.is_relation: + if field.many_to_one or field.one_to_one: + try: + setattr(history_record, field.name, getattr(record, field.name)) + except Exception as e: + logger.warning(f"Failed to copy relation field {field.name}: {e}") + else: + setattr(history_record, field.name, getattr(record, field.name)) + + # Set history-specific fields + history_record.history_date = record.validity_to or datetime.datetime.now() + history_record.history_change_reason = "Migrated to history" + history_record.history_type = '~' # Update operation + + # Save the history record + history_record.save() + + # Update change reason + # update_change_reason(history_record, "Migrated to history") + + # Delete the original record from the main table + record.delete() + migrated_count += 1 + + except Exception as e: + logger.error(f"Error migrating record {record.id}: {e}") + raise + + result = f"Migrated {migrated_count} records to history for {model_class.__name__}" + logger.info(result) + return result