-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsearch.go
224 lines (184 loc) · 6.04 KB
/
search.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
package search
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"github.com/olivere/elastic/v7"
"github.com/ikeikeikeike/gocore/util"
)
// Command is the set of Elasticsearch operations this package exposes.
// Every method takes a context and reports its outcome as a *Result plus
// an error.
type Command interface {
	// Search executes the prepared search service.
	Search(ctx context.Context, search *elastic.SearchService) (*Result, error)
	// Bulk executes the prepared bulk service.
	Bulk(ctx context.Context, bulk *elastic.BulkService) (*Result, error)

	// PostDocument indexes doc under id in the index called name.
	PostDocument(ctx context.Context, name string, id int, doc string) (*Result, error)
	// DeleteDocument deletes the document id from the index called name.
	DeleteDocument(ctx context.Context, name string, id int) (*Result, error)
	// UpdateByScript updates document id in index name by running script with params.
	UpdateByScript(ctx context.Context, name string, id int, script string, params map[string]interface{}) (*Result, error)
	// UpsertByScript is like UpdateByScript but also supplies an upsert document.
	UpsertByScript(ctx context.Context, name string, id int, script string, params, upsert map[string]interface{}) (*Result, error)

	// CreateIndex creates the index called name using index as the request body.
	CreateIndex(ctx context.Context, name string, index string) (*Result, error)
	// DeleteIndex deletes the index called name.
	DeleteIndex(ctx context.Context, name string) (*Result, error)

	// Aliases looks up alias information for the index called name.
	Aliases(ctx context.Context, name string) (*Result, error)
	// PutAlias adds alias to the index called name.
	PutAlias(ctx context.Context, name, alias string) (*Result, error)
	// UpdateAliases removes alias name from index old and adds it to index new.
	UpdateAliases(ctx context.Context, name, old, new string) (*Result, error)
}

// command is the Command implementation backed by an Elasticsearch client.
type command struct {
	// Env is consulted (IsDebug) to decide the Pretty flag on requests.
	Env util.Environment
	// ESClient is the client every request is issued through.
	ESClient *elastic.Client
}

