Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse/Replicate SemanticAnalyzer and Enhancer to Agent workflow #811

Merged
merged 21 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions agent/enhancer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An LLM agent to refine a compilable fuzz target for higher code coverage.
Use it as a usual module locally, or as a script in cloud builds.
"""
import logger
from agent.prototyper import Prototyper
#from experiment.workdir import WorkDirs
from llm_toolkit.prompt_builder import (DefaultTemplateBuilder,
JvmErrorFixingBuilder)
from llm_toolkit.prompts import Prompt
from results import AnalysisResult, Result


class Enhancer(Prototyper):
  """The Agent to refine a compilable fuzz target for higher coverage."""

  def _initial_prompt(self, results: list[Result]) -> Prompt:
    """Constructs the initial prompt of the agent.

    Args:
      results: The result history of previous stages; the last entry is
        expected to be an AnalysisResult from the analysis stage.

    Returns:
      The prompt asking the LLM to improve the fuzz target, or an empty
      Prompt() when the last result is not an AnalysisResult.
    """
    last_result = results[-1]
    benchmark = last_result.benchmark

    if not isinstance(last_result, AnalysisResult):
      logger.error('The last result in Enhancer is not AnalysisResult: %s',
                   results,
                   trial=self.trial)
      return Prompt()

    if benchmark.language == 'jvm':
      # TODO: Do this in a separate agent for JVM coverage.
      jvm_coverage_fix = True
      # No build errors here: the target compiles; we only improve coverage.
      errors = []
      builder = JvmErrorFixingBuilder(self.llm, benchmark,
                                      last_result.run_result.fuzz_target_source,
                                      errors, jvm_coverage_fix)
      prompt = builder.build([], None, None)
    else:
      error_desc, errors = last_result.semantic_result.get_error_info()
      builder = DefaultTemplateBuilder(self.llm)
      prompt = builder.build_fixer_prompt(benchmark,
                                          last_result.fuzz_target_source,
                                          error_desc,
                                          errors,
                                          context='',
                                          instruction='')
    # TODO: A different file name/dir.
    prompt.save(self.args.work_dirs.prompt)

    return prompt
9 changes: 7 additions & 2 deletions agent/prototyper.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from data_prep import project_targets
from data_prep.project_context.context_introspector import ContextRetriever
from experiment.benchmark import Benchmark
from experiment.workdir import WorkDirs
from llm_toolkit import prompt_builder
from llm_toolkit.prompts import Prompt
from results import BuildResult, Result
Expand Down Expand Up @@ -393,8 +394,11 @@ def _container_handle_conclusion(self, cur_round: int, response: str,
self._validate_fuzz_target_and_build_script(cur_round, build_result))

# Updates build_result with _alt or _ori, depending on their status.
build_result, prompt_final = self._generate_prompt_from_build_result(
final_build_result, prompt_final = self._generate_prompt_from_build_result(
build_result_alt, build_result_ori, build_result, prompt, cur_round)
# Ensure build_result is consistent with the one selected.
if final_build_result is not None:
build_result.__dict__.update(final_build_result.__dict__)

return prompt_final

Expand All @@ -421,8 +425,9 @@ def _container_tool_reaction(self, cur_round: int, response: str,

def execute(self, result_history: list[Result]) -> BuildResult:
"""Executes the agent based on previous result."""
WorkDirs(self.args.work_dirs.base)
last_result = result_history[-1]
logger.info('Executing Prototyper', trial=last_result.trial)
logger.info('Executing %s', self.name, trial=last_result.trial)
benchmark = last_result.benchmark
self.inspect_tool = ProjectContainerTool(benchmark, name='inspect')
self.inspect_tool.compile(extra_commands=' && rm -rf /out/* > /dev/null')
Expand Down
2 changes: 1 addition & 1 deletion agent/semantic_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def execute(self, result_history: list[Result]) -> AnalysisResult:
fuzzer_log, last_result.benchmark.project)

analysis_result = AnalysisResult(
author=repr(self),
author=self,
run_result=last_result,
semantic_result=semantic_result,
chat_history={self.name: semantic_result.to_dict()})
Expand Down
2 changes: 1 addition & 1 deletion experiment/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def __init__(self,
# Special handling of benchmark_id is needed to avoid this situation.
self.id = self.id.replace('::', '-')

def __str__(self):
def __repr__(self):
return (f'Benchmark<id={self.id}, project={self.project}, '
f'language={self.language}, '
f'function_signature={self.function_signature}, '
Expand Down
3 changes: 3 additions & 0 deletions experiment/workdir.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def __init__(self, base_dir, keep: bool = False):
os.makedirs(self.dills, exist_ok=True)
os.makedirs(self.fuzz_targets, exist_ok=True)

def __repr__(self) -> str:
  """Returns the base directory path as this WorkDirs' representation."""
  return self._base_dir

@property
def base(self):
return self._base_dir
Expand Down
4 changes: 2 additions & 2 deletions logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from google.cloud import storage

from results import Result, RunResult
from results import Result, RunResult, TrialResult

FINAL_RESULT_JSON = 'result.json'

Expand Down Expand Up @@ -54,7 +54,7 @@ def write_build_script(self, result: Result) -> None:
f'{result.trial:02d}.build_script')
self.write_to_file(build_script_path, result.build_script_source)

def write_result(self, result_status_dir: str, result: Result) -> None:
def write_result(self, result_status_dir: str, result: TrialResult) -> None:
"""Writes the final result into JSON for report generation."""
trial_result_dir = os.path.join(result_status_dir, f'{result.trial:02d}')
os.makedirs(trial_result_dir, exist_ok=True)
Expand Down
47 changes: 31 additions & 16 deletions pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import logger
from agent.base_agent import BaseAgent
from results import AnalysisResult, BuildResult, Result, RunResult
from results import AnalysisResult, BuildResult, Result, RunResult, TrialResult
from stage.analysis_stage import AnalysisStage
from stage.execution_stage import ExecutionStage
from stage.writing_stage import WritingStage
Expand Down Expand Up @@ -57,50 +57,69 @@ def _terminate(self, result_history: list[Result], cycle_count: int) -> bool:
if not cycle_count:
return False

last_result = result_history[-1]
if cycle_count > 5:
self.logger.warning('[Cycle %d] Terminate after 5 cycles: %s',
cycle_count, result_history)
self.logger.info('[Cycle %d] Terminate after 5 cycles: %s', cycle_count,
result_history)
return True

if not isinstance(last_result, AnalysisResult):
self.logger.warning('[Cycle %d] Last result is not AnalysisResult: %s',
cycle_count, result_history)
last_result = result_history[-1]
if isinstance(last_result, BuildResult) and not last_result.success:
self.logger.debug('[Cycle %d] Last result is failed BuildResult: %s',
cycle_count, last_result)
return True

if last_result.success:
if isinstance(last_result, AnalysisResult) and last_result.success:
self.logger.info('[Cycle %d] Generation succeeds: %s', cycle_count,
result_history)
return True

self.logger.info('[Cycle %d] Generation continues: %s', cycle_count,
result_history)
return False
if isinstance(last_result, AnalysisResult) and not last_result.success:
self.logger.info('[Cycle %d] Generation continues: %s', cycle_count,
result_history)
return False

self.logger.warning('[Cycle %d] Last result is unexpected: %s', cycle_count,
last_result)
return True

def _update_status(self, result_history: list[Result]) -> None:
  """Writes the trial's current aggregate result to its status JSON.

  Builds a TrialResult from the full result history (so the best result so
  far is selected) and persists it for report generation after each stage.
  """
  trial_result = TrialResult(benchmark=result_history[-1].benchmark,
                             trial=self.trial,
                             work_dirs=result_history[-1].work_dirs,
                             result_history=result_history)
  self.logger.write_result(
      result_status_dir=trial_result.best_result.work_dirs.status,
      result=trial_result)

def _execute_one_cycle(self, result_history: list[Result],
                       cycle_count: int) -> None:
  """Executes the writing, execution, and analysis stages once.

  Appends each stage's result to result_history and refreshes the status
  JSON after every stage; returns early when a stage fails so later stages
  are skipped.
  """
  self.logger.info('[Cycle %d] Initial result is %s', cycle_count,
                   result_history[-1])
  # Writing stage.
  result_history.append(
      self.writing_stage.execute(result_history=result_history))
  self._update_status(result_history=result_history)
  # Stop the cycle unless writing produced a successful BuildResult.
  if (not isinstance(result_history[-1], BuildResult) or
      not result_history[-1].success):
    self.logger.warning('[Cycle %d] Build failure, skipping the rest steps',
                        cycle_count)
    return

  # Execution stage.
  result_history.append(
      self.execution_stage.execute(result_history=result_history))
  self._update_status(result_history=result_history)
  # Stop the cycle unless execution produced a RunResult with a fuzzing log.
  if (not isinstance(result_history[-1], RunResult) or
      not result_history[-1].log_path):
    self.logger.warning('[Cycle %d] Run failure, skipping the rest steps',
                        cycle_count)
    return

  # Analysis stage.
  result_history.append(
      self.analysis_stage.execute(result_history=result_history))
  self._update_status(result_history=result_history)
  self.logger.info('[Cycle %d] Analysis result %s: %s', cycle_count,
                   result_history[-1].success, result_history[-1])

Expand All @@ -121,8 +140,4 @@ def execute(self, result_history: list[Result]) -> list[Result]:
cycle_count += 1
self._execute_one_cycle(result_history=result_history,
cycle_count=cycle_count)

final_result = result_history[-1]
self.logger.write_result(result_status_dir=final_result.work_dirs.status,
result=final_result)
return result_history
Loading