Coverage for src/service/user_service.py : 71%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from flask import current_app
2from flask_jwt_extended import get_jwt_claims
3from sqlalchemy.orm import subqueryload
4import requests
6from src import db
7from settings import ENGINE_APIKEY, ENGINE_URL
8from src.utils import err_resp, message, pagination_resp, internal_err_resp, Paginator
9from src.model import UserModel, GenreModel
10from src.schemas import UserBase, UserObject, UserFullObject, GenreBase
13class UserService:
14 @staticmethod
15 def search_user_data(search_term, page, connected_user_uuid):
16 """ Search user data by username """
17 if not (UserModel.query.filter_by(uuid=connected_user_uuid).first()):
18 return err_resp("User not found!", 404)
19 users, total_pages = Paginator.get_from(
20 UserModel.query.filter(UserModel.username.ilike(search_term+"%")).filter(UserModel.password_hash != 'no_pwd').union(
21 UserModel.query.filter(UserModel.username.ilike("%"+search_term+"%"))),
22 page,
23 )
25 try:
26 user_data = UserBase.loads(users)
28 return pagination_resp(
29 message="Track data sent",
30 content=user_data,
31 page=page,
32 total_pages=total_pages
33 )
35 except Exception as error:
36 current_app.logger.error(error)
37 return internal_err_resp()
39 @staticmethod
40 def get_user_data(uuid, connected_user_uuid):
41 """ Get user's data by uuid """
42 if not (user := UserModel.query.filter_by(uuid=uuid).first()):
43 return err_resp("User not found!", 404)
45 if not (user := UserModel.query.filter_by(uuid=connected_user_uuid).first()):
46 return err_resp("User not found!", 404)
48 try:
49 user_data = UserObject.load(user)
51 resp = message(True, "User data sent")
52 resp["user"] = user_data
53 return resp, 200
55 except Exception as error:
56 current_app.logger.error(error)
57 return internal_err_resp()
59 @staticmethod
60 def export_all_user_data(uuid):
61 """ Get user's data by uuid """
62 if not (user := UserModel.query.filter_by(uuid=uuid).first()):
63 return err_resp("User not found!", 404)
65 try:
66 user_data = UserFullObject.load(user)
68 resp = message(True, "User data exported")
69 resp["user"] = user_data
70 return resp, 200
72 except Exception as error:
73 current_app.logger.error(error)
74 return internal_err_resp()
76 @staticmethod
77 def get_genres(user_uuid):
78 """ Get user liked genre list """
79 if not (user := UserModel.query.filter_by(uuid=user_uuid).first()):
80 return err_resp("User not found!", 404)
82 try:
83 genres_data = GenreBase.loads(user.liked_genres)
85 resp = message(True, "User liked genre sent")
86 resp["content"] = genres_data
87 return resp, 200
88 except Exception as error:
89 current_app.logger.error(error)
90 return internal_err_resp()
92 @staticmethod
93 def like_genre(genre_id, user_uuid):
94 """" Like a genre """
95 if not (user := UserModel.query.filter_by(uuid=user_uuid).first()):
96 return err_resp("User not found!", 404)
98 # Check permissions
99 permissions = get_jwt_claims()['permissions']
100 if "modify_user_profil" not in permissions:
101 return err_resp("Permission missing", 403)
103 if not (genre := GenreModel.query.filter_by(genre_id=genre_id).first()):
104 return err_resp("Genre not found!", 404)
106 try:
107 user.liked_genres.append(genre)
109 db.session.add(user)
110 db.session.commit()
112 resp = message(True, "User liked this genre")
113 return resp, 201
114 except Exception as error:
115 current_app.logger.error(error)
116 return internal_err_resp()
118 @staticmethod
119 def unlike_genre(genre_id, user_uuid):
120 """" Unlike a genre """
121 if not (user := UserModel.query.filter_by(uuid=user_uuid).first()):
122 return err_resp("User not found!", 404)
124 # Check permissions
125 permissions = get_jwt_claims()['permissions']
126 if "modify_user_profil" not in permissions:
127 return err_resp("Permission missing", 403)
129 if not (genre := GenreModel.query.filter_by(genre_id=genre_id).first()):
130 return err_resp("Genre not found!", 404)
132 if genre not in user.liked_genres:
133 return err_resp("You didn't like this genre", 400)
135 try:
136 user.liked_genres.remove(genre)
138 db.session.add(user)
139 db.session.commit()
141 resp = message(True, "User liked this genre")
142 return resp, 201
143 except Exception as error:
144 current_app.logger.error(error)
145 return internal_err_resp()
147 @staticmethod
148 def update_user_data(user_uuid, connected_user_uuid, data):
149 """ Update user data username - email - password """
150 if not (user := UserModel.query.filter_by(uuid=user_uuid).first()):
151 return err_resp("User not found!", 404)
153 # Check permissions
154 permissions = get_jwt_claims()['permissions']
155 if "modify_user_profil" not in permissions:
156 return err_resp("Permission missing", 403)
158 if not (user := UserModel.query.filter_by(uuid=connected_user_uuid).first()):
159 return err_resp("User not found!", 404)
161 if str(user_uuid) != connected_user_uuid:
162 return err_resp("Unable to update an account which is not your's", 403)
164 try:
165 if 'username' in data.keys():
166 user.username = data['username']
167 if 'password' in data.keys():
168 user.password = data['password']
169 if 'email' in data.keys():
170 user.email = data['email']
172 db.session.add(user)
173 db.session.commit()
175 resp = message(True, "User updated successfully")
176 return resp, 201
178 except Exception as error:
179 current_app.logger.error(error)
180 return internal_err_resp()
182 @staticmethod
183 def set_preferences_defined(connected_user_uuid):
184 """ """
185 if not (user := UserModel.query.filter_by(uuid=connected_user_uuid).first()):
186 return err_resp("User not found!", 404)
188 # Check permissions
189 permissions = get_jwt_claims()['permissions']
190 if "modify_user_profil" not in permissions:
191 return err_resp("Permission missing", 403)
193 try:
194 user.preferences_defined = True
196 db.session.add(user)
197 db.session.commit()
199 # Send request to reco_engine
200 requests.put('%s/recommend/%s' % (ENGINE_URL, user.uuid),
201 headers={'X-API-TOKEN': ENGINE_APIKEY})
203 resp = message(True, "User updated successfully")
204 return resp, 201
206 except Exception as error:
207 current_app.logger.error(error)
208 return internal_err_resp()
210 @staticmethod
211 def delete_account(user_uuid, connected_user_uuid):
212 """" Delete user account """
213 if not (UserModel.query.filter_by(uuid=user_uuid).first()):
214 return err_resp("User not found!", 404)
216 # Check permissions
217 permissions = get_jwt_claims()['permissions']
218 if "modify_user_profil" not in permissions:
219 return err_resp("Permission missing", 403)
221 if not (UserModel.query.filter_by(uuid=connected_user_uuid).first()):
222 return err_resp("User not found!", 404)
224 if str(user_uuid) != str(connected_user_uuid):
225 return err_resp("Unable to delete an account which is not your's", 403)
227 try:
228 UserModel.query.filter_by(uuid=user_uuid).delete()
230 db.session.commit()
232 resp = message(True, "User account deleted successfully")
233 return resp, 201
235 except Exception as error:
236 current_app.logger.error(error)
237 return internal_err_resp()