diff --git a/README.md b/README.md
index 0b2fd9b..931d973 100644
--- a/README.md
+++ b/README.md
@@ -23,6 +23,7 @@ Links:
* Support specify which _source fields to return from source
* Support specify query string query to filter the data source
* Support rename source fields while do bulk indexing
+* Support incremental updates (add/update/delete changed records) with `--sync` (see the usage example below). Note: this uses a different implementation that only handles the ***changed*** records, so it is not as fast as the plain copy mode
* Load generating with
## ESM is fast!
@@ -69,6 +70,11 @@ copy index `src_index` from `192.168.1.x` to `192.168.1.y:9200` and save with `d
./bin/esm -s http://localhost:9200 -d http://localhost:9200 -x src_index -y dest_index -w=5 -b=100
```
+use the sync feature to incrementally update index `src_index` from `192.168.1.x` to `192.168.1.y:9200`
+```
+./bin/esm --sync -s http://localhost:9200 -d http://localhost:9200 -x src_index -y dest_index
+```
+
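+the sync flags can be combined with the usual auth options when both clusters require credentials (an illustrative sketch reusing the hosts and credentials from the other examples in this README):
+```
+./bin/esm --sync -s http://localhost:9200 -m admin:111111 -d http://localhost:9201 -n admin:111111 -x src_index -y dest_index
+```
+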
support Basic-Auth
```
./bin/esm -s http://localhost:9200 -x "src_index" -y "dest_index" -d http://localhost:9201 -n admin:111111
@@ -91,6 +97,13 @@ dump elasticsearch documents into local file
./bin/esm -s http://localhost:9200 -x "src_index" -m admin:111111 -c 5000 -q=query:mixer --refresh -o=dump.bin
```
+dump the source and target index to local files and compare them, so you can quickly find the differences
+```
+./bin/esm --sort=_id -s http://localhost:9200 -x "src_index" --truncate_output --skip=_index -o=src.json
+./bin/esm --sort=_id -s http://localhost:9200 -x "dst_index" --truncate_output --skip=_index -o=dst.json
+diff -W 200 -ry --suppress-common-lines src.json dst.json
+```
+
loading data from dump files, bulk insert to another es instance
```
./bin/esm -d http://localhost:9200 -y "dest_index" -n admin:111111 -c 5000 -b 5 --refresh -i=dump.bin
@@ -172,6 +185,7 @@ Usage:
Application Options:
-s, --source= source elasticsearch instance, ie: http://localhost:9200
-q, --query= query against source elasticsearch instance, filter data before migrate, ie: name:medcl
+      --sort=                      sort field used for the scroll, ie: _id (default: _id)
-d, --dest= destination elasticsearch instance, ie: http://localhost:9201
-m, --source_auth= basic auth of source elasticsearch instance, ie: user:pass
-n, --dest_auth= basic auth of target elasticsearch instance, ie: user:pass
@@ -192,12 +206,15 @@ Application Options:
--green wait for both hosts cluster status to be green before dump. otherwise yellow is okay
-v, --log= setting log level,options:trace,debug,info,warn,error (INFO)
-o, --output_file= output documents of source index into local file
+      --truncate_output            truncate the output file before dumping
-i, --input_file= indexing from local dump file
--input_file_type= the data type of input file, options: dump, json_line, json_array, log_line (dump)
--source_proxy= set proxy to source http connections, ie: http://127.0.0.1:8080
--dest_proxy= set proxy to target http connections, ie: http://127.0.0.1:8080
--refresh refresh after migration finished
- --fields= filter source fields, comma separated, ie: col1,col2,col3,...
+      --sync                       sync will scroll both the source and target index, compare the data and sync (index/update/delete)
+      --fields=                    filter source fields (whitelist), comma separated, ie: col1,col2,col3,...
+      --skip=                      skip source fields (blacklist), comma separated, ie: col1,col2,col3,...
--rename= rename source fields, comma separated, ie: _type:type, name:myname
-l, --logstash_endpoint= target logstash tcp endpoint, ie: 127.0.0.1:5055
--secured_logstash_endpoint target logstash tcp endpoint was secured by TLS
diff --git a/buffer.go b/buffer.go
index c2c1c85..292cbb2 100644
--- a/buffer.go
+++ b/buffer.go
@@ -17,8 +17,8 @@ limitations under the License.
package main
import (
- "io"
"errors"
+ "io"
)
//https://golangtc.com/t/5a49e2104ce40d740bbbc515
@@ -39,40 +39,40 @@ func (b *buffer) Len() int {
return b.end - b.start
}
-//将有用的字节前移
+// shift the remaining useful bytes to the front of the buffer
func (b *buffer) grow() {
if b.start == 0 {
return
}
copy(b.buf, b.buf[b.start:b.end])
b.end -= b.start
- b.start = 0;
+ b.start = 0
}
-//从reader里面读取数据,如果reader阻塞,会发生阻塞
+// read data from the underlying reader; this call blocks if the reader blocks
func (b *buffer) readFromReader() (int, error) {
b.grow()
n, err := b.reader.Read(b.buf[b.end:])
- if (err != nil) {
+ if err != nil {
return n, err
}
b.end += n
return n, nil
}
-//返回n个字节,而不产生移位
+// return n bytes without advancing the read position
func (b *buffer) seek(n int) ([]byte, error) {
if b.end-b.start >= n {
- buf := b.buf[b.start:b.start+n]
+ buf := b.buf[b.start : b.start+n]
return buf, nil
}
return nil, errors.New("not enough")
}
-//舍弃offset个字段,读取n个字段
-func (b *buffer) read(offset, n int) ([]byte) {
+// discard offset bytes, then read n bytes
+func (b *buffer) read(offset, n int) []byte {
b.start += offset
- buf := b.buf[b.start:b.start+n]
+ buf := b.buf[b.start : b.start+n]
b.start += n
return buf
}
diff --git a/bulk.go b/bulk.go
deleted file mode 100644
index d968eba..0000000
--- a/bulk.go
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
-Copyright 2016 Medcl (m AT medcl.net)
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package main
-
-import (
- "bytes"
- "encoding/json"
- "github.com/cheggaaa/pb"
- "strings"
- "sync"
- "time"
-
- log "github.com/cihub/seelog"
-)
-
-func (c *Migrator) NewBulkWorker(docCount *int, pb *pb.ProgressBar, wg *sync.WaitGroup) {
-
- log.Debug("start es bulk worker")
-
- bulkItemSize := 0
- mainBuf := bytes.Buffer{}
- docBuf := bytes.Buffer{}
- docEnc := json.NewEncoder(&docBuf)
-
- idleDuration := 5 * time.Second
- idleTimeout := time.NewTimer(idleDuration)
- defer idleTimeout.Stop()
-
- taskTimeOutDuration := 5 * time.Minute
- taskTimeout := time.NewTimer(taskTimeOutDuration)
- defer taskTimeout.Stop()
-
-
-READ_DOCS:
- for {
- idleTimeout.Reset(idleDuration)
- taskTimeout.Reset(taskTimeOutDuration)
- select {
- case docI, open := <-c.DocChan:
- var err error
- log.Trace("read doc from channel,", docI)
- // this check is in case the document is an error with scroll stuff
- if status, ok := docI["status"]; ok {
- if status.(int) == 404 {
- log.Error("error: ", docI["response"])
- continue
- }
- }
-
- // sanity check
- for _, key := range []string{"_index", "_type", "_source", "_id"} {
- if _, ok := docI[key]; !ok {
- break READ_DOCS
- }
- }
-
- var tempDestIndexName string
- var tempTargetTypeName string
- tempDestIndexName = docI["_index"].(string)
- tempTargetTypeName = docI["_type"].(string)
-
- if c.Config.TargetIndexName != "" {
- tempDestIndexName = c.Config.TargetIndexName
- }
-
- if c.Config.OverrideTypeName != "" {
- tempTargetTypeName = c.Config.OverrideTypeName
- }
-
- doc := Document{
- Index: tempDestIndexName,
- Type: tempTargetTypeName,
- source: docI["_source"].(map[string]interface{}),
- Id: docI["_id"].(string),
- }
-
- if c.Config.RegenerateID {
- doc.Id = ""
- }
-
- if c.Config.RenameFields != "" {
- kvs := strings.Split(c.Config.RenameFields, ",")
- for _, i := range kvs {
- fvs := strings.Split(i, ":")
- oldField := strings.TrimSpace(fvs[0])
- newField := strings.TrimSpace(fvs[1])
- if oldField == "_type" {
- doc.source[newField] = docI["_type"].(string)
- } else {
- v := doc.source[oldField]
- doc.source[newField] = v
- delete(doc.source, oldField)
- }
- }
- }
-
- // add doc "_routing" if exists
- if _, ok := docI["_routing"]; ok {
- str, ok := docI["_routing"].(string)
- if ok && str != "" {
- doc.Routing = str
- }
- }
-
- // if channel is closed flush and gtfo
- if !open {
- goto WORKER_DONE
- }
-
- // sanity check
- if len(doc.Index) == 0 || len(doc.Type) == 0 {
- log.Errorf("failed decoding document: %+v", doc)
- continue
- }
-
- // encode the doc and and the _source field for a bulk request
- post := map[string]Document{
- "index": doc,
- }
- if err = docEnc.Encode(post); err != nil {
- log.Error(err)
- }
- if err = docEnc.Encode(doc.source); err != nil {
- log.Error(err)
- }
-
-
- // append the doc to the main buffer
- mainBuf.Write(docBuf.Bytes())
- // reset for next document
- bulkItemSize++
- (*docCount)++
- docBuf.Reset()
-
- // if we approach the 100mb es limit, flush to es and reset mainBuf
- if mainBuf.Len()+docBuf.Len() > (c.Config.BulkSizeInMB * 1024*1024) {
- goto CLEAN_BUFFER
- }
-
- case <-idleTimeout.C:
- log.Debug("5s no message input")
- goto CLEAN_BUFFER
- case <-taskTimeout.C:
- log.Warn("5m no message input, close worker")
- goto WORKER_DONE
- }
-
- goto READ_DOCS
-
- CLEAN_BUFFER:
- c.TargetESAPI.Bulk(&mainBuf)
- log.Trace("clean buffer, and execute bulk insert")
- pb.Add(bulkItemSize)
- bulkItemSize = 0
- if c.Config.SleepSecondsAfterEachBulk >0{
- time.Sleep(time.Duration(c.Config.SleepSecondsAfterEachBulk) * time.Second)
- }
- }
-WORKER_DONE:
- if docBuf.Len() > 0 {
- mainBuf.Write(docBuf.Bytes())
- bulkItemSize++
- }
- c.TargetESAPI.Bulk(&mainBuf)
- log.Trace("bulk insert")
- pb.Add(bulkItemSize)
- bulkItemSize = 0
- wg.Done()
-}
diff --git a/domain.go b/domain.go
index 12ba958..bec97ee 100644
--- a/domain.go
+++ b/domain.go
@@ -77,7 +77,7 @@ type ClusterHealth struct {
Status string `json:"status,omitempty"`
}
-//{"took":23,"errors":true,"items":[{"create":{"_index":"mybank3","_type":"my_doc2","_id":"AWz8rlgUkzP-cujdA_Fv","status":409,"error":{"type":"version_conflict_engine_exception","reason":"[AWz8rlgUkzP-cujdA_Fv]: version conflict, document already exists (current version [1])","index_uuid":"w9JZbJkfSEWBI-uluWorgw","shard":"0","index":"mybank3"}}},{"create":{"_index":"mybank3","_type":"my_doc4","_id":"AWz8rpF2kzP-cujdA_Fx","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc4]"}}},{"create":{"_index":"mybank3","_type":"my_doc1","_id":"AWz8rjpJkzP-cujdA_Fu","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc1]"}}},{"create":{"_index":"mybank3","_type":"my_doc3","_id":"AWz8rnbckzP-cujdA_Fw","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc3]"}}},{"create":{"_index":"mybank3","_type":"my_doc5","_id":"AWz8rrsEkzP-cujdA_Fy","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc5]"}}},{"create":{"_index":"mybank3","_type":"doc","_id":"3","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, doc]"}}}]}
+// {"took":23,"errors":true,"items":[{"create":{"_index":"mybank3","_type":"my_doc2","_id":"AWz8rlgUkzP-cujdA_Fv","status":409,"error":{"type":"version_conflict_engine_exception","reason":"[AWz8rlgUkzP-cujdA_Fv]: version conflict, document already exists (current version [1])","index_uuid":"w9JZbJkfSEWBI-uluWorgw","shard":"0","index":"mybank3"}}},{"create":{"_index":"mybank3","_type":"my_doc4","_id":"AWz8rpF2kzP-cujdA_Fx","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc4]"}}},{"create":{"_index":"mybank3","_type":"my_doc1","_id":"AWz8rjpJkzP-cujdA_Fu","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc1]"}}},{"create":{"_index":"mybank3","_type":"my_doc3","_id":"AWz8rnbckzP-cujdA_Fw","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc3]"}}},{"create":{"_index":"mybank3","_type":"my_doc5","_id":"AWz8rrsEkzP-cujdA_Fy","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc5]"}}},{"create":{"_index":"mybank3","_type":"doc","_id":"3","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, doc]"}}}]}
type BulkResponse struct {
Took int `json:"took,omitempty"`
Errors bool `json:"errors,omitempty"`
@@ -107,11 +107,12 @@ type Config struct {
// config options
SourceEs string `short:"s" long:"source" description:"source elasticsearch instance, ie: http://localhost:9200"`
Query string `short:"q" long:"query" description:"query against source elasticsearch instance, filter data before migrate, ie: name:medcl"`
+	SortField           string `long:"sort" description:"sort field used for the scroll, ie: _id" default:"_id"`
TargetEs string `short:"d" long:"dest" description:"destination elasticsearch instance, ie: http://localhost:9201"`
SourceEsAuthStr string `short:"m" long:"source_auth" description:"basic auth of source elasticsearch instance, ie: user:pass"`
TargetEsAuthStr string `short:"n" long:"dest_auth" description:"basic auth of target elasticsearch instance, ie: user:pass"`
DocBufferCount int `short:"c" long:"count" description:"number of documents at a time: ie \"size\" in the scroll request" default:"10000"`
- BufferCount int `long:"buffer_count" description:"number of buffered documents in memory" default:"1000000"`
+ BufferCount int `long:"buffer_count" description:"number of buffered documents in memory" default:"1000000"`
Workers int `short:"w" long:"workers" description:"concurrency number for bulk workers" default:"1"`
BulkSizeInMB int `short:"b" long:"bulk_size" description:"bulk size in MB" default:"5"`
ScrollTime string `short:"t" long:"time" description:"scroll time" default:"10m"`
@@ -127,12 +128,15 @@ type Config struct {
WaitForGreen bool `long:"green" description:"wait for both hosts cluster status to be green before dump. otherwise yellow is okay"`
LogLevel string `short:"v" long:"log" description:"setting log level,options:trace,debug,info,warn,error" default:"INFO"`
DumpOutFile string `short:"o" long:"output_file" description:"output documents of source index into local file" `
+	TruncateOutFile     bool   `long:"truncate_output" description:"truncate the output file before dumping" `
DumpInputFile string `short:"i" long:"input_file" description:"indexing from local dump file" `
InputFileType string `long:"input_file_type" description:"the data type of input file, options: dump, json_line, json_array, log_line" default:"dump" `
SourceProxy string `long:"source_proxy" description:"set proxy to source http connections, ie: http://127.0.0.1:8080"`
TargetProxy string `long:"dest_proxy" description:"set proxy to target http connections, ie: http://127.0.0.1:8080"`
Refresh bool `long:"refresh" description:"refresh after migration finished"`
- Fields string `long:"fields" description:"filter source fields, comma separated, ie: col1,col2,col3,..." `
+	Sync                bool   `long:"sync" description:"sync will scroll both the source and target index, compare the data and sync (index/update/delete)"`
+	Fields              string `long:"fields" description:"filter source fields (whitelist), comma separated, ie: col1,col2,col3,..." `
+	SkipFields          string `long:"skip" description:"skip source fields (blacklist), comma separated, ie: col1,col2,col3,..." `
RenameFields string `long:"rename" description:"rename source fields, comma separated, ie: _type:type, name:myname" `
LogstashEndpoint string `short:"l" long:"logstash_endpoint" description:"target logstash tcp endpoint, ie: 127.0.0.1:5055" `
LogstashSecEndpoint bool `long:"secured_logstash_endpoint" description:"target logstash tcp endpoint was secured by TLS" `
diff --git a/esapi.go b/esapi.go
index 5697891..d178919 100644
--- a/esapi.go
+++ b/esapi.go
@@ -18,16 +18,19 @@ package main
import "bytes"
-type ESAPI interface{
+type ESAPI interface {
ClusterHealth() *ClusterHealth
- Bulk(data *bytes.Buffer)
+ ClusterVersion() *ClusterVersion
+ Bulk(data *bytes.Buffer) error
GetIndexSettings(indexNames string) (*Indexes, error)
- DeleteIndex(name string) (error)
- CreateIndex(name string,settings map[string]interface{}) (error)
- GetIndexMappings(copyAllIndexes bool,indexNames string)(string,int,*Indexes,error)
- UpdateIndexSettings(indexName string,settings map[string]interface{})(error)
- UpdateIndexMapping(indexName string,mappings map[string]interface{})(error)
- NewScroll(indexNames string,scrollTime string,docBufferCount int,query string, slicedId,maxSlicedCount int, fields string)(interface{}, error)
- NextScroll(scrollTime string,scrollId string)(interface{},error)
+ DeleteIndex(name string) error
+ CreateIndex(name string, settings map[string]interface{}) error
+ GetIndexMappings(copyAllIndexes bool, indexNames string) (string, int, *Indexes, error)
+ UpdateIndexSettings(indexName string, settings map[string]interface{}) error
+ UpdateIndexMapping(indexName string, mappings map[string]interface{}) error
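+	// NewScroll opens a scroll over the given indices sorted by the given field; NextScroll fetches the next batch for an existing scroll id, and DeleteScroll releases the scroll context.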
+ NewScroll(indexNames string, scrollTime string, docBufferCount int, query string, sort string,
+ slicedId int, maxSlicedCount int, fields string) (ScrollAPI, error)
+ NextScroll(scrollTime string, scrollId string) (ScrollAPI, error)
+ DeleteScroll(scrollId string) error
Refresh(name string) (err error)
}
diff --git a/file.go b/file.go
index ee50b3e..5cb52d6 100644
--- a/file.go
+++ b/file.go
@@ -17,24 +17,25 @@ limitations under the License.
package main
import (
- "sync"
- "github.com/cheggaaa/pb"
- log "github.com/cihub/seelog"
- "os"
"bufio"
"encoding/json"
+ "github.com/cheggaaa/pb"
+ log "github.com/cihub/seelog"
"io"
+ "os"
+ "strings"
+ "sync"
)
-func checkFileIsExist(filename string) (bool) {
- var exist = true;
+func checkFileIsExist(filename string) bool {
+ var exist = true
if _, err := os.Stat(filename); os.IsNotExist(err) {
- exist = false;
+ exist = false
}
- return exist;
+ return exist
}
-func (m *Migrator) NewFileReadWorker(pb *pb.ProgressBar, wg *sync.WaitGroup) {
+func (m *Migrator) NewFileReadWorker(pb *pb.ProgressBar, wg *sync.WaitGroup) {
log.Debug("start reading file")
f, err := os.Open(m.Config.DumpInputFile)
if err != nil {
@@ -45,16 +46,16 @@ func (m *Migrator) NewFileReadWorker(pb *pb.ProgressBar, wg *sync.WaitGroup) {
defer f.Close()
r := bufio.NewReader(f)
lineCount := 0
- for{
- line,err := r.ReadString('\n')
- if io.EOF == err || nil != err{
+ for {
+ line, err := r.ReadString('\n')
+ if io.EOF == err || nil != err {
break
}
lineCount += 1
js := map[string]interface{}{}
err = DecodeJson(line, &js)
- if err!=nil {
+ if err != nil {
log.Error(err)
continue
}
@@ -70,26 +71,44 @@ func (m *Migrator) NewFileReadWorker(pb *pb.ProgressBar, wg *sync.WaitGroup) {
func (c *Migrator) NewFileDumpWorker(pb *pb.ProgressBar, wg *sync.WaitGroup) {
var f *os.File
- var err1 error;
+ var err1 error
if checkFileIsExist(c.Config.DumpOutFile) {
- f, err1 = os.OpenFile(c.Config.DumpOutFile, os.O_APPEND|os.O_WRONLY, os.ModeAppend)
- if(err1!=nil){
+ flag := os.O_WRONLY
+ if c.Config.TruncateOutFile {
+ flag |= os.O_TRUNC
+ } else {
+ flag |= os.O_APPEND
+ }
+ f, err1 = os.OpenFile(c.Config.DumpOutFile, flag, os.ModeAppend)
+ if err1 != nil {
log.Error(err1)
return
}
- }else {
+ } else {
f, err1 = os.Create(c.Config.DumpOutFile)
- if(err1!=nil){
+ if err1 != nil {
log.Error(err1)
return
}
}
w := bufio.NewWriter(f)
+ skipFields := make([]string, 0)
+ if len(c.Config.SkipFields) > 0 {
+ //skip fields
+ if !strings.Contains(c.Config.SkipFields, ",") {
+ skipFields = append(skipFields, c.Config.SkipFields)
+ } else {
+ fields := strings.Split(c.Config.SkipFields, ",")
+ for _, field := range fields {
+ skipFields = append(skipFields, field)
+ }
+ }
+ }
- READ_DOCS:
+READ_DOCS:
for {
docI, open := <-c.DocChan
// this check is in case the document is an error with scroll stuff
@@ -106,15 +125,20 @@ func (c *Migrator) NewFileDumpWorker(pb *pb.ProgressBar, wg *sync.WaitGroup) {
break READ_DOCS
}
}
+ for _, key := range skipFields {
+ if _, found := docI[key]; found {
+ delete(docI, key)
+ }
+ }
- jsr,err:=json.Marshal(docI)
+ jsr, err := json.Marshal(docI)
log.Trace(string(jsr))
- if(err!=nil){
+ if err != nil {
log.Error(err)
}
- n,err:=w.WriteString(string(jsr))
- if(err!=nil){
- log.Error(n,err)
+ n, err := w.WriteString(string(jsr))
+ if err != nil {
+ log.Error(n, err)
}
w.WriteString("\n")
pb.Increment()
@@ -125,12 +149,10 @@ func (c *Migrator) NewFileDumpWorker(pb *pb.ProgressBar, wg *sync.WaitGroup) {
}
}
- WORKER_DONE:
+WORKER_DONE:
w.Flush()
f.Close()
wg.Done()
log.Debug("file dump finished")
}
-
-
diff --git a/go.mod b/go.mod
index 2571f61..999f32d 100644
--- a/go.mod
+++ b/go.mod
@@ -1,4 +1,4 @@
-module github.com/fishjam/esm
+module github.com/medcl/esm
go 1.21.1
diff --git a/http.go b/http.go
index 406474b..0204e64 100644
--- a/http.go
+++ b/http.go
@@ -25,38 +25,37 @@ import (
"fmt"
log "github.com/cihub/seelog"
"github.com/parnurzeal/gorequest"
- "github.com/valyala/fasthttp"
+ "github.com/valyala/fasthttp"
"io"
"net/http"
"net/url"
"strings"
)
-func BasicAuth(req *fasthttp.Request,user,pass string) {
- msg := fmt.Sprintf("%s:%s",user,pass)
+func BasicAuth(req *fasthttp.Request, user, pass string) {
+ msg := fmt.Sprintf("%s:%s", user, pass)
encoded := base64.StdEncoding.EncodeToString([]byte(msg))
req.Header.Add("Authorization", "Basic "+encoded)
}
-func Get(url string,auth *Auth,proxy string) (*http.Response, string, []error) {
+func Get(url string, auth *Auth, proxy string) (*http.Response, string, []error) {
request := gorequest.New()
tr := &http.Transport{
- DisableKeepAlives: true,
+ DisableKeepAlives: true,
DisableCompression: false,
- TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
- request.Transport=tr
+ request.Transport = tr
-
- if(auth!=nil){
- request.SetBasicAuth(auth.User,auth.Pass)
+ if auth != nil {
+ request.SetBasicAuth(auth.User, auth.Pass)
}
//request.Type("application/json")
- if(len(proxy)>0){
+ if len(proxy) > 0 {
request.Proxy(proxy)
}
@@ -65,35 +64,35 @@ func Get(url string,auth *Auth,proxy string) (*http.Response, string, []error) {
}
-func Post(url string,auth *Auth, body string,proxy string)(*http.Response, string, []error) {
+func Post(url string, auth *Auth, body string, proxy string) (*http.Response, string, []error) {
request := gorequest.New()
tr := &http.Transport{
- DisableKeepAlives: true,
+ DisableKeepAlives: true,
DisableCompression: false,
- TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
- request.Transport=tr
+ request.Transport = tr
- if(auth!=nil){
- request.SetBasicAuth(auth.User,auth.Pass)
+ if auth != nil {
+ request.SetBasicAuth(auth.User, auth.Pass)
}
//request.Type("application/json")
-
- if(len(proxy)>0){
+
+ if len(proxy) > 0 {
request.Proxy(proxy)
}
request.Post(url)
- if(len(body)>0) {
+ if len(body) > 0 {
request.Send(body)
}
return request.End()
}
-func newDeleteRequest(client *http.Client,method, urlStr string) (*http.Request, error) {
+func newDeleteRequest(client *http.Client, method, urlStr string) (*http.Request, error) {
if method == "" {
// We document that "" means "GET" for Request.Method, and people have
// relied on that from NewRequest, so keep that working.
@@ -117,23 +116,9 @@ func newDeleteRequest(client *http.Client,method, urlStr string) (*http.Request,
return req, nil
}
-//
-//func GzipHandler(req *http.Request) {
-// var b bytes.Buffer
-// var buf bytes.Buffer
-// g := gzip.NewWriter(&buf)
-//
-// _, err := io.Copy(g, &b)
-// if err != nil {
-// panic(err)
-// //slog.Error(err)
-// return
-// }
-//}
-
-var client *http.Client=&http.Client{
+var client = &http.Client{
Transport: &http.Transport{
- DisableKeepAlives: true,
+ DisableKeepAlives: true,
DisableCompression: false,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
@@ -144,7 +129,7 @@ var fastHttpClient = &fasthttp.Client{
TLSConfig: &tls.Config{InsecureSkipVerify: true},
}
-func DoRequest(compress bool,method string,loadUrl string,auth *Auth,body []byte,proxy string) (string,error) {
+func DoRequest(compress bool, method string, loadUrl string, auth *Auth, body []byte, proxy string) (string, error) {
req := fasthttp.AcquireRequest()
resp := fasthttp.AcquireResponse()
@@ -161,46 +146,26 @@ func DoRequest(compress bool,method string,loadUrl string,auth *Auth,body []byte
req.Header.Set("content-encoding", "gzip")
}
- if auth!=nil{
+ if auth != nil {
req.URI().SetUsername(auth.User)
req.URI().SetPassword(auth.Pass)
}
- if len(body)>0{
-
- //if compress {
- // _, err := fasthttp.WriteGzipLevel(req.BodyWriter(), data.Bytes(), fasthttp.CompressBestSpeed)
- // if err != nil {
- // panic(err)
- // }
- //} else {
- // //req.SetBody(body)
- // req.SetBodyStreamWriter(func(w *bufio.Writer) {
- // w.Write(data.Bytes())
- // w.Flush()
- // })
- //
- //}
-
- if compress{
+ if len(body) > 0 {
+ if compress {
_, err := fasthttp.WriteGzipLevel(req.BodyWriter(), body, fasthttp.CompressBestSpeed)
if err != nil {
panic(err)
}
- }else{
+ } else {
req.SetBody(body)
-
- //req.SetBodyStreamWriter(func(w *bufio.Writer) {
- // w.Write(body)
- // w.Flush()
- //})
}
}
- err:=fastHttpClient.Do(req, resp)
+ err := fastHttpClient.Do(req, resp)
if err != nil {
- panic(err)
+ panic(err)
}
if resp == nil {
panic("empty response")
@@ -215,62 +180,40 @@ func DoRequest(compress bool,method string,loadUrl string,auth *Auth,body []byte
//log.Error("received status code", resp.StatusCode, "from", string(resp.Header.Header()), "content", string(resp.Body()), req)
}
-
-
//if compress{
// data,err:= resp.BodyGunzip()
// return string(data),err
//}
- return string(resp.Body()),nil
+ return string(resp.Body()), nil
}
-
-func Request(method string,r string,auth *Auth,body *bytes.Buffer,proxy string)(string,error) {
-
- //TODO use global client
- //client = &http.Client{}
- //
- //if(len(proxy)>0){
- // proxyURL, err := url.Parse(proxy)
- // if(err!=nil){
- // log.Error(err)
- // }else{
- // transport := &http.Transport{
- // Proxy: http.ProxyURL(proxyURL),
- // DisableKeepAlives: true,
- // DisableCompression: false,
- // }
- // client = &http.Client{Transport: transport}
- // }
- //}
- //
- //tr := &http.Transport{
- // DisableKeepAlives: true,
- // DisableCompression: false,
- // TLSClientConfig: &tls.Config{
- // InsecureSkipVerify: true,
- //},
- //}
- //
- //client.Transport=tr
+func Request(compress bool, method string, loadUrl string, auth *Auth, body *bytes.Buffer, proxy string) (string, error) {
var err error
var reqest *http.Request
- if body!=nil {
- reqest, err =http.NewRequest(method,r,body)
- }else{
- reqest, err = newDeleteRequest(client,method,r)
+ if body != nil {
+ reqest, err = http.NewRequest(method, loadUrl, body)
+ } else {
+ reqest, err = newDeleteRequest(client, method, loadUrl)
}
- if err!=nil {
+ if err != nil {
panic(err)
}
- if auth!=nil {
- reqest.SetBasicAuth(auth.User,auth.Pass)
+ if auth != nil {
+ reqest.SetBasicAuth(auth.User, auth.Pass)
}
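+	// set or clear the proxy on the shared transport before sending this request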
+ oldTransport := client.Transport.(*http.Transport)
+ if len(proxy) > 0 {
+ proxyUrl := VerifyWithResult(url.Parse(proxy)).(*url.URL)
+ proxyFunc := http.ProxyURL(proxyUrl)
+ oldTransport.Proxy = proxyFunc
+ } else {
+ oldTransport.Proxy = nil
+ }
reqest.Header.Set("Content-Type", "application/json")
//enable gzip
@@ -278,41 +221,40 @@ func Request(method string,r string,auth *Auth,body *bytes.Buffer,proxy string)(
//GzipHandler(reqest)
//
- resp,errs := client.Do(reqest)
+ resp, errs := client.Do(reqest)
if errs != nil {
log.Error(SubString(errs.Error(), 0, 500))
- return "",errs
+ return "", errs
}
- if resp!=nil&& resp.Body!=nil{
+ if resp != nil && resp.Body != nil {
//io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()
}
if resp.StatusCode != 200 {
b, _ := io.ReadAll(resp.Body)
- return "",errors.New("server error: "+string(b))
+ return "", errors.New("server error: " + string(b))
}
respBody, err := io.ReadAll(resp.Body)
- log.Error(SubString(string(respBody), 0, 500))
+ //log.Error(SubString(string(respBody), 0, 500))
if err != nil {
log.Error(SubString(string(err.Error()), 0, 500))
- return string(respBody),err
+ return string(respBody), err
}
if err != nil {
- return string(respBody),err
+ return string(respBody), err
}
io.Copy(io.Discard, resp.Body)
defer resp.Body.Close()
- return string(respBody),nil
+ return string(respBody), nil
}
-func DecodeJson(jsonStream string, o interface{})(error) {
-
+func DecodeJson(jsonStream string, o interface{}) error {
decoder := json.NewDecoder(strings.NewReader(jsonStream))
// UseNumber causes the Decoder to unmarshal a number into an interface{} as a Number instead of as a float64.
@@ -326,7 +268,7 @@ func DecodeJson(jsonStream string, o interface{})(error) {
return nil
}
-func DecodeJsonBytes(jsonStream []byte, o interface{})(error) {
+func DecodeJsonBytes(jsonStream []byte, o interface{}) error {
decoder := json.NewDecoder(bytes.NewReader(jsonStream))
// UseNumber causes the Decoder to unmarshal a number into an interface{} as a Number instead of as a float64.
decoder.UseNumber()
diff --git a/log.go b/log.go
index a4da2b0..96aa6fa 100644
--- a/log.go
+++ b/log.go
@@ -17,8 +17,8 @@ limitations under the License.
package main
import (
- "strings"
log "github.com/cihub/seelog"
+ "strings"
)
func setInitLogging(logLevel string) {
@@ -36,7 +36,7 @@ func setInitLogging(logLevel string) {
-
+
`
diff --git a/main.go b/main.go
index 527bda3..d448fcc 100644
--- a/main.go
+++ b/main.go
@@ -2,14 +2,11 @@ package main
import (
"bufio"
- "encoding/json"
- "fmt"
"github.com/cheggaaa/pb"
log "github.com/cihub/seelog"
goflags "github.com/jessevdk/go-flags"
"github.com/mattn/go-isatty"
"io"
- "io/ioutil"
"net/http"
_ "net/http/pprof"
"os"
@@ -21,7 +18,6 @@ import (
)
func main() {
-
runtime.GOMAXPROCS(runtime.NumCPU())
go func() {
@@ -77,10 +73,30 @@ func main() {
showBar = false
}
+ if c.Sync {
+		// sync mode only supports a single source index and a single target index:
+ if len(c.SourceIndexNames) == 0 || len(c.TargetIndexName) == 0 {
+			log.Error("sync only supports one source index to one target index")
+ return
+ }
+ migrator.SourceESAPI = migrator.ParseEsApi(true, c.SourceEs, c.SourceEsAuthStr, c.SourceProxy, c.Compress)
+ if migrator.SourceESAPI == nil {
+ log.Error("can not parse source es api")
+ return
+ }
+ migrator.TargetESAPI = migrator.ParseEsApi(false, c.TargetEs, c.TargetEsAuthStr, c.TargetProxy, false)
+ if migrator.TargetESAPI == nil {
+ log.Error("can not parse target es api")
+ return
+ }
+ migrator.SyncBetweenIndex(migrator.SourceESAPI, migrator.TargetESAPI, c)
+ return
+ }
+
	//output at least once
if c.RepeatOutputTimes < 1 {
- c.RepeatOutputTimes=1
- }else{
+ c.RepeatOutputTimes = 1
+ } else {
log.Info("source data will repeat send to target: ", c.RepeatOutputTimes, " times, the document id will be regenerated.")
}
@@ -88,14 +104,14 @@ func main() {
for i := 0; i < c.RepeatOutputTimes; i++ {
- if c.RepeatOutputTimes>1 {
+ if c.RepeatOutputTimes > 1 {
log.Info("repeat round: ", i+1)
}
// enough of a buffer to hold all the search results across all workers
migrator.DocChan = make(chan map[string]interface{}, c.BufferCount)
- var srcESVersion *ClusterVersion
+ //var srcESVersion *ClusterVersion
// create a progressbar and start a docCount
var outputBar *pb.ProgressBar = pb.New(1).Prefix("Output ")
@@ -106,50 +122,13 @@ func main() {
//dealing with input
if len(c.SourceEs) > 0 {
//dealing with basic auth
- if len(c.SourceEsAuthStr) > 0 && strings.Contains(c.SourceEsAuthStr, ":") {
- authArray := strings.Split(c.SourceEsAuthStr, ":")
- auth := Auth{User: authArray[0], Pass: authArray[1]}
- migrator.SourceAuth = &auth
- }
- //get source es version
- srcESVersion, errs := migrator.ClusterVersion(c.SourceEs, migrator.SourceAuth, migrator.Config.SourceProxy)
- if errs != nil {
+ migrator.SourceESAPI = migrator.ParseEsApi(true, c.SourceEs, c.SourceEsAuthStr,
+ migrator.Config.SourceProxy, c.Compress)
+ if migrator.SourceESAPI == nil {
+ log.Error("can not parse source es api")
return
}
- if strings.HasPrefix(srcESVersion.Version.Number, "7.") {
- log.Debug("source es is V7,", srcESVersion.Version.Number)
- api := new(ESAPIV7)
- api.Host = c.SourceEs
- api.Compress=c.Compress
- api.Auth = migrator.SourceAuth
- api.HttpProxy = migrator.Config.SourceProxy
- migrator.SourceESAPI = api
- } else if strings.HasPrefix(srcESVersion.Version.Number, "6.") {
- log.Debug("source es is V6,", srcESVersion.Version.Number)
- api := new(ESAPIV6)
- api.Compress=c.Compress
- api.Host = c.SourceEs
- api.Auth = migrator.SourceAuth
- api.HttpProxy = migrator.Config.SourceProxy
- migrator.SourceESAPI = api
- } else if strings.HasPrefix(srcESVersion.Version.Number, "5.") {
- log.Debug("source es is V5,", srcESVersion.Version.Number)
- api := new(ESAPIV5)
- api.Host = c.SourceEs
- api.Compress=c.Compress
- api.Auth = migrator.SourceAuth
- api.HttpProxy = migrator.Config.SourceProxy
- migrator.SourceESAPI = api
- } else {
- log.Debug("source es is not V5,", srcESVersion.Version.Number)
- api := new(ESAPIV0)
- api.Host = c.SourceEs
- api.Compress=c.Compress
- api.Auth = migrator.SourceAuth
- api.HttpProxy = migrator.Config.SourceProxy
- migrator.SourceESAPI = api
- }
if c.ScrollSliceSize < 1 {
c.ScrollSliceSize = 1
@@ -158,19 +137,18 @@ func main() {
totalSize := 0
finishedSlice := 0
for slice := 0; slice < c.ScrollSliceSize; slice++ {
- scroll, err := migrator.SourceESAPI.NewScroll(c.SourceIndexNames, c.ScrollTime, c.DocBufferCount, c.Query, slice, c.ScrollSliceSize, c.Fields)
+ scroll, err := migrator.SourceESAPI.NewScroll(c.SourceIndexNames, c.ScrollTime, c.DocBufferCount, c.Query,
+ c.SortField, slice, c.ScrollSliceSize, c.Fields)
if err != nil {
log.Error(err)
return
}
- temp := scroll.(ScrollAPI)
+ totalSize += scroll.GetHitsTotal()
- totalSize += temp.GetHitsTotal()
+ if scroll.GetDocs() != nil {
- if scroll != nil && temp.GetDocs() != nil {
-
- if temp.GetHitsTotal() == 0 {
+ if scroll.GetHitsTotal() == 0 {
log.Error("can't find documents from source.")
return
}
@@ -179,10 +157,10 @@ func main() {
wg.Add(1)
//process input
// start scroll
- temp.ProcessScrollResult(&migrator, fetchBar)
+ scroll.ProcessScrollResult(&migrator, fetchBar)
// loop scrolling until done
- for temp.Next(&migrator, fetchBar) == false {
+ for scroll.Next(&migrator, fetchBar) == false {
}
if showBar {
@@ -255,46 +233,20 @@ func main() {
migrator.TargetAuth = &auth
}
- //get target es version
- descESVersion, errs := migrator.ClusterVersion(c.TargetEs, migrator.TargetAuth, migrator.Config.TargetProxy)
- if errs != nil {
+ //get target es api
+ migrator.TargetESAPI = migrator.ParseEsApi(false, c.TargetEs, c.TargetEsAuthStr,
+ migrator.Config.TargetProxy, false)
+ if migrator.TargetESAPI == nil {
+ log.Error("can not parse target es api")
return
}
- if strings.HasPrefix(descESVersion.Version.Number, "7.") {
- log.Debug("target es is V7,", descESVersion.Version.Number)
- api := new(ESAPIV7)
- api.Host = c.TargetEs
- api.Auth = migrator.TargetAuth
- api.HttpProxy = migrator.Config.TargetProxy
- migrator.TargetESAPI = api
- } else if strings.HasPrefix(descESVersion.Version.Number, "6.") {
- log.Debug("target es is V6,", descESVersion.Version.Number)
- api := new(ESAPIV6)
- api.Host = c.TargetEs
- api.Auth = migrator.TargetAuth
- api.HttpProxy = migrator.Config.TargetProxy
- migrator.TargetESAPI = api
- } else if strings.HasPrefix(descESVersion.Version.Number, "5.") {
- log.Debug("target es is V5,", descESVersion.Version.Number)
- api := new(ESAPIV5)
- api.Host = c.TargetEs
- api.Auth = migrator.TargetAuth
- api.HttpProxy = migrator.Config.TargetProxy
- migrator.TargetESAPI = api
- } else {
- log.Debug("target es is not V5,", descESVersion.Version.Number)
- api := new(ESAPIV0)
- api.Host = c.TargetEs
- api.Auth = migrator.TargetAuth
- api.HttpProxy = migrator.Config.TargetProxy
- migrator.TargetESAPI = api
-
- }
-
log.Debug("start process with mappings")
- if srcESVersion != nil && c.CopyIndexMappings && descESVersion.Version.Number[0] != srcESVersion.Version.Number[0] {
- log.Error(srcESVersion.Version, "=>", descESVersion.Version, ",cross-big-version mapping migration not avaiable, please update mapping manually :(")
+ if c.CopyIndexMappings &&
+ migrator.TargetESAPI.ClusterVersion().Version.Number[0] != migrator.SourceESAPI.ClusterVersion().Version.Number[0] {
+ log.Error(migrator.SourceESAPI.ClusterVersion().Version, "=>",
+ migrator.TargetESAPI.ClusterVersion().Version,
+ ",cross-big-version mapping migration not available, please update mapping manually :(")
return
}
@@ -507,65 +459,3 @@ func main() {
log.Info("data migration finished.")
}
-
-func (c *Migrator) recoveryIndexSettings(sourceIndexRefreshSettings map[string]interface{}) {
- //update replica and refresh_interval
- for name, interval := range sourceIndexRefreshSettings {
- tempIndexSettings := getEmptyIndexSettings()
- tempIndexSettings["settings"].(map[string]interface{})["index"].(map[string]interface{})["refresh_interval"] = interval
- //tempIndexSettings["settings"].(map[string]interface{})["index"].(map[string]interface{})["number_of_replicas"] = 1
- c.TargetESAPI.UpdateIndexSettings(name, tempIndexSettings)
- if c.Config.Refresh {
- c.TargetESAPI.Refresh(name)
- }
- }
-}
-
-func (c *Migrator) ClusterVersion(host string, auth *Auth, proxy string) (*ClusterVersion, []error) {
-
- url := fmt.Sprintf("%s", host)
- resp, body, errs := Get(url, auth, proxy)
-
- if resp != nil && resp.Body != nil {
- io.Copy(ioutil.Discard, resp.Body)
- defer resp.Body.Close()
- }
-
- if errs != nil {
- log.Error(errs)
- return nil, errs
- }
-
- log.Debug(body)
-
- version := &ClusterVersion{}
- err := json.Unmarshal([]byte(body), version)
-
- if err != nil {
- log.Error(body, errs)
- return nil, errs
- }
- return version, nil
-}
-
-func (c *Migrator) ClusterReady(api ESAPI) (*ClusterHealth, bool) {
- health := api.ClusterHealth()
-
- if !c.Config.WaitForGreen {
- return health, true
- }
-
- if health.Status == "red" {
- return health, false
- }
-
- if c.Config.WaitForGreen == false && health.Status == "yellow" {
- return health, true
- }
-
- if health.Status == "green" {
- return health, true
- }
-
- return health, false
-}
diff --git a/migrator.go b/migrator.go
new file mode 100644
index 0000000..9f948cd
--- /dev/null
+++ b/migrator.go
@@ -0,0 +1,575 @@
+/*
+Copyright 2016 Medcl (m AT medcl.net)
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "github.com/cheggaaa/pb"
+ "io"
+ "io/ioutil"
+ "reflect"
+ "strings"
+ "sync"
+ "time"
+
+ log "github.com/cihub/seelog"
+)
+
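+// BulkOperation identifies which bulk action (index or delete) to emit for a changed document during sync.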
+type BulkOperation uint8
+
+const (
+ opIndex BulkOperation = iota
+ opDelete
+)
+
+func (op BulkOperation) String() string {
+ switch op {
+ case opIndex:
+ return "opIndex"
+ case opDelete:
+ return "opDelete"
+ default:
+ return fmt.Sprintf("unknown:%d", op)
+ }
+}
+
+func (m *Migrator) recoveryIndexSettings(sourceIndexRefreshSettings map[string]interface{}) {
+ //update replica and refresh_interval
+ for name, interval := range sourceIndexRefreshSettings {
+ tempIndexSettings := getEmptyIndexSettings()
+ tempIndexSettings["settings"].(map[string]interface{})["index"].(map[string]interface{})["refresh_interval"] = interval
+ //tempIndexSettings["settings"].(map[string]interface{})["index"].(map[string]interface{})["number_of_replicas"] = 1
+ m.TargetESAPI.UpdateIndexSettings(name, tempIndexSettings)
+ if m.Config.Refresh {
+ m.TargetESAPI.Refresh(name)
+ }
+ }
+}
+
+func (m *Migrator) ClusterVersion(host string, auth *Auth, proxy string) (*ClusterVersion, []error) {
+
+ url := fmt.Sprintf("%s", host)
+ resp, body, errs := Get(url, auth, proxy)
+
+ if resp != nil && resp.Body != nil {
+ io.Copy(ioutil.Discard, resp.Body)
+ defer resp.Body.Close()
+ }
+
+ if errs != nil {
+ log.Error(errs)
+ return nil, errs
+ }
+
+ log.Debug(body)
+
+ version := &ClusterVersion{}
+ err := json.Unmarshal([]byte(body), version)
+
+ if err != nil {
+ log.Error(body, errs)
+ return nil, errs
+ }
+ return version, nil
+}
+
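+// ParseEsApi detects the cluster version of the given host and returns a matching version-specific
+// ESAPI client (V0/V5/V6/V7), storing the parsed basic-auth credentials on the migrator.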
+func (m *Migrator) ParseEsApi(isSource bool, host string, authStr string, proxy string, compress bool) ESAPI {
+ var auth *Auth = nil
+ if len(authStr) > 0 && strings.Contains(authStr, ":") {
+ authArray := strings.Split(authStr, ":")
+ auth = &Auth{User: authArray[0], Pass: authArray[1]}
+ if isSource {
+ m.SourceAuth = auth
+ } else {
+ m.TargetAuth = auth
+ }
+ }
+
+ esVersion, errs := m.ClusterVersion(host, auth, proxy)
+ if errs != nil {
+ return nil
+ }
+
+ esInfo := "dest"
+ if isSource {
+ esInfo = "source"
+ }
+
+ log.Infof("%s es version: %s", esInfo, esVersion.Version.Number)
+ if strings.HasPrefix(esVersion.Version.Number, "7.") {
+ log.Debug("es is V7,", esVersion.Version.Number)
+ api := new(ESAPIV7)
+ api.Host = host
+ api.Compress = compress
+ api.Auth = auth
+ api.HttpProxy = proxy
+ api.Version = esVersion
+ return api
+ //migrator.SourceESAPI = api
+ } else if strings.HasPrefix(esVersion.Version.Number, "6.") {
+ log.Debug("es is V6,", esVersion.Version.Number)
+ api := new(ESAPIV6)
+ api.Host = host
+ api.Compress = compress
+ api.Auth = auth
+ api.HttpProxy = proxy
+ api.Version = esVersion
+ return api
+ //migrator.SourceESAPI = api
+ } else if strings.HasPrefix(esVersion.Version.Number, "5.") {
+ log.Debug("es is V5,", esVersion.Version.Number)
+ api := new(ESAPIV5)
+ api.Host = host
+ api.Compress = compress
+ api.Auth = auth
+ api.HttpProxy = proxy
+ api.Version = esVersion
+ return api
+ //migrator.SourceESAPI = api
+ } else {
+ log.Debug("es is not V5,", esVersion.Version.Number)
+ api := new(ESAPIV0)
+ api.Host = host
+ api.Compress = compress
+ api.Auth = auth
+ api.HttpProxy = proxy
+ api.Version = esVersion
+ return api
+ }
+}
+
+func (m *Migrator) ClusterReady(api ESAPI) (*ClusterHealth, bool) {
+ health := api.ClusterHealth()
+
+ if !m.Config.WaitForGreen {
+ return health, true
+ }
+
+ if health.Status == "red" {
+ return health, false
+ }
+
+ if m.Config.WaitForGreen == false && health.Status == "yellow" {
+ return health, true
+ }
+
+ if health.Status == "green" {
+ return health, true
+ }
+
+ return health, false
+}
+
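+// NewBulkWorker consumes documents from DocChan and flushes them to the target cluster as bulk
+// requests, honoring the configured bulk size, the 5s idle flush and the 5m task timeout.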
+func (m *Migrator) NewBulkWorker(docCount *int, pb *pb.ProgressBar, wg *sync.WaitGroup) {
+
+ log.Debug("start es bulk worker")
+
+ bulkItemSize := 0
+ mainBuf := bytes.Buffer{}
+ docBuf := bytes.Buffer{}
+ docEnc := json.NewEncoder(&docBuf)
+
+ idleDuration := 5 * time.Second
+ idleTimeout := time.NewTimer(idleDuration)
+ defer idleTimeout.Stop()
+
+ taskTimeOutDuration := 5 * time.Minute
+ taskTimeout := time.NewTimer(taskTimeOutDuration)
+ defer taskTimeout.Stop()
+
+READ_DOCS:
+ for {
+ idleTimeout.Reset(idleDuration)
+ taskTimeout.Reset(taskTimeOutDuration)
+ select {
+ case docI, open := <-m.DocChan:
+ var err error
+ log.Trace("read doc from channel,", docI)
+ // this check is in case the document is an error with scroll stuff
+ if status, ok := docI["status"]; ok {
+ if status.(int) == 404 {
+ log.Error("error: ", docI["response"])
+ continue
+ }
+ }
+
+ // sanity check
+ for _, key := range []string{"_index", "_type", "_source", "_id"} {
+ if _, ok := docI[key]; !ok {
+ break READ_DOCS
+ }
+ }
+
+ var tempDestIndexName string
+ var tempTargetTypeName string
+ tempDestIndexName = docI["_index"].(string)
+ tempTargetTypeName = docI["_type"].(string)
+
+ if m.Config.TargetIndexName != "" {
+ tempDestIndexName = m.Config.TargetIndexName
+ }
+
+ if m.Config.OverrideTypeName != "" {
+ tempTargetTypeName = m.Config.OverrideTypeName
+ }
+
+ doc := Document{
+ Index: tempDestIndexName,
+ Type: tempTargetTypeName,
+ source: docI["_source"].(map[string]interface{}),
+ Id: docI["_id"].(string),
+ }
+
+ if m.Config.RegenerateID {
+ doc.Id = ""
+ }
+
+ if m.Config.RenameFields != "" {
+ kvs := strings.Split(m.Config.RenameFields, ",")
+ for _, i := range kvs {
+ fvs := strings.Split(i, ":")
+ oldField := strings.TrimSpace(fvs[0])
+ newField := strings.TrimSpace(fvs[1])
+ if oldField == "_type" {
+ doc.source[newField] = docI["_type"].(string)
+ } else {
+ v := doc.source[oldField]
+ doc.source[newField] = v
+ delete(doc.source, oldField)
+ }
+ }
+ }
+
+ // add doc "_routing" if exists
+ if _, ok := docI["_routing"]; ok {
+ str, ok := docI["_routing"].(string)
+ if ok && str != "" {
+ doc.Routing = str
+ }
+ }
+
+ // if channel is closed flush and gtfo
+ if !open {
+ goto WORKER_DONE
+ }
+
+ // sanity check
+ if len(doc.Index) == 0 || len(doc.Type) == 0 {
+ log.Errorf("failed decoding document: %+v", doc)
+ continue
+ }
+
+			// encode the doc and the _source field for a bulk request
+ post := map[string]Document{
+ "index": doc,
+ }
+ if err = docEnc.Encode(post); err != nil {
+ log.Error(err)
+ }
+ if err = docEnc.Encode(doc.source); err != nil {
+ log.Error(err)
+ }
+
+ // append the doc to the main buffer
+ mainBuf.Write(docBuf.Bytes())
+ // reset for next document
+ bulkItemSize++
+ (*docCount)++
+ docBuf.Reset()
+
+ // if we approach the 100mb es limit, flush to es and reset mainBuf
+ if mainBuf.Len()+docBuf.Len() > (m.Config.BulkSizeInMB * 1024 * 1024) {
+ goto CLEAN_BUFFER
+ }
+
+ case <-idleTimeout.C:
+ log.Debug("5s no message input")
+ goto CLEAN_BUFFER
+ case <-taskTimeout.C:
+ log.Warn("5m no message input, close worker")
+ goto WORKER_DONE
+ }
+
+ goto READ_DOCS
+
+ CLEAN_BUFFER:
+ m.TargetESAPI.Bulk(&mainBuf)
+ log.Trace("clean buffer, and execute bulk insert")
+ pb.Add(bulkItemSize)
+ bulkItemSize = 0
+ if m.Config.SleepSecondsAfterEachBulk > 0 {
+ time.Sleep(time.Duration(m.Config.SleepSecondsAfterEachBulk) * time.Second)
+ }
+ }
+WORKER_DONE:
+ if docBuf.Len() > 0 {
+ mainBuf.Write(docBuf.Bytes())
+ bulkItemSize++
+ }
+ m.TargetESAPI.Bulk(&mainBuf)
+ log.Trace("bulk insert")
+ pb.Add(bulkItemSize)
+ bulkItemSize = 0
+ wg.Done()
+}
+
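+// bulkRecords encodes the given documents as a single bulk request of "index" or "delete" actions
+// against the target index/type and submits it in one call.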
+func (m *Migrator) bulkRecords(bulkOp BulkOperation, dstEsApi ESAPI, targetIndex string, targetType string, diffDocMaps map[string]interface{}) error {
+ //var err error
+ docCount := 0
+ bulkItemSize := 0
+ mainBuf := bytes.Buffer{}
+ docBuf := bytes.Buffer{}
+ docEnc := json.NewEncoder(&docBuf)
+
+ //var tempDestIndexName string
+ //var tempTargetTypeName string
+
+ for docId, docData := range diffDocMaps {
+ docI := docData.(map[string]interface{})
+ log.Debugf("now will bulk %s docId=%s, docData=%+v", bulkOp, docId, docData)
+ //tempDestIndexName = docI["_index"].(string)
+ //tempTargetTypeName = docI["_type"].(string)
+ var strOperation string
+ doc := Document{
+ Index: targetIndex,
+ Type: targetType,
+ Id: docId, // docI["_id"].(string),
+ }
+
+ switch bulkOp {
+ case opIndex:
+ doc.source = docI // docI["_source"].(map[string]interface{}),
+ strOperation = "index"
+ case opDelete:
+ strOperation = "delete"
+ //do nothing
+ }
+
+		// encode the doc and the _source field for a bulk request
+
+ post := map[string]Document{
+ strOperation: doc,
+ }
+ _ = Verify(docEnc.Encode(post))
+ if bulkOp == opIndex {
+ _ = Verify(docEnc.Encode(doc.source))
+ }
+ // append the doc to the main buffer
+ mainBuf.Write(docBuf.Bytes())
+ // reset for next document
+ bulkItemSize++
+ docCount++
+ docBuf.Reset()
+ }
+
+ if mainBuf.Len() > 0 {
+ _ = Verify(dstEsApi.Bulk(&mainBuf))
+ }
+ return nil
+}
+
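+// SyncBetweenIndex incrementally syncs a single source index to a single target index.
+// Both sides are scrolled sorted by the same field (default _id) and merged like a sorted-list
+// comparison: documents missing or different on the target are bulk indexed, and documents
+// that only exist on the target are bulk deleted.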
+func (m *Migrator) SyncBetweenIndex(srcEsApi ESAPI, dstEsApi ESAPI, cfg *Config) {
+ // _id => value
+ srcDocMaps := make(map[string]interface{})
+ dstDocMaps := make(map[string]interface{})
+ diffDocMaps := make(map[string]interface{})
+
+ srcRecordIndex := 0
+ dstRecordIndex := 0
+ var err error
+ srcType := ""
+ var srcScroll ScrollAPI = nil
+ var dstScroll ScrollAPI = nil
+ var emptyScroll = &EmptyScroll{}
+ lastSrcId := ""
+ lastDestId := ""
+ needScrollSrc := true
+ needScrollDest := true
+
+ addCount := 0
+ updateCount := 0
+ deleteCount := 0
+
+	//TODO: progress calculation, split into [ scroll src/dst + index ] => delete phases
+ srcBar := pb.New(1).Prefix("Progress")
+ //srcBar := pb.New(1).Prefix("Source")
+ //dstBar := pb.New(100).Prefix("Dest")
+ //pool, err := pb.StartPool(srcBar, dstBar)
+
+ for {
+ if srcScroll == nil {
+ srcScroll, err = srcEsApi.NewScroll(cfg.SourceIndexNames, cfg.ScrollTime, cfg.DocBufferCount, cfg.Query,
+ cfg.SortField, 0, cfg.ScrollSliceSize, cfg.Fields)
+ if err != nil {
+ log.Infof("can not scroll for source index: %s, reason:%s", cfg.SourceIndexNames, err.Error())
+ return
+ }
+ log.Infof("src total count=%d", srcScroll.GetHitsTotal())
+ srcBar.Total = int64(srcScroll.GetHitsTotal())
+ srcBar.Start()
+ } else if needScrollSrc {
+ srcScroll = VerifyWithResult(srcEsApi.NextScroll(cfg.ScrollTime, srcScroll.GetScrollId())).(ScrollAPI)
+ }
+
+ if dstScroll == nil {
+ dstScroll, err = dstEsApi.NewScroll(cfg.TargetIndexName, cfg.ScrollTime, cfg.DocBufferCount, cfg.Query,
+ cfg.SortField, 0, cfg.ScrollSliceSize, cfg.Fields)
+ if err != nil {
+ log.Infof("can not scroll for dest index: %s, reason:%s", cfg.TargetIndexName, err.Error())
+				// fall back to an empty scroll, effectively the same as bulk indexing everything from source
+ dstScroll = emptyScroll
+
+				// no dest index, use the source doc count as the total
+ //dstBar.Total = int64(srcScroll.GetHitsTotal()) // = pb.New(srcScroll.GetHitsTotal()).Prefix("Dest")
+ } else {
+				// dest index exists
+ //dstBar.Total = int64(dstScroll.GetHitsTotal()) // pb.New(dstScroll.GetHitsTotal()).Prefix("Dest")
+ }
+ //dstBar.Start()
+ log.Infof("dst total count=%d", dstScroll.GetHitsTotal())
+ } else if needScrollDest {
+ dstScroll = VerifyWithResult(dstEsApi.NextScroll(cfg.ScrollTime, dstScroll.GetScrollId())).(ScrollAPI)
+ }
+
+		// scroll the target index and collect docs into dstDocMaps (empty when the index does not exist)
+ if needScrollDest {
+ for idx, dstDocI := range dstScroll.GetDocs() {
+ destId := dstDocI.(map[string]interface{})["_id"].(string)
+ dstSource := dstDocI.(map[string]interface{})["_source"]
+ lastDestId = destId
+ log.Debugf("dst [%d]: dstId=%s", dstRecordIndex+idx, destId)
+
+ if srcSource, found := srcDocMaps[destId]; found {
+ delete(srcDocMaps, destId)
+
+					// a matching doc was found in the source map
+ if !reflect.DeepEqual(srcSource, dstSource) {
+						// not equal, needs an update
+ diffDocMaps[destId] = srcSource
+ updateCount++
+ } else {
+						// identical, nothing to do
+ }
+ } else {
+ dstDocMaps[destId] = dstSource
+ }
+ //dstBar.Increment()
+ }
+ dstRecordIndex += len(dstScroll.GetDocs())
+ }
+
+		// scroll the current batch from the source and collect it into the map first
+ if needScrollSrc {
+ for idx, srcDocI := range srcScroll.GetDocs() {
+ srcId := srcDocI.(map[string]interface{})["_id"].(string)
+ srcSource := srcDocI.(map[string]interface{})["_source"]
+ srcType = srcDocI.(map[string]interface{})["_type"].(string)
+ lastSrcId = srcId
+ log.Debugf("src [%d]: srcId=%s", srcRecordIndex+idx, srcId)
+
+ if len(lastDestId) == 0 {
+					// no destId yet, the target index holds no data, index everything
+ diffDocMaps[srcId] = srcSource
+ addCount++
+				} else if dstSource, ok := dstDocMaps[srcId]; ok { // a doc with the same id exists in dstDocMaps
+ if !reflect.DeepEqual(srcSource, dstSource) {
+						// not identical, needs an update; otherwise ignore
+ diffDocMaps[srcId] = srcSource
+ updateCount++
+ }
+					// drop the matched doc from the dst map
+ delete(dstDocMaps, srcId)
+ } else {
+					// no doc with the same id yet: either dst has not reached it or it does not exist in dst
+ if srcId < lastDestId {
+						// dest has already passed the current srcId, so the doc does not exist in dst
+ diffDocMaps[srcId] = srcSource
+ addCount++
+ } else {
+ srcDocMaps[srcId] = srcSource
+ }
+ }
+ srcBar.Increment()
+ }
+ srcRecordIndex += len(srcScroll.GetDocs())
+ }
+
+ if len(diffDocMaps) > 0 {
+ log.Debugf("now will bulk index %d records", len(diffDocMaps))
+ _ = Verify(m.bulkRecords(opIndex, dstEsApi, cfg.TargetIndexName, srcType, diffDocMaps))
+ diffDocMaps = make(map[string]interface{})
+ }
+
+ if lastSrcId == lastDestId {
+ needScrollSrc = true
+ needScrollDest = true
+ } else if len(lastDestId) == 0 || (lastSrcId < lastDestId || (needScrollDest == true && len(dstScroll.GetDocs()) == 0)) {
+			// the previous round asked to scroll dest but it returned nothing
+ needScrollSrc = true
+ needScrollDest = false
+ } else if lastSrcId > lastDestId || (needScrollSrc == true && len(srcScroll.GetDocs()) == 0) {
+			// the previous round asked to scroll src but it returned nothing
+ needScrollSrc = false
+ needScrollDest = true
+ } else {
+ panic("TODO:")
+ }
+
+		// only quit once both src and dst have been fully scrolled
+ log.Debugf("lastSrcId=%s, lastDestId=%s, "+
+ "needScrollSrc=%t, len(srcScroll.GetDocs()=%d, "+
+ "needScrollDest=%t, len(dstScroll.GetDocs())=%d",
+ lastSrcId, lastDestId,
+ needScrollSrc, len(srcScroll.GetDocs()),
+ needScrollDest, len(dstScroll.GetDocs()))
+
+ if (!needScrollSrc || (len(srcScroll.GetDocs()) == 0 || len(srcScroll.GetDocs()) < cfg.DocBufferCount)) &&
+ (!needScrollDest || (len(dstScroll.GetDocs()) == 0 || len(dstScroll.GetDocs()) < cfg.DocBufferCount)) {
+ log.Debugf("can not find more, will quit, and index %d, delete %d", len(srcDocMaps), len(dstDocMaps))
+
+ if len(srcDocMaps) > 0 {
+ addCount += len(srcDocMaps)
+ _ = Verify(m.bulkRecords(opIndex, dstEsApi, cfg.TargetIndexName, srcType, srcDocMaps))
+ }
+ if len(dstDocMaps) > 0 {
+				// docs still left in the dst map only exist on the target and must be deleted
+ deleteCount += len(dstDocMaps)
+ _ = Verify(m.bulkRecords(opDelete, dstEsApi, cfg.TargetIndexName, srcType, dstDocMaps))
+ }
+ break
+ }
+
+		// the target does not exist yet, or src has not yet caught up with dest
+ if cfg.SleepSecondsAfterEachBulk > 0 {
+ time.Sleep(time.Duration(cfg.SleepSecondsAfterEachBulk) * time.Second)
+ }
+ }
+ _ = Verify(srcEsApi.DeleteScroll(srcScroll.GetScrollId()))
+ _ = Verify(dstEsApi.DeleteScroll(dstScroll.GetScrollId()))
+
+ srcBar.FinishPrint("Source End")
+ //dstBar.FinishPrint("Dest End")
+ //pool.Stop()
+
+ log.Infof("sync %s(%d) to %s(%d), add=%d, update=%d, delete=%d",
+ cfg.SourceIndexNames, srcRecordIndex, cfg.TargetIndexName, dstRecordIndex,
+ addCount, updateCount, deleteCount)
+
+ //log.Infof("diffDocMaps=%+v", diffDocMaps)
+}
diff --git a/scroll.go b/scroll.go
index fb2978d..41f55db 100644
--- a/scroll.go
+++ b/scroll.go
@@ -22,45 +22,41 @@ import (
log "github.com/cihub/seelog"
)
-
-type ScrollAPI interface{
- GetScrollId()string
- GetHitsTotal()int
+type ScrollAPI interface {
+ GetScrollId() string
+ GetHitsTotal() int
GetDocs() []interface{}
ProcessScrollResult(c *Migrator, bar *pb.ProgressBar)
Next(c *Migrator, bar *pb.ProgressBar) (done bool)
}
-
-func (scroll *Scroll) GetHitsTotal()int{
+func (scroll *Scroll) GetHitsTotal() int {
return scroll.Hits.Total
}
-func (scroll *Scroll) GetScrollId()string{
+func (scroll *Scroll) GetScrollId() string {
return scroll.ScrollId
}
-func (scroll *Scroll) GetDocs()[]interface{}{
+func (scroll *Scroll) GetDocs() []interface{} {
return scroll.Hits.Docs
}
-func (scroll *ScrollV7) GetHitsTotal()int{
+func (scroll *ScrollV7) GetHitsTotal() int {
return scroll.Hits.Total.Value
}
-
-func (scroll *ScrollV7) GetScrollId()string{
+func (scroll *ScrollV7) GetScrollId() string {
return scroll.ScrollId
}
-func (scroll *ScrollV7) GetDocs()[]interface{}{
+func (scroll *ScrollV7) GetDocs() []interface{} {
return scroll.Hits.Docs
}
-
// Stream from source es instance. "done" is an indicator that the stream is
// over
-func (s *Scroll) ProcessScrollResult(c *Migrator, bar *pb.ProgressBar){
+func (s *Scroll) ProcessScrollResult(c *Migrator, bar *pb.ProgressBar) {
//update progress bar
bar.Add(len(s.Hits.Docs))
@@ -79,29 +75,29 @@ func (s *Scroll) ProcessScrollResult(c *Migrator, bar *pb.ProgressBar){
func (s *Scroll) Next(c *Migrator, bar *pb.ProgressBar) (done bool) {
- scroll,err:=c.SourceESAPI.NextScroll(c.Config.ScrollTime,s.ScrollId)
+ scroll, err := c.SourceESAPI.NextScroll(c.Config.ScrollTime, s.ScrollId)
if err != nil {
log.Error(err)
return false
}
- docs:=scroll.(ScrollAPI).GetDocs()
+ docs := scroll.GetDocs()
if docs == nil || len(docs) <= 0 {
log.Debug("scroll result is empty")
return true
}
- scroll.(ScrollAPI).ProcessScrollResult(c,bar)
+ scroll.ProcessScrollResult(c, bar)
//update scrollId
- s.ScrollId=scroll.(ScrollAPI).GetScrollId()
+ s.ScrollId = scroll.GetScrollId()
return
}
// Stream from source es instance. "done" is an indicator that the stream is
// over
-func (s *ScrollV7) ProcessScrollResult(c *Migrator, bar *pb.ProgressBar){
+func (s *ScrollV7) ProcessScrollResult(c *Migrator, bar *pb.ProgressBar) {
//update progress bar
bar.Add(len(s.Hits.Docs))
@@ -120,24 +116,47 @@ func (s *ScrollV7) ProcessScrollResult(c *Migrator, bar *pb.ProgressBar){
func (s *ScrollV7) Next(c *Migrator, bar *pb.ProgressBar) (done bool) {
- scroll,err:=c.SourceESAPI.NextScroll(c.Config.ScrollTime,s.ScrollId)
+ scroll, err := c.SourceESAPI.NextScroll(c.Config.ScrollTime, s.ScrollId)
if err != nil {
log.Error(err)
return false
}
- docs:=scroll.(ScrollAPI).GetDocs()
+ docs := scroll.GetDocs()
if docs == nil || len(docs) <= 0 {
log.Debug("scroll result is empty")
return true
}
- scroll.(ScrollAPI).ProcessScrollResult(c,bar)
+ scroll.ProcessScrollResult(c, bar)
//update scrollId
- s.ScrollId=scroll.(ScrollAPI).GetScrollId()
+ s.ScrollId = scroll.GetScrollId()
return
}
+// EmptyScroll returns empty results so the compare + bulk path can reuse the same handling logic
+type EmptyScroll struct {
+ Dummy int
+}
+
+func (es *EmptyScroll) GetScrollId() string {
+ return ""
+}
+func (es *EmptyScroll) GetHitsTotal() int {
+ return 0
+}
+
+func (es *EmptyScroll) GetDocs() []interface{} {
+ return make([]interface{}, 0)
+}
+
+func (es *EmptyScroll) ProcessScrollResult(c *Migrator, bar *pb.ProgressBar) {
+
+}
+
+func (es *EmptyScroll) Next(c *Migrator, bar *pb.ProgressBar) (done bool) {
+ return true
+}
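The `EmptyScroll` added above acts as a null object: when the target index does not exist yet (first sync) or returns no hits, the sync code can still drive the same compare-and-bulk loop against a scroll that simply yields nothing. A hedged sketch of that pattern, using illustrative types that are not the tool's real `ScrollAPI` or ES client:
```
package main

import (
	"errors"
	"fmt"
)

// scroller is an illustrative stand-in for the ScrollAPI shape above.
type scroller interface {
	GetDocs() []interface{}
	GetHitsTotal() int
}

type emptyScroll struct{}

func (emptyScroll) GetDocs() []interface{} { return nil }
func (emptyScroll) GetHitsTotal() int      { return 0 }

type fakeScroll struct{ docs []interface{} }

func (s fakeScroll) GetDocs() []interface{} { return s.docs }
func (s fakeScroll) GetHitsTotal() int      { return len(s.docs) }

// openTarget falls back to an empty scroll when the target index cannot be
// opened, so the caller never has to special-case a missing target.
func openTarget(exists bool) (scroller, error) {
	if !exists {
		return emptyScroll{}, errors.New("index_not_found")
	}
	return fakeScroll{docs: []interface{}{"doc1", "doc2"}}, nil
}

func main() {
	for _, exists := range []bool{true, false} {
		s, err := openTarget(exists)
		fmt.Println(len(s.GetDocs()), s.GetHitsTotal(), err)
	}
}
```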
diff --git a/utils.go b/utils.go
index 2378817..570b99b 100644
--- a/utils.go
+++ b/utils.go
@@ -3,17 +3,17 @@ package main
import "fmt"
func SubString(prim string, start int, end int) string {
- if len(prim) == 0 {
- fmt.Println("primitive str is empty")
- }
- if l := len(prim); l < end {
- end = l
- }
+ if len(prim) == 0 {
+ fmt.Println("primitive str is empty")
+ }
+ if l := len(prim); l < end {
+ end = l
+ }
- value := prim
- runes := []rune(value)
+ value := prim
+ runes := []rune(value)
- safeSubString := string(runes[start:end])
+ safeSubString := string(runes[start:end])
- return safeSubString
+ return safeSubString
}
diff --git a/v0.go b/v0.go
index 40b0ee1..e810207 100644
--- a/v0.go
+++ b/v0.go
@@ -17,374 +17,403 @@ limitations under the License.
package main
import (
- "bytes"
- "encoding/json"
- "errors"
- "fmt"
- log "github.com/cihub/seelog"
- "io"
- "io/ioutil"
- "regexp"
- "strings"
+ "bytes"
+ "encoding/json"
+ "errors"
+ "fmt"
+ log "github.com/cihub/seelog"
+ "io"
+ "io/ioutil"
+ "regexp"
+ "strings"
)
type ESAPIV0 struct {
- Host string //eg: http://localhost:9200
- Auth *Auth //eg: user:pass
- HttpProxy string //eg: http://proxyIp:proxyPort
- Compress bool
+ Host string //eg: http://localhost:9200
+ Auth *Auth //eg: user:pass
+ HttpProxy string //eg: http://proxyIp:proxyPort
+ Compress bool
+ Version *ClusterVersion
}
-
func (s *ESAPIV0) ClusterHealth() *ClusterHealth {
- url := fmt.Sprintf("%s/_cluster/health", s.Host)
- r, body, errs := Get(url, s.Auth,s.HttpProxy)
+ url := fmt.Sprintf("%s/_cluster/health", s.Host)
+ r, body, errs := Get(url, s.Auth, s.HttpProxy)
+
+ if r != nil && r.Body != nil {
+ io.Copy(ioutil.Discard, r.Body)
+ defer r.Body.Close()
+ }
- if r!=nil&& r.Body!=nil{
- io.Copy(ioutil.Discard, r.Body)
- defer r.Body.Close()
- }
+ if errs != nil {
+ return &ClusterHealth{Name: s.Host, Status: "unreachable"}
+ }
- if errs != nil {
- return &ClusterHealth{Name: s.Host, Status: "unreachable"}
- }
+ log.Debug(url)
+ log.Debug(body)
- log.Debug(url)
- log.Debug(body)
+ health := &ClusterHealth{}
+ err := json.Unmarshal([]byte(body), health)
- health := &ClusterHealth{}
- err := json.Unmarshal([]byte(body), health)
+ if err != nil {
+ log.Error(body)
+ return &ClusterHealth{Name: s.Host, Status: "unreachable"}
+ }
+ return health
+}
- if err != nil {
- log.Error(body)
- return &ClusterHealth{Name: s.Host, Status: "unreachable"}
- }
- return health
+func (s *ESAPIV0) ClusterVersion() *ClusterVersion {
+ return s.Version
}
-func (s *ESAPIV0) Bulk(data *bytes.Buffer) {
- if data == nil || data.Len() == 0 {
- log.Trace("data is empty, skip")
- return
- }
- data.WriteRune('\n')
- url := fmt.Sprintf("%s/_bulk", s.Host)
-
- body,err:=DoRequest(s.Compress,"POST",url,s.Auth,data.Bytes(),s.HttpProxy)
-
- if err != nil {
- log.Error(err)
- return
- }
- response:=BulkResponse{}
- err=DecodeJson(body, &response)
- if err == nil {
- if response.Errors{
- fmt.Println(body)
- }
- }
-
- data.Reset()
+func (s *ESAPIV0) Bulk(data *bytes.Buffer) error {
+ if data == nil || data.Len() == 0 {
+ log.Trace("data is empty, skip")
+ return nil
+ }
+ data.WriteRune('\n')
+ url := fmt.Sprintf("%s/_bulk", s.Host)
+
+ body, err := Request(s.Compress, "POST", url, s.Auth, data, s.HttpProxy)
+
+ if err != nil {
+ data.Reset()
+ log.Error(err)
+ return err
+ }
+ response := BulkResponse{}
+ err = DecodeJson(body, &response)
+ if err == nil {
+ if response.Errors {
+ log.Warnf("bulk error:%s", body)
+ }
+ }
+
+ data.Reset()
+ return err
}
func (s *ESAPIV0) GetIndexSettings(indexNames string) (*Indexes, error) {
- // get all settings
- allSettings := &Indexes{}
+ // get all settings
+ allSettings := &Indexes{}
- url := fmt.Sprintf("%s/%s/_settings", s.Host, indexNames)
- resp, body, errs := Get(url, s.Auth,s.HttpProxy)
+ url := fmt.Sprintf("%s/%s/_settings", s.Host, indexNames)
+ resp, body, errs := Get(url, s.Auth, s.HttpProxy)
- if resp!=nil&& resp.Body!=nil{
- io.Copy(ioutil.Discard, resp.Body)
- defer resp.Body.Close()
- }
+ if resp != nil && resp.Body != nil {
+ io.Copy(ioutil.Discard, resp.Body)
+ defer resp.Body.Close()
+ }
- if errs != nil {
- return nil, errs[0]
- }
+ if errs != nil {
+ return nil, errs[0]
+ }
- if resp.StatusCode != 200 {
- return nil, errors.New(body)
- }
+ if resp.StatusCode != 200 {
+ return nil, errors.New(body)
+ }
- log.Debug(body)
+ log.Debug(body)
- err := json.Unmarshal([]byte(body), allSettings)
- if err != nil {
- panic(err)
- return nil, err
- }
+ err := json.Unmarshal([]byte(body), allSettings)
+ if err != nil {
+ panic(err)
+ return nil, err
+ }
- return allSettings, nil
+ return allSettings, nil
}
func (s *ESAPIV0) GetIndexMappings(copyAllIndexes bool, indexNames string) (string, int, *Indexes, error) {
- url := fmt.Sprintf("%s/%s/_mapping", s.Host, indexNames)
- resp, body, errs := Get(url, s.Auth,s.HttpProxy)
-
- if resp!=nil&& resp.Body!=nil{
- io.Copy(ioutil.Discard, resp.Body)
- defer resp.Body.Close()
- }
-
- if errs != nil {
- log.Error(errs)
- return "", 0, nil, errs[0]
- }
-
-
- if resp.StatusCode != 200 {
- return "", 0, nil, errors.New(body)
- }
-
- idxs := Indexes{}
- er := json.Unmarshal([]byte(body), &idxs)
-
- if er != nil {
- log.Error(body)
- return "", 0, nil, er
- }
-
- // remove indexes that start with . if user asked for it
- //if copyAllIndexes == false {
- // for name := range idxs {
- // switch name[0] {
- // case '.':
- // delete(idxs, name)
- // case '_':
- // delete(idxs, name)
- //
-//
-// }
-// }
-// }
-
- // if _all indexes limit the list of indexes to only these that we kept
- // after looking at mappings
- if indexNames == "_all" {
-
- var newIndexes []string
- for name := range idxs {
- newIndexes = append(newIndexes, name)
- }
- indexNames = strings.Join(newIndexes, ",")
-
- } else if strings.Contains(indexNames, "*") || strings.Contains(indexNames, "?") {
-
- r, _ := regexp.Compile(indexNames)
-
- //check index patterns
- var newIndexes []string
- for name := range idxs {
- matched := r.MatchString(name)
- if matched {
- newIndexes = append(newIndexes, name)
- }
- }
- indexNames = strings.Join(newIndexes, ",")
-
- }
-
- i := 0
- // wrap in mappings if moving from super old es
- for name, idx := range idxs {
- i++
- if _, ok := idx.(map[string]interface{})["mappings"]; !ok {
- (idxs)[name] = map[string]interface{}{
- "mappings": idx,
- }
- }
- }
-
- return indexNames, i, &idxs, nil
+ url := fmt.Sprintf("%s/%s/_mapping", s.Host, indexNames)
+ resp, body, errs := Get(url, s.Auth, s.HttpProxy)
+
+ if resp != nil && resp.Body != nil {
+ io.Copy(ioutil.Discard, resp.Body)
+ defer resp.Body.Close()
+ }
+
+ if errs != nil {
+ log.Error(errs)
+ return "", 0, nil, errs[0]
+ }
+
+ if resp.StatusCode != 200 {
+ return "", 0, nil, errors.New(body)
+ }
+
+ idxs := Indexes{}
+ er := json.Unmarshal([]byte(body), &idxs)
+
+ if er != nil {
+ log.Error(body)
+ return "", 0, nil, er
+ }
+
+ // remove indexes that start with . if user asked for it
+ //if copyAllIndexes == false {
+ // for name := range idxs {
+ // switch name[0] {
+ // case '.':
+ // delete(idxs, name)
+ // case '_':
+ // delete(idxs, name)
+ //
+ //
+ // }
+ // }
+ // }
+
+ // if _all indexes limit the list of indexes to only these that we kept
+ // after looking at mappings
+ if indexNames == "_all" {
+
+ var newIndexes []string
+ for name := range idxs {
+ newIndexes = append(newIndexes, name)
+ }
+ indexNames = strings.Join(newIndexes, ",")
+
+ } else if strings.Contains(indexNames, "*") || strings.Contains(indexNames, "?") {
+
+ r, _ := regexp.Compile(indexNames)
+
+ //check index patterns
+ var newIndexes []string
+ for name := range idxs {
+ matched := r.MatchString(name)
+ if matched {
+ newIndexes = append(newIndexes, name)
+ }
+ }
+ indexNames = strings.Join(newIndexes, ",")
+
+ }
+
+ i := 0
+ // wrap in mappings if moving from super old es
+ for name, idx := range idxs {
+ i++
+ if _, ok := idx.(map[string]interface{})["mappings"]; !ok {
+ (idxs)[name] = map[string]interface{}{
+ "mappings": idx,
+ }
+ }
+ }
+
+ return indexNames, i, &idxs, nil
}
func getEmptyIndexSettings() map[string]interface{} {
- tempIndexSettings := map[string]interface{}{}
- tempIndexSettings["settings"] = map[string]interface{}{}
- tempIndexSettings["settings"].(map[string]interface{})["index"] = map[string]interface{}{}
- return tempIndexSettings
+ tempIndexSettings := map[string]interface{}{}
+ tempIndexSettings["settings"] = map[string]interface{}{}
+ tempIndexSettings["settings"].(map[string]interface{})["index"] = map[string]interface{}{}
+ return tempIndexSettings
}
func cleanSettings(settings map[string]interface{}) {
- //clean up settings
- delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "creation_date")
- delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "uuid")
- delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "version")
- delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "provided_name")
+ //clean up settings
+ delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "creation_date")
+ delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "uuid")
+ delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "version")
+ delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "provided_name")
}
func (s *ESAPIV0) UpdateIndexSettings(name string, settings map[string]interface{}) error {
- log.Debug("update index: ", name, settings)
- cleanSettings(settings)
- url := fmt.Sprintf("%s/%s/_settings", s.Host, name)
-
- if _, ok := settings["settings"].(map[string]interface{})["index"]; ok {
- if set, ok := settings["settings"].(map[string]interface{})["index"].(map[string]interface{})["analysis"]; ok {
- log.Debug("update static index settings: ", name)
- staticIndexSettings := getEmptyIndexSettings()
- staticIndexSettings["settings"].(map[string]interface{})["index"].(map[string]interface{})["analysis"] = set
- Post(fmt.Sprintf("%s/%s/_close", s.Host, name), s.Auth, "",s.HttpProxy)
- body := bytes.Buffer{}
- enc := json.NewEncoder(&body)
- enc.Encode(staticIndexSettings)
- bodyStr, err := Request("PUT", url, s.Auth, &body,s.HttpProxy)
- if err != nil {
- log.Error(bodyStr, err)
- panic(err)
- return err
- }
- delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "analysis")
- Post(fmt.Sprintf("%s/%s/_open", s.Host, name), s.Auth, "",s.HttpProxy)
- }
- }
-
- log.Debug("update dynamic index settings: ", name)
-
- body := bytes.Buffer{}
- enc := json.NewEncoder(&body)
- enc.Encode(settings)
- _, err := Request("PUT", url, s.Auth, &body,s.HttpProxy)
-
- return err
+ log.Debug("update index: ", name, settings)
+ cleanSettings(settings)
+ url := fmt.Sprintf("%s/%s/_settings", s.Host, name)
+
+ if _, ok := settings["settings"].(map[string]interface{})["index"]; ok {
+ if set, ok := settings["settings"].(map[string]interface{})["index"].(map[string]interface{})["analysis"]; ok {
+ log.Debug("update static index settings: ", name)
+ staticIndexSettings := getEmptyIndexSettings()
+ staticIndexSettings["settings"].(map[string]interface{})["index"].(map[string]interface{})["analysis"] = set
+ Request(false, "POST", fmt.Sprintf("%s/%s/_close", s.Host, name), s.Auth, nil, s.HttpProxy)
+ //Post(fmt.Sprintf("%s/%s/_close", s.Host, name), s.Auth, "", s.HttpProxy)
+ body := bytes.Buffer{}
+ enc := json.NewEncoder(&body)
+ enc.Encode(staticIndexSettings)
+ bodyStr, err := Request(s.Compress, "PUT", url, s.Auth, &body, s.HttpProxy)
+ if err != nil {
+ log.Error(bodyStr, err)
+ panic(err)
+ return err
+ }
+ delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "analysis")
+ Request(false, "POST", fmt.Sprintf("%s/%s/_open", s.Host, name), s.Auth, nil, s.HttpProxy)
+ //Post(fmt.Sprintf("%s/%s/_open", s.Host, name), s.Auth, "", s.HttpProxy)
+ }
+ }
+
+ log.Debug("update dynamic index settings: ", name)
+
+ body := bytes.Buffer{}
+ enc := json.NewEncoder(&body)
+ enc.Encode(settings)
+ _, err := Request(s.Compress, "PUT", url, s.Auth, &body, s.HttpProxy)
+
+ return err
}
func (s *ESAPIV0) UpdateIndexMapping(indexName string, settings map[string]interface{}) error {
- log.Debug("start update mapping: ", indexName,settings)
+ log.Debug("start update mapping: ", indexName, settings)
- for name, mapping := range settings {
+ for name, mapping := range settings {
- log.Debug("start update mapping: ", indexName,name,mapping)
+ log.Debug("start update mapping: ", indexName, name, mapping)
- url := fmt.Sprintf("%s/%s/%s/_mapping", s.Host, indexName, name)
+ url := fmt.Sprintf("%s/%s/%s/_mapping", s.Host, indexName, name)
- body := bytes.Buffer{}
- enc := json.NewEncoder(&body)
- enc.Encode(mapping)
- res, err := Request("POST", url, s.Auth, &body,s.HttpProxy)
- if(err!=nil){
- log.Error(url)
- log.Error(body.String())
- log.Error(err,res)
- panic(err)
- }
- }
- return nil
+ body := bytes.Buffer{}
+ enc := json.NewEncoder(&body)
+ enc.Encode(mapping)
+ res, err := Request(s.Compress, "POST", url, s.Auth, &body, s.HttpProxy)
+ if err != nil {
+ log.Error(url)
+ log.Error(body.String())
+ log.Error(err, res)
+ panic(err)
+ }
+ }
+ return nil
}
func (s *ESAPIV0) DeleteIndex(name string) (err error) {
- log.Debug("start delete index: ", name)
+ log.Debug("start delete index: ", name)
- url := fmt.Sprintf("%s/%s", s.Host, name)
+ url := fmt.Sprintf("%s/%s", s.Host, name)
- Request("DELETE", url, s.Auth, nil,s.HttpProxy)
+ Request(s.Compress, "DELETE", url, s.Auth, nil, s.HttpProxy)
- log.Debug("delete index: ", name)
+ log.Debug("delete index: ", name)
- return nil
+ return nil
}
func (s *ESAPIV0) CreateIndex(name string, settings map[string]interface{}) (err error) {
- cleanSettings(settings)
+ cleanSettings(settings)
- body := bytes.Buffer{}
- enc := json.NewEncoder(&body)
- enc.Encode(settings)
- log.Debug("start create index: ", name, settings)
+ body := bytes.Buffer{}
+ enc := json.NewEncoder(&body)
+ enc.Encode(settings)
+ log.Debug("start create index: ", name, settings)
- url := fmt.Sprintf("%s/%s", s.Host, name)
+ url := fmt.Sprintf("%s/%s", s.Host, name)
- resp, err := Request("PUT", url, s.Auth, &body,s.HttpProxy)
- log.Debugf("response: %s",resp)
+ resp, err := Request(s.Compress, "PUT", url, s.Auth, &body, s.HttpProxy)
+ log.Debugf("response: %s", resp)
- return err
+ return err
}
func (s *ESAPIV0) Refresh(name string) (err error) {
+ log.Debug("refresh index: ", name)
- log.Debug("refresh index: ", name)
+ url := fmt.Sprintf("%s/%s/_refresh", s.Host, name)
- url := fmt.Sprintf("%s/%s/_refresh", s.Host, name)
+ resp, err := Request(false, "POST", url, s.Auth, nil, s.HttpProxy)
+ log.Infof("refresh resp=%s, err=%+v", resp, err)
+ //resp, _, _ := Post(url, s.Auth, "", s.HttpProxy)
+ //if resp != nil && resp.Body != nil {
+ // io.Copy(ioutil.Discard, resp.Body)
+ // defer resp.Body.Close()
+ //}
- resp,_,_:=Post(url,s.Auth,"",s.HttpProxy)
- if resp!=nil&& resp.Body!=nil{
- io.Copy(ioutil.Discard, resp.Body)
- defer resp.Body.Close()
- }
+ return nil
+}
- return nil
+func (s *ESAPIV0) NewScroll(indexNames string, scrollTime string, docBufferCount int, query string, sort string,
+ slicedId int, maxSlicedCount int, fields string) (scroll ScrollAPI, err error) {
+
+ // curl -XGET 'http://es-0.9:9200/_search?search_type=scan&scroll=10m&size=50'
+ url := fmt.Sprintf("%s/%s/_search?search_type=scan&scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount)
+
+ var jsonBody []byte
+ if len(query) > 0 || len(fields) > 0 {
+ queryBody := map[string]interface{}{}
+ if len(fields) > 0 {
+ if !strings.Contains(fields, ",") {
+ queryBody["_source"] = fields
+ } else {
+ queryBody["_source"] = strings.Split(fields, ",")
+ }
+ }
+
+ if len(query) > 0 {
+ queryBody["query"] = map[string]interface{}{}
+ queryBody["query"].(map[string]interface{})["query_string"] = map[string]interface{}{}
+ queryBody["query"].(map[string]interface{})["query_string"].(map[string]interface{})["query"] = query
+ }
+
+ if len(sort) > 0 {
+ sortFields := make([]string, 0)
+ sortFields = append(sortFields, sort)
+ queryBody["sort"] = sortFields
+ }
+
+ jsonBody, err = json.Marshal(queryBody)
+ if err != nil {
+ log.Error(err)
+ return nil, err
+ }
+
+ }
+ //resp, body, errs := Post(url, s.Auth,jsonBody,s.HttpProxy)
+ body, err := Request(s.Compress, "POST", url, s.Auth, bytes.NewBuffer(jsonBody), s.HttpProxy)
+ if err != nil {
+ log.Error(err)
+ return nil, err
+ }
+
+ scroll = &Scroll{}
+ err = DecodeJson(body, scroll)
+ if err != nil {
+ log.Error(err)
+ return nil, err
+ }
+
+ return scroll, err
}
-func (s *ESAPIV0) NewScroll(indexNames string, scrollTime string, docBufferCount int,query string, slicedId,maxSlicedCount int, fields string) (scroll interface{}, err error) {
-
- // curl -XGET 'http://es-0.9:9200/_search?search_type=scan&scroll=10m&size=50'
- url := fmt.Sprintf("%s/%s/_search?search_type=scan&scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount)
-
- var jsonBody []byte
- if len(query) > 0 || len(fields) > 0 {
- queryBody := map[string]interface{}{}
- if len(fields) > 0 {
- if !strings.Contains(fields, ",") {
- queryBody["_source"] = fields
- } else {
- queryBody["_source"] = strings.Split(fields, ",")
- }
- }
-
- if len(query) > 0 {
- queryBody["query"] = map[string]interface{}{}
- queryBody["query"].(map[string]interface{})["query_string"] = map[string]interface{}{}
- queryBody["query"].(map[string]interface{})["query_string"].(map[string]interface{})["query"] = query
- }
-
- jsonBody, err = json.Marshal(queryBody)
- if err != nil {
- log.Error(err)
- return nil, err
- }
-
- }
- //resp, body, errs := Post(url, s.Auth,jsonBody,s.HttpProxy)
- body, err := DoRequest(s.Compress,"POST",url, s.Auth,jsonBody,s.HttpProxy)
- if err != nil {
- log.Error(err)
- return nil, err
- }
-
- scroll = &Scroll{}
- err = DecodeJson(body, scroll)
- if err != nil {
- log.Error(err)
- return nil, err
- }
-
- return scroll, err
+func (s *ESAPIV0) NextScroll(scrollTime string, scrollId string) (ScrollAPI, error) {
+ // curl -XGET 'http://es-0.9:9200/_search/scroll?scroll=5m'
+ id := bytes.NewBufferString(scrollId)
+ url := fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id)
+ body, err := Request(s.Compress, "GET", url, s.Auth, nil, s.HttpProxy)
+
+ if err != nil {
+ log.Error(err)
+ return nil, err
+ }
+
+ // decode elasticsearch scroll response
+ scroll := &Scroll{}
+ err = DecodeJson(body, &scroll)
+ if err != nil {
+ log.Error(err)
+ return nil, err
+ }
+
+ return scroll, nil
}
-func (s *ESAPIV0) NextScroll(scrollTime string, scrollId string) (interface{}, error) {
- // curl -XGET 'http://es-0.9:9200/_search/scroll?scroll=5m'
- id := bytes.NewBufferString(scrollId)
- url := fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id)
- body,err:=DoRequest(s.Compress,"GET",url,s.Auth,nil,s.HttpProxy)
-
- if err != nil {
- log.Error(err)
- return nil, err
- }
-
- // decode elasticsearch scroll response
- scroll := &Scroll{}
- err = DecodeJson(body, &scroll)
- if err != nil {
- log.Error(err)
- return nil, err
- }
-
- return scroll, nil
+func (s *ESAPIV0) DeleteScroll(scrollId string) error {
+ id := bytes.NewBufferString(scrollId)
+ url := fmt.Sprintf("%s/_search/scroll?scroll_id=%s", s.Host, id)
+ if len(scrollId) > 0 {
+ _, err := Request(false, "DELETE", url, s.Auth, nil, s.HttpProxy)
+ if err != nil {
+ log.Error(err)
+ return err
+ }
+ //log.Infof("delete scroll, result=%s", body)
+ }
+ return nil
}
diff --git a/v5.go b/v5.go
index 3158aae..6c34faa 100644
--- a/v5.go
+++ b/v5.go
@@ -17,120 +17,88 @@ limitations under the License.
package main
import (
- "bytes"
- "encoding/json"
- "fmt"
- log "github.com/cihub/seelog"
- "strings"
+ "bytes"
+ "encoding/json"
+ "fmt"
+ log "github.com/cihub/seelog"
+ "strings"
)
-type ESAPIV5 struct{
- ESAPIV0
+type ESAPIV5 struct {
+ ESAPIV0
}
-func (s *ESAPIV5) ClusterHealth() *ClusterHealth {
- return s.ESAPIV0.ClusterHealth()
+func (s *ESAPIV5) NewScroll(indexNames string, scrollTime string, docBufferCount int, query string, sort string,
+ slicedId int, maxSlicedCount int, fields string) (scroll ScrollAPI, err error) {
+ url := fmt.Sprintf("%s/%s/_search?scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount)
+
+ var jsonBody []byte
+ if len(query) > 0 || maxSlicedCount > 0 || len(fields) > 0 {
+ queryBody := map[string]interface{}{}
+
+ if len(fields) > 0 {
+ if !strings.Contains(fields, ",") {
+ queryBody["_source"] = fields
+ } else {
+ queryBody["_source"] = strings.Split(fields, ",")
+ }
+ }
+
+ if len(query) > 0 {
+ queryBody["query"] = map[string]interface{}{}
+ queryBody["query"].(map[string]interface{})["query_string"] = map[string]interface{}{}
+ queryBody["query"].(map[string]interface{})["query_string"].(map[string]interface{})["query"] = query
+ }
+
+ if len(sort) > 0 {
+ sortFields := make([]string, 0)
+ sortFields = append(sortFields, sort)
+ queryBody["sort"] = sortFields
+ }
+
+ if maxSlicedCount > 1 {
+ log.Tracef("sliced scroll, %d of %d", slicedId, maxSlicedCount)
+ queryBody["slice"] = map[string]interface{}{}
+ queryBody["slice"].(map[string]interface{})["id"] = slicedId
+ queryBody["slice"].(map[string]interface{})["max"] = maxSlicedCount
+ }
+
+ jsonBody, err = json.Marshal(queryBody)
+ if err != nil {
+ log.Error(err)
+ }
+ }
+
+ body, err := Request(s.Compress, "POST", url, s.Auth, bytes.NewBuffer(jsonBody), s.HttpProxy)
+ if err != nil {
+ log.Error(err)
+ return nil, err
+ }
+
+ scroll = &Scroll{}
+ err = DecodeJson(body, scroll)
+ if err != nil {
+ log.Error(err)
+ return nil, err
+ }
+
+ return scroll, err
}
-func (s *ESAPIV5) Bulk(data *bytes.Buffer){
- s.ESAPIV0.Bulk(data)
-}
-
-func (s *ESAPIV5) GetIndexSettings(indexNames string) (*Indexes,error){
- return s.ESAPIV0.GetIndexSettings(indexNames)
-}
-
-func (s *ESAPIV5) UpdateIndexSettings(indexName string,settings map[string]interface{})(error){
- return s.ESAPIV0.UpdateIndexSettings(indexName,settings)
-}
-
-func (s *ESAPIV5) GetIndexMappings(copyAllIndexes bool,indexNames string)(string,int,*Indexes,error){
- return s.ESAPIV0.GetIndexMappings(copyAllIndexes,indexNames)
-}
-
-func (s *ESAPIV5) UpdateIndexMapping(indexName string,settings map[string]interface{}) error {
- return s.ESAPIV0.UpdateIndexMapping(indexName,settings)
-}
-
-func (s *ESAPIV5) DeleteIndex(name string) (err error) {
- return s.ESAPIV0.DeleteIndex(name)
-}
-
-func (s *ESAPIV5) CreateIndex(name string,settings map[string]interface{}) (err error) {
- return s.ESAPIV0.CreateIndex(name,settings)
-}
-
-
-
-func (s *ESAPIV5) Refresh(name string) (err error) {
- return s.ESAPIV0.Refresh(name)
-}
-
-func (s *ESAPIV5) NewScroll(indexNames string,scrollTime string,docBufferCount int,query string, slicedId,maxSlicedCount int, fields string)(scroll interface{}, err error){
- url := fmt.Sprintf("%s/%s/_search?scroll=%s&size=%d", s.Host, indexNames, scrollTime,docBufferCount)
-
- var jsonBody []byte
- if(len(query)>0||maxSlicedCount>0||len(fields)>0) {
- queryBody := map[string]interface{}{}
-
-
- if len(fields) > 0 {
- if !strings.Contains(fields, ",") {
- queryBody["_source"] = fields
- } else {
- queryBody["_source"] = strings.Split(fields, ",")
- }
- }
-
- if(len(query)>0){
- queryBody["query"] = map[string]interface{}{}
- queryBody["query"].(map[string]interface{})["query_string"] = map[string]interface{}{}
- queryBody["query"].(map[string]interface{})["query_string"].(map[string]interface{})["query"] = query
- }
-
- if(maxSlicedCount>1){
- log.Tracef("sliced scroll, %d of %d",slicedId,maxSlicedCount)
- queryBody["slice"] = map[string]interface{}{}
- queryBody["slice"].(map[string]interface{})["id"] = slicedId
- queryBody["slice"].(map[string]interface{})["max"]= maxSlicedCount
- }
-
- jsonBody, err = json.Marshal(queryBody)
- if err != nil {
- log.Error(err)
- }
- }
-
- body, err := DoRequest(s.Compress,"POST",url, s.Auth,jsonBody,s.HttpProxy)
- if err != nil {
- log.Error(err)
- return nil, err
- }
-
- scroll = &Scroll{}
- err = DecodeJson(body,scroll)
- if err != nil {
- log.Error(err)
- return nil,err
- }
-
- return scroll,err
-}
-
-func (s *ESAPIV5) NextScroll(scrollTime string,scrollId string)(interface{},error) {
- id := bytes.NewBufferString(scrollId)
+func (s *ESAPIV5) NextScroll(scrollTime string, scrollId string) (ScrollAPI, error) {
+ id := bytes.NewBufferString(scrollId)
- url:=fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id)
+ url := fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id)
- body,err:=DoRequest(s.Compress,"GET",url,s.Auth,nil,s.HttpProxy)
+ body, err := Request(s.Compress, "GET", url, s.Auth, nil, s.HttpProxy)
- // decode elasticsearch scroll response
- scroll := &Scroll{}
- err= DecodeJson(body, &scroll)
- if err != nil {
- log.Error(err)
- return nil,err
- }
+ // decode elasticsearch scroll response
+ scroll := &Scroll{}
+ err = DecodeJson(body, &scroll)
+ if err != nil {
+ log.Error(err)
+ return nil, err
+ }
- return scroll,nil
+ return scroll, nil
}
diff --git a/v6.go b/v6.go
index 3f31e77..a3f15bb 100644
--- a/v6.go
+++ b/v6.go
@@ -24,8 +24,8 @@ import (
log "github.com/cihub/seelog"
"io"
"io/ioutil"
- "strings"
"regexp"
+ "strings"
//"infini.sh/framework/core/util"
)
@@ -33,7 +33,8 @@ type ESAPIV6 struct {
ESAPIV5
}
-func (s *ESAPIV6) NewScroll(indexNames string, scrollTime string, docBufferCount int, query string, slicedId, maxSlicedCount int, fields string) (scroll interface{}, err error) {
+func (s *ESAPIV6) NewScroll(indexNames string, scrollTime string, docBufferCount int, query string, sort string,
+ slicedId int, maxSlicedCount int, fields string) (scroll ScrollAPI, err error) {
url := fmt.Sprintf("%s/%s/_search?scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount)
var jsonBody []byte
@@ -54,6 +55,12 @@ func (s *ESAPIV6) NewScroll(indexNames string, scrollTime string, docBufferCount
queryBody["query"].(map[string]interface{})["query_string"].(map[string]interface{})["query"] = query
}
+ if len(sort) > 0 {
+ sortFields := make([]string, 0)
+ sortFields = append(sortFields, sort)
+ queryBody["sort"] = sortFields
+ }
+
if maxSlicedCount > 1 {
log.Tracef("sliced scroll, %d of %d", slicedId, maxSlicedCount)
queryBody["slice"] = map[string]interface{}{}
@@ -68,7 +75,7 @@ func (s *ESAPIV6) NewScroll(indexNames string, scrollTime string, docBufferCount
}
}
- body, err := DoRequest(s.Compress,"POST",url, s.Auth,jsonBody,s.HttpProxy)
+ body, err := Request(s.Compress, "POST", url, s.Auth, bytes.NewBuffer(jsonBody), s.HttpProxy)
if err != nil {
log.Error(err)
return nil, err
@@ -84,11 +91,11 @@ func (s *ESAPIV6) NewScroll(indexNames string, scrollTime string, docBufferCount
return scroll, err
}
-func (s *ESAPIV6) NextScroll(scrollTime string, scrollId string) (interface{}, error) {
+func (s *ESAPIV6) NextScroll(scrollTime string, scrollId string) (ScrollAPI, error) {
id := bytes.NewBufferString(scrollId)
url := fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id)
- body,err:=DoRequest(s.Compress,"GET",url,s.Auth,nil,s.HttpProxy)
+ body, err := Request(s.Compress, "GET", url, s.Auth, nil, s.HttpProxy)
// decode elasticsearch scroll response
scroll := &Scroll{}
@@ -101,21 +108,11 @@ func (s *ESAPIV6) NextScroll(scrollTime string, scrollId string) (interface{}, e
return scroll, nil
}
-
-func (s *ESAPIV6) GetIndexSettings(indexNames string) (*Indexes,error){
- return s.ESAPIV0.GetIndexSettings(indexNames)
-}
-
-func (s *ESAPIV6) UpdateIndexSettings(indexName string,settings map[string]interface{})(error){
- return s.ESAPIV0.UpdateIndexSettings(indexName,settings)
-}
-
-
func (s *ESAPIV6) GetIndexMappings(copyAllIndexes bool, indexNames string) (string, int, *Indexes, error) {
url := fmt.Sprintf("%s/%s/_mapping", s.Host, indexNames)
- resp, body, errs := Get(url, s.Auth,s.HttpProxy)
+ resp, body, errs := Get(url, s.Auth, s.HttpProxy)
- if resp!=nil&& resp.Body!=nil{
+ if resp != nil && resp.Body != nil {
io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()
}
@@ -125,7 +122,6 @@ func (s *ESAPIV6) GetIndexMappings(copyAllIndexes bool, indexNames string) (stri
return "", 0, nil, errs[0]
}
-
if resp.StatusCode != 200 {
return "", 0, nil, errors.New(body)
}
@@ -178,29 +174,26 @@ func (s *ESAPIV6) GetIndexMappings(copyAllIndexes bool, indexNames string) (stri
return indexNames, i, &idxs, nil
}
-
-func (s *ESAPIV6) UpdateIndexMapping(indexName string,settings map[string]interface{}) error {
+func (s *ESAPIV6) UpdateIndexMapping(indexName string, settings map[string]interface{}) error {
log.Debug("start update mapping: ", indexName, settings)
- delete(settings,"dynamic_templates")
-
-
+ delete(settings, "dynamic_templates")
for name, _ := range settings {
- log.Debug("start update mapping: ", indexName,", ",settings)
+ log.Debug("start update mapping: ", indexName, ", ", settings)
- url := fmt.Sprintf("%s/%s/%s/_mapping", s.Host, indexName,name)
+ url := fmt.Sprintf("%s/%s/%s/_mapping", s.Host, indexName, name)
body := bytes.Buffer{}
enc := json.NewEncoder(&body)
enc.Encode(settings)
- res, err := Request("POST", url, s.Auth, &body,s.HttpProxy)
- if(err!=nil){
+ res, err := Request(s.Compress, "POST", url, s.Auth, &body, s.HttpProxy)
+ if err != nil {
log.Error(url)
log.Error(settings)
- log.Error(err,res)
+ log.Error(err, res)
panic(err)
}
}
diff --git a/v7.go b/v7.go
index c3179a7..2a6aeb0 100644
--- a/v7.go
+++ b/v7.go
@@ -24,15 +24,16 @@ import (
log "github.com/cihub/seelog"
"io"
"io/ioutil"
- "strings"
"regexp"
+ "strings"
)
type ESAPIV7 struct {
ESAPIV6
}
-func (s *ESAPIV7) NewScroll(indexNames string, scrollTime string, docBufferCount int, query string, slicedId, maxSlicedCount int, fields string) (scroll interface{}, err error) {
+func (s *ESAPIV7) NewScroll(indexNames string, scrollTime string, docBufferCount int, query string, sort string,
+ slicedId int, maxSlicedCount int, fields string) (scroll ScrollAPI, err error) {
url := fmt.Sprintf("%s/%s/_search?scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount)
jsonBody := ""
@@ -53,6 +54,12 @@ func (s *ESAPIV7) NewScroll(indexNames string, scrollTime string, docBufferCount
queryBody["query"].(map[string]interface{})["query_string"].(map[string]interface{})["query"] = query
}
+ if len(sort) > 0 {
+ sortFields := make([]string, 0)
+ sortFields = append(sortFields, sort)
+ queryBody["sort"] = sortFields
+ }
+
if maxSlicedCount > 1 {
log.Tracef("sliced scroll, %d of %d", slicedId, maxSlicedCount)
queryBody["slice"] = map[string]interface{}{}
@@ -63,34 +70,35 @@ func (s *ESAPIV7) NewScroll(indexNames string, scrollTime string, docBufferCount
jsonArray, err := json.Marshal(queryBody)
if err != nil {
log.Error(err)
-
+ return nil, err
} else {
jsonBody = string(jsonArray)
}
}
- resp, body, errs := Post(url, s.Auth, jsonBody, s.HttpProxy)
+ body, errs := Request(false, "POST", url, s.Auth, bytes.NewBufferString(jsonBody), s.HttpProxy)
+ //resp, body, errs := Post(url, s.Auth, jsonBody, s.HttpProxy)
- if resp != nil && resp.Body != nil {
- io.Copy(ioutil.Discard, resp.Body)
- defer resp.Body.Close()
- }
+ //if resp != nil && resp.Body != nil {
+ // io.Copy(ioutil.Discard, resp.Body)
+ // defer resp.Body.Close()
+ //}
if errs != nil {
- log.Error(errs)
- return nil, errs[0]
+ //log.Error(errs)
+ return nil, errs
}
- if resp.StatusCode != 200 {
- return nil, errors.New(body)
- }
+ //if resp.StatusCode != 200 {
+ // return nil, errors.New(body)
+ //}
log.Trace("new scroll,", body)
- if err != nil {
- log.Error(err)
- return nil, err
- }
+ //if err != nil {
+ // log.Error(err)
+ // return nil, err
+ //}
scroll = &ScrollV7{}
err = DecodeJson(body, scroll)
@@ -102,11 +110,11 @@ func (s *ESAPIV7) NewScroll(indexNames string, scrollTime string, docBufferCount
return scroll, err
}
-func (s *ESAPIV7) NextScroll(scrollTime string, scrollId string) (interface{}, error) {
+func (s *ESAPIV7) NextScroll(scrollTime string, scrollId string) (ScrollAPI, error) {
id := bytes.NewBufferString(scrollId)
url := fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id)
- body,err:=DoRequest(s.Compress,"GET",url,s.Auth,nil,s.HttpProxy)
+ body, err := Request(s.Compress, "GET", url, s.Auth, nil, s.HttpProxy)
if err != nil {
//log.Error(errs)
@@ -123,21 +131,11 @@ func (s *ESAPIV7) NextScroll(scrollTime string, scrollId string) (interface{}, e
return scroll, nil
}
-
-func (s *ESAPIV7) GetIndexSettings(indexNames string) (*Indexes,error){
- return s.ESAPIV0.GetIndexSettings(indexNames)
-}
-
-func (s *ESAPIV7) UpdateIndexSettings(indexName string,settings map[string]interface{})(error){
- return s.ESAPIV0.UpdateIndexSettings(indexName,settings)
-}
-
-
func (s *ESAPIV7) GetIndexMappings(copyAllIndexes bool, indexNames string) (string, int, *Indexes, error) {
url := fmt.Sprintf("%s/%s/_mapping", s.Host, indexNames)
- resp, body, errs := Get(url, s.Auth,s.HttpProxy)
+ resp, body, errs := Get(url, s.Auth, s.HttpProxy)
- if resp!=nil&& resp.Body!=nil{
+ if resp != nil && resp.Body != nil {
io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()
}
@@ -147,7 +145,6 @@ func (s *ESAPIV7) GetIndexMappings(copyAllIndexes bool, indexNames string) (stri
return "", 0, nil, errs[0]
}
-
if resp.StatusCode != 200 {
return "", 0, nil, errors.New(body)
}
@@ -201,29 +198,28 @@ func (s *ESAPIV7) GetIndexMappings(copyAllIndexes bool, indexNames string) (stri
return indexNames, i, &idxs, nil
}
-
-func (s *ESAPIV7) UpdateIndexMapping(indexName string,settings map[string]interface{}) error {
+func (s *ESAPIV7) UpdateIndexMapping(indexName string, settings map[string]interface{}) error {
log.Debug("start update mapping: ", indexName, settings)
- delete(settings,"dynamic_templates")
+ delete(settings, "dynamic_templates")
//for name, mapping := range settings {
- log.Debug("start update mapping: ", indexName,", ",settings)
+ log.Debug("start update mapping: ", indexName, ", ", settings)
- url := fmt.Sprintf("%s/%s/_mapping", s.Host, indexName)
+ url := fmt.Sprintf("%s/%s/_mapping", s.Host, indexName)
- body := bytes.Buffer{}
- enc := json.NewEncoder(&body)
- enc.Encode(settings)
- res, err := Request("POST", url, s.Auth, &body,s.HttpProxy)
- if(err!=nil){
- log.Error(url)
- log.Error(body.String())
- log.Error(err,res)
- panic(err)
- }
+ body := bytes.Buffer{}
+ enc := json.NewEncoder(&body)
+ enc.Encode(settings)
+ res, err := Request(s.Compress, "POST", url, s.Auth, &body, s.HttpProxy)
+ if err != nil {
+ log.Error(url)
+ log.Error(body.String())
+ log.Error(err, res)
+ panic(err)
+ }
//}
return nil
}
diff --git a/verify.go b/verify.go
new file mode 100644
index 0000000..a0b4e03
--- /dev/null
+++ b/verify.go
@@ -0,0 +1,68 @@
+package main
+
+import (
+ "fmt"
+ log "github.com/cihub/seelog"
+ "path/filepath"
+ "reflect"
+ "runtime"
+ "strings"
+)
+
+type CheckErrorAction int
+
+const (
+ ACTION_FATAL_QUIT CheckErrorAction = iota
+ ACTION_LOG_ERROR
+)
+
+//Notice:
+// 1. during development, set to ACTION_FATAL_QUIT so errors surface quickly,
+//    and error handling can then be added in places once assumed to be safe
+// 2. in release builds, set to ACTION_LOG_ERROR so errors are only logged
+
+var verifyAction = ACTION_FATAL_QUIT
+
+// GetCallStackInfo returns fileName, lineNo, funName
+func GetCallStackInfo(skip int) (string, int, string) {
+ var funName string
+ pc, fileName, lineNo, ok := runtime.Caller(skip)
+ if ok {
+ funName = strings.TrimPrefix(filepath.Ext(runtime.FuncForPC(pc).Name()), ".")
+ } else {
+ funName = ""
+ fileName, lineNo = "", -1
+ }
+ return fileName, lineNo, funName
+}
+
+// skip is the number of call-stack frames to skip, so the reported location is the meaningful caller
+func checkAndHandleError(err error, msg string, action CheckErrorAction, skip int) {
+ if err != nil {
+ fileName, lineNo, funName := GetCallStackInfo(skip)
+
+ switch action {
+ case ACTION_FATAL_QUIT:
+ log.Errorf("%s:%d: (%s) FAIL(%s), msg=%s", fileName, lineNo, funName, reflect.TypeOf(err).String(), msg)
+ //log.Fatalf("") //"error at: %s:%d, msg=%s, err=%s", fileName, lineNo, msg, err)
+ panic(fmt.Sprintf("error at: %s:%d, msg=%s, err=%s", fileName, lineNo, msg, err))
+ case ACTION_LOG_ERROR:
+ log.Warnf("%s:%d: (%s) FAIL(%s), msg=%s", fileName, lineNo, funName, reflect.TypeOf(err).String(), msg)
+ //flog.Infof("error at: %s:%d, msg=%s, err=%s", fileName, lineNo, msg, err)
+ }
+ }
+}
+
+func Verify(err error) error {
+ if err != nil {
+ checkAndHandleError(err, err.Error(), verifyAction, 3)
+ }
+ return err
+}
+
+func VerifyWithResult(result interface{}, err error) interface{} {
+ if err != nil {
+ checkAndHandleError(err, err.Error(), verifyAction, 3)
+ }
+ return result
+}
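A short usage sketch for the helpers above, assuming this `verify.go` is compiled into the same package; `loadConfig` below is a hypothetical error-returning helper used only for illustration:
```
package main

import "errors"

// loadConfig is a hypothetical helper used only to demonstrate Verify and
// VerifyWithResult; it is not part of this patch.
func loadConfig(path string) (string, error) {
	if path == "" {
		return "", errors.New("empty path")
	}
	return "parsed:" + path, nil
}

func demo() error {
	// VerifyWithResult runs the error check and passes the value through,
	// so the happy path stays on one line.
	cfg := VerifyWithResult(loadConfig("esm.toml")).(string)
	_ = cfg

	// Verify logs (or panics, depending on verifyAction) and hands the
	// error back unchanged so the caller can still branch on it.
	_, err := loadConfig("")
	if Verify(err) != nil {
		return err
	}
	return nil
}
```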