Merge pull request #7 from ChasmNetwork/0.0.7-release
0.0.7 release
superoo7 authored Oct 9, 2024
2 parents 2acb6c9 + e8f7be5 commit 69de4ed
Showing 10 changed files with 227 additions and 32 deletions.
3 changes: 2 additions & 1 deletion dispute/.gitignore
@@ -1,3 +1,4 @@
venv
__pycache__
processed_histories.json
processed_histories.json
processed_histories_chromia.json
86 changes: 63 additions & 23 deletions dispute/main.py
@@ -1,10 +1,12 @@
from config import LOG_LEVEL, MIN_CONFIDENCE_SCORE
import asyncio
import os
from util.chromia import ChromiaDatabase
from strategy import analyze_text
from util.chasm import ChasmConnection
import json
import logging
import time

logging.basicConfig(
level=LOG_LEVEL,
@@ -13,61 +15,99 @@
)

chasm = ChasmConnection()
PROCESSED_HISTORIES_FILE = "processed_histories.json"
chromia = ChromiaDatabase(
base_url=[
"https://dapps0.chromaway.com:7740",
"https://chromia-mainnet.w3coins.io:7740",
"https://mainnet-dapp1.sunube.net:7740",
"https://chromia.01node.com:7740",
"https://chromia-mainnet.caliber.build:443",
"https://chromia.nocturnallabs.org:7740",
"https://chromia2.stablelab.cloud:7740",
],
brid="BD8A4A23FD35BF0A711A8D65E94C06E651A286A412663A82AC2240416264C74D"
)
PROCESSED_HISTORIES_FILE = "processed_histories_chromia.json"


def load_processed_histories():
if os.path.exists(PROCESSED_HISTORIES_FILE):
with open(PROCESSED_HISTORIES_FILE, "r") as f:
return set(json.load(f))
try:
with open(PROCESSED_HISTORIES_FILE, "r") as f:
return json.load(f)
except:
return set()
return set()


def save_processed_histories(histories):
with open(PROCESSED_HISTORIES_FILE, "w") as f:
json.dump(list(histories), f)

async def process_histories(pointer):
processed_histories = load_processed_histories()
processed_histories_prompt_ids = set(map(lambda x: x[0], processed_histories))
start_time = -1
if len(processed_histories) > 0:
start_time = max(map(lambda x: x[1], processed_histories))
current_time = round(time.time() * 1000)
histories, last_pointer = chromia.get_prompt_histories(
start_time,
end_time=current_time,
pointer=pointer,
n_prompts=50
)

processed_histories = load_processed_histories()


async def process_histories():
histories = chasm.get_prompt_history()
logging.info(f"Histories: {len(histories)}")
for history in histories:
if history["_id"] in processed_histories:
prompt_id = history.prompt_id
if prompt_id in processed_histories_prompt_ids:
logging.debug(f"Skipping already processed history: {history['_id']}")
continue
output = history["result"]["choices"][0]["message"]["content"]
result = await analyze_text(
history["messages"],
logging.debug(f"Processing history: {prompt_id}")
created_at = history.created_at
seed = history.seed
provider = history.provider
model = history.model
messages = json.loads(history.messages)
result = json.loads(history.result)
input = map(lambda x: x["content"], messages)
input = "\n".join(input)
output = result["choices"][0]["message"]["content"]
res = await analyze_text(
messages,
output,
history["seed"],
history["result"]["scout"]["provider"],
history["result"]["scout"]["model"],
seed,
provider,
model
)
logging.debug(f"Result: {result}")
logging.debug(f"Score: {result['confidence_score']}")
logging.debug(f"Result: {res}")
logging.debug(f"Score: {res['confidence_score']}")

if result["confidence_score"] > MIN_CONFIDENCE_SCORE:
rs_output = result.get("rs_output")
if res["confidence_score"] > MIN_CONFIDENCE_SCORE:
rs_output = res.get("rs_output")
assert rs_output is not None, "rs_output is not generated"
response = chasm.file_dispute(
history["_id"],
history["messages"],
prompt_id,
messages,
{"role": "assistant", "content": rs_output},
)
if response is not None:
logging.info(f"Dispute filed: {response['result']}")

# Cache history
processed_histories.add(history["_id"])
processed_histories.add((
prompt_id,
created_at,
))
save_processed_histories(processed_histories)
return last_pointer


async def main():
history_pointer = 0
while True:
await process_histories()
history_pointer = await process_histories(history_pointer)
await asyncio.sleep(1)


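Note: the processed-history cache written by save_processed_histories is now a list of (prompt_id, created_at) pairs rather than a flat list of ids, which is what lets process_histories skip duplicates and resume from the newest processed timestamp. A rough illustration of the idea (the cache contents below are hypothetical, not taken from the commit):

# processed_histories_chromia.json might contain something like:
#   [[101, 1728400000000], [102, 1728400005000]]
cache = [(101, 1728400000000), (102, 1728400005000)]
seen_prompt_ids = {pid for pid, _ in cache}   # used to skip already-processed prompts
start_time = max(ts for _, ts in cache)       # next Chromia query resumes from the newest timestamp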
90 changes: 90 additions & 0 deletions dispute/util/chromia.py
@@ -0,0 +1,90 @@
import json
import time
from typing import List, Optional, Union
from pydantic import BaseModel
import requests
from requests.models import PreparedRequest

class PromptHistory(BaseModel):
UID: int
created_at: int
messages: str
model: str
prompt_id: int
provider: str
result: str
seed: int

class ChromiaDatabase:
def __init__(self, base_url: Union[str, List[str]], brid = None):
if isinstance(base_url, str):
self.base_urls = [base_url]
else:
self.base_urls = base_url
self.brid = brid

def set_brid(self, brid):
self.brid = brid

def _try_request(self, endpoint: str, params: Optional[dict] = None, headers: Optional[dict] = None) -> requests.Response:
"""
Helper function to try making a request to multiple base URLs until one succeeds.
:param endpoint: The API endpoint to append to the base URL.
:param params: The query parameters to include in the request.
:param headers: The headers to include in the request.
:return: The successful requests.Response object.
:raises Exception: If all requests fail.
"""
for base_url in self.base_urls:
try:
url = f"{base_url}{endpoint}"
req = PreparedRequest()
req.prepare_url(url, params)
response = requests.get(req.url, headers=headers)
if response.status_code == 200:
return response
else:
print(f"Failed to retrieve data from {base_url}. Status code: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"Request to {base_url} failed: {e}")
raise Exception("All base URLs failed to retrieve data.")

def get_blockchain_brid(self, iid):
response = self._try_request(endpoint=f"/brid/iid_{iid}", headers={"Content-Type": "text/plain"})
return response.text

def get_prompt_histories(self, start_time, end_time, pointer, n_prompts) -> (set[PromptHistory], int):
response = self._try_request(
endpoint=f"/query/{self.brid}",
params={
"type": "get_prompt_histories",
"start_time": start_time,
"end_time": end_time,
"pointer": pointer,
"n_prompts": n_prompts,
},
headers={"Content-Type": "application/json"}
)
data = response.json()
return [PromptHistory(**x) for x in data["prompts"]], data["pointer"]

if __name__ == "__main__":
base_url: str = ["https://dapps0.chromaway.com:7740"]
brid = "BD8A4A23FD35BF0A711A8D65E94C06E651A286A412663A82AC2240416264C74D"
chromia = ChromiaDatabase(base_url, brid)

# Get prompt history
current_time = round(time.time() * 1000)
data, last_pointer = chromia.get_prompt_histories(
start_time=-1,
end_time=current_time,
pointer = 0,
n_prompts=20
)
if data is None or len(data) == 0:
print("No data")
exit()
print(data[0])
print(len(data))
print(f"Last pointer: {last_pointer}")
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "scout",
"version": "0.0.6",
"version": "0.0.7",
"description": "",
"main": "index.js",
"scripts": {
2 changes: 2 additions & 0 deletions src/config.ts
@@ -6,6 +6,7 @@ import { LLMProvider } from "./utils/llm";

process.env.OPENAI_API_KEY = process.env.OPENAI_API_KEY || "";
process.env.OPENROUTER_API_KEY = process.env.OPENROUTER_API_KEY || "";
process.env.VLLM_URL = process.env.VLLM_URL || "";

// Validator for LLM providers
const providersValidator = makeValidator((x) => {
@@ -68,6 +69,7 @@ export const env = cleanEnv(process.env, {
}),
LOGGER_LEVEL: str({ choices: ["debug", "info", "warn", "error", "fatal"] }),
ORCHESTRATOR_URL: url(),
VLLM_URL: url(),
SCOUT_NAME: str(),
SCOUT_UID: num(),
WEBHOOK_API_KEY: str(),
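Since VLLM_URL is validated as a URL with no usable default, deployments that enable the vllm provider need a real value in the environment. A hypothetical .env entry (host and port are assumptions; vLLM's OpenAI-compatible server typically listens on 8000, and the /v1 suffix is appended by the integration itself):

VLLM_URL=http://localhost:8000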
15 changes: 13 additions & 2 deletions src/integration/OpenAIBase.ts
@@ -6,7 +6,11 @@ import { logger } from "../utils/logger";
export abstract class OpenAIBase {
protected openai: OpenAI;

constructor(apiKey: string, baseURL?: string) {
constructor(
apiKey: string,
baseURL?: string,
protected logProbs = false,
) {
this.openai = new OpenAI({
apiKey: apiKey,
baseURL: baseURL,
@@ -29,6 +33,7 @@ export abstract class OpenAIBase {
return stream;
} else {
let accumulatedData = "";
let accumulatedLogProbs: any[] = [];
let completion: OpenAI.Chat.Completions.ChatCompletion | null = null;
const usage = {
prompt_tokens: 0,
@@ -38,6 +43,8 @@

for await (const chunk of stream) {
accumulatedData += chunk.choices[0].delta.content || "";
if (this.logProbs)
accumulatedLogProbs.push(chunk.choices[0].logprobs);
completion = chunk as any;
usage.completion_tokens += 1;
}
@@ -54,7 +61,11 @@
content: accumulatedData,
},
finish_reason: "stop",
logprobs: null,
logprobs: this.logProbs
? {
content: accumulatedLogProbs,
}
: null,
},
],
};
44 changes: 44 additions & 0 deletions src/integration/vllm.ts
@@ -0,0 +1,44 @@
import OpenAI from "openai";

import { env } from "../config";
import { logger } from "../utils/logger";
import { OpenAIBase } from "./OpenAIBase";

class VLLM extends OpenAIBase {
constructor() {
super("vllm", `${env.VLLM_URL}/v1`, false);
}
}

export const vllmQuery = async (
body: OpenAI.Chat.Completions.ChatCompletionCreateParamsNonStreaming,
) => {
logger.debug("[vllm] Req: %o", body);
const vllm = new VLLM();
const modifedBody: typeof body = {
...body,
messages: [
{
role: "user",
content: createPromptFromMessages(body.messages),
},
],
logprobs: true,
top_logprobs: 5,
};

const response = await vllm.query(modifedBody);
logger.debug("[vllm] Res: %o", response);
return response;
};

export const createPromptFromMessages = (
messages: OpenAI.Chat.Completions.ChatCompletionCreateParamsNonStreaming["messages"],
) => {
return messages
.map(
(message) =>
`<start_of_turn>${message.role}\n${message.content}<end_of_turn>`,
)
.join("\n");
};
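createPromptFromMessages flattens the whole chat into Gemma-style turn markers before the request is sent as a single user message. For example, a system prompt followed by a user question would be rendered roughly as (illustrative output, not taken from the commit):

<start_of_turn>system
You are a helpful assistant.<end_of_turn>
<start_of_turn>user
What is Chromia?<end_of_turn>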
4 changes: 4 additions & 0 deletions src/utils/llm.ts
@@ -2,13 +2,15 @@ import { groqQuery } from "../integration/groq";
import { ollamaQuery } from "../integration/ollama";
import { openAiQuery } from "../integration/openai";
import { openRouterQuery } from "../integration/openrouter";
import { vllmQuery } from "../integration/vllm";
import { logger } from "./logger";

export enum LLMProvider {
GROQ = "groq",
OPENAI = "openai",
OPENROUTER = "openrouter",
OLLAMA = "ollama",
VLLM = "vllm",
}

export type QueryFunction =
@@ -22,6 +24,7 @@ export const getLlmQuery = (provider: LLMProvider): QueryFunction => {
[LLMProvider.OPENAI]: openAiQuery,
[LLMProvider.OPENROUTER]: openRouterQuery,
[LLMProvider.OLLAMA]: ollamaQuery,
[LLMProvider.VLLM]: vllmQuery,
};

return queryFunctions[provider];
@@ -33,6 +36,7 @@ export const getModelName = (provider: LLMProvider, model: string): string => {
[LLMProvider.GROQ]: "gemma2-9b-it",
[LLMProvider.OPENROUTER]: "google/gemma-2-9b-it",
[LLMProvider.OLLAMA]: "gemma2:9b",
[LLMProvider.VLLM]: "google/gemma-2-9b-it",
},
"gemma-7b-it": {
[LLMProvider.GROQ]: "gemma-7b-it",