Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solves #31 -- enables deletion operation on state #34

Merged
merged 4 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion burr/core/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class Reducer(abc.ABC):
@property
@abc.abstractmethod
def writes(self) -> list[str]:
"""Returns the keys from the state that this reducer writes
"""Returns the keys from the state that this reducer writes.

:return: A list of keys
"""
Expand Down
54 changes: 39 additions & 15 deletions burr/core/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,33 @@ async def _arun_function(function: Function, state: State, inputs: Dict[str, Any
return await function.run(state_to_use, **inputs)


def _state_update(state_to_modify: State, modified_state: State) -> State:
"""This is a hack to apply state updates and ensure that we are respecting deletions. Specifically, the process is:

1. We subset the state to what we want to read
2. We perform a set of state-specific writes to it
3. We measure which ones were deleted
4. We then merge the whole state back in
5. We then delete the keys that were deleted

This is suboptimal -- we should not be observing the state, we should be using the state commands and layering in deltas.
That said, we currently eagerly evaluate the state at all operations, which means we have to do it this way. See
https://github.com/DAGWorks-Inc/burr/issues/33 for a more details plan.

This function was written to solve this issue: https://github.com/DAGWorks-Inc/burr/issues/28.


:param state_subset_pre_update: The subset of state passed to the update() function
:param modified_state: The subset of state realized after the update() function
:param state_to_modify: The state to modify-- this is the original
:return:
"""
old_state_keys = set(state_to_modify.keys())
new_state_keys = set(modified_state.keys())
deleted_keys = list(old_state_keys - new_state_keys)
return state_to_modify.merge(modified_state).wipe(delete=deleted_keys)


def _run_reducer(reducer: Reducer, state: State, result: dict, name: str) -> State:
"""Runs the reducer, returning the new state. Note this restricts the
keys in the state to only those that the function writes.
Expand All @@ -84,17 +111,17 @@ def _run_reducer(reducer: Reducer, state: State, result: dict, name: str) -> Sta
:param result:
:return:
"""

state_to_use = state.subset(*reducer.writes)
new_state = reducer.update(result, state_to_use).subset(*reducer.writes)
# TODO -- better guarding on state reads/writes
new_state = reducer.update(result, state)
keys_in_new_state = set(new_state.keys())
extra_keys = keys_in_new_state - set(reducer.writes)
if extra_keys:
new_keys = keys_in_new_state - set(state.keys())
extra_keys = new_keys - set(reducer.writes)
if len(extra_keys) > 0:
raise ValueError(
f"Action {name} attempted to write to keys {extra_keys} "
f"that it did not declare. It declared: ({reducer.writes})!"
)
return state.merge(new_state)
return _state_update(state, new_state)


def _create_dict_string(kwargs: dict) -> str:
Expand Down Expand Up @@ -142,24 +169,21 @@ def _run_single_step_action(
:param inputs: Inputs to pass directly to the action
:return: The result of running the action, and the new state
"""
state_to_use = state.subset(
*action.reads, *action.writes
) # TODO -- specify some as required and some as not
# TODO -- guard all reads/writes with a subset of the state
action.validate_inputs(inputs)
result, new_state = action.run_and_update(state_to_use, **inputs)
return result, state.merge(new_state.subset(*action.writes)) # we just want the writes action
result, new_state = action.run_and_update(state, **inputs)
out = result, _state_update(state, new_state)
return out


async def _arun_single_step_action(
action: SingleStepAction, state: State, inputs: Optional[Dict[str, Any]]
) -> Tuple[dict, State]:
"""Runs a single step action in async. See the synchronous version for more details."""
state_to_use = state.subset(
*action.reads, *action.writes
) # TODO -- specify some as required and some as not
state_to_use = state
action.validate_inputs(inputs)
result, new_state = await action.run_and_update(state_to_use, **inputs)
return result, state.merge(new_state.subset(*action.writes)) # we just want the writes action
return result, _state_update(state, new_state)


@dataclasses.dataclass
Expand Down
2 changes: 2 additions & 0 deletions examples/gpt/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ def base_application(hooks: List[LifecycleAdapter] = None):
("error", "prompt", default),
)
.with_hooks(*hooks)
.with_tracker("demo:chatbot")
.build()
)

Expand Down Expand Up @@ -250,6 +251,7 @@ def hamilton_application(hooks: List[LifecycleAdapter] = None):
("error", "prompt", default),
)
.with_hooks(*hooks)
.with_tracker("demo:chatbot")
.build()
)
return application
Expand Down
41 changes: 36 additions & 5 deletions telemetry/ui/src/components/routes/AppList.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,60 @@ import { ApplicationSummary, DefaultService } from '../../api';
import { Table, TableBody, TableCell, TableHead, TableHeader, TableRow } from '../common/table';
import { DateTimeDisplay } from '../common/dates';
import { Button } from '../common/button';
import { useState } from 'react';
import { FunnelIcon } from '@heroicons/react/24/outline';

