-
Notifications
You must be signed in to change notification settings - Fork 1
Open
Description
There should be support for uploading custom profile pictures for groups and users, but the backend currently doesn't support media uploads.
Here is some example code you can take inspiration from:
# models.py
import mimetypes
from uuid import uuid4
from django.conf import settings
from django.contrib.contenttypes.fields import GenericForeignKey
from django.core.files import base
from django.db import models
from django.db.models import Q
from django.db.models.functions import Now
from django_cleanup import cleanup
from humanize.filesize import naturalsize
from common.mixins.models import BaseModel
from files.decorators import public_files
def upload_to(instance, filename):
    """Return the storage name for ``instance``'s file.

    The name is the instance's ``alias`` and ``extension`` joined by a dot.
    ``filename`` is accepted but not used.
    """
    return "{}.{}".format(instance.alias, instance.extension)
class FileManager(models.Manager):
    """Manager with helpers for bulk file creation and unattached-file lookups."""

    @public_files
    def create_many(self, files: list, is_public: bool = False):
        """Bulk-create one model row per uploaded file.

        Each row gets a fresh random hex ``alias``; name, size, content type
        and extension are copied from the upload, and ``is_public`` is
        recorded in ``metadata``. Returns whatever ``bulk_create`` returns.
        """

        def build_row(upload):
            # The extension is the text after the last dot of the upload name.
            return self.model(
                file=upload,
                alias=uuid4().hex,
                raw_name=upload.name,
                size=upload.size,
                mime_type=upload.content_type,
                extension=upload.name.split(".")[-1],
                metadata={"is_public": is_public},
            )

        return self.bulk_create([build_row(upload) for upload in files])

    def attachable(self, **kwargs):
        """
        Return files that have no ``object_id`` and were created no earlier
        than ``settings.MODEL_GARBAGE_COLLECTION_DELTA`` ago, OR-ed with any
        files matching the optional ``kwargs`` lookups.
        """
        cutoff = Now() - settings.MODEL_GARBAGE_COLLECTION_DELTA
        recent_and_unattached = Q(created__gte=cutoff, object_id__isnull=True)
        return self.filter(recent_and_unattached | Q(**kwargs))

    def orphans(self):
        """
        Return files that have no ``object_id`` and were created at or before
        ``settings.MODEL_GARBAGE_COLLECTION_DELTA`` ago.
        """
        cutoff = Now() - settings.MODEL_GARBAGE_COLLECTION_DELTA
        return self.filter(created__lte=cutoff, object_id__isnull=True)
@cleanup.select
class File(BaseModel):
    """An uploaded file plus descriptive fields and an optional generic owner.

    ``content_type`` / ``object_id`` / ``content_object`` form a generic
    relation; a file with no ``object_id`` is not attached to anything.
    """

    file = models.FileField(upload_to=upload_to)
    # Random hex string; together with ``extension`` it forms the stored name.
    alias = models.CharField(max_length=settings.CHARFIELD_LENGTH, editable=False)
    # Name of the file as it was supplied.
    raw_name = models.CharField(max_length=settings.CHARFIELD_LENGTH, editable=False)
    size = models.DecimalField(max_digits=15, decimal_places=2, editable=False)
    mime_type = models.CharField(max_length=settings.CHARFIELD_LENGTH, editable=False, blank=True)
    extension = models.CharField(max_length=settings.CHARFIELD_LENGTH, editable=False)
    metadata = models.JSONField(default=dict)
    content_type = models.ForeignKey(
        "contenttypes.ContentType", on_delete=models.CASCADE, blank=True, null=True
    )
    object_id = models.UUIDField(blank=True, null=True)
    content_object = GenericForeignKey()

    objects = FileManager()

    class Meta:
        indexes = [
            models.Index(fields=["content_type", "object_id"]),
        ]

    @property
    def hsize(self):
        """
        Humanize file size.
        """
        return naturalsize(self.size)

    def save(self, *args, **kwargs):
        """Save the row, first inferring the descriptive fields from ``file``.

        Pass ``no_infer=True`` to keep ``alias``, ``raw_name``, ``size``,
        ``mime_type`` and ``extension`` exactly as already set. The keyword is
        consumed here and not forwarded to the parent ``save``.
        """
        no_infer = kwargs.pop("no_infer", False)
        if self.file and not no_infer:
            self.alias = uuid4().hex
            self.raw_name = self.file.name
            self.size = self.file.size
            # Uploaded files carry a ``content_type``; they are also
            # ``base.File`` subclasses, so an isinstance check on ``base.File``
            # cannot tell them apart. Prefer the declared type when present
            # and fall back to guessing from the name.
            declared_type = getattr(self.file.file, "content_type", None)
            guessed_type = mimetypes.guess_type(self.raw_name)[0]
            # ``guess_type`` yields None for unknown extensions, and the
            # column is not nullable, so store an empty string instead.
            self.mime_type = declared_type or guessed_type or ""
            self.extension = self.file.name.split(".")[-1]
        return super().save(*args, **kwargs)

    def __str__(self):
        return self.raw_name


