Skip to content

Commit

Permalink
Merge pull request #1797 from langchain-ai/nc/22sep/perf-intern-strings
Browse files Browse the repository at this point in the history
perf: Intern string constants
  • Loading branch information
nfcampos authored Sep 23, 2024
2 parents d5a1152 + 47fe321 commit 0b0b6e1
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 162 deletions.
60 changes: 32 additions & 28 deletions libs/langgraph/langgraph/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sys
from types import MappingProxyType
from typing import Any, Mapping
from typing import Any, Literal, Mapping, cast

from langgraph.types import Interrupt, Send # noqa: F401

Expand All @@ -11,67 +12,69 @@
EMPTY_SEQ: tuple[str, ...] = tuple()

# --- Public constants ---
TAG_HIDDEN = "langsmith:hidden"
TAG_HIDDEN = sys.intern("langsmith:hidden")
# tag to hide a node/edge from certain tracing/streaming environments
START = "__start__"
START = sys.intern("__start__")
# the first (maybe virtual) node in graph-style Pregel
END = "__end__"
END = sys.intern("__end__")
# the last (maybe virtual) node in graph-style Pregel

# --- Reserved write keys ---
INPUT = "__input__"
INPUT = sys.intern("__input__")
# for values passed as input to the graph
INTERRUPT = "__interrupt__"
INTERRUPT = sys.intern("__interrupt__")
# for dynamic interrupts raised by nodes
ERROR = "__error__"
ERROR = sys.intern("__error__")
# for errors raised by nodes
NO_WRITES = "__no_writes__"
NO_WRITES = sys.intern("__no_writes__")
# marker to signal node didn't write anything
SCHEDULED = "__scheduled__"
SCHEDULED = sys.intern("__scheduled__")
# marker to signal node was scheduled (in distributed mode)
TASKS = "__pregel_tasks"
TASKS = sys.intern("__pregel_tasks")
# for Send objects returned by nodes/edges, corresponds to PUSH below

# --- Reserved config.configurable keys ---
CONFIG_KEY_SEND = "__pregel_send"
CONFIG_KEY_SEND = sys.intern("__pregel_send")
# holds the `write` function that accepts writes to state/edges/reserved keys
CONFIG_KEY_READ = "__pregel_read"
CONFIG_KEY_READ = sys.intern("__pregel_read")
# holds the `read` function that returns a copy of the current state
CONFIG_KEY_CHECKPOINTER = "__pregel_checkpointer"
CONFIG_KEY_CHECKPOINTER = sys.intern("__pregel_checkpointer")
# holds a `BaseCheckpointSaver` passed from parent graph to child graphs
CONFIG_KEY_STREAM = "__pregel_stream"
CONFIG_KEY_STREAM = sys.intern("__pregel_stream")
# holds a `StreamProtocol` passed from parent graph to child graphs
CONFIG_KEY_STREAM_WRITER = "__pregel_stream_writer"
CONFIG_KEY_STREAM_WRITER = sys.intern("__pregel_stream_writer")
# holds a `StreamWriter` for stream_mode=custom
CONFIG_KEY_STORE = "__pregel_store"
CONFIG_KEY_STORE = sys.intern("__pregel_store")
# holds a `BaseStore` made available to managed values
CONFIG_KEY_RESUMING = "__pregel_resuming"
CONFIG_KEY_RESUMING = sys.intern("__pregel_resuming")
# holds a boolean indicating if subgraphs should resume from a previous checkpoint
CONFIG_KEY_TASK_ID = "__pregel_task_id"
CONFIG_KEY_TASK_ID = sys.intern("__pregel_task_id")
# holds the task ID for the current task
CONFIG_KEY_DEDUPE_TASKS = "__pregel_dedupe_tasks"
CONFIG_KEY_DEDUPE_TASKS = sys.intern("__pregel_dedupe_tasks")
# holds a boolean indicating if tasks should be deduplicated (for distributed mode)
CONFIG_KEY_ENSURE_LATEST = "__pregel_ensure_latest"
CONFIG_KEY_ENSURE_LATEST = sys.intern("__pregel_ensure_latest")
# holds a boolean indicating whether to assert the requested checkpoint is the latest
# (for distributed mode)
CONFIG_KEY_DELEGATE = "__pregel_delegate"
CONFIG_KEY_DELEGATE = sys.intern("__pregel_delegate")
# holds a boolean indicating whether to delegate subgraphs (for distributed mode)
CONFIG_KEY_CHECKPOINT_MAP = "checkpoint_map"
CONFIG_KEY_CHECKPOINT_MAP = sys.intern("checkpoint_map")
# holds a mapping of checkpoint_ns -> checkpoint_id for parent graphs
CONFIG_KEY_CHECKPOINT_ID = "checkpoint_id"
CONFIG_KEY_CHECKPOINT_ID = sys.intern("checkpoint_id")
# holds the current checkpoint_id, if any
CONFIG_KEY_CHECKPOINT_NS = "checkpoint_ns"
CONFIG_KEY_CHECKPOINT_NS = sys.intern("checkpoint_ns")
# holds the current checkpoint_ns, "" for root graph

# --- Other constants ---
PUSH = "__pregel_push"
PUSH = sys.intern("__pregel_push")
# denotes push-style tasks, ie. those created by Send objects
PULL = "__pregel_pull"
PULL = sys.intern("__pregel_pull")
# denotes pull-style tasks, ie. those triggered by edges
NS_SEP = "|"
NS_SEP = sys.intern("|")
# for checkpoint_ns, separates each level (ie. graph|subgraph|subsubgraph)
NS_END = ":"
NS_END = sys.intern(":")
# for checkpoint_ns, for each level, separates the namespace from the task_id
CONF = cast(Literal["configurable"], sys.intern("configurable"))
# key for the configurable dict in RunnableConfig

RESERVED = {
TAG_HIDDEN,
Expand Down Expand Up @@ -103,4 +106,5 @@
PULL,
NS_SEP,
NS_END,
CONF,
}
6 changes: 3 additions & 3 deletions libs/langgraph/langgraph/managed/shared_value.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from langchain_core.runnables import RunnableConfig
from typing_extensions import NotRequired, Required, Self

from langgraph.constants import CONFIG_KEY_STORE
from langgraph.constants import CONF, CONFIG_KEY_STORE
from langgraph.errors import InvalidUpdateError
from langgraph.managed.base import (
ChannelKeyPlaceholder,
Expand Down Expand Up @@ -83,10 +83,10 @@ def __init__(
raise ValueError("SharedValue must be a dict")
self.scope = scope
self.value: Value = {}
self.store = cast(BaseStore, config["configurable"].get(CONFIG_KEY_STORE))
self.store = cast(BaseStore, config[CONF].get(CONFIG_KEY_STORE))
if self.store is None:
pass
elif scope_value := config["configurable"].get(self.scope):
elif scope_value := config[CONF].get(self.scope):
self.ns = f"scoped:{scope}:{key}:{scope_value}"
else:
raise ValueError(
Expand Down
Loading

0 comments on commit 0b0b6e1

Please sign in to comment.