-
Notifications
You must be signed in to change notification settings - Fork 2
/
events.go
68 lines (62 loc) · 1.71 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main
import (
"context"
"io"
eventsapi "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/runtime"
"github.com/sirupsen/logrus"
)
// Forward drains s.events and publishes every queued event through publisher,
// using ctx extended with the namespace stored in the event's envelope and the
// topic reported by GetTopic.
//
// The loop ends only when s.events is closed; cancelling ctx does not stop it.
// A failed publish is logged and the remaining events are still forwarded.
// Once the queue is drained, publisher is closed if it also implements
// io.Closer (a close failure is logged rather than dropped), and finally
// s.waitEvents is closed — presumably to tell waiters that forwarding has
// finished; confirm against the code that reads waitEvents.
func (s *Service) Forward(ctx context.Context, publisher events.Publisher) {
	for ev := range s.events {
		// Derive a per-event context instead of shadowing ctx so the
		// caller's context is visibly left untouched between iterations.
		nsCtx := namespaces.WithNamespace(ctx, ev.ns)
		if err := publisher.Publish(nsCtx, GetTopic(ev.e), ev.e); err != nil {
			logrus.WithError(err).Error("post event")
		}
	}
	if closer, ok := publisher.(io.Closer); ok {
		// Best effort: report the failure but still signal completion below.
		if err := closer.Close(); err != nil {
			logrus.WithError(err).Error("close event publisher")
		}
	}
	close(s.waitEvents)
}
// eventEnvelope pairs an event with the namespace it is published under.
// Values are built positionally (namespace first, then event), so the field
// order must not change.
type eventEnvelope struct {
	// ns is the namespace attached to the context when the event is published.
	ns string
	// e is the event payload; GetTopic maps its dynamic type to a topic.
	e interface{}
}
// send wraps e together with namespace ns and places it on s.events.
//
// It blocks until either the queue accepts the envelope or ctx is done; in
// the latter case the event is dropped without any error being reported.
func (s *Service) send(ctx context.Context, ns string, e interface{}) {
	envelope := eventEnvelope{ns: ns, e: e}
	select {
	case s.events <- envelope:
		// Queued.
	case <-ctx.Done():
		// Context finished before the queue had room; give up on this event.
		return
	}
}
// GetTopic converts an event from an interface type to the specific
// event topic id
func GetTopic(e interface{}) string {
switch e.(type) {
case *eventsapi.TaskCreate:
return runtime.TaskCreateEventTopic
case *eventsapi.TaskStart:
return runtime.TaskStartEventTopic
case *eventsapi.TaskOOM:
return runtime.TaskOOMEventTopic
case *eventsapi.TaskExit:
return runtime.TaskExitEventTopic
case *eventsapi.TaskDelete:
return runtime.TaskDeleteEventTopic
case *eventsapi.TaskExecAdded:
return runtime.TaskExecAddedEventTopic
case *eventsapi.TaskExecStarted:
return runtime.TaskExecStartedEventTopic
case *eventsapi.TaskPaused:
return runtime.TaskPausedEventTopic
case *eventsapi.TaskResumed:
return runtime.TaskResumedEventTopic
case *eventsapi.TaskCheckpointed:
return runtime.TaskCheckpointedEventTopic
default:
logrus.Warnf("no topic for type %#v", e)
}
return runtime.TaskUnknownTopic
}