// Result is the common return value of every Command method.
type Result struct {
	// Res holds whatever the Elasticsearch call returned.
	Res interface{}
	// Err holds the error returned by the Elasticsearch call, if any.
	Err error
}
// Indices returns the index names associated with alias when the wrapped
// result is an *elastic.AliasesResult. For any other kind of result it
// returns an empty, non-nil slice.
func (cr *Result) Indices(alias string) []string {
	if aliases, ok := cr.Res.(*elastic.AliasesResult); ok {
		return aliases.IndicesByAlias(alias)
	}
	return []string{}
}
// JSON returns the wrapped result encoded as JSON.
// Encoding errors are not reported: when marshalling fails the returned
// slice is nil.
func (cr *Result) JSON() []byte {
	encoded, err := json.Marshal(cr.Res)
	if err != nil {
		// The signature has no error slot; callers only ever see a nil slice.
		return nil
	}
	return encoded
}
// Values picks the most useful part of the wrapped result, depending on its
// concrete type:
//
//   - *Result: the nested result's own Values()
//   - *elastic.IndicesCreateResult: the JSON encoding produced by cr.JSON()
//   - *elastic.AliasesResult: its Indices field
//   - anything else: the wrapped value as is
func (cr *Result) Values() interface{} {
	switch res := cr.Res.(type) {
	case *Result:
		return res.Values()
	case *elastic.IndicesCreateResult:
		return cr.JSON()
	case *elastic.AliasesResult:
		return res.Indices
	default:
		return res
	}
}
// MakeIndexName appends "_" and the current Unix time in nanoseconds to
// name and returns the result.
func MakeIndexName(name string) string {
	stamp := time.Now().UnixNano()
	return fmt.Sprintf("%s_%d", name, stamp)
}
// RestoreIndexName strips the timestamp suffix added by MakeIndexName and
// returns the base name.
//
// Only the final "_<integer>" segment is removed, so base names that contain
// underscores themselves survive the round trip
// ("my_index_1700000000000000000" -> "my_index"). A name that does not end in
// such a numeric segment is returned unchanged.
func RestoreIndexName(name string) string {
	cut := strings.LastIndex(name, "_")
	if cut < 0 {
		return name
	}
	// MakeIndexName formats the suffix with %d from an int64, so a trailing
	// segment that does not parse as a base-10 int64 belongs to the name itself.
	if _, err := strconv.ParseInt(name[cut+1:], 10, 64); err != nil {
		return name
	}
	return name[:cut]
}
// do runs fn in its own goroutine and waits for whichever happens first:
// fn delivering a Result on the channel it was handed, or ctx being done.
//
// When a Result arrives, it is returned together with its Err field. When
// ctx finishes first, do returns a nil Result and ctx.Err(). The channel has
// capacity one, so a single send from fn does not block even after do has
// already returned.
func (c *command) do(ctx context.Context, fn func(chan *Result)) (*Result, error) {
	results := make(chan *Result, 1)
	go fn(results)

	select {
	case got := <-results:
		return got, got.Err
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
// Search executes the supplied search service, passing c.Env.IsDebug() as
// its Pretty flag, and wraps the response and error in a Result.
// The call runs through c.do, so it also returns early when ctx is done.
func (c *command) Search(ctx context.Context, search *elastic.SearchService) (*Result, error) {
	return c.do(ctx, func(out chan *Result) {
		hits, err := search.Pretty(c.Env.IsDebug()).Do(ctx)
		out <- &Result{Res: hits, Err: err}
	})
}
// Bulk executes the supplied bulk service and wraps the response and error
// in a Result. The call runs through c.do, so it also returns early when
// ctx is done.
func (c *command) Bulk(ctx context.Context, bulk *elastic.BulkService) (*Result, error) {
	return c.do(ctx, func(out chan *Result) {
		resp, err := bulk.Do(ctx)
		out <- &Result{Res: resp, Err: err}
	})
}
// PostDocument sends an index request for doc to the index called name,
// using the decimal string form of id as the document id. The response and
// error are wrapped in a Result.
func (c *command) PostDocument(ctx context.Context, name string, id int, doc string) (*Result, error) {
	return c.do(ctx, func(out chan *Result) {
		req := c.ESClient.Index().
			Pretty(c.Env.IsDebug()).
			Index(name).
			Id(strconv.Itoa(id)).
			BodyString(doc)

		resp, err := req.Do(ctx)
		out <- &Result{Res: resp, Err: err}
	})
}
// UpdateByScript sends an update request for document id in the index called
// name. The update is expressed as a script built from the script source and
// params, with its language set to "painless". The response and error are
// wrapped in a Result.
func (c *command) UpdateByScript(ctx context.Context, name string, id int, script string, params map[string]interface{}) (*Result, error) {
	return c.do(ctx, func(out chan *Result) {
		painless := elastic.NewScript(script).Params(params).Lang("painless")

		req := c.ESClient.Update().
			Pretty(c.Env.IsDebug()).
			Index(name).
			Id(strconv.Itoa(id)).
			Script(painless)

		resp, err := req.Do(ctx)
		out <- &Result{Res: resp, Err: err}
	})
}
// UpsertByScript sends an update request for document id in the index called
// name, like UpdateByScript, but additionally enables ScriptedUpsert and
// supplies upsert as the upsert document. The script language is set to
// "painless". The response and error are wrapped in a Result.
func (c *command) UpsertByScript(ctx context.Context, name string, id int, script string, params, upsert map[string]interface{}) (*Result, error) {
	return c.do(ctx, func(out chan *Result) {
		painless := elastic.NewScript(script).Params(params).Lang("painless")

		req := c.ESClient.Update().
			Pretty(c.Env.IsDebug()).
			Index(name).
			Id(strconv.Itoa(id)).
			Script(painless).
			ScriptedUpsert(true).
			Upsert(upsert)

		resp, err := req.Do(ctx)
		out <- &Result{Res: resp, Err: err}
	})
}
// DeleteDocument sends a delete request for the document whose id is the
// decimal string form of id, in the index called name. The response and
// error are wrapped in a Result.
func (c *command) DeleteDocument(ctx context.Context, name string, id int) (*Result, error) {
	return c.do(ctx, func(out chan *Result) {
		req := c.ESClient.Delete().
			Pretty(c.Env.IsDebug()).
			Index(name).
			Id(strconv.Itoa(id))

		resp, err := req.Do(ctx)
		out <- &Result{Res: resp, Err: err}
	})
}
// CreateIndex sends a create-index request for the index called name, using
// index as the request body. The response and error are wrapped in a Result.
func (c *command) CreateIndex(ctx context.Context, name string, index string) (*Result, error) {
	return c.do(ctx, func(out chan *Result) {
		req := c.ESClient.CreateIndex(name).
			Pretty(c.Env.IsDebug()).
			Body(index)

		resp, err := req.Do(ctx)
		out <- &Result{Res: resp, Err: err}
	})
}
// DeleteIndex sends a delete-index request for the index called name and
// wraps the response and error in a Result.
func (c *command) DeleteIndex(ctx context.Context, name string) (*Result, error) {
	return c.do(ctx, func(out chan *Result) {
		req := c.ESClient.DeleteIndex(name).Pretty(c.Env.IsDebug())

		resp, err := req.Do(ctx)
		out <- &Result{Res: resp, Err: err}
	})
}
// Aliases sends an aliases request restricted to the index called name and
// wraps the response and error in a Result.
func (c *command) Aliases(ctx context.Context, name string) (*Result, error) {
	return c.do(ctx, func(out chan *Result) {
		req := c.ESClient.Aliases().
			Pretty(c.Env.IsDebug()).
			Index(name)

		resp, err := req.Do(ctx)
		out <- &Result{Res: resp, Err: err}
	})
}
// PutAlias sends an alias request that adds alias to the index called name
// and wraps the response and error in a Result.
func (c *command) PutAlias(ctx context.Context, name, alias string) (*Result, error) {
	return c.do(ctx, func(out chan *Result) {
		req := c.ESClient.Alias().
			Pretty(c.Env.IsDebug()).
			Add(name, alias)

		resp, err := req.Do(ctx)
		out <- &Result{Res: resp, Err: err}
	})
}
// UpdateAliases sends one alias request carrying two actions: remove alias
// name from index old, then add alias name to index new. The response and
// error are wrapped in a Result.
func (c *command) UpdateAliases(ctx context.Context, name, old, new string) (*Result, error) {
	return c.do(ctx, func(out chan *Result) {
		req := c.ESClient.Alias().Pretty(c.Env.IsDebug())
		req = req.Action(elastic.NewAliasRemoveAction(name).Index(old))
		req = req.Action(elastic.NewAliasAddAction(name).Index(new))

		resp, err := req.Do(ctx)
		out <- &Result{Res: resp, Err: err}
	})
}
// newCommand builds the Command implementation from the given environment
// and Elasticsearch client.
func newCommand(env util.Environment, client *elastic.Client) Command {
	return &command{Env: env, ESClient: client}
}