Tracing #50

Merged · 23 commits · Mar 6, 2024

Commits (23)
0725a5b
Adds spans within actions for tracing/visibility
elijahbenizzy Mar 2, 2024
f2a8e55
Fixes up tracking -- adds serialization filter + __init__.py file
elijahbenizzy Mar 2, 2024
9756adc
Adds MANIFEST.in required for publishing
elijahbenizzy Mar 2, 2024
320f8da
Fixes up bug in which single step async actions didn't work
elijahbenizzy Mar 2, 2024
ecfdb37
Fixes sequence ID to work in all cases
elijahbenizzy Mar 2, 2024
4f4542c
Adds documentation for tracing
elijahbenizzy Mar 2, 2024
154477e
Fixes tracker to log spans
elijahbenizzy Mar 3, 2024
8639edc
Ensures application does not wipe internal state (__sequence_id for e…
elijahbenizzy Mar 4, 2024
42e3543
Adds integration with tracing for local backend
elijahbenizzy Mar 4, 2024
36faf83
Integrates tracing with frontend
elijahbenizzy Mar 4, 2024
a80cb5a
Adds minimization so we can view the state/data with just a small ver…
elijahbenizzy Mar 4, 2024
4b40133
Adds icons to minimized table sections
elijahbenizzy Mar 4, 2024
6726375
Adds chips for demo to clarify
elijahbenizzy Mar 4, 2024
28b5897
Adds better error message for log_artifact
elijahbenizzy Mar 4, 2024
a04e19b
PR updates
elijahbenizzy Mar 4, 2024
a0d982a
Adds demo for tracing feature
elijahbenizzy Mar 5, 2024
69f45a1
Updates the demo data to work with the new tracing-focused data model
elijahbenizzy Mar 5, 2024
dd03737
Adds minor waterfall chart for within spans
elijahbenizzy Mar 5, 2024
79ea2eb
Fixes bug in which displayed state was incorrect
elijahbenizzy Mar 5, 2024
6ab8338
Adds a gap between the project name and the chip for status (demo/test)
elijahbenizzy Mar 5, 2024
af29e69
Fixes action indexing
elijahbenizzy Mar 5, 2024
5437bd8
Disables expand button for step list that doesn't have span
elijahbenizzy Mar 5, 2024
8de0304
Changes from PR
elijahbenizzy Mar 6, 2024
2 changes: 2 additions & 0 deletions MANIFEST.in
@@ -0,0 +1,2 @@
recursive-include burr/tracking/server/demo_data *
recursive-include burr/tracking/server/build *
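
These directives tell setuptools to include the demo data and the pre-built tracking UI when the package is built. A rough sanity check against an installed copy might look like the sketch below (illustrative only, not part of the PR; it assumes burr is installed with these directories shipped alongside burr.tracking.server):

import pathlib

import burr.tracking.server as server

server_dir = pathlib.Path(server.__file__).parent
print("demo_data bundled:", (server_dir / "demo_data").is_dir())
print("build bundled:", (server_dir / "build").is_dir())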
15 changes: 10 additions & 5 deletions burr/cli/demo_data.py
@@ -7,6 +7,7 @@
from examples.conversational_rag import application as conversational_rag_application
from examples.counter import application as counter_application
from examples.gpt import application as chatbot_application
from examples.gpt import application_with_traces as chatbot_application_with_traces


class ProgressHook(
@@ -27,7 +28,7 @@ def post_run_step(
print(f">>> Action {action.name} completed")


def generate_chatbot_data(data_dir: str):
def generate_chatbot_data(data_dir: str, use_traces: bool):
working_conversations = {
"chat-1-giraffe": [
"Please draw a giraffe.", # Answered by the image mode
@@ -54,7 +55,7 @@ def generate_chatbot_data(data_dir: str):
"What is the meaning of life?", # answered by the question mode (ish)
],
"chat-5-jokes": [
"Please draw a picture of a good joke joke", # answered by the image mode
"Please draw a picture of a good joke", # answered by the image mode
"Please write code for an interactive knock-knock joke", # answered by the code mode
"What is a good joke?", # answered by the question mode
"The chicken crossed the road because it was a free-range chicken", # answered by nothing
@@ -63,11 +64,11 @@ def generate_chatbot_data(data_dir: str):
broken_conversations = {"chat-6-demonstrate-errors": working_conversations["chat-1-giraffe"]}

def _run_conversation(app_id, prompts):
app = chatbot_application.application(
use_hamilton=False,
app = (chatbot_application_with_traces if use_traces else chatbot_application).application(
app_id=app_id,
storage_dir=data_dir,
hooks=[ProgressHook()],
**({"use_hamilton": False} if not use_traces else {}),
)
for prompt in prompts:
app.run(halt_after=["response"], inputs={"prompt": prompt})
@@ -136,6 +137,10 @@ def generate_rag_data(data_dir: str = "~/.burr"):


def generate_all(data_dir: str):
generate_chatbot_data(data_dir)
generate_chatbot_data(data_dir, False)
generate_chatbot_data(data_dir, True)
generate_counter_data(data_dir)
generate_rag_data(data_dir)


#
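
For reference, a minimal way to exercise the updated entrypoint (sketch only; the __main__ wrapper is illustrative and the import path is assumed from the file path above):

from burr.cli.demo_data import generate_all

if __name__ == "__main__":
    # Writes chatbot runs both with and without traces, plus counter and RAG
    # runs, under the given tracking directory so the UI has data to display.
    generate_all(data_dir="~/.burr")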
3 changes: 3 additions & 0 deletions burr/core/action.py
@@ -408,6 +408,9 @@ def with_params(self, **kwargs: Any) -> "FunctionBasedAction":
def run_and_update(self, state: State, **run_kwargs) -> Tuple[dict, State]:
return self._fn(state, **self._bound_params, **run_kwargs)

def is_async(self) -> bool:
return inspect.iscoroutinefunction(self._fn)


def _validate_action_function(fn: Callable):
"""Validates that an action has the signature: (state: State) -> Tuple[dict, State]
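
The new is_async() hook simply defers to the standard-library coroutine check. A minimal illustration of what it distinguishes (the action functions below are made up, not from the PR):

import inspect


async def fetch_response(state, **run_kwargs):  # would be reported as async
    ...


def compute_counter(state, **run_kwargs):  # would be reported as sync
    ...


assert inspect.iscoroutinefunction(fetch_response)
assert not inspect.iscoroutinefunction(compute_counter)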
104 changes: 92 additions & 12 deletions burr/core/application.py
@@ -1,5 +1,6 @@
import collections
import dataclasses
import functools
import logging
import pprint
from typing import (
@@ -16,6 +17,7 @@
Union,
)

from burr import visibility
from burr.core.action import (
Action,
Condition,
@@ -100,7 +102,10 @@ def _state_update(state_to_modify: State, modified_state: State) -> State:
old_state_keys = set(state_to_modify.keys())
new_state_keys = set(modified_state.keys())
deleted_keys = list(old_state_keys - new_state_keys)
return state_to_modify.merge(modified_state).wipe(delete=deleted_keys)
# TODO -- unify the logic of choosing whether a key is internal or not
# Right now this is __sequence_id and __prior_step, but it could be more
deleted_keys_filtered = [item for item in deleted_keys if not item.startswith("__")]
return state_to_modify.merge(modified_state).wipe(delete=deleted_keys_filtered)
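
# Illustration (not part of this diff, toy key names): the filter above keeps
# framework-internal, double-underscore-prefixed keys such as __sequence_id and
# __prior_step from being wiped when an action returns a state without them.
_old_keys = {"chat_history", "__sequence_id", "__prior_step"}
_new_keys = {"chat_history"}
_deleted = list(_old_keys - _new_keys)
assert [k for k in _deleted if not k.startswith("__")] == []  # internal keys survive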


def _run_reducer(reducer: Reducer, state: State, result: dict, name: str) -> State:
@@ -221,21 +226,32 @@ def __init__(
self._adapter_set.call_all_lifecycle_hooks_sync(
"post_application_create", state=self._state, application_graph=self._graph
)
# TODO -- consider adding global inputs + global input factories to the builder
self.dependency_factory = {
"__tracer": functools.partial(
visibility.tracing.TracerFactory, lifecycle_adapters=self._adapter_set
)
}

def step(self, inputs: Optional[Dict[str, Any]] = None) -> Optional[Tuple[Action, dict, State]]:
"""Performs a single step, advancing the state machine along.
This returns a tuple of the action that was run, the result of running
the action, and the new state.

Use this if you just want to do something with the state and not rely on generators.
E.G. press forward/backwards, hnuman in the loop, etc... Odds are this is not
E.G. press forward/backwards, human in the loop, etc... Odds are this is not
the method you want -- you'll want iterate() (if you want to see the state/
results along the way), or run() (if you just want the final state/results).

:param inputs: Inputs to the action -- this is if this action requires an input that is passed in from the outside world :param __run_hooks:
:param inputs: Inputs to the action -- this is if this action requires an input that is passed in from the outside world
:return: Tuple[Function, dict, State] -- the function that was just ran, the result of running it, and the new state
"""
return self._step(inputs=inputs, _run_hooks=True)

try:
out = self._step(inputs=inputs, _run_hooks=True)
return out
finally:
self._increment_sequence_id()

def _step(
self, inputs: Optional[Dict[str, Any]] = None, _run_hooks: bool = True
@@ -247,9 +263,14 @@ def _step(
return None
if inputs is None:
inputs = {}
inputs = self._process_inputs(inputs, next_action)
if _run_hooks:
self._adapter_set.call_all_lifecycle_hooks_sync(
"pre_run_step", action=next_action, state=self._state, inputs=inputs
"pre_run_step",
action=next_action,
state=self._state,
inputs=inputs,
sequence_id=self.sequence_id,
)
exc = None
result = None
@@ -274,6 +295,7 @@ def _step(
action=next_action,
state=new_state,
result=result,
sequence_id=self.sequence_id,
exception=exc,
)
return next_action, result, new_state
@@ -283,12 +305,37 @@ def update_internal_state_value(self, new_state: State, next_action: Action) ->
new_state = new_state.update(
**{
PRIOR_STEP: next_action.name,
# make it a string for future proofing
SEQUENCE_ID: str(int(self._state.get(SEQUENCE_ID, 0)) + 1),
}
)
return new_state

def _process_inputs(self, inputs: Dict[str, Any], action: Action) -> Dict[str, Any]:
inputs = inputs.copy()
processed_inputs = {}
for key in list(inputs.keys()):
if key in action.inputs:
processed_inputs[key] = inputs.pop(key)
if len(inputs) > 0:
raise ValueError(
f"Keys {inputs.keys()} were passed in as inputs to action "
f"{action.name}, but not declared by the action as an input! "
f"Action needs: {action.inputs}"
)
missing_inputs = set(action.inputs) - set(processed_inputs.keys())
for required_input in list(missing_inputs):
# if we can find it in the dependency factory, we'll use that
if required_input in self.dependency_factory:
processed_inputs[required_input] = self.dependency_factory[required_input](
action, self.sequence_id
)
missing_inputs.remove(required_input)
if len(missing_inputs) > 0:
raise ValueError(
f"Action {action.name} is missing required inputs: {missing_inputs}. "
f"Has inputs: {processed_inputs}"
)
return processed_inputs
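
# Sketch (not part of this diff): the resolution rules _process_inputs applies,
# restated on a toy action. All names below are hypothetical.
def _resolve_inputs_sketch(supplied, declared, factories, sequence_id):
    supplied = dict(supplied)
    processed = {k: supplied.pop(k) for k in list(supplied) if k in declared}
    if supplied:  # the caller passed keys the action never declared
        raise ValueError(f"Keys {set(supplied)} not declared; action needs {declared}")
    missing = set(declared) - set(processed)
    for key in list(missing):
        if key in factories:  # framework-injected dependencies, e.g. "__tracer"
            processed[key] = factories[key]("toy_action", sequence_id)
            missing.remove(key)
    if missing:
        raise ValueError(f"Missing required inputs: {missing}")
    return processed

# _resolve_inputs_sketch({"prompt": "hi"}, {"prompt", "__tracer"},
#                        {"__tracer": lambda action, seq: ("tracer", action, seq)}, 0)
# -> {"prompt": "hi", "__tracer": ("tracer", "toy_action", 0)}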

async def astep(self, inputs: Dict[str, Any] = None) -> Optional[Tuple[Action, dict, State]]:
"""Asynchronous version of step.

@@ -298,12 +345,17 @@ async def astep(self, inputs: Dict[str, Any] = None) -> Optional[Tuple[Action, d
:return: Tuple[Function, dict, State] -- the action that was just ran, the result of running it, and the new state
"""
next_action = self.get_next_action()
if inputs is None:
inputs = {}
if next_action is None:
return None
if inputs is None:
inputs = {}
inputs = self._process_inputs(inputs, next_action)
await self._adapter_set.call_all_lifecycle_hooks_sync_and_async(
"pre_run_step", action=next_action, state=self._state, inputs=inputs
"pre_run_step",
action=next_action,
state=self._state,
inputs=inputs,
sequence_id=self.sequence_id,
)
exc = None
result = None
@@ -326,15 +378,23 @@ async def astep(self, inputs: Dict[str, Any] = None) -> Optional[Tuple[Action, d
result = await _arun_function(next_action, self._state, inputs=inputs)
new_state = _run_reducer(next_action, self._state, result, next_action.name)
new_state = self.update_internal_state_value(new_state, next_action)
self._set_state(new_state)
except Exception as e:
exc = e
logger.exception(_format_error_message(next_action, self._state, inputs))
raise e
finally:
await self._adapter_set.call_all_lifecycle_hooks_sync_and_async(
"post_run_step", action=next_action, state=new_state, result=result, exception=exc
"post_run_step",
action=next_action,
state=new_state,
result=result,
sequence_id=self.sequence_id,
exception=exc,
)
self._set_state(new_state)
# we want to increment regardless of failure
self._increment_sequence_id()

return next_action, result, new_state

def _clean_iterate_params(
@@ -515,6 +575,9 @@ async def arun(
"""
prior_action = None
result = None
halt_before, halt_after, inputs = self._clean_iterate_params(
halt_before, halt_after, inputs
)
async for prior_action, result, state in self.aiterate(
halt_before=halt_before, halt_after=halt_after, inputs=inputs
):
@@ -573,6 +636,9 @@ def visualize(
)
digraph.node(action.name, label=label, shape="box", style="rounded")
for input_ in action.inputs:
if input_.startswith("__"):
# These are internally injected by the framework
continue
input_name = f"input__{input_}"
digraph.node(input_name, shape="oval", style="dashed", label=f"input: {input_}")
digraph.edge(input_name, action.name)
@@ -647,6 +713,20 @@ def graph(self) -> ApplicationGraph:
"""
return self._graph

@property
def sequence_id(self) -> Optional[int]:
"""gives the sequence ID of the current (next) action.
This is incremented after every step is taken -- meaning that incrementing
it is the very last action that is done. Any logging, etc... will use the current
step's sequence ID

:return:
"""
return self._state.get(SEQUENCE_ID, 0)

def _increment_sequence_id(self):
self._state = self._state.update(**{SEQUENCE_ID: self.sequence_id + 1})
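
# Illustration (not part of this diff): the read/bump pattern above on a plain
# dict standing in for State (the real State is immutable -- update() returns a
# new copy). SEQUENCE_ID is assumed to be the "__sequence_id" key referenced
# elsewhere in this PR.
_SEQ = "__sequence_id"
_toy = {}
assert _toy.get(_SEQ, 0) == 0                   # ID the first step runs and logs under
_toy = {**_toy, _SEQ: _toy.get(_SEQ, 0) + 1}    # bumped after the step, success or failure
assert _toy.get(_SEQ, 0) == 1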


def _assert_set(value: Optional[Any], field: str, method: str):
if value is None:
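
Putting the pieces together (the dependency factory, _process_inputs, and the sequence ID), an action can declare __tracer as an input and have the framework supply it. A hedged sketch of what that might look like for an action author -- the @action decorator arguments and the context-manager call are assumptions drawn from the commit message "Adds spans within actions for tracing/visibility"; this page only shows the injection plumbing, not the tracer's own API:

from typing import Tuple

from burr.core import State, action


@action(reads=["prompt"], writes=["response"])
def answer(state: State, __tracer) -> Tuple[dict, State]:
    # __tracer is filled in by the dependency factory / _process_inputs shown
    # above; opening a span by calling it as a context manager is an assumption,
    # not something this diff shows.
    with __tracer("call_model"):
        response = f"echo: {state['prompt']}"  # placeholder work inside the span
    result = {"response": response}
    return result, state.update(**result)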