-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathactivity.go
137 lines (114 loc) · 2.95 KB
/
activity.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package couchbase
import (
	"fmt"
	"strings"

	"github.com/project-flogo/core/activity"
	"github.com/project-flogo/core/data/metadata"
	"github.com/project-flogo/core/support/log"
	"gopkg.in/couchbase/gocb.v1"
)
// Operation codes stored in Activity.method. New derives the code from the
// configured method name and Eval switches on it to pick the bucket call.
const (
	// methodUnknown is the zero value, i.e. no recognized method was configured.
	methodUnknown int8 = iota
	// methodGet selects the bucket Get call.
	methodGet
	// methodInsert selects the bucket Insert call.
	methodInsert
	// methodUpsert selects the bucket Upsert call.
	methodUpsert
	// methodRemove selects the bucket Remove call.
	methodRemove
)
// init registers the Couchbase activity, with New as its factory, when the
// package is loaded.
func init() {
	// The error returned by Register is intentionally discarded here.
	_ = activity.Register(new(Activity), New)
}
// New builds a Couchbase activity from the settings carried by ctx.
//
// It maps the settings onto a Settings struct, resolves the configured method
// name (case-insensitively) to an operation code, connects to s.Server,
// authenticates with the configured username/password and opens the
// configured bucket. The returned Activity keeps the opened bucket for use by
// Eval and Cleanup.
//
// An error is returned when the settings cannot be mapped, when the method is
// not one of get/insert/upsert/remove, or when connecting, authenticating or
// opening the bucket fails.
func New(ctx activity.InitContext) (activity.Activity, error) {
	s := &Settings{}
	if err := metadata.MapToStruct(ctx.Settings(), s, true); err != nil {
		return nil, err
	}

	// Resolve the method before touching the cluster so that a bad
	// configuration fails fast instead of producing an activity whose Eval
	// silently does nothing.
	method, err := methodFromName(s.Method)
	if err != nil {
		return nil, err
	}

	cluster, connectError := gocb.Connect(s.Server)
	if connectError != nil {
		ctx.Logger().Errorf("Connection error: %v", connectError)
		return nil, connectError
	}

	err = cluster.Authenticate(gocb.PasswordAuthenticator{
		Username: s.Username,
		Password: s.Password,
	})
	if err != nil {
		return nil, err
	}

	bucket, openBucketError := cluster.OpenBucket(s.BucketName, s.BucketPassword)
	if openBucketError != nil {
		ctx.Logger().Errorf("Error while opening the bucket with the specified credentials: %v", openBucketError)
		return nil, openBucketError
	}

	// NOTE(review): Activity.expiry is not populated here, so Insert/Upsert in
	// Eval always pass an expiry of 0. If Settings carries an expiry value it
	// should probably be copied onto the activity — TODO confirm.
	return &Activity{bucket: bucket, method: method}, nil
}

// methodFromName maps a configured method name (case-insensitive) to its operation code.
func methodFromName(name string) (int8, error) {
	switch strings.ToLower(name) {
	case "get":
		return methodGet, nil
	case "insert":
		return methodInsert, nil
	case "upsert":
		return methodUpsert, nil
	case "remove":
		return methodRemove, nil
	}
	return methodUnknown, fmt.Errorf("unsupported couchbase method %q: expected one of get, insert, upsert, remove", name)
}
// activityMd is the activity metadata built from the Settings, Input and
// Output types; Activity.Metadata returns it.
var activityMd = activity.ToMetadata(&Settings{}, &Input{}, &Output{})
// Activity is the Couchbase activity: it holds an opened bucket and the
// operation to run against it on each Eval.
type Activity struct {
// bucket is the opened bucket; New stores it, Eval runs operations on it
// and Cleanup closes it.
bucket *gocb.Bucket
// method is one of the method* operation codes and selects the branch
// taken in Eval.
method int8
// expiry is passed as the expiry argument of Insert and Upsert in Eval.
// NOTE(review): nothing in the visible code assigns it, so it appears to
// always be 0 — confirm.
expiry int
}
// Metadata exposes the package-level metadata describing this activity's
// settings, input and output.
func (a *Activity) Metadata() *activity.Metadata {
	md := activityMd
	return md
}
// Cleanup writes a trace message to the root logger and closes the bucket,
// returning whatever error the close reports.
func (a *Activity) Cleanup() error {
	rootLogger := log.RootLogger()
	rootLogger.Tracef("cleaning up Couchbase activity")

	closeErr := a.bucket.Close()
	return closeErr
}
// Eval implements api.Activity.Eval. It reads the activity input, runs the
// operation selected by a.method against the bucket using input.Key (and
// input.Data for writes), and publishes the result as the activity output.
//
// For insert, upsert and remove the output data is the CAS value returned by
// the bucket call; for get it is the fetched document.
//
// It returns (true, nil) on success. It returns (false, err) when the input
// cannot be read, the bucket operation fails, the output cannot be set, or
// a.method is not a supported operation code.
func (a *Activity) Eval(ctx activity.Context) (done bool, err error) {
	logger := ctx.Logger()

	input := &Input{}
	if err = ctx.GetInputObject(input); err != nil {
		return false, err
	}

	output := &Output{}

	switch a.method {
	case methodInsert:
		cas, methodError := a.bucket.Insert(input.Key, input.Data, uint32(a.expiry))
		if methodError != nil {
			logger.Errorf("Insert error: %v", methodError)
			return false, methodError
		}
		output.Data = cas
	case methodUpsert:
		cas, methodError := a.bucket.Upsert(input.Key, input.Data, uint32(a.expiry))
		if methodError != nil {
			logger.Errorf("Upsert error: %v", methodError)
			return false, methodError
		}
		output.Data = cas
	case methodRemove:
		// A CAS of 0 is passed, i.e. no specific CAS value is supplied for the removal.
		cas, methodError := a.bucket.Remove(input.Key, 0)
		if methodError != nil {
			logger.Errorf("Remove error: %v", methodError)
			return false, methodError
		}
		output.Data = cas
	case methodGet:
		var document interface{}
		_, methodError := a.bucket.Get(input.Key, &document)
		if methodError != nil {
			logger.Errorf("Get error: %v", methodError)
			return false, methodError
		}
		output.Data = document
	default:
		// Without this branch an activity with no recognized method (for
		// example a zero-value Activity) would report success while doing
		// nothing and emitting an empty output.
		return false, fmt.Errorf("unsupported couchbase method code %d", a.method)
	}

	if err = ctx.SetOutputObject(output); err != nil {
		return false, err
	}
	return true, nil
}