Skip to content

Commit

Permalink
fixed race issue
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxkad committed Oct 22, 2023
1 parent ad6fc26 commit 3308252
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 32 deletions.
17 changes: 7 additions & 10 deletions cmds/liter-server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s *Server)handle(c *liter.Conn, cfg *Config){
}
cid := s.conns.Put(c0)
wc.OnClose = func(){
wc.Emit(&script.Event{ Name: "close" })
wc.Emit(&script.Event{ Name: "close", Data: Map{ "conn": wc.Exports() } })
s.conns.Free(cid)
}

Expand Down Expand Up @@ -167,7 +167,7 @@ func (s *Server)handle(c *liter.Conn, cfg *Config){
}
}()
wconn.OnClose = func(){
wconn.Emit(&script.Event{ Name: "close" })
wconn.Emit(&script.Event{ Name: "close", Data: Map{ "conn": wconn.Exports() } })
}

ploger.Debugf("Target %q connected", svr.Id)
Expand Down Expand Up @@ -223,7 +223,7 @@ func (s *Server)handle(c *liter.Conn, cfg *Config){
ploger.Errorf("Error at client connection: %v", err)
if <-wconn.Emit(script.NewEvent("before_close", Map{
"conn": wconn.Exports(),
"error": err,
"error": err.Error(),
})) {
ploger.Infof("Server connection default close action prevented")
preventSvrSideClose = true
Expand All @@ -232,7 +232,7 @@ func (s *Server)handle(c *liter.Conn, cfg *Config){
ploger.Errorf("Error at server connection: %v", err)
if <-wc.Emit(script.NewEvent("before_close", Map{
"conn": wc.Exports(),
"error": err,
"error": err.Error(),
})) {
ploger.Infof("Client connection default close action prevented")
preventCliSideClose = true
Expand Down Expand Up @@ -395,26 +395,23 @@ func parseAndForward(dst, src *script.WrappedConn)(<-chan error){
src.Emit(&script.Event{
Name: "error",
Data: Map{
"src": src.Exports(),
"dst": dst.Exports(),
"conn": src.Exports(),
"error": err.Error(),
},
})
}
return
}
if !<-src.Emit(script.NewEvent("packet", Map{
"src": src.Exports(),
"dst": dst.Exports(),
"conn": src.Exports(),
"packet": r,
})) {
if err = dst.Send(pkt.Reset(r.Protocol(), r.Id()).ByteArray(r.Bytes())); err != nil {
if !dst.Closed() {
dst.Emit(&script.Event{
Name: "error",
Data: Map{
"src": src.Exports(),
"dst": dst.Exports(),
"conn": dst.Exports(),
"error": err.Error(),
},
})
Expand Down
2 changes: 1 addition & 1 deletion script/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func WrapConn(conn *liter.Conn, vm *goja.Runtime, loop *eventloop.EventLoop)(c *
panic(vm.NewTypeError("arg#0 is not a integer of packet id"))
}
id := (liter.VarInt)(call.Argument(0).ToInteger())
return WrapPacketBuilder(liter.NewPacket(conn.Protocol(), id), conn, vm)
return WrapPacketBuilder(liter.NewPacket(conn.Protocol(), id), conn, vm, loop)
})
c.emitter.ExportTo(o)
c.exports = o
Expand Down
34 changes: 17 additions & 17 deletions script/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ func NewEventEmitter(vm *goja.Runtime, loop *eventloop.EventLoop)(e *EventEmitte
}

func (e *EventEmitter)ExportTo(o *goja.Object){
o.Set("on", e.On)
o.Set("addListener", e.On)
o.Set("off", e.Off)
o.Set("removeListener", e.Off)
o.Set("on", e.on)
o.Set("addListener", e.on)
o.Set("off", e.off)
o.Set("removeListener", e.off)
o.Set("emit", func(call goja.FunctionCall, vm *goja.Runtime)(goja.Value){
event := &Event{
Name: call.Arguments[0].String(),
Expand All @@ -46,25 +46,25 @@ func (e *EventEmitter)ExportTo(o *goja.Object){
event.Cancelable = cancelable
}
}
return vm.ToValue(e.Emit(event))
return vm.ToValue(e.emit(event))
})
o.Set("removeAllListeners", e.RemoveAllListeners)
o.Set("eventNames", e.EventNames)
o.Set("listeners", e.Listeners)
o.Set("removeAllListeners", e.removeAllListeners)
o.Set("eventNames", e.eventNames)
o.Set("listeners", e.getListeners)
}

func (e *EventEmitter)On(name string, listener goja.Callable)(*EventEmitter){
func (e *EventEmitter)on(name string, listener goja.Callable)(*EventEmitter){
e.listeners[name] = append(e.listeners[name], listener)
return e
}

func (e *EventEmitter)OnAsync(name string, listener goja.Callable){
e.loop.RunOnLoop(func(vm *goja.Runtime){
e.On(name, listener)
e.on(name, listener)
})
}

func (e *EventEmitter)Off(name string, listener goja.Callable){
func (e *EventEmitter)off(name string, listener goja.Callable){
if listeners, ok := e.listeners[name]; ok {
hp := reflect.ValueOf(listener).Pointer()
for i := len(listeners) - 1; i >= 0; i-- {
Expand All @@ -79,15 +79,15 @@ func (e *EventEmitter)Off(name string, listener goja.Callable){

func (e *EventEmitter)OffAsync(name string, listener goja.Callable){
e.loop.RunOnLoop(func(vm *goja.Runtime){
e.Off(name, listener)
e.off(name, listener)
})
}

func (e *EventEmitter)RemoveAllListeners(name string){
func (e *EventEmitter)removeAllListeners(name string){
delete(e.listeners, name)
}

func (e *EventEmitter)Emit(event *Event)(cancelled bool){
func (e *EventEmitter)emit(event *Event)(cancelled bool){
if event.Cancelled() {
panic(e.vm.ToValue("Error: Trying to emit a cancelled event"))
}
Expand All @@ -112,20 +112,20 @@ func (e *EventEmitter)EmitAsync(event *Event)(done <-chan bool){
exit := make(chan bool, 1)
e.loop.RunOnLoop(func(vm *goja.Runtime){
defer close(exit)
exit <- e.Emit(event)
exit <- e.emit(event)
})
return exit
}

func (e *EventEmitter)EventNames()(names []string){
func (e *EventEmitter)eventNames()(names []string){
names = make([]string, 0, len(e.listeners))
for n, _ := range e.listeners {
names = append(names, n)
}
return
}

func (e *EventEmitter)Listeners(name string)(handlers []goja.Callable){
func (e *EventEmitter)getListeners(name string)(handlers []goja.Callable){
listeners := e.listeners[name]
handlers = make([]goja.Callable, 0, len(listeners))
for _, c := range listeners {
Expand Down
18 changes: 14 additions & 4 deletions script/packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (

"github.com/google/uuid"
"github.com/dop251/goja"
"github.com/dop251/goja_nodejs/eventloop"
"github.com/kmcsr/go-liter"
)

func WrapPacketBuilder(p *liter.PacketBuilder, conn *liter.Conn, vm *goja.Runtime)(o *goja.Object){
func WrapPacketBuilder(p *liter.PacketBuilder, conn *liter.Conn, vm *goja.Runtime, loop *eventloop.EventLoop)(o *goja.Object){
o = vm.NewObject()
sent := false
o.DefineAccessorProperty("protocol", vm.ToValue(func(goja.FunctionCall)(goja.Value){
Expand All @@ -26,9 +27,13 @@ func WrapPacketBuilder(p *liter.PacketBuilder, conn *liter.Conn, vm *goja.Runtim
pm, r, e := vm.NewPromise()
go func(){
if err := conn.Send(p); err != nil {
e(err)
loop.RunOnLoop(func(*goja.Runtime){
e(err)
})
}else{
r(goja.Undefined())
loop.RunOnLoop(func(*goja.Runtime){
r(goja.Undefined())
})
}
}()
return vm.ToValue(pm)
Expand Down Expand Up @@ -130,7 +135,12 @@ func WrapPacketReader(p *liter.PacketReader, vm *goja.Runtime)(*WrappedPacketRea
if !p.ByteArray(buf) {
panic(EOF)
}
return vm.ToValue(vm.NewArrayBuffer(buf))
jsUint8Array, _ := goja.AssertConstructor(vm.Get("Uint8Array"))
res, err := jsUint8Array(vm.NewObject(), vm.ToValue(vm.NewArrayBuffer(buf)))
if err != nil {
panic(err)
}
return res
})
o.Set("bool", func(call goja.FunctionCall)(goja.Value){
v, ok := p.Bool()
Expand Down

0 comments on commit 3308252

Please sign in to comment.