Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
quelgar committed Dec 29, 2023
1 parent 3aad57f commit 93a0fa5
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 3 deletions.
25 changes: 22 additions & 3 deletions src/main/scala/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,15 @@ object Main {

def onClose: CloseCallback = CFuncPtr1.fromScalaFunction(stdlib.free)

def onWrite: StreamWriteCallback = CFuncPtr2.fromScalaFunction {
(req: WriteReq, status: ErrorCode) =>
status.onFailMessage(msg => println(s"Write failed: $msg"))
val buf = Buffer.unsafeFromNative(uv_req_get_data(req))
stdlib.free(buf.base)
stdlib.free(buf.toNative)
stdlib.free(req)
}

def onRead: StreamReadCallback = CFuncPtr3.fromScalaFunction {
(handle: StreamHandle, numRead: CSSize, buf: Buffer) =>
numRead match {
Expand All @@ -141,13 +150,21 @@ object Main {
case _ =>
val text = buf.asUtf8String(numRead.toInt)
println(s"Read $numRead bytes: $text")
stdlib.free(buf.base)
val writeReq = UvUtils.mallocRequest(RequestType.WRITE)
val outBuf = Buffer.malloc(buf.base, numRead.toULong)
uv_req_set_data(writeReq, outBuf.toNative)
uv_write(writeReq, handle, outBuf, 1.toUInt, onWrite)
.onFail {
stdlib.free(outBuf.base)
stdlib.free(outBuf.toNative)
stdlib.free(writeReq)
}
}
}

def onNewConnection: ConnectionCallback = CFuncPtr2.fromScalaFunction {
(handle: StreamHandle, status: ErrorCode) =>
val loop = uv_default_loop()
val loop = uv_handle_get_loop(handle)
val attempt = for {
_ <- status.attempt.mapErrorMessage(s =>
s"New connection error: $s"
Expand All @@ -167,9 +184,11 @@ object Main {
attempt.forError(e => println(e.message))
}

val port = 7000
println(s"Listening on port $port")
val serverTcpHandle = UvUtils.stackAllocateHandle(HandleType.UV_TCP)
uv_tcp_init(loop, serverTcpHandle).checkErrorThrowIO()
val serverSocketAddress = SocketAddress4.unspecifiedAddress(7000)
val serverSocketAddress = SocketAddress4.unspecifiedAddress(port)
uv_tcp_bind(serverTcpHandle, serverSocketAddress, 0.toUInt)
.checkErrorThrowIO()
uv_listen(serverTcpHandle, 128, onNewConnection).checkErrorThrowIO()
Expand Down
6 changes: 6 additions & 0 deletions src/main/scala/scalauv/Buffer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ object Buffer {
uvBuf
}

def malloc(base: Ptr[Byte], size: CSize): Buffer = {
val uvBuf = stdlib.malloc(structureSize.toULong)
helpers.uv_scala_buf_init(base, size.toUInt, uvBuf)
uvBuf
}

def malloc(array: Array[Byte], index: Int = 0): Buffer = {
val uvBuf = stdlib.malloc(structureSize.toULong)
helpers.uv_scala_buf_init(
Expand Down
8 changes: 8 additions & 0 deletions src/main/scala/scalauv/LibUv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ object LibUv {

def uv_req_size(req_type: RequestType): CSize = extern

def uv_req_get_data(req: Req): Ptr[Byte] = extern

def uv_req_set_data(req: Req, data: Ptr[Byte]): Unit = extern

def uv_req_get_type(req: Req): RequestType = extern

def uv_req_type_name(reqType: RequestType): CString = extern

// =========================================================
// Async & Timers

Expand Down
15 changes: 15 additions & 0 deletions src/main/scala/scalauv/UvUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import scala.scalanative.libc.*
import scala.scalanative.unsigned.*
import java.io.IOException
import scala.util.boundary
import scala.concurrent.Future
import scala.concurrent.Promise

inline def withZone[A](f: Zone ?=> A): A = Zone(implicit z => f(using z))

Expand Down Expand Up @@ -156,6 +158,9 @@ object UvUtils {
inline def zoneAllocateRequest(requestType: RequestType)(using Zone): Req =
alloc[Byte](uv_req_size(requestType))

inline def mallocRequest(requestType: RequestType): Req =
stdlib.malloc(uv_req_size(requestType)).asInstanceOf[Req]

object FsReq {

inline def use[A](inline f: Req => A): A = {
Expand Down Expand Up @@ -253,6 +258,16 @@ extension (uvResult: CInt) {
def uvIfSuccess[A](a: => A): Uv[A] =
if uvResult < 0 then Uv.fail(uvResult) else Uv.succeed(a)

def onFail(f: => Unit): CInt = {
if uvResult < 0 then f
uvResult
}

def onFailMessage(f: String => Unit): CInt = {
if uvResult < 0 then f(UvUtils.errorMessage(uvResult))
uvResult
}

}

final class IOVector(val nativeBuffers: Buffer, numberOfBuffers: Int) {
Expand Down

0 comments on commit 93a0fa5

Please sign in to comment.