Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create index concurrently #2659

Merged
merged 5 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions libs/checkpoint-postgres/langgraph/checkpoint/postgres/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import random

Check notice on line 1 in libs/checkpoint-postgres/langgraph/checkpoint/postgres/base.py

View workflow job for this annotation

GitHub Actions / benchmark

Benchmark results

......................................... fanout_to_subgraph_10x: Mean +- std dev: 61.7 ms +- 1.4 ms ......................................... fanout_to_subgraph_10x_sync: Mean +- std dev: 52.3 ms +- 1.0 ms ......................................... WARNING: the benchmark result may be unstable * the standard deviation (10.4 ms) is 11% of the mean (95.1 ms) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. fanout_to_subgraph_10x_checkpoint: Mean +- std dev: 95.1 ms +- 10.4 ms ......................................... fanout_to_subgraph_10x_checkpoint_sync: Mean +- std dev: 95.7 ms +- 2.1 ms ......................................... fanout_to_subgraph_100x: Mean +- std dev: 648 ms +- 30 ms ......................................... fanout_to_subgraph_100x_sync: Mean +- std dev: 513 ms +- 6 ms ......................................... fanout_to_subgraph_100x_checkpoint: Mean +- std dev: 1.05 sec +- 0.06 sec ......................................... fanout_to_subgraph_100x_checkpoint_sync: Mean +- std dev: 962 ms +- 25 ms ......................................... react_agent_10x: Mean +- std dev: 31.8 ms +- 0.8 ms ......................................... react_agent_10x_sync: Mean +- std dev: 23.1 ms +- 0.3 ms ......................................... react_agent_10x_checkpoint: Mean +- std dev: 48.7 ms +- 1.1 ms ......................................... react_agent_10x_checkpoint_sync: Mean +- std dev: 37.1 ms +- 0.8 ms ......................................... react_agent_100x: Mean +- std dev: 346 ms +- 7 ms ......................................... react_agent_100x_sync: Mean +- std dev: 275 ms +- 4 ms ......................................... react_agent_100x_checkpoint: Mean +- std dev: 973 ms +- 11 ms ......................................... react_agent_100x_checkpoint_sync: Mean +- std dev: 878 ms +- 9 ms ......................................... wide_state_25x300: Mean +- std dev: 24.5 ms +- 0.4 ms ......................................... wide_state_25x300_sync: Mean +- std dev: 15.7 ms +- 0.2 ms ......................................... wide_state_25x300_checkpoint: Mean +- std dev: 282 ms +- 5 ms ......................................... wide_state_25x300_checkpoint_sync: Mean +- std dev: 270 ms +- 4 ms ......................................... wide_state_15x600: Mean +- std dev: 28.8 ms +- 0.5 ms ......................................... wide_state_15x600_sync: Mean +- std dev: 18.3 ms +- 0.2 ms ......................................... wide_state_15x600_checkpoint: Mean +- std dev: 490 ms +- 7 ms ......................................... wide_state_15x600_checkpoint_sync: Mean +- std dev: 470 ms +- 8 ms ......................................... wide_state_9x1200: Mean +- std dev: 28.8 ms +- 0.6 ms ......................................... wide_state_9x1200_sync: Mean +- std dev: 18.3 ms +- 0.2 ms ......................................... wide_state_9x1200_checkpoint: Mean +- std dev: 319 ms +- 4 ms ......................................... wide_state_9x1200_checkpoint_sync: Mean +- std dev: 303 ms +- 4 ms

Check notice on line 1 in libs/checkpoint-postgres/langgraph/checkpoint/postgres/base.py

View workflow job for this annotation

GitHub Actions / benchmark

Comparison against main

