Coverage for src/service/auth_service.py : 82%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from flask import current_app, jsonify, make_response
2from flask_jwt_extended import create_access_token, set_access_cookies, decode_token
4import datetime
6from src import db
7from src.utils import message, err_resp, internal_err_resp, validation_error, mailjet
8from src.model import UserModel, RevokedTokenModel, RoleModel
9from src.schemas import UserBase
10from settings import URL_FRONT
12user_base = UserBase()
15class AuthService:
16 @staticmethod
17 def login(data):
18 # Assign vars
19 email = data["email"]
20 password = data["password"]
22 try:
23 # Fetch user data
24 if not (user := UserModel.query.filter_by(email=email).first()):
25 return err_resp(
26 "Failed to log in.",
27 401,
28 )
30 elif user and user.verify_password(password):
31 user_info = user_base.dump(user)
33 access_token = create_access_token(identity=user)
35 resp = message(True, "Successfully logged in.")
36 resp["user"] = user_info
37 resp["access_token"] = access_token
39 return resp, 200
41 return err_resp(
42 "Failed to log in.", 401
43 )
45 except Exception as error:
46 current_app.logger.error(error)
47 return internal_err_resp()
49 @staticmethod
50 def register(data):
51 # Assign vars
53 # Required values
54 email = data["email"]
55 username = data["username"]
56 password = data["password"]
58 # Check if the email is taken
59 if UserModel.query.filter_by(email=email).first() is not None:
60 return validation_error(False, "Email is already being used.")
62 try:
63 new_user = UserModel(
64 email=email,
65 username=username,
66 password=password,
67 )
68 default_role = RoleModel.query.filter_by(name="user").first()
69 if default_role:
70 new_user.role.append(default_role)
72 db.session.add(new_user)
73 db.session.commit()
75 # Load the new user's info
76 user_info = user_base.dump(new_user)
78 # Send welcome email
79 mailjet.sendNewAccount(new_user, URL_FRONT)
80 # Create an access token
81 access_token = create_access_token(identity=new_user)
83 resp = message(True, "User has been registered.")
84 resp["user"] = user_info
85 resp["access_token"] = access_token
87 return resp, 201
89 except Exception as error:
90 current_app.logger.error(error)
91 return internal_err_resp()
93 @staticmethod
94 def logout(data):
95 jti = data['jti']
96 try:
97 resp = make_response("", 204)
99 revoked_token = RevokedTokenModel(jti=jti)
101 db.session.add(revoked_token)
102 db.session.commit()
104 return resp
105 except Exception as error:
106 current_app.logger.error(error)
107 return internal_err_resp()
109 @staticmethod
110 def forget(email):
111 try:
112 # Fetch user data
113 if user := UserModel.query.filter_by(email=email).first():
115 expires = datetime.timedelta(hours=24)
116 reset_token = create_access_token(
117 identity=user, expires_delta=expires)
119 mailjet.sendForget(user, URL_FRONT+"/app/reset", reset_token)
121 resp = message(
122 True, "If your account exist, you will find an email to recover your password in your mailbox")
123 return resp, 200
124 except Exception as error:
125 current_app.logger.error(error)
126 return internal_err_resp()
128 @staticmethod
129 def reset(data):
130 reset_token = data['reset_password_token']
131 password = data['password']
132 uuid = decode_token(reset_token)['identity']
133 try:
134 # Fetch user data
135 if not (user := UserModel.query.filter_by(uuid=uuid).first()):
136 return err_resp(
137 "Invalid token.",
138 401,
139 )
141 user.password = password
142 if (mailjet.sendReset(user, URL_FRONT) == "error"):
143 return make_response("Something went wrong while sending the password reset confirmation email", 400)
145 db.session.add(user)
146 db.session.commit()
147 resp = message(True, "Password reset successfully")
148 return resp, 200
150 except Exception as error:
151 current_app.logger.error(error)
152 return internal_err_resp()