Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from settings import REASON_CATEGORIES 

2from flask import current_app 

3from flask_jwt_extended import get_jwt_claims 

4from sqlalchemy import func, text, select, and_ 

5from sqlalchemy.sql.expression import null 

6 

7from src import db, settings 

8from src.utils import pagination_resp, internal_err_resp, message, Paginator, err_resp 

9from src.model import MovieModel, GenreModel, ContentType, UserModel, ContentModel, RecommendedContentModel, RecommendedContentForGroupModel, MetaUserContentModel, BadRecommendationContentModel, MovieAdditionalModel 

10from src.schemas import MovieBase, MovieObject, GenreBase, MovieExtra, MetaUserContentBase, MovieAdditionalBase 

11 

12 

class MovieService:
    """Business logic behind the movie endpoints.

    Covers title search, popularity and recommendation listings, genre
    listing, "bad recommendation" feedback, and the workflow for
    user-submitted ("additional") movies that wait for validation.

    Every public method is a static method. Each one first checks that the
    calling user exists and then returns either whatever the project's
    response helpers (``err_resp``, ``pagination_resp``,
    ``internal_err_resp``) build, or a ``(payload, status_code)`` tuple.
    """

    # Optional attributes copied as-is from the request payload onto a new
    # MovieAdditionalModel when the payload contains them.
    _OPTIONAL_ADDITIONAL_FIELDS = (
        'language',
        'actors',
        'year',
        'producers',
        'director',
        'writer',
        'imdbid',
        'tmdbid',
        'cover',
        'plot_outline',
    )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _find_user(user_uuid):
        """Return the UserModel whose ``uuid`` equals ``user_uuid``, or None."""
        return UserModel.query.filter_by(uuid=user_uuid).first()

    @staticmethod
    def _has_permission(permission):
        """Tell whether the current JWT's claims list ``permission``.

        A token whose claims carry no ``permissions`` entry is treated as
        having no permissions at all instead of raising ``KeyError``.
        """
        granted = get_jwt_claims().get('permissions') or ()
        return permission in granted

    @staticmethod
    def _internal_error(error, rollback=False):
        """Log ``error`` and return the generic internal-error response.

        Args:
            error: The exception that was caught.
            rollback: When true, roll the database session back so a failed
                write does not leave a broken transaction behind.
        """
        current_app.logger.error(error)
        if rollback:
            db.session.rollback()
        return internal_err_resp()

    @staticmethod
    def _paged_response(msg, serialize, rows, page, total_pages):
        """Serialize ``rows`` and wrap them in a pagination response.

        Args:
            msg: Message placed in the response.
            serialize: Callable turning ``rows`` into the response content.
            rows: Items of the current page.
            page: Page number echoed back in the response.
            total_pages: Total page count echoed back in the response.

        Returns:
            The ``pagination_resp`` result, or the internal-error response
            when serialization or response building raises.
        """
        try:
            return pagination_resp(
                message=msg,
                content=serialize(rows),
                page=page,
                total_pages=total_pages,
            )
        except Exception as error:
            return MovieService._internal_error(error)

    @staticmethod
    def _load_recommendations(rows):
        """Serialize ``(recommendation, movie)`` rows.

        Each movie is loaded through ``MovieExtra`` and then annotated with
        the ``engine`` and ``score`` of the recommendation it came with.
        """
        loaded = []
        for row in rows:
            item = MovieExtra.load(row[1])
            item["reco_engine"] = row[0].engine
            item["reco_score"] = row[0].score
            loaded.append(item)
        return loaded

    # ------------------------------------------------------------------
    # Read endpoints
    # ------------------------------------------------------------------

    @staticmethod
    def search_movie_data(search_term, page, connected_user_uuid):
        """Search movies whose title contains ``search_term``.

        The match is case-insensitive (``ILIKE``).

        Args:
            search_term: Text to look for in movie titles.
            page: Page of results to return.
            connected_user_uuid: UUID of the calling user.

        Returns:
            A 404 error response when the user is unknown, otherwise a
            pagination response with the movies serialized by ``MovieBase``
            (or the internal-error response if serialization fails).
        """
        if not MovieService._find_user(connected_user_uuid):
            return err_resp("User not found!", 404)

        # A substring match already includes every title that starts with the
        # term, so a separate prefix query UNIONed with it adds no rows.
        # NOTE(review): "%" and "_" inside search_term still act as wildcards.
        movies, total_pages = Paginator.get_from(
            MovieModel.query.filter(
                MovieModel.title.ilike("%" + search_term + "%")),
            page,
        )

        return MovieService._paged_response(
            "Movie data sent", MovieBase.loads, movies, page, total_pages)

    @staticmethod
    def get_popular_movies(page, connected_user_uuid):
        """List movies ordered by descending content popularity score.

        Movies whose content has no popularity score are sorted last.

        Args:
            page: Page of results to return.
            connected_user_uuid: UUID of the calling user.

        Returns:
            A 404 error response when the user is unknown, otherwise a
            pagination response with the movies serialized by
            ``MovieObject`` (or the internal-error response on failure).
        """
        if not MovieService._find_user(connected_user_uuid):
            return err_resp("User not found!", 404)

        movies, total_pages = Paginator.get_from(
            MovieModel.query.join(MovieModel.content, aliased=True).order_by(
                ContentModel.popularity_score.desc().nullslast(),
            ),
            page,
        )

        return MovieService._paged_response(
            "Most popular movie data sent",
            MovieObject.loads,
            movies,
            page,
            total_pages,
        )

    @staticmethod
    def get_recommended_movies_for_user(page, connected_user_uuid, reco_engine):
        """List the movies recommended to the calling user.

        Results are ordered by recommendation score, then by content
        popularity score, both descending with missing values last.

        Args:
            page: Page of results to return.
            connected_user_uuid: UUID of the calling user.
            reco_engine: When not None, only recommendations produced by this
                engine are returned.

        Returns:
            A 404 error response when the user is unknown, otherwise a
            pagination response whose items are ``MovieExtra`` payloads with
            ``reco_engine`` and ``reco_score`` added (or the internal-error
            response on failure).
        """
        user = MovieService._find_user(connected_user_uuid)
        if not user:
            return err_resp("User not found!", 404)

        filters = [RecommendedContentModel.user_id == user.user_id]
        if reco_engine is not None:
            filters.append(RecommendedContentModel.engine == reco_engine)

        movies, total_pages = Paginator.get_from(
            db.session.query(RecommendedContentModel, MovieModel)
            .join(MovieModel.content)
            .join(
                RecommendedContentModel,
                RecommendedContentModel.content_id == ContentModel.content_id,
            )
            .filter(and_(*filters))
            .order_by(
                RecommendedContentModel.score.desc().nullslast(),
                ContentModel.popularity_score.desc().nullslast(),
            ),
            page,
        )

        # NOTE(review): the message still says "Most popular"; it is kept
        # unchanged because API clients may match on it.
        return MovieService._paged_response(
            "Most popular movie data sent",
            MovieService._load_recommendations,
            movies,
            page,
            total_pages,
        )

    @staticmethod
    def get_recommended_movies_for_group(page, connected_user_uuid, reco_engine):
        """List the movies recommended to the calling user's groups.

        Both the groups the user belongs to and the groups the user owns are
        taken into account. Results are ordered by group recommendation
        score, then by content popularity score, both descending with
        missing values last.

        Args:
            page: Page of results to return.
            connected_user_uuid: UUID of the calling user.
            reco_engine: When not None, only recommendations produced by this
                engine are returned.

        Returns:
            A 404 error response when the user is unknown, otherwise a
            pagination response whose items are ``MovieExtra`` payloads with
            ``reco_engine`` and ``reco_score`` added (or the internal-error
            response on failure).
        """
        user = MovieService._find_user(connected_user_uuid)
        if not user:
            return err_resp("User not found!", 404)

        groups_ids = (
            [group.group_id for group in user.groups]
            + [group.group_id for group in user.owned_groups]
        )

        # Filter and sort on the group recommendation table itself: it is the
        # only recommendation table this query selects from and joins.
        filters = [RecommendedContentForGroupModel.group_id.in_(groups_ids)]
        if reco_engine is not None:
            filters.append(
                RecommendedContentForGroupModel.engine == reco_engine)

        movies, total_pages = Paginator.get_from(
            db.session.query(RecommendedContentForGroupModel, MovieModel)
            .join(MovieModel.content)
            .join(
                RecommendedContentForGroupModel,
                RecommendedContentForGroupModel.content_id
                == ContentModel.content_id,
            )
            .filter(and_(*filters))
            .order_by(
                RecommendedContentForGroupModel.score.desc().nullslast(),
                ContentModel.popularity_score.desc().nullslast(),
            ),
            page,
        )

        # NOTE(review): the message still says "Most popular"; it is kept
        # unchanged because API clients may match on it.
        return MovieService._paged_response(
            "Most popular movie data sent",
            MovieService._load_recommendations,
            movies,
            page,
            total_pages,
        )

    @staticmethod
    def get_ordered_genre(connected_user_uuid):
        """List the movie genres ordered by descending ``count``.

        Args:
            connected_user_uuid: UUID of the calling user.

        Returns:
            A 404 error response when the user is unknown, otherwise
            ``(payload, 200)`` where ``payload["content"]`` holds the genres
            serialized by ``GenreBase`` (or the internal-error response on
            failure).
        """
        if not MovieService._find_user(connected_user_uuid):
            return err_resp("User not found!", 404)

        genres = (
            GenreModel.query
            .filter_by(content_type=ContentType.MOVIE)
            .order_by(GenreModel.count.desc())
            .all()
        )

        try:
            resp = message(True, "Movie genres data sent")
            resp["content"] = GenreBase.loads(genres)
            return resp, 200
        except Exception as error:
            return MovieService._internal_error(error)

    # ------------------------------------------------------------------
    # Write endpoints
    # ------------------------------------------------------------------

    @staticmethod
    def add_bad_recommendation(user_uuid, content_id, data):
        """Record why a movie was a bad recommendation for a user.

        Args:
            user_uuid: UUID of the user giving the feedback.
            content_id: ``content_id`` of the movie being reported.
            data: Mapping of reason category to an iterable of reasons.
                Categories missing from ``REASON_CATEGORIES['movie']`` are
                ignored.

        Returns:
            A 404 error response when the user or the movie is unknown,
            ``(payload, 201)`` once the reasons are stored, or the
            internal-error response (after a rollback) on failure.
        """
        user = MovieService._find_user(user_uuid)
        if not user:
            return err_resp("User not found!", 404)

        movie = MovieModel.query.filter_by(content_id=content_id).first()
        if not movie:
            return err_resp("Movie not found!", 404)

        try:
            for category, reasons in data.items():
                if category not in REASON_CATEGORIES['movie']:
                    continue
                for reason in reasons:
                    db.session.add(BadRecommendationContentModel(
                        user_id=user.user_id,
                        content_id=movie.content_id,
                        reason_categorie=category,
                        reason=reason,
                    ))
            # A single commit flushes every pending row.
            db.session.commit()

            resp = message(True, "Bad recommendation has been registered.")
            return resp, 201
        except Exception as error:
            return MovieService._internal_error(error, rollback=True)

    @staticmethod
    def add_additional_movie(user_uuid, data):
        """Submit a new movie for validation.

        Requires the ``add_content`` permission.

        Args:
            user_uuid: UUID of the calling user.
            data: Mapping with a ``title``, a ``genres`` iterable of genre
                ids, and optionally any of the fields named in
                ``_OPTIONAL_ADDITIONAL_FIELDS``.

        Returns:
            A 404 error response when the user or one of the genres is
            unknown, a 403 error response when the permission is missing,
            ``(payload, 201)`` once the movie is stored, or the
            internal-error response (after a rollback) on failure.
        """
        if not MovieService._find_user(user_uuid):
            return err_resp("User not found!", 404)

        if not MovieService._has_permission("add_content"):
            return err_resp("Permission missing", 403)

        try:
            new_additional_movie = MovieAdditionalModel(title=data['title'])

            for field in MovieService._OPTIONAL_ADDITIONAL_FIELDS:
                if field in data:
                    setattr(new_additional_movie, field, data[field])

            for genre_id in data["genres"]:
                genre = GenreModel.query.filter_by(genre_id=genre_id).first()
                if not genre:
                    # Drop anything the session may already have picked up
                    # for the half-built movie before bailing out.
                    db.session.rollback()
                    return err_resp(f"Genre {genre_id} not found!", 404)
                new_additional_movie.genres.append(genre)

            db.session.add(new_additional_movie)
            db.session.commit()

            resp = message(True, "Movie have been added to validation.")
            return resp, 201
        except Exception as error:
            return MovieService._internal_error(error, rollback=True)

    @staticmethod
    def get_additional_movie(connected_user_uuid, page):
        """List the submitted movies that are stored as additional movies.

        Requires the ``add_content`` permission.

        Args:
            connected_user_uuid: UUID of the calling user.
            page: Page of results to return.

        Returns:
            A 404 error response when the user is unknown, a 403 error
            response when the permission is missing, otherwise a pagination
            response with the movies serialized by ``MovieAdditionalBase``
            (or the internal-error response on failure).
        """
        if not MovieService._find_user(connected_user_uuid):
            return err_resp("User not found!", 404)

        if not MovieService._has_permission("add_content"):
            return err_resp("Permission missing", 403)

        movies, total_pages = Paginator.get_from(
            MovieAdditionalModel.query,
            page,
        )

        return MovieService._paged_response(
            "Additional movie data sent",
            MovieAdditionalBase.loads,
            movies,
            page,
            total_pages,
        )

    @staticmethod
    def validate_additional_movie(connected_user_uuid, movie_id):
        """Promote a submitted movie to a regular movie.

        Requires the ``validate_added_content`` permission. A new
        ``ContentModel`` and ``MovieModel`` are created from the submission,
        and the submission itself is deleted, in a single commit.

        Args:
            connected_user_uuid: UUID of the calling user.
            movie_id: ``movie_id`` of the submitted movie.

        Returns:
            A 404 error response when the user or the submission is unknown,
            a 403 error response when the permission is missing,
            ``(payload, 201)`` on success, or the internal-error response
            (after a rollback) on failure.
        """
        if not MovieService._find_user(connected_user_uuid):
            return err_resp("User not found!", 404)

        if not MovieService._has_permission("validate_added_content"):
            return err_resp("Permission missing", 403)

        movie = MovieAdditionalModel.query.filter_by(movie_id=movie_id).first()
        if not movie:
            return err_resp("Additional movie not found!", 404)

        try:
            content = ContentModel(rating=None, genres=movie.genres)
            db.session.add(content)
            db.session.flush()

            new_movie = MovieModel(
                title=movie.title,
                language=movie.language,
                actors=movie.actors,
                year=movie.year,
                producers=movie.producers,
                director=movie.director,
                writer=movie.writer,
                imdbid=movie.imdbid,
                tmdbid=movie.tmdbid,
                cover=movie.cover,
                plot_outline=movie.plot_outline,
                content=content,
            )
            db.session.add(new_movie)
            db.session.delete(movie)

            db.session.commit()

            resp = message(
                True, "Additional movie data successfully validated")
            return resp, 201
        except Exception as error:
            return MovieService._internal_error(error, rollback=True)

    @staticmethod
    def decline_additional_movie(connected_user_uuid, movie_id):
        """Reject a submitted movie by deleting it.

        Requires the ``delete_content`` permission.

        Args:
            connected_user_uuid: UUID of the calling user.
            movie_id: ``movie_id`` of the submitted movie.

        Returns:
            A 404 error response when the user or the submission is unknown,
            a 403 error response when the permission is missing,
            ``(payload, 201)`` once the submission is deleted, or the
            internal-error response (after a rollback) on failure.
        """
        if not MovieService._find_user(connected_user_uuid):
            return err_resp("User not found!", 404)

        if not MovieService._has_permission("delete_content"):
            return err_resp("Permission missing", 403)

        movie = MovieAdditionalModel.query.filter_by(movie_id=movie_id).first()
        if not movie:
            return err_resp("Additional movie not found!", 404)

        try:
            db.session.delete(movie)
            db.session.commit()

            # NOTE(review): 201 is unusual for a deletion; it is kept as-is
            # because existing clients may depend on it.
            resp = message(True, "Additional movie successfully deleted")
            return resp, 201
        except Exception as error:
            return MovieService._internal_error(error, rollback=True)