+-----------------------------------------+---------+------------------------+ | Benchmark | main | changes | +=========================================+=========+========================+ | wide_state_25x300_sync | 15.6 ms | 15.7 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | wide_state_9x1200_sync | 18.2 ms | 18.3 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | react_agent_100x_sync | 273 ms | 275 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | wide_state_15x600_checkpoint_sync | 467 ms | 470 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | wide_state_25x300_checkpoint_sync | 268 ms | 270 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | fanout_to_subgraph_100x_sync | 509 ms | 513 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | wide_state_25x300 | 24.3 ms | 24.5 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | wide_state_25x300_checkpoint | 279 ms | 282 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | wide_state_9x1200_checkpoint | 315 ms | 319 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | wide_state_9x1200_checkpoint_sync | 300 ms | 303 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | react_agent_10x | 31.4 ms | 31.8 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | fanout_to_subgraph_100x_checkpoint_sync | 949 ms | 962 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | react_agent_10x_sync | 22.8 ms | 23.1 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | wide_state_15x600 | 28.3 ms | 28.8 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | wide_state_15x600_checkpoint | 483 ms | 490 ms: 1.02x slower | +-----------------------------------------+---------+------------------------+ | react_agent_10x_checkpoint | 47.9 ms | 48.7 ms: 1.02x slower | +-----------------------------------------+---------+------------------------+ | wide_state_15x600_sync | 18.0 ms | 18.3 ms: 1.02x slower | +-----------------------------------------+---------+------------------------+ | react_agent_100x_checkpoint | 951 ms | 973 ms: 1.02x slower | +-----------------------------------------+---------+------------------------+ | fanout_to_subgraph_100x | 627 ms | 648 ms: 1.03x slower | +-----------------------------------------+---------+------------------------+ | react_agent_100x_checkpoint_sync | 848 ms | 878 ms: 1.04x slower | +-----------------------------------------+---------+------------------------+ | fanout_to_subgraph_100x_checkpoint | 989 ms | 1.05 sec: 1.06x slower | +-----------------------------------------+---------+------------------------+ | Geometric mean | (ref) | 1.01x slower | +-----------------------------------------+---------+------------------------+ Benchmark hidden because not significant (7): fanout_to_subgraph_10x, react_agent_100x, react_agent_10x_checkpoint_sync, fanout_to_subgraph_10x_sync, wide_state_9x1200, fanout_to_subgraph_10x_checkpoint_sync, fanout_to_subgraph_10x_checkpoint
from collections.abc import Sequence
from typing import Any, Optional, cast

Expand Down Expand Up @@ -57,6 +57,17 @@
PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);""",
"ALTER TABLE checkpoint_blobs ALTER COLUMN blob DROP not null;",
"""
""",
"""
CREATE INDEX CONCURRENTLY IF NOT EXISTS checkpoints_thread_id_idx ON checkpoints(thread_id);
""",
"""
CREATE INDEX CONCURRENTLY IF NOT EXISTS checkpoint_blobs_thread_id_idx ON checkpoint_blobs(thread_id);
""",
"""
CREATE INDEX CONCURRENTLY IF NOT EXISTS checkpoint_writes_thread_id_idx ON checkpoint_writes(thread_id);
""",
]

SELECT_SQL = f"""
Expand Down
28 changes: 12 additions & 16 deletions libs/checkpoint-postgres/langgraph/store/postgres/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import orjson
from psycopg import AsyncConnection, AsyncCursor, AsyncPipeline, Capabilities
from psycopg.errors import UndefinedTable
from psycopg.rows import DictRow, dict_row
from psycopg_pool import AsyncConnectionPool

