Skip to content

Commit

Permalink
Fixing #2.
Browse files Browse the repository at this point in the history
  • Loading branch information
Xof committed May 7, 2018
1 parent 52a0320 commit 5bf907e
Showing 1 changed file with 135 additions and 44 deletions.
179 changes: 135 additions & 44 deletions pg_merge_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
default="host=localhost",
help='connection string for "to" server (defaults to "host=localhost")')

parser.add_argument('--key', dest='key', action='store',
default="",
help='key column for matching; it must exist on both tables, and the only column in a unique index')

parser.add_argument('--delete', dest='delete_missing', action='store_true',
default=False,
help='delete entries in "to" table missing in "from" table"')
Expand All @@ -42,6 +46,8 @@
from_connnection_string = args.from_connection_string
to_connection_string = args.to_connection_string

key = args.key

delete_missing = args.delete_missing
execute = args.execute
progress = args.progress
Expand Down Expand Up @@ -74,43 +80,123 @@
exit(1)

if not table_exists(to_connection, schema_name, table_name):
print >>sys.stderr, 'Table "%s.%s" does not exist on "from" server.' % (schema_name, table_name)
print >>sys.stderr, 'Table "%s.%s" does not exist on "to" server.' % (schema_name, table_name)
exit(1)

# Retrieve the primary key column name, and all table columns

primary_key_from = list(primary_key_for_table(from_connection, schema_name, table_name))
primary_key_to = list(primary_key_for_table(to_connection, schema_name, table_name))
if key:
key_type = type_for_column(from_connection, schema_name, table_name, key)

if len(primary_key_from) == 0:
print >>sys.stderr, 'Table "%s.%s" does not have a primary key on "from" server.' % (schema_name, table_name)
exit(1)
elif len(primary_key_from) > 1:
print >>sys.stderr, 'Table "%s.%s" has a multi-column primary key on "from" server, currently not supported.' % (schema_name, table_name)
exit(1)
if not key_type:
print >> sys.stderr, 'Table "%s.%s" coolumn %s does not exist on "from" server.' % (schema_name, table_name, key)
exit(1)

if len(primary_key_to) == 0:
print >>sys.stderr, 'Table "%s.%s" does not have a primary key on "to" server.' % (schema_name, table_name)
exit(1)
elif len(primary_key_to) > 1:
print >>sys.stderr, 'Table "%s.%s" has a multi-column primary key on "to" server, currently not supported.' % (schema_name, table_name)
exit(1)
to_key_type = type_for_column(to_connection, schema_name, table_name, key)
if not to_key_type:
print >> sys.stderr, 'Table "%s.%s" column "%s" does not exist on "to" server.' % (schema_name, table_name, key)
exit(1)

if primary_key_from[0] != primary_key_to[0]:
print >>sys.stderr, 'Table "%s.%s" has different primary key column names on "from" (%s) and "to" (%s)servers.' % \
(schema_name, table_name, primary_key_from[0], primary_key_to[0],)
exit(1)
if key_type != to_key_type:
print >> sys.stderr, 'Table "%s.%s" column "%s" is not the same type on the "from" (%s) and "to" (%s) server.' % \
(schema_name, table_name, key, key_type, to_key_type)
exit(1)

from_curs = from_connection.cursor()

from_curs.execute("""
SELECT COUNT(DISTINCT i.indexrelid)
FROM pg_class c
JOIN pg_namespace ns ON c.relnamespace = ns.oid
JOIN pg_attribute a ON a.attrelid = c.oid
JOIN pg_index i ON i.indrelid = c.oid
WHERE nspname = %s
AND relname = %s
AND attname = %s
AND indisunique
AND attnum = ANY (indkey::integer[])
AND array_length(indkey::integer[], 1) = 1
""", (schema_name, table_name, key))

from_key_index_count = int(from_curs.fetchone()[0])

primary_key = primary_key_from[0]
primary_key_type = type_for_column(from_connection, schema_name, table_name, primary_key)
if from_key_index_count == 0:
print >> sys.stderr, 'Table "%s.%s" column "%s" does not appear by itself in a UNIQUE index on the "from" server.' % \
(schema_name, table_name, key)
exit(1)

if from_key_index_count > 1:
print >> sys.stderr, 'Table "%s.%s" column "%s" appears by itself in more than one UNIQUE index on the "from" server.' % \
(schema_name, table_name, key)
exit(1)

from_curs.close()


to_curs = to_connection.cursor()

to_curs.execute("""
SELECT COUNT(DISTINCT i.indexrelid)
FROM pg_class c
JOIN pg_namespace ns ON c.relnamespace = ns.oid
JOIN pg_attribute a ON a.attrelid = c.oid
JOIN pg_index i ON i.indrelid = c.oid
WHERE nspname = %s
AND relname = %s
AND attname = %s
AND indisunique
AND attnum = ANY (indkey::integer[])
AND array_length(indkey::integer[], 1) = 1
""", (schema_name, table_name, key))

to_key_index_count = int(to_curs.fetchone()[0])

if to_key_index_count == 0:
print >> sys.stderr, 'Table "%s.%s" column "%s" does not appear by itself in a UNIQUE index on the "to" server.' % \
(schema_name, table_name, key)
exit(1)

if to_key_index_count > 1:
print >> sys.stderr, 'Table "%s.%s" column "%s" appears by itself in more than one UNIQUE index on the "to" server.' % \
(schema_name, table_name, key)
exit(1)

to_curs.close()


else:
primary_key_from = list(primary_key_for_table(from_connection, schema_name, table_name))
primary_key_to = list(primary_key_for_table(to_connection, schema_name, table_name))

if len(primary_key_from) == 0:
print >>sys.stderr, 'Table "%s.%s" does not have a primary key on "from" server.' % (schema_name, table_name)
exit(1)
elif len(primary_key_from) > 1:
print >>sys.stderr, 'Table "%s.%s" has a multi-column primary key on "from" server, currently not supported.' % (schema_name, table_name)
exit(1)

if len(primary_key_to) == 0:
print >>sys.stderr, 'Table "%s.%s" does not have a primary key on "to" server.' % (schema_name, table_name)
exit(1)
elif len(primary_key_to) > 1:
print >>sys.stderr, 'Table "%s.%s" has a multi-column primary key on "to" server, currently not supported.' % (schema_name, table_name)
exit(1)

if primary_key_from[0] != primary_key_to[0]:
print >>sys.stderr, 'Table "%s.%s" has different primary key column names on "from" (%s) and "to" (%s)servers.' % \
(schema_name, table_name, primary_key_from[0], primary_key_to[0],)
exit(1)

key = primary_key_from[0]
key_type = type_for_column(from_connection, schema_name, table_name, key)

columns = list(table_columns(from_connection, schema_name, table_name))

# For convenience, we make sure the primary key is the first column in our list.

if columns[0] != primary_key:
columns.remove(primary_key)
columns.insert(0, primary_key)
if columns[0] != key:
columns.remove(key)
columns.insert(0, key)

# These are used to build the SQL for SELECT, INSERT, and UPDATE operations.

Expand All @@ -126,7 +212,7 @@
table_name,
column_list_string,
replacement_string,
primary_key,
key,
', '.join([ column + ' = EXCLUDED.' + column for column in columns[1:] ]))

# To avoid memory issues, we use a server-side cursor rather than just issuing what might
Expand All @@ -146,14 +232,14 @@
CREATE TEMPORARY TABLE %s (
pk %s PRIMARY KEY
) ON COMMIT DROP
""" % (tracking_table_name, primary_key_type,))
""" % (tracking_table_name, key_type,))

tracking_table_insert = "INSERT INTO " + tracking_table_name + "(pk) VALUES(%s)"
else:
tracking_table_name = ""
tracking_table_insert = ""

probe_statement = "SELECT COUNT(*) FROM %s.%s WHERE %s=" % (schema_name, table_name, primary_key,)
probe_statement = "SELECT COUNT(*) FROM %s.%s WHERE %s=" % (schema_name, table_name, key,)
probe_statement += '%s'

rows_processed = 0
Expand All @@ -166,10 +252,6 @@

rows_processed += 1

if progress and (rows_processed % 1000) == 0:
print >> sys.stdout, "%s rows processed, %s updated, %s inserted" % \
(rows_processed, rows_updated, rows_inserted,)

if tracking_table_insert:
to_curs.execute(tracking_table_insert, row[:1])

Expand All @@ -181,40 +263,49 @@
else:
rows_inserted += 1

continue

to_curs.execute(insert_statement, row)
inserted = to_curs.fetchone()[0]
if inserted:
rows_inserted += 1
else:
rows_updated += 1
to_curs.execute(insert_statement, row)
inserted = to_curs.fetchone()[0]
if inserted:
rows_inserted += 1
else:
rows_updated += 1

if delete_missing:
if progress and (rows_processed % 1000) == 0:
print >> sys.stdout, "%s rows processed, %s updated, %s inserted" % \
(rows_processed, rows_updated, rows_inserted,)


from_curs.close()

if delete_missing and progress:
print >>sys.stdout, "deleting."

if not execute:
to_curs.execute("""
SELECT COUNT(*) FROM %s.%s WHERE %s NOT IN (SELECT pk FROM %s)
""" % (schema_name, table_name, primary_key, tracking_table_name))
""" % (schema_name, table_name, key, tracking_table_name))

rows_deleted = int(to_curs.fetchone()[0])
else:
to_curs.execute("""
DELETE FROM %s.%s WHERE %s NOT IN (SELECT pk FROM %s)
""" % (schema_name, table_name, primary_key, tracking_table_name))
""" % (schema_name, table_name, key, tracking_table_name))

rows_deleted = to_curs.rowcount

to_curs.close()
from_curs.close()

from_connection.rollback()

if not execute:
to_connection.rollback()
else:
to_connection.commit()
to_connection.autocommit = True
if progress:
print >> sys.stdout, "vacuuming."
to_curs.execute("VACUUM ANALYZE %s.%s" % (schema_name, table_name,) )

to_curs.close()

if not execute:
print >>sys.stdout, "dry run estimates: %s rows processed, %s updated, %s inserted, %s deleted" % \
Expand Down

0 comments on commit 5bf907e

Please sign in to comment.