Source code for users_api.models

from __future__ import annotations

import uuid
import logging

from datetime import timedelta
from typing import Any, cast, Callable
from typing import Optional, Type, Union
from functools import wraps

from django.db import models, transaction
from django.core.mail import send_mail
from django.core.exceptions import ObjectDoesNotExist, ValidationError

from django.contrib.auth.models import \
    User, \
    AbstractBaseUser, \
    BaseUserManager, \
    PermissionsMixin

from django.contrib.sessions.models import Session
from django.utils import timezone
from django.contrib.auth.models import Group
from django.db.models.signals import m2m_changed, post_delete, pre_delete

from oauth2_provider.models import \
    get_application_model, Application

from datasets_api.permissions import get_directly_allowed_genotype_data

logger = logging.getLogger(__name__)


class WdaeUserManager(BaseUserManager):
    """Manager that creates and looks up wdae users by their email."""

    def _create_user(
        self, email: str, password: Optional[str], **kwargs: Any
    ) -> WdaeUser:
        """Build, save and return a user for the given email and password.

        The email is normalized and lower-cased before the user is stored.
        Raises ``ValueError`` when the email is empty.
        """
        if not email:
            raise ValueError("The given email must be set")

        normalized = self.normalize_email(email).lower()
        new_user = cast(WdaeUser, self.model(email=normalized, **kwargs))
        new_user.set_password(password)
        new_user.save(using=self._db)
        return new_user

    def get_or_create(  # type: ignore
        self, **kwargs: Any
    ) -> tuple[WdaeUser, bool]:
        """Return ``(user, created)`` for the user matching ``kwargs``.

        A missing user is created through :meth:`create_user`.
        """
        try:
            existing = cast(WdaeUser, self.get(**kwargs))
        except WdaeUser.DoesNotExist:  # pylint: disable=no-member
            return self.create_user(**kwargs), True
        return existing, False

    def create(self, **kwargs: Any) -> WdaeUser:  # type: ignore
        """Create a user; a thin alias of :meth:`create_user`."""
        return self.create_user(**kwargs)

    def create_user(
        self, email: str, password: Optional[str] = None, **kwargs: Any
    ) -> WdaeUser:
        """Create and save a regular user."""
        return self._create_user(email, password, **kwargs)

    def create_superuser(
        self, email: str, password: str, **kwargs: Any
    ) -> WdaeUser:
        """Create and save a superuser."""
        superuser = self._create_user(email, password, **kwargs)

        # Elevate the freshly stored account and persist the flags.
        superuser.is_staff = True
        superuser.is_active = True
        superuser.is_superuser = True
        superuser.save()
        return superuser