# serializers.py
import uuid
from typing import Callable
from rest_framework import serializers
from common.mixins.serializers import CommonValidationsMixin
from files.decorators import remove_querystring_auth
from files.models import File
from files.tasks import parse_file
class FileParserSerializer(serializers.Serializer):
    """Serializer whose update step calls ``parse_file`` and reports its result."""

    task_id = serializers.UUIDField(read_only=True)

    def update(self, instance, validated_data):
        """Call ``parse_file`` with the instance id as a string.

        ``validated_data`` is not used. Returns a dict holding the value
        returned by ``parse_file`` under ``task_id``.
        """
        started_task = parse_file(str(instance.id))
        return dict(task_id=started_task)
class ListFileSerializer(serializers.ListSerializer):
    """List serializer that creates all submitted files in one bulk call."""

    def create(self, validated_data):
        """Bulk-create files from the ``file`` entry of each validated item.

        ``is_public`` is read from the serializer context and defaults to
        ``False`` when the context does not provide it, so the serializer
        also works when it is used without that context key.
        """
        uploads = [item["file"] for item in validated_data]
        is_public = self.context.get("is_public", False)
        return File.objects.create_many(uploads, is_public=is_public)
class FileSerializer(serializers.ModelSerializer):
    """Model serializer for ``File``; lists are handled by ``ListFileSerializer``."""

    class Meta:
        model = File
        fields = (
            "id",
            "file",
            "alias",
            "raw_name",
            "size",
            "mime_type",
            "extension",
            "metadata",
        )
        # DRF only honours ``read_only_fields``; a Meta attribute named
        # ``read_only`` is not a recognised option and is ignored.
        read_only_fields = (
            "id",
            "alias",
            "raw_name",
            "size",
            "mime_type",
            "extension",
        )
        list_serializer_class = ListFileSerializer

    @remove_querystring_auth
    def to_representation(self, instance):
        """Serialize ``instance``, replacing ``size`` with its humanized form."""
        outgoing = super().to_representation(instance)
        outgoing["size"] = instance.hsize
        return outgoing
