From 28c9b640dbb2fc8c5a7c9050dd227de5306153db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= <16805946+edgarrmondragon@users.noreply.github.com> Date: Thu, 30 Nov 2023 14:24:29 -0600 Subject: [PATCH] fix: Treat streams with `LOG_BASED` replication as unsorted (#301) Closes https://github.com/MeltanoLabs/tap-postgres/issues/297 --- tap_postgres/client.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tap_postgres/client.py b/tap_postgres/client.py index 90dfdf78..0c6d1483 100644 --- a/tap_postgres/client.py +++ b/tap_postgres/client.py @@ -2,6 +2,7 @@ This includes PostgresStream and PostgresConnector. """ + from __future__ import annotations import datetime @@ -20,6 +21,7 @@ from singer_sdk import typing as th from singer_sdk.helpers._state import increment_state from singer_sdk.helpers._typing import TypeConformanceLevel +from singer_sdk.streams.core import REPLICATION_INCREMENTAL from sqlalchemy import nullsfirst from sqlalchemy.engine import Engine from sqlalchemy.engine.reflection import Inspector @@ -481,3 +483,10 @@ def logical_replication_connection(self): application_name="tap_postgres", connection_factory=extras.LogicalReplicationConnection, ) + + # TODO: Make this change upstream in the SDK? + # I'm not sure if in general SQL databases don't guarantee order of records log + # replication, but at least Postgres does not. + def is_sorted(self) -> bool: + """Return True if the stream is sorted by the replication key.""" + return self.replication_method == REPLICATION_INCREMENTAL