class WdaeUser(AbstractBaseUser, PermissionsMixin):
    """Class representing a user in wdae."""

    name: models.CharField = models.CharField(max_length=100)
    email: models.EmailField = models.EmailField(unique=True)

    is_staff: models.BooleanField = models.BooleanField(default=False)
    is_active: models.BooleanField = models.BooleanField(default=False)
    date_joined: models.DateTimeField = models.DateTimeField(null=True)

    USERNAME_FIELD = "email"
    REQUIRED_FIELDS = ["name"]

    SUPERUSER_GROUP = "admin"
    UMLIMITED_DOWNLOAD_GROUP = "unlimited"

    objects = WdaeUserManager()

    @property
    def has_unlimited_download(self) -> bool:
        """Tell whether the user belongs to the unlimited download group."""
        return self.groups.filter(  # pylint: disable=no-member
            name=self.UMLIMITED_DOWNLOAD_GROUP).exists()

    @property
    def allowed_datasets(self) -> list[dict[str, Any]]:
        """Return the genotype data directly allowed for this user."""
        return get_directly_allowed_genotype_data(cast(User, self))

    def email_user(
        self, subject: str, message: str,
        from_email: Optional[str] = None
    ) -> int:
        """Send an email to the user.

        When ``from_email`` is not given ``settings.DEFAULT_FROM_EMAIL`` is
        used.  When ``settings.EMAIL_OVERRIDE`` is set to a truthy value the
        message is delivered to that address instead of the user's own.

        Returns the value returned by ``send_mail``.
        """
        # pylint: disable=import-outside-toplevel
        from django.conf import settings

        if from_email is None:
            from_email = settings.DEFAULT_FROM_EMAIL  # type: ignore

        try:
            override = settings.EMAIL_OVERRIDE  # type: ignore
        except Exception:  # pylint: disable=broad-exception-caught
            # A missing override is the normal case, not an error.
            logger.debug("no email override; sending email")
            override = None

        to_email = override if override else self.email

        mail = send_mail(subject, message, from_email, [to_email])

        # NOTE(review): the log reports the user's own address even when
        # EMAIL_OVERRIDE redirected the message, and the logged message body
        # may contain password (re)set links - confirm this is intended.
        logger.info("email sent: to:      <%s>", str(self.email))
        logger.info("email sent: from:    <%s>", str(from_email))
        logger.info("email sent: subject: %s", str(subject))
        logger.info("email sent: message: %s", str(message))

        return mail

    def set_password(self, raw_password: Optional[str]) -> None:
        """Set the password; the account is active only with a password."""
        super().set_password(raw_password)
        self.is_active = bool(raw_password)

    def set_unusable_password(self) -> None:
        """Mark the password unusable and deactivate the account."""
        super().set_unusable_password()
        self.is_active = False

    def reset_password(self, by_admin: bool = False) -> None:
        """Create a reset-password code and email it to the user."""
        verif_code = ResetPasswordCode.create(self)
        send_reset_email(self, verif_code, by_admin)

    def deauthenticate(self) -> None:
        """Delete every stored session that belongs to this user."""
        if self.pk is None:
            # An unsaved user cannot own a session; without this guard the
            # ``None`` pk would match sessions that carry no user id at all.
            return

        own_key = str(self.pk)
        for session in Session.objects.all():
            session_user = session.get_decoded().get("_auth_user_id")
            # Django stores the user id in the session as a string, so the
            # comparison is done on string form to match an integer pk too.
            if session_user is not None and str(session_user) == own_key:
                session.delete()

    def register_preexisting_user(self, name: Optional[str]) -> None:
        """Register already existing user.

        An active user only receives a notification email.  An inactive user
        gets the join date set, optionally a new name, and a set-password
        verification email; the user is then saved.
        """
        if self.is_active:
            send_already_existing_email(self)
            return

        self.date_joined = timezone.now()
        if name:
            self.name = name

        verif_code = SetPasswordCode.create(self)
        send_verif_email(self, verif_code)

        self.save()

    @staticmethod
    def change_password(
        verification_path: Union[SetPasswordCode, ResetPasswordCode],
        new_password: str
    ) -> WdaeUser:
        """Set a new password for the user owning the verification code.

        Also records a zero failed-attempts authentication log entry for the
        user's email and deletes the consumed verification code.
        """
        user = verification_path.user
        user.set_password(new_password)
        user.save()

        # Reset account lockout
        AuthenticationLog(
            email=user.email,
            time=timezone.now(),
            failed_attempt=0
        ).save()

        verification_path.delete()

        return cast(WdaeUser, user)

    def __str__(self) -> str:
        return str(self.email)

    class Meta:  # pylint: disable=too-few-public-methods
        db_table = "users"
class BaseVerificationCode(models.Model):
    """Base class for temporary codes for verifying the user without login."""

    path: models.Field = models.CharField(max_length=255, unique=True)
    user: models.Field = models.OneToOneField(
        WdaeUser, on_delete=models.CASCADE)
    created_at: models.Field = models.DateTimeField(auto_now_add=True)

    def __str__(self) -> str:
        return str(self.path)

    def validate(self) -> bool:
        """Tell whether the code is still usable; subclasses implement it."""
        raise NotImplementedError()

    class Meta:  # pylint: disable=too-few-public-methods
        abstract = True

    @classmethod
    def get_code(
        cls, user: WdaeUser
    ) -> Optional[BaseVerificationCode]:
        """Get a verification code for a user, or None if there is none."""
        try:
            # pylint: disable=no-member
            return cast(
                BaseVerificationCode,
                cls.objects.get(user=user))  # type: ignore
        except ObjectDoesNotExist:
            return None

    @classmethod
    def create(cls, user: WdaeUser) -> BaseVerificationCode:
        """Create an email verification code.

        An existing code of the user is returned as-is while it still
        validates; a code that no longer validates is deleted and replaced
        by a new one with a fresh random path.
        """
        try:
            # pylint: disable=no-member
            verif_code = cls.objects.get(user=user)  # type: ignore
        except ObjectDoesNotExist:
            verif_code = None

        if verif_code is not None:
            # ``validate`` must be called: the bound method object itself is
            # never ``True``, which would discard every existing code.
            if verif_code.validate():
                return cast(BaseVerificationCode, verif_code)
            verif_code.delete()

        # pylint: disable=no-member
        verif_code = cls.objects.create(  # type: ignore
            user=user, path=uuid.uuid4())
        return cast(BaseVerificationCode, verif_code)
