From 89cb48951b817ee5239d200db90c3fa730ced114 Mon Sep 17 00:00:00 2001 From: Nimaoth Date: Wed, 3 Apr 2024 12:46:28 +0200 Subject: [PATCH] added option to not start async process immediately --- src/misc/async_process.nim | 86 ++++++++++++++++++++++++++++++-------- 1 file changed, 68 insertions(+), 18 deletions(-) diff --git a/src/misc/async_process.nim b/src/misc/async_process.nim index e7e559df..b634af95 100644 --- a/src/misc/async_process.nim +++ b/src/misc/async_process.nim @@ -5,6 +5,7 @@ logCategory "asyncprocess" type AsyncChannel*[T] = ref object chan: ptr Channel[T] + closed: bool type AsyncProcess* = ref object name: string @@ -137,6 +138,42 @@ proc recvLine*[T: char](achan: AsyncChannel[T]): Future[string] {.async.} = return "" +proc tryRecvLine*[T: char](achan: AsyncChannel[T]): Future[Option[string]] {.async.} = + if achan.closed: + return string.none + + var buffer = "" + + var cr = false + while not achan.chan.isNil: + while not achan.chan.isNil: + let (hasData, c) = achan.chan[].tryRecv + if not hasData: + continue + + if c == '\0': + achan.closed = true + + if buffer.len > 0: + return buffer.some + else: + return string.none + + if c != '\r' and c != '\n': + cr = false + buffer.add c + elif c == '\r': + cr = true + elif c == '\n': + if cr and buffer.len == 0: + return "\r\n".some + cr = false + return buffer.some + + await sleepAsync 1 + + return string.none + proc send*[T](achan: AsyncChannel[Option[T]], data: T) {.async.} = while not achan.chan[].trySend(data.some): await sleepAsync 1 @@ -155,6 +192,11 @@ proc recvLine*(process: AsyncProcess): Future[string] = return result return process.input.recvLine() +proc tryRecvLine*(process: AsyncProcess): Future[Option[string]] {.async.} = + if process.input.isNil or process.input.chan.isNil: + return string.none + return process.input.tryRecvLine().await + proc recvErrorLine*(process: AsyncProcess): Future[string] = if process.error.isNil or process.error.chan.isNil: result = newFuture[string]("recvError") @@ -225,6 +267,25 @@ proc writeOutput(chan: ptr Channel[Stream], data: ptr Channel[Option[string]]): return true +proc start*(process: AsyncProcess): bool = + log(lvlInfo, fmt"start process {process.name} {process.args}") + try: + process.process = startProcess(process.name, args=process.args, options={poUsePath, poDaemon}) + except CatchableError as e: + log(lvlError, fmt"Failed to start {process.name}: {e.msg}") + return false + + process.readerFlowVar = spawn(readInput(process.inputStreamChannel, process.serverDiedNotifications, process.input.chan, process.output.chan)) + process.inputStreamChannel[].send process.process.outputStream() + + process.errorReaderFlowVar = spawn(readInput(process.errorStreamChannel, process.serverDiedNotifications, process.error.chan, process.output.chan)) + process.errorStreamChannel[].send process.process.errorStream() + + process.writerFlowVar = spawn(writeOutput(process.outputStreamChannel, process.output.chan)) + process.outputStreamChannel[].send process.process.inputStream() + + return true + proc restartServer(process: AsyncProcess) {.async.} = var startCounter = 0 @@ -243,26 +304,14 @@ proc restartServer(process: AsyncProcess) {.async.} = inc startCounter - log(lvlInfo, fmt"start process {process.name} {process.args}") - try: - process.process = startProcess(process.name, args=process.args, options={poUsePath, poDaemon}) - except CatchableError as e: - log(lvlError, fmt"Failed to start {process.name}: {e.msg}") + if not process.start(): break - process.readerFlowVar = spawn(readInput(process.inputStreamChannel, process.serverDiedNotifications, process.input.chan, process.output.chan)) - process.inputStreamChannel[].send process.process.outputStream() - - process.errorReaderFlowVar = spawn(readInput(process.errorStreamChannel, process.serverDiedNotifications, process.error.chan, process.output.chan)) - process.errorStreamChannel[].send process.process.errorStream() - - process.writerFlowVar = spawn(writeOutput(process.outputStreamChannel, process.output.chan)) - process.outputStreamChannel[].send process.process.inputStream() - if not process.onRestarted.isNil: process.onRestarted().await -proc startAsyncProcess*(name: string, args: seq[string] = @[], autoRestart = true): AsyncProcess = + +proc startAsyncProcess*(name: string, args: seq[string] = @[], autoRestart = true, autoStart = true): AsyncProcess = let process = AsyncProcess() process.name = name process.args = @args @@ -283,7 +332,8 @@ proc startAsyncProcess*(name: string, args: seq[string] = @[], autoRestart = tru process.serverDiedNotifications = cast[ptr Channel[bool]](allocShared0(sizeof(Channel[bool]))) process.serverDiedNotifications[].open() - asyncCheck process.restartServer() - process.serverDiedNotifications[].send true + if autoStart: + asyncCheck process.restartServer() + process.serverDiedNotifications[].send true - return process \ No newline at end of file + return process