A data transportation tool that moves data from one system to another, such as file, Kafka, HDFS, etc.
- Demo: Example
// Parse the configuration file into the structure v of the specified format
type Configer interface {
Parse(v interface{}) error
}
Any type that implements the Inputer interface can be used as an input component
type Inputer interface {
Init(config Configer) error
Start() error
Read([]byte) (int, error)
Close() error
Version() string
}
Any type that implements the Adapter interface can be used as a data-processing component
type Adapter interface {
Init(config Configer) error
Handle(in, out byte) error
Version() string
}
Any type that implements the Outputer interface can be used as an output component
type Outputer interface {
Init(config Configer) error
Start() error
Write([]byte) (int, error)
Close() error
Version() string
}
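A minimal sketch of an Outputer, assuming an output that simply keeps every message in memory. `MemoryOutput` is a hypothetical type used only for illustration, and rejecting writes after `Close` is a design choice of this sketch rather than documented Transport behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// Configer and Outputer are local copies of the interfaces shown above.
type Configer interface {
	Parse(v interface{}) error
}

type Outputer interface {
	Init(config Configer) error
	Start() error
	Write([]byte) (int, error)
	Close() error
	Version() string
}

// MemoryOutput is a hypothetical output that stores each message in a slice.
type MemoryOutput struct {
	Messages []string
	closed   bool
}

// Init ignores the config because this sketch has no settings to parse.
func (o *MemoryOutput) Init(config Configer) error { return nil }
func (o *MemoryOutput) Start() error               { return nil }
func (o *MemoryOutput) Version() string            { return "0.0.1" }

// Close marks the output as closed so later writes are rejected.
func (o *MemoryOutput) Close() error {
	o.closed = true
	return nil
}

// Write stores a copy of p as one message and reports len(p) bytes written.
func (o *MemoryOutput) Write(p []byte) (int, error) {
	if o.closed {
		return 0, errors.New("memory output is closed")
	}
	o.Messages = append(o.Messages, string(p))
	return len(p), nil
}

// Compile-time check that MemoryOutput satisfies Outputer.
var _ Outputer = (*MemoryOutput)(nil)

func main() {
	out := &MemoryOutput{}
	out.Write([]byte("first"))
	out.Write([]byte("second"))
	out.Close()
	fmt.Println(len(out.Messages), out.Messages)
}
```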
A Handler can embed the Inject struct to inject data into an Input/Output; example
Transport can be used as a library through its Input/Output components; example
Inputs:
- exec: runs a program or script
- file(s): files
- hdfs: HDFS
- http: HTTP POST method
- influxdb: InfluxDB
- kafka: Kafka
- std: stdin, standard input
- random: generates random UUIDs, for testing
- elasticsearch: Elasticsearch API scroll query
Outputs:
- file: file
- kafka: Kafka
- http: HTTP, POST to an endpoint
- null: similar to /dev/null, discards the output
- std: stdout, standard output
- elasticsearch: Elasticsearch API /_bulk
- tcp: TCP
- hdfs: HDFS
- influxdb: InfluxDB
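Adapters:
- null: connects input and output directly
- addenter: appends a newline at the end of each line, e.g. when writing to a file
- grok: formats a line into JSON using a regular expression. Note: in ^(?P<name>subexpression)$, the captured group is both numbered and named (a submatch)
- kv: splits a string into JSON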
- Install Docker
- Download the source code
git clone https://github.com/luopengift/transport.git
cd transport
- Build the Docker image
docker build -t transport:0.0.3 .
- Run with Docker
docker run -p 12345:12345 transport:0.0.3 -f docker-test.json
- Download
git clone https://github.com/luopengift/transport.git
cd transport
- Build
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ./init.sh build cmd/main.go
2017-09-14.15:09:14
GOPATH init Finished. GOPATH=/data/golang:/data/golang/src
build transport success.
- List plugins
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ./transport -h
Usage of ./transport:
-f string
(config) configuration file
-l (list) list plugins and plugin versions
-r (read) read the current configuration file
-v (version) version number
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ./transport -l
[Inputs] version
random 0.0.1
std 0.0.1
exec 0.0.1
file 0.0.1
files 0.0.1
hdfs 0.0.1
http 0.0.1
kafka 0.0.1
[Adapters]
kv 0.0.3
zhizilog 0.0.1
null 0.0.1
addenter 0.0.1
grok 0.0.1
inject 0.0.1_debug
[Outputers]
elasticsearch 0.0.1
file 0.0.1
hdfs 0.0.1
kafka 0.0.1
null 0.0.1
std 0.0.1
tcp 0.0.1
- Check whether the current configuration file can be loaded successfully
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ./transport -f test/kafka-file.json -r
2017-09-14 15:11:59.955 [I] <file test/kafka-file.json is END:EOF>
config info:
[Inputs]
kafka:
offset: -1
addrs: ["10.10.20.14:9092","10.10.20.15:9092","10.10.20.16:9092"]
topics: ["zhizi-log"]
[Adapts]
addenter:
[Outputs]
file:
path: /tmp/tmp.log
- Run
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ./transport -f test/kafka-file.json
2017-09-14 15:13:22.107 [I] <file test/kafka-file.json is END:EOF>
2017-09-14 15:13:22.107 [I] Transport starting...
2017-09-14 15:13:22.107 [W] Starting loading performance data, please press CTRL+C exit...
HttpsServer Start 0.0.0.0:12345
^C2017-09-14 15:13:31.526 [W] Get signal:interrupt, Profile File is cpu.prof/mem.prof
- Start the service [loads the config.json configuration file]
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ./init.sh start
2017-09-14.15:16:45
GOPATH init Finished. GOPATH=/data/golang:/data/golang/src
transport started..., PID=22778
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ps -ef |grep -v grep |grep transport
root 22778 1 0 15:18 pts/0 00:00:00 ./transport -f config.json
- Check the service status
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ./init.sh status
2017-09-14.15:18:24
GOPATH init Finished. GOPATH=/data/golang:/data/golang/src
root 22778 1 0 15:18 pts/0 00:00:00 ./transport -f config.json
transport now is running already, PID=22778
- View the runtime log
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ./init.sh tail
2017-09-14.15:22:07
GOPATH init Finished. GOPATH=/data/golang:/data/golang/src
2017-09-14 15:18:16.399 [D] [transport] [files] recv 2017-01-02 15:58:43 DEBUG This is a debug Test
2017-09-14 15:18:16.399 [D] [transport] [files] recv 44
2017-09-14 15:18:16.399 [D] [transport] send 44
......
- Stop the service
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ./init.sh stop
2017-09-14.15:19:09
GOPATH init Finished. GOPATH=/data/golang:/data/golang/src
transport stoped...
- Optimize performance
- Add more components