class SetPasswordCode(BaseVerificationCode):
    """Verification code used when a user sets a password for an account."""

    class Meta:  # pylint: disable=too-few-public-methods
        db_table = "set_password_verification_codes"

    def validate(self) -> bool:
        """Report the code as valid unconditionally."""
        return True
class ResetPasswordCode(BaseVerificationCode):
    """Class used for verification of password resets."""

    class Meta:  # pylint: disable=too-few-public-methods
        db_table = "reset_password_verification_codes"

    def validate(self) -> bool:
        """Tell whether the code is not older than the configured timeout.

        The timeout is ``settings.RESET_PASSWORD_TIMEOUT_HOURS`` hours,
        24 when the setting is absent.
        """
        # pylint: disable=import-outside-toplevel
        from django.conf import settings

        timeout_hours = getattr(settings, "RESET_PASSWORD_TIMEOUT_HOURS", 24)
        code_age = timezone.now() - self.created_at
        return not code_age > timedelta(hours=timeout_hours)
class AuthenticationLog(models.Model):
    """A model to keep track of all requests for authentication.

    Each entry stores which email was used, when the attempt was made and
    the number of consecutive failed attempts made on this email.
    The failed attempt counter is reset on a successful login
    or a changed password.
    """

    email: models.EmailField = models.EmailField()
    time: models.DateTimeField = models.DateTimeField()
    failed_attempt: models.IntegerField = models.IntegerField()

    class Meta:  # pylint: disable=too-few-public-methods
        db_table = "authentication_log"

    @staticmethod
    def get_last_login_for(email: str) -> Optional[AuthenticationLog]:
        """Get the latest authentication attempt for a specified email.

        The email is matched case-insensitively; None is returned when no
        attempt has been logged.
        """
        attempts = AuthenticationLog.objects.filter(  # pylint: disable=no-member
            email__iexact=email
        ).order_by("-time", "-failed_attempt")
        return attempts.first()

    @staticmethod
    def is_user_locked_out(email: str) -> bool:
        """Tell whether logins for the email are currently locked out."""
        latest = AuthenticationLog.get_last_login_for(email)
        if latest is None:
            return False
        if not latest.failed_attempt > LOCKOUT_THRESHOLD:
            return False
        return AuthenticationLog.get_remaining_lockout_time(email) > 0

    @staticmethod
    def get_locked_out_error(email: str) -> ValidationError:
        """Build the validation error reporting the remaining lockout."""
        remaining = AuthenticationLog.get_remaining_lockout_time(email)
        whole_hours = int(remaining / 3600)
        whole_minutes = int(remaining / 60) % 60
        return ValidationError(
            "This account is locked out for %(time)s",
            code="locked_out",
            params={
                "time": f"{whole_hours} hours and {whole_minutes} minutes",
            },
        )

    @staticmethod
    def get_remaining_lockout_time(email: str) -> float:
        """Get the remaining lockout time for a specified email.

        The result is in seconds.  The lockout lasts
        ``2 ** (failed_attempt - LOCKOUT_THRESHOLD)`` minutes counted from
        the latest logged attempt; 0 is returned when nothing is logged.
        """
        latest = AuthenticationLog.get_last_login_for(email)
        if (
            latest is None
            or latest.failed_attempt is None
            or latest.time is None
        ):
            return 0

        now = timezone.now().replace(microsecond=0)
        lockout_minutes = pow(
            2, int(latest.failed_attempt) - LOCKOUT_THRESHOLD)
        elapsed = now - latest.time
        remaining = timedelta(minutes=lockout_minutes) - elapsed
        return float(remaining.total_seconds())

    @staticmethod
    def log_authentication_attempt(email: str, failed: bool) -> None:
        """Log an attempt for authentication.

        A failed attempt continues the counter of the latest entry;
        a successful one resets the counter to zero.
        """
        previous = AuthenticationLog.get_last_login_for(email)

        counter = 0
        if failed:
            counter = (previous.failed_attempt if previous else 0) + 1

        AuthenticationLog(
            email=email,
            time=timezone.now().replace(microsecond=0),
            failed_attempt=counter
        ).save()
