Skip to content

Commit

Permalink
Fixed pylint score
Browse files Browse the repository at this point in the history
  • Loading branch information
antoninoLorenzo committed Jun 20, 2024
1 parent 9072ceb commit b0d1792
Show file tree
Hide file tree
Showing 19 changed files with 82 additions and 40 deletions.
3 changes: 2 additions & 1 deletion src/agent/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from src.agent.agent import Agent
"""Core component of the system"""
from src.agent.agent import Agent
Binary file modified src/agent/__pycache__/__init__.cpython-311.pyc
Binary file not shown.
Binary file added src/agent/__pycache__/agent.cpython-311.pyc
Binary file not shown.
Binary file added src/agent/__pycache__/llm.cpython-311.pyc
Binary file not shown.
Binary file added src/agent/__pycache__/plan.cpython-311.pyc
Binary file not shown.
Binary file added src/agent/__pycache__/prompts.cpython-311.pyc
Binary file not shown.
48 changes: 34 additions & 14 deletions src/agent/agent.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,38 @@
"""Contains the class `Agent`, the core of the system."""
from src.agent.llm import LLM
from src.agent.knowledge import Store
from src.agent.memory import Memory, Message, Role
from src.agent.prompts import PROMPTS


class Agent:
"""Penetration Testing Assistant"""
def __init__(self, model: str, tools_docs: str, knowledge_base: Store):
self.llm = LLM(model=model)
self.mem = Memory()
self.vdb = knowledge_base
self.system_prompt = 'You are an assistant.' # PROMPTS[ollama_model]['system']['plan'].format(tools=tools_docs)
self.user_prompt = PROMPTS[model]['user']['plan']
# PROMPTS[model]['system']['plan'].format(tools=tools_docs)
self.system_prompt = 'You are an assistant in penetration testing'
self.user_prompt = PROMPTS[model]['plan']['user']

def new_session(self, sid: int):
    """Start a new conversation session by seeding it with the system prompt.

    :param sid: identifier of the session to initialize.
    """
    sys_message = Message(Role.SYS, self.system_prompt)
    self.mem.store_message(sid, sys_message)

def get_session(self, sid: int):
    """Return the stored session identified by `sid`.

    :param sid: identifier of the session to fetch from memory.
    """
    session = self.mem.get_session(sid)
    return session

def query(self, sid: int, user_in: str):
# retrieval
i = 0
collection_name = ''
for c_name, _ in self.vdb.collections.items():
if i >= 1:
break
collection_name = c_name
i += 1
def query(self, sid: int, user_in: str, rag=True):
"""Performs a query to the Large Language Model, set `rag=True`
to leverage Retrieval Augmented Generation."""
context = ''
for retrieved in self.vdb.retrieve(user_in, collection_name):
context += retrieved.payload['text']
if rag:
context = self._retrieve(user_in)

# user prompt
prompt = self.user_prompt.format(user_input=user_in, context=context)
self.mem.store_message(
sid,
Message(Role.USER, self.user_prompt.format(user_input=user_in, context=context))
Message(Role.USER, prompt)
)
messages = self.mem.get_session(sid).messages_to_dict_list()

Expand All @@ -49,6 +47,14 @@ def query(self, sid: int, user_in: str):
Message(Role.ASSISTANT, response)
)

def _retrieve(self, user_in: str):
    """Build a context string from the vector database (Qdrant).

    Each retrieved point contributes its title followed by its text,
    separated by blank lines, in retrieval order.

    :param user_in: the user query used for retrieval.
    :return: concatenated context string (empty if nothing is retrieved).
    """
    parts = []
    for hit in self.vdb.retrieve(user_in):
        title = hit.payload['title']
        text = hit.payload['text']
        parts.append(f"{title}:\n{text}\n\n")
    return ''.join(parts)

def save_session(self, sid: int):
    """Persist the session identified by `sid` to JSON storage.

    Delegates to the memory layer, which owns the serialization format.
    """
    self.mem.save_session(sid)
