Source code for dal.views

"""Base views for autocomplete widgets."""

import json
import operator
from functools import reduce

import django
from django import http
if django.VERSION >= (4, 0):
    from django.contrib.admin.utils import lookup_spawns_duplicates
else:
    from django.contrib.admin.utils import lookup_needs_distinct \
        as lookup_spawns_duplicates
from django.contrib.auth import get_permission_codename
from django.core.exceptions import ImproperlyConfigured, ValidationError
from django.db.models import Q
from django.http import HttpResponseBadRequest, HttpResponseNotAllowed
from django.template.loader import render_to_string
from django.views.generic.list import BaseListView


[docs] class ViewMixin(object): """Common methods for autocomplete views. It is assumed this view will be used in conjunction with a Django :py:class:`View` based class that will implement OPTIONS. .. py:attribute:: forwarded Dict of field values that were forwarded from the form, may be used to filter autocompletion results based on the form state. See ``linked_data`` example for reference. .. py:attribute:: q Query string as typed by the user in the autocomplete field. """ http_method_allowed = ('GET', 'POST')
[docs] def dispatch(self, request, *args, **kwargs): """Set :py:attr:`forwarded` and :py:attr:`q`.""" if request.method.upper() not in self.http_method_allowed: return HttpResponseNotAllowed(self.http_method_allowed) try: self.forwarded = json.loads( getattr(request, request.method).get('forward', '{}') ) except ValueError: return HttpResponseBadRequest('Invalid JSON data') if not isinstance(self.forwarded, dict): return HttpResponseBadRequest('Not a JSON object') self.q = request.GET.get('q', '') return super(ViewMixin, self).dispatch(request, *args, **kwargs)
[docs] class BaseQuerySetView(ViewMixin, BaseListView): """Base view to get results from a QuerySet. .. py:attribute:: create_field Name of the field to use to create missing values. For example, if create_field='title', and the user types in "foo", then the autocomplete view will propose an option 'Create "foo"' if it can't find any value matching "foo". When the user does click 'Create "foo"', the autocomplete script should POST to this view to create the object and get back the newly created object id. .. py:attribute:: model_field_name Name of the Model field to run filter against. """ paginate_by = 10 context_object_name = 'results' model_field_name = 'name' create_field = None search_fields = [] split_words = None template = None validate_create = None
[docs] def has_more(self, context): """For widgets that have infinite-scroll feature.""" return context['page_obj'].has_next() if context['page_obj'] else False
[docs] def get_result_value(self, result): """Return the value of a result.""" return str(result.pk)
[docs] def get_result_label(self, result): """Return the label of a result.""" if self.template: return render_to_string(self.template, {"result": result}) else: return str(result)
[docs] def get_selected_result_label(self, result): """Return the label of a selected result.""" return self.get_result_label(result)
[docs] def get_queryset(self): """Filter the queryset with GET['q'].""" qs = super(BaseQuerySetView, self).get_queryset() qs = self.get_search_results(qs, self.q) return qs
[docs] def get_search_fields(self): """Get the fields to search over.""" if self.search_fields: return self.search_fields else: return [self.model_field_name]
def _construct_search(self, field_name): """Apply keyword searches.""" if field_name.startswith("^"): return "%s__istartswith" % field_name[1:] elif field_name.startswith("="): return "%s__iexact" % field_name[1:] elif field_name.startswith("@"): return "%s__search" % field_name[1:] else: return "%s__icontains" % field_name
[docs] def get_search_results(self, queryset, search_term): """Filter the results based on the query.""" search_fields = self.get_search_fields() if search_fields and search_term: orm_lookups = [ self._construct_search(search_field) for search_field in search_fields ] if self.split_words is not None: word_conditions = [] for word in search_term.split(): or_queries = [Q(**{orm_lookup: word}) for orm_lookup in orm_lookups] word_conditions.append(reduce(operator.or_, or_queries)) op_ = operator.or_ if self.split_words == "or" else operator.and_ queryset = queryset.filter(reduce(op_, word_conditions)) else: or_queries = [ Q(**{orm_lookup: search_term}) for orm_lookup in orm_lookups ] queryset = queryset.filter(reduce(operator.or_, or_queries)) if self.lookup_needs_distinct(queryset, orm_lookups): queryset = queryset.distinct() return queryset
[docs] def lookup_needs_distinct(self, queryset, orm_lookups): """Return True if an orm_lookup requires calling qs.distinct().""" return any( lookup_spawns_duplicates(queryset.model._meta, search_spec) for search_spec in orm_lookups )
[docs] def create_object(self, text): """Create an object given a text.""" return self.get_queryset().get_or_create( **{self.create_field: text})[0]
[docs] def has_add_permission(self, request): """Return True if the user has the permission to add a model.""" if django.VERSION < (2, 0, 0): auth = request.user.is_authenticated() else: auth = request.user.is_authenticated if not auth: return False opts = self.get_queryset().model._meta codename = get_permission_codename('add', opts) return request.user.has_perm("%s.%s" % (opts.app_label, codename))
[docs] def post(self, request, *args, **kwargs): """ Create an object given a text after checking permissions. Runs self.validate() if self.validate_create is True. """ if not self.has_add_permission(request): return http.HttpResponseForbidden() if not self.create_field: raise ImproperlyConfigured('Missing "create_field"') text = request.POST.get('text', None) if text is None: return http.HttpResponseBadRequest() if self.validate_create: try: self.validate(text) except ValidationError as error: if self.create_field in error.message_dict: return http.JsonResponse(dict(error=error)) result = self.create_object(text) return http.JsonResponse({ 'id': self.get_result_value(result), 'text': self.get_selected_result_label(result), })
[docs] def validate(self, text): """ Validate a given text for new option creation. Raise ValidationError or return None. """ model = self.get_queryset().model obj = model(**{self.create_field: text}) obj.full_clean()