"""
Provides various authentication policies.
"""
- import base64
- import binascii
- from django.contrib.auth import authenticate, get_user_model
- from django.middleware.csrf import CsrfViewMiddleware
- from django.utils.translation import gettext_lazy as _
- from rest_framework import HTTP_HEADER_ENCODING, exceptions
def get_authorization_header(request):
    """
    Fetch the request's 'Authorization:' header as a bytestring.

    The value is read from `request.META['HTTP_AUTHORIZATION']`; when the
    header is absent, `b''` is returned. A `str` value is encoded using
    `HTTP_HEADER_ENCODING` so callers always receive bytes.
    """
    header = request.META.get('HTTP_AUTHORIZATION', b'')
    if not isinstance(header, str):
        return header
    # The Django test client can supply the header as text instead of bytes.
    return header.encode(HTTP_HEADER_ENCODING)
class CSRFCheck(CsrfViewMiddleware):
    """
    CSRF middleware variant whose rejection hook returns the failure reason.
    """

    def _reject(self, request, reason):
        # Hand the failure reason back to the caller rather than wrapping it
        # in an HttpResponse.
        return reason
class BaseAuthentication:
    """
    Common base class that every authentication class should extend.
    """

    def authenticate(self, request):
        """
        Authenticate the request and return a two-tuple of (user, token).

        Subclasses must override this method; the base implementation
        always raises `NotImplementedError`.
        """
        raise NotImplementedError(".authenticate() must be overridden.")

    def authenticate_header(self, request):
        """
        Return the value to use for the `WWW-Authenticate` header of a
        `401 Unauthenticated` response, or `None` if the authentication
        scheme should produce `403 Permission Denied` responses instead.

        The base implementation returns `None`.
        """
        return None
class BasicAuthentication(BaseAuthentication):
    """
    HTTP Basic authentication against username/password.
    """
    # Realm advertised in the header built by `authenticate_header()`.
    www_authenticate_realm = 'api'

    def authenticate(self, request):
        """
        Authenticate a request that carries HTTP Basic credentials.

        Returns the `(user, None)` two-tuple produced by
        `authenticate_credentials()` when valid credentials are supplied.
        Returns `None` when the request has no `Authorization` header or the
        header uses a scheme other than `basic`, so other authentication
        classes may handle it.

        Raises `exceptions.AuthenticationFailed` when the header is
        malformed: no credentials, more than one whitespace-separated
        credentials token, invalid base64, or a decoded payload without the
        `:` separator between user id and password.
        """
        auth = get_authorization_header(request).split()

        if not auth or auth[0].lower() != b'basic':
            return None

        if len(auth) == 1:
            msg = _('Invalid basic header. No credentials provided.')
            raise exceptions.AuthenticationFailed(msg)
        elif len(auth) > 2:
            msg = _('Invalid basic header. Credentials string should not contain spaces.')
            raise exceptions.AuthenticationFailed(msg)

        try:
            # Decode the base64 payload once; only the text decoding is retried.
            raw_credentials = base64.b64decode(auth[1])
            try:
                auth_decoded = raw_credentials.decode('utf-8')
            except UnicodeDecodeError:
                auth_decoded = raw_credentials.decode('latin-1')

            # Basic credentials are "user-id:password". Splitting on the first
            # colon keeps colons inside the password and raises ValueError
            # when the separator is missing, so a malformed payload is
            # rejected here instead of being tried with an empty password.
            userid, password = auth_decoded.split(':', 1)
        except (TypeError, ValueError, UnicodeDecodeError, binascii.Error):
            msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
            raise exceptions.AuthenticationFailed(msg)

        return self.authenticate_credentials(userid, password, request)

    def authenticate_credentials(self, userid, password, request=None):
        """
        Authenticate the userid and password against username and password
        with optional request for context.

        The userid is passed to Django's `authenticate()` under the user
        model's `USERNAME_FIELD`. Returns `(user, None)` on success and
        raises `exceptions.AuthenticationFailed` when no user is returned
        or the returned user is not active.
        """
        credentials = {
            get_user_model().USERNAME_FIELD: userid,
            'password': password
        }
        user = authenticate(request=request, **credentials)

        if user is None:
            raise exceptions.AuthenticationFailed(_('Invalid username/password.'))

        if not user.is_active:
            raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))

        return (user, None)

    def authenticate_header(self, request):
        """
        Return the `Basic` challenge string carrying the configured realm.
        """
        return 'Basic realm="%s"' % self.www_authenticate_realm
