Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring and F# goodness #30

Merged
merged 5 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Nostra.Client/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ let displayResponse (contacts : Map<byte[], Contact>) (addContact: ContactKey ->
| Ok metadata -> addContact contactKey metadata
| Error _ -> ()
| Ok (Response.RMEvent ("all", event)) ->
let (EventId eventid) = event.Id
let (EventId eventId) = event.Id

if not (receivedEvents.Contains eventid) then
if not (receivedEvents.Contains eventId) then
let contactKey =
if event.Kind = Kind.ChannelMessage then
let eventId =
Expand Down Expand Up @@ -56,12 +56,12 @@ let displayResponse (contacts : Map<byte[], Contact>) (addContact: ContactKey ->
let eventLink = link emoji $"https://njump.me/{nevent}"
let authorLink= link $"👤 {author}" $"https://njump.me/{authorNpub}"
Console.WriteLine $"{eventLink} {authorLink} 📅 {event.CreatedAt}"
Console.ForegroundColor <- enum<ConsoleColor> (-1)
Console.ForegroundColor <- enum<ConsoleColor> -1
Console.WriteLine (event.Content.Trim())
//Console.ForegroundColor <- ConsoleColor.DarkGray
//Console.WriteLine (event.Tags |> List.map (fun (t, vs) -> $"{t}:{vs}"))
Console.WriteLine ()
receivedEvents.Add eventid |> ignore
receivedEvents.Add eventId |> ignore

| Ok (Response.RMACK(eventId, success, message)) ->
Console.ForegroundColor <- ConsoleColor.Green
Expand Down
2 changes: 1 addition & 1 deletion Nostra.Client/User.fs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ module User =

let addContact key metadata user =
let contacts = { key = key; metadata = metadata }:: user.contacts
{ user with contacts = List.distinctBy (fun c -> c.key) contacts }
{ user with contacts = List.distinctBy (_.key) contacts }

let subscribeAuthors (authors : AuthorId list) user =
let authors' = List.distinctBy AuthorId.toBytes (user.subscribedAuthors @ authors)
Expand Down
36 changes: 20 additions & 16 deletions Nostra.Relay/ClientRegistry.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,42 @@ open Nostra.Relay

type EventEvaluator = StoredEvent -> unit
type ClientId = ClientId of IPAddress * uint16

type ClientRegistry = {
subscribe : ClientId -> EventEvaluator -> unit
unsubscribe : ClientId -> unit
notifyEvent : StoredEvent -> unit
notifyEvent : StoredEvent -> unit
}

type ClientRegistryAction =
| Subscribe of ClientId * EventEvaluator
| Unsubscribe of ClientId
| NotifyEvent of StoredEvent


[<TailCall>]
let rec processClientRegistrationRequestLoop
(evaluators: Dictionary<ClientId, EventEvaluator>)
(notifyToAll: StoredEvent -> unit)
(inbox: MailboxProcessor<ClientRegistryAction>) = async {
let! msg = inbox.Receive()
match msg with
| Subscribe (clientId, evaluator) -> evaluators.Add(clientId, evaluator)
| Unsubscribe clientId -> evaluators.Remove(clientId) |> ignore
| NotifyEvent storedEvent -> notifyToAll storedEvent
return! processClientRegistrationRequestLoop evaluators notifyToAll inbox
}

let createClientRegistry () =
let evaluators = Dictionary<ClientId, EventEvaluator>()

let notifyToAll event =
evaluators
|> Seq.map (fun kv -> kv.Value)
|> Seq.map (_.Value)
|> Seq.iter (fun evaluator -> evaluator event)

let worker =
MailboxProcessor<ClientRegistryAction>.Start(fun inbox ->
let rec loop () = async {
let! msg = inbox.Receive()
match msg with
| Subscribe (clientId, evaluator) -> evaluators.Add(clientId, evaluator)
| Unsubscribe clientId -> evaluators.Remove(clientId) |> ignore
| NotifyEvent storedEvent -> notifyToAll storedEvent
return! loop ()
}
loop () )

MailboxProcessor<ClientRegistryAction>.Start(processClientRegistrationRequestLoop evaluators notifyToAll)

{
subscribe = fun clientId evaluator -> worker.Post (Subscribe (clientId, evaluator))
unsubscribe = fun clientId -> worker.Post (Unsubscribe clientId)
Expand Down
86 changes: 46 additions & 40 deletions Nostra.Relay/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,57 @@ open Suave.Sockets
open Suave.Sockets.Control
open Suave.WebSocket

[<TailCall>]
let rec processRelayMessagesLoop (webSocket: WebSocket) (inbox: MailboxProcessor<RelayMessage>) = async {
let! msg = inbox.Receive()
let! result = webSocket.send Text (toPayload msg) true
return! processRelayMessagesLoop webSocket inbox
}

[<TailCall>]
let rec processRequestLoop
(clientId: ClientId)
(webSocket: WebSocket)
(env: Context)
(send: RelayMessage -> unit)
(processRequest: string -> Async<Result<RelayMessage list,EventProcessingError>>) = socket {
let! msg = webSocket.read()
match msg with
| Text, data, true ->
let requestText = UTF8.toString data
env.logger.logDebug requestText
processRequest requestText
|> AsyncResult.map (function
| [ ] -> ()
| (final::messages) ->
messages
|> List.rev
|> List.iter send
send final
())
|> Async.RunSynchronously
|> Result.defaultWith (function
| BusinessError e ->
send e
| UnexpectedError e ->
env.logger.logError (e.ToString())
send (RMNotice "unexpected error"))

return! processRequestLoop clientId webSocket env send processRequest
| Close, _, _ ->
env.clientRegistry.unsubscribe clientId
let emptyResponse = [||] |> ByteSegment
do! webSocket.send Close emptyResponse true
| _ ->
return! processRequestLoop clientId webSocket env send processRequest
}
let webSocketHandler () =
let handle (env : Context) (webSocket : WebSocket) (context: HttpContext) =
let subscriptions = Dictionary<SubscriptionId, Filter list>()

let send =
let worker =
MailboxProcessor<RelayMessage>.Start(fun inbox ->
let rec loop () = async {
let! msg = inbox.Receive()
let! result = webSocket.send Text (toPayload msg) true
return! loop ()
}
loop () )
MailboxProcessor<RelayMessage>.Start(processRelayMessagesLoop webSocket)
worker.Post

let notifyEvent : EventEvaluator =
Expand All @@ -49,40 +87,8 @@ let webSocketHandler () =

let processRequest req = processRequest env subscriptions req

let rec loop () = socket {
let! msg = webSocket.read()
match msg with
| Text, data, true ->
let requestText = UTF8.toString data
env.logger.logDebug requestText
processRequest requestText
|> AsyncResult.map (function
| [ ] -> ()
| (final::messages) ->
messages
|> List.rev
|> List.iter send
send final
())
|> Async.RunSynchronously
|> Result.defaultWith (function
| BusinessError e ->
send e
| UnexpectedError e ->
env.logger.logError (e.ToString())
send (RMNotice "unexpected error"))

return! loop()
| Close, _, _ ->
env.clientRegistry.unsubscribe clientId
let emptyResponse = [||] |> ByteSegment
do! webSocket.send Close emptyResponse true
| _ ->
return! loop()
}

env.clientRegistry.subscribe clientId notifyEvent
loop ()
processRequestLoop clientId webSocket env send processRequest
Monad.Reader (fun (ctx : Context) -> handle ctx)

open Suave.Operators
Expand Down
2 changes: 1 addition & 1 deletion Nostra.Tests/TestingFramework.fs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ let deleteNote evnts : EventFactory =
let ids =
allEvents
|> List.filter (fun e -> List.contains (e.Content) evnts)
|> List.map (fun e -> e.Id)
|> List.map (_.Id)

Event.createDeleteEvent ids "nothing"

Expand Down
20 changes: 8 additions & 12 deletions Nostra/Bech32.fs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ module Bech32 =

type Pad = int -> int -> int list list -> int list list

let yesPadding bits padValue result =
match (bits, padValue, result) with
| 0, _, result -> result
| _, padValue, result -> [ padValue ] :: result
let yesPadding : Pad =
fun bits padValue result ->
match (bits, padValue, result) with
| 0, _, result -> result
| _, padValue, result -> [ padValue ] :: result

let noPadding _ _ result = result
let noPadding : Pad =
fun _ _ result -> result

let convertBits (data: byte list) fromBits toBits (pad: Pad) =
let maxValue = (1 <<< toBits) - 1
Expand Down Expand Up @@ -93,20 +95,14 @@ module Bech32 =
hrp + "1" + encoded

let decode (str: string) =
let lift l =
if List.contains None l then
None
else
Some(List.map Option.get l)

let lastOneIndex = str.IndexOf('1')
let hrp = HRP str[0 .. lastOneIndex - 1]
let data = str[lastOneIndex + 1 ..]

data
|> Seq.map (fun x -> charsetRev[int x])
|> Seq.toList
|> lift
|> List.lift
|> Option.bind (fun d ->
if verifyChecksum hrp d then
Some(d[.. d.Length - 7])
Expand Down
55 changes: 28 additions & 27 deletions Nostra/Client.fs
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,15 @@ module Client =

let events evnts filter =
{ filter with
Events =
evnts
|> List.append filter.Events
|> List.distinct }
Events = filter.Events |> List.addUniques evnts }

let authors authors (filter : SubscriptionFilter) =
{ filter with
Authors =
authors
|> List.append filter.Authors
|> List.distinct }
Authors = filter.Authors |> List.addUniques authors }

let referenceAuthor authors (filter : SubscriptionFilter) =
{ filter with
PubKeys = filter.PubKeys |> List.addUniques authors }

let channels channels filter =
let filter' = events channels filter
Expand Down Expand Up @@ -175,18 +173,19 @@ module Client =
open Response

module Communication =
[<TailCall>]
let rec _readMessage (ctx: Context) (mem:MemoryStream) = async {
let buffer = ArrayPool.Shared.Rent(1024)
let! result = ctx.WebSocket.read buffer
mem.Write (buffer, 0, result.Count)
ArrayPool.Shared.Return buffer
if result.EndOfMessage then
return mem.ToArray()
else
return! _readMessage ctx mem
}
let readWebSocketMessage (ctx: Context) =
let rec readMessage (mem:MemoryStream) = async {
let buffer = ArrayPool.Shared.Rent(1024)
let! result = ctx.WebSocket.read buffer
mem.Write (buffer, 0, result.Count)
ArrayPool.Shared.Return buffer
if result.EndOfMessage then
return mem.ToArray()
else
return! readMessage mem
}
readMessage (new MemoryStream (4 * 1024))
_readMessage ctx (new MemoryStream (4 * 1024))

let receiveMessage =
Monad.Reader (fun (ctx: Context) -> async {
Expand All @@ -204,6 +203,7 @@ module Client =
| _ -> Ok relayMsg )
})

[<TailCall>]
let rec startReceiving callback =
let rec loop (ctx: Context) = async {
let (Monad.Reader r ) = receiveMessage
Expand All @@ -213,16 +213,17 @@ module Client =
}
Monad.Reader (fun (ctx: Context) -> loop ctx)

[<TailCall>]
let rec processClientMessageLoop ctx (inbox: MailboxProcessor<ClientMessage>) = async {
let! msg = inbox.Receive()
let serializedMessage = msg |> Request.serialize
let payload = serializedMessage |> Encoding.UTF8.GetBytes
do! ctx.WebSocket.write payload
return! processClientMessageLoop ctx inbox }

let sender () =
let createPusher (ctx: Context) =
MailboxProcessor<Request.ClientMessage>.Start (fun inbox ->
let rec loop () = async {
let! msg = inbox.Receive()
let serializedMessage = msg |> Request.serialize
let payload = serializedMessage |> Encoding.UTF8.GetBytes
do! ctx.WebSocket.write payload
return! loop() }
loop () )
MailboxProcessor<ClientMessage>.Start (processClientMessageLoop ctx)

Monad.Reader(fun (ctx: Context) ->
let pusher = createPusher ctx
Expand Down
4 changes: 4 additions & 0 deletions Nostra/Utils.fs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ module List =
else
Some (List.map Option.get lst)

let addUniques items list =
items
|> List.append list
|> List.distinct

[<RequireQualifiedAccess>]
module Result =
Expand Down
Loading