class FileAttachmentSerializer(CommonValidationsMixin, serializers.Serializer):
    """
    This serializer is used to attach files to a model. It can be used in one of the
    following two ways:
    1. Inherit from this serializer. And define a list of file fields in the inheriting
    class.
    2. Simply use this serializer and pass a list of file ids as data.
    """

    def __init__(self, *args, **kwargs):
        """Accept an optional ``file_fields`` keyword naming the file relations.

        ``inheritence_mode`` stays ``True`` when file fields come from the
        keyword or from a ``file_fields`` attribute already on the instance or
        class; otherwise it is set to ``False``.
        """
        file_fields = kwargs.pop("file_fields", [])
        super().__init__(*args, **kwargs)
        self.inheritence_mode = True
        if file_fields:
            self.file_fields = file_fields
        elif getattr(self, "file_fields", None):
            # File fields were declared on the class; nothing to override.
            pass
        else:
            self.inheritence_mode = False

    def _validate_files_data(self, content_object, file_field, field_data):
        """Resolve ``field_data`` (one id or a list of ids) to ``File`` objects.

        Only files returned by ``File.objects.attachable`` (including files
        already related to ``content_object`` through ``file_field``) are
        accepted. Raises ``serializers.ValidationError`` when a single id is
        not found; list validation is delegated to ``ensure_distinctness`` and
        ``ensure_membership`` (presumably provided by ``CommonValidationsMixin``).
        """

        def _get_qs():
            return File.objects.attachable(
                id__in=getattr(content_object, file_field).values_list("id", flat=True)
            )

        if not field_data:
            return []

        def get_error_key_template():
            # Leaves the ``{index}`` placeholder in place for the validators.
            return "{field_name}.{index}".replace("{field_name}", file_field)

        if isinstance(field_data, list):
            from_db = _get_qs().in_bulk(field_data)
            self.ensure_distinctness(
                field_data,
                default_error_key_template=get_error_key_template(),
            )
            self.ensure_membership(
                field_data,
                from_db.keys(),
                default_error_message="File not found.",
                default_error_key_template=get_error_key_template(),
            )
            return from_db.values()

        try:
            return [_get_qs().get(pk=field_data)]
        except File.DoesNotExist as exc:
            raise serializers.ValidationError({file_field: "File not found."}) from exc

    def _attach_files(self, content_object, file_field, field_data):
        """Validate ``field_data`` and set it as the files of ``file_field``."""
        field_data = self._validate_files_data(content_object, file_field, field_data)
        getattr(content_object, file_field).set(field_data)

    def _convert_to_uuid(self, field_name, value, index=""):
        """Return ``value`` as a ``uuid.UUID``.

        Raises ``serializers.ValidationError`` keyed by ``field_name`` (plus
        ``.index`` when an index is given) if the value cannot be converted.
        """
        if isinstance(value, uuid.UUID):
            return value
        try:
            return uuid.UUID(value)
        except (AttributeError, TypeError, ValueError) as exc:
            # ``uuid.UUID`` raises ValueError for malformed strings but
            # AttributeError/TypeError for non-string input (ints, None,
            # lists); all of them are invalid client data, not server errors.
            error_key = f"{field_name}.{index}".rstrip(".")
            raise serializers.ValidationError({error_key: "Not a valid UUID"}) from exc

    def _gather_fields_data(self, validated_data):
        """Convert every value (or every item of a list value) to a UUID."""
        data = {}
        for field_name, value in validated_data.items():
            if isinstance(value, list):
                data[field_name] = [
                    self._convert_to_uuid(field_name, raw_id, index)
                    for index, raw_id in enumerate(value)
                ]
            else:
                data[field_name] = self._convert_to_uuid(field_name, value)
        return data

    def _wrapped_crud(self, crud_func: Callable, *args, **kwargs):
        """Run ``crud_func`` (when inheriting) and then attach the files.

        In inheritance mode the file fields are popped from
        ``validated_data`` before ``crud_func`` is called. Otherwise the
        target object is popped from ``validated_data["content_object"]`` and
        the file ids are read from ``self.initial_data``.
        """
        if self.inheritence_mode:
            field_with_data = self._gather_fields_data(
                {k: kwargs["validated_data"].pop(k, []) for k in self.file_fields}
            )
            obj = crud_func(*args, **kwargs)
        else:
            obj = kwargs["validated_data"].pop("content_object")
            field_with_data = self._gather_fields_data(self.initial_data)
        for field, field_data in field_with_data.items():
            # Empty values are skipped, so existing attachments stay untouched.
            if field_data:
                self._attach_files(obj, field, field_data)
        return obj

    def create(self, validated_data):
        """Create through the parent serializer, then attach files."""
        return self._wrapped_crud(super().create, validated_data=validated_data)

    def update(self, instance, validated_data):
        """Update through the parent serializer, then attach files."""
        return self._wrapped_crud(
            super().update, instance=instance, validated_data=validated_data
        )