class SessionAuthentication(BaseAuthentication):
    """
    Use Django's session framework for authentication.
    """

    def authenticate(self, request):
        """
        Return `(user, None)` if the underlying request has an active
        logged in user and CSRF validation passes. Otherwise return `None`.
        """
        # The session-based user lives on the wrapped HttpRequest object.
        session_user = getattr(request._request, 'user', None)

        if session_user and session_user.is_active:
            # CSRF validation is only required for authenticated requests.
            self.enforce_csrf(request)
            return (session_user, None)

        # Unauthenticated, so no CSRF validation is needed.
        return None

    def enforce_csrf(self, request):
        """
        Enforce CSRF validation for session based authentication.

        Raises `exceptions.PermissionDenied` carrying the failure reason
        when the CSRF check rejects the request.
        """
        def _unused_get_response(request):  # pragma: no cover
            return None

        csrf_check = CSRFCheck(_unused_get_response)
        # process_request() populates request.META['CSRF_COOKIE'], which
        # process_view() then uses.
        csrf_check.process_request(request)
        failure_reason = csrf_check.process_view(request, None, (), {})
        if not failure_reason:
            return
        # CSRF failed, bail with an explicit error message.
        raise exceptions.PermissionDenied('CSRF Failed: %s' % failure_reason)
class TokenAuthentication(BaseAuthentication):
    """
    Simple token based authentication.

    Clients should authenticate by passing the token key in the "Authorization"
    HTTP header, prepended with the string "Token ".  For example:

        Authorization: Token 401f7ac837da42b97f613d789819ff93537bee6a
    """

    keyword = 'Token'
    model = None

    # A custom token model may be used, but must have the following properties.
    #
    # * key -- The string identifying the token
    # * user -- The user to which the token belongs

    def get_model(self):
        """
        Return the token model: `self.model` when set, otherwise the
        `Token` model from `rest_framework.authtoken.models`.
        """
        if self.model is None:
            # Imported lazily, only when no custom model is configured.
            from rest_framework.authtoken.models import Token
            return Token
        return self.model

    def authenticate(self, request):
        """
        Authenticate a request carrying `<keyword> <key>` in its
        `Authorization` header.

        Returns `None` when the header is missing or uses another keyword,
        and raises `exceptions.AuthenticationFailed` when the header is
        malformed. Otherwise returns the result of
        `authenticate_credentials()`.
        """
        header_parts = get_authorization_header(request).split()

        # The keyword comparison is case-insensitive.
        if not header_parts or header_parts[0].lower() != self.keyword.lower().encode():
            return None

        part_count = len(header_parts)
        if part_count == 1:
            msg = _('Invalid token header. No credentials provided.')
            raise exceptions.AuthenticationFailed(msg)
        if part_count > 2:
            msg = _('Invalid token header. Token string should not contain spaces.')
            raise exceptions.AuthenticationFailed(msg)

        try:
            key = header_parts[1].decode()
        except UnicodeError:
            msg = _('Invalid token header. Token string should not contain invalid characters.')
            raise exceptions.AuthenticationFailed(msg)

        return self.authenticate_credentials(key)

    def authenticate_credentials(self, key):
        """
        Look up the token with the given key and return `(user, token)`.

        Raises `exceptions.AuthenticationFailed` when no such token exists
        or the token's user is not active.
        """
        token_model = self.get_model()
        try:
            token = token_model.objects.select_related('user').get(key=key)
        except token_model.DoesNotExist:
            raise exceptions.AuthenticationFailed(_('Invalid token.'))

        if not token.user.is_active:
            raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))

        return (token.user, token)

    def authenticate_header(self, request):
        """
        Return the keyword as the `WWW-Authenticate` header value.
        """
        return self.keyword
class RemoteUserAuthentication(BaseAuthentication):
    """
    REMOTE_USER authentication.

    To use this, set up your web server to perform authentication, which will
    set the REMOTE_USER environment variable. You will need to have
    'django.contrib.auth.backends.RemoteUserBackend' in your
    AUTHENTICATION_BACKENDS setting.
    """

    # Name of request header to grab username from.  This will be the key as
    # used in the request.META dictionary, i.e. the normalization of headers to
    # all uppercase and the addition of "HTTP_" prefix apply.
    header = "REMOTE_USER"

    def authenticate(self, request):
        """
        Return `(user, None)` when Django's `authenticate()` yields an
        active user for the value found under `self.header` in
        `request.META`. Otherwise return `None`.
        """
        remote_username = request.META.get(self.header)
        user = authenticate(request=request, remote_user=remote_username)
        if not user or not user.is_active:
            return None
        return (user, None)
|