diff --git a/dispute/.gitignore b/dispute/.gitignore index fbfd55e..ae8bda5 100644 --- a/dispute/.gitignore +++ b/dispute/.gitignore @@ -1,3 +1,4 @@ venv __pycache__ -processed_histories.json \ No newline at end of file +processed_histories.json +processed_histories_chromia.json \ No newline at end of file diff --git a/dispute/README.md b/dispute/README.md index 7521de3..1407819 100644 --- a/dispute/README.md +++ b/dispute/README.md @@ -6,7 +6,7 @@ LLM_API_KEY=ollama MODELS=llama3.1:8b,qwen:4b,mistral:7b SIMULATION_MODEL=llama3.1:8b ORCHESTRATOR_URL=https://orchestrator.chasm.net -WEBHOOK_API_KEY=WlRFN20pc1IpMmlwZ2hBXk9ec1JWNkpzSzJjI2EqNHc +WEBHOOK_API_KEY= MIN_CONFIDENCE_SCORE=0.5 ``` diff --git a/dispute/main.py b/dispute/main.py index 705bb92..e3148c2 100644 --- a/dispute/main.py +++ b/dispute/main.py @@ -1,10 +1,12 @@ from config import LOG_LEVEL, MIN_CONFIDENCE_SCORE import asyncio import os +from util.chromia import ChromiaDatabase from strategy import analyze_text from util.chasm import ChasmConnection import json import logging +import time logging.basicConfig( level=LOG_LEVEL, @@ -13,13 +15,28 @@ ) chasm = ChasmConnection() -PROCESSED_HISTORIES_FILE = "processed_histories.json" +chromia = ChromiaDatabase( + base_url=[ + "https://dapps0.chromaway.com:7740", + "https://chromia-mainnet.w3coins.io:7740", + "https://mainnet-dapp1.sunube.net:7740", + "https://chromia.01node.com:7740", + "https://chromia-mainnet.caliber.build:443", + "https://chromia.nocturnallabs.org:7740", + "https://chromia2.stablelab.cloud:7740", + ], + brid="BD8A4A23FD35BF0A711A8D65E94C06E651A286A412663A82AC2240416264C74D" +) +PROCESSED_HISTORIES_FILE = "processed_histories_chromia.json" def load_processed_histories(): if os.path.exists(PROCESSED_HISTORIES_FILE): - with open(PROCESSED_HISTORIES_FILE, "r") as f: - return set(json.load(f)) + try: + with open(PROCESSED_HISTORIES_FILE, "r") as f: + return json.load(f) + except: + return set() return set() @@ -27,47 +44,70 @@ def save_processed_histories(histories): with open(PROCESSED_HISTORIES_FILE, "w") as f: json.dump(list(histories), f) +async def process_histories(pointer): + processed_histories = load_processed_histories() + processed_histories_prompt_ids = set(map(lambda x: x[0], processed_histories)) + start_time = -1 + if len(processed_histories) > 0: + start_time = max(map(lambda x: x[1], processed_histories)) + current_time = round(time.time() * 1000) + histories, last_pointer = chromia.get_prompt_histories( + start_time, + end_time=current_time, + pointer=pointer, + n_prompts=50 + ) -processed_histories = load_processed_histories() - - -async def process_histories(): - histories = chasm.get_prompt_history() logging.info(f"Histories: {len(histories)}") for history in histories: - if history["_id"] in processed_histories: + prompt_id = history.prompt_id + if prompt_id in processed_histories_prompt_ids: logging.debug(f"Skipping already processed history: {history['_id']}") continue - output = history["result"]["choices"][0]["message"]["content"] - result = await analyze_text( - history["messages"], + logging.debug(f"Processing history: {prompt_id}") + created_at = history.created_at + seed = history.seed + provider = history.provider + model = history.model + messages = json.loads(history.messages) + result = json.loads(history.result) + input = map(lambda x: x["content"], messages) + input = "\n".join(input) + output = result["choices"][0]["message"]["content"] + res = await analyze_text( + messages, output, - history["seed"], - history["result"]["scout"]["provider"], - history["result"]["scout"]["model"], + seed, + provider, + model ) - logging.debug(f"Result: {result}") - logging.debug(f"Score: {result['confidence_score']}") + logging.debug(f"Result: {res}") + logging.debug(f"Score: {res['confidence_score']}") - if result["confidence_score"] > MIN_CONFIDENCE_SCORE: - rs_output = result.get("rs_output") + if res["confidence_score"] > MIN_CONFIDENCE_SCORE: + rs_output = res.get("rs_output") assert rs_output is not None, "rs_output is not generated" response = chasm.file_dispute( - history["_id"], - history["messages"], + prompt_id, + messages, {"role": "assistant", "content": rs_output}, ) if response is not None: logging.info(f"Dispute filed: {response['result']}") # Cache history - processed_histories.add(history["_id"]) + processed_histories.add(( + prompt_id, + created_at, + )) save_processed_histories(processed_histories) + return last_pointer async def main(): + history_pointer = 0 while True: - await process_histories() + history_pointer = await process_histories(history_pointer) await asyncio.sleep(1) diff --git a/dispute/util/chromia.py b/dispute/util/chromia.py new file mode 100644 index 0000000..6fb6d31 --- /dev/null +++ b/dispute/util/chromia.py @@ -0,0 +1,90 @@ +import json +import time +from typing import List, Optional, Union +from pydantic import BaseModel +import requests +from requests.models import PreparedRequest + +class PromptHistory(BaseModel): + UID: int + created_at: int + messages: str + model: str + prompt_id: int + provider: str + result: str + seed: int + +class ChromiaDatabase: + def __init__(self, base_url: Union[str, List[str]], brid = None): + if isinstance(base_url, str): + self.base_urls = [base_url] + else: + self.base_urls = base_url + self.brid = brid + + def set_brid(self, brid): + self.brid = brid + + def _try_request(self, endpoint: str, params: Optional[dict] = None, headers: Optional[dict] = None) -> requests.Response: + """ + Helper function to try making a request to multiple base URLs until one succeeds. + + :param endpoint: The API endpoint to append to the base URL. + :param params: The query parameters to include in the request. + :param headers: The headers to include in the request. + :return: The successful requests.Response object. + :raises Exception: If all requests fail. + """ + for base_url in self.base_urls: + try: + url = f"{base_url}{endpoint}" + req = PreparedRequest() + req.prepare_url(url, params) + response = requests.get(req.url, headers=headers) + if response.status_code == 200: + return response + else: + print(f"Failed to retrieve data from {base_url}. Status code: {response.status_code}") + except requests.exceptions.RequestException as e: + print(f"Request to {base_url} failed: {e}") + raise Exception("All base URLs failed to retrieve data.") + + def get_blockchain_brid(self, iid): + response = self._try_request(endpoint=f"/brid/iid_{iid}", headers={"Content-Type": "text/plain"}) + return response.text + + def get_prompt_histories(self, start_time, end_time, pointer, n_prompts) -> (set[PromptHistory], int): + response = self._try_request( + endpoint=f"/query/{self.brid}", + params={ + "type": "get_prompt_histories", + "start_time": start_time, + "end_time": end_time, + "pointer": pointer, + "n_prompts": n_prompts, + }, + headers={"Content-Type": "application/json"} + ) + data = response.json() + return [PromptHistory(**x) for x in data["prompts"]], data["pointer"] + +if __name__ == "__main__": + base_url: str = ["https://dapps0.chromaway.com:7740"] + brid = "BD8A4A23FD35BF0A711A8D65E94C06E651A286A412663A82AC2240416264C74D" + chromia = ChromiaDatabase(base_url, brid) + + # Get prompt history + current_time = round(time.time() * 1000) + data, last_pointer = chromia.get_prompt_histories( + start_time=-1, + end_time=current_time, + pointer = 0, + n_prompts=20 + ) + if data is None or len(data) == 0: + print("No data") + exit() + print(data[0]) + print(len(data)) + print(f"Last pointer: {last_pointer}")