Skip to content

Commit

Permalink
Create index concurrently (#2659)
Browse files Browse the repository at this point in the history
  • Loading branch information
hinthornw authored Dec 5, 2024
1 parent b7e441d commit 93e4c8c
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 55 deletions.
11 changes: 11 additions & 0 deletions libs/checkpoint-postgres/langgraph/checkpoint/postgres/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@
PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);""",
"ALTER TABLE checkpoint_blobs ALTER COLUMN blob DROP not null;",
"""
""",
"""
CREATE INDEX CONCURRENTLY IF NOT EXISTS checkpoints_thread_id_idx ON checkpoints(thread_id);
""",
"""
CREATE INDEX CONCURRENTLY IF NOT EXISTS checkpoint_blobs_thread_id_idx ON checkpoint_blobs(thread_id);
""",
"""
CREATE INDEX CONCURRENTLY IF NOT EXISTS checkpoint_writes_thread_id_idx ON checkpoint_writes(thread_id);
""",
]

SELECT_SQL = f"""
Expand Down
28 changes: 12 additions & 16 deletions libs/checkpoint-postgres/langgraph/store/postgres/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import orjson
from psycopg import AsyncConnection, AsyncCursor, AsyncPipeline, Capabilities
from psycopg.errors import UndefinedTable
from psycopg.rows import DictRow, dict_row
from psycopg_pool import AsyncConnectionPool

Expand Down Expand Up @@ -219,22 +218,19 @@ async def setup(self) -> None:
"""

async def _get_version(cur: AsyncCursor[DictRow], table: str) -> int:
try:
await cur.execute(f"SELECT v FROM {table} ORDER BY v DESC LIMIT 1")
row = await cur.fetchone()
if row is None:
version = -1
else:
version = row["v"]
except UndefinedTable:
version = -1
await cur.execute(
f"""
CREATE TABLE IF NOT EXISTS {table} (
v INTEGER PRIMARY KEY
)
"""
await cur.execute(
f"""
CREATE TABLE IF NOT EXISTS {table} (
v INTEGER PRIMARY KEY
)
"""
)
await cur.execute(f"SELECT v FROM {table} ORDER BY v DESC LIMIT 1")
row = cast(dict, await cur.fetchone())
if row is None:
version = -1
else:
version = row["v"]
return version

async with self._cursor() as cur:
Expand Down
32 changes: 14 additions & 18 deletions libs/checkpoint-postgres/langgraph/store/postgres/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import orjson
from psycopg import Capabilities, Connection, Cursor, Pipeline
from psycopg.errors import UndefinedTable
from psycopg.rows import DictRow, dict_row
from psycopg.types.json import Jsonb
from psycopg_pool import ConnectionPool
Expand Down Expand Up @@ -73,7 +72,7 @@ class Migration(NamedTuple):
""",
"""
-- For faster lookups by prefix
CREATE INDEX IF NOT EXISTS store_prefix_idx ON store USING btree (prefix text_pattern_ops);
CREATE INDEX CONCURRENTLY IF NOT EXISTS store_prefix_idx ON store USING btree (prefix text_pattern_ops);
""",
]

Expand Down Expand Up @@ -107,7 +106,7 @@ class Migration(NamedTuple):
),
Migration(
"""
CREATE INDEX IF NOT EXISTS store_vectors_embedding_idx ON store_vectors
CREATE INDEX CONCURRENTLY IF NOT EXISTS store_vectors_embedding_idx ON store_vectors
USING %(index_type)s (embedding %(ops)s)%(index_params)s;
""",
condition=lambda store: bool(
Expand Down Expand Up @@ -847,22 +846,19 @@ def setup(self) -> None:
"""

def _get_version(cur: Cursor[dict[str, Any]], table: str) -> int:
try:
cur.execute(f"SELECT v FROM {table} ORDER BY v DESC LIMIT 1")
row = cast(dict, cur.fetchone())
if row is None:
version = -1
else:
version = row["v"]
except UndefinedTable:
version = -1
cur.execute(
f"""
CREATE TABLE IF NOT EXISTS {table} (
v INTEGER PRIMARY KEY
)
"""
cur.execute(
f"""
CREATE TABLE IF NOT EXISTS {table} (
v INTEGER PRIMARY KEY
)
"""
)
cur.execute(f"SELECT v FROM {table} ORDER BY v DESC LIMIT 1")
row = cast(dict, cur.fetchone())
if row is None:
version = -1
else:
version = row["v"]
return version

with self._cursor() as cur:
Expand Down
27 changes: 13 additions & 14 deletions libs/checkpoint-postgres/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion libs/checkpoint-postgres/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "langgraph-checkpoint-postgres"
version = "2.0.7"
version = "2.0.8"
description = "Library with a Postgres implementation of LangGraph checkpoint saver."
authors = []
license = "MIT"
Expand Down
5 changes: 2 additions & 3 deletions libs/checkpoint-postgres/tests/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ async def _pipe_saver():
prepare_threshold=0,
row_factory=dict_row,
) as conn:
async with conn.pipeline() as pipe:
checkpointer = AsyncPostgresSaver(conn, pipe=pipe)
await checkpointer.setup()
checkpointer = AsyncPostgresSaver(conn)
await checkpointer.setup()
async with conn.pipeline() as pipe:
checkpointer = AsyncPostgresSaver(conn, pipe=pipe)
yield checkpointer
Expand Down
5 changes: 2 additions & 3 deletions libs/checkpoint-postgres/tests/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,8 @@ def _pipe_saver():
prepare_threshold=0,
row_factory=dict_row,
) as conn:
with conn.pipeline() as pipe:
checkpointer = PostgresSaver(conn, pipe=pipe)
checkpointer.setup()
checkpointer = PostgresSaver(conn)
checkpointer.setup()
with conn.pipeline() as pipe:
checkpointer = PostgresSaver(conn, pipe=pipe)
yield checkpointer
Expand Down

0 comments on commit 93e4c8c

Please sign in to comment.