const StepCountHeader = (props: {
displayZeroCount: boolean;
setDisplayZeroCount: (displayZeroCount: boolean) => void;
}) => {
const fillColor = props.displayZeroCount ? 'fill-gray-300' : 'fill-gray-700';
const borderColor = props.displayZeroCount ? 'text-gray-300' : 'text-gray-700';
return (
<div className="flex flex-row items-center gap-1">
<FunnelIcon
className={`h-5 w-5 hover:scale-125 cursor-pointer ${fillColor} ${borderColor}`}
onClick={() => {
props.setDisplayZeroCount(!props.displayZeroCount);
}}
/>
<span>Count</span>
</div>
);
};

/**
* List of applications. Purely rendering a list, also sorts them.
*/
export const AppListTable = (props: { apps: ApplicationSummary[]; projectId: string }) => {
const appsCopy = [...props.apps];
const appsSorted = appsCopy.sort((a, b) => {
return new Date(a.last_written) > new Date(b.last_written) ? -1 : 1;
});
const [displayZeroCount, setDisplayZeroCount] = useState(false);
const appsToDisplay = appsCopy
.sort((a, b) => {
return new Date(a.last_written) > new Date(b.last_written) ? -1 : 1;
})
.filter((app) => {
return app.num_steps > 0 || displayZeroCount;
});
return (
<Table>
<TableHead>
<TableRow>
<TableHeader>ID</TableHeader>
<TableHeader>First Seen</TableHeader>
<TableHeader>Last Run</TableHeader>
<TableHeader>Step Count</TableHeader>
<TableHeader>
<StepCountHeader
displayZeroCount={displayZeroCount}
setDisplayZeroCount={setDisplayZeroCount}
/>
</TableHeader>
<TableHeader></TableHeader>
<TableHeader></TableHeader>
</TableRow>
</TableHead>
<TableBody>
{appsSorted.map((app) => (
{appsToDisplay.map((app) => (
<TableRow key={app.app_id}>
<TableCell className="font-semibold text-gray-700">{app.app_id}</TableCell>
<TableCell>
Expand Down
2 changes: 1 addition & 1 deletion telemetry/ui/src/components/routes/ProjectList.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export const ProjectListTable = (props: { projects: Project[] }) => {
<TableHeader>Name</TableHeader>
<TableHeader>Created</TableHeader>
<TableHeader>Last Run</TableHeader>
<TableHeader># of traces</TableHeader>
<TableHeader>App Runs</TableHeader>
<TableHeader>Path</TableHeader>
<TableHeader></TableHeader>
</TableRow>
Expand Down
61 changes: 61 additions & 0 deletions tests/core/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
_arun_single_step_action,
_assert_set,
_run_function,
_run_reducer,
_run_single_step_action,
_validate_actions,
_validate_start,
Expand Down Expand Up @@ -169,6 +170,34 @@ def test__run_function_cant_run_async():
_run_function(action, state, inputs={})


def test__run_reducer_modifies_state():
"""Tests that we can run a reducer and it behaves as expected"""
reducer = PassedInAction(
reads=["counter"],
writes=["counter"],
fn=...,
update_fn=lambda result, state: state.update(**result),
inputs=[],
)
state = State({"counter": 0})
state = _run_reducer(reducer, state, {"counter": 1}, "reducer")
assert state["counter"] == 1


def test__run_reducer_deletes_state():
"""Tests that we can run a reducer that deletes an item from state"""
reducer = PassedInAction(
reads=["counter"],
writes=[], # TODO -- figure out how we can better know that it deletes items...ß
fn=...,
update_fn=lambda result, state: state.wipe(delete=["counter"]),
inputs=[],
)
state = State({"counter": 0})
state = _run_reducer(reducer, state, {}, "deletion_reducer")
assert "counter" not in state


async def test__arun_function():
"""Tests that we can run an async function"""
action = base_counter_action_async
Expand Down Expand Up @@ -279,6 +308,38 @@ async def test__arun_single_step_action_with_inputs():
assert state.subset("count", "tracker").get_all() == {"count": 4, "tracker": [2, 4]}


class SingleStepActionWithDeletion(SingleStepAction):
def run_and_update(self, state: State, **run_kwargs) -> Tuple[dict, State]:
return {}, state.wipe(delete=["to_delete"])

@property
def reads(self) -> list[str]:
return ["to_delete"]

@property
def writes(self) -> list[str]:
return ["to_delete"]


def test__run_single_step_action_deletes_state():
action = SingleStepActionWithDeletion()
state = State({"to_delete": 0})
result, state = _run_single_step_action(action, state, inputs={})
assert "to_delete" not in state


class SingleStepActionWithDeletionAsync(SingleStepActionWithDeletion):
async def run_and_update(self, state: State, **run_kwargs) -> Tuple[dict, State]:
return {}, state.wipe(delete=["to_delete"])


async def test__arun_single_step_action_deletes_state():
action = SingleStepActionWithDeletionAsync()
state = State({"to_delete": 0})
result, state = await _arun_single_step_action(action, state, inputs={})
assert "to_delete" not in state


def test_app_step():
"""Tests that we can run a step in an app"""
counter_action = base_counter_action.with_name("counter")
Expand Down
Loading