From 8b628179b322b9e3a8d7370c24ed0ae64a6663cf Mon Sep 17 00:00:00 2001 From: bLue Date: Wed, 11 Dec 2024 01:06:24 +0800 Subject: [PATCH] feat(judger-agent): add callback judge queue --- src/sub-app/judger-agent/src/index.ts | 2 +- .../judger-agent/src/services/index.ts | 265 +-------------- src/sub-app/judger-agent/src/services/task.ts | 307 ++++++++++++++++++ 3 files changed, 313 insertions(+), 261 deletions(-) create mode 100644 src/sub-app/judger-agent/src/services/task.ts diff --git a/src/sub-app/judger-agent/src/index.ts b/src/sub-app/judger-agent/src/index.ts index 3f75765..f035b01 100644 --- a/src/sub-app/judger-agent/src/index.ts +++ b/src/sub-app/judger-agent/src/index.ts @@ -1,4 +1,4 @@ -import { omit, cloneDeep } from 'lodash'; +import { omit } from 'lodash'; import config from './config'; import { judgerAgentLogger as logger } from './utils/logger'; import { decodeJudgeQueueMessage, getSystemInfo } from './utils'; diff --git a/src/sub-app/judger-agent/src/services/index.ts b/src/sub-app/judger-agent/src/services/index.ts index 1022a58..f24eae6 100644 --- a/src/sub-app/judger-agent/src/services/index.ts +++ b/src/sub-app/judger-agent/src/services/index.ts @@ -3,70 +3,15 @@ import https from 'https'; import EventEmitter from 'events'; import Axios from 'axios'; import type { AxiosInstance } from 'axios'; -import PCancelable from 'p-cancelable'; -import microtime from 'microtime'; -import debug from 'debug'; import config from '../config'; -import { judgerAgentLogger } from '../utils/logger'; -import { river } from '../proto/river'; import { Judger } from './judger'; -import { ESolutionResult } from '../enums'; -import { convertRiverResultToOJ } from '../utils'; -import { getProblemDataResult } from '../utils/judger'; - -const logger = judgerAgentLogger; -const dbg = debug('onlinejudge3:judger-agent:JudgerService'); - -export interface IJudgeOptions { - judgeInfoId: number; - solutionId: number; - problem: { - problemId: number; - revision: number; - timeLimit: number; - memoryLimit: number; - spj: boolean; - }; - user: { - userId: number; - }; - competition?: { - competitionId: number; - }; - language: string; - code: string; -} - -class InvalidSolutionError extends Error { - constructor(message: string) { - super(message); - this.name = 'InvalidSolutionError'; - Error.captureStackTrace(this, this.constructor); - } -} - -class JudgerSystemError extends Error { - constructor(message: string) { - super(message); - this.name = 'JudgerSystemError'; - Error.captureStackTrace(this, this.constructor); - } -} - -class AbortError extends Error { - constructor(message: string) { - super(message); - this.name = 'AbortError'; - Error.captureStackTrace(this, this.constructor); - } -} +import { JudgeTask, IJudgeOptions } from './task'; export class JudgerService extends EventEmitter { private readonly judger = new Judger({ address: config.judgerGrpc.address, }); private readonly ojApiInstance: AxiosInstance; - private aborted = false; constructor(public readonly judgerId: string) { super(); @@ -83,211 +28,11 @@ export class JudgerService extends EventEmitter { }); } - onAbortPoint() { - if (this.aborted) { - throw new AbortError('Aborted'); - } - } - - private async callbackJudge( - judgeInfoId: number, - solutionId: number, - redundant: { userId: number; problemId: number; competitionId?: number }, - data: any, - ) { - try { - const req = { - judgeInfoId, - solutionId, - judgerId: this.judgerId, - userId: redundant.userId, - problemId: redundant.problemId, - competitionId: redundant.competitionId, - eventTimestampUs: microtime.now(), - data, - }; - dbg('callback judge: %O', req); - const res = await this.ojApiInstance.post('/callbackJudge', req); - return !!res.data.success; - } catch (e) { - logger.warn('callback judge error:', judgeInfoId, JSON.stringify(data), e.message); - return false; - } - } - public judge(options: IJudgeOptions) { - return new PCancelable((resolve, reject, onCancel) => { - onCancel(() => { - this.aborted = true; - }); - - const logic = async () => { - this.onAbortPoint(); - const { judgeInfoId, solutionId, problem, language, code } = options; - const { revision, problemId, timeLimit, memoryLimit, spj = false } = problem; - const { userId } = options.user || {}; - const { competitionId } = options.competition || {}; - const redundant = { userId, problemId, competitionId }; - const judgeType = spj ? river.JudgeType.Special : river.JudgeType.Standard; - const loggerPrefix = `[${judgeInfoId}/${solutionId}/${problemId}/${revision}]`; - - try { - logger.info(`${loggerPrefix} start`); - - if (!language) { - throw new InvalidSolutionError(`Invalid language "${options.language}"`); - } - if (!problemId || !timeLimit || !memoryLimit) { - throw new InvalidSolutionError(`No problem specified`); - } - if (!code) { - throw new InvalidSolutionError(`No code`); - } - this.onAbortPoint(); - - await this.callbackJudge(judgeInfoId, solutionId, redundant, { - type: 'start', - }); - this.onAbortPoint(); - this.emit('active'); - - let dataDir = `${problemId}`; - let dataCases; - if (config.judgerData.useRemoteDataRelease) { - logger.info('Preparing remote data'); - const dataResult = await getProblemDataResult(problemId, revision); - dataDir = `${problemId}/${dataResult.extraHash}`; - dataCases = dataResult.cases; - } - - // TODO compile spj - const spjFile = spj ? 'spj' : undefined; - logger.info( - `${loggerPrefix} getJudgeCall`, - JSON.stringify({ - problemId, - language, - code: `string(${code.length})`, - timeLimit: timeLimit, - memoryLimit: memoryLimit, - judgeType, - spjFile, - }), - ); - - const call = this.judger.getJudgeCall({ - dataDir, - problemId, - language, - code, - timeLimit: timeLimit, - memoryLimit: memoryLimit, - cases: dataCases, - judgeType, - spjFile, - onStart: () => { - this.onAbortPoint(); - logger.info(`${loggerPrefix} onStart`); - this.emit('active'); - }, - onJudgeCaseStart: (current, total) => { - this.onAbortPoint(); - logger.info(`${loggerPrefix} onJudgeCaseStart ${current}/${total}`); - this.callbackJudge(judgeInfoId, solutionId, redundant, { - type: 'progress', - current, - total, - }); - this.emit('active'); - }, - onJudgeCaseDone: (current, total, res) => { - this.onAbortPoint(); - logger.info(`${loggerPrefix} onJudgeCaseDone ${current}/${total}`); - this.emit('active'); - return res.result === river.JudgeResultEnum.Accepted; - }, - }); - - const jResult = await call.run(); - dbg('judge result: %O', jResult); - this.onAbortPoint(); - logger.info(`${loggerPrefix} done`, JSON.stringify(jResult)); - this.emit('active'); - - switch (jResult.type) { - case 'CompileError': { - await this.callbackJudge(judgeInfoId, solutionId, redundant, { - type: 'finish', - resultType: 'CompileError', - detail: { - compileInfo: jResult.res, - }, - }); - break; - } - case 'SystemError': { - throw new JudgerSystemError(jResult.res); - } - case 'Done': { - const result: ESolutionResult = convertRiverResultToOJ( - jResult.res[jResult.res.length - 1].result!, - ); - let maxTimeUsed = 0; - let maxMemoryUsed = 0; - jResult.res.forEach((r) => { - // @ts-ignore - maxTimeUsed = Math.max(maxTimeUsed, r.timeUsed); - // @ts-ignore - maxMemoryUsed = Math.max(maxMemoryUsed, r.memoryUsed); - }); - const lastCaseNumber = jResult.last; - const totalCaseNumber = jResult.total; - const cases = jResult.res.map((r) => ({ - result: convertRiverResultToOJ(r.result!), - time: r.timeUsed, - memory: r.memoryUsed, - errMsg: r.errmsg || undefined, - outMsg: r.outmsg || undefined, - })); - await this.callbackJudge(judgeInfoId, solutionId, redundant, { - type: 'finish', - resultType: 'Done', - detail: { - result, - maxTimeUsed, - maxMemoryUsed, - lastCaseNumber, - totalCaseNumber, - cases, - }, - }); - logger.info(`${loggerPrefix} Judge all ok`); - } - } - } catch (e) { - await this.callbackJudge(judgeInfoId, solutionId, redundant, { - type: 'finish', - resultType: 'SystemError', - detail: { - error: e.message, - }, - }); - if (e instanceof InvalidSolutionError) { - logger.warn(loggerPrefix, e); - } else if (e instanceof JudgerSystemError) { - logger.error(loggerPrefix, e); - throw e; - } else if (e instanceof AbortError) { - logger.error(loggerPrefix, e); - throw e; - } else { - logger.error(`${loggerPrefix} Caught error:`, e); - throw e; - } - } - }; - - logic().then(resolve).catch(reject); + const task = new JudgeTask(options, this.judgerId, this.judger, this.ojApiInstance); + task.on('active', () => { + this.emit('active'); }); + return task.run(); } } diff --git a/src/sub-app/judger-agent/src/services/task.ts b/src/sub-app/judger-agent/src/services/task.ts new file mode 100644 index 0000000..a06509f --- /dev/null +++ b/src/sub-app/judger-agent/src/services/task.ts @@ -0,0 +1,307 @@ +import EventEmitter from 'events'; +import type { AxiosInstance } from 'axios'; +import PCancelable from 'p-cancelable'; +import PQueue from 'p-queue'; +import microtime from 'microtime'; +import debug from 'debug'; +import config from '../config'; +import { judgerAgentLogger } from '../utils/logger'; +import { river } from '../proto/river'; +import { Judger } from './judger'; +import { ESolutionResult } from '../enums'; +import { convertRiverResultToOJ } from '../utils'; +import { getProblemDataResult } from '../utils/judger'; + +const logger = judgerAgentLogger; +const dbg = debug('onlinejudge3:judger-agent:JudgeTask'); + +export interface IJudgeOptions { + judgeInfoId: number; + solutionId: number; + problem: { + problemId: number; + revision: number; + timeLimit: number; + memoryLimit: number; + spj: boolean; + }; + user: { + userId: number; + }; + competition?: { + competitionId: number; + }; + language: string; + code: string; +} + +class InvalidSolutionError extends Error { + constructor(message: string) { + super(message); + this.name = 'InvalidSolutionError'; + Error.captureStackTrace(this, this.constructor); + } +} + +class JudgerSystemError extends Error { + constructor(message: string) { + super(message); + this.name = 'JudgerSystemError'; + Error.captureStackTrace(this, this.constructor); + } +} + +class AbortError extends Error { + constructor(message: string) { + super(message); + this.name = 'AbortError'; + Error.captureStackTrace(this, this.constructor); + } +} + +export class JudgeTask extends EventEmitter { + public options: IJudgeOptions; + private redundant: { userId: number; problemId: number; competitionId?: number }; + private aborted = false; + private callbackBuffer: { data: any; eventTimestampUs: number }[] = []; + private callbackQueue = new PQueue({ concurrency: 1 }); + + constructor( + options: IJudgeOptions, + public readonly judgerId: string, + private readonly judger: Judger, + private readonly ojApiInstance: AxiosInstance, + ) { + super(); + this.options = options; + const { problemId } = this.options.problem; + const { userId } = this.options.user || {}; + const { competitionId } = this.options.competition || {}; + this.redundant = { userId, problemId, competitionId }; + } + + private onAbortPoint() { + if (this.aborted) { + throw new AbortError('Aborted'); + } + } + + private pushCallbackJudge(data: any) { + this.callbackBuffer.push({ + data, + eventTimestampUs: microtime.now(), + }); + } + + private async flushCallbackJudgeBuffer() { + if (this.callbackBuffer.length === 0) { + return; + } + try { + const req = { + judgeInfoId: this.options.judgeInfoId, + solutionId: this.options.solutionId, + judgerId: this.judgerId, + userId: this.redundant.userId, + problemId: this.redundant.problemId, + competitionId: this.redundant.competitionId, + batchData: this.callbackBuffer, + }; + dbg('callback judge: %O', req); + const res = await this.ojApiInstance.post('/callbackJudge', req); + const success = !!res.data.success; + if (success) { + this.callbackBuffer = []; + } + return success; + } catch (e) { + logger.warn( + 'callback judge error:', + this.options.judgeInfoId, + JSON.stringify(this.callbackBuffer), + e.message, + ); + return false; + } + } + + private async callbackJudge(data) { + this.pushCallbackJudge(data); + return this.callbackQueue.add(() => this.flushCallbackJudgeBuffer()); + } + + public run() { + return new PCancelable((resolve, reject, onCancel) => { + onCancel(() => { + this.aborted = true; + }); + + const logic = async () => { + this.onAbortPoint(); + const { judgeInfoId, solutionId, problem, language, code } = this.options; + const { revision, problemId, timeLimit, memoryLimit, spj = false } = problem; + const judgeType = spj ? river.JudgeType.Special : river.JudgeType.Standard; + const loggerPrefix = `[${judgeInfoId}/${solutionId}/${problemId}/${revision}]`; + + try { + logger.info(`${loggerPrefix} start`); + + if (!language) { + throw new InvalidSolutionError(`Invalid language "${this.options.language}"`); + } + if (!problemId || !timeLimit || !memoryLimit) { + throw new InvalidSolutionError(`No problem specified`); + } + if (!code) { + throw new InvalidSolutionError(`No code`); + } + this.onAbortPoint(); + + await this.callbackJudge({ + type: 'start', + }); + this.onAbortPoint(); + this.emit('active'); + + let dataDir = `${problemId}`; + let dataCases; + if (config.judgerData.useRemoteDataRelease) { + logger.info('Preparing remote data'); + const dataResult = await getProblemDataResult(problemId, revision); + dataDir = `${problemId}/${dataResult.extraHash}`; + dataCases = dataResult.cases; + } + + // TODO compile spj + const spjFile = spj ? 'spj' : undefined; + logger.info( + `${loggerPrefix} getJudgeCall`, + JSON.stringify({ + problemId, + language, + code: `string(${code.length})`, + timeLimit: timeLimit, + memoryLimit: memoryLimit, + judgeType, + spjFile, + }), + ); + + const call = this.judger.getJudgeCall({ + dataDir, + problemId, + language, + code, + timeLimit: timeLimit, + memoryLimit: memoryLimit, + cases: dataCases, + judgeType, + spjFile, + onStart: () => { + this.onAbortPoint(); + logger.info(`${loggerPrefix} onStart`); + this.emit('active'); + }, + onJudgeCaseStart: (current, total) => { + this.onAbortPoint(); + logger.info(`${loggerPrefix} onJudgeCaseStart ${current}/${total}`); + this.callbackJudge({ + type: 'progress', + current, + total, + }); + this.emit('active'); + }, + onJudgeCaseDone: (current, total, res) => { + this.onAbortPoint(); + logger.info(`${loggerPrefix} onJudgeCaseDone ${current}/${total}`); + this.emit('active'); + return res.result === river.JudgeResultEnum.Accepted; + }, + }); + + const jResult = await call.run(); + dbg('judge result: %O', jResult); + this.onAbortPoint(); + logger.info(`${loggerPrefix} done`, JSON.stringify(jResult)); + this.emit('active'); + + switch (jResult.type) { + case 'CompileError': { + await this.callbackJudge({ + type: 'finish', + resultType: 'CompileError', + detail: { + compileInfo: jResult.res, + }, + }); + break; + } + case 'SystemError': { + throw new JudgerSystemError(jResult.res); + } + case 'Done': { + const result: ESolutionResult = convertRiverResultToOJ( + jResult.res[jResult.res.length - 1].result!, + ); + let maxTimeUsed = 0; + let maxMemoryUsed = 0; + jResult.res.forEach((r) => { + // @ts-ignore + maxTimeUsed = Math.max(maxTimeUsed, r.timeUsed); + // @ts-ignore + maxMemoryUsed = Math.max(maxMemoryUsed, r.memoryUsed); + }); + const lastCaseNumber = jResult.last; + const totalCaseNumber = jResult.total; + const cases = jResult.res.map((r) => ({ + result: convertRiverResultToOJ(r.result!), + time: r.timeUsed, + memory: r.memoryUsed, + errMsg: r.errmsg || undefined, + outMsg: r.outmsg || undefined, + })); + await this.callbackJudge({ + type: 'finish', + resultType: 'Done', + detail: { + result, + maxTimeUsed, + maxMemoryUsed, + lastCaseNumber, + totalCaseNumber, + cases, + }, + }); + logger.info(`${loggerPrefix} Judge all ok`); + } + } + + await this.callbackQueue.onIdle(); + } catch (e) { + await this.callbackJudge({ + type: 'finish', + resultType: 'SystemError', + detail: { + error: e.message, + }, + }); + if (e instanceof InvalidSolutionError) { + logger.warn(loggerPrefix, e); + } else if (e instanceof JudgerSystemError) { + logger.error(loggerPrefix, e); + throw e; + } else if (e instanceof AbortError) { + logger.error(loggerPrefix, e); + throw e; + } else { + logger.error(`${loggerPrefix} Caught error:`, e); + throw e; + } + } + }; + + logic().then(resolve).catch(reject); + }); + } +}