From e2cf01609cc6660a0ebce904bf98249c7c2dc00a Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Thu, 20 Nov 2025 08:24:59 +0100 Subject: [PATCH] Let the SyncAPI know that we changed account data Signed-off-by: Till Faelligen <2353100+S7evinK@users.noreply.github.com> --- userapi/consumers/roomserver.go | 70 ++++++++++++++++++++++++--------- 1 file changed, 51 insertions(+), 19 deletions(-) diff --git a/userapi/consumers/roomserver.go b/userapi/consumers/roomserver.go index b33799dc..3e4f383f 100644 --- a/userapi/consumers/roomserver.go +++ b/userapi/consumers/roomserver.go @@ -209,30 +209,57 @@ func (s *OutputRoomEventConsumer) storeMessageStats(ctx context.Context, eventTy func (s *OutputRoomEventConsumer) handleRoomUpgrade(ctx context.Context, oldRoomID, newRoomID string, localMembers []*localMembership, roomSize int) error { for _, membership := range localMembers { // Copy any existing push rules from old -> new room - if err := s.copyPushrules(ctx, oldRoomID, newRoomID, membership.Localpart, membership.Domain); err != nil { + changed, err := s.copyPushrules(ctx, oldRoomID, newRoomID, membership.Localpart, membership.Domain) + if err != nil { return err } + // Inform the SyncAPI about the updated push_rules + if changed { + if err = s.syncProducer.SendAccountData(membership.Localpart, eventutil.AccountData{ + Type: "m.push_rules", + }); err != nil { + return err + } + } // preserve m.direct room state - if err := s.updateMDirect(ctx, oldRoomID, newRoomID, membership.Localpart, membership.Domain, roomSize); err != nil { + changed, err = s.updateMDirect(ctx, oldRoomID, newRoomID, membership.Localpart, membership.Domain, roomSize) + if err != nil { return err } + // Inform the SyncAPI about the updated m.direct + if changed { + if err = s.syncProducer.SendAccountData(membership.Localpart, eventutil.AccountData{ + Type: "m.direct", + }); err != nil { + return err + } + } // copy existing m.tag entries, if any - if err := s.copyTags(ctx, oldRoomID, newRoomID, membership.Localpart, membership.Domain); err != nil { + changed, err = s.copyTags(ctx, oldRoomID, newRoomID, membership.Localpart, membership.Domain) + if err != nil { return err } + // Inform the SyncAPI about the updated m.tag + if changed { + if err = s.syncProducer.SendAccountData(membership.Localpart, eventutil.AccountData{ + Type: "m.tag", + }); err != nil { + return err + } + } } return nil } -func (s *OutputRoomEventConsumer) copyPushrules(ctx context.Context, oldRoomID, newRoomID string, localpart string, serverName spec.ServerName) error { +func (s *OutputRoomEventConsumer) copyPushrules(ctx context.Context, oldRoomID, newRoomID, localpart string, serverName spec.ServerName) (hasChanges bool, err error) { pushRules, err := s.db.QueryPushRules(ctx, localpart, serverName) if err != nil { - return fmt.Errorf("failed to query pushrules for user: %w", err) + return hasChanges, err } if pushRules == nil { - return nil + return hasChanges, err } for _, roomRule := range pushRules.Global.Room { @@ -244,25 +271,26 @@ func (s *OutputRoomEventConsumer) copyPushrules(ctx context.Context, oldRoomID, pushRules.Global.Room = append(pushRules.Global.Room, &cpRool) rules, err := json.Marshal(pushRules) if err != nil { - return err + return hasChanges, err } if err = s.db.SaveAccountData(ctx, localpart, serverName, "", "m.push_rules", rules); err != nil { - return fmt.Errorf("failed to update pushrules: %w", err) + return false, err } + hasChanges = true } - return nil + return hasChanges, err } // updateMDirect copies the "is_direct" flag from oldRoomID to newROomID -func (s *OutputRoomEventConsumer) updateMDirect(ctx context.Context, oldRoomID, newRoomID, localpart string, serverName spec.ServerName, roomSize int) error { +func (s *OutputRoomEventConsumer) updateMDirect(ctx context.Context, oldRoomID, newRoomID, localpart string, serverName spec.ServerName, roomSize int) (hasChanges bool, err error) { // this is most likely not a DM, so skip updating m.direct state if roomSize > 2 { - return nil + return hasChanges, nil } // Get direct message state directChatsRaw, err := s.db.GetAccountDataByType(ctx, localpart, serverName, "", "m.direct") if err != nil { - return fmt.Errorf("failed to get m.direct from database: %w", err) + return hasChanges, fmt.Errorf("failed to get m.direct from database: %w", err) } directChats := gjson.ParseBytes(directChatsRaw) newDirectChats := make(map[string][]string) @@ -285,25 +313,29 @@ func (s *OutputRoomEventConsumer) updateMDirect(ctx context.Context, oldRoomID, var data []byte data, err = json.Marshal(newDirectChats) if err != nil { - return err + return hasChanges, err } if err = s.db.SaveAccountData(ctx, localpart, serverName, "", "m.direct", data); err != nil { - return fmt.Errorf("failed to update m.direct state: %w", err) + return hasChanges, fmt.Errorf("failed to update m.direct state: %w", err) } + hasChanges = true } - return nil + return hasChanges, nil } -func (s *OutputRoomEventConsumer) copyTags(ctx context.Context, oldRoomID, newRoomID, localpart string, serverName spec.ServerName) error { +func (s *OutputRoomEventConsumer) copyTags(ctx context.Context, oldRoomID, newRoomID, localpart string, serverName spec.ServerName) (hasChanges bool, err error) { tag, err := s.db.GetAccountDataByType(ctx, localpart, serverName, oldRoomID, "m.tag") if err != nil && !errors.Is(err, sql.ErrNoRows) { - return err + return hasChanges, err } if tag == nil { - return nil + return hasChanges, nil } - return s.db.SaveAccountData(ctx, localpart, serverName, newRoomID, "m.tag", tag) + if err := s.db.SaveAccountData(ctx, localpart, serverName, newRoomID, "m.tag", tag); err != nil { + return hasChanges, err + } + return true, nil } func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *rstypes.HeaderedEvent, streamPos uint64) error {