Expand All @@ -60,3 +66,17 @@ def delete_session(self, sid: int):
def rename_session(self, sid: int, session_name: str):
    """Assign a new display name to the session identified by `sid`.

    :param sid: identifier of the session to rename.
    :param session_name: the new name for the session.
    """
    self.mem.rename_session(sid, session_name)


# Manual smoke test: run this module directly to exercise the Agent end to end.
# Requires live external services (a local LLM backend and a Qdrant instance);
# it is not part of the automated test suite.
if __name__ == '__main__':
    # Local import — presumably to avoid a circular import with the
    # knowledge package at module load time; verify.
    from src.agent.knowledge.routing import LLMRouter

    vector_db = Store(router=LLMRouter())
    agent = Agent(model='gemma:2b', tools_docs='', knowledge_base=vector_db)

    user_query = 'what are most common authentication issues in websites?'
    # user_query = 'How do I perform host discovery with nmap?'

    # NOTE(review): session 1 is queried without a prior new_session() call,
    # so no system prompt is stored first — confirm query() tolerates an
    # unseeded session.
    # assumes query() yields response chunks for streaming output — TODO confirm
    for chunk in agent.query(1, user_query):
        print(chunk, end='')
    print()
Binary file modified src/agent/knowledge/__pycache__/routing.cpython-311.pyc
Binary file not shown.
11 changes: 6 additions & 5 deletions src/agent/knowledge/collections.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""RAG related data"""
from dataclasses import dataclass
from enum import StrEnum
from typing import List, Optional


Expand Down Expand Up @@ -40,19 +39,21 @@ def __str__(self):
@dataclass
class Collection:
"""Represents a Qdrant collection"""
id: int
collection_id: int
title: str
documents: List[Document]
topics: List[Topic]
size: Optional[int] = 0 # points to the number of chunks in a Collection

def document_names(self) -> list:
"""The document names are used to filter queries to the Knowledge Database"""
"""The document names are used to filter queries to the
Knowledge Database"""
return [doc.name for doc in self.documents]

def __str__(self):
docs = "| - Documents\n"
for doc in self.documents:
docs += f' | - {doc.name}\n'
return (f'Title: {self.title} ({self.id})\n'
f'| - Topics: {", ".join([topic.name for topic in self.topics])}\n{docs}')
return (f'Title: {self.title} ({self.collection_id})\n'
f'| - Topics: {", ".join([topic.name for topic in self.topics])}\n'
f'{docs}')
10 changes: 5 additions & 5 deletions src/agent/knowledge/nlp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ def extract_keywords(text: str, top_k: int = 3) -> list:
return [kw for kw, score in sorted(candidates, key=lambda x: x[1])][:top_k]


def similarity(a: str, b: str) -> float:
def similarity(text_a: str, text_b: str) -> float:
"""Computes similarity between two strings"""
doc1 = nlp(a)
doc2 = nlp(b)
doc1 = nlp(text_a)
doc2 = nlp(text_b)
return doc1.similarity(doc2)


def chunk_str(document: str):
"""Chunks a text string.
The chunking strategy is NLP sentence extraction -> sentence grouping by similarity.
"""Chunks a text string, the chunking strategy is:
NLP sentence extraction -> sentence grouping by similarity.
"""
doc = nlp(document)
sentences = [sent for sent in list(doc.sents) if str(sent).strip() not in ['*']]
Expand Down
27 changes: 18 additions & 9 deletions src/agent/knowledge/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@
"""
import json
from abc import ABC, abstractmethod
from typing import Dict

from src.agent.llm import LLM
from src.agent.prompts import PROMPTS
from src.agent.knowledge.nlp import extract_keywords, similarity
from src.agent.knowledge.collections import Collection


class Router(ABC):
Expand All @@ -27,25 +25,29 @@ def find_route(self, user_query: str, collections) -> str:
print(f'Collections: {collections}')
keywords = extract_keywords(user_query)
points = {}
for c_name, collection in collections.items():
for _, collection in collections.items():
doc_names = collection.document_names()

similarities = []
threshold = 0.7
for kw in keywords:
for key in keywords:
for doc_name in doc_names:
if kw.strip().lower() == doc_name.strip().lower():
if key.strip().lower() == doc_name.strip().lower():
sim = 1
else:
sim = similarity(kw, doc_name)
sim = similarity(key, doc_name)

similarities.append({
'document': doc_name,
'keyword': kw,
'keyword': key,
'similarity': sim
})

similarities = sorted(similarities, key=lambda k: k['similarity'], reverse=True)
similarities = sorted(
similarities,
key=lambda k: k['similarity'],
reverse=True
)
# add documents with high similarity to filter

similarities = [sim for sim in similarities if sim['similarity'] > threshold]
Expand All @@ -55,6 +57,9 @@ def find_route(self, user_query: str, collections) -> str:


class LLMRouter(Router):
    """Uses a Large Language Model to find the candidate collection for a
    given query. Using a local model is not the best choice for performance;
    the HuggingFace Inference API could be used in the future."""

def __init__(self, model: str = 'gemma:2b'):
self.llm = LLM(model)
Expand All @@ -66,9 +71,13 @@ def find_route(self, user_query: str, collections, verbose=False) -> str:
for _, collection in collections.items():
collection_string += str(collection)

prompt = self.user_prompt.format(
user_query=user_query,
collections=collection_string
)
messages = [
{'role': 'system', 'content': self.system_prompt},
{'role': 'user', 'content': self.user_prompt.format(user_query=user_query, collections=collection_string)}
{'role': 'user', 'content': prompt}
]

if verbose:
Expand Down
4 changes: 2 additions & 2 deletions src/agent/knowledge/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def create_collection(self, collection: Collection):
})

collection_metadata = {
'id': collection.id,
'id': collection.collection_id,
'title': collection.title,
'documents': docs,
'topics': [topic.name for topic in collection.topics]
Expand Down Expand Up @@ -180,7 +180,7 @@ def get_available_collections(self):
))

collections.append(Collection(
id=data['id'],
collection_id=data['id'],
title=data['title'],
documents=docs,
topics=[Topic(topic) for topic in data['topics']]
Expand Down
2 changes: 1 addition & 1 deletion src/agent/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
- [x] Ollama
- [ ] HuggingFace
"""
from ollama import Client
from dataclasses import dataclass
from ollama import Client