Expand Down Expand Up @@ -219,22 +218,19 @@ async def setup(self) -> None:
"""

async def _get_version(cur: AsyncCursor[DictRow], table: str) -> int:
try:
await cur.execute(f"SELECT v FROM {table} ORDER BY v DESC LIMIT 1")
row = await cur.fetchone()
if row is None:
version = -1
else:
version = row["v"]
except UndefinedTable:
version = -1
await cur.execute(
f"""
CREATE TABLE IF NOT EXISTS {table} (
v INTEGER PRIMARY KEY
)
"""
await cur.execute(
f"""
CREATE TABLE IF NOT EXISTS {table} (
v INTEGER PRIMARY KEY
)
"""
)
await cur.execute(f"SELECT v FROM {table} ORDER BY v DESC LIMIT 1")
row = cast(dict, await cur.fetchone())
if row is None:
version = -1
else:
version = row["v"]
return version

async with self._cursor() as cur:
Expand Down
32 changes: 14 additions & 18 deletions libs/checkpoint-postgres/langgraph/store/postgres/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import orjson
from psycopg import Capabilities, Connection, Cursor, Pipeline
from psycopg.errors import UndefinedTable
from psycopg.rows import DictRow, dict_row
from psycopg.types.json import Jsonb
from psycopg_pool import ConnectionPool
Expand Down Expand Up @@ -73,7 +72,7 @@ class Migration(NamedTuple):
""",
"""
-- For faster lookups by prefix
CREATE INDEX IF NOT EXISTS store_prefix_idx ON store USING btree (prefix text_pattern_ops);
CREATE INDEX CONCURRENTLY IF NOT EXISTS store_prefix_idx ON store USING btree (prefix text_pattern_ops);
""",
]

Expand Down Expand Up @@ -107,7 +106,7 @@ class Migration(NamedTuple):
),
Migration(
"""
CREATE INDEX IF NOT EXISTS store_vectors_embedding_idx ON store_vectors
CREATE INDEX CONCURRENTLY IF NOT EXISTS store_vectors_embedding_idx ON store_vectors
USING %(index_type)s (embedding %(ops)s)%(index_params)s;
""",
condition=lambda store: bool(
Expand Down Expand Up @@ -847,22 +846,19 @@ def setup(self) -> None:
"""

def _get_version(cur: Cursor[dict[str, Any]], table: str) -> int:
try:
cur.execute(f"SELECT v FROM {table} ORDER BY v DESC LIMIT 1")
row = cast(dict, cur.fetchone())
if row is None:
version = -1
else:
version = row["v"]
except UndefinedTable:
version = -1
cur.execute(
f"""
CREATE TABLE IF NOT EXISTS {table} (
v INTEGER PRIMARY KEY
)
"""
cur.execute(
f"""
CREATE TABLE IF NOT EXISTS {table} (
v INTEGER PRIMARY KEY
)
"""
)
cur.execute(f"SELECT v FROM {table} ORDER BY v DESC LIMIT 1")
row = cast(dict, cur.fetchone())
if row is None:
version = -1
else:
version = row["v"]
return version

with self._cursor() as cur:
Expand Down
27 changes: 13 additions & 14 deletions libs/checkpoint-postgres/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion libs/checkpoint-postgres/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "langgraph-checkpoint-postgres"
version = "2.0.7"
version = "2.0.8"
description = "Library with a Postgres implementation of LangGraph checkpoint saver."
authors = []
license = "MIT"
Expand Down
5 changes: 2 additions & 3 deletions libs/checkpoint-postgres/tests/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ async def _pipe_saver():
prepare_threshold=0,
row_factory=dict_row,
) as conn:
async with conn.pipeline() as pipe:
checkpointer = AsyncPostgresSaver(conn, pipe=pipe)
await checkpointer.setup()
checkpointer = AsyncPostgresSaver(conn)
await checkpointer.setup()
async with conn.pipeline() as pipe:
checkpointer = AsyncPostgresSaver(conn, pipe=pipe)
yield checkpointer
Expand Down
5 changes: 2 additions & 3 deletions libs/checkpoint-postgres/tests/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,8 @@ def _pipe_saver():
prepare_threshold=0,
row_factory=dict_row,
) as conn:
with conn.pipeline() as pipe:
checkpointer = PostgresSaver(conn, pipe=pipe)
checkpointer.setup()
checkpointer = PostgresSaver(conn)
checkpointer.setup()
with conn.pipeline() as pipe:
checkpointer = PostgresSaver(conn, pipe=pipe)
yield checkpointer
Expand Down
Loading