Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
quelgar committed Dec 30, 2023
1 parent 6d79067 commit a070029
Show file tree
Hide file tree
Showing 7 changed files with 372 additions and 50 deletions.
4 changes: 1 addition & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,4 @@ scalacOptions ++= Seq(
"-deprecation"
)

// libraryDependencies += "dev.zio" %% "zio-test" % "2.0.20" % Test

Test / nativeLinkingOptions += "-luv"
// Test / nativeLinkingOptions += "-luv"
8 changes: 0 additions & 8 deletions src/main/resources/scala-native/helpers.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,6 @@ size_t uv_scala_buf_struct_size()
return sizeof(uv_buf_t);
}

// size_t b42_uv_buf_t_size()
// {
// sizeof(long int);
// sizeof(char);
// sizeof(void*);
// return sizeof(uv_buf_t);
// }

size_t uv_scala_mutex_t_size()
{
return sizeof(uv_mutex_t);
Expand Down
200 changes: 165 additions & 35 deletions src/main/scala/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,17 @@ import unsigned.*
import LibUv.*
import UvUtils.*
import scala.scalanative.libc.stdlib
import scala.scalanative.libc.string

object Main {

private val DoneMarker = '!'

private var receivedData = Vector.empty[String]
private var failed = false

private val text = "my country is the world, and my religion is to do good"

def main(args: Array[String]): Unit = {

val intSize = sizeof[CInt]
Expand All @@ -26,50 +35,42 @@ object Main {
} yield (a, b, c, d, e)
println(x)

def recordReceived(s: String): Unit = {
receivedData = receivedData :+ s
}

def setFailed(): Unit = {
failed = true
}

withZone {
val loop = uv_default_loop()

println("Running!")

def allocBuffer: AllocCallback = CFuncPtr3.fromScalaFunction {
(handle: StreamHandle, suggestedSize: CSize, buf: Buffer) =>
buf.mallocInit(suggestedSize)
}

def onClose: CloseCallback = CFuncPtr1.fromScalaFunction(stdlib.free)

def onWrite: StreamWriteCallback = CFuncPtr2.fromScalaFunction {
(req: WriteReq, status: ErrorCode) =>
status.onFailMessage(msg => println(s"Write failed: $msg"))
val buf = Buffer.unsafeFromNative(uv_req_get_data(req))
stdlib.free(buf.base)
stdlib.free(buf.toNative)
stdlib.free(req)
}

def onRead: StreamReadCallback = CFuncPtr3.fromScalaFunction {
(handle: StreamHandle, numRead: CSSize, buf: Buffer) =>
numRead match {
case ErrorCodes.EOF =>
println("EOF")
uv_close(handle, onClose)
case code if code < 0 =>
println(s"Read error: ${UvUtils.errorMessage(code.toInt)}")
uv_close(handle, onClose)
setFailed()
case _ =>
val text = buf.asUtf8String(numRead.toInt)
println(s"Read $numRead bytes: $text")
val writeReq = UvUtils.mallocRequest(RequestType.WRITE)
val outBuf = Buffer.malloc(buf.base, numRead.toULong)
uv_req_set_data(writeReq, outBuf.toNative)
uv_write(writeReq, handle, outBuf, 1.toUInt, onWrite)
.onFail {
stdlib.free(outBuf.base)
stdlib.free(outBuf.toNative)
stdlib.free(writeReq)
}
()
val (text, done) =
buf.asUtf8String(numRead.toInt).span(_ != DoneMarker)
recordReceived(text)
if done.nonEmpty then {
val listenHandle = uv_handle_get_data(handle)
uv_close(listenHandle, null)
}
}
stdlib.free(buf.base)
}

def onNewConnection: ConnectionCallback = CFuncPtr2.fromScalaFunction {
Expand All @@ -86,33 +87,162 @@ object Main {
uv_tcp_init(loop, clientTcpHandle).attempt
.mapErrorMessage(s => s"TCP handle init failed: $s")
}
_ <- Uv.succeed {
uv_handle_set_data(clientTcpHandle, handle)
}
_ <- uv_accept(handle, clientTcpHandle).attempt
.mapErrorMessage(s => s"Accept failed: $s")
_ <- uv_read_start(clientTcpHandle, allocBuffer, onRead).attempt
.mapErrorMessage(s => s"Read start failed: $s")
} yield ()
attempt.forError(e => println(e.message))
attempt.forError(_ => setFailed())
}

val port = 7000
println(s"Listening on port $port")
val serverTcpHandle = UvUtils.stackAllocateHandle(HandleType.UV_TCP)
uv_tcp_init(loop, serverTcpHandle).checkErrorThrowIO()
val serverSocketAddress = SocketAddress4.unspecifiedAddress(port)
uv_tcp_bind(serverTcpHandle, serverSocketAddress, 0.toUInt)
.checkErrorThrowIO()
uv_listen(serverTcpHandle, 128, onNewConnection).checkErrorThrowIO()

uv_run(loop, RunMode.DEFAULT)
def onWrite: StreamWriteCallback = CFuncPtr2.fromScalaFunction {
(req: WriteReq, status: ErrorCode) =>
status.onFail(setFailed())
val buf = Buffer.unsafeFromNative(uv_req_get_data(req))
stdlib.free(buf.base)
buf.free()
stdlib.free(req)
}

def onConnect: ConnectCallback = CFuncPtr2.fromScalaFunction {
(req: ConnectReq, status) =>
status.onFail { failed = true }
val stream = req.connectReqStreamHandle
def doWrite(text: String) = {
val writeReq = UvUtils.mallocRequest(RequestType.WRITE)
val cText = mallocCString(text)
val buf = Buffer.malloc(cText, string.strlen(cText).toULong)
uv_req_set_data(writeReq, buf.toNative)
uv_write(writeReq, stream, buf, 1.toUInt, onWrite).onFail {
stdlib.free(cText)
buf.free()
stdlib.free(writeReq)
setFailed()
}
}
doWrite(text)
doWrite(text)
doWrite(DoneMarker.toString)
uv_close(stream, null)
()
}

val clientTcpHandle = UvUtils.stackAllocateHandle(HandleType.UV_TCP)
uv_tcp_init(loop, clientTcpHandle).checkErrorThrowIO()
val clientSocketAddress = SocketAddress4.loopbackAddress(port)
val connectReq = UvUtils.stackAllocateRequest(RequestType.CONNECT)
uv_tcp_connect(
connectReq,
clientTcpHandle,
clientSocketAddress,
onConnect
).checkErrorThrowIO()

uv_run(loop, RunMode.DEFAULT).checkErrorThrowIO()

()
}

// val length: unsafe.CUnsignedInt = 200.toUInt
// val b = unsafe.stackalloc[unsafe.CChar](length)
// println("Calling buf_init")
// val uvBuf = LibUv.uv_buf_init(b, length)
// println(s"_1 = ${uvBuf._1}")
// println(s"_2 = ${uvBuf._2}")
println(failed)
println(receivedData)
}

// withZone {
// val loop = uv_default_loop()

// println("Running!")

// def allocBuffer: AllocCallback = CFuncPtr3.fromScalaFunction {
// (handle: StreamHandle, suggestedSize: CSize, buf: Buffer) =>
// buf.mallocInit(suggestedSize)
// }

// def onClose: CloseCallback = CFuncPtr1.fromScalaFunction(stdlib.free)

// def onWrite: StreamWriteCallback = CFuncPtr2.fromScalaFunction {
// (req: WriteReq, status: ErrorCode) =>
// status.onFailMessage(msg => println(s"Write failed: $msg"))
// val buf = Buffer.unsafeFromNative(uv_req_get_data(req))
// stdlib.free(buf.base)
// stdlib.free(buf.toNative)
// stdlib.free(req)
// }

// def onRead: StreamReadCallback = CFuncPtr3.fromScalaFunction {
// (handle: StreamHandle, numRead: CSSize, buf: Buffer) =>
// numRead match {
// case ErrorCodes.EOF =>
// println("EOF")
// uv_close(handle, onClose)
// case code if code < 0 =>
// println(s"Read error: ${UvUtils.errorMessage(code.toInt)}")
// uv_close(handle, onClose)
// case _ =>
// val text = buf.asUtf8String(numRead.toInt)
// println(s"Read $numRead bytes: $text")
// val writeReq = UvUtils.mallocRequest(RequestType.WRITE)
// val outBuf = Buffer.malloc(buf.base, numRead.toULong)
// uv_req_set_data(writeReq, outBuf.toNative)
// uv_write(writeReq, handle, outBuf, 1.toUInt, onWrite)
// .onFail {
// stdlib.free(outBuf.base)
// stdlib.free(outBuf.toNative)
// stdlib.free(writeReq)
// }
// ()
// }
// }

// def onNewConnection: ConnectionCallback = CFuncPtr2.fromScalaFunction {
// (handle: StreamHandle, status: ErrorCode) =>
// val loop = uv_handle_get_loop(handle)
// val attempt = for {
// _ <- status.attempt.mapErrorMessage(s =>
// s"New connection error: $s"
// )
// clientTcpHandle = UvUtils.mallocHandle(HandleType.UV_TCP)
// _ <- Uv.onFail(uv_close(clientTcpHandle, onClose))
// _ <- {
// println("New connection")
// uv_tcp_init(loop, clientTcpHandle).attempt
// .mapErrorMessage(s => s"TCP handle init failed: $s")
// }
// _ <- uv_accept(handle, clientTcpHandle).attempt
// .mapErrorMessage(s => s"Accept failed: $s")
// _ <- uv_read_start(clientTcpHandle, allocBuffer, onRead).attempt
// .mapErrorMessage(s => s"Read start failed: $s")
// } yield ()
// attempt.forError(e => println(e.message))
// }

// val port = 7000
// println(s"Listening on port $port")
// val serverTcpHandle = UvUtils.stackAllocateHandle(HandleType.UV_TCP)
// uv_tcp_init(loop, serverTcpHandle).checkErrorThrowIO()
// val serverSocketAddress = SocketAddress4.unspecifiedAddress(port)
// uv_tcp_bind(serverTcpHandle, serverSocketAddress, 0.toUInt)
// .checkErrorThrowIO()
// uv_listen(serverTcpHandle, 128, onNewConnection).checkErrorThrowIO()

// uv_run(loop, RunMode.DEFAULT)

// ()
// }

// val length: unsafe.CUnsignedInt = 200.toUInt
// val b = unsafe.stackalloc[unsafe.CChar](length)
// println("Calling buf_init")
// val uvBuf = LibUv.uv_buf_init(b, length)
// println(s"_1 = ${uvBuf._1}")
// println(s"_2 = ${uvBuf._2}")
}
1 change: 1 addition & 0 deletions src/main/scala/scalauv/Buffer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ extension (buffer: Buffer) {
inline def mallocInit(size: CSize): Unit =
helpers.uv_scala_buf_init(stdlib.malloc(size), size.toUInt, buffer)

inline def free(): Unit = stdlib.free(buffer)
}