def staff_update(
    sender: Any, **kwargs: Any  # pylint: disable=unused-argument
) -> None:
    """Update if user is part of staff when SUPERUSER_GROUP is added/rmed.

    Receiver for the ``m2m_changed`` signal of the user-groups relation.
    After groups are added, removed or cleared, ``is_staff`` of every
    affected user is set to whether the user is in ``SUPERUSER_GROUP``.

    When the relation is cleared from the group side the signal carries no
    primary keys, so the ids of the group's users are remembered on the
    group instance at ``pre_clear`` and consumed at ``post_clear``.
    """
    if any(key not in kwargs for key in ("action", "instance", "reverse")):
        return

    action = kwargs["action"]
    instance = kwargs["instance"]
    reverse = kwargs["reverse"]

    if action == "pre_clear":
        if reverse:
            # After the clear the users can no longer be found through the
            # group, so their ids are stashed for the post_clear handler.
            # pylint: disable=protected-access
            instance._cleared_user_ids = list(
                instance.user_set.values_list("pk", flat=True))
        return

    if action not in ("post_add", "post_remove", "post_clear"):
        return

    if not reverse:
        users = [instance]  # type: ignore
    else:
        if action == "post_clear":
            user_ids = vars(instance).pop("_cleared_user_ids", ())
        else:
            # pk_set may be missing or None; treat that as "no users".
            user_ids = kwargs.get("pk_set") or ()
        users = WdaeUser.objects.filter(pk__in=user_ids)

    with transaction.atomic():
        for user in users:
            should_be_staff = user.groups.filter(
                name=WdaeUser.SUPERUSER_GROUP
            ).exists()

            if user.is_staff != should_be_staff:
                user.is_staff = should_be_staff
                user.save()
def group_post_delete(
    sender: Type[Group], **kwargs: Any  # pylint: disable=unused-argument
) -> None:
    """Revoke staff status from the users of a deleted SUPERUSER_GROUP.

    Receiver for the ``post_delete`` signal of groups.  It acts only when
    the deleted group is named ``WdaeUser.SUPERUSER_GROUP`` and carries the
    ``_user_ids`` attribute; every user with one of those ids is saved with
    ``is_staff`` set to False.
    """
    deleted_group = kwargs.get("instance") if "instance" in kwargs else None
    if "instance" not in kwargs:
        return
    if deleted_group.name != WdaeUser.SUPERUSER_GROUP:
        return
    if not hasattr(deleted_group, "_user_ids"):
        return

    # pylint: disable=protected-access
    former_admin_ids = deleted_group._user_ids
    with transaction.atomic():
        for former_admin in WdaeUser.objects.filter(
                pk__in=former_admin_ids).all():
            former_admin.is_staff = False
            former_admin.save()
# a hack to save the users the group had, used in the post_delete signal
def group_pre_delete(
    sender: Type[Group], **kwargs: Any  # pylint: disable=unused-argument
) -> None:
    """Remember the users of SUPERUSER_GROUP right before it is deleted.

    Receiver for the ``pre_delete`` signal of groups.  For the group named
    ``WdaeUser.SUPERUSER_GROUP`` the primary keys of its users are stored on
    the group instance as ``_user_ids``; other groups are left untouched.
    """
    if "instance" not in kwargs:
        return

    doomed_group = kwargs["instance"]
    if doomed_group.name != WdaeUser.SUPERUSER_GROUP:
        return

    # pylint: disable=protected-access
    doomed_group._user_ids = [
        member.pk for member in doomed_group.user_set.all()
    ]
# Signal wiring; strong references (weak=False) keep the receivers connected.
m2m_changed.connect(
    staff_update,
    sender=WdaeUser.groups.through,  # pylint: disable=no-member
    weak=False,
)
pre_delete.connect(group_pre_delete, sender=Group, weak=False)
post_delete.connect(group_post_delete, sender=Group, weak=False)

# Number of consecutive failed attempts that must be exceeded before an
# email is considered locked out; also the offset of the lockout exponent.
LOCKOUT_THRESHOLD = 4
def csrf_clear(view_func: Callable) -> Any:
    """Skips the CSRF checks by setting the 'csrf_processing_done' to true.

    The returned wrapper marks its first positional argument (expected to be
    the request) and then calls ``view_func`` with the unchanged arguments.
    """

    @wraps(view_func)
    def _csrf_cleared(*args: Any, **kwargs: Any) -> Any:
        args[0].csrf_processing_done = True
        return view_func(*args, **kwargs)

    return _csrf_cleared
