Let the SyncAPI know that we changed account data

Signed-off-by: Till Faelligen <2353100+S7evinK@users.noreply.github.com>
This commit is contained in:
Till Faelligen 2025-11-20 08:24:59 +01:00
parent fbbdf84ac6
commit e2cf01609c
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E

View file

@ -209,30 +209,57 @@ func (s *OutputRoomEventConsumer) storeMessageStats(ctx context.Context, eventTy
func (s *OutputRoomEventConsumer) handleRoomUpgrade(ctx context.Context, oldRoomID, newRoomID string, localMembers []*localMembership, roomSize int) error {
for _, membership := range localMembers {
// Copy any existing push rules from old -> new room
if err := s.copyPushrules(ctx, oldRoomID, newRoomID, membership.Localpart, membership.Domain); err != nil {
changed, err := s.copyPushrules(ctx, oldRoomID, newRoomID, membership.Localpart, membership.Domain)
if err != nil {
return err
}
// Inform the SyncAPI about the updated push_rules
if changed {
if err = s.syncProducer.SendAccountData(membership.Localpart, eventutil.AccountData{
Type: "m.push_rules",
}); err != nil {
return err
}
}
// preserve m.direct room state
if err := s.updateMDirect(ctx, oldRoomID, newRoomID, membership.Localpart, membership.Domain, roomSize); err != nil {
changed, err = s.updateMDirect(ctx, oldRoomID, newRoomID, membership.Localpart, membership.Domain, roomSize)
if err != nil {
return err
}
// Inform the SyncAPI about the updated m.direct
if changed {
if err = s.syncProducer.SendAccountData(membership.Localpart, eventutil.AccountData{
Type: "m.direct",
}); err != nil {
return err
}
}
// copy existing m.tag entries, if any
if err := s.copyTags(ctx, oldRoomID, newRoomID, membership.Localpart, membership.Domain); err != nil {
changed, err = s.copyTags(ctx, oldRoomID, newRoomID, membership.Localpart, membership.Domain)
if err != nil {
return err
}
// Inform the SyncAPI about the updated m.tag
if changed {
if err = s.syncProducer.SendAccountData(membership.Localpart, eventutil.AccountData{
Type: "m.tag",
}); err != nil {
return err
}
}
}
return nil
}
func (s *OutputRoomEventConsumer) copyPushrules(ctx context.Context, oldRoomID, newRoomID string, localpart string, serverName spec.ServerName) error {
func (s *OutputRoomEventConsumer) copyPushrules(ctx context.Context, oldRoomID, newRoomID, localpart string, serverName spec.ServerName) (hasChanges bool, err error) {
pushRules, err := s.db.QueryPushRules(ctx, localpart, serverName)
if err != nil {
return fmt.Errorf("failed to query pushrules for user: %w", err)
return hasChanges, err
}
if pushRules == nil {
return nil
return hasChanges, err
}
for _, roomRule := range pushRules.Global.Room {
@ -244,25 +271,26 @@ func (s *OutputRoomEventConsumer) copyPushrules(ctx context.Context, oldRoomID,
pushRules.Global.Room = append(pushRules.Global.Room, &cpRool)
rules, err := json.Marshal(pushRules)
if err != nil {
return err
return hasChanges, err
}
if err = s.db.SaveAccountData(ctx, localpart, serverName, "", "m.push_rules", rules); err != nil {
return fmt.Errorf("failed to update pushrules: %w", err)
return false, err
}
hasChanges = true
}
return nil
return hasChanges, err
}
// updateMDirect copies the "is_direct" flag from oldRoomID to newROomID
func (s *OutputRoomEventConsumer) updateMDirect(ctx context.Context, oldRoomID, newRoomID, localpart string, serverName spec.ServerName, roomSize int) error {
func (s *OutputRoomEventConsumer) updateMDirect(ctx context.Context, oldRoomID, newRoomID, localpart string, serverName spec.ServerName, roomSize int) (hasChanges bool, err error) {
// this is most likely not a DM, so skip updating m.direct state
if roomSize > 2 {
return nil
return hasChanges, nil
}
// Get direct message state
directChatsRaw, err := s.db.GetAccountDataByType(ctx, localpart, serverName, "", "m.direct")
if err != nil {
return fmt.Errorf("failed to get m.direct from database: %w", err)
return hasChanges, fmt.Errorf("failed to get m.direct from database: %w", err)
}
directChats := gjson.ParseBytes(directChatsRaw)
newDirectChats := make(map[string][]string)
@ -285,25 +313,29 @@ func (s *OutputRoomEventConsumer) updateMDirect(ctx context.Context, oldRoomID,
var data []byte
data, err = json.Marshal(newDirectChats)
if err != nil {
return err
return hasChanges, err
}
if err = s.db.SaveAccountData(ctx, localpart, serverName, "", "m.direct", data); err != nil {
return fmt.Errorf("failed to update m.direct state: %w", err)
return hasChanges, fmt.Errorf("failed to update m.direct state: %w", err)
}
hasChanges = true
}
return nil
return hasChanges, nil
}
func (s *OutputRoomEventConsumer) copyTags(ctx context.Context, oldRoomID, newRoomID, localpart string, serverName spec.ServerName) error {
func (s *OutputRoomEventConsumer) copyTags(ctx context.Context, oldRoomID, newRoomID, localpart string, serverName spec.ServerName) (hasChanges bool, err error) {
tag, err := s.db.GetAccountDataByType(ctx, localpart, serverName, oldRoomID, "m.tag")
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return err
return hasChanges, err
}
if tag == nil {
return nil
return hasChanges, nil
}
return s.db.SaveAccountData(ctx, localpart, serverName, newRoomID, "m.tag", tag)
if err := s.db.SaveAccountData(ctx, localpart, serverName, newRoomID, "m.tag", tag); err != nil {
return hasChanges, err
}
return true, nil
}
func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *rstypes.HeaderedEvent, streamPos uint64) error {