From 628122fb4b8f37ece4a8b537c5316e6676f052b5 Mon Sep 17 00:00:00 2001 From: fishjam Date: Fri, 5 Jan 2024 11:01:21 +0800 Subject: [PATCH] add sync function for incremental update (#84) * https://github.com/fishjam/esm/issues/1 add go.mod and fix build errors (infini.sh/framework, fasthttp, util) * run the following command to format the code: gofmt -l -w . * https://github.com/fishjam/esm/issues/1 1.add config options SortField, TruncateOutFile and SkipFields, so an es index can be dumped to a local file and compared. 2.add the Sync option, so the source and dest index records can be scrolled and compared, and only the changed records are indexed/updated/deleted 3.refactor code: - add some functions in esapi, ClusterVersion() and DeleteScroll(), add ParseEsApi - move bulk.go to migrator.go, and add some functions - refactor all http methods (Get/Post/DoRequest) into a single Request method, and support proxy. - delete some commented-out and useless code * fix error when the source index does not exist, and adjust logging. * change module name * change log * update README.md and change the description of some configurations --- README.md | 19 +- buffer.go | 20 +- bulk.go | 183 --------------- domain.go | 10 +- esapi.go | 21 +- file.go | 76 ++++--- go.mod | 2 +- http.go | 170 +++++--------- log.go | 4 +- main.go | 200 ++++------------- migrator.go | 575 ++++++++++++++++++++++++++++++++++++++++++++++++ scroll.go | 65 ++++-- utils.go | 20 +- v0.go | 623 +++++++++++++++++++++++++++------------------- v5.go | 180 +++++++-------- v6.go | 49 ++--- v7.go | 90 ++++---- verify.go | 68 ++++++ 18 files changed, 1359 insertions(+), 1016 deletions(-) delete mode 100644 bulk.go create mode 100644 migrator.go create mode 100644 verify.go diff --git a/README.md b/README.md index 0b2fd9b..931d973 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ Links: * Support specify which _source fields to return from source * Support specify query string query to filter the data source * Support rename source fields while do bulk indexing +* Support incremental update (add/update/delete changed records) with `--sync`. Note: it uses a different implementation that only handles the ***changed*** records, so it is not as fast as the normal full copy (see the usage example below) * Load generating with ## ESM is fast!
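A typical workflow combines the existing full-copy mode with the new `--sync` mode. As a rough sketch (hosts and index names are placeholders, and `dest_index` is assumed to already exist from an earlier full copy):

```
# one-off full copy (fast path, bulk indexes everything)
./bin/esm -s http://localhost:9200 -d http://localhost:9201 -x src_index -y dest_index -w=5 -b=100

# afterwards, push only the documents that were added, changed or deleted since the copy
./bin/esm --sync -s http://localhost:9200 -d http://localhost:9201 -x src_index -y dest_index
```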
@@ -69,6 +70,11 @@ copy index `src_index` from `192.168.1.x` to `192.168.1.y:9200` and save with `d ./bin/esm -s http://localhost:9200 -d http://localhost:9200 -x src_index -y dest_index -w=5 -b=100 ``` +use the sync feature to incrementally update index `src_index` from `192.168.1.x` to `192.168.1.y:9200` +``` +./bin/esm --sync -s http://localhost:9200 -d http://localhost:9200 -x src_index -y dest_index +``` + support Basic-Auth ``` ./bin/esm -s http://localhost:9200 -x "src_index" -y "dest_index" -d http://localhost:9201 -n admin:111111 ``` @@ -91,6 +97,13 @@ dump elasticsearch documents into local file ./bin/esm -s http://localhost:9200 -x "src_index" -m admin:111111 -c 5000 -q=query:mixer --refresh -o=dump.bin ``` +dump the source and target index to local files and compare them, to find the differences quickly +``` +./bin/esm --sort=_id -s http://localhost:9200 -x "src_index" --truncate_output --skip=_index -o=src.json +./bin/esm --sort=_id -s http://localhost:9200 -x "dst_index" --truncate_output --skip=_index -o=dst.json +diff -W 200 -ry --suppress-common-lines src.json dst.json +``` + loading data from dump files, bulk insert to another es instance ``` ./bin/esm -d http://localhost:9200 -y "dest_index" -n admin:111111 -c 5000 -b 5 --refresh -i=dump.bin @@ -172,6 +185,7 @@ Usage: Application Options: -s, --source= source elasticsearch instance, ie: http://localhost:9200 -q, --query= query against source elasticsearch instance, filter data before migrate, ie: name:medcl + --sort= sort field when scroll, ie: _id (default: _id) -d, --dest= destination elasticsearch instance, ie: http://localhost:9201 -m, --source_auth= basic auth of source elasticsearch instance, ie: user:pass -n, --dest_auth= basic auth of target elasticsearch instance, ie: user:pass @@ -192,12 +206,15 @@ Application Options: --green wait for both hosts cluster status to be green before dump. otherwise yellow is okay -v, --log= setting log level,options:trace,debug,info,warn,error (INFO) -o, --output_file= output documents of source index into local file + --truncate_output= truncate before dump to output file -i, --input_file= indexing from local dump file --input_file_type= the data type of input file, options: dump, json_line, json_array, log_line (dump) --source_proxy= set proxy to source http connections, ie: http://127.0.0.1:8080 --dest_proxy= set proxy to target http connections, ie: http://127.0.0.1:8080 --refresh refresh after migration finished - --fields= filter source fields, comma separated, ie: col1,col2,col3,... + --sync= sync will use scroll for both source and target index, compare the data and sync(index/update/delete) + --fields= filter source fields(white list), comma separated, ie: col1,col2,col3,... + --skip= skip source fields(black list), comma separated, ie: col1,col2,col3,... --rename= rename source fields, comma separated, ie: _type:type, name:myname -l, --logstash_endpoint= target logstash tcp endpoint, ie: 127.0.0.1:5055 --secured_logstash_endpoint target logstash tcp endpoint was secured by TLS diff --git a/buffer.go b/buffer.go index c2c1c85..292cbb2 100644 --- a/buffer.go +++ b/buffer.go @@ -17,8 +17,8 @@ limitations under the License.
package main import ( - "io" "errors" + "io" ) //https://golangtc.com/t/5a49e2104ce40d740bbbc515 @@ -39,40 +39,40 @@ func (b *buffer) Len() int { return b.end - b.start } -//将有用的字节前移 +// 将有用的字节前移 func (b *buffer) grow() { if b.start == 0 { return } copy(b.buf, b.buf[b.start:b.end]) b.end -= b.start - b.start = 0; + b.start = 0 } -//从reader里面读取数据,如果reader阻塞,会发生阻塞 +// 从reader里面读取数据,如果reader阻塞,会发生阻塞 func (b *buffer) readFromReader() (int, error) { b.grow() n, err := b.reader.Read(b.buf[b.end:]) - if (err != nil) { + if err != nil { return n, err } b.end += n return n, nil } -//返回n个字节,而不产生移位 +// 返回n个字节,而不产生移位 func (b *buffer) seek(n int) ([]byte, error) { if b.end-b.start >= n { - buf := b.buf[b.start:b.start+n] + buf := b.buf[b.start : b.start+n] return buf, nil } return nil, errors.New("not enough") } -//舍弃offset个字段,读取n个字段 -func (b *buffer) read(offset, n int) ([]byte) { +// 舍弃offset个字段,读取n个字段 +func (b *buffer) read(offset, n int) []byte { b.start += offset - buf := b.buf[b.start:b.start+n] + buf := b.buf[b.start : b.start+n] b.start += n return buf } diff --git a/bulk.go b/bulk.go deleted file mode 100644 index d968eba..0000000 --- a/bulk.go +++ /dev/null @@ -1,183 +0,0 @@ -/* -Copyright 2016 Medcl (m AT medcl.net) - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import ( - "bytes" - "encoding/json" - "github.com/cheggaaa/pb" - "strings" - "sync" - "time" - - log "github.com/cihub/seelog" -) - -func (c *Migrator) NewBulkWorker(docCount *int, pb *pb.ProgressBar, wg *sync.WaitGroup) { - - log.Debug("start es bulk worker") - - bulkItemSize := 0 - mainBuf := bytes.Buffer{} - docBuf := bytes.Buffer{} - docEnc := json.NewEncoder(&docBuf) - - idleDuration := 5 * time.Second - idleTimeout := time.NewTimer(idleDuration) - defer idleTimeout.Stop() - - taskTimeOutDuration := 5 * time.Minute - taskTimeout := time.NewTimer(taskTimeOutDuration) - defer taskTimeout.Stop() - - -READ_DOCS: - for { - idleTimeout.Reset(idleDuration) - taskTimeout.Reset(taskTimeOutDuration) - select { - case docI, open := <-c.DocChan: - var err error - log.Trace("read doc from channel,", docI) - // this check is in case the document is an error with scroll stuff - if status, ok := docI["status"]; ok { - if status.(int) == 404 { - log.Error("error: ", docI["response"]) - continue - } - } - - // sanity check - for _, key := range []string{"_index", "_type", "_source", "_id"} { - if _, ok := docI[key]; !ok { - break READ_DOCS - } - } - - var tempDestIndexName string - var tempTargetTypeName string - tempDestIndexName = docI["_index"].(string) - tempTargetTypeName = docI["_type"].(string) - - if c.Config.TargetIndexName != "" { - tempDestIndexName = c.Config.TargetIndexName - } - - if c.Config.OverrideTypeName != "" { - tempTargetTypeName = c.Config.OverrideTypeName - } - - doc := Document{ - Index: tempDestIndexName, - Type: tempTargetTypeName, - source: docI["_source"].(map[string]interface{}), - Id: docI["_id"].(string), - } - - if c.Config.RegenerateID { - doc.Id = "" - } - - if c.Config.RenameFields != "" { - kvs := 
strings.Split(c.Config.RenameFields, ",") - for _, i := range kvs { - fvs := strings.Split(i, ":") - oldField := strings.TrimSpace(fvs[0]) - newField := strings.TrimSpace(fvs[1]) - if oldField == "_type" { - doc.source[newField] = docI["_type"].(string) - } else { - v := doc.source[oldField] - doc.source[newField] = v - delete(doc.source, oldField) - } - } - } - - // add doc "_routing" if exists - if _, ok := docI["_routing"]; ok { - str, ok := docI["_routing"].(string) - if ok && str != "" { - doc.Routing = str - } - } - - // if channel is closed flush and gtfo - if !open { - goto WORKER_DONE - } - - // sanity check - if len(doc.Index) == 0 || len(doc.Type) == 0 { - log.Errorf("failed decoding document: %+v", doc) - continue - } - - // encode the doc and and the _source field for a bulk request - post := map[string]Document{ - "index": doc, - } - if err = docEnc.Encode(post); err != nil { - log.Error(err) - } - if err = docEnc.Encode(doc.source); err != nil { - log.Error(err) - } - - - // append the doc to the main buffer - mainBuf.Write(docBuf.Bytes()) - // reset for next document - bulkItemSize++ - (*docCount)++ - docBuf.Reset() - - // if we approach the 100mb es limit, flush to es and reset mainBuf - if mainBuf.Len()+docBuf.Len() > (c.Config.BulkSizeInMB * 1024*1024) { - goto CLEAN_BUFFER - } - - case <-idleTimeout.C: - log.Debug("5s no message input") - goto CLEAN_BUFFER - case <-taskTimeout.C: - log.Warn("5m no message input, close worker") - goto WORKER_DONE - } - - goto READ_DOCS - - CLEAN_BUFFER: - c.TargetESAPI.Bulk(&mainBuf) - log.Trace("clean buffer, and execute bulk insert") - pb.Add(bulkItemSize) - bulkItemSize = 0 - if c.Config.SleepSecondsAfterEachBulk >0{ - time.Sleep(time.Duration(c.Config.SleepSecondsAfterEachBulk) * time.Second) - } - } -WORKER_DONE: - if docBuf.Len() > 0 { - mainBuf.Write(docBuf.Bytes()) - bulkItemSize++ - } - c.TargetESAPI.Bulk(&mainBuf) - log.Trace("bulk insert") - pb.Add(bulkItemSize) - bulkItemSize = 0 - wg.Done() -} diff --git a/domain.go b/domain.go index 12ba958..bec97ee 100644 --- a/domain.go +++ b/domain.go @@ -77,7 +77,7 @@ type ClusterHealth struct { Status string `json:"status,omitempty"` } -//{"took":23,"errors":true,"items":[{"create":{"_index":"mybank3","_type":"my_doc2","_id":"AWz8rlgUkzP-cujdA_Fv","status":409,"error":{"type":"version_conflict_engine_exception","reason":"[AWz8rlgUkzP-cujdA_Fv]: version conflict, document already exists (current version [1])","index_uuid":"w9JZbJkfSEWBI-uluWorgw","shard":"0","index":"mybank3"}}},{"create":{"_index":"mybank3","_type":"my_doc4","_id":"AWz8rpF2kzP-cujdA_Fx","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc4]"}}},{"create":{"_index":"mybank3","_type":"my_doc1","_id":"AWz8rjpJkzP-cujdA_Fu","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc1]"}}},{"create":{"_index":"mybank3","_type":"my_doc3","_id":"AWz8rnbckzP-cujdA_Fw","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc3]"}}},{"create":{"_index":"mybank3","_type":"my_doc5","_id":"AWz8rrsEkzP-cujdA_Fy","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, 
my_doc5]"}}},{"create":{"_index":"mybank3","_type":"doc","_id":"3","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, doc]"}}}]} +// {"took":23,"errors":true,"items":[{"create":{"_index":"mybank3","_type":"my_doc2","_id":"AWz8rlgUkzP-cujdA_Fv","status":409,"error":{"type":"version_conflict_engine_exception","reason":"[AWz8rlgUkzP-cujdA_Fv]: version conflict, document already exists (current version [1])","index_uuid":"w9JZbJkfSEWBI-uluWorgw","shard":"0","index":"mybank3"}}},{"create":{"_index":"mybank3","_type":"my_doc4","_id":"AWz8rpF2kzP-cujdA_Fx","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc4]"}}},{"create":{"_index":"mybank3","_type":"my_doc1","_id":"AWz8rjpJkzP-cujdA_Fu","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc1]"}}},{"create":{"_index":"mybank3","_type":"my_doc3","_id":"AWz8rnbckzP-cujdA_Fw","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc3]"}}},{"create":{"_index":"mybank3","_type":"my_doc5","_id":"AWz8rrsEkzP-cujdA_Fy","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc5]"}}},{"create":{"_index":"mybank3","_type":"doc","_id":"3","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, doc]"}}}]} type BulkResponse struct { Took int `json:"took,omitempty"` Errors bool `json:"errors,omitempty"` @@ -107,11 +107,12 @@ type Config struct { // config options SourceEs string `short:"s" long:"source" description:"source elasticsearch instance, ie: http://localhost:9200"` Query string `short:"q" long:"query" description:"query against source elasticsearch instance, filter data before migrate, ie: name:medcl"` + SortField string `long:"sort" description:"sort field when scroll, ie: _id" default:"_id"` TargetEs string `short:"d" long:"dest" description:"destination elasticsearch instance, ie: http://localhost:9201"` SourceEsAuthStr string `short:"m" long:"source_auth" description:"basic auth of source elasticsearch instance, ie: user:pass"` TargetEsAuthStr string `short:"n" long:"dest_auth" description:"basic auth of target elasticsearch instance, ie: user:pass"` DocBufferCount int `short:"c" long:"count" description:"number of documents at a time: ie \"size\" in the scroll request" default:"10000"` - BufferCount int `long:"buffer_count" description:"number of buffered documents in memory" default:"1000000"` + BufferCount int `long:"buffer_count" description:"number of buffered documents in memory" default:"1000000"` Workers int `short:"w" long:"workers" description:"concurrency number for bulk workers" default:"1"` BulkSizeInMB int `short:"b" long:"bulk_size" description:"bulk size in MB" default:"5"` ScrollTime string `short:"t" long:"time" description:"scroll time" default:"10m"` @@ -127,12 +128,15 @@ type Config struct { WaitForGreen bool `long:"green" description:"wait for both hosts cluster status to be green before dump. 
otherwise yellow is okay"` LogLevel string `short:"v" long:"log" description:"setting log level,options:trace,debug,info,warn,error" default:"INFO"` DumpOutFile string `short:"o" long:"output_file" description:"output documents of source index into local file" ` + TruncateOutFile bool `long:"truncate_output" description:"truncate before dump to output file" ` DumpInputFile string `short:"i" long:"input_file" description:"indexing from local dump file" ` InputFileType string `long:"input_file_type" description:"the data type of input file, options: dump, json_line, json_array, log_line" default:"dump" ` SourceProxy string `long:"source_proxy" description:"set proxy to source http connections, ie: http://127.0.0.1:8080"` TargetProxy string `long:"dest_proxy" description:"set proxy to target http connections, ie: http://127.0.0.1:8080"` Refresh bool `long:"refresh" description:"refresh after migration finished"` - Fields string `long:"fields" description:"filter source fields, comma separated, ie: col1,col2,col3,..." ` + Sync bool `long:"sync" description:"sync will use scroll for both source and target index, compare the data and sync(index/update/delete)"` + Fields string `long:"fields" description:"filter source fields(white list), comma separated, ie: col1,col2,col3,..." ` + SkipFields string `long:"skip" description:"skip source fields(black list), comma separated, ie: col1,col2,col3,..." ` RenameFields string `long:"rename" description:"rename source fields, comma separated, ie: _type:type, name:myname" ` LogstashEndpoint string `short:"l" long:"logstash_endpoint" description:"target logstash tcp endpoint, ie: 127.0.0.1:5055" ` LogstashSecEndpoint bool `long:"secured_logstash_endpoint" description:"target logstash tcp endpoint was secured by TLS" ` diff --git a/esapi.go b/esapi.go index 5697891..d178919 100644 --- a/esapi.go +++ b/esapi.go @@ -18,16 +18,19 @@ package main import "bytes" -type ESAPI interface{ +type ESAPI interface { ClusterHealth() *ClusterHealth - Bulk(data *bytes.Buffer) + ClusterVersion() *ClusterVersion + Bulk(data *bytes.Buffer) error GetIndexSettings(indexNames string) (*Indexes, error) - DeleteIndex(name string) (error) - CreateIndex(name string,settings map[string]interface{}) (error) - GetIndexMappings(copyAllIndexes bool,indexNames string)(string,int,*Indexes,error) - UpdateIndexSettings(indexName string,settings map[string]interface{})(error) - UpdateIndexMapping(indexName string,mappings map[string]interface{})(error) - NewScroll(indexNames string,scrollTime string,docBufferCount int,query string, slicedId,maxSlicedCount int, fields string)(interface{}, error) - NextScroll(scrollTime string,scrollId string)(interface{},error) + DeleteIndex(name string) error + CreateIndex(name string, settings map[string]interface{}) error + GetIndexMappings(copyAllIndexes bool, indexNames string) (string, int, *Indexes, error) + UpdateIndexSettings(indexName string, settings map[string]interface{}) error + UpdateIndexMapping(indexName string, mappings map[string]interface{}) error + NewScroll(indexNames string, scrollTime string, docBufferCount int, query string, sort string, + slicedId int, maxSlicedCount int, fields string) (ScrollAPI, error) + NextScroll(scrollTime string, scrollId string) (ScrollAPI, error) + DeleteScroll(scrollId string) error Refresh(name string) (err error) } diff --git a/file.go b/file.go index ee50b3e..5cb52d6 100644 --- a/file.go +++ b/file.go @@ -17,24 +17,25 @@ limitations under the License. 
package main import ( - "sync" - "github.com/cheggaaa/pb" - log "github.com/cihub/seelog" - "os" "bufio" "encoding/json" + "github.com/cheggaaa/pb" + log "github.com/cihub/seelog" "io" + "os" + "strings" + "sync" ) -func checkFileIsExist(filename string) (bool) { - var exist = true; +func checkFileIsExist(filename string) bool { + var exist = true if _, err := os.Stat(filename); os.IsNotExist(err) { - exist = false; + exist = false } - return exist; + return exist } -func (m *Migrator) NewFileReadWorker(pb *pb.ProgressBar, wg *sync.WaitGroup) { +func (m *Migrator) NewFileReadWorker(pb *pb.ProgressBar, wg *sync.WaitGroup) { log.Debug("start reading file") f, err := os.Open(m.Config.DumpInputFile) if err != nil { @@ -45,16 +46,16 @@ func (m *Migrator) NewFileReadWorker(pb *pb.ProgressBar, wg *sync.WaitGroup) { defer f.Close() r := bufio.NewReader(f) lineCount := 0 - for{ - line,err := r.ReadString('\n') - if io.EOF == err || nil != err{ + for { + line, err := r.ReadString('\n') + if io.EOF == err || nil != err { break } lineCount += 1 js := map[string]interface{}{} err = DecodeJson(line, &js) - if err!=nil { + if err != nil { log.Error(err) continue } @@ -70,26 +71,44 @@ func (m *Migrator) NewFileReadWorker(pb *pb.ProgressBar, wg *sync.WaitGroup) { func (c *Migrator) NewFileDumpWorker(pb *pb.ProgressBar, wg *sync.WaitGroup) { var f *os.File - var err1 error; + var err1 error if checkFileIsExist(c.Config.DumpOutFile) { - f, err1 = os.OpenFile(c.Config.DumpOutFile, os.O_APPEND|os.O_WRONLY, os.ModeAppend) - if(err1!=nil){ + flag := os.O_WRONLY + if c.Config.TruncateOutFile { + flag |= os.O_TRUNC + } else { + flag |= os.O_APPEND + } + f, err1 = os.OpenFile(c.Config.DumpOutFile, flag, os.ModeAppend) + if err1 != nil { log.Error(err1) return } - }else { + } else { f, err1 = os.Create(c.Config.DumpOutFile) - if(err1!=nil){ + if err1 != nil { log.Error(err1) return } } w := bufio.NewWriter(f) + skipFields := make([]string, 0) + if len(c.Config.SkipFields) > 0 { + //skip fields + if !strings.Contains(c.Config.SkipFields, ",") { + skipFields = append(skipFields, c.Config.SkipFields) + } else { + fields := strings.Split(c.Config.SkipFields, ",") + for _, field := range fields { + skipFields = append(skipFields, field) + } + } + } - READ_DOCS: +READ_DOCS: for { docI, open := <-c.DocChan // this check is in case the document is an error with scroll stuff @@ -106,15 +125,20 @@ func (c *Migrator) NewFileDumpWorker(pb *pb.ProgressBar, wg *sync.WaitGroup) { break READ_DOCS } } + for _, key := range skipFields { + if _, found := docI[key]; found { + delete(docI, key) + } + } - jsr,err:=json.Marshal(docI) + jsr, err := json.Marshal(docI) log.Trace(string(jsr)) - if(err!=nil){ + if err != nil { log.Error(err) } - n,err:=w.WriteString(string(jsr)) - if(err!=nil){ - log.Error(n,err) + n, err := w.WriteString(string(jsr)) + if err != nil { + log.Error(n, err) } w.WriteString("\n") pb.Increment() @@ -125,12 +149,10 @@ func (c *Migrator) NewFileDumpWorker(pb *pb.ProgressBar, wg *sync.WaitGroup) { } } - WORKER_DONE: +WORKER_DONE: w.Flush() f.Close() wg.Done() log.Debug("file dump finished") } - - diff --git a/go.mod b/go.mod index 2571f61..999f32d 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/fishjam/esm +module github.com/medcl/esm go 1.21.1 diff --git a/http.go b/http.go index 406474b..0204e64 100644 --- a/http.go +++ b/http.go @@ -25,38 +25,37 @@ import ( "fmt" log "github.com/cihub/seelog" "github.com/parnurzeal/gorequest" - "github.com/valyala/fasthttp" + "github.com/valyala/fasthttp" "io" 
"net/http" "net/url" "strings" ) -func BasicAuth(req *fasthttp.Request,user,pass string) { - msg := fmt.Sprintf("%s:%s",user,pass) +func BasicAuth(req *fasthttp.Request, user, pass string) { + msg := fmt.Sprintf("%s:%s", user, pass) encoded := base64.StdEncoding.EncodeToString([]byte(msg)) req.Header.Add("Authorization", "Basic "+encoded) } -func Get(url string,auth *Auth,proxy string) (*http.Response, string, []error) { +func Get(url string, auth *Auth, proxy string) (*http.Response, string, []error) { request := gorequest.New() tr := &http.Transport{ - DisableKeepAlives: true, + DisableKeepAlives: true, DisableCompression: false, - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, } - request.Transport=tr + request.Transport = tr - - if(auth!=nil){ - request.SetBasicAuth(auth.User,auth.Pass) + if auth != nil { + request.SetBasicAuth(auth.User, auth.Pass) } //request.Type("application/json") - if(len(proxy)>0){ + if len(proxy) > 0 { request.Proxy(proxy) } @@ -65,35 +64,35 @@ func Get(url string,auth *Auth,proxy string) (*http.Response, string, []error) { } -func Post(url string,auth *Auth, body string,proxy string)(*http.Response, string, []error) { +func Post(url string, auth *Auth, body string, proxy string) (*http.Response, string, []error) { request := gorequest.New() tr := &http.Transport{ - DisableKeepAlives: true, + DisableKeepAlives: true, DisableCompression: false, - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, } - request.Transport=tr + request.Transport = tr - if(auth!=nil){ - request.SetBasicAuth(auth.User,auth.Pass) + if auth != nil { + request.SetBasicAuth(auth.User, auth.Pass) } //request.Type("application/json") - - if(len(proxy)>0){ + + if len(proxy) > 0 { request.Proxy(proxy) } request.Post(url) - if(len(body)>0) { + if len(body) > 0 { request.Send(body) } return request.End() } -func newDeleteRequest(client *http.Client,method, urlStr string) (*http.Request, error) { +func newDeleteRequest(client *http.Client, method, urlStr string) (*http.Request, error) { if method == "" { // We document that "" means "GET" for Request.Method, and people have // relied on that from NewRequest, so keep that working. 
@@ -117,23 +116,9 @@ func newDeleteRequest(client *http.Client,method, urlStr string) (*http.Request, return req, nil } -// -//func GzipHandler(req *http.Request) { -// var b bytes.Buffer -// var buf bytes.Buffer -// g := gzip.NewWriter(&buf) -// -// _, err := io.Copy(g, &b) -// if err != nil { -// panic(err) -// //slog.Error(err) -// return -// } -//} - -var client *http.Client=&http.Client{ +var client = &http.Client{ Transport: &http.Transport{ - DisableKeepAlives: true, + DisableKeepAlives: true, DisableCompression: false, TLSClientConfig: &tls.Config{ InsecureSkipVerify: true, @@ -144,7 +129,7 @@ var fastHttpClient = &fasthttp.Client{ TLSConfig: &tls.Config{InsecureSkipVerify: true}, } -func DoRequest(compress bool,method string,loadUrl string,auth *Auth,body []byte,proxy string) (string,error) { +func DoRequest(compress bool, method string, loadUrl string, auth *Auth, body []byte, proxy string) (string, error) { req := fasthttp.AcquireRequest() resp := fasthttp.AcquireResponse() @@ -161,46 +146,26 @@ func DoRequest(compress bool,method string,loadUrl string,auth *Auth,body []byte req.Header.Set("content-encoding", "gzip") } - if auth!=nil{ + if auth != nil { req.URI().SetUsername(auth.User) req.URI().SetPassword(auth.Pass) } - if len(body)>0{ - - //if compress { - // _, err := fasthttp.WriteGzipLevel(req.BodyWriter(), data.Bytes(), fasthttp.CompressBestSpeed) - // if err != nil { - // panic(err) - // } - //} else { - // //req.SetBody(body) - // req.SetBodyStreamWriter(func(w *bufio.Writer) { - // w.Write(data.Bytes()) - // w.Flush() - // }) - // - //} - - if compress{ + if len(body) > 0 { + if compress { _, err := fasthttp.WriteGzipLevel(req.BodyWriter(), body, fasthttp.CompressBestSpeed) if err != nil { panic(err) } - }else{ + } else { req.SetBody(body) - - //req.SetBodyStreamWriter(func(w *bufio.Writer) { - // w.Write(body) - // w.Flush() - //}) } } - err:=fastHttpClient.Do(req, resp) + err := fastHttpClient.Do(req, resp) if err != nil { - panic(err) + panic(err) } if resp == nil { panic("empty response") @@ -215,62 +180,40 @@ func DoRequest(compress bool,method string,loadUrl string,auth *Auth,body []byte //log.Error("received status code", resp.StatusCode, "from", string(resp.Header.Header()), "content", string(resp.Body()), req) } - - //if compress{ // data,err:= resp.BodyGunzip() // return string(data),err //} - return string(resp.Body()),nil + return string(resp.Body()), nil } - -func Request(method string,r string,auth *Auth,body *bytes.Buffer,proxy string)(string,error) { - - //TODO use global client - //client = &http.Client{} - // - //if(len(proxy)>0){ - // proxyURL, err := url.Parse(proxy) - // if(err!=nil){ - // log.Error(err) - // }else{ - // transport := &http.Transport{ - // Proxy: http.ProxyURL(proxyURL), - // DisableKeepAlives: true, - // DisableCompression: false, - // } - // client = &http.Client{Transport: transport} - // } - //} - // - //tr := &http.Transport{ - // DisableKeepAlives: true, - // DisableCompression: false, - // TLSClientConfig: &tls.Config{ - // InsecureSkipVerify: true, - //}, - //} - // - //client.Transport=tr +func Request(compress bool, method string, loadUrl string, auth *Auth, body *bytes.Buffer, proxy string) (string, error) { var err error var reqest *http.Request - if body!=nil { - reqest, err =http.NewRequest(method,r,body) - }else{ - reqest, err = newDeleteRequest(client,method,r) + if body != nil { + reqest, err = http.NewRequest(method, loadUrl, body) + } else { + reqest, err = newDeleteRequest(client, method, loadUrl) } - if err!=nil 
{ + if err != nil { panic(err) } - if auth!=nil { - reqest.SetBasicAuth(auth.User,auth.Pass) + if auth != nil { + reqest.SetBasicAuth(auth.User, auth.Pass) } + oldTransport := client.Transport.(*http.Transport) + if len(proxy) > 0 { + proxyUrl := VerifyWithResult(url.Parse(proxy)).(*url.URL) + proxyFunc := http.ProxyURL(proxyUrl) + oldTransport.Proxy = proxyFunc + } else { + oldTransport.Proxy = nil + } reqest.Header.Set("Content-Type", "application/json") //enable gzip @@ -278,41 +221,40 @@ func Request(method string,r string,auth *Auth,body *bytes.Buffer,proxy string)( //GzipHandler(reqest) // - resp,errs := client.Do(reqest) + resp, errs := client.Do(reqest) if errs != nil { log.Error(SubString(errs.Error(), 0, 500)) - return "",errs + return "", errs } - if resp!=nil&& resp.Body!=nil{ + if resp != nil && resp.Body != nil { //io.Copy(ioutil.Discard, resp.Body) defer resp.Body.Close() } if resp.StatusCode != 200 { b, _ := io.ReadAll(resp.Body) - return "",errors.New("server error: "+string(b)) + return "", errors.New("server error: " + string(b)) } respBody, err := io.ReadAll(resp.Body) - log.Error(SubString(string(respBody), 0, 500)) + //log.Error(SubString(string(respBody), 0, 500)) if err != nil { log.Error(SubString(string(err.Error()), 0, 500)) - return string(respBody),err + return string(respBody), err } if err != nil { - return string(respBody),err + return string(respBody), err } io.Copy(io.Discard, resp.Body) defer resp.Body.Close() - return string(respBody),nil + return string(respBody), nil } -func DecodeJson(jsonStream string, o interface{})(error) { - +func DecodeJson(jsonStream string, o interface{}) error { decoder := json.NewDecoder(strings.NewReader(jsonStream)) // UseNumber causes the Decoder to unmarshal a number into an interface{} as a Number instead of as a float64. @@ -326,7 +268,7 @@ func DecodeJson(jsonStream string, o interface{})(error) { return nil } -func DecodeJsonBytes(jsonStream []byte, o interface{})(error) { +func DecodeJsonBytes(jsonStream []byte, o interface{}) error { decoder := json.NewDecoder(bytes.NewReader(jsonStream)) // UseNumber causes the Decoder to unmarshal a number into an interface{} as a Number instead of as a float64. decoder.UseNumber() diff --git a/log.go b/log.go index a4da2b0..96aa6fa 100644 --- a/log.go +++ b/log.go @@ -17,8 +17,8 @@ limitations under the License. 
package main import ( - "strings" log "github.com/cihub/seelog" + "strings" ) func setInitLogging(logLevel string) { @@ -36,7 +36,7 @@ func setInitLogging(logLevel string) { - + ` diff --git a/main.go b/main.go index 527bda3..d448fcc 100644 --- a/main.go +++ b/main.go @@ -2,14 +2,11 @@ package main import ( "bufio" - "encoding/json" - "fmt" "github.com/cheggaaa/pb" log "github.com/cihub/seelog" goflags "github.com/jessevdk/go-flags" "github.com/mattn/go-isatty" "io" - "io/ioutil" "net/http" _ "net/http/pprof" "os" @@ -21,7 +18,6 @@ import ( ) func main() { - runtime.GOMAXPROCS(runtime.NumCPU()) go func() { @@ -77,10 +73,30 @@ func main() { showBar = false } + if c.Sync { + //sync 功能时,只支持一个 index: + if len(c.SourceIndexNames) == 0 || len(c.TargetIndexName) == 0 { + log.Error("migration sync only support source 1 index to 1 target index") + return + } + migrator.SourceESAPI = migrator.ParseEsApi(true, c.SourceEs, c.SourceEsAuthStr, c.SourceProxy, c.Compress) + if migrator.SourceESAPI == nil { + log.Error("can not parse source es api") + return + } + migrator.TargetESAPI = migrator.ParseEsApi(false, c.TargetEs, c.TargetEsAuthStr, c.TargetProxy, false) + if migrator.TargetESAPI == nil { + log.Error("can not parse target es api") + return + } + migrator.SyncBetweenIndex(migrator.SourceESAPI, migrator.TargetESAPI, c) + return + } + //至少输出一次 if c.RepeatOutputTimes < 1 { - c.RepeatOutputTimes=1 - }else{ + c.RepeatOutputTimes = 1 + } else { log.Info("source data will repeat send to target: ", c.RepeatOutputTimes, " times, the document id will be regenerated.") } @@ -88,14 +104,14 @@ func main() { for i := 0; i < c.RepeatOutputTimes; i++ { - if c.RepeatOutputTimes>1 { + if c.RepeatOutputTimes > 1 { log.Info("repeat round: ", i+1) } // enough of a buffer to hold all the search results across all workers migrator.DocChan = make(chan map[string]interface{}, c.BufferCount) - var srcESVersion *ClusterVersion + //var srcESVersion *ClusterVersion // create a progressbar and start a docCount var outputBar *pb.ProgressBar = pb.New(1).Prefix("Output ") @@ -106,50 +122,13 @@ func main() { //dealing with input if len(c.SourceEs) > 0 { //dealing with basic auth - if len(c.SourceEsAuthStr) > 0 && strings.Contains(c.SourceEsAuthStr, ":") { - authArray := strings.Split(c.SourceEsAuthStr, ":") - auth := Auth{User: authArray[0], Pass: authArray[1]} - migrator.SourceAuth = &auth - } - //get source es version - srcESVersion, errs := migrator.ClusterVersion(c.SourceEs, migrator.SourceAuth, migrator.Config.SourceProxy) - if errs != nil { + migrator.SourceESAPI = migrator.ParseEsApi(true, c.SourceEs, c.SourceEsAuthStr, + migrator.Config.SourceProxy, c.Compress) + if migrator.SourceESAPI == nil { + log.Error("can not parse source es api") return } - if strings.HasPrefix(srcESVersion.Version.Number, "7.") { - log.Debug("source es is V7,", srcESVersion.Version.Number) - api := new(ESAPIV7) - api.Host = c.SourceEs - api.Compress=c.Compress - api.Auth = migrator.SourceAuth - api.HttpProxy = migrator.Config.SourceProxy - migrator.SourceESAPI = api - } else if strings.HasPrefix(srcESVersion.Version.Number, "6.") { - log.Debug("source es is V6,", srcESVersion.Version.Number) - api := new(ESAPIV6) - api.Compress=c.Compress - api.Host = c.SourceEs - api.Auth = migrator.SourceAuth - api.HttpProxy = migrator.Config.SourceProxy - migrator.SourceESAPI = api - } else if strings.HasPrefix(srcESVersion.Version.Number, "5.") { - log.Debug("source es is V5,", srcESVersion.Version.Number) - api := new(ESAPIV5) - api.Host = c.SourceEs - 
api.Compress=c.Compress - api.Auth = migrator.SourceAuth - api.HttpProxy = migrator.Config.SourceProxy - migrator.SourceESAPI = api - } else { - log.Debug("source es is not V5,", srcESVersion.Version.Number) - api := new(ESAPIV0) - api.Host = c.SourceEs - api.Compress=c.Compress - api.Auth = migrator.SourceAuth - api.HttpProxy = migrator.Config.SourceProxy - migrator.SourceESAPI = api - } if c.ScrollSliceSize < 1 { c.ScrollSliceSize = 1 @@ -158,19 +137,18 @@ func main() { totalSize := 0 finishedSlice := 0 for slice := 0; slice < c.ScrollSliceSize; slice++ { - scroll, err := migrator.SourceESAPI.NewScroll(c.SourceIndexNames, c.ScrollTime, c.DocBufferCount, c.Query, slice, c.ScrollSliceSize, c.Fields) + scroll, err := migrator.SourceESAPI.NewScroll(c.SourceIndexNames, c.ScrollTime, c.DocBufferCount, c.Query, + c.SortField, slice, c.ScrollSliceSize, c.Fields) if err != nil { log.Error(err) return } - temp := scroll.(ScrollAPI) + totalSize += scroll.GetHitsTotal() - totalSize += temp.GetHitsTotal() + if scroll.GetDocs() != nil { - if scroll != nil && temp.GetDocs() != nil { - - if temp.GetHitsTotal() == 0 { + if scroll.GetHitsTotal() == 0 { log.Error("can't find documents from source.") return } @@ -179,10 +157,10 @@ func main() { wg.Add(1) //process input // start scroll - temp.ProcessScrollResult(&migrator, fetchBar) + scroll.ProcessScrollResult(&migrator, fetchBar) // loop scrolling until done - for temp.Next(&migrator, fetchBar) == false { + for scroll.Next(&migrator, fetchBar) == false { } if showBar { @@ -255,46 +233,20 @@ func main() { migrator.TargetAuth = &auth } - //get target es version - descESVersion, errs := migrator.ClusterVersion(c.TargetEs, migrator.TargetAuth, migrator.Config.TargetProxy) - if errs != nil { + //get target es api + migrator.TargetESAPI = migrator.ParseEsApi(false, c.TargetEs, c.TargetEsAuthStr, + migrator.Config.TargetProxy, false) + if migrator.TargetESAPI == nil { + log.Error("can not parse target es api") return } - if strings.HasPrefix(descESVersion.Version.Number, "7.") { - log.Debug("target es is V7,", descESVersion.Version.Number) - api := new(ESAPIV7) - api.Host = c.TargetEs - api.Auth = migrator.TargetAuth - api.HttpProxy = migrator.Config.TargetProxy - migrator.TargetESAPI = api - } else if strings.HasPrefix(descESVersion.Version.Number, "6.") { - log.Debug("target es is V6,", descESVersion.Version.Number) - api := new(ESAPIV6) - api.Host = c.TargetEs - api.Auth = migrator.TargetAuth - api.HttpProxy = migrator.Config.TargetProxy - migrator.TargetESAPI = api - } else if strings.HasPrefix(descESVersion.Version.Number, "5.") { - log.Debug("target es is V5,", descESVersion.Version.Number) - api := new(ESAPIV5) - api.Host = c.TargetEs - api.Auth = migrator.TargetAuth - api.HttpProxy = migrator.Config.TargetProxy - migrator.TargetESAPI = api - } else { - log.Debug("target es is not V5,", descESVersion.Version.Number) - api := new(ESAPIV0) - api.Host = c.TargetEs - api.Auth = migrator.TargetAuth - api.HttpProxy = migrator.Config.TargetProxy - migrator.TargetESAPI = api - - } - log.Debug("start process with mappings") - if srcESVersion != nil && c.CopyIndexMappings && descESVersion.Version.Number[0] != srcESVersion.Version.Number[0] { - log.Error(srcESVersion.Version, "=>", descESVersion.Version, ",cross-big-version mapping migration not avaiable, please update mapping manually :(") + if c.CopyIndexMappings && + migrator.TargetESAPI.ClusterVersion().Version.Number[0] != migrator.SourceESAPI.ClusterVersion().Version.Number[0] { + 
log.Error(migrator.SourceESAPI.ClusterVersion().Version, "=>", + migrator.TargetESAPI.ClusterVersion().Version, + ",cross-big-version mapping migration not available, please update mapping manually :(") return } @@ -507,65 +459,3 @@ func main() { log.Info("data migration finished.") } - -func (c *Migrator) recoveryIndexSettings(sourceIndexRefreshSettings map[string]interface{}) { - //update replica and refresh_interval - for name, interval := range sourceIndexRefreshSettings { - tempIndexSettings := getEmptyIndexSettings() - tempIndexSettings["settings"].(map[string]interface{})["index"].(map[string]interface{})["refresh_interval"] = interval - //tempIndexSettings["settings"].(map[string]interface{})["index"].(map[string]interface{})["number_of_replicas"] = 1 - c.TargetESAPI.UpdateIndexSettings(name, tempIndexSettings) - if c.Config.Refresh { - c.TargetESAPI.Refresh(name) - } - } -} - -func (c *Migrator) ClusterVersion(host string, auth *Auth, proxy string) (*ClusterVersion, []error) { - - url := fmt.Sprintf("%s", host) - resp, body, errs := Get(url, auth, proxy) - - if resp != nil && resp.Body != nil { - io.Copy(ioutil.Discard, resp.Body) - defer resp.Body.Close() - } - - if errs != nil { - log.Error(errs) - return nil, errs - } - - log.Debug(body) - - version := &ClusterVersion{} - err := json.Unmarshal([]byte(body), version) - - if err != nil { - log.Error(body, errs) - return nil, errs - } - return version, nil -} - -func (c *Migrator) ClusterReady(api ESAPI) (*ClusterHealth, bool) { - health := api.ClusterHealth() - - if !c.Config.WaitForGreen { - return health, true - } - - if health.Status == "red" { - return health, false - } - - if c.Config.WaitForGreen == false && health.Status == "yellow" { - return health, true - } - - if health.Status == "green" { - return health, true - } - - return health, false -} diff --git a/migrator.go b/migrator.go new file mode 100644 index 0000000..9f948cd --- /dev/null +++ b/migrator.go @@ -0,0 +1,575 @@ +/* +Copyright 2016 Medcl (m AT medcl.net) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/cheggaaa/pb" + "io" + "io/ioutil" + "reflect" + "strings" + "sync" + "time" + + log "github.com/cihub/seelog" +) + +type BulkOperation uint8 + +const ( + opIndex BulkOperation = iota + opDelete +) + +func (op BulkOperation) String() string { + switch op { + case opIndex: + return "opIndex" + case opDelete: + return "opDelete" + default: + return fmt.Sprintf("unknown:%d", op) + } +} + +func (m *Migrator) recoveryIndexSettings(sourceIndexRefreshSettings map[string]interface{}) { + //update replica and refresh_interval + for name, interval := range sourceIndexRefreshSettings { + tempIndexSettings := getEmptyIndexSettings() + tempIndexSettings["settings"].(map[string]interface{})["index"].(map[string]interface{})["refresh_interval"] = interval + //tempIndexSettings["settings"].(map[string]interface{})["index"].(map[string]interface{})["number_of_replicas"] = 1 + m.TargetESAPI.UpdateIndexSettings(name, tempIndexSettings) + if m.Config.Refresh { + m.TargetESAPI.Refresh(name) + } + } +} + +func (m *Migrator) ClusterVersion(host string, auth *Auth, proxy string) (*ClusterVersion, []error) { + + url := fmt.Sprintf("%s", host) + resp, body, errs := Get(url, auth, proxy) + + if resp != nil && resp.Body != nil { + io.Copy(ioutil.Discard, resp.Body) + defer resp.Body.Close() + } + + if errs != nil { + log.Error(errs) + return nil, errs + } + + log.Debug(body) + + version := &ClusterVersion{} + err := json.Unmarshal([]byte(body), version) + + if err != nil { + log.Error(body, errs) + return nil, errs + } + return version, nil +} + +func (m *Migrator) ParseEsApi(isSource bool, host string, authStr string, proxy string, compress bool) ESAPI { + var auth *Auth = nil + if len(authStr) > 0 && strings.Contains(authStr, ":") { + authArray := strings.Split(authStr, ":") + auth = &Auth{User: authArray[0], Pass: authArray[1]} + if isSource { + m.SourceAuth = auth + } else { + m.TargetAuth = auth + } + } + + esVersion, errs := m.ClusterVersion(host, auth, proxy) + if errs != nil { + return nil + } + + esInfo := "dest" + if isSource { + esInfo = "source" + } + + log.Infof("%s es version: %s", esInfo, esVersion.Version.Number) + if strings.HasPrefix(esVersion.Version.Number, "7.") { + log.Debug("es is V7,", esVersion.Version.Number) + api := new(ESAPIV7) + api.Host = host + api.Compress = compress + api.Auth = auth + api.HttpProxy = proxy + api.Version = esVersion + return api + //migrator.SourceESAPI = api + } else if strings.HasPrefix(esVersion.Version.Number, "6.") { + log.Debug("es is V6,", esVersion.Version.Number) + api := new(ESAPIV6) + api.Host = host + api.Compress = compress + api.Auth = auth + api.HttpProxy = proxy + api.Version = esVersion + return api + //migrator.SourceESAPI = api + } else if strings.HasPrefix(esVersion.Version.Number, "5.") { + log.Debug("es is V5,", esVersion.Version.Number) + api := new(ESAPIV5) + api.Host = host + api.Compress = compress + api.Auth = auth + api.HttpProxy = proxy + api.Version = esVersion + return api + //migrator.SourceESAPI = api + } else { + log.Debug("es is not V5,", esVersion.Version.Number) + api := new(ESAPIV0) + api.Host = host + api.Compress = compress + api.Auth = auth + api.HttpProxy = proxy + api.Version = esVersion + return api + } +} + +func (m *Migrator) ClusterReady(api ESAPI) (*ClusterHealth, bool) { + health := api.ClusterHealth() + + if !m.Config.WaitForGreen { + return health, true + } + + if health.Status == "red" { + return health, false + } + + if 
m.Config.WaitForGreen == false && health.Status == "yellow" { + return health, true + } + + if health.Status == "green" { + return health, true + } + + return health, false +} + +func (m *Migrator) NewBulkWorker(docCount *int, pb *pb.ProgressBar, wg *sync.WaitGroup) { + + log.Debug("start es bulk worker") + + bulkItemSize := 0 + mainBuf := bytes.Buffer{} + docBuf := bytes.Buffer{} + docEnc := json.NewEncoder(&docBuf) + + idleDuration := 5 * time.Second + idleTimeout := time.NewTimer(idleDuration) + defer idleTimeout.Stop() + + taskTimeOutDuration := 5 * time.Minute + taskTimeout := time.NewTimer(taskTimeOutDuration) + defer taskTimeout.Stop() + +READ_DOCS: + for { + idleTimeout.Reset(idleDuration) + taskTimeout.Reset(taskTimeOutDuration) + select { + case docI, open := <-m.DocChan: + var err error + log.Trace("read doc from channel,", docI) + // this check is in case the document is an error with scroll stuff + if status, ok := docI["status"]; ok { + if status.(int) == 404 { + log.Error("error: ", docI["response"]) + continue + } + } + + // sanity check + for _, key := range []string{"_index", "_type", "_source", "_id"} { + if _, ok := docI[key]; !ok { + break READ_DOCS + } + } + + var tempDestIndexName string + var tempTargetTypeName string + tempDestIndexName = docI["_index"].(string) + tempTargetTypeName = docI["_type"].(string) + + if m.Config.TargetIndexName != "" { + tempDestIndexName = m.Config.TargetIndexName + } + + if m.Config.OverrideTypeName != "" { + tempTargetTypeName = m.Config.OverrideTypeName + } + + doc := Document{ + Index: tempDestIndexName, + Type: tempTargetTypeName, + source: docI["_source"].(map[string]interface{}), + Id: docI["_id"].(string), + } + + if m.Config.RegenerateID { + doc.Id = "" + } + + if m.Config.RenameFields != "" { + kvs := strings.Split(m.Config.RenameFields, ",") + for _, i := range kvs { + fvs := strings.Split(i, ":") + oldField := strings.TrimSpace(fvs[0]) + newField := strings.TrimSpace(fvs[1]) + if oldField == "_type" { + doc.source[newField] = docI["_type"].(string) + } else { + v := doc.source[oldField] + doc.source[newField] = v + delete(doc.source, oldField) + } + } + } + + // add doc "_routing" if exists + if _, ok := docI["_routing"]; ok { + str, ok := docI["_routing"].(string) + if ok && str != "" { + doc.Routing = str + } + } + + // if channel is closed flush and gtfo + if !open { + goto WORKER_DONE + } + + // sanity check + if len(doc.Index) == 0 || len(doc.Type) == 0 { + log.Errorf("failed decoding document: %+v", doc) + continue + } + + // encode the doc and and the _source field for a bulk request + post := map[string]Document{ + "index": doc, + } + if err = docEnc.Encode(post); err != nil { + log.Error(err) + } + if err = docEnc.Encode(doc.source); err != nil { + log.Error(err) + } + + // append the doc to the main buffer + mainBuf.Write(docBuf.Bytes()) + // reset for next document + bulkItemSize++ + (*docCount)++ + docBuf.Reset() + + // if we approach the 100mb es limit, flush to es and reset mainBuf + if mainBuf.Len()+docBuf.Len() > (m.Config.BulkSizeInMB * 1024 * 1024) { + goto CLEAN_BUFFER + } + + case <-idleTimeout.C: + log.Debug("5s no message input") + goto CLEAN_BUFFER + case <-taskTimeout.C: + log.Warn("5m no message input, close worker") + goto WORKER_DONE + } + + goto READ_DOCS + + CLEAN_BUFFER: + m.TargetESAPI.Bulk(&mainBuf) + log.Trace("clean buffer, and execute bulk insert") + pb.Add(bulkItemSize) + bulkItemSize = 0 + if m.Config.SleepSecondsAfterEachBulk > 0 { + 
time.Sleep(time.Duration(m.Config.SleepSecondsAfterEachBulk) * time.Second) + } + } +WORKER_DONE: + if docBuf.Len() > 0 { + mainBuf.Write(docBuf.Bytes()) + bulkItemSize++ + } + m.TargetESAPI.Bulk(&mainBuf) + log.Trace("bulk insert") + pb.Add(bulkItemSize) + bulkItemSize = 0 + wg.Done() +} + +func (m *Migrator) bulkRecords(bulkOp BulkOperation, dstEsApi ESAPI, targetIndex string, targetType string, diffDocMaps map[string]interface{}) error { + //var err error + docCount := 0 + bulkItemSize := 0 + mainBuf := bytes.Buffer{} + docBuf := bytes.Buffer{} + docEnc := json.NewEncoder(&docBuf) + + //var tempDestIndexName string + //var tempTargetTypeName string + + for docId, docData := range diffDocMaps { + docI := docData.(map[string]interface{}) + log.Debugf("now will bulk %s docId=%s, docData=%+v", bulkOp, docId, docData) + //tempDestIndexName = docI["_index"].(string) + //tempTargetTypeName = docI["_type"].(string) + var strOperation string + doc := Document{ + Index: targetIndex, + Type: targetType, + Id: docId, // docI["_id"].(string), + } + + switch bulkOp { + case opIndex: + doc.source = docI // docI["_source"].(map[string]interface{}), + strOperation = "index" + case opDelete: + strOperation = "delete" + //do nothing + } + + // encode the doc and the _source field for a bulk request + + post := map[string]Document{ + strOperation: doc, + } + _ = Verify(docEnc.Encode(post)) + if bulkOp == opIndex { + _ = Verify(docEnc.Encode(doc.source)) + } + // append the doc to the main buffer + mainBuf.Write(docBuf.Bytes()) + // reset for next document + bulkItemSize++ + docCount++ + docBuf.Reset() + } + + if mainBuf.Len() > 0 { + _ = Verify(dstEsApi.Bulk(&mainBuf)) + } + return nil +} + +func (m *Migrator) SyncBetweenIndex(srcEsApi ESAPI, dstEsApi ESAPI, cfg *Config) { + // _id => value + srcDocMaps := make(map[string]interface{}) + dstDocMaps := make(map[string]interface{}) + diffDocMaps := make(map[string]interface{}) + + srcRecordIndex := 0 + dstRecordIndex := 0 + var err error + srcType := "" + var srcScroll ScrollAPI = nil + var dstScroll ScrollAPI = nil + var emptyScroll = &EmptyScroll{} + lastSrcId := "" + lastDestId := "" + needScrollSrc := true + needScrollDest := true + + addCount := 0 + updateCount := 0 + deleteCount := 0 + + //TODO: progress calculation, split into [ scroll src/dst + index ] => delete phases + srcBar := pb.New(1).Prefix("Progress") + //srcBar := pb.New(1).Prefix("Source") + //dstBar := pb.New(100).Prefix("Dest") + //pool, err := pb.StartPool(srcBar, dstBar) + + for { + if srcScroll == nil { + srcScroll, err = srcEsApi.NewScroll(cfg.SourceIndexNames, cfg.ScrollTime, cfg.DocBufferCount, cfg.Query, + cfg.SortField, 0, cfg.ScrollSliceSize, cfg.Fields) + if err != nil { + log.Infof("can not scroll for source index: %s, reason:%s", cfg.SourceIndexNames, err.Error()) + return + } + log.Infof("src total count=%d", srcScroll.GetHitsTotal()) + srcBar.Total = int64(srcScroll.GetHitsTotal()) + srcBar.Start() + } else if needScrollSrc { + srcScroll = VerifyWithResult(srcEsApi.NextScroll(cfg.ScrollTime, srcScroll.GetScrollId())).(ScrollAPI) + } + + if dstScroll == nil { + dstScroll, err = dstEsApi.NewScroll(cfg.TargetIndexName, cfg.ScrollTime, cfg.DocBufferCount, cfg.Query, + cfg.SortField, 0, cfg.ScrollSliceSize, cfg.Fields) + if err != nil { + log.Infof("can not scroll for dest index: %s, reason:%s", cfg.TargetIndexName, err.Error()) + //create an empty scroll, equivalent to just bulk-indexing everything?
+ dstScroll = emptyScroll + + //no dest index, use the src hit count as the total + //dstBar.Total = int64(srcScroll.GetHitsTotal()) // = pb.New(srcScroll.GetHitsTotal()).Prefix("Dest") + } else { + //dest index exists, + //dstBar.Total = int64(dstScroll.GetHitsTotal()) // pb.New(dstScroll.GetHitsTotal()).Prefix("Dest") + } + //dstBar.Start() + log.Infof("dst total count=%d", dstScroll.GetHitsTotal()) + } else if needScrollDest { + dstScroll = VerifyWithResult(dstEsApi.NextScroll(cfg.ScrollTime, dstScroll.GetScrollId())).(ScrollAPI) + } + + //read the current batch from the target index and put it into the dest map, empty if there is none + if needScrollDest { + for idx, dstDocI := range dstScroll.GetDocs() { + destId := dstDocI.(map[string]interface{})["_id"].(string) + dstSource := dstDocI.(map[string]interface{})["_source"] + lastDestId = destId + log.Debugf("dst [%d]: dstId=%s", dstRecordIndex+idx, destId) + + if srcSource, found := srcDocMaps[destId]; found { + delete(srcDocMaps, destId) + + //a matching doc was found in the src map + if !reflect.DeepEqual(srcSource, dstSource) { + //not equal, needs an update + diffDocMaps[destId] = srcSource + updateCount++ + } else { + //identical, nothing to do + } + } else { + dstDocMaps[destId] = dstSource + } + //dstBar.Increment() + } + dstRecordIndex += len(dstScroll.GetDocs()) + } + + //first read the current src batch and put it into the map + if needScrollSrc { + for idx, srcDocI := range srcScroll.GetDocs() { + srcId := srcDocI.(map[string]interface{})["_id"].(string) + srcSource := srcDocI.(map[string]interface{})["_source"] + srcType = srcDocI.(map[string]interface{})["_type"].(string) + lastSrcId = srcId + log.Debugf("src [%d]: srcId=%s", srcRecordIndex+idx, srcId) + + if len(lastDestId) == 0 { + //no destId means the target index has no data, just index everything + diffDocMaps[srcId] = srcSource + addCount++ + } else if dstSource, ok := dstDocMaps[srcId]; ok { //a doc with the same id exists in dstDocMaps + if !reflect.DeepEqual(srcSource, dstSource) { + //not identical, needs an update; otherwise ignore + diffDocMaps[srcId] = srcSource + updateCount++ + } + //remove the matched doc from the dst map + delete(dstDocMaps, srcId) + } else { + //no matching id: either dst has not reached it yet, or it does not exist in dst + if srcId < lastDestId { + //dest has already passed the current srcId, so it does not exist in dst + diffDocMaps[srcId] = srcSource + addCount++ + } else { + srcDocMaps[srcId] = srcSource + } + } + srcBar.Increment() + } + srcRecordIndex += len(srcScroll.GetDocs()) + } + + if len(diffDocMaps) > 0 { + log.Debugf("now will bulk index %d records", len(diffDocMaps)) + _ = Verify(m.bulkRecords(opIndex, dstEsApi, cfg.TargetIndexName, srcType, diffDocMaps)) + diffDocMaps = make(map[string]interface{}) + } + + if lastSrcId == lastDestId { + needScrollSrc = true + needScrollDest = true + } else if len(lastDestId) == 0 || (lastSrcId < lastDestId || (needScrollDest == true && len(dstScroll.GetDocs()) == 0)) { + //the previous round scrolled dest, but it came back empty + needScrollSrc = true + needScrollDest = false + } else if lastSrcId > lastDestId || (needScrollSrc == true && len(srcScroll.GetDocs()) == 0) { + //the previous round scrolled src, but it came back empty + needScrollSrc = false + needScrollDest = true + } else { + panic("TODO:") + } + + //only quit when both src and dst have been fully scrolled + log.Debugf("lastSrcId=%s, lastDestId=%s, "+ + "needScrollSrc=%t, len(srcScroll.GetDocs()=%d, "+ + "needScrollDest=%t, len(dstScroll.GetDocs())=%d", + lastSrcId, lastDestId, + needScrollSrc, len(srcScroll.GetDocs()), + needScrollDest, len(dstScroll.GetDocs())) + + if (!needScrollSrc || (len(srcScroll.GetDocs()) == 0 || len(srcScroll.GetDocs()) < cfg.DocBufferCount)) && + (!needScrollDest || (len(dstScroll.GetDocs()) == 0 || len(dstScroll.GetDocs()) < cfg.DocBufferCount)) { + log.Debugf("can not find more, will quit, and index %d, delete %d", len(srcDocMaps), len(dstDocMaps)) + + if
len(srcDocMaps) > 0 { + addCount += len(srcDocMaps) + _ = Verify(m.bulkRecords(opIndex, dstEsApi, cfg.TargetIndexName, srcType, srcDocMaps)) + } + if len(dstDocMaps) > 0 { + //whatever is left in the dst map exists only in dst, so it needs to be deleted + deleteCount += len(dstDocMaps) + _ = Verify(m.bulkRecords(opDelete, dstEsApi, cfg.TargetIndexName, srcType, dstDocMaps)) + } + break + } + + //the target does not exist, or src has not yet caught up with dest + if cfg.SleepSecondsAfterEachBulk > 0 { + time.Sleep(time.Duration(cfg.SleepSecondsAfterEachBulk) * time.Second) + } + } + _ = Verify(srcEsApi.DeleteScroll(srcScroll.GetScrollId())) + _ = Verify(dstEsApi.DeleteScroll(dstScroll.GetScrollId())) + + srcBar.FinishPrint("Source End") + //dstBar.FinishPrint("Dest End") + //pool.Stop() + + log.Infof("sync %s(%d) to %s(%d), add=%d, update=%d, delete=%d", + cfg.SourceIndexNames, srcRecordIndex, cfg.TargetIndexName, dstRecordIndex, + addCount, updateCount, deleteCount) + + //log.Infof("diffDocMaps=%+v", diffDocMaps) +} diff --git a/scroll.go b/scroll.go index fb2978d..41f55db 100644 --- a/scroll.go +++ b/scroll.go @@ -22,45 +22,41 @@ import ( log "github.com/cihub/seelog" ) - -type ScrollAPI interface{ - GetScrollId()string - GetHitsTotal()int +type ScrollAPI interface { + GetScrollId() string + GetHitsTotal() int GetDocs() []interface{} ProcessScrollResult(c *Migrator, bar *pb.ProgressBar) Next(c *Migrator, bar *pb.ProgressBar) (done bool) } - -func (scroll *Scroll) GetHitsTotal()int{ +func (scroll *Scroll) GetHitsTotal() int { return scroll.Hits.Total } -func (scroll *Scroll) GetScrollId()string{ +func (scroll *Scroll) GetScrollId() string { return scroll.ScrollId } -func (scroll *Scroll) GetDocs()[]interface{}{ +func (scroll *Scroll) GetDocs() []interface{} { return scroll.Hits.Docs } -func (scroll *ScrollV7) GetHitsTotal()int{ +func (scroll *ScrollV7) GetHitsTotal() int { return scroll.Hits.Total.Value } - -func (scroll *ScrollV7) GetScrollId()string{ +func (scroll *ScrollV7) GetScrollId() string { return scroll.ScrollId } -func (scroll *ScrollV7) GetDocs()[]interface{}{ +func (scroll *ScrollV7) GetDocs() []interface{} { return scroll.Hits.Docs } - // Stream from source es instance.
"done" is an indicator that the stream is // over -func (s *ScrollV7) ProcessScrollResult(c *Migrator, bar *pb.ProgressBar){ +func (s *ScrollV7) ProcessScrollResult(c *Migrator, bar *pb.ProgressBar) { //update progress bar bar.Add(len(s.Hits.Docs)) @@ -120,24 +116,47 @@ func (s *ScrollV7) ProcessScrollResult(c *Migrator, bar *pb.ProgressBar){ func (s *ScrollV7) Next(c *Migrator, bar *pb.ProgressBar) (done bool) { - scroll,err:=c.SourceESAPI.NextScroll(c.Config.ScrollTime,s.ScrollId) + scroll, err := c.SourceESAPI.NextScroll(c.Config.ScrollTime, s.ScrollId) if err != nil { log.Error(err) return false } - docs:=scroll.(ScrollAPI).GetDocs() + docs := scroll.GetDocs() if docs == nil || len(docs) <= 0 { log.Debug("scroll result is empty") return true } - scroll.(ScrollAPI).ProcessScrollResult(c,bar) + scroll.ProcessScrollResult(c, bar) //update scrollId - s.ScrollId=scroll.(ScrollAPI).GetScrollId() + s.ScrollId = scroll.GetScrollId() return } +// 返回空,从而在 compare + bulk 时有相同的处理逻辑 +type EmptyScroll struct { + Dummy int +} + +func (es *EmptyScroll) GetScrollId() string { + return "" +} +func (es *EmptyScroll) GetHitsTotal() int { + return 0 +} + +func (es *EmptyScroll) GetDocs() []interface{} { + return make([]interface{}, 0) +} + +func (es *EmptyScroll) ProcessScrollResult(c *Migrator, bar *pb.ProgressBar) { + +} + +func (es *EmptyScroll) Next(c *Migrator, bar *pb.ProgressBar) (done bool) { + return true +} diff --git a/utils.go b/utils.go index 2378817..570b99b 100644 --- a/utils.go +++ b/utils.go @@ -3,17 +3,17 @@ package main import "fmt" func SubString(prim string, start int, end int) string { - if len(prim) == 0 { - fmt.Println("primitive str is empty") - } - if l := len(prim); l < end { - end = l - } + if len(prim) == 0 { + fmt.Println("primitive str is empty") + } + if l := len(prim); l < end { + end = l + } - value := prim - runes := []rune(value) + value := prim + runes := []rune(value) - safeSubString := string(runes[start:end]) + safeSubString := string(runes[start:end]) - return safeSubString + return safeSubString } diff --git a/v0.go b/v0.go index 40b0ee1..e810207 100644 --- a/v0.go +++ b/v0.go @@ -17,374 +17,403 @@ limitations under the License. 
package main import ( - "bytes" - "encoding/json" - "errors" - "fmt" - log "github.com/cihub/seelog" - "io" - "io/ioutil" - "regexp" - "strings" + "bytes" + "encoding/json" + "errors" + "fmt" + log "github.com/cihub/seelog" + "io" + "io/ioutil" + "regexp" + "strings" ) type ESAPIV0 struct { - Host string //eg: http://localhost:9200 - Auth *Auth //eg: user:pass - HttpProxy string //eg: http://proxyIp:proxyPort - Compress bool + Host string //eg: http://localhost:9200 + Auth *Auth //eg: user:pass + HttpProxy string //eg: http://proxyIp:proxyPort + Compress bool + Version *ClusterVersion } - func (s *ESAPIV0) ClusterHealth() *ClusterHealth { - url := fmt.Sprintf("%s/_cluster/health", s.Host) - r, body, errs := Get(url, s.Auth,s.HttpProxy) + url := fmt.Sprintf("%s/_cluster/health", s.Host) + r, body, errs := Get(url, s.Auth, s.HttpProxy) + + if r != nil && r.Body != nil { + io.Copy(ioutil.Discard, r.Body) + defer r.Body.Close() + } - if r!=nil&& r.Body!=nil{ - io.Copy(ioutil.Discard, r.Body) - defer r.Body.Close() - } + if errs != nil { + return &ClusterHealth{Name: s.Host, Status: "unreachable"} + } - if errs != nil { - return &ClusterHealth{Name: s.Host, Status: "unreachable"} - } + log.Debug(url) + log.Debug(body) - log.Debug(url) - log.Debug(body) + health := &ClusterHealth{} + err := json.Unmarshal([]byte(body), health) - health := &ClusterHealth{} - err := json.Unmarshal([]byte(body), health) + if err != nil { + log.Error(body) + return &ClusterHealth{Name: s.Host, Status: "unreachable"} + } + return health +} - if err != nil { - log.Error(body) - return &ClusterHealth{Name: s.Host, Status: "unreachable"} - } - return health +func (s *ESAPIV0) ClusterVersion() *ClusterVersion { + return s.Version } -func (s *ESAPIV0) Bulk(data *bytes.Buffer) { - if data == nil || data.Len() == 0 { - log.Trace("data is empty, skip") - return - } - data.WriteRune('\n') - url := fmt.Sprintf("%s/_bulk", s.Host) - - body,err:=DoRequest(s.Compress,"POST",url,s.Auth,data.Bytes(),s.HttpProxy) - - if err != nil { - log.Error(err) - return - } - response:=BulkResponse{} - err=DecodeJson(body, &response) - if err == nil { - if response.Errors{ - fmt.Println(body) - } - } - - data.Reset() +func (s *ESAPIV0) Bulk(data *bytes.Buffer) error { + if data == nil || data.Len() == 0 { + log.Trace("data is empty, skip") + return nil + } + data.WriteRune('\n') + url := fmt.Sprintf("%s/_bulk", s.Host) + + body, err := Request(s.Compress, "POST", url, s.Auth, data, s.HttpProxy) + + if err != nil { + data.Reset() + log.Error(err) + return err + } + response := BulkResponse{} + err = DecodeJson(body, &response) + if err == nil { + if response.Errors { + log.Warnf("bulk error:%s", body) + } + } + + data.Reset() + return err } func (s *ESAPIV0) GetIndexSettings(indexNames string) (*Indexes, error) { - // get all settings - allSettings := &Indexes{} + // get all settings + allSettings := &Indexes{} - url := fmt.Sprintf("%s/%s/_settings", s.Host, indexNames) - resp, body, errs := Get(url, s.Auth,s.HttpProxy) + url := fmt.Sprintf("%s/%s/_settings", s.Host, indexNames) + resp, body, errs := Get(url, s.Auth, s.HttpProxy) - if resp!=nil&& resp.Body!=nil{ - io.Copy(ioutil.Discard, resp.Body) - defer resp.Body.Close() - } + if resp != nil && resp.Body != nil { + io.Copy(ioutil.Discard, resp.Body) + defer resp.Body.Close() + } - if errs != nil { - return nil, errs[0] - } + if errs != nil { + return nil, errs[0] + } - if resp.StatusCode != 200 { - return nil, errors.New(body) - } + if resp.StatusCode != 200 { + return nil, errors.New(body) + 
} - log.Debug(body) + log.Debug(body) - err := json.Unmarshal([]byte(body), allSettings) - if err != nil { - panic(err) - return nil, err - } + err := json.Unmarshal([]byte(body), allSettings) + if err != nil { + panic(err) + return nil, err + } - return allSettings, nil + return allSettings, nil } func (s *ESAPIV0) GetIndexMappings(copyAllIndexes bool, indexNames string) (string, int, *Indexes, error) { - url := fmt.Sprintf("%s/%s/_mapping", s.Host, indexNames) - resp, body, errs := Get(url, s.Auth,s.HttpProxy) - - if resp!=nil&& resp.Body!=nil{ - io.Copy(ioutil.Discard, resp.Body) - defer resp.Body.Close() - } - - if errs != nil { - log.Error(errs) - return "", 0, nil, errs[0] - } - - - if resp.StatusCode != 200 { - return "", 0, nil, errors.New(body) - } - - idxs := Indexes{} - er := json.Unmarshal([]byte(body), &idxs) - - if er != nil { - log.Error(body) - return "", 0, nil, er - } - - // remove indexes that start with . if user asked for it - //if copyAllIndexes == false { - // for name := range idxs { - // switch name[0] { - // case '.': - // delete(idxs, name) - // case '_': - // delete(idxs, name) - // -// -// } -// } -// } - - // if _all indexes limit the list of indexes to only these that we kept - // after looking at mappings - if indexNames == "_all" { - - var newIndexes []string - for name := range idxs { - newIndexes = append(newIndexes, name) - } - indexNames = strings.Join(newIndexes, ",") - - } else if strings.Contains(indexNames, "*") || strings.Contains(indexNames, "?") { - - r, _ := regexp.Compile(indexNames) - - //check index patterns - var newIndexes []string - for name := range idxs { - matched := r.MatchString(name) - if matched { - newIndexes = append(newIndexes, name) - } - } - indexNames = strings.Join(newIndexes, ",") - - } - - i := 0 - // wrap in mappings if moving from super old es - for name, idx := range idxs { - i++ - if _, ok := idx.(map[string]interface{})["mappings"]; !ok { - (idxs)[name] = map[string]interface{}{ - "mappings": idx, - } - } - } - - return indexNames, i, &idxs, nil + url := fmt.Sprintf("%s/%s/_mapping", s.Host, indexNames) + resp, body, errs := Get(url, s.Auth, s.HttpProxy) + + if resp != nil && resp.Body != nil { + io.Copy(ioutil.Discard, resp.Body) + defer resp.Body.Close() + } + + if errs != nil { + log.Error(errs) + return "", 0, nil, errs[0] + } + + if resp.StatusCode != 200 { + return "", 0, nil, errors.New(body) + } + + idxs := Indexes{} + er := json.Unmarshal([]byte(body), &idxs) + + if er != nil { + log.Error(body) + return "", 0, nil, er + } + + // remove indexes that start with . 
if user asked for it + //if copyAllIndexes == false { + // for name := range idxs { + // switch name[0] { + // case '.': + // delete(idxs, name) + // case '_': + // delete(idxs, name) + // + // + // } + // } + // } + + // if _all indexes limit the list of indexes to only these that we kept + // after looking at mappings + if indexNames == "_all" { + + var newIndexes []string + for name := range idxs { + newIndexes = append(newIndexes, name) + } + indexNames = strings.Join(newIndexes, ",") + + } else if strings.Contains(indexNames, "*") || strings.Contains(indexNames, "?") { + + r, _ := regexp.Compile(indexNames) + + //check index patterns + var newIndexes []string + for name := range idxs { + matched := r.MatchString(name) + if matched { + newIndexes = append(newIndexes, name) + } + } + indexNames = strings.Join(newIndexes, ",") + + } + + i := 0 + // wrap in mappings if moving from super old es + for name, idx := range idxs { + i++ + if _, ok := idx.(map[string]interface{})["mappings"]; !ok { + (idxs)[name] = map[string]interface{}{ + "mappings": idx, + } + } + } + + return indexNames, i, &idxs, nil } func getEmptyIndexSettings() map[string]interface{} { - tempIndexSettings := map[string]interface{}{} - tempIndexSettings["settings"] = map[string]interface{}{} - tempIndexSettings["settings"].(map[string]interface{})["index"] = map[string]interface{}{} - return tempIndexSettings + tempIndexSettings := map[string]interface{}{} + tempIndexSettings["settings"] = map[string]interface{}{} + tempIndexSettings["settings"].(map[string]interface{})["index"] = map[string]interface{}{} + return tempIndexSettings } func cleanSettings(settings map[string]interface{}) { - //clean up settings - delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "creation_date") - delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "uuid") - delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "version") - delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "provided_name") + //clean up settings + delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "creation_date") + delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "uuid") + delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "version") + delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "provided_name") } func (s *ESAPIV0) UpdateIndexSettings(name string, settings map[string]interface{}) error { - log.Debug("update index: ", name, settings) - cleanSettings(settings) - url := fmt.Sprintf("%s/%s/_settings", s.Host, name) - - if _, ok := settings["settings"].(map[string]interface{})["index"]; ok { - if set, ok := settings["settings"].(map[string]interface{})["index"].(map[string]interface{})["analysis"]; ok { - log.Debug("update static index settings: ", name) - staticIndexSettings := getEmptyIndexSettings() - staticIndexSettings["settings"].(map[string]interface{})["index"].(map[string]interface{})["analysis"] = set - Post(fmt.Sprintf("%s/%s/_close", s.Host, name), s.Auth, "",s.HttpProxy) - body := bytes.Buffer{} - enc := json.NewEncoder(&body) - enc.Encode(staticIndexSettings) - bodyStr, err := Request("PUT", url, s.Auth, &body,s.HttpProxy) - if err != nil { - log.Error(bodyStr, err) - panic(err) - return err - } - 
delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "analysis") - Post(fmt.Sprintf("%s/%s/_open", s.Host, name), s.Auth, "",s.HttpProxy) - } - } - - log.Debug("update dynamic index settings: ", name) - - body := bytes.Buffer{} - enc := json.NewEncoder(&body) - enc.Encode(settings) - _, err := Request("PUT", url, s.Auth, &body,s.HttpProxy) - - return err + log.Debug("update index: ", name, settings) + cleanSettings(settings) + url := fmt.Sprintf("%s/%s/_settings", s.Host, name) + + if _, ok := settings["settings"].(map[string]interface{})["index"]; ok { + if set, ok := settings["settings"].(map[string]interface{})["index"].(map[string]interface{})["analysis"]; ok { + log.Debug("update static index settings: ", name) + staticIndexSettings := getEmptyIndexSettings() + staticIndexSettings["settings"].(map[string]interface{})["index"].(map[string]interface{})["analysis"] = set + Request(false, "POST", fmt.Sprintf("%s/%s/_close", s.Host, name), s.Auth, nil, s.HttpProxy) + //Post(fmt.Sprintf("%s/%s/_close", s.Host, name), s.Auth, "", s.HttpProxy) + body := bytes.Buffer{} + enc := json.NewEncoder(&body) + enc.Encode(staticIndexSettings) + bodyStr, err := Request(s.Compress, "PUT", url, s.Auth, &body, s.HttpProxy) + if err != nil { + log.Error(bodyStr, err) + panic(err) + return err + } + delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "analysis") + Request(false, "POST", fmt.Sprintf("%s/%s/_open", s.Host, name), s.Auth, nil, s.HttpProxy) + //Post(fmt.Sprintf("%s/%s/_open", s.Host, name), s.Auth, "", s.HttpProxy) + } + } + + log.Debug("update dynamic index settings: ", name) + + body := bytes.Buffer{} + enc := json.NewEncoder(&body) + enc.Encode(settings) + _, err := Request(s.Compress, "PUT", url, s.Auth, &body, s.HttpProxy) + + return err } func (s *ESAPIV0) UpdateIndexMapping(indexName string, settings map[string]interface{}) error { - log.Debug("start update mapping: ", indexName,settings) + log.Debug("start update mapping: ", indexName, settings) - for name, mapping := range settings { + for name, mapping := range settings { - log.Debug("start update mapping: ", indexName,name,mapping) + log.Debug("start update mapping: ", indexName, name, mapping) - url := fmt.Sprintf("%s/%s/%s/_mapping", s.Host, indexName, name) + url := fmt.Sprintf("%s/%s/%s/_mapping", s.Host, indexName, name) - body := bytes.Buffer{} - enc := json.NewEncoder(&body) - enc.Encode(mapping) - res, err := Request("POST", url, s.Auth, &body,s.HttpProxy) - if(err!=nil){ - log.Error(url) - log.Error(body.String()) - log.Error(err,res) - panic(err) - } - } - return nil + body := bytes.Buffer{} + enc := json.NewEncoder(&body) + enc.Encode(mapping) + res, err := Request(s.Compress, "POST", url, s.Auth, &body, s.HttpProxy) + if err != nil { + log.Error(url) + log.Error(body.String()) + log.Error(err, res) + panic(err) + } + } + return nil } func (s *ESAPIV0) DeleteIndex(name string) (err error) { - log.Debug("start delete index: ", name) + log.Debug("start delete index: ", name) - url := fmt.Sprintf("%s/%s", s.Host, name) + url := fmt.Sprintf("%s/%s", s.Host, name) - Request("DELETE", url, s.Auth, nil,s.HttpProxy) + Request(s.Compress, "DELETE", url, s.Auth, nil, s.HttpProxy) - log.Debug("delete index: ", name) + log.Debug("delete index: ", name) - return nil + return nil } func (s *ESAPIV0) CreateIndex(name string, settings map[string]interface{}) (err error) { - cleanSettings(settings) + cleanSettings(settings) - body := bytes.Buffer{} - enc := 
json.NewEncoder(&body) - enc.Encode(settings) - log.Debug("start create index: ", name, settings) + body := bytes.Buffer{} + enc := json.NewEncoder(&body) + enc.Encode(settings) + log.Debug("start create index: ", name, settings) - url := fmt.Sprintf("%s/%s", s.Host, name) + url := fmt.Sprintf("%s/%s", s.Host, name) - resp, err := Request("PUT", url, s.Auth, &body,s.HttpProxy) - log.Debugf("response: %s",resp) + resp, err := Request(s.Compress, "PUT", url, s.Auth, &body, s.HttpProxy) + log.Debugf("response: %s", resp) - return err + return err } func (s *ESAPIV0) Refresh(name string) (err error) { + log.Debug("refresh index: ", name) - log.Debug("refresh index: ", name) + url := fmt.Sprintf("%s/%s/_refresh", s.Host, name) - url := fmt.Sprintf("%s/%s/_refresh", s.Host, name) + resp, err := Request(false, "POST", url, s.Auth, nil, s.HttpProxy) + log.Infof("refresh resp=%s, err=%+v", resp, err) + //resp, _, _ := Post(url, s.Auth, "", s.HttpProxy) + //if resp != nil && resp.Body != nil { + // io.Copy(ioutil.Discard, resp.Body) + // defer resp.Body.Close() + //} - resp,_,_:=Post(url,s.Auth,"",s.HttpProxy) - if resp!=nil&& resp.Body!=nil{ - io.Copy(ioutil.Discard, resp.Body) - defer resp.Body.Close() - } + return nil +} - return nil +func (s *ESAPIV0) NewScroll(indexNames string, scrollTime string, docBufferCount int, query string, sort string, + slicedId int, maxSlicedCount int, fields string) (scroll ScrollAPI, err error) { + + // curl -XGET 'http://es-0.9:9200/_search?search_type=scan&scroll=10m&size=50' + url := fmt.Sprintf("%s/%s/_search?search_type=scan&scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount) + + var jsonBody []byte + if len(query) > 0 || len(fields) > 0 { + queryBody := map[string]interface{}{} + if len(fields) > 0 { + if !strings.Contains(fields, ",") { + queryBody["_source"] = fields + } else { + queryBody["_source"] = strings.Split(fields, ",") + } + } + + if len(query) > 0 { + queryBody["query"] = map[string]interface{}{} + queryBody["query"].(map[string]interface{})["query_string"] = map[string]interface{}{} + queryBody["query"].(map[string]interface{})["query_string"].(map[string]interface{})["query"] = query + } + + if len(sort) > 0 { + sortFields := make([]string, 0) + sortFields = append(sortFields, sort) + queryBody["sort"] = sortFields + } + + jsonBody, err = json.Marshal(queryBody) + if err != nil { + log.Error(err) + return nil, err + } + + } + //resp, body, errs := Post(url, s.Auth,jsonBody,s.HttpProxy) + body, err := Request(s.Compress, "POST", url, s.Auth, bytes.NewBuffer(jsonBody), s.HttpProxy) + if err != nil { + log.Error(err) + return nil, err + } + + scroll = &Scroll{} + err = DecodeJson(body, scroll) + if err != nil { + log.Error(err) + return nil, err + } + + return scroll, err } -func (s *ESAPIV0) NewScroll(indexNames string, scrollTime string, docBufferCount int,query string, slicedId,maxSlicedCount int, fields string) (scroll interface{}, err error) { - - // curl -XGET 'http://es-0.9:9200/_search?search_type=scan&scroll=10m&size=50' - url := fmt.Sprintf("%s/%s/_search?search_type=scan&scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount) - - var jsonBody []byte - if len(query) > 0 || len(fields) > 0 { - queryBody := map[string]interface{}{} - if len(fields) > 0 { - if !strings.Contains(fields, ",") { - queryBody["_source"] = fields - } else { - queryBody["_source"] = strings.Split(fields, ",") - } - } - - if len(query) > 0 { - queryBody["query"] = map[string]interface{}{} - 
queryBody["query"].(map[string]interface{})["query_string"] = map[string]interface{}{} - queryBody["query"].(map[string]interface{})["query_string"].(map[string]interface{})["query"] = query - } - - jsonBody, err = json.Marshal(queryBody) - if err != nil { - log.Error(err) - return nil, err - } - - } - //resp, body, errs := Post(url, s.Auth,jsonBody,s.HttpProxy) - body, err := DoRequest(s.Compress,"POST",url, s.Auth,jsonBody,s.HttpProxy) - if err != nil { - log.Error(err) - return nil, err - } - - scroll = &Scroll{} - err = DecodeJson(body, scroll) - if err != nil { - log.Error(err) - return nil, err - } - - return scroll, err +func (s *ESAPIV0) NextScroll(scrollTime string, scrollId string) (ScrollAPI, error) { + // curl -XGET 'http://es-0.9:9200/_search/scroll?scroll=5m' + id := bytes.NewBufferString(scrollId) + url := fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id) + body, err := Request(s.Compress, "GET", url, s.Auth, nil, s.HttpProxy) + + if err != nil { + log.Error(err) + return nil, err + } + + // decode elasticsearch scroll response + scroll := &Scroll{} + err = DecodeJson(body, &scroll) + if err != nil { + log.Error(err) + return nil, err + } + + return scroll, nil } -func (s *ESAPIV0) NextScroll(scrollTime string, scrollId string) (interface{}, error) { - // curl -XGET 'http://es-0.9:9200/_search/scroll?scroll=5m' - id := bytes.NewBufferString(scrollId) - url := fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id) - body,err:=DoRequest(s.Compress,"GET",url,s.Auth,nil,s.HttpProxy) - - if err != nil { - log.Error(err) - return nil, err - } - - // decode elasticsearch scroll response - scroll := &Scroll{} - err = DecodeJson(body, &scroll) - if err != nil { - log.Error(err) - return nil, err - } - - return scroll, nil +func (s *ESAPIV0) DeleteScroll(scrollId string) error { + id := bytes.NewBufferString(scrollId) + url := fmt.Sprintf("%s/_search/scroll?scroll_id=%s", s.Host, id) + if len(scrollId) > 0 { + _, err := Request(false, "DELETE", url, s.Auth, nil, s.HttpProxy) + if err != nil { + log.Error(err) + return err + } + //log.Infof("delete scroll, result=%s", body) + } + return nil } diff --git a/v5.go b/v5.go index 3158aae..6c34faa 100644 --- a/v5.go +++ b/v5.go @@ -17,120 +17,88 @@ limitations under the License. 
package main import ( - "bytes" - "encoding/json" - "fmt" - log "github.com/cihub/seelog" - "strings" + "bytes" + "encoding/json" + "fmt" + log "github.com/cihub/seelog" + "strings" ) -type ESAPIV5 struct{ - ESAPIV0 +type ESAPIV5 struct { + ESAPIV0 } -func (s *ESAPIV5) ClusterHealth() *ClusterHealth { - return s.ESAPIV0.ClusterHealth() +func (s *ESAPIV5) NewScroll(indexNames string, scrollTime string, docBufferCount int, query string, sort string, + slicedId int, maxSlicedCount int, fields string) (scroll ScrollAPI, err error) { + url := fmt.Sprintf("%s/%s/_search?scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount) + + var jsonBody []byte + if len(query) > 0 || maxSlicedCount > 0 || len(fields) > 0 { + queryBody := map[string]interface{}{} + + if len(fields) > 0 { + if !strings.Contains(fields, ",") { + queryBody["_source"] = fields + } else { + queryBody["_source"] = strings.Split(fields, ",") + } + } + + if len(query) > 0 { + queryBody["query"] = map[string]interface{}{} + queryBody["query"].(map[string]interface{})["query_string"] = map[string]interface{}{} + queryBody["query"].(map[string]interface{})["query_string"].(map[string]interface{})["query"] = query + } + + if len(sort) > 0 { + sortFields := make([]string, 0) + sortFields = append(sortFields, sort) + queryBody["sort"] = sortFields + } + + if maxSlicedCount > 1 { + log.Tracef("sliced scroll, %d of %d", slicedId, maxSlicedCount) + queryBody["slice"] = map[string]interface{}{} + queryBody["slice"].(map[string]interface{})["id"] = slicedId + queryBody["slice"].(map[string]interface{})["max"] = maxSlicedCount + } + + jsonBody, err = json.Marshal(queryBody) + if err != nil { + log.Error(err) + } + } + + body, err := Request(s.Compress, "POST", url, s.Auth, bytes.NewBuffer(jsonBody), s.HttpProxy) + if err != nil { + log.Error(err) + return nil, err + } + + scroll = &Scroll{} + err = DecodeJson(body, scroll) + if err != nil { + log.Error(err) + return nil, err + } + + return scroll, err } -func (s *ESAPIV5) Bulk(data *bytes.Buffer){ - s.ESAPIV0.Bulk(data) -} - -func (s *ESAPIV5) GetIndexSettings(indexNames string) (*Indexes,error){ - return s.ESAPIV0.GetIndexSettings(indexNames) -} - -func (s *ESAPIV5) UpdateIndexSettings(indexName string,settings map[string]interface{})(error){ - return s.ESAPIV0.UpdateIndexSettings(indexName,settings) -} - -func (s *ESAPIV5) GetIndexMappings(copyAllIndexes bool,indexNames string)(string,int,*Indexes,error){ - return s.ESAPIV0.GetIndexMappings(copyAllIndexes,indexNames) -} - -func (s *ESAPIV5) UpdateIndexMapping(indexName string,settings map[string]interface{}) error { - return s.ESAPIV0.UpdateIndexMapping(indexName,settings) -} - -func (s *ESAPIV5) DeleteIndex(name string) (err error) { - return s.ESAPIV0.DeleteIndex(name) -} - -func (s *ESAPIV5) CreateIndex(name string,settings map[string]interface{}) (err error) { - return s.ESAPIV0.CreateIndex(name,settings) -} - - - -func (s *ESAPIV5) Refresh(name string) (err error) { - return s.ESAPIV0.Refresh(name) -} - -func (s *ESAPIV5) NewScroll(indexNames string,scrollTime string,docBufferCount int,query string, slicedId,maxSlicedCount int, fields string)(scroll interface{}, err error){ - url := fmt.Sprintf("%s/%s/_search?scroll=%s&size=%d", s.Host, indexNames, scrollTime,docBufferCount) - - var jsonBody []byte - if(len(query)>0||maxSlicedCount>0||len(fields)>0) { - queryBody := map[string]interface{}{} - - - if len(fields) > 0 { - if !strings.Contains(fields, ",") { - queryBody["_source"] = fields - } else { - queryBody["_source"] = 
strings.Split(fields, ",") - } - } - - if(len(query)>0){ - queryBody["query"] = map[string]interface{}{} - queryBody["query"].(map[string]interface{})["query_string"] = map[string]interface{}{} - queryBody["query"].(map[string]interface{})["query_string"].(map[string]interface{})["query"] = query - } - - if(maxSlicedCount>1){ - log.Tracef("sliced scroll, %d of %d",slicedId,maxSlicedCount) - queryBody["slice"] = map[string]interface{}{} - queryBody["slice"].(map[string]interface{})["id"] = slicedId - queryBody["slice"].(map[string]interface{})["max"]= maxSlicedCount - } - - jsonBody, err = json.Marshal(queryBody) - if err != nil { - log.Error(err) - } - } - - body, err := DoRequest(s.Compress,"POST",url, s.Auth,jsonBody,s.HttpProxy) - if err != nil { - log.Error(err) - return nil, err - } - - scroll = &Scroll{} - err = DecodeJson(body,scroll) - if err != nil { - log.Error(err) - return nil,err - } - - return scroll,err -} - -func (s *ESAPIV5) NextScroll(scrollTime string,scrollId string)(interface{},error) { - id := bytes.NewBufferString(scrollId) +func (s *ESAPIV5) NextScroll(scrollTime string, scrollId string) (ScrollAPI, error) { + id := bytes.NewBufferString(scrollId) - url:=fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id) + url := fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id) - body,err:=DoRequest(s.Compress,"GET",url,s.Auth,nil,s.HttpProxy) + body, err := Request(s.Compress, "GET", url, s.Auth, nil, s.HttpProxy) - // decode elasticsearch scroll response - scroll := &Scroll{} - err= DecodeJson(body, &scroll) - if err != nil { - log.Error(err) - return nil,err - } + // decode elasticsearch scroll response + scroll := &Scroll{} + err = DecodeJson(body, &scroll) + if err != nil { + log.Error(err) + return nil, err + } - return scroll,nil + return scroll, nil } diff --git a/v6.go b/v6.go index 3f31e77..a3f15bb 100644 --- a/v6.go +++ b/v6.go @@ -24,8 +24,8 @@ import ( log "github.com/cihub/seelog" "io" "io/ioutil" - "strings" "regexp" + "strings" //"infini.sh/framework/core/util" ) @@ -33,7 +33,8 @@ type ESAPIV6 struct { ESAPIV5 } -func (s *ESAPIV6) NewScroll(indexNames string, scrollTime string, docBufferCount int, query string, slicedId, maxSlicedCount int, fields string) (scroll interface{}, err error) { +func (s *ESAPIV6) NewScroll(indexNames string, scrollTime string, docBufferCount int, query string, sort string, + slicedId int, maxSlicedCount int, fields string) (scroll ScrollAPI, err error) { url := fmt.Sprintf("%s/%s/_search?scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount) var jsonBody []byte @@ -54,6 +55,12 @@ func (s *ESAPIV6) NewScroll(indexNames string, scrollTime string, docBufferCount queryBody["query"].(map[string]interface{})["query_string"].(map[string]interface{})["query"] = query } + if len(sort) > 0 { + sortFields := make([]string, 0) + sortFields = append(sortFields, sort) + queryBody["sort"] = sortFields + } + if maxSlicedCount > 1 { log.Tracef("sliced scroll, %d of %d", slicedId, maxSlicedCount) queryBody["slice"] = map[string]interface{}{} @@ -68,7 +75,7 @@ func (s *ESAPIV6) NewScroll(indexNames string, scrollTime string, docBufferCount } } - body, err := DoRequest(s.Compress,"POST",url, s.Auth,jsonBody,s.HttpProxy) + body, err := Request(s.Compress, "POST", url, s.Auth, bytes.NewBuffer(jsonBody), s.HttpProxy) if err != nil { log.Error(err) return nil, err @@ -84,11 +91,11 @@ func (s *ESAPIV6) NewScroll(indexNames string, scrollTime string, docBufferCount return scroll, err } -func 
(s *ESAPIV6) NextScroll(scrollTime string, scrollId string) (interface{}, error) { +func (s *ESAPIV6) NextScroll(scrollTime string, scrollId string) (ScrollAPI, error) { id := bytes.NewBufferString(scrollId) url := fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id) - body,err:=DoRequest(s.Compress,"GET",url,s.Auth,nil,s.HttpProxy) + body, err := Request(s.Compress, "GET", url, s.Auth, nil, s.HttpProxy) // decode elasticsearch scroll response scroll := &Scroll{} @@ -101,21 +108,11 @@ func (s *ESAPIV6) NextScroll(scrollTime string, scrollId string) (interface{}, e return scroll, nil } - -func (s *ESAPIV6) GetIndexSettings(indexNames string) (*Indexes,error){ - return s.ESAPIV0.GetIndexSettings(indexNames) -} - -func (s *ESAPIV6) UpdateIndexSettings(indexName string,settings map[string]interface{})(error){ - return s.ESAPIV0.UpdateIndexSettings(indexName,settings) -} - - func (s *ESAPIV6) GetIndexMappings(copyAllIndexes bool, indexNames string) (string, int, *Indexes, error) { url := fmt.Sprintf("%s/%s/_mapping", s.Host, indexNames) - resp, body, errs := Get(url, s.Auth,s.HttpProxy) + resp, body, errs := Get(url, s.Auth, s.HttpProxy) - if resp!=nil&& resp.Body!=nil{ + if resp != nil && resp.Body != nil { io.Copy(ioutil.Discard, resp.Body) defer resp.Body.Close() } @@ -125,7 +122,6 @@ func (s *ESAPIV6) GetIndexMappings(copyAllIndexes bool, indexNames string) (stri return "", 0, nil, errs[0] } - if resp.StatusCode != 200 { return "", 0, nil, errors.New(body) } @@ -178,29 +174,26 @@ func (s *ESAPIV6) GetIndexMappings(copyAllIndexes bool, indexNames string) (stri return indexNames, i, &idxs, nil } - -func (s *ESAPIV6) UpdateIndexMapping(indexName string,settings map[string]interface{}) error { +func (s *ESAPIV6) UpdateIndexMapping(indexName string, settings map[string]interface{}) error { log.Debug("start update mapping: ", indexName, settings) - delete(settings,"dynamic_templates") - - + delete(settings, "dynamic_templates") for name, _ := range settings { - log.Debug("start update mapping: ", indexName,", ",settings) + log.Debug("start update mapping: ", indexName, ", ", settings) - url := fmt.Sprintf("%s/%s/%s/_mapping", s.Host, indexName,name) + url := fmt.Sprintf("%s/%s/%s/_mapping", s.Host, indexName, name) body := bytes.Buffer{} enc := json.NewEncoder(&body) enc.Encode(settings) - res, err := Request("POST", url, s.Auth, &body,s.HttpProxy) - if(err!=nil){ + res, err := Request(s.Compress, "POST", url, s.Auth, &body, s.HttpProxy) + if err != nil { log.Error(url) log.Error(settings) - log.Error(err,res) + log.Error(err, res) panic(err) } } diff --git a/v7.go b/v7.go index c3179a7..2a6aeb0 100644 --- a/v7.go +++ b/v7.go @@ -24,15 +24,16 @@ import ( log "github.com/cihub/seelog" "io" "io/ioutil" - "strings" "regexp" + "strings" ) type ESAPIV7 struct { ESAPIV6 } -func (s *ESAPIV7) NewScroll(indexNames string, scrollTime string, docBufferCount int, query string, slicedId, maxSlicedCount int, fields string) (scroll interface{}, err error) { +func (s *ESAPIV7) NewScroll(indexNames string, scrollTime string, docBufferCount int, query string, sort string, + slicedId int, maxSlicedCount int, fields string) (scroll ScrollAPI, err error) { url := fmt.Sprintf("%s/%s/_search?scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount) jsonBody := "" @@ -53,6 +54,12 @@ func (s *ESAPIV7) NewScroll(indexNames string, scrollTime string, docBufferCount queryBody["query"].(map[string]interface{})["query_string"].(map[string]interface{})["query"] = query } + if len(sort) > 0 
{ + sortFields := make([]string, 0) + sortFields = append(sortFields, sort) + queryBody["sort"] = sortFields + } + if maxSlicedCount > 1 { log.Tracef("sliced scroll, %d of %d", slicedId, maxSlicedCount) queryBody["slice"] = map[string]interface{}{} @@ -63,34 +70,35 @@ func (s *ESAPIV7) NewScroll(indexNames string, scrollTime string, docBufferCount jsonArray, err := json.Marshal(queryBody) if err != nil { log.Error(err) - + return nil, err } else { jsonBody = string(jsonArray) } } - resp, body, errs := Post(url, s.Auth, jsonBody, s.HttpProxy) + body, errs := Request(false, "POST", url, s.Auth, bytes.NewBufferString(jsonBody), s.HttpProxy) + //resp, body, errs := Post(url, s.Auth, jsonBody, s.HttpProxy) - if resp != nil && resp.Body != nil { - io.Copy(ioutil.Discard, resp.Body) - defer resp.Body.Close() - } + //if resp != nil && resp.Body != nil { + // io.Copy(ioutil.Discard, resp.Body) + // defer resp.Body.Close() + //} if errs != nil { - log.Error(errs) - return nil, errs[0] + //log.Error(errs) + return nil, errs } - if resp.StatusCode != 200 { - return nil, errors.New(body) - } + //if resp.StatusCode != 200 { + // return nil, errors.New(body) + //} log.Trace("new scroll,", body) - if err != nil { - log.Error(err) - return nil, err - } + //if err != nil { + // log.Error(err) + // return nil, err + //} scroll = &ScrollV7{} err = DecodeJson(body, scroll) @@ -102,11 +110,11 @@ func (s *ESAPIV7) NewScroll(indexNames string, scrollTime string, docBufferCount return scroll, err } -func (s *ESAPIV7) NextScroll(scrollTime string, scrollId string) (interface{}, error) { +func (s *ESAPIV7) NextScroll(scrollTime string, scrollId string) (ScrollAPI, error) { id := bytes.NewBufferString(scrollId) url := fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id) - body,err:=DoRequest(s.Compress,"GET",url,s.Auth,nil,s.HttpProxy) + body, err := Request(s.Compress, "GET", url, s.Auth, nil, s.HttpProxy) if err != nil { //log.Error(errs) @@ -123,21 +131,11 @@ func (s *ESAPIV7) NextScroll(scrollTime string, scrollId string) (interface{}, e return scroll, nil } - -func (s *ESAPIV7) GetIndexSettings(indexNames string) (*Indexes,error){ - return s.ESAPIV0.GetIndexSettings(indexNames) -} - -func (s *ESAPIV7) UpdateIndexSettings(indexName string,settings map[string]interface{})(error){ - return s.ESAPIV0.UpdateIndexSettings(indexName,settings) -} - - func (s *ESAPIV7) GetIndexMappings(copyAllIndexes bool, indexNames string) (string, int, *Indexes, error) { url := fmt.Sprintf("%s/%s/_mapping", s.Host, indexNames) - resp, body, errs := Get(url, s.Auth,s.HttpProxy) + resp, body, errs := Get(url, s.Auth, s.HttpProxy) - if resp!=nil&& resp.Body!=nil{ + if resp != nil && resp.Body != nil { io.Copy(ioutil.Discard, resp.Body) defer resp.Body.Close() } @@ -147,7 +145,6 @@ func (s *ESAPIV7) GetIndexMappings(copyAllIndexes bool, indexNames string) (stri return "", 0, nil, errs[0] } - if resp.StatusCode != 200 { return "", 0, nil, errors.New(body) } @@ -201,29 +198,28 @@ func (s *ESAPIV7) GetIndexMappings(copyAllIndexes bool, indexNames string) (stri return indexNames, i, &idxs, nil } - -func (s *ESAPIV7) UpdateIndexMapping(indexName string,settings map[string]interface{}) error { +func (s *ESAPIV7) UpdateIndexMapping(indexName string, settings map[string]interface{}) error { log.Debug("start update mapping: ", indexName, settings) - delete(settings,"dynamic_templates") + delete(settings, "dynamic_templates") //for name, mapping := range settings { - log.Debug("start update mapping: ", indexName,", 
",settings) + log.Debug("start update mapping: ", indexName, ", ", settings) - url := fmt.Sprintf("%s/%s/_mapping", s.Host, indexName) + url := fmt.Sprintf("%s/%s/_mapping", s.Host, indexName) - body := bytes.Buffer{} - enc := json.NewEncoder(&body) - enc.Encode(settings) - res, err := Request("POST", url, s.Auth, &body,s.HttpProxy) - if(err!=nil){ - log.Error(url) - log.Error(body.String()) - log.Error(err,res) - panic(err) - } + body := bytes.Buffer{} + enc := json.NewEncoder(&body) + enc.Encode(settings) + res, err := Request(s.Compress, "POST", url, s.Auth, &body, s.HttpProxy) + if err != nil { + log.Error(url) + log.Error(body.String()) + log.Error(err, res) + panic(err) + } //} return nil } diff --git a/verify.go b/verify.go new file mode 100644 index 0000000..a0b4e03 --- /dev/null +++ b/verify.go @@ -0,0 +1,68 @@ +package main + +import ( + "fmt" + log "github.com/cihub/seelog" + "path/filepath" + "reflect" + "runtime" + "strings" +) + +type CheckErrorAction int + +const ( + ACTION_FATAL_QUIT CheckErrorAction = iota + ACTION_LOG_ERROR +) + +//Notice: +// 1. when dev, set to ACTION_FATAL_QUIT, so can check error quickly, +// then can add error logical for the place that once thought could not go wrong +// 2. when released, set to ACTION_LOG_ERROR, so just log error + +var verifyAction = ACTION_FATAL_QUIT + +// GetCallStackInfo return fileName, lineNo, funName +func GetCallStackInfo(skip int) (string, int, string) { + var funName string + pc, fileName, lineNo, ok := runtime.Caller(skip) + if ok { + funName = strings.TrimPrefix(filepath.Ext(runtime.FuncForPC(pc).Name()), ".") + } else { + funName = "" + fileName, lineNo = "", -1 + } + return fileName, lineNo, funName +} + +// skip 表示跳过几个调用堆栈, 获取真正有意义的代码调用位置 +func checkAndHandleError(err error, msg string, action CheckErrorAction, skip int) { + if err != nil { + fileName, lineNo, funName := GetCallStackInfo(skip) + + switch action { + case ACTION_FATAL_QUIT: + log.Errorf("%s:%d: (%s) FAIL(%s), msg=%s", fileName, lineNo, funName, reflect.TypeOf(err).String(), msg) + //log.Fatalf("") //"error at: %s:%d, msg=%s, err=%s", fileName, lineNo, msg, err) + panic(fmt.Sprintf("error at: %s:%d, msg=%s, err=%s", fileName, lineNo, msg, err)) + case ACTION_LOG_ERROR: + log.Warnf("%s:%d: (%s) FAIL(%s), msg=%s", fileName, lineNo, funName, reflect.TypeOf(err).String(), msg) + //flog.Infof("error at: %s:%d, msg=%s, err=%s", fileName, lineNo, msg, err) + } + } +} + +func Verify(err error) error { + if err != nil { + checkAndHandleError(err, err.Error(), verifyAction, 3) + } + return err +} + +func VerifyWithResult(result interface{}, err error) interface{} { + if err != nil { + checkAndHandleError(err, err.Error(), verifyAction, 3) + } + return result +}