Chromia integration (#8)
superoo7 authored Oct 1, 2024
1 parent 660c7cb commit 81b24f1
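
In short, this commit moves the dispute poller's data source from the Chasm orchestrator's prompt-history API to prompt histories stored on a Chromia dapp: main.py now pages through chromia.get_prompt_histories with a pointer and a millisecond time window across several public nodes, the processed-history cache switches from `_id` strings to `(prompt_id, created_at)` tuples in a new processed_histories_chromia.json file, and a hard-coded WEBHOOK_API_KEY value is scrubbed from the README example.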
Showing 4 changed files with 156 additions and 25 deletions.
3 changes: 2 additions & 1 deletion dispute/.gitignore
@@ -1,3 +1,4 @@
 venv
 __pycache__
-processed_histories.json
+processed_histories.json
+processed_histories_chromia.json
2 changes: 1 addition & 1 deletion dispute/README.md
@@ -6,7 +6,7 @@ LLM_API_KEY=ollama
 MODELS=llama3.1:8b,qwen:4b,mistral:7b
 SIMULATION_MODEL=llama3.1:8b
 ORCHESTRATOR_URL=https://orchestrator.chasm.net
-WEBHOOK_API_KEY=WlRFN20pc1IpMmlwZ2hBXk9ec1JWNkpzSzJjI2EqNHc
+WEBHOOK_API_KEY=
 MIN_CONFIDENCE_SCORE=0.5
 ```
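
For context, main.py imports `LOG_LEVEL` and `MIN_CONFIDENCE_SCORE` from a `config` module that is not part of this diff. A minimal sketch of how such a module might map the `.env` block above to Python constants (the repository's actual config.py may differ):

```python
# Hypothetical config.py sketch; the real module is not shown in this diff.
import os

from dotenv import load_dotenv  # assumes python-dotenv is available

load_dotenv()  # read variables from .env into the process environment

LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
MODELS = os.getenv("MODELS", "").split(",")
SIMULATION_MODEL = os.getenv("SIMULATION_MODEL", "")
ORCHESTRATOR_URL = os.getenv("ORCHESTRATOR_URL", "")
WEBHOOK_API_KEY = os.getenv("WEBHOOK_API_KEY", "")
MIN_CONFIDENCE_SCORE = float(os.getenv("MIN_CONFIDENCE_SCORE", "0.5"))
```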

86 changes: 63 additions & 23 deletions dispute/main.py
@@ -1,10 +1,12 @@
 from config import LOG_LEVEL, MIN_CONFIDENCE_SCORE
 import asyncio
 import os
+from util.chromia import ChromiaDatabase
 from strategy import analyze_text
 from util.chasm import ChasmConnection
 import json
 import logging
+import time
 
 logging.basicConfig(
     level=LOG_LEVEL,
@@ -13,61 +15,99 @@
 )
 
 chasm = ChasmConnection()
-PROCESSED_HISTORIES_FILE = "processed_histories.json"
+chromia = ChromiaDatabase(
+    base_url=[
+        "https://dapps0.chromaway.com:7740",
+        "https://chromia-mainnet.w3coins.io:7740",
+        "https://mainnet-dapp1.sunube.net:7740",
+        "https://chromia.01node.com:7740",
+        "https://chromia-mainnet.caliber.build:443",
+        "https://chromia.nocturnallabs.org:7740",
+        "https://chromia2.stablelab.cloud:7740",
+    ],
+    brid="BD8A4A23FD35BF0A711A8D65E94C06E651A286A412663A82AC2240416264C74D",
+)
+PROCESSED_HISTORIES_FILE = "processed_histories_chromia.json"
 
 
 def load_processed_histories():
-    if os.path.exists(PROCESSED_HISTORIES_FILE):
-        with open(PROCESSED_HISTORIES_FILE, "r") as f:
-            return set(json.load(f))
-    return set()
+    try:
+        with open(PROCESSED_HISTORIES_FILE, "r") as f:
+            # JSON round-trips tuples as lists, so rebuild the set of
+            # (prompt_id, created_at) tuples on load.
+            return set(tuple(entry) for entry in json.load(f))
+    except (FileNotFoundError, json.JSONDecodeError):
+        return set()
 
 
 def save_processed_histories(histories):
     with open(PROCESSED_HISTORIES_FILE, "w") as f:
         json.dump(list(histories), f)
 
-processed_histories = load_processed_histories()
-
-
-async def process_histories():
-    histories = chasm.get_prompt_history()
+
+async def process_histories(pointer):
+    processed_histories = load_processed_histories()
+    processed_histories_prompt_ids = set(map(lambda x: x[0], processed_histories))
+    # Resume from the newest processed timestamp; -1 queries from the beginning.
+    start_time = -1
+    if len(processed_histories) > 0:
+        start_time = max(map(lambda x: x[1], processed_histories))
+    current_time = round(time.time() * 1000)
+    histories, last_pointer = chromia.get_prompt_histories(
+        start_time,
+        end_time=current_time,
+        pointer=pointer,
+        n_prompts=50,
+    )
     logging.info(f"Histories: {len(histories)}")
     for history in histories:
-        if history["_id"] in processed_histories:
-            logging.debug(f"Skipping already processed history: {history['_id']}")
+        prompt_id = history.prompt_id
+        if prompt_id in processed_histories_prompt_ids:
+            logging.debug(f"Skipping already processed history: {prompt_id}")
             continue
-        output = history["result"]["choices"][0]["message"]["content"]
-        result = await analyze_text(
-            history["messages"],
-            output,
-            history["seed"],
-            history["result"]["scout"]["provider"],
-            history["result"]["scout"]["model"],
-        )
-        logging.debug(f"Result: {result}")
-        logging.debug(f"Score: {result['confidence_score']}")
+        logging.debug(f"Processing history: {prompt_id}")
+        created_at = history.created_at
+        seed = history.seed
+        provider = history.provider
+        model = history.model
+        # messages and result are stored on-chain as JSON strings.
+        messages = json.loads(history.messages)
+        result = json.loads(history.result)
+        input = map(lambda x: x["content"], messages)
+        input = "\n".join(input)
+        output = result["choices"][0]["message"]["content"]
+        res = await analyze_text(
+            messages,
+            output,
+            seed,
+            provider,
+            model,
+        )
+        logging.debug(f"Result: {res}")
+        logging.debug(f"Score: {res['confidence_score']}")
 
-        if result["confidence_score"] > MIN_CONFIDENCE_SCORE:
-            rs_output = result.get("rs_output")
+        if res["confidence_score"] > MIN_CONFIDENCE_SCORE:
+            rs_output = res.get("rs_output")
             assert rs_output is not None, "rs_output is not generated"
             response = chasm.file_dispute(
-                history["_id"],
-                history["messages"],
+                prompt_id,
+                messages,
                 {"role": "assistant", "content": rs_output},
             )
             if response is not None:
                 logging.info(f"Dispute filed: {response['result']}")
 
         # Cache history
-        processed_histories.add(history["_id"])
+        processed_histories.add((
+            prompt_id,
+            created_at,
+        ))
         save_processed_histories(processed_histories)
+    return last_pointer
 
 
 async def main():
+    history_pointer = 0
     while True:
-        await process_histories()
+        history_pointer = await process_histories(history_pointer)
         await asyncio.sleep(1)
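
The cache format change is what makes resumption work: each entry is a `(prompt_id, created_at)` tuple, so the newest `created_at` bounds the next query window while the id set prevents reprocessing. A small sketch, assuming the cache file was written by `save_processed_histories` above:

```python
# Sketch: reading the tuple cache that drives resumption.
import json

with open("processed_histories_chromia.json") as f:
    # json.dump(list(histories)) stored each tuple as a two-element array
    cache = {tuple(entry) for entry in json.load(f)}

seen_ids = {prompt_id for prompt_id, _ in cache}
# The newest processed timestamp (in ms) is the next start_time; -1 means
# "query from the beginning" on a fresh run.
start_time = max((created_at for _, created_at in cache), default=-1)
print(len(seen_ids), start_time)
```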


90 changes: 90 additions & 0 deletions dispute/util/chromia.py
@@ -0,0 +1,90 @@
+import json
+import time
+from typing import List, Optional, Tuple, Union
+from pydantic import BaseModel
+import requests
+from requests.models import PreparedRequest
+
+
+class PromptHistory(BaseModel):
+    UID: int
+    created_at: int
+    messages: str
+    model: str
+    prompt_id: int
+    provider: str
+    result: str
+    seed: int
+
+
+class ChromiaDatabase:
+    def __init__(self, base_url: Union[str, List[str]], brid: Optional[str] = None):
+        if isinstance(base_url, str):
+            self.base_urls = [base_url]
+        else:
+            self.base_urls = base_url
+        self.brid = brid
+
+    def set_brid(self, brid):
+        self.brid = brid
+
+    def _try_request(self, endpoint: str, params: Optional[dict] = None, headers: Optional[dict] = None) -> requests.Response:
+        """
+        Try the request against each base URL until one succeeds.
+        :param endpoint: The API endpoint to append to the base URL.
+        :param params: The query parameters to include in the request.
+        :param headers: The headers to include in the request.
+        :return: The successful requests.Response object.
+        :raises Exception: If all requests fail.
+        """
+        for base_url in self.base_urls:
+            try:
+                url = f"{base_url}{endpoint}"
+                req = PreparedRequest()
+                req.prepare_url(url, params)
+                response = requests.get(req.url, headers=headers)
+                if response.status_code == 200:
+                    return response
+                else:
+                    print(f"Failed to retrieve data from {base_url}. Status code: {response.status_code}")
+            except requests.exceptions.RequestException as e:
+                print(f"Request to {base_url} failed: {e}")
+        raise Exception("All base URLs failed to retrieve data.")
+
+    def get_blockchain_brid(self, iid):
+        response = self._try_request(endpoint=f"/brid/iid_{iid}", headers={"Content-Type": "text/plain"})
+        return response.text
+
+    def get_prompt_histories(self, start_time, end_time, pointer, n_prompts) -> Tuple[List[PromptHistory], int]:
+        response = self._try_request(
+            endpoint=f"/query/{self.brid}",
+            params={
+                "type": "get_prompt_histories",
+                "start_time": start_time,
+                "end_time": end_time,
+                "pointer": pointer,
+                "n_prompts": n_prompts,
+            },
+            headers={"Content-Type": "application/json"},
+        )
+        data = response.json()
+        return [PromptHistory(**x) for x in data["prompts"]], data["pointer"]
+
+
+if __name__ == "__main__":
+    base_url: List[str] = ["https://dapps0.chromaway.com:7740"]
+    brid = "BD8A4A23FD35BF0A711A8D65E94C06E651A286A412663A82AC2240416264C74D"
+    chromia = ChromiaDatabase(base_url, brid)
+
+    # Get prompt histories
+    current_time = round(time.time() * 1000)
+    data, last_pointer = chromia.get_prompt_histories(
+        start_time=-1,
+        end_time=current_time,
+        pointer=0,
+        n_prompts=20,
+    )
+    if data is None or len(data) == 0:
+        print("No data")
+        exit()
+    print(data[0])
+    print(len(data))
+    print(f"Last pointer: {last_pointer}")
