From 7dc95a9762b1d2818580a6c5e25bb5249a50a9d1 Mon Sep 17 00:00:00 2001 From: medcl Date: Thu, 5 Sep 2019 11:17:24 +0800 Subject: [PATCH] support migrate 7.x to 2.x --- README.md | 6 +++ domain.go | 14 ++++++ esapi.go | 4 +- main.go | 32 ++++++++++--- scroll.go | 97 +++++++++++++++++++++++++++++++++++++-- v0.go | 5 +- v5.go | 4 +- v7.go | 135 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 281 insertions(+), 16 deletions(-) create mode 100644 v7.go diff --git a/README.md b/README.md index 55778fb..d9f3485 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,12 @@ migrate 5.x to 6.x and unify all the types to `doc` ``` +migrate 2.x to 7.x and rename `_type` to `type` +``` +./esm -s http://localhost:9201 -x "source" -y "target" -d https://localhost:9200 --rename="_type:type,age:myage" -u"_doc" + +``` + ## Download https://github.com/medcl/elasticsearch-dump/releases diff --git a/domain.go b/domain.go index ec683e0..ca3d5c8 100644 --- a/domain.go +++ b/domain.go @@ -28,6 +28,7 @@ type Document struct { Routing string `json:"_routing,omitempty"` } + type Scroll struct { Took int `json:"took,omitempty"` ScrollId string `json:"_scroll_id,omitempty"` @@ -40,6 +41,7 @@ type Scroll struct { Shards struct { Total int `json:"total,omitempty"` Successful int `json:"successful,omitempty"` + Skipped int `json:"skipped,omitempty"` Failed int `json:"failed,omitempty"` Failures []struct { Shard int `json:"shard,omitempty"` @@ -50,6 +52,18 @@ type Scroll struct { } `json:"_shards,omitempty"` } +type ScrollV7 struct { + Scroll + Hits struct { + MaxScore float32 `json:"max_score,omitempty"` + Total struct{ + Value int `json:"value,omitempty"` + Relation string `json:"relation,omitempty"` + } `json:"total,omitempty"` + Docs []interface{} `json:"hits,omitempty"` + } `json:"hits"` +} + type ClusterVersion struct { Name string `json:"name,omitempty"` ClusterName string `json:"cluster_name,omitempty"` diff --git a/esapi.go b/esapi.go index e10724e..5697891 100644 --- 
a/esapi.go +++ b/esapi.go @@ -27,7 +27,7 @@ type ESAPI interface{ GetIndexMappings(copyAllIndexes bool,indexNames string)(string,int,*Indexes,error) UpdateIndexSettings(indexName string,settings map[string]interface{})(error) UpdateIndexMapping(indexName string,mappings map[string]interface{})(error) - NewScroll(indexNames string,scrollTime string,docBufferCount int,query string, slicedId,maxSlicedCount int, fields string)(*Scroll, error) - NextScroll(scrollTime string,scrollId string)(*Scroll,error) + NewScroll(indexNames string,scrollTime string,docBufferCount int,query string, slicedId,maxSlicedCount int, fields string)(interface{}, error) + NextScroll(scrollTime string,scrollId string)(interface{},error) Refresh(name string) (err error) } diff --git a/main.go b/main.go index d031f17..64fe844 100644 --- a/main.go +++ b/main.go @@ -75,7 +75,14 @@ func main() { if errs != nil { return } - if strings.HasPrefix(srcESVersion.Version.Number, "6.") { + if strings.HasPrefix(srcESVersion.Version.Number, "7.") { + log.Debug("source es is V7,", srcESVersion.Version.Number) + api := new(ESAPIV7) + api.Host = c.SourceEs + api.Auth = migrator.SourceAuth + api.HttpProxy=migrator.Config.SourceProxy + migrator.SourceESAPI = api + }else if strings.HasPrefix(srcESVersion.Version.Number, "6.") { log.Debug("source es is V6,", srcESVersion.Version.Number) api := new(ESAPIV5) api.Host = c.SourceEs @@ -110,11 +117,14 @@ func main() { log.Error(err) return } - totalSize+=scroll.Hits.Total - if scroll != nil && scroll.Hits.Docs != nil { + temp:=scroll.(ScrollAPI) + + totalSize+=temp.GetHitsTotal() + + if scroll != nil && temp.GetDocs() != nil { - if scroll.Hits.Total == 0 { + if temp.GetHitsTotal() == 0 { log.Error("can't find documents from source.") return } @@ -124,10 +134,10 @@ func main() { wg.Add(1) //process input // start scroll - scroll.ProcessScrollResult(&migrator, fetchBar) + temp.ProcessScrollResult(&migrator, fetchBar) // loop scrolling until done - for 
scroll.Next(&migrator, fetchBar) == false { + for temp.Next(&migrator, fetchBar) == false { } fetchBar.Finish() // finished, close doc chan and wait for goroutines to be done @@ -198,7 +208,14 @@ func main() { return } - if strings.HasPrefix(descESVersion.Version.Number, "6.") { + if strings.HasPrefix(descESVersion.Version.Number, "7.") { + log.Debug("target es is V7,", descESVersion.Version.Number) + api := new(ESAPIV7) + api.Host = c.TargetEs + api.Auth = migrator.TargetAuth + api.HttpProxy=migrator.Config.TargetProxy + migrator.TargetESAPI = api + }else if strings.HasPrefix(descESVersion.Version.Number, "6.") { log.Debug("target es is V6,", descESVersion.Version.Number) api := new(ESAPIV5) api.Host = c.TargetEs @@ -484,3 +501,4 @@ func (c *Migrator) ClusterReady(api ESAPI) (*ClusterHealth, bool) { return health, false } + diff --git a/scroll.go b/scroll.go index 7867f2c..ff751d0 100644 --- a/scroll.go +++ b/scroll.go @@ -23,6 +23,50 @@ import ( ) +type ScrollAPI interface{ + GetScrollId()string + GetHitsTotal()int + GetDocs() []interface{} + ProcessScrollResult(c *Migrator, bar *pb.ProgressBar) + Next(c *Migrator, bar *pb.ProgressBar) (done bool) +} + + +func (scroll *Scroll) GetHitsTotal()int{ + //fmt.Println("total v0:",scroll.Hits.Total) + return scroll.Hits.Total +} + +func (scroll *Scroll) GetScrollId()string{ + return scroll.ScrollId +} + +func (scroll *Scroll) GetDocs()[]interface{}{ + + //fmt.Println("docs v0:",scroll.Hits) + + return scroll.Hits.Docs +} + +func (scroll *ScrollV7) GetHitsTotal()int{ + //fmt.Println("total v7:",scroll.Hits.Total.Value) + + return scroll.Hits.Total.Value +} + + +func (scroll *ScrollV7) GetScrollId()string{ + return scroll.ScrollId +} + +func (scroll *ScrollV7) GetDocs()[]interface{}{ + + //fmt.Println("docs v7:",scroll.Hits) + + return scroll.Hits.Docs +} + + // Stream from source es instance. 
"done" is an indicator that the stream is // over func (s *Scroll) ProcessScrollResult(c *Migrator, bar *pb.ProgressBar){ @@ -38,6 +82,7 @@ func (s *Scroll) ProcessScrollResult(c *Migrator, bar *pb.ProgressBar){ // write all the docs into a channel for _, docI := range s.Hits.Docs { + //fmt.Println(docI) c.DocChan <- docI.(map[string]interface{}) } } @@ -50,16 +95,62 @@ func (s *Scroll) Next(c *Migrator, bar *pb.ProgressBar) (done bool) { return false } - if scroll.Hits.Docs == nil || len(scroll.Hits.Docs) <= 0 { + docs:=scroll.(ScrollAPI).GetDocs() + if docs == nil || len(docs) <= 0 { + log.Debug("scroll result is empty") + return true + } + + scroll.(ScrollAPI).ProcessScrollResult(c,bar) + + //update scrollId + s.ScrollId=scroll.(ScrollAPI).GetScrollId() + + return +} + + + +// Stream from source es instance. "done" is an indicator that the stream is +// over +func (s *ScrollV7) ProcessScrollResult(c *Migrator, bar *pb.ProgressBar){ + + //update progress bar + bar.Add(len(s.Hits.Docs)) + + // show any failures + for _, failure := range s.Shards.Failures { + reason, _ := json.Marshal(failure.Reason) + log.Errorf(string(reason)) + } + + // write all the docs into a channel + for _, docI := range s.Hits.Docs { + //fmt.Println(docI) + c.DocChan <- docI.(map[string]interface{}) + } +} + +func (s *ScrollV7) Next(c *Migrator, bar *pb.ProgressBar) (done bool) { + + scroll,err:=c.SourceESAPI.NextScroll(c.Config.ScrollTime,s.ScrollId) + if err != nil { + log.Error(err) + return false + } + + docs:=scroll.(ScrollAPI).GetDocs() + if docs == nil || len(docs) <= 0 { log.Debug("scroll result is empty") return true } - scroll.ProcessScrollResult(c,bar) + scroll.(ScrollAPI).ProcessScrollResult(c,bar) //update scrollId - s.ScrollId=scroll.ScrollId + s.ScrollId=scroll.(ScrollAPI).GetScrollId() return } + diff --git a/v0.go b/v0.go index 1c6cb46..1bd0751 100644 --- a/v0.go +++ b/v0.go @@ -63,6 +63,7 @@ func (s *ESAPIV0) ClusterHealth() *ClusterHealth { func (s *ESAPIV0) Bulk(data 
*bytes.Buffer) { if data == nil || data.Len() == 0 { + log.Error("data is empty, skip") return } data.WriteRune('\n') @@ -327,7 +328,7 @@ func (s *ESAPIV0) Refresh(name string) (err error) { return nil } -func (s *ESAPIV0) NewScroll(indexNames string, scrollTime string, docBufferCount int,query string, slicedId,maxSlicedCount int, fields string) (scroll *Scroll, err error) { +func (s *ESAPIV0) NewScroll(indexNames string, scrollTime string, docBufferCount int,query string, slicedId,maxSlicedCount int, fields string) (scroll interface{}, err error) { // curl -XGET 'http://es-0.9:9200/_search?search_type=scan&scroll=10m&size=50' url := fmt.Sprintf("%s/%s/_search?search_type=scan&scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount) @@ -392,7 +393,7 @@ func (s *ESAPIV0) NewScroll(indexNames string, scrollTime string, docBufferCount return scroll, err } -func (s *ESAPIV0) NextScroll(scrollTime string, scrollId string) (*Scroll, error) { +func (s *ESAPIV0) NextScroll(scrollTime string, scrollId string) (interface{}, error) { // curl -XGET 'http://es-0.9:9200/_search/scroll?scroll=5m' id := bytes.NewBufferString(scrollId) url := fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id) diff --git a/v5.go b/v5.go index 499d598..4ec69a7 100644 --- a/v5.go +++ b/v5.go @@ -66,7 +66,7 @@ func (s *ESAPIV5) Refresh(name string) (err error) { return s.ESAPIV0.Refresh(name) } -func (s *ESAPIV5) NewScroll(indexNames string,scrollTime string,docBufferCount int,query string, slicedId,maxSlicedCount int, fields string)(scroll *Scroll, err error){ +func (s *ESAPIV5) NewScroll(indexNames string,scrollTime string,docBufferCount int,query string, slicedId,maxSlicedCount int, fields string)(scroll interface{}, err error){ url := fmt.Sprintf("%s/%s/_search?scroll=%s&size=%d", s.Host, indexNames, scrollTime,docBufferCount) jsonBody:="" @@ -138,7 +138,7 @@ func (s *ESAPIV5) NewScroll(indexNames string,scrollTime string,docBufferCount i return scroll,err } 
-func (s *ESAPIV5) NextScroll(scrollTime string,scrollId string)(*Scroll,error) { +func (s *ESAPIV5) NextScroll(scrollTime string,scrollId string)(interface{},error) { id := bytes.NewBufferString(scrollId) url:=fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id) diff --git a/v7.go b/v7.go new file mode 100644 index 0000000..0a75794 --- /dev/null +++ b/v7.go @@ -0,0 +1,135 @@ +/* +Copyright 2016 Medcl (m AT medcl.net) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + log "github.com/cihub/seelog" + "io" + "io/ioutil" + "strings" +) + +type ESAPIV7 struct { + ESAPIV5 +} + +func (s *ESAPIV7) NewScroll(indexNames string, scrollTime string, docBufferCount int, query string, slicedId, maxSlicedCount int, fields string) (scroll interface{}, err error) { + url := fmt.Sprintf("%s/%s/_search?scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount) + + jsonBody := "" + if len(query) > 0 || maxSlicedCount > 0 || len(fields) > 0 { + queryBody := map[string]interface{}{} + + if len(fields) > 0 { + if !strings.Contains(fields, ",") { + log.Error("The fields should be separated by ,") + return nil, errors.New("") + } else { + queryBody["_source"] = strings.Split(fields, ",") + } + } + + if len(query) > 0 { + queryBody["query"] = map[string]interface{}{} + queryBody["query"].(map[string]interface{})["query_string"] = map[string]interface{}{} + 
queryBody["query"].(map[string]interface{})["query_string"].(map[string]interface{})["query"] = query + } + + if maxSlicedCount > 1 { + log.Tracef("sliced scroll, %d of %d", slicedId, maxSlicedCount) + queryBody["slice"] = map[string]interface{}{} + queryBody["slice"].(map[string]interface{})["id"] = slicedId + queryBody["slice"].(map[string]interface{})["max"] = maxSlicedCount + } + + jsonArray, err := json.Marshal(queryBody) + if err != nil { + log.Error(err) + + } else { + jsonBody = string(jsonArray) + } + } + + resp, body, errs := Post(url, s.Auth, jsonBody, s.HttpProxy) + + if resp != nil && resp.Body != nil { + io.Copy(ioutil.Discard, resp.Body) + defer resp.Body.Close() + } + + if errs != nil { + log.Error(errs) + return nil, errs[0] + } + + if resp.StatusCode != 200 { + return nil, errors.New(body) + } + + log.Trace("new scroll,", body) + + if err != nil { + log.Error(err) + return nil, err + } + + scroll = &ScrollV7{} + err = json.Unmarshal([]byte(body), scroll) + if err != nil { + log.Error(err) + return nil, err + } + + return scroll, err +} + +func (s *ESAPIV7) NextScroll(scrollTime string, scrollId string) (interface{}, error) { + id := bytes.NewBufferString(scrollId) + + url := fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id) + resp, body, errs := Get(url, s.Auth, s.HttpProxy) + + if resp != nil && resp.Body != nil { + io.Copy(ioutil.Discard, resp.Body) + defer resp.Body.Close() + } + + if errs != nil { + log.Error(errs) + return nil, errs[0] + } + + if resp.StatusCode != 200 { + return nil, errors.New(body) + } + + // decode elasticsearch scroll response + scroll := &ScrollV7{} + err := json.Unmarshal([]byte(body), &scroll) + if err != nil { + log.Error(body) + log.Error(err) + return nil, err + } + + return scroll, nil +}