Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from flask import current_app, jsonify, make_response 

2from flask_jwt_extended import create_access_token, set_access_cookies, decode_token 

3 

4import datetime 

5 

6from src import db 

7from src.utils import message, err_resp, internal_err_resp, validation_error, mailjet 

8from src.model import UserModel, RevokedTokenModel, RoleModel 

9from src.schemas import UserBase 

10from settings import URL_FRONT 

11 

# Shared UserBase schema instance; AuthService calls its dump() to serialize
# users into the login and register responses.
12user_base = UserBase() 

13 

14 

class AuthService:
    """Authentication workflows: login, registration, logout, password reset.

    Every method is a static method that returns a response built with the
    helpers imported from ``src.utils`` (``message``, ``err_resp``,
    ``internal_err_resp``, ``validation_error``) or with Flask's
    ``make_response``. Unexpected errors are logged with their traceback and
    answered with ``internal_err_resp()``.
    """

    @staticmethod
    def login(data):
        """Authenticate a user by email and password.

        Args:
            data: Mapping providing the ``"email"`` and ``"password"`` keys.

        Returns:
            ``(body, 200)`` on success, where ``body`` is the ``message``
            payload extended with ``"user"`` and ``"access_token"``;
            ``err_resp("Failed to log in.", 401)`` when the email is unknown
            or the password does not verify; ``internal_err_resp()`` on any
            unexpected error.

        Raises:
            KeyError: If ``data`` lacks ``"email"`` or ``"password"``.
        """
        email = data["email"]
        password = data["password"]

        try:
            user = UserModel.query.filter_by(email=email).first()

            # Unknown email and wrong password deliberately share one answer
            # so the response does not reveal which accounts exist.
            if not user or not user.verify_password(password):
                return err_resp("Failed to log in.", 401)

            resp = message(True, "Successfully logged in.")
            resp["user"] = user_base.dump(user)
            resp["access_token"] = create_access_token(identity=user)
            return resp, 200

        except Exception as error:
            current_app.logger.exception(error)
            return internal_err_resp()

    @staticmethod
    def register(data):
        """Create a new user account and issue an access token for it.

        Args:
            data: Mapping providing the ``"email"``, ``"username"`` and
                ``"password"`` keys.

        Returns:
            ``(body, 201)`` on success, where ``body`` is the ``message``
            payload extended with ``"user"`` and ``"access_token"``;
            ``validation_error(False, "Email is already being used.")`` when
            the email is taken; ``internal_err_resp()`` on any unexpected
            error.

        Raises:
            KeyError: If ``data`` lacks one of the required keys.
        """
        email = data["email"]
        username = data["username"]
        password = data["password"]

        try:
            # The uniqueness lookup lives inside the try so a database
            # failure is reported like every other failure in this class.
            if UserModel.query.filter_by(email=email).first() is not None:
                return validation_error(False, "Email is already being used.")

            new_user = UserModel(
                email=email,
                username=username,
                password=password,
            )

            # Attach the default "user" role when that role exists.
            default_role = RoleModel.query.filter_by(name="user").first()
            if default_role:
                new_user.role.append(default_role)

            db.session.add(new_user)
            db.session.commit()

            user_info = user_base.dump(new_user)

            # The welcome email is sent only once the account is committed.
            mailjet.sendNewAccount(new_user, URL_FRONT)
            access_token = create_access_token(identity=new_user)

            resp = message(True, "User has been registered.")
            resp["user"] = user_info
            resp["access_token"] = access_token
            return resp, 201

        except Exception as error:
            current_app.logger.exception(error)
            # Discard any half-finished write so the session stays usable.
            db.session.rollback()
            return internal_err_resp()

    @staticmethod
    def logout(data):
        """Revoke a token by storing its ``jti`` as a ``RevokedTokenModel``.

        Args:
            data: Mapping providing the ``'jti'`` key of the token to revoke.

        Returns:
            An empty 204 response on success; ``internal_err_resp()`` on any
            unexpected error.

        Raises:
            KeyError: If ``data`` lacks ``'jti'``.
        """
        jti = data['jti']
        try:
            resp = make_response("", 204)

            db.session.add(RevokedTokenModel(jti=jti))
            db.session.commit()

            return resp
        except Exception as error:
            current_app.logger.exception(error)
            # Discard the failed insert so the session stays usable.
            db.session.rollback()
            return internal_err_resp()

    @staticmethod
    def forget(email):
        """Start the password-recovery flow for ``email``.

        When a user with that email exists, a token valid for 24 hours is
        created and passed to ``mailjet.sendForget`` together with the
        front-end reset URL. The response is the same whether or not the
        account exists.

        Args:
            email: Email address of the account to recover.

        Returns:
            ``(body, 200)`` with the generic confirmation message;
            ``internal_err_resp()`` on any unexpected error.
        """
        try:
            user = UserModel.query.filter_by(email=email).first()
            if user:
                # NOTE(review): the reset token is created with
                # create_access_token, i.e. the same call used for login
                # tokens - confirm it cannot be used as a regular access
                # token during its 24h lifetime.
                reset_token = create_access_token(
                    identity=user,
                    expires_delta=datetime.timedelta(hours=24),
                )
                mailjet.sendForget(user, URL_FRONT+"/app/reset", reset_token)

            resp = message(
                True, "If your account exist, you will find an email to recover your password in your mailbox")
            return resp, 200
        except Exception as error:
            current_app.logger.exception(error)
            return internal_err_resp()

    @staticmethod
    def reset(data):
        """Set a new password for the user identified by a reset token.

        Args:
            data: Mapping providing the ``'reset_password_token'`` and
                ``'password'`` keys.

        Returns:
            ``(body, 200)`` on success; ``err_resp("Invalid token.", 401)``
            when the token cannot be decoded, lacks the identity claim, or
            matches no user; a 400 response when
            ``mailjet.sendReset`` reports ``"error"``;
            ``internal_err_resp()`` on any unexpected error.

        Raises:
            KeyError: If ``data`` lacks one of the required keys.
        """
        reset_token = data['reset_password_token']
        password = data['password']

        # A malformed, expired or claim-less token is a client error, so it
        # is answered with the same 401 as an unknown user instead of being
        # allowed to escape this method as an exception.
        # NOTE(review): reads the 'identity' claim - confirm this matches the
        # identity claim name configured for the application.
        try:
            uuid = decode_token(reset_token)['identity']
        except Exception as error:
            current_app.logger.warning(
                "Password reset token rejected: %s", error)
            return err_resp("Invalid token.", 401)

        try:
            user = UserModel.query.filter_by(uuid=uuid).first()
            if not user:
                return err_resp("Invalid token.", 401)

            user.password = password

            # The new password is only committed once the confirmation email
            # went out; otherwise the pending change is rolled back so it
            # cannot be flushed by a later commit on the same session.
            if mailjet.sendReset(user, URL_FRONT) == "error":
                db.session.rollback()
                return make_response("Something went wrong while sending the password reset confirmation email", 400)

            db.session.add(user)
            db.session.commit()
            resp = message(True, "Password reset successfully")
            return resp, 200

        except Exception as error:
            current_app.logger.exception(error)
            # Discard the uncommitted password change.
            db.session.rollback()
            return internal_err_resp()