Skip to content

Commit efed369

Browse files
committed
feat: optimizing BatchGetIncrementalGroupMember (#3180)
* pb * fix: Modifying other fields while setting IsPrivateChat does not take effect * fix: quote message error revoke * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * upgrading pkg tools * fix * fix * optimize log output * feat: support GetLastMessage * feat: support GetLastMessage * feat: s3 switch * feat: s3 switch * fix: GetUsersOnline * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: seq conversion failed without exiting * fix: DeleteDoc crash * fix: fill send time * fix: fill send time * fix: crash caused by withdrawing messages from users who have left the group * fix: user msg timestamp * seq read config * seq read config * fix: the source message of the reference is withdrawn, and the referenced message is deleted * feat: optimize the default notification.yml * fix: shouldPushOffline * fix: optimizing BatchGetIncrementalGroupMember
1 parent aaf8985 commit efed369

File tree

1 file changed

+26
-150
lines changed

1 file changed

+26
-150
lines changed

internal/rpc/group/sync.go

Lines changed: 26 additions & 150 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ import (
1111
"github.com/openimsdk/protocol/constant"
1212
pbgroup "github.com/openimsdk/protocol/group"
1313
"github.com/openimsdk/protocol/sdkws"
14-
"github.com/openimsdk/tools/errs"
15-
"github.com/openimsdk/tools/log"
1614
)
1715

18-
func (s *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetFullGroupMemberUserIDsReq) (*pbgroup.GetFullGroupMemberUserIDsResp, error) {
19-
vl, err := s.db.FindMaxGroupMemberVersionCache(ctx, req.GroupID)
16+
const versionSyncLimit = 500
17+
18+
func (g *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetFullGroupMemberUserIDsReq) (*pbgroup.GetFullGroupMemberUserIDsResp, error) {
19+
vl, err := g.db.FindMaxGroupMemberVersionCache(ctx, req.GroupID)
2020
if err != nil {
2121
return nil, err
2222
}
@@ -132,152 +132,8 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou
132132
return resp, nil
133133
}
134134

135-
func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *pbgroup.BatchGetIncrementalGroupMemberReq) (resp *pbgroup.BatchGetIncrementalGroupMemberResp, err error) {
136-
type VersionInfo struct {
137-
GroupID string
138-
VersionID string
139-
VersionNumber uint64
140-
}
141-
142-
var groupIDs []string
143-
144-
groupsVersionMap := make(map[string]*VersionInfo)
145-
groupsMap := make(map[string]*model.Group)
146-
hasGroupUpdateMap := make(map[string]bool)
147-
sortVersionMap := make(map[string]uint64)
148-
149-
var targetKeys, versionIDs []string
150-
var versionNumbers []uint64
151-
152-
var requestBodyLen int
153-
154-
for _, group := range req.ReqList {
155-
groupsVersionMap[group.GroupID] = &VersionInfo{
156-
GroupID: group.GroupID,
157-
VersionID: group.VersionID,
158-
VersionNumber: group.Version,
159-
}
160-
161-
groupIDs = append(groupIDs, group.GroupID)
162-
}
163-
164-
groups, err := s.db.FindGroup(ctx, groupIDs)
165-
if err != nil {
166-
return nil, errs.Wrap(err)
167-
}
168-
169-
for _, group := range groups {
170-
if group.Status == constant.GroupStatusDismissed {
171-
err = servererrs.ErrDismissedAlready.Wrap()
172-
log.ZError(ctx, "This group is Dismissed Already", err, "group is", group.GroupID)
173-
174-
delete(groupsVersionMap, group.GroupID)
175-
} else {
176-
groupsMap[group.GroupID] = group
177-
}
178-
}
179-
180-
for groupID, vInfo := range groupsVersionMap {
181-
targetKeys = append(targetKeys, groupID)
182-
versionIDs = append(versionIDs, vInfo.VersionID)
183-
versionNumbers = append(versionNumbers, vInfo.VersionNumber)
184-
}
185-
186-
opt := incrversion.BatchOption[[]*sdkws.GroupMemberFullInfo, pbgroup.BatchGetIncrementalGroupMemberResp]{
187-
Ctx: ctx,
188-
TargetKeys: targetKeys,
189-
VersionIDs: versionIDs,
190-
VersionNumbers: versionNumbers,
191-
Versions: func(ctx context.Context, groupIDs []string, versions []uint64, limits []int) (map[string]*model.VersionLog, error) {
192-
vLogs, err := s.db.BatchFindMemberIncrVersion(ctx, groupIDs, versions, limits)
193-
if err != nil {
194-
return nil, errs.Wrap(err)
195-
}
196-
197-
for groupID, vlog := range vLogs {
198-
vlogElems := make([]model.VersionLogElem, 0, len(vlog.Logs))
199-
for i, log := range vlog.Logs {
200-
switch log.EID {
201-
case model.VersionGroupChangeID:
202-
vlog.LogLen--
203-
hasGroupUpdateMap[groupID] = true
204-
case model.VersionSortChangeID:
205-
vlog.LogLen--
206-
sortVersionMap[groupID] = uint64(log.Version)
207-
default:
208-
vlogElems = append(vlogElems, vlog.Logs[i])
209-
}
210-
}
211-
vlog.Logs = vlogElems
212-
if vlog.LogLen > 0 {
213-
hasGroupUpdateMap[groupID] = true
214-
}
215-
}
216-
217-
return vLogs, nil
218-
},
219-
CacheMaxVersions: s.db.BatchFindMaxGroupMemberVersionCache,
220-
Find: func(ctx context.Context, groupID string, ids []string) ([]*sdkws.GroupMemberFullInfo, error) {
221-
memberInfo, err := s.getGroupMembersInfo(ctx, groupID, ids)
222-
if err != nil {
223-
return nil, err
224-
}
225-
226-
return memberInfo, err
227-
},
228-
Resp: func(versions map[string]*model.VersionLog, deleteIdsMap map[string][]string, insertListMap, updateListMap map[string][]*sdkws.GroupMemberFullInfo, fullMap map[string]bool) *pbgroup.BatchGetIncrementalGroupMemberResp {
229-
resList := make(map[string]*pbgroup.GetIncrementalGroupMemberResp)
230-
231-
for groupID, versionLog := range versions {
232-
resList[groupID] = &pbgroup.GetIncrementalGroupMemberResp{
233-
VersionID: versionLog.ID.Hex(),
234-
Version: uint64(versionLog.Version),
235-
Full: fullMap[groupID],
236-
Delete: deleteIdsMap[groupID],
237-
Insert: insertListMap[groupID],
238-
Update: updateListMap[groupID],
239-
SortVersion: sortVersionMap[groupID],
240-
}
241-
242-
requestBodyLen += len(insertListMap[groupID]) + len(updateListMap[groupID]) + len(deleteIdsMap[groupID])
243-
if requestBodyLen > 200 {
244-
break
245-
}
246-
}
247-
248-
return &pbgroup.BatchGetIncrementalGroupMemberResp{
249-
RespList: resList,
250-
}
251-
},
252-
}
253-
254-
resp, err = opt.Build()
255-
if err != nil {
256-
return nil, errs.Wrap(err)
257-
}
258-
259-
for groupID, val := range resp.RespList {
260-
if val.Full || hasGroupUpdateMap[groupID] {
261-
count, err := s.db.FindGroupMemberNum(ctx, groupID)
262-
if err != nil {
263-
return nil, err
264-
}
265-
266-
owner, err := s.db.TakeGroupOwner(ctx, groupID)
267-
if err != nil {
268-
return nil, err
269-
}
270-
271-
resp.RespList[groupID].Group = s.groupDB2PB(groupsMap[groupID], owner.UserID, count)
272-
}
273-
}
274-
275-
return resp, nil
276-
277-
}
278-
279-
func (s *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup.GetIncrementalJoinGroupReq) (*pbgroup.GetIncrementalJoinGroupResp, error) {
280-
if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil {
135+
func (g *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup.GetIncrementalJoinGroupReq) (*pbgroup.GetIncrementalJoinGroupResp, error) {
136+
if err := authverify.CheckAccessV3(ctx, req.UserID, g.config.Share.IMAdminUserID); err != nil {
281137
return nil, err
282138
}
283139
opt := incrversion.Option[*sdkws.GroupInfo, pbgroup.GetIncrementalJoinGroupResp]{
@@ -301,3 +157,23 @@ func (s *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup.
301157
}
302158
return opt.Build()
303159
}
160+
161+
func (g *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *pbgroup.BatchGetIncrementalGroupMemberReq) (*pbgroup.BatchGetIncrementalGroupMemberResp, error) {
162+
var num int
163+
resp := make(map[string]*pbgroup.GetIncrementalGroupMemberResp)
164+
for _, memberReq := range req.ReqList {
165+
if _, ok := resp[memberReq.GroupID]; ok {
166+
continue
167+
}
168+
memberResp, err := g.GetIncrementalGroupMember(ctx, memberReq)
169+
if err != nil {
170+
return nil, err
171+
}
172+
resp[memberReq.GroupID] = memberResp
173+
num += len(memberResp.Insert) + len(memberResp.Update) + len(memberResp.Delete)
174+
if num >= versionSyncLimit {
175+
break
176+
}
177+
}
178+
return &pbgroup.BatchGetIncrementalGroupMemberResp{RespList: resp}, nil
179+
}

0 commit comments

Comments
 (0)