object Buffer {
Expand Down
13 changes: 11 additions & 2 deletions src/main/scala/scalauv/LibUv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ object LibUv {

def uv_loop_close(loop: Loop): ErrorCode = extern

def uv_loop_size(): CSize = extern

def uv_loop_init(loop: Loop): ErrorCode = extern

// def uv_loop_configure(loop: Loop, option: CInt, value: CInt): ErrorCode =
// extern

def uv_ip4_addr(
ip: CString,
port: CInt,
Expand Down Expand Up @@ -343,7 +350,7 @@ object LibUv {
req: Req,
handle: TcpHandle,
addr: SocketAddressIp4,
cb: FsCallback
cb: ConnectCallback
): ErrorCode = extern

def uv_tcp_close_reset(handle: TcpHandle, cb: CloseCallback): ErrorCode =
Expand Down Expand Up @@ -462,7 +469,9 @@ private[scalauv] object helpers {

def uv_scala_mutex_t_size(): CSize = extern

def uv_scala_connect_stream_handle(req: LibUv.Req): LibUv.StreamHandle =
def uv_scala_connect_stream_handle(
req: LibUv.ConnectReq
): LibUv.StreamHandle =
extern

def uv_scala_shutdown_stream_handle(req: LibUv.Req): LibUv.StreamHandle =
Expand Down
33 changes: 31 additions & 2 deletions src/main/scala/scalauv/UvUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,25 @@ import scala.scalanative.libc.*
import scala.scalanative.unsigned.*
import java.io.IOException
import scala.util.boundary

inline def withZone[A](f: Zone ?=> A): A = Zone(implicit z => f(using z))
import java.nio.charset.StandardCharsets
import java.nio.charset.Charset

inline def withZone[A](f: Zone ?=> A): A = Zone(z => f(using z))

def mallocCString(
s: String,
charset: Charset = StandardCharsets.UTF_8
): CString = {
if s.isEmpty() then c""
else {
val bytes = s.getBytes(charset)
val size = bytes.length.toUInt
val cString = stdlib.malloc(size + 1.toUInt)
string.memcpy(cString, bytes.at(0), size)
!(cString + size) = 0.toByte
cString
}
}

enum Uv[+A] {

Expand Down Expand Up @@ -346,6 +363,8 @@ object Ip4Address {

val Unspecified: Ip4Address = apply(0, 0, 0, 0)

val loopback: Ip4Address = apply(127, 0, 0, 1)

}

type Port = Int
Expand Down Expand Up @@ -381,6 +400,16 @@ object SocketAddress4 {
inline def unspecifiedAddress(port: Port): SocketAddressIp4 =
apply(Ip4Address.Unspecified, port)

inline def loopbackAddress(port: Port): SocketAddressIp4 =
apply(Ip4Address.loopback, port)

}

opaque type SocketAddressIp6 = Ptr[Byte]

extension (r: LibUv.ConnectReq) {

inline def connectReqStreamHandle: LibUv.StreamHandle =
helpers.uv_scala_connect_stream_handle(r)

}
Loading

0 comments on commit a070029

Please sign in to comment.