Skip to content

Commit

Permalink
Changes state serialization to use get_all()
Browse files Browse the repository at this point in the history
This is technically backwards incompatible,
but we're so early, we can migrate people manually.

Otherwise adds Redis to docs.
  • Loading branch information
skrawcz committed Mar 21, 2024
1 parent d81aad1 commit 8039d98
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 7 deletions.
4 changes: 2 additions & 2 deletions burr/core/persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def load(
row = cursor.fetchone()
if row is None:
return None
_state = State(json.loads(row[1])["_state"])
_state = State(json.loads(row[1]))
return {
"partition_key": partition_key,
"app_id": row[3],
Expand Down Expand Up @@ -277,7 +277,7 @@ def save(
status,
)
cursor = self.connection.cursor()
json_state = json.dumps(state.__dict__)
json_state = json.dumps(state.get_all())
cursor.execute(
f"INSERT INTO {self.table_name} (partition_key, app_id, sequence_id, position, state, status) "
f"VALUES (?, ?, ?, ?, ?, ?)",
Expand Down
6 changes: 3 additions & 3 deletions burr/integrations/persisters/b_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def load(
data = self.connection.hgetall(key)
if not data:
return None
_state = state.State(json.loads(data[b"state"].decode())["_state"])
_state = state.State(json.loads(data[b"state"].decode()))
return {
"partition_key": partition_key,
"app_id": app_id,
Expand Down Expand Up @@ -100,7 +100,7 @@ def save(
key = self.create_key(app_id, partition_key, sequence_id)
if self.connection.exists(key):
raise ValueError(f"partition_key:app_id:sequence_id[{key}] already exists.")
json_state = json.dumps(state.__dict__)
json_state = json.dumps(state.get_all())
self.connection.hset(
key,
mapping={
Expand All @@ -124,6 +124,6 @@ def __del__(self):
persister = RedisPersister("localhost", 6379, 0)

persister.initialize()
persister.save("pk", "app_id", 1, "pos", state.State({"a": 1, "b": 2}), "completed")
persister.save("pk", "app_id", 2, "pos", state.State({"a": 1, "b": 2}), "completed")
print(persister.list_app_ids("pk"))
print(persister.load("pk", "app_id"))
4 changes: 2 additions & 2 deletions burr/integrations/persisters/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def load(
row = cursor.fetchone()
if row is None:
return None
_state = state.State(row[1]["_state"])
_state = state.State(row[1])
return {
"partition_key": partition_key,
"app_id": row[3],
Expand Down Expand Up @@ -208,7 +208,7 @@ def save(
status,
)
cursor = self.connection.cursor()
json_state = json.dumps(state.__dict__)
json_state = json.dumps(state.get_all())
cursor.execute(
f"INSERT INTO {self.table_name} (partition_key, app_id, sequence_id, position, state, status) "
"VALUES (%s, %s, %s, %s, %s, %s)",
Expand Down
6 changes: 6 additions & 0 deletions docs/reference/persister.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,11 @@ Currently we support the following, although we highly recommend you contribute
.. automethod:: __init__


.. autoclass:: burr.integrations.persisters.b_redis.RedisPersister
:members:

.. automethod:: __init__


Note that the :py:class:`LocalTrackingClient <burr.tracking.client.LocalTrackingClient>` leverages the :py:class:`BaseStateLoader <burr.core.persistence.BaseStateLoader>` to allow loading state,
although it uses different mechanisms to save state (as it tracks more than just state).
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ postgresql = [
"psycopg2-binary"
]

redis = [
"redis"
]

tests = [
"pytest",
"pytest-asyncio",
Expand Down

0 comments on commit 8039d98

Please sign in to comment.