Skip to content

Commit

Permalink
feat: improve streaming for export (#648)
Browse files Browse the repository at this point in the history
Signed-off-by: 7HR4IZ3 <[email protected]>
Signed-off-by: Hugues Chocart <[email protected]>
Signed-off-by: 7HR4IZ3 <[email protected]>
Co-authored-by: Hugues Chocart <[email protected]>
  • Loading branch information
7HR4IZ3 and hughcrt authored Nov 16, 2024
1 parent 6d40fa2 commit 7803ea6
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 149 deletions.
2 changes: 1 addition & 1 deletion packages/backend/src/api/v1/auth/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ const publicRoutes = [
`/v1/runs/ingest`,
new RegExp(`/v1/runs/.+/public`), // public run data
new RegExp(`/v1/runs/.+/feedback`), // getFeedback in SDKs
new RegExp(`/v1/runs/download/.+`), // run exports
new RegExp(`/v1/runs/exports/.+`), // run exports
`/v1/template_versions/latest`,
`/v1/template-versions/latest`,
"/v1/users/verify-email",
Expand Down
33 changes: 21 additions & 12 deletions packages/backend/src/api/v1/runs/export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,10 @@ function getTraceChildren(run: Run, runs: Run[]): TraceRun {
};
}

function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

export async function fileExport(
{ ctx, sql, cursor, formatRun, projectId }: ExportType,
exportFormat: "csv" | "ojsonl" | "jsonl",
exportType?: "trace" | "thread",
exportType?: "trace" | "thread" | "llm",
) {
if (exportFormat === "csv") {
const parser = new Parser();
Expand All @@ -124,10 +120,22 @@ export async function fileExport(

const stream = Readable.from({
async *[Symbol.asyncIterator]() {
let isFirst = true;
for await (const [row] of cursor) {
// TODO: Remove this. Simulate a large dataset
await sleep(25);
yield parser.parse(formatRun(row));
let line;
if (exportType === "trace") {
const related = await getRelatedRuns(sql, row.id, projectId);
line = parser.parse(getTraceChildren(formatRun(row), related));
} else {
line = parser.parse(formatRun(row));
}
if (isFirst) {
isFirst = false;
} else {
line = line.trim().split("\n").slice(1).join("\\n");
}
// console.log(line);
yield line + "\n";
}
},
});
Expand All @@ -145,10 +153,11 @@ export async function fileExport(

if (input.length && output.length) {
// convert to JSON string format { messages: [input, output]}
const line = JSON.stringify(unCamelObject({
messages: [...input, ...output]
.map(cleanOpenAiMessage),
}));
const line = JSON.stringify(
unCamelObject({
messages: [...input, ...output].map(cleanOpenAiMessage),
}),
);
if (line.length > 0) {
yield line + "\n";
}
Expand Down
Loading

0 comments on commit 7803ea6

Please sign in to comment.