Skip to content

Commit

Permalink
Reset sequences outside of pgloader when migrating db
Browse files Browse the repository at this point in the history
Using pgloader to reset sequences while running multiple pgloader
simultaneously seems to cause this error:

```
2021-01-27T19:45:45.131000Z LOG Data errors in '/tmp/pgloader/'
2021-01-27T19:45:45.138000Z LOG Parsing commands from file #P"/tmp/tmpppm9z91v/wbia.load"
KABOOM!
FATAL error: Asynchronous notification "seqs" (payload: "0") received from server process with PID 28472.
An unhandled error condition has been signalled:
   Asynchronous notification "seqs" (payload: "0") received from server process with PID 28472.
```

We can reset sequences outside of pgloader instead and avoid this problem.
  • Loading branch information
karenc committed Jan 28, 2021
1 parent ee15f18 commit 82c84e3
Showing 1 changed file with 23 additions and 1 deletion.
24 changes: 23 additions & 1 deletion wbia/dtool/copy_sqlite_to_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ def before_pgloader(engine, schema):
WITH include drop,
create tables,
create indexes,
reset sequences
reset no sequences
SET work_mem to '16MB',
maintenance_work_mem to '512 MB',
Expand Down Expand Up @@ -479,10 +479,12 @@ def after_pgloader(sqlite_engine, pg_engine, schema):
WHERE table_schema = '{schema}'
AND constraint_type = 'PRIMARY KEY'"""
).fetchall()
exclude_sequences = set()
for (table_name, pkey) in table_pkeys:
# Create sequences for rowid fields
for column_name in ('rowid', pkey):
seq_name = f'{table_name}_{column_name}_seq'
exclude_sequences.add(seq_name)
connection.execute(f'CREATE SEQUENCE {seq_name}')
connection.execute(
f"SELECT setval('{seq_name}', (SELECT max({column_name}) FROM {table_name}))"
Expand All @@ -494,6 +496,26 @@ def after_pgloader(sqlite_engine, pg_engine, schema):
f'ALTER SEQUENCE {seq_name} OWNED BY {table_name}.{column_name}'
)

# Reset all sequences except the ones we just created (doing it here
# instead of in pgloader because it causes a fatal error in pgloader
# when pgloader runs in parallel:
#
# Asynchronous notification "seqs" (payload: "0") received from
# server process with PID 28472.)
sequences = connection.execute(
f"""\
SELECT table_name, column_name, column_default
FROM information_schema.columns
WHERE table_schema = '{schema}'
AND column_default LIKE 'nextval%%'"""
).fetchall()
for table_name, column_name, column_default in sequences:
seq_name = re.sub(r"nextval\('([^']*)'.*", r'\1', column_default)
if seq_name not in exclude_sequences:
connection.execute(
f"SELECT setval('{seq_name}', (SELECT max({column_name}) FROM {table_name}))"
)


def drop_schema(engine, schema_name):
connection = engine.connect()
Expand Down

0 comments on commit 82c84e3

Please sign in to comment.