-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathoutput_s3_pro.go
117 lines (90 loc) · 2.33 KB
/
output_s3_pro.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
//go:build pro
package goreplay
import (
_ "bufio"
"fmt"
_ "io"
"log"
"math/rand"
"os"
"path/filepath"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
_ "github.com/aws/aws-sdk-go/service/s3/s3manager"
)
// Compile-time assertion that *S3Output satisfies the PluginWriter interface.
var _ PluginWriter = (*S3Output)(nil)
// S3Output is an output plugin that writes messages into a local FileOutput
// buffer and uploads each buffer file to S3 from the buffer's onClose hook
// (see onBufferUpdate).
type S3Output struct {
// pathTemplate is the S3 URL template given to NewS3Output; keyPath splits
// it into bucket and key with parseS3Url.
pathTemplate string
// buffer is the local FileOutput that PluginWrite and Close delegate to.
buffer *FileOutput
// session is the AWS session, created once by connect.
session *session.Session
// config is shared with buffer; NewS3Output sets its onClose hook to
// onBufferUpdate.
config *FileOutputConfig
// closeC, when non-nil, receives one value after each successful upload.
closeC chan struct{}
}
// NewS3Output creates an S3Output for the given S3 path template.
//
// The supplied config is modified in place: its onClose hook is pointed at the
// new output's onBufferUpdate method, and an empty BufferPath defaults to
// "/tmp". Messages are staged in a local FileOutput whose file name combines a
// random number with the last segment of pathTemplate. The AWS session is
// established before the output is returned.
func NewS3Output(pathTemplate string, config *FileOutputConfig) *S3Output {
	out := &S3Output{
		pathTemplate: pathTemplate,
		config:       config,
	}
	config.onClose = out.onBufferUpdate

	if config.BufferPath == "" {
		config.BufferPath = "/tmp"
	}

	// Local buffer file name: random prefix + final segment of the template.
	segments := strings.Split(pathTemplate, "/")
	bufferName := fmt.Sprintf("gor_output_s3_%d_buf_", rand.Int63()) + segments[len(segments)-1]
	if strings.HasSuffix(pathTemplate, ".gz") {
		bufferName += ".gz"
	}

	out.buffer = NewFileOutput(filepath.Join(config.BufferPath, bufferName), config)
	out.connect()

	return out
}
// connect creates the AWS session on first use; later calls are no-ops.
// session.Must panics if the session cannot be created.
func (o *S3Output) connect() {
	if o.session != nil {
		return
	}

	o.session = session.Must(session.NewSession(awsConfig()))
	log.Println("[S3 Output] S3 connection succesfully initialized")
}
// PluginWrite forwards msg to the local buffer and returns the buffer's
// result unchanged.
func (o *S3Output) PluginWrite(msg *Message) (n int, err error) {
	n, err = o.buffer.PluginWrite(msg)
	return n, err
}
// String returns a human-readable description of the output, including its
// S3 path template.
func (o *S3Output) String() string {
	const prefix = "S3 output: "
	return prefix + o.pathTemplate
}
// Close closes the local buffer and returns whatever error it reports.
func (o *S3Output) Close() error {
	err := o.buffer.Close()
	return err
}
// keyPath resolves the S3 bucket and object key for the buffer file with the
// given index. The path template is split by parseS3Url, every placeholder
// registered in dateFileNameFuncs is substituted in the key, and the file
// index is then applied with setFileIndex.
func (o *S3Output) keyPath(idx int) (bucket, key string) {
	bucket, rawKey := parseS3Url(o.pathTemplate)

	for placeholder, render := range dateFileNameFuncs {
		rawKey = strings.ReplaceAll(rawKey, placeholder, render(o.buffer))
	}

	return bucket, setFileIndex(rawKey, idx)
}
// onBufferUpdate uploads the buffer file at path to S3 and then deletes it
// locally. NewS3Output installs it as the FileOutputConfig onClose hook.
//
// The destination bucket and key come from keyPath, using the file index
// extracted from path. The local file is removed whether or not the upload
// succeeds, so a failed chunk is dropped rather than retried. After a
// successful upload a value is sent on closeC when that channel is set.
func (o *S3Output) onBufferUpdate(path string) {
	svc := s3.New(o.session)
	idx := getFileIndex(path)
	bucket, key := o.keyPath(idx)

	file, err := os.Open(path)
	if err != nil {
		log.Printf("[S3 Output] Failed to open buffer file %s, %s\n", path, err)
		// Same policy as a failed upload: discard the buffer file.
		os.Remove(path)
		return
	}

	_, err = svc.PutObject(&s3.PutObjectInput{
		Body:   file,
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})

	// Release the descriptor before deleting: the file was previously never
	// closed, and some platforms refuse to remove a file that is still open.
	file.Close()
	os.Remove(path)

	if err != nil {
		log.Printf("[S3 Output] Failed to upload data to %s/%s, %s\n", bucket, key, err)
		return
	}

	if o.closeC != nil {
		o.closeC <- struct{}{}
	}
}