
LLM sends a message before state transition completes #67

Closed
Atharv24 opened this issue Dec 19, 2024 · 11 comments

Labels
bug Something isn't working

Comments


Atharv24 commented Dec 19, 2024

Current Setup
Interview bot with dynamic flow handling:
initial node (greet user) -> call start_interview function (only a print statement here) -> call transition function (picks the question to ask from the flow manager state) -> create and set node to question_node (specifies the question to ask in the LLM context)

Expected Functionality

  • LLM greets the user
  • Calls the start interview tool
  • Tool runs successfully
  • Transition to new question node with question to be asked in context
  • LLM asks the question to user

Actual Behaviour

  • LLM greets the user
  • Calls the start interview tool
  • Tool runs successfully
  • Transition to new question node with question to be asked in context
  • LLM asks a question on its own. Checking the LLM context at this point, I can see that the new node context is not there. After I say one more line, the next context that is printed shows the expected messages.

As per my understanding, the LLM sends a message before the new node context is updated. Let me know if any code is needed from my end. I use the same format as in the dynamic flow example.

UPDATE: I added a 1-second sleep after setting the node and before the transition function ends; it works fine now.
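
For anyone hitting the same thing, the workaround amounts to one added line at the end of the transition path. A minimal sketch (it reuses ask_next_question from the code in my next comment; the 1-second value is arbitrary):

import asyncio

async def ask_next_question(flow_manager: FlowManager):
    # ... unchanged: pick next_question and call flow_manager.set_node(...) ...

    # Workaround: give the queued context-update frame time to be processed
    # before the transition callback returns and the function-call result
    # triggers the LLM on the old context.
    await asyncio.sleep(1)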

@Atharv24 (Author)

import json
from enum import StrEnum
from typing import Dict

from loguru import logger  # logger assumed to be loguru, as in the pipecat-flows examples
from pipecat_flows import FlowArgs, FlowManager, FlowResult, NodeConfig


class InterviewStartResult(FlowResult):
    status: str

class AnswerCollectionResult(FlowResult):
    answer: str

class QuestionType(StrEnum):
    SUBJECTIVE = "SUBJECTIVE"
    CODING = "CODING"
    SUBJECTIVE_PROBING = "SUBJECTIVE_PROBING"

def create_initial_node(bot_prompt: str) -> NodeConfig:
    """Create the initial greeting node."""
    return {
        "messages": [
            {
                "role": "system",
                "content": bot_prompt,
            }
        ],
        "functions": [
            {
                "type": "function",
                "function": {
                    "name": "start_interview",
                    "handler": handle_start_interview,
                    "description": "Start the interview after candidate confirms",
                    "parameters": {"type": "object", "properties": {}},
                },
            }
        ],
    }

def create_question_node(
    question: dict, 
) -> NodeConfig:
    """Create node for asking questions with appropriate prompts."""

    question_content = (
        "Ask the candidate the following question:\n"
        f"QUESTION : {question['question']}\n\n"
        "Instructions:\n"
        "Probe the candidate's answer to ensure they have provided a complete and accurate response, if the candidate's answer is incomplete or unclear. In such case, probe further to clarify their response. If the candidate's answer is complete and accurate, call collect_answer function. If the candidate wants to skip the question or is unable to answer, call collect_answer function with answer as '<<skipped>>'."
    )

    logger.info(
        "\n=== Creating New Question Node ===\n"
        f"Question Type: {question['type']}\n"
        f"Question: {question['question']}\n"
        "================================="
    )

    return {
        "messages": [
            {
                "role": "system",
                "content": question_content,
            }
        ],
        "functions": [
            {
                "type": "function",
                "function": {
                    "name": "move_to_next_question",
                    "handler": move_to_next_question,
                    "description": "Move to next question",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "answer": {"type": "string"}
                        },
                        "required": ["answer"],
                    },
                },
            }
        ],
    }

def create_end_node(ending_prompt: str) -> NodeConfig:
    """Create the final node."""
    return {
        "messages": [
            {
                "role": "system",
                "content": ending_prompt,
            }
        ],
        "functions": [],
        "post_actions": [{"type": "end_conversation"}],
    }

async def handle_start_interview(args: FlowArgs) -> InterviewStartResult:
    """Start the interview."""
    logger.debug("Processing start_interview transition")
    return {"status": "started"}

async def move_to_next_question(args: FlowArgs) -> AnswerCollectionResult:
    """Process answer collection."""
    answer = args["answer"]
    logger.debug(f"move_to_next_question handler executing with answer: {answer}")
    return {"answer": answer} 
    
async def handle_interview_transition(function_name: str, args: Dict, flow_manager: FlowManager):
    """Handle transitions between interview flow states."""
    logger.debug(f"Transition callback executing for function: {function_name} with args: {args}")

    if function_name == "start_interview":
        await start_interview_transition(flow_manager)
    elif function_name == "move_to_next_question":
        await handle_move_to_next_question(args, flow_manager)

async def start_interview_transition(flow_manager: FlowManager):
    """Start interview with first question."""
    question_bank = flow_manager.state["question_bank"]
    if not question_bank:
        await flow_manager.set_node("end", create_end_node(flow_manager.state["ending_prompt"]))
        return
    flow_manager.state["total_questions"] = len(question_bank)
    
    await ask_next_question(flow_manager)

async def handle_move_to_next_question(args: Dict, flow_manager: FlowManager):
    """Process answer collection."""
    answer = args["answer"]
    current_question = flow_manager.state["question_bank"][flow_manager.state["current_question_index"]]

    logger.info(
        "\n=== Processing Answer ===\n"
        f"Question Type: {current_question['type']}\n"
        f"Answer: {answer}\n"
        f"Current Question Index: {flow_manager.state['current_question_index']}\n"
        "======================="
    )

    flow_manager.state["current_question_index"] += 1
    await ask_next_question(flow_manager)

async def ask_next_question(flow_manager: FlowManager):
    """Helper function to ask the next question or end interview."""
    current_index = flow_manager.state["current_question_index"]
    question_bank = flow_manager.state["question_bank"]
    
    if current_index >= len(question_bank):
        await flow_manager.set_node("end", create_end_node(flow_manager.state["ending_prompt"]))
        return
    
    next_question = question_bank[current_index]
    
    await flow_manager.set_node(
        f"question_{current_index + 1}",
        create_question_node(
            next_question,
        )
    )
    await flow_manager.state["websocket"].send(json.dumps({
        "type": "question_asked",
        "question": next_question
    }))


captaincaius commented Jan 11, 2025

I noticed this too.

It seems to be a race condition. The problem is that FlowManager's _update_llm_context() just queues the update frame, which puts it at the back of the line. Meanwhile, the state-transition function call behaves like a normal function call: it pushes the FunctionCallResultFrame upstream, the context aggregator pushes the context frame back downstream, and that triggers the LLM prematurely, before the update frame has had a chance to be processed.
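
To make the ordering concrete, here's a toy illustration in plain asyncio (not Pipecat internals; the frame names in the comments are just labels): the context update only gets queued, while the completion-triggering frame from the function-call result path ends up ahead of it, so the completion runs against the old context.

import asyncio

async def main() -> None:
    pipeline: asyncio.Queue = asyncio.Queue()
    context = ["greeting system prompt"]

    async def queue_context_update() -> None:
        # The flow transition only *queues* the new prompt.
        await pipeline.put(("update", "question system prompt"))

    async def push_run_llm() -> None:
        # The function-call result path queues a frame that triggers a completion.
        await pipeline.put(("run_llm", None))

    # In this toy, the run-LLM frame lands ahead of the update, mirroring the
    # observed behavior: the completion runs before the new prompt is in place.
    await push_run_llm()
    await queue_context_update()

    while not pipeline.empty():
        kind, payload = await pipeline.get()
        if kind == "update":
            context.append(payload)
        else:
            print("LLM completes with:", context)  # still only the greeting prompt

asyncio.run(main())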

@captaincaius

I just noticed that FunctionCallResultFrame has a run_llm property, but it's not used in https://github.com/pipecat-ai/pipecat/blob/a8ae79831ef81223a55b565ea55c04b3ff26f1a6/src/pipecat/services/openai.py#L584 (and the equivalent line for anthropic)...

So getting this to work as intended (the context includes the function call and result and THEN the new system prompt) would require adding a feature to LLMService in pipecat itself that allows registered functions to optionally bypass running the LLM. Most of what's needed is already there by way of run_llm.
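
Roughly the shape of the idea, as a hypothetical sketch (the method and attribute names below are placeholders for illustration, not pipecat's actual code at the linked line):

# Hypothetical sketch only: what honoring run_llm could look like inside an LLM
# service's function-call result handling. handle_function_call_result and
# trigger_completion are placeholder names, not real pipecat methods.
async def handle_function_call_result(self, frame) -> None:
    # Record the function call and its result in the context, as today...
    self.context.add_function_call_result(frame)
    # ...but only trigger a completion if the registered function asked for it.
    if getattr(frame, "run_llm", True):
        await self.trigger_completion(self.context)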

markbackman (Contributor) commented Jan 11, 2025

Thanks for flagging this, and apologies for the silence. Some of our team is still out on vacation, and I've been catching up on core Pipecat issues from the last few weeks.

I've noticed this as well. Sometimes it's helpful for the LLM to generate a response from the function call results, but other times it's not. This should be configurable per function call. That is, we should add a way to indicate whether the function call should generate a completion.

This will be a core Pipecat change. I'm going to look into this issue to see what our options are.

To note, this is not a bug; this is the desired behavior for many cases. But for Flows, the whole point is that you're in control of the conversation, so this is another behavior that should be under your control.

@captaincaius

Thanks Mark. I was going to open a sister issue in pipecat core as a feature request, but I wasn't sure whether it would be welcome mostly for the sake of pipecat-flows.

markbackman (Contributor) commented Jan 11, 2025

Here's the Pipecat change: pipecat-ai/pipecat#970.

I have this working locally for dynamic flows. I still need to make sure everything works as expected for static flows. I'll post something when I have it.

@captaincaius

Ha. I was hoping to submit my first pipecat PR but you beat me to it.

I took a slightly different approach here: pipecat-ai/pipecat@main...captaincaius:pipecat:feat-bypass-run-llm

It doesn't allow a given registered function to sometimes run_llm and sometimes not, but since you mentioned in your PR that you're open to other options, I figured I might as well share it in case you prefer not having a special key in the function call result or some other tradeoff.

@markbackman (Contributor)

Thanks for sharing. I think your approach could be a good one too. I'll talk it over with my team tomorrow to see what they prefer. Either way, I think we're on track to get Flows working very predictably after we get these changes in.

@captaincaius

Sweet! I'm super stoked either way. Thanks again for all you folks' hard work!

@markbackman (Contributor)

Ok! I paired with Aleix and we improved the Pipecat core change to make it easier to build with.

With that change, here is the Pipecat Flows change: #75. I've tested with the different LLM providers, for both static and dynamic flows, and the timing works well in all cases! Note that this is a breaking change, but a minor one: you only need to pass the context_aggregator to the FlowManager.
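
In practice the migration looks something like this (a sketch assuming the usual dynamic-flow setup; argument names other than context_aggregator follow the existing examples and may differ slightly):

# Sketch of the updated wiring: the FlowManager now also receives the
# pipeline's context aggregator so it can control when the LLM runs after a
# transition.
context_aggregator = llm.create_context_aggregator(context)

flow_manager = FlowManager(
    task=task,                              # PipelineTask, as before
    llm=llm,
    tts=tts,
    context_aggregator=context_aggregator,  # new required argument
)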

@markbackman (Contributor)

PR #75 has been merged. This will be in the next Flows release, which I'll cut at some point today or early tomorrow.