# views.py
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.parsers import JSONParser, MultiPartParser
from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet
from files.models import File
from files.serializers import FileParserSerializer, FileSerializer
from roles.mixins.views import RecruiterOrInternalServerPermissions
class FileViewSet(RecruiterOrInternalServerPermissions, ModelViewSet):
    """Model viewset for ``File`` with multi-file upload and a ``parse`` action."""

    parser_classes = (
        JSONParser,
        MultiPartParser,
    )
    queryset = File.objects.all()
    required_scopes = ["read", "write"]

    def get_serializer_class(self):
        """Use ``FileParserSerializer`` for ``parse_file``, else ``FileSerializer``."""
        if self.action != "parse_file":
            return FileSerializer
        return FileParserSerializer

    def get_serializer_context(self):
        """Add ``is_public``: whether the request META has the public-upload key."""
        context = super().get_serializer_context()
        context["is_public"] = "HTTP_X_IS_PUBLIC_FILE_UPLOAD" in self.request.META
        return context

    def create(self, request, *args, **kwargs):
        """Create one ``File`` per uploaded file found in ``request.FILES``.

        Every upload, whichever form key it arrived under, becomes one
        ``{"file": upload}`` item for a ``many=True`` serializer. Responds
        with the serialized files and HTTP 201.
        """
        payload = []
        for value in dict(request.FILES).values():
            uploads = value if isinstance(value, list) else [value]
            payload.extend({"file": upload} for upload in uploads)

        serializer = self.get_serializer(data=payload, many=True)
        serializer.is_valid(raise_exception=True)
        self.perform_create(serializer)
        body = serializer.data
        return Response(
            body,
            status=status.HTTP_201_CREATED,
            headers=self.get_success_headers(body),
        )

    @action(detail=True, methods=["post"], url_name="parse", url_path="parse")
    def parse_file(self, request, *args, **kwargs):
        """Detail POST action at ``parse``; delegates to the inherited ``update``."""
        return super().update(request, *args, **kwargs)


# decorators.py
def public_files(func):
    """Decorate a manager method so public uploads get a public storage tag.

    The first positional argument of the wrapped call must expose
    ``model.file.field.storage``. When that storage has an
    ``object_parameters`` attribute and the call requests ``is_public``
    (by keyword or positionally), ``object_parameters`` is temporarily
    extended with ``"Tagging": "is_public=true"`` for the duration of the
    call and restored afterwards, even if the call raises.

    NOTE(review): the storage object hangs off the model field, so the
    temporary change is visible to any other code using that storage while
    the wrapped call is running.
    """
    import inspect
    from functools import wraps

    try:
        signature = inspect.signature(func)
    except (TypeError, ValueError):
        # Not every callable can be introspected; fall back to keywords only.
        signature = None

    def _is_public_requested(args, kwargs):
        """Return True when the call passes a truthy ``is_public``."""
        if "is_public" in kwargs:
            return bool(kwargs["is_public"])
        if signature is None:
            return False
        try:
            bound = signature.bind_partial(*args, **kwargs)
        except TypeError:
            # Malformed call: let ``func`` raise its own error.
            return False
        return bool(bound.arguments.get("is_public", False))

    @wraps(func)
    def wrapper(*args, **kwargs):
        storage = args[0].model.file.field.storage
        if not hasattr(storage, "object_parameters") or not _is_public_requested(args, kwargs):
            return func(*args, **kwargs)

        # Save the original file storage parameters
        original_params = storage.object_parameters.copy()
        # Set the file storage to be public for the duration of the function call
        storage.object_parameters = {**original_params, "Tagging": "is_public=true"}
        try:
            return func(*args, **kwargs)
        finally:
            # Reset the file storage to its original state
            storage.object_parameters = original_params.copy()

    return wrapper
def remove_querystring_auth(func):
    """Decorate a method so public files are handled with query-string auth off.

    The wrapped call's second positional argument (or its ``instance``
    keyword) is the file object. When its ``metadata`` marks it as
    ``is_public`` and its storage has a ``querystring_auth`` attribute, that
    attribute is set to ``False`` for the duration of the call and restored
    afterwards, even if the call raises. In every other case the call is
    passed through unchanged.
    """
    from functools import wraps

    @wraps(func)
    def wrapper(*args, **kwargs):
        # Accept the instance positionally (as serializers pass it) or by keyword.
        instance = args[1] if len(args) > 1 else kwargs.get("instance")
        if instance is None or not instance.metadata.get("is_public"):
            return func(*args, **kwargs)

        storage = instance.file.storage
        if not hasattr(storage, "querystring_auth"):
            return func(*args, **kwargs)

        original_qs_auth = storage.querystring_auth
        storage.querystring_auth = False
        try:
            return func(*args, **kwargs)
        finally:
            # Reset the file storage to its original state
            storage.querystring_auth = original_qs_auth

    return wrapper
Metadata
Assignees
Labels
No labels