From 8039d98f23f993d842ac148dadc5d9a19914c28f Mon Sep 17 00:00:00 2001 From: Stefan Krawczyk Date: Thu, 21 Mar 2024 10:53:12 -0700 Subject: [PATCH] Changes state serialization to use get_all() This is technically backwards incompatible, but we're so early, we can migrate people manually. Otherwise adds Redis to docs. --- burr/core/persistence.py | 4 ++-- burr/integrations/persisters/b_redis.py | 6 +++--- burr/integrations/persisters/postgresql.py | 4 ++-- docs/reference/persister.rst | 6 ++++++ pyproject.toml | 4 ++++ 5 files changed, 17 insertions(+), 7 deletions(-) diff --git a/burr/core/persistence.py b/burr/core/persistence.py index ef3a2ae6..a0d21083 100644 --- a/burr/core/persistence.py +++ b/burr/core/persistence.py @@ -229,7 +229,7 @@ def load( row = cursor.fetchone() if row is None: return None - _state = State(json.loads(row[1])["_state"]) + _state = State(json.loads(row[1])) return { "partition_key": partition_key, "app_id": row[3], @@ -277,7 +277,7 @@ def save( status, ) cursor = self.connection.cursor() - json_state = json.dumps(state.__dict__) + json_state = json.dumps(state.get_all()) cursor.execute( f"INSERT INTO {self.table_name} (partition_key, app_id, sequence_id, position, state, status) " f"VALUES (?, ?, ?, ?, ?, ?)", diff --git a/burr/integrations/persisters/b_redis.py b/burr/integrations/persisters/b_redis.py index d643ff2b..0b03fbc0 100644 --- a/burr/integrations/persisters/b_redis.py +++ b/burr/integrations/persisters/b_redis.py @@ -60,7 +60,7 @@ def load( data = self.connection.hgetall(key) if not data: return None - _state = state.State(json.loads(data[b"state"].decode())["_state"]) + _state = state.State(json.loads(data[b"state"].decode())) return { "partition_key": partition_key, "app_id": app_id, @@ -100,7 +100,7 @@ def save( key = self.create_key(app_id, partition_key, sequence_id) if self.connection.exists(key): raise ValueError(f"partition_key:app_id:sequence_id[{key}] already exists.") - json_state = json.dumps(state.__dict__) + json_state = json.dumps(state.get_all()) self.connection.hset( key, mapping={ @@ -124,6 +124,6 @@ def __del__(self): persister = RedisPersister("localhost", 6379, 0) persister.initialize() - persister.save("pk", "app_id", 1, "pos", state.State({"a": 1, "b": 2}), "completed") + persister.save("pk", "app_id", 2, "pos", state.State({"a": 1, "b": 2}), "completed") print(persister.list_app_ids("pk")) print(persister.load("pk", "app_id")) diff --git a/burr/integrations/persisters/postgresql.py b/burr/integrations/persisters/postgresql.py index 40eac979..80ce0107 100644 --- a/burr/integrations/persisters/postgresql.py +++ b/burr/integrations/persisters/postgresql.py @@ -160,7 +160,7 @@ def load( row = cursor.fetchone() if row is None: return None - _state = state.State(row[1]["_state"]) + _state = state.State(row[1]) return { "partition_key": partition_key, "app_id": row[3], @@ -208,7 +208,7 @@ def save( status, ) cursor = self.connection.cursor() - json_state = json.dumps(state.__dict__) + json_state = json.dumps(state.get_all()) cursor.execute( f"INSERT INTO {self.table_name} (partition_key, app_id, sequence_id, position, state, status) " "VALUES (%s, %s, %s, %s, %s, %s)", diff --git a/docs/reference/persister.rst b/docs/reference/persister.rst index 0f0de180..3786847a 100644 --- a/docs/reference/persister.rst +++ b/docs/reference/persister.rst @@ -47,5 +47,11 @@ Currently we support the following, although we highly recommend you contribute .. automethod:: __init__ +.. autoclass:: burr.integrations.persisters.b_redis.RedisPersister + :members: + + .. automethod:: __init__ + + Note that the :py:class:`LocalTrackingClient ` leverages the :py:class:`BaseStateLoader ` to allow loading state, although it uses different mechanisms to save state (as it tracks more than just state). diff --git a/pyproject.toml b/pyproject.toml index 2cf125f6..246fbb70 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,10 @@ postgresql = [ "psycopg2-binary" ] +redis = [ + "redis" +] + tests = [ "pytest", "pytest-asyncio",