diff --git a/README.md b/README.md index 1f3dac9..7d27fd8 100644 --- a/README.md +++ b/README.md @@ -29,19 +29,17 @@ https://github.com/medcl/elasticsearch-dump/releases ``` -s, --source= source elasticsearch instance -d, --dest= destination elasticsearch instance - -c, --count= number of documents at a time: ie "size" in the scroll request (100) + -c, --count= number of documents at a time: ie "size" in the scroll request (5000) -t, --time= scroll time (1m) -f, --force delete destination index before copying (false) --shards= set a number of shards on newly created indexes - --docs-only load documents only, do not try to recreate indexes (false) - --index-only only create indexes, do not load documents (false) - --replicate enable replication while indexing into the new indexes (false) - -i, --indexes= list of indexes to copy, comma separated (_all) + --create_index_only only create indexes, do not load documents (false) + -x, --src_indexes= list of indexes to copy, comma separated (_all), support wildcard match(*) + -y, --dest_index= indexes name to save, allow only one indexname, original indexname will be used if not specified -a, --all copy indexes starting with . and _ (false) -w, --workers= concurrency (1) - --settings copy sharding settings from source (true) - --green wait for both hosts cluster status to be green before move. otherwise yellow is okay (false) - -b bulk_size bulk size in MB" default:100 + -b bulk_size bulk size in MB, default:5 + -v log setting log level,options:trace,debug,info,warn,error ``` @@ -52,17 +50,6 @@ From | To -----------|----------- 2.x | 2.x 2.x | 5.0 - - ## NOTES: - 1. Has been tested getting data from 0.9 onto a 1.4 box. For other scenaries YMMV. (look out for this bug: https://github.com/elasticsearch/elasticsearch/issues/5165) -1. Copies using the [_source](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/mapping-source-field.html) field in elasticsearch. 
If you have made modifications to it (excluding fields, etc) they will not be indexed on the destination host. -1. ```--force``` will delete indexes on the destination host. Otherwise an error will be returned if the index exists -1. ```--time``` is the [scroll time](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-scroll.html#scroll-search-context) passed to the source host, default is 1m. This is a string in es's format. -1. ```--count``` is the [number of documents](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-scroll.html#scroll-scan) that will be request and bulk indexed at a time. Note that this depends on the number of shards (ie: size of 10 on 5 shards is 50 documents) -1. ```--indexes``` is a comma separated list of indexes to copy -1. ```--all``` indexes starting with . and _ are ignored by default, --all overrides this behavior -1. ```--workers``` concurrency when we post to the bulk api. Only one post happens at a time, but higher concurrency should give you more throughput when using larger scroll sizes. -1. 
Ports are required, otherwise 80 is the assumed port (what) +5.0 | 2.x +5.0 | 5.0 diff --git a/domain.go b/domain.go index 1c608f9..00abdd3 100644 --- a/domain.go +++ b/domain.go @@ -73,18 +73,18 @@ type Config struct { // config options SrcEs string `short:"s" long:"source" description:"source elasticsearch instance" required:"true"` DstEs string `short:"d" long:"dest" description:"destination elasticsearch instance" required:"true"` - DocBufferCount int `short:"c" long:"count" description:"number of documents at a time: ie \"size\" in the scroll request" default:"1000"` + DocBufferCount int `short:"c" long:"count" description:"number of documents at a time: ie \"size\" in the scroll request" default:"5000"` ScrollTime string `short:"t" long:"time" description:"scroll time" default:"1m"` Destructive bool `short:"f" long:"force" description:"delete destination index before copying"` ShardsCount int `long:"shards" description:"set a number of shards on newly created indexes"` - IndexDocsOnly bool `long:"index_docs_only" description:"index documents only, do not try to recreate indexes"` + IndexDocsOnly bool `long:"index_docs_only" description:"index documents only, do not try to recreate indexes" default:"true"` CreateIndexesOnly bool `long:"create_index_only" description:"only create indexes, do not load documents"` EnableReplication bool `long:"replicate" description:"enable replication while indexing into the new indexes" default:"false"` SrcIndexNames string `short:"x" long:"src_indexes" description:"indexes name to copy,support regex and comma separated list" default:"_all"` - DestIndexNames string `short:"y" long:"dest_indexe" description:"indexes name to save, comma separated list, original indexname will be used if not specified" default:""` + DestIndexName string `short:"y" long:"dest_index" description:"indexes name to save, allow only one indexname, original indexname will be used if not specified" default:""` CopyAllIndexes bool `short:"a" long:"all" 
description:"copy indexes starting with . and _"` Workers int `short:"w" long:"workers" description:"concurrency" default:"1"` - BulkSizeInMB int `short:"b" long:"bulk_size" description:"bulk size in MB" default:"100"` + BulkSizeInMB int `short:"b" long:"bulk_size" description:"bulk size in MB" default:"5"` CopyIndexSettings bool `long:"copy_settings" description:"copy index settings from source" default:"false"` WaitForGreen bool `long:"green" description:"wait for both hosts cluster status to be green before dump. otherwise yellow is okay"` LogLevel string `short:"v" long:"log" description:"setting log level,options:trace,debug,info,warn,error" default:"INFO"` diff --git a/main.go b/main.go index 73b7405..535702f 100644 --- a/main.go +++ b/main.go @@ -43,11 +43,13 @@ func main() { return } - if strings.HasPrefix("5.", srcESVersion.Version.Number) { + if strings.HasPrefix(srcESVersion.Version.Number,"5.") { + log.Debug("src es is V5,",srcESVersion.Version.Number) api:=new(ESAPIV5) api.Host=c.SrcEs c.SrcESAPI = api } else { + log.Debug("src es is not V5,",srcESVersion.Version.Number) api:=new(ESAPIV0) api.Host=c.SrcEs c.SrcESAPI = api @@ -59,11 +61,13 @@ func main() { return } - if strings.HasPrefix("5.", descESVersion.Version.Number) { + if strings.HasPrefix(descESVersion.Version.Number,"5.") { + log.Debug("dest es is V5,",descESVersion.Version.Number) api:=new(ESAPIV5) api.Host=c.DstEs c.DescESAPI = api } else { + log.Debug("dest es is not V5,",descESVersion.Version.Number) api:=new(ESAPIV0) api.Host=c.DstEs c.DescESAPI = api @@ -168,6 +172,8 @@ func main() { go c.NewWorker(&docCount, bulkBar, &wg) } + scroll.ProcessScrollResult(&c,fetchBar) + // loop scrolling until done for scroll.Next(&c, fetchBar) == false { } @@ -214,6 +220,23 @@ func setInitLogging(logLevel string) { // Stream from source es instance. 
"done" is an indicator that the stream is // over +func (s *Scroll) ProcessScrollResult(c *Config, bar *pb.ProgressBar){ + + //update progress bar + bar.Add(len(s.Hits.Docs)) + + // show any failures + for _, failure := range s.Shards.Failures { + reason, _ := json.Marshal(failure.Reason) + log.Errorf(string(reason)) + } + + // write all the docs into a channel + for _, docI := range s.Hits.Docs { + c.DocChan <- docI.(map[string]interface{}) + } +} + func (s *Scroll) Next(c *Config, bar *pb.ProgressBar) (done bool) { scroll,err:=c.SrcESAPI.NextScroll(c.ScrollTime,s.ScrollId) @@ -227,34 +250,7 @@ func (s *Scroll) Next(c *Config, bar *pb.ProgressBar) (done bool) { return true } - // XXX this might be bad, but assume we are done - /* - switch resp.StatusCode { - case 200: - break - case 404: - // this may indicate bug - c.ErrChan <- fmt.Errorf("looks like we moved all we could...") - default: - c.ErrChan <- fmt.Errorf("scroll response: %s", stream) - // flush and quit - return true - } - */ - - //update progress bar - bar.Add(len(scroll.Hits.Docs)) - - // show any failures - for _, failure := range scroll.Shards.Failures { - reason, _ := json.Marshal(failure.Reason) - log.Errorf(string(reason)) - } - - // write all the docs into a channel - for _, docI := range scroll.Hits.Docs { - c.DocChan <- docI.(map[string]interface{}) - } + scroll.ProcessScrollResult(c,bar) //update scrollId s.ScrollId=scroll.ScrollId @@ -294,8 +290,8 @@ READ_DOCS: var tempDestIndexName string tempDestIndexName = docI["_index"].(string) - if c.DestIndexNames != "" { - tempDestIndexName = c.DestIndexNames + if c.DestIndexName != "" { + tempDestIndexName = c.DestIndexName } doc := Document{ diff --git a/v0.go b/v0.go index 4054978..446da64 100644 --- a/v0.go +++ b/v0.go @@ -143,16 +143,28 @@ func (s *ESAPIV0) NewScroll(indexNames string,scrollTime string,docBufferCount i url := fmt.Sprintf("%s/%s/_search?search_type=scan&scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount) resp, err 
:= http.Get(url) if err != nil { - return + log.Error(err) + return nil,err } defer resp.Body.Close() - dec := json.NewDecoder(resp.Body) + body,err:=ioutil.ReadAll(resp.Body) + + log.Debug("new scroll,",string(body)) + + if err != nil { + log.Error(err) + return nil,err + } scroll = &Scroll{} - err = dec.Decode(scroll) + err = json.Unmarshal(body,scroll) + if err != nil { + log.Error(err) + return nil,err + } - return + return scroll,err } func (s *ESAPIV0) NextScroll(scrollTime string,scrollId string)(*Scroll,error) { diff --git a/v5.go b/v5.go index 89625aa..0cbe996 100644 --- a/v5.go +++ b/v5.go @@ -16,7 +16,14 @@ limitations under the License. package main -import "bytes" +import ( + "bytes" + log "github.com/cihub/seelog" + "encoding/json" + "io/ioutil" + "fmt" + "net/http" +) type ESAPIV5 struct{ ESAPIV0 @@ -35,9 +42,62 @@ func (s *ESAPIV5) GetIndexSettings(copyAllIndexes bool,indexNames string)(string } func (s *ESAPIV5) UpdateIndexSettings(){} func (s *ESAPIV5) NewScroll(indexNames string,scrollTime string,docBufferCount int)(scroll *Scroll, err error){ - return s.ESAPIV0.NewScroll(indexNames,scrollTime,docBufferCount) + url := fmt.Sprintf("%s/%s/_search?scroll=%s&size=%d", s.Host, indexNames, scrollTime,docBufferCount) + resp, err := http.Get(url) + if err != nil { + log.Error(err) + return nil,err + } + defer resp.Body.Close() + + body,err:=ioutil.ReadAll(resp.Body) + + log.Debug("new scroll,",string(body)) + + if err != nil { + log.Error(err) + return nil,err + } + + scroll = &Scroll{} + err = json.Unmarshal(body,scroll) + if err != nil { + log.Error(err) + return nil,err + } + + return scroll,err } func (s *ESAPIV5) NextScroll(scrollTime string,scrollId string)(*Scroll,error) { - return s.ESAPIV0.NextScroll(scrollId,scrollId) + id := bytes.NewBufferString(scrollId) + + req, err := http.NewRequest("GET", fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id), nil) + if err != nil { + log.Error(err) + return nil,err + } + resp, 
err := http.DefaultClient.Do(req) + if err != nil { + log.Error(err) + return nil,err + } + defer resp.Body.Close() + + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Error(err) + return nil,err + } + + // decode elasticsearch scroll response + scroll := &Scroll{} + err = json.Unmarshal(data, &scroll) + if err != nil { + log.Error(string(data)) + log.Error(err) + return nil,err + } + + return scroll,nil }