Skip to content

Commit

Permalink
added option to not start async process immediately
Browse files Browse the repository at this point in the history
  • Loading branch information
Nimaoth committed Apr 3, 2024
1 parent f85a01c commit 89cb489
Showing 1 changed file with 68 additions and 18 deletions.
86 changes: 68 additions & 18 deletions src/misc/async_process.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ logCategory "asyncprocess"

type AsyncChannel*[T] = ref object
chan: ptr Channel[T]
closed: bool

type AsyncProcess* = ref object
name: string
Expand Down Expand Up @@ -137,6 +138,42 @@ proc recvLine*[T: char](achan: AsyncChannel[T]): Future[string] {.async.} =

return ""

proc tryRecvLine*[T: char](achan: AsyncChannel[T]): Future[Option[string]] {.async.} =
if achan.closed:
return string.none

var buffer = ""

var cr = false
while not achan.chan.isNil:
while not achan.chan.isNil:
let (hasData, c) = achan.chan[].tryRecv
if not hasData:
continue

if c == '\0':
achan.closed = true

if buffer.len > 0:
return buffer.some
else:
return string.none

if c != '\r' and c != '\n':
cr = false
buffer.add c
elif c == '\r':
cr = true
elif c == '\n':
if cr and buffer.len == 0:
return "\r\n".some
cr = false
return buffer.some

await sleepAsync 1

return string.none

proc send*[T](achan: AsyncChannel[Option[T]], data: T) {.async.} =
while not achan.chan[].trySend(data.some):
await sleepAsync 1
Expand All @@ -155,6 +192,11 @@ proc recvLine*(process: AsyncProcess): Future[string] =
return result
return process.input.recvLine()

proc tryRecvLine*(process: AsyncProcess): Future[Option[string]] {.async.} =
if process.input.isNil or process.input.chan.isNil:
return string.none
return process.input.tryRecvLine().await

proc recvErrorLine*(process: AsyncProcess): Future[string] =
if process.error.isNil or process.error.chan.isNil:
result = newFuture[string]("recvError")
Expand Down Expand Up @@ -225,6 +267,25 @@ proc writeOutput(chan: ptr Channel[Stream], data: ptr Channel[Option[string]]):

return true

proc start*(process: AsyncProcess): bool =
log(lvlInfo, fmt"start process {process.name} {process.args}")
try:
process.process = startProcess(process.name, args=process.args, options={poUsePath, poDaemon})
except CatchableError as e:
log(lvlError, fmt"Failed to start {process.name}: {e.msg}")
return false

process.readerFlowVar = spawn(readInput(process.inputStreamChannel, process.serverDiedNotifications, process.input.chan, process.output.chan))
process.inputStreamChannel[].send process.process.outputStream()

process.errorReaderFlowVar = spawn(readInput(process.errorStreamChannel, process.serverDiedNotifications, process.error.chan, process.output.chan))
process.errorStreamChannel[].send process.process.errorStream()

process.writerFlowVar = spawn(writeOutput(process.outputStreamChannel, process.output.chan))
process.outputStreamChannel[].send process.process.inputStream()

return true

proc restartServer(process: AsyncProcess) {.async.} =
var startCounter = 0

Expand All @@ -243,26 +304,14 @@ proc restartServer(process: AsyncProcess) {.async.} =

inc startCounter

log(lvlInfo, fmt"start process {process.name} {process.args}")
try:
process.process = startProcess(process.name, args=process.args, options={poUsePath, poDaemon})
except CatchableError as e:
log(lvlError, fmt"Failed to start {process.name}: {e.msg}")
if not process.start():
break

process.readerFlowVar = spawn(readInput(process.inputStreamChannel, process.serverDiedNotifications, process.input.chan, process.output.chan))
process.inputStreamChannel[].send process.process.outputStream()

process.errorReaderFlowVar = spawn(readInput(process.errorStreamChannel, process.serverDiedNotifications, process.error.chan, process.output.chan))
process.errorStreamChannel[].send process.process.errorStream()

process.writerFlowVar = spawn(writeOutput(process.outputStreamChannel, process.output.chan))
process.outputStreamChannel[].send process.process.inputStream()

if not process.onRestarted.isNil:
process.onRestarted().await

proc startAsyncProcess*(name: string, args: seq[string] = @[], autoRestart = true): AsyncProcess =

proc startAsyncProcess*(name: string, args: seq[string] = @[], autoRestart = true, autoStart = true): AsyncProcess =
let process = AsyncProcess()
process.name = name
process.args = @args
Expand All @@ -283,7 +332,8 @@ proc startAsyncProcess*(name: string, args: seq[string] = @[], autoRestart = tru
process.serverDiedNotifications = cast[ptr Channel[bool]](allocShared0(sizeof(Channel[bool])))
process.serverDiedNotifications[].open()

asyncCheck process.restartServer()
process.serverDiedNotifications[].send true
if autoStart:
asyncCheck process.restartServer()
process.serverDiedNotifications[].send true

return process
return process

0 comments on commit 89cb489

Please sign in to comment.