Skip to content

Commit

Permalink
gracefully exit
Browse files Browse the repository at this point in the history
  • Loading branch information
tld committed Aug 14, 2024
1 parent 7d23bc1 commit c2cd571
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 22 deletions.
11 changes: 5 additions & 6 deletions eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,18 @@ func (eb *EventBus) Subscribe(id string, eventName string, ch chan Event) {
if _, ok := eb.subscribers[eventName]; !ok {
eb.subscribers[eventName] = make(map[string]chan Event)
}
if _, ok := eb.subscribers[eventName][id]; !ok {
eb.subscribers[eventName][id] = ch
} else {
eb.subscribers[eventName][id] = ch
}

eb.subscribers[eventName][id] = ch

}

func (eb *EventBus) Unsubscribe(id string, eventName string) {
func (eb *EventBus) Unsubscribe(id string, eventName string, handleFinished chan bool) {
eb.mu.Lock()
defer eb.mu.Unlock()
if _, ok := eb.subscribers[eventName][id]; !ok {
return
}
close(handleFinished)
delete(eb.subscribers[eventName], id)
}

Expand Down
3 changes: 2 additions & 1 deletion eventbus/eventbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ func TestUnsubscribe(t *testing.T) {
bus := NewEventBus()

testCh := make(chan Event)
finished := make(chan bool)
eventName := "testEvent"
id := "testID"

bus.Subscribe(id, eventName, testCh)
bus.Unsubscribe(id, eventName)
bus.Unsubscribe(id, eventName, finished)

g.Eventually(func() bool {
_, ok := bus.subscribers[eventName][id]
Expand Down
14 changes: 5 additions & 9 deletions eventcenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ type Message struct {
eventbus.Event
}

// TODO: center管理connection
type EventCenter struct {
eb *eventbus.EventBus
}
Expand Down Expand Up @@ -43,16 +42,14 @@ func (center *EventCenter) listenAndWrite(eventCh chan eventbus.Event, conn *web
case event, ok := <-eventCh:
if !ok {
log.Printf("channel closed, %v unsubscribe topic: %v", msg.ID, msg.Name)
center.eb.Unsubscribe(msg.ID, msg.Name)
return
center.eb.Unsubscribe(msg.ID, msg.Name, handleFinished)
}
if err := conn.WriteJSON(event); err != nil {
log.Printf("write error: %v, %v unsubscribe topic: %v", err, msg.ID, msg.Name)
center.eb.Unsubscribe(msg.ID, msg.Name)
return
center.eb.Unsubscribe(msg.ID, msg.Name, handleFinished)
}
case <-handleFinished:
center.eb.Unsubscribe(msg.ID, msg.Name)
log.Printf("gracefully exit, client_id: %v", msg.ID)
return
}
}
Expand All @@ -74,17 +71,16 @@ func (center *EventCenter) HandleWebSocket(w http.ResponseWriter, r *http.Reques
err := conn.ReadJSON(msg)
if err != nil {
log.Printf("read json err: %v, %v unsubscribe topic: %v", err, msg.ID, msg.Name)
center.eb.Unsubscribe(msg.ID, msg.Name)
center.eb.Unsubscribe(msg.ID, msg.Name, handleFinished)
break
}
if msg.MsgType == "subscription" {
center.eb.Subscribe(msg.ID, msg.Name, eventCh)
center.listenAndWrite(eventCh, conn, msg, handleFinished)
} else if msg.MsgType == "unsubscription" {
center.eb.Unsubscribe(msg.ID, msg.Name)
center.eb.Unsubscribe(msg.ID, msg.Name, handleFinished)
time.Sleep(time.Second * 1)
break
}
}
handleFinished <- true
}
10 changes: 4 additions & 6 deletions resources/WebSocket Sampler.jmx
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
<elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables">
<collectionProp name="Arguments.arguments"/>
</elementProp>
<boolProp name="TestPlan.functional_mode">false</boolProp>
<boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
</TestPlan>
<hashTree>
<ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="线程组">
Expand Down Expand Up @@ -125,11 +123,11 @@
<stringProp name="dataFile"></stringProp>
</eu.luminis.jmeter.wssampler.RequestResponseWebSocketSampler>
<hashTree/>
<LoopController guiclass="LoopControlPanel" testclass="LoopController" testname="循环控制器">
<LoopController guiclass="LoopControlPanel" testclass="LoopController" testname="循环控制器" enabled="true">
<stringProp name="LoopController.loops">5000</stringProp>
</LoopController>
<hashTree>
<eu.luminis.jmeter.wssampler.SingleReadWebSocketSampler guiclass="eu.luminis.jmeter.wssampler.SingleReadWebSocketSamplerGui" testclass="eu.luminis.jmeter.wssampler.SingleReadWebSocketSampler" testname="WebSocket Single Read Sampler">
<eu.luminis.jmeter.wssampler.SingleReadWebSocketSampler guiclass="eu.luminis.jmeter.wssampler.SingleReadWebSocketSamplerGui" testclass="eu.luminis.jmeter.wssampler.SingleReadWebSocketSampler" testname="WebSocket Single Read Sampler" enabled="true">
<boolProp name="TLS">false</boolProp>
<stringProp name="server"></stringProp>
<stringProp name="port">80</stringProp>
Expand All @@ -148,15 +146,15 @@
</eu.luminis.jmeter.wssampler.CloseWebSocketSampler>
<hashTree/>
</hashTree>
<ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="线程组">
<ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="线程组" enabled="true">
<intProp name="ThreadGroup.num_threads">10</intProp>
<intProp name="ThreadGroup.ramp_time">1</intProp>
<longProp name="ThreadGroup.duration">2</longProp>
<longProp name="ThreadGroup.delay">5</longProp>
<boolProp name="ThreadGroup.same_user_on_next_iteration">true</boolProp>
<stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
<elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="循环控制器">
<stringProp name="LoopController.loops">500</stringProp>
<stringProp name="LoopController.loops">5000</stringProp>
<boolProp name="LoopController.continue_forever">false</boolProp>
</elementProp>
</ThreadGroup>
Expand Down

0 comments on commit c2cd571

Please sign in to comment.