Coverage for src/service/group_service.py : 83%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from flask import current_app
3from src import db
4from src.utils import err_resp, message, internal_err_resp, Paginator, err_resp
5from src.model import GroupModel, UserModel
6from src.schemas import GroupObject
class GroupService:
    """Service-layer operations on groups: lookup, creation, invitations, membership.

    Each public method returns a two-item result. Success paths return
    ``(body, status_code)`` directly; failure paths return whatever
    ``err_resp`` / ``internal_err_resp`` produce (presumably the same
    ``(body, status_code)`` shape -- confirm against ``src.utils``).
    """

    @staticmethod
    def _group_response(group, text, status):
        """Serialize *group* and wrap it in a success message with *status*."""
        group_data = GroupObject.load(group)

        resp = message(True, text)
        resp["group"] = group_data
        return resp, status

    @staticmethod
    def _rollback_and_fail(error):
        """Log *error* with its traceback, roll back the session, return the 500 reply.

        Must be called from inside an ``except`` block so that
        ``logger.exception`` can pick up the active traceback.
        """
        current_app.logger.exception(error)
        # A failed flush/commit leaves the session in a failed transaction;
        # roll back so the scoped session is usable again.
        db.session.rollback()
        return internal_err_resp()

    @staticmethod
    def get_group_data(group_id):
        """Get a group's data by id.

        Returns a 404 error response when no group has ``group_id``, the
        serialized group with status 200 on success, and the internal-error
        response if serialization raises.
        """
        if not (group := GroupModel.query.filter_by(group_id=group_id).first()):
            return err_resp("Group not found!", 404)

        try:
            return GroupService._group_response(group, "Group data sent", 200)
        except Exception as error:
            # Read-only path: nothing to roll back, just log with traceback.
            current_app.logger.exception(error)
            return internal_err_resp()

    @staticmethod
    def create_group(group_name, creator_uuid):
        """Create a group named ``group_name`` owned by the user ``creator_uuid``.

        Returns a 404 error response when the creator does not exist, the
        serialized new group with status 201 on success, and the
        internal-error response (after rolling back) on any exception.
        """
        try:
            # Validate the owner first so no model is built for an unknown user.
            if not (user := UserModel.query.filter_by(uuid=creator_uuid).first()):
                return err_resp("User not found!", 404)

            group = GroupModel(name=group_name)
            user.owned_groups.append(group)

            db.session.add(user)
            db.session.commit()

            return GroupService._group_response(group, "Group data created", 201)
        except Exception as error:
            return GroupService._rollback_and_fail(error)

    @staticmethod
    def invite_user(group_id, new_member_uuid, current_user_uuid):
        """Invite the user ``new_member_uuid`` to the group ``group_id``.

        Only the group owner (``current_user_uuid``) may invite. Error
        responses: 404 for an unknown group or member, 403 when the caller
        is not the owner, 400 when the target is already invited, already a
        member, or is the owner. On success the invitation is committed and
        the serialized group is returned with status 200.
        """
        if not (group := GroupModel.query.filter_by(group_id=group_id).first()):
            return err_resp("Group not found!", 404)

        if str(group.owner.uuid) != current_user_uuid:
            return err_resp("Unable to invite member to a not owned group", 403)

        if not (member := UserModel.query.filter_by(uuid=new_member_uuid).first()):
            return err_resp("Member not found!", 404)

        if group.invitations.filter_by(user_id=member.user_id).scalar() is not None:
            return err_resp("Invitation already sended !", 400)

        if group.members.filter_by(user_id=member.user_id).scalar() is not None:
            return err_resp("User is already a member of this group !", 400)

        if group.owner.user_id == member.user_id:
            return err_resp("You can not invite yourself to your group !", 400)

        try:
            group.invitations.append(member)

            db.session.add(group)
            db.session.commit()

            return GroupService._group_response(group, "User invited to group", 200)
        except Exception as error:
            return GroupService._rollback_and_fail(error)

    @staticmethod
    def accept_invitation(group_id, user_uuid, current_user_uuid):
        """Accept the invitation of ``user_uuid`` to the group ``group_id``.

        The caller (``current_user_uuid``) must be the invited user. Error
        responses: 404 for an unknown group, invited user, caller, or
        missing invitation; 403 when the caller is not the invited user.
        On success the invitation is replaced by a membership and the
        serialized group is returned with status 200.
        """
        if not (group := GroupModel.query.filter_by(group_id=group_id).first()):
            return err_resp("Group not found!", 404)

        if not (user := UserModel.query.filter_by(uuid=user_uuid).first()):
            return err_resp("Member not found!", 404)

        # Existence check only: the caller's record itself is not needed below.
        if not UserModel.query.filter_by(uuid=current_user_uuid).first():
            return err_resp("Member not found!", 404)

        if str(user.uuid) != current_user_uuid:
            return err_resp("Unable to accept an invitation that is not intended to you", 403)

        if group.invitations.filter_by(user_id=user.user_id).scalar() is None:
            return err_resp("Invitation not found !", 404)

        try:
            group.invitations.remove(user)
            group.members.append(user)

            db.session.add(group)
            db.session.commit()

            return GroupService._group_response(group, "Member add to group", 200)
        except Exception as error:
            return GroupService._rollback_and_fail(error)

    @staticmethod
    def delete_invitation(group_id, user_uuid, current_user_uuid):
        """Refuse / delete the invitation of ``user_uuid`` to the group ``group_id``.

        Allowed for the invited user or the group owner. Error responses:
        404 for an unknown group, invited user, caller, or missing
        invitation; 403 when the caller is neither the invited user nor the
        owner. Returns ``("", 204)`` on success.
        """
        if not (group := GroupModel.query.filter_by(group_id=group_id).first()):
            return err_resp("Group not found!", 404)

        if not (user := UserModel.query.filter_by(uuid=user_uuid).first()):
            return err_resp("Member not found!", 404)

        # Existence check only: the caller's record itself is not needed below.
        if not UserModel.query.filter_by(uuid=current_user_uuid).first():
            return err_resp("Member not found!", 404)

        if str(user.uuid) != current_user_uuid and str(group.owner.uuid) != current_user_uuid:
            return err_resp("Unable to delete an invitation that is not intended to you if you are not the group owner", 403)

        if group.invitations.filter_by(user_id=user.user_id).scalar() is None:
            return err_resp("Invitation not found !", 404)

        try:
            group.invitations.remove(user)

            db.session.add(group)
            db.session.commit()

            return "", 204
        except Exception as error:
            return GroupService._rollback_and_fail(error)

    @staticmethod
    def leave_group(group_id, current_user_uuid):
        """Leave the group ``group_id``; if the caller owns it, delete the group.

        Error responses: 404 for an unknown group or user, 400 when a
        non-owner caller is not a member of the group. Returns
        ``("", 204)`` on success and the internal-error response (after
        rolling back) on any exception.
        """
        if not (group := GroupModel.query.filter_by(group_id=group_id).first()):
            return err_resp("Group not found!", 404)

        # Fetch the caller once; the record is reused for the membership removal.
        if not (current_user := UserModel.query.filter_by(uuid=current_user_uuid).first()):
            return err_resp("User not found!", 404)

        try:
            if str(group.owner.uuid) == current_user_uuid:
                # Owner leaving: detach members and invitations, then drop the group.
                group.members = []
                group.invitations = []
                db.session.delete(group)
            else:
                # Removing a non-member would fail inside the ORM and surface as
                # a 500; report it as a client error instead.
                if group.members.filter_by(user_id=current_user.user_id).scalar() is None:
                    return err_resp("User is not a member of this group !", 400)
                group.members.remove(current_user)

            db.session.commit()

            return "", 204
        except Exception as error:
            return GroupService._rollback_and_fail(error)