-
Notifications
You must be signed in to change notification settings - Fork 15
/
stream_layer.go
162 lines (133 loc) · 3.6 KB
/
stream_layer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package onion
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
"sync"
)
var (
decLock sync.RWMutex
decoders = map[string]Decoder{
"json": &jsonDecoder{},
}
)
// Cipher is used to decrypt data on loading
type Cipher interface {
Decrypt(io.Reader) ([]byte, error)
}
// Decoder is a stream decoder to convert a stream into a map of config keys, json is supported out of
// the box
type Decoder interface {
Decode(context.Context, io.Reader) (map[string]interface{}, error)
}
type jsonDecoder struct {
}
func decrypt(c Cipher, r io.Reader) (io.Reader, error) {
if c == nil {
return r, nil
}
b, err := c.Decrypt(r)
if err != nil {
return nil, err
}
return bytes.NewReader(b), nil
}
func (jd *jsonDecoder) Decode(_ context.Context, r io.Reader) (map[string]interface{}, error) {
var data map[string]interface{}
if err := json.NewDecoder(r).Decode(&data); err != nil {
return nil, err
}
return data, nil
}
// RegisterDecoder add a new decoder to the system, json is registered out of the box
func RegisterDecoder(dec Decoder, formats ...string) {
decLock.Lock()
defer decLock.Unlock()
for _, format := range formats {
format := strings.ToLower(format)
_, alreadyExists := decoders[format]
if alreadyExists {
log.Fatalf("decoder for format %q is already registered: you can have only one", format)
}
decoders[format] = dec
}
}
// GetDecoder returns the decoder based on its name, it may returns nil if the decoder is not
// registered
func GetDecoder(format string) Decoder {
decLock.RLock()
defer decLock.RUnlock()
return decoders[strings.ToLower(format)]
}
type streamLayer struct {
c chan map[string]interface{}
cipher Cipher
}
func (sl *streamLayer) Load() map[string]interface{} {
return <-sl.c
}
func (sl *streamLayer) Watch() <-chan map[string]interface{} {
return sl.c
}
func (sl *streamLayer) Reload(ctx context.Context, r io.Reader, format string) error {
dec := GetDecoder(format)
if dec == nil {
return fmt.Errorf("format %q is not registered", format)
}
dr, err := decrypt(sl.cipher, r)
if err != nil {
return err
}
data, err := dec.Decode(ctx, dr)
if err != nil {
return err
}
go func() {
select {
case sl.c <- data:
case <-ctx.Done():
}
}()
return nil
}
// NewStreamLayerContext try to create a layer based on a stream, the format should be a registered
// format (see RegisterDecoder) and if the Cipher is not nil, it pass data to cipher first.
// A nil cipher is accepted as plain cipher
func NewStreamLayerContext(ctx context.Context, r io.Reader, format string, c Cipher) (Layer, error) {
if r == nil {
return nil, fmt.Errorf("nil stream")
}
sl := &streamLayer{
c: make(chan map[string]interface{}),
cipher: c,
}
err := sl.Reload(ctx, r, format)
if err != nil {
return nil, err
}
return sl, nil
}
// NewStreamLayer create new stream layer, see the NewStreamLayerContext
func NewStreamLayer(r io.Reader, format string, c Cipher) (Layer, error) {
return NewStreamLayerContext(context.Background(), r, format, c)
}
// NewFileLayerContext create a new file layer. it choose the format base on the extension
func NewFileLayerContext(ctx context.Context, path string, c Cipher) (Layer, error) {
ext := strings.TrimPrefix(filepath.Ext(path), ".")
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer func() { _ = f.Close() }()
return NewStreamLayerContext(ctx, f, ext, c)
}
// NewFileLayer create a new file layer. it choose the format base on the extension
func NewFileLayer(path string, c Cipher) (Layer, error) {
return NewFileLayerContext(context.Background(), path, c)
}