Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from flask import current_app 

2 

3from src import db 

4from src.utils import err_resp, message, internal_err_resp, Paginator, err_resp 

5from src.model import GroupModel, UserModel 

6from src.schemas import GroupObject 

7 

8 

class GroupService:
    """Group operations: lookup, creation, invitation handling and leaving.

    All public methods are static. Success paths return a
    ``(body, status_code)`` tuple; failure paths return whatever the
    ``err_resp`` / ``internal_err_resp`` helpers from ``src.utils`` produce
    (presumably the same tuple shape -- confirm against ``src.utils``).
    """

    @staticmethod
    def _group_response(group, text, status):
        """Serialize *group* and wrap it in a success message paired with *status*."""
        group_data = GroupObject.load(group)

        resp = message(True, text)
        resp["group"] = group_data
        return resp, status

    @staticmethod
    def _rollback_and_fail(error):
        """Log *error* with its traceback, roll the session back, return the internal error response."""
        current_app.logger.exception(error)
        # After a failed flush/commit the session refuses further work until
        # it is rolled back; do it here so the scoped session is reusable.
        db.session.rollback()
        return internal_err_resp()

    @staticmethod
    def get_group_data(group_id):
        """Get a group's data by id.

        Returns the 404 error response when no group has ``group_id``, the
        serialized group with status 200 otherwise, and the internal error
        response if serialization raises.
        """
        if not (group := GroupModel.query.filter_by(group_id=group_id).first()):
            return err_resp("Group not found!", 404)

        try:
            return GroupService._group_response(group, "Group data sent", 200)
        except Exception as error:
            # Read-only path: nothing to roll back, just log with traceback.
            current_app.logger.exception(error)
            return internal_err_resp()

    @staticmethod
    def create_group(group_name, creator_uuid):
        """Create a group named *group_name* owned by the user with *creator_uuid*.

        Returns the 404 error response when no user has that uuid, the
        serialized group with status 201 on success, and the internal error
        response if anything raises (including the user lookup itself).
        """
        try:
            # Resolve the owner first so no group object is built for an
            # unknown user.
            creator = UserModel.query.filter_by(uuid=creator_uuid).first()
            if not creator:
                return err_resp("User not found!", 404)

            group = GroupModel(name=group_name)
            creator.owned_groups.append(group)

            db.session.add(creator)
            db.session.commit()

            return GroupService._group_response(group, "Group data created", 201)
        except Exception as error:
            return GroupService._rollback_and_fail(error)

    @staticmethod
    def invite_user(group_id, new_member_uuid, current_user_uuid):
        """Invite the user *new_member_uuid* to the group *group_id*.

        Only the group owner (*current_user_uuid*) may invite. Error
        responses, in the order they are checked: 404 unknown group, 403
        caller is not the owner, 404 unknown invitee, 400 invitation already
        exists, 400 invitee is already a member, 400 owner invites themself.
        On success the serialized group is returned with status 200.
        """
        if not (group := GroupModel.query.filter_by(group_id=group_id).first()):
            return err_resp("Group not found!", 404)

        if str(group.owner.uuid) != current_user_uuid:
            return err_resp("Unable to invite member to a not owned group", 403)

        if not (member := UserModel.query.filter_by(uuid=new_member_uuid).first()):
            return err_resp("Member not found!", 404)

        if group.invitations.filter_by(user_id=member.user_id).scalar() is not None:
            return err_resp("Invitation already sended !", 400)

        if group.members.filter_by(user_id=member.user_id).scalar() is not None:
            return err_resp("User is already a member of this group !", 400)

        if group.owner.user_id == member.user_id:
            return err_resp("You can not invite yourself to your group !", 400)

        try:
            group.invitations.append(member)

            db.session.add(group)
            db.session.commit()

            return GroupService._group_response(group, "User invited to group", 200)
        except Exception as error:
            return GroupService._rollback_and_fail(error)

    @staticmethod
    def accept_invitation(group_id, user_uuid, current_user_uuid):
        """Accept the invitation of user *user_uuid* to the group *group_id*.

        The caller (*current_user_uuid*) must be the invited user. Error
        responses, in order: 404 unknown group, 404 unknown invitee, 404
        unknown caller, 403 caller is not the invitee, 404 no such
        invitation. On success the invitation is replaced by a membership
        and the serialized group is returned with status 200.
        """
        if not (group := GroupModel.query.filter_by(group_id=group_id).first()):
            return err_resp("Group not found!", 404)

        if not (user := UserModel.query.filter_by(uuid=user_uuid).first()):
            return err_resp("Member not found!", 404)

        # Existence check only: an unknown caller gets 404 rather than the
        # 403 of the comparison below.
        if not UserModel.query.filter_by(uuid=current_user_uuid).first():
            return err_resp("Member not found!", 404)

        if str(user.uuid) != current_user_uuid:
            return err_resp("Unable to accept an invitation that is not intended to you", 403)

        if group.invitations.filter_by(user_id=user.user_id).scalar() is None:
            return err_resp("Invitation not found !", 404)

        try:
            group.invitations.remove(user)
            group.members.append(user)

            db.session.add(group)
            db.session.commit()

            return GroupService._group_response(group, "Member add to group", 200)
        except Exception as error:
            return GroupService._rollback_and_fail(error)

    @staticmethod
    def delete_invitation(group_id, user_uuid, current_user_uuid):
        """Refuse / delete the invitation of user *user_uuid* to the group *group_id*.

        Allowed for the invited user and for the group owner. Error
        responses, in order: 404 unknown group, 404 unknown invitee, 404
        unknown caller, 403 caller is neither invitee nor owner, 404 no such
        invitation. On success returns an empty body with status 204.
        """
        if not (group := GroupModel.query.filter_by(group_id=group_id).first()):
            return err_resp("Group not found!", 404)

        if not (user := UserModel.query.filter_by(uuid=user_uuid).first()):
            return err_resp("Member not found!", 404)

        # Existence check only: an unknown caller gets 404 rather than 403.
        if not UserModel.query.filter_by(uuid=current_user_uuid).first():
            return err_resp("Member not found!", 404)

        if str(user.uuid) != current_user_uuid and str(group.owner.uuid) != current_user_uuid:
            return err_resp("Unable to delete an invitation that is not intended to you if you are not the group owner", 403)

        if group.invitations.filter_by(user_id=user.user_id).scalar() is None:
            return err_resp("Invitation not found !", 404)

        try:
            group.invitations.remove(user)

            db.session.add(group)
            db.session.commit()

            return "", 204
        except Exception as error:
            return GroupService._rollback_and_fail(error)

    @staticmethod
    def leave_group(group_id, current_user_uuid):
        """Leave the group *group_id*; if the caller owns it, the group is deleted.

        Error responses: 404 unknown group, 404 unknown caller, 400 when a
        non-owner caller is not a member of the group. On success returns an
        empty body with status 204.
        """
        if not (group := GroupModel.query.filter_by(group_id=group_id).first()):
            return err_resp("Group not found!", 404)

        # Keep the row: the non-owner branch needs it, so one query suffices.
        if not (current_user := UserModel.query.filter_by(uuid=current_user_uuid).first()):
            return err_resp("User not found!", 404)

        try:
            if str(group.owner.uuid) == current_user_uuid:
                # Owner leaving: clear both association collections, then
                # delete the group itself.
                group.members = []
                group.invitations = []
                db.session.delete(group)
            elif group.members.filter_by(user_id=current_user.user_id).scalar() is None:
                # NOTE(review): removing a user who is not in the collection
                # is expected to fail at remove/flush time and would reach
                # the generic handler as an internal error; report it as a
                # client error instead. Confirm the status code with the API
                # contract.
                return err_resp("User is not a member of this group !", 400)
            else:
                group.members.remove(current_user)

            db.session.commit()

            return "", 204
        except Exception as error:
            return GroupService._rollback_and_fail(error)

170 except Exception as error: 

171 current_app.logger.error(error) 

172 return internal_err_resp()