Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres copy explicates columns and column order #1137

Merged
merged 5 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions parsons/databases/postgres/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from parsons.databases.alchemy import Alchemy
from parsons.databases.database_connector import DatabaseConnector
from parsons.etl.table import Table
from typing import Optional
import logging
import os

Expand Down Expand Up @@ -87,7 +88,7 @@ def copy(
self.query_with_connection(sql, connection, commit=False)
logger.info(f"{table_name} created.")

sql = f"COPY {table_name} FROM STDIN CSV HEADER;"
sql = f"""COPY "{table_name}" ("{'","'.join(tbl.columns)}") FROM STDIN CSV HEADER;"""

with self.cursor(connection) as cursor:
cursor.copy_expert(sql, open(tbl.to_csv(), "r"))
Expand All @@ -102,4 +103,40 @@ def table(self, table_name):
class PostgresTable(BaseTable):
# Postgres table object.

pass
def max_value(self, column: str):
"""Get the max value of this column from the table."""
return self.db.query(
f"""
SELECT "{column}"
FROM {self.table}
ORDER BY "{column}" DESC
LIMIT 1
"""
).first

def get_updated_rows(
self,
updated_at_column: str,
cutoff_value,
offset: int = 0,
chunk_size: Optional[int] = None,
) -> Table:
"""Get rows that have a greater updated_at_column value than the one provided."""
sql = f"""
SELECT *
FROM {self.table}
"""
parameters = []

if cutoff_value is not None:
sql += f'WHERE "{updated_at_column}" > %s'
parameters.append(cutoff_value)

if chunk_size:
sql += f" LIMIT {chunk_size}"

sql += f" OFFSET {offset}"

result = self.db.query(sql, parameters=parameters)

return result
4 changes: 2 additions & 2 deletions parsons/databases/postgres/postgres_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,9 @@ def table_exists_with_connection(self, table_name, connection, view=True):
# Extract the table and schema from this. If no schema is detected then
# will default to the public schema.
try:
schema, table = table_name.lower().split(".", 1)
schema, table = table_name.split(".", 1)
except ValueError:
schema, table = "public", table_name.lower()
schema, table = "public", table_name

with self.cursor(connection) as cursor:
# Check in pg tables for the table
Expand Down
Loading