def get_default_application() -> Application:
    """Return the OAuth application configured as the default one.

    The application is looked up by the client id stored in
    ``settings.DEFAULT_OAUTH_APPLICATION_CLIENT``.
    """
    # pylint: disable=import-outside-toplevel
    from django.conf import settings

    default_client_id = settings.DEFAULT_OAUTH_APPLICATION_CLIENT
    return get_application_model().objects.get(client_id=default_client_id)
def send_verif_email(user: WdaeUser, verif_path: BaseVerificationCode) -> None:
    """Email the user a link for validating the account.

    The link is built from ``settings.EMAIL_VERIFICATION_ENDPOINT``,
    ``settings.EMAIL_VERIFICATION_SET_PATH`` and the code's path.
    """
    # pylint: disable=import-outside-toplevel
    from django.conf import settings

    endpoint = settings.EMAIL_VERIFICATION_ENDPOINT  # type: ignore
    set_path = settings.EMAIL_VERIFICATION_SET_PATH
    contents = _create_verif_email(endpoint, set_path, str(verif_path.path))
    user.email_user(contents["subject"], contents["message"])
def send_already_existing_email(user: WdaeUser) -> None:
    """Notify a user that a registration was attempted with their email."""
    body = (
        "Hello. Someone has attempted to create an account in GPF "
        "using an email that your account was registered with. "
        "If this was you, you can simply log in to your existing account, "
        "or if you've forgotten your password, you can reset it "
        "by using the 'Forgotten password' button on the login window. \n"
        "Otherwise, please ignore this email."
    )
    user.email_user("GPF: Attempted registration with email in use", body)
def send_reset_inactive_acc_email(user: WdaeUser) -> None:
    """Tell a user that a password reset needs a finished registration."""
    body = (
        "Hello. You've requested a password reset for an inactive account. "
        "You must first finish your registration by following the "
        "account validation link in the email you received when registering. "
        "If you have lost that email or the link in it has expired, you can "
        "register again to get a new validation email sent. \n"
        "If you did not request this, please ignore this email."
    )
    user.email_user("GPF: Password reset for inactive account", body)
def send_reset_email(
    user: WdaeUser, verif_path: BaseVerificationCode, by_admin: bool = False
) -> None:
    """Email the user a link for resetting the password.

    The link is built from ``settings.EMAIL_VERIFICATION_ENDPOINT``,
    ``settings.EMAIL_VERIFICATION_RESET_PATH`` and the code's path;
    ``by_admin`` selects the wording used for admin-initiated resets.
    """
    # pylint: disable=import-outside-toplevel
    from django.conf import settings

    endpoint = settings.EMAIL_VERIFICATION_ENDPOINT  # type: ignore
    reset_path = settings.EMAIL_VERIFICATION_RESET_PATH
    contents = _create_reset_mail(
        endpoint, reset_path, str(verif_path.path), by_admin)
    user.email_user(contents["subject"], contents["message"])
def _create_verif_email( endpoint: str, path: str, verification_path: str ) -> dict[str, str]: message = ( "Welcome to GPF: Genotype and Phenotype in Families! " "Follow the link below to validate your new account " "and set your password:\n {link}" ) email_settings = { "subject": "GPF: Registration validation", "initial_message": message, "endpoint": endpoint, "path": path, "verification_path": verification_path, } return _build_email_template(email_settings) def _create_reset_mail( endpoint: str, path: str, verification_path: str, by_admin: bool = False ) -> dict[str, str]: message = ( "Hello. You have requested to reset your password for " "your GPF account. To do so, please follow the link below:\n {link}\n" "If you did not request for your GPF account password to be reset, " "please ignore this email." ) if by_admin: message = ( "Hello. Your password has been reset by an admin. Your old " "password will not work. To set a new password in " "GPF: Genotype and Phenotype in Families " "please follow the link below:\n {link}" ) email_settings = { "subject": "GPF: Password reset request", "initial_message": message, "endpoint": endpoint, "path": path, "verification_path": verification_path, } return _build_email_template(email_settings) def _build_email_template(email_settings: dict[str, str]) -> dict[str, str]: subject = email_settings["subject"] message = email_settings["initial_message"] path = email_settings["path"].format(email_settings["verification_path"]) message = message.format(link=f"{email_settings['endpoint']}{path}") return {"subject": subject, "message": message}