AVAILABLE_MODELS = ['phi3', 'gemma:2b', 'gemma:7b']

Expand Down
Binary file modified src/agent/memory/__pycache__/base.cpython-311.pyc
Binary file not shown.
5 changes: 2 additions & 3 deletions src/agent/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ class TaskStatus(StrEnum):
@dataclass
class Task:
"""
A Task represent a single unit of execution of a Tool with a specific command.
The thought is the reason why the command should be executed.
    A Task represents a single unit of execution of a Tool with a specific
    command. The thought is the reason why the command should be executed.
"""
thought: str
tool: Tool
Expand All @@ -35,7 +35,6 @@ class Plan:
@staticmethod
def from_str(text: str):
"""Converts a structured LLM response in a Plan object"""
pass

def execute(self):
"""Executes the tasks and yields the output of each task"""
Expand Down
12 changes: 12 additions & 0 deletions src/agent/prompts.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
"""
Contains the prompts for the entire system. Prompts are organized in a
hierarchical structure: they are grouped by model, then by objective, and
finally divided into system prompts (instructions) and user prompts.
model: {
objective: {
system: '...',
user: '...'
}
}
"""
import textwrap

PROMPTS = {
Expand Down
Binary file modified src/agent/tools/__pycache__/__init__.cpython-311.pyc
Binary file not shown.
Binary file modified src/agent/tools/__pycache__/base.cpython-311.pyc
Binary file not shown.
Binary file modified src/agent/tools/__pycache__/terminal.cpython-311.pyc
Binary file not shown.

0 comments on commit b0d1792

Please sign in to comment.