diff --git a/changelog.d/18677.bugfix b/changelog.d/18677.bugfix new file mode 100644 index 0000000000..3b443d2055 --- /dev/null +++ b/changelog.d/18677.bugfix @@ -0,0 +1 @@ +Fix sliding_sync_connections related errors when porting from SQLite to Postgres. diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 573c70696e..6c3e380355 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -190,13 +190,18 @@ APPEND_ONLY_TABLES = [ "users", ] +# These tables declare their id column with "PRIMARY KEY AUTOINCREMENT" on sqlite side +# and with "PRIMARY KEY GENERATED ALWAYS AS IDENTITY" on postgres side. This creates an +# implicit sequence that needs its value to be migrated separately. Additionally, +# inserting on postgres side needs to use the "OVERRIDING SYSTEM VALUE" modifier. +AUTOINCREMENT_TABLES = { + "sliding_sync_connections", + "sliding_sync_connection_positions", + "sliding_sync_connection_required_state", + "state_groups_pending_deletion", +} IGNORED_TABLES = { - # Porting the auto generated sequence in this table is non-trivial. - # None of the entries in this list are mandatory for Synapse to keep working. - # If state group disk space is an issue after the port, the - # `mark_unreferenced_state_groups_for_deletion_bg_update` background task can be run again. - "state_groups_pending_deletion", # We don't port these tables, as they're a faff and we can regenerate # them anyway. "user_directory", @@ -284,11 +289,17 @@ class Store( return self.db_pool.runInteraction("execute_sql", r) def insert_many_txn( - self, txn: LoggingTransaction, table: str, headers: List[str], rows: List[Tuple] + self, + txn: LoggingTransaction, + table: str, + headers: List[str], + rows: List[Tuple], + override_system_value: bool = False, ) -> None: - sql = "INSERT INTO %s (%s) VALUES (%s)" % ( + sql = "INSERT INTO %s (%s) %s VALUES (%s)" % ( table, ", ".join(k for k in headers), + "OVERRIDING SYSTEM VALUE" if override_system_value else "", ", ".join("%s" for _ in headers), ) @@ -532,7 +543,13 @@ class Porter: def insert(txn: LoggingTransaction) -> None: assert headers is not None - self.postgres_store.insert_many_txn(txn, table, headers[1:], rows) + self.postgres_store.insert_many_txn( + txn, + table, + headers[1:], + rows, + override_system_value=table in AUTOINCREMENT_TABLES, + ) self.postgres_store.db_pool.simple_update_one_txn( txn, @@ -884,6 +901,19 @@ class Porter: ], ) + await self._setup_autoincrement_sequence( + "sliding_sync_connection_positions", "connection_position" + ) + await self._setup_autoincrement_sequence( + "sliding_sync_connection_required_state", "required_state_id" + ) + await self._setup_autoincrement_sequence( + "sliding_sync_connections", "connection_key" + ) + await self._setup_autoincrement_sequence( + "state_groups_pending_deletion", "sequence_number" + ) + # Step 3. Get tables. self.progress.set_state("Fetching tables") sqlite_tables = await self.sqlite_store.db_pool.simple_select_onecol( @@ -1216,6 +1246,49 @@ class Porter: "_setup_%s" % (sequence_name,), r ) + async def _setup_autoincrement_sequence( + self, + sqlite_table_name: str, + sqlite_id_column_name: str, + ) -> None: + """Set a sequence to the correct value. Use where id column was declared with PRIMARY KEY AUTOINCREMENT.""" + seq_name = await self._pg_get_serial_sequence( + sqlite_table_name, sqlite_id_column_name + ) + if seq_name is None: + raise Exception( + "implicit sequence not found for table " + sqlite_table_name + ) + + seq_value = await self.sqlite_store.db_pool.simple_select_one_onecol( + table="sqlite_sequence", + keyvalues={"name": sqlite_table_name}, + retcol="seq", + allow_none=True, + ) + if seq_value is None: + return + + def r(txn: LoggingTransaction) -> None: + sql = "ALTER SEQUENCE %s RESTART WITH" % (seq_name,) + txn.execute(sql + " %s", (seq_value + 1,)) + + await self.postgres_store.db_pool.runInteraction("_setup_%s" % (seq_name,), r) + + async def _pg_get_serial_sequence(self, table: str, column: str) -> Optional[str]: + """Returns the name of the postgres sequence associated with a column, or NULL.""" + + def r(txn: LoggingTransaction) -> Optional[str]: + txn.execute("SELECT pg_get_serial_sequence('%s', '%s')" % (table, column)) + result = txn.fetchone() + if not result: + return None + return result[0] + + return await self.postgres_store.db_pool.runInteraction( + "_pg_get_serial_sequence", r + ) + async def _setup_auth_chain_sequence(self) -> None: curr_chain_id: Optional[ int