Skip to content

Commit

Permalink
fix scroll api in es5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
medcl committed May 19, 2016
1 parent 915c8fd commit b349996
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 64 deletions.
29 changes: 8 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,17 @@ https://github.com/medcl/elasticsearch-dump/releases
```
-s, --source= source elasticsearch instance
-d, --dest= destination elasticsearch instance
-c, --count= number of documents at a time: ie "size" in the scroll request (100)
-c, --count= number of documents at a time: ie "size" in the scroll request (8000)
-t, --time= scroll time (1m)
-f, --force delete destination index before copying (false)
--shards= set a number of shards on newly created indexes
--docs-only load documents only, do not try to recreate indexes (false)
--index-only only create indexes, do not load documents (false)
--replicate enable replication while indexing into the new indexes (false)
-i, --indexes= list of indexes to copy, comma separated (_all)
--index-only only create indexes, do not load documents (true)
-x, --src_indexes= list of indexes to copy, comma separated (_all), support wildcard match(*)
-y, --dest_index= indexes name to save, allow only one indexname, original indexname will be used if not specified
-a, --all copy indexes starting with . and _ (false)
-w, --workers= concurrency (1)
--settings copy sharding settings from source (true)
--green wait for both hosts cluster status to be green before move. otherwise yellow is okay (false)
-b bulk_size bulk size in MB" default:100
-b bulk_size bulk size in MB" default:5
-v log setting log level,options:trace,debug,info,warn,error
```

Expand All @@ -52,17 +50,6 @@ From | To
-----------|-----------
2.x | 2.x
2.x | 5.0


## NOTES:

1. Has been tested getting data from 0.9 onto a 1.4 box. For other scenarios YMMV. (look out for this bug: https://github.com/elasticsearch/elasticsearch/issues/5165)
1. Copies using the [_source](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/mapping-source-field.html) field in elasticsearch. If you have made modifications to it (excluding fields, etc) they will not be indexed on the destination host.
1. ```--force``` will delete indexes on the destination host. Otherwise an error will be returned if the index exists
1. ```--time``` is the [scroll time](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-scroll.html#scroll-search-context) passed to the source host, default is 1m. This is a string in es's format.
1. ```--count``` is the [number of documents](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-scroll.html#scroll-scan) that will be request and bulk indexed at a time. Note that this depends on the number of shards (ie: size of 10 on 5 shards is 50 documents)
1. ```--indexes``` is a comma separated list of indexes to copy
1. ```--all``` indexes starting with . and _ are ignored by default, --all overrides this behavior
1. ```--workers``` concurrency when we post to the bulk api. Only one post happens at a time, but higher concurrency should give you more throughput when using larger scroll sizes.
1. Ports are required; otherwise port 80 is assumed.
5.0 | 2.x
5.0 | 5.0

8 changes: 4 additions & 4 deletions domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,18 @@ type Config struct {
// config options
SrcEs string `short:"s" long:"source" description:"source elasticsearch instance" required:"true"`
DstEs string `short:"d" long:"dest" description:"destination elasticsearch instance" required:"true"`
DocBufferCount int `short:"c" long:"count" description:"number of documents at a time: ie \"size\" in the scroll request" default:"1000"`
DocBufferCount int `short:"c" long:"count" description:"number of documents at a time: ie \"size\" in the scroll request" default:"5000"`
ScrollTime string `short:"t" long:"time" description:"scroll time" default:"1m"`
Destructive bool `short:"f" long:"force" description:"delete destination index before copying"`
ShardsCount int `long:"shards" description:"set a number of shards on newly created indexes"`
IndexDocsOnly bool `long:"index_docs_only" description:"index documents only, do not try to recreate indexes"`
IndexDocsOnly bool `long:"index_docs_only" description:"index documents only, do not try to recreate indexes" default:"true"`
CreateIndexesOnly bool `long:"create_index_only" description:"only create indexes, do not load documents"`
EnableReplication bool `long:"replicate" description:"enable replication while indexing into the new indexes" default:"false"`
SrcIndexNames string `short:"x" long:"src_indexes" description:"indexes name to copy,support regex and comma separated list" default:"_all"`
DestIndexNames string `short:"y" long:"dest_indexe" description:"indexes name to save, comma separated list, original indexname will be used if not specified" default:""`
DestIndexName string `short:"y" long:"dest_index" description:"indexes name to save, allow only one indexname, original indexname will be used if not specified" default:""`
CopyAllIndexes bool `short:"a" long:"all" description:"copy indexes starting with . and _"`
Workers int `short:"w" long:"workers" description:"concurrency" default:"1"`
BulkSizeInMB int `short:"b" long:"bulk_size" description:"bulk size in MB" default:"100"`
BulkSizeInMB int `short:"b" long:"bulk_size" description:"bulk size in MB" default:"5"`
CopyIndexSettings bool `long:"copy_settings" description:"copy index settings from source" default:"false"`
WaitForGreen bool `long:"green" description:"wait for both hosts cluster status to be green before dump. otherwise yellow is okay"`
LogLevel string `short:"v" long:"log" description:"setting log level,options:trace,debug,info,warn,error" default:"INFO"`
Expand Down
60 changes: 28 additions & 32 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ func main() {
return
}

if strings.HasPrefix("5.", srcESVersion.Version.Number) {
if strings.HasPrefix(srcESVersion.Version.Number,"5.") {
log.Debug("src es is V5,",srcESVersion.Version.Number)
api:=new(ESAPIV5)
api.Host=c.SrcEs
c.SrcESAPI = api
} else {
log.Debug("src es is not V5,",srcESVersion.Version.Number)
api:=new(ESAPIV0)
api.Host=c.SrcEs
c.SrcESAPI = api
Expand All @@ -59,11 +61,13 @@ func main() {
return
}

if strings.HasPrefix("5.", descESVersion.Version.Number) {
if strings.HasPrefix(descESVersion.Version.Number,"5.") {
log.Debug("dest es is V5,",descESVersion.Version.Number)
api:=new(ESAPIV5)
api.Host=c.DstEs
c.DescESAPI = api
} else {
log.Debug("dest es is not V5,",descESVersion.Version.Number)
api:=new(ESAPIV0)
api.Host=c.DstEs
c.DescESAPI = api
Expand Down Expand Up @@ -168,6 +172,8 @@ func main() {
go c.NewWorker(&docCount, bulkBar, &wg)
}

scroll.ProcessScrollResult(&c,fetchBar)

// loop scrolling until done
for scroll.Next(&c, fetchBar) == false {
}
Expand Down Expand Up @@ -214,6 +220,23 @@ func setInitLogging(logLevel string) {

// Stream from source es instance. "done" is an indicator that the stream is
// over
// ProcessScrollResult consumes one page of scroll results: it advances the
// fetch progress bar, logs any per-shard failures reported by Elasticsearch,
// and feeds every hit document into the worker channel for bulk indexing.
func (s *Scroll) ProcessScrollResult(c *Config, bar *pb.ProgressBar) {

	// update progress bar with the number of docs in this page
	bar.Add(len(s.Hits.Docs))

	// surface any per-shard failures carried in the scroll response
	for _, failure := range s.Shards.Failures {
		reason, _ := json.Marshal(failure.Reason)
		// use Error, not Errorf: the reason is data, not a format string,
		// and may contain '%' verbs that would corrupt the log line
		log.Error(string(reason))
	}

	// hand each document off to the workers; hits arrive as generic
	// JSON objects (map[string]interface{})
	for _, docI := range s.Hits.Docs {
		c.DocChan <- docI.(map[string]interface{})
	}
}

func (s *Scroll) Next(c *Config, bar *pb.ProgressBar) (done bool) {

scroll,err:=c.SrcESAPI.NextScroll(c.ScrollTime,s.ScrollId)
Expand All @@ -227,34 +250,7 @@ func (s *Scroll) Next(c *Config, bar *pb.ProgressBar) (done bool) {
return true
}

// XXX this might be bad, but assume we are done
/*
switch resp.StatusCode {
case 200:
break
case 404:
// this may indicate bug
c.ErrChan <- fmt.Errorf("looks like we moved all we could...")
default:
c.ErrChan <- fmt.Errorf("scroll response: %s", stream)
// flush and quit
return true
}
*/

//update progress bar
bar.Add(len(scroll.Hits.Docs))

// show any failures
for _, failure := range scroll.Shards.Failures {
reason, _ := json.Marshal(failure.Reason)
log.Errorf(string(reason))
}

// write all the docs into a channel
for _, docI := range scroll.Hits.Docs {
c.DocChan <- docI.(map[string]interface{})
}
scroll.ProcessScrollResult(c,bar)

//update scrollId
s.ScrollId=scroll.ScrollId
Expand Down Expand Up @@ -294,8 +290,8 @@ READ_DOCS:
var tempDestIndexName string
tempDestIndexName = docI["_index"].(string)

if c.DestIndexNames != "" {
tempDestIndexName = c.DestIndexNames
if c.DestIndexName != "" {
tempDestIndexName = c.DestIndexName
}

doc := Document{
Expand Down
20 changes: 16 additions & 4 deletions v0.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,28 @@ func (s *ESAPIV0) NewScroll(indexNames string,scrollTime string,docBufferCount i
url := fmt.Sprintf("%s/%s/_search?search_type=scan&scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount)
resp, err := http.Get(url)
if err != nil {
return
log.Error(err)
return nil,err
}
defer resp.Body.Close()

dec := json.NewDecoder(resp.Body)
body,err:=ioutil.ReadAll(resp.Body)

log.Debug("new scroll,",string(body))

if err != nil {
log.Error(err)
return nil,err
}

scroll = &Scroll{}
err = dec.Decode(scroll)
err = json.Unmarshal(body,scroll)
if err != nil {
log.Error(err)
return nil,err
}

return
return scroll,err
}

func (s *ESAPIV0) NextScroll(scrollTime string,scrollId string)(*Scroll,error) {
Expand Down
66 changes: 63 additions & 3 deletions v5.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,14 @@ limitations under the License.

package main

import "bytes"
import (
"bytes"
log "github.com/cihub/seelog"
"encoding/json"
"io/ioutil"
"fmt"
"net/http"
)

type ESAPIV5 struct{
ESAPIV0
Expand All @@ -35,9 +42,62 @@ func (s *ESAPIV5) GetIndexSettings(copyAllIndexes bool,indexNames string)(string
}
func (s *ESAPIV5) UpdateIndexSettings(){}
// NewScroll opens a new scrolled search against an ES 5.x cluster.
// Unlike the V0 implementation it does not send search_type=scan,
// which was deprecated in ES 2.1 and removed in ES 5.0 — a plain
// scrolled _search is used instead.
func (s *ESAPIV5) NewScroll(indexNames string, scrollTime string, docBufferCount int) (scroll *Scroll, err error) {
	url := fmt.Sprintf("%s/%s/_search?scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount)
	resp, err := http.Get(url)
	if err != nil {
		log.Error(err)
		return nil, err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		// check the read error before using body
		log.Error(err)
		return nil, err
	}

	log.Debug("new scroll,", string(body))

	scroll = &Scroll{}
	err = json.Unmarshal(body, scroll)
	if err != nil {
		log.Error(err)
		return nil, err
	}

	return scroll, nil
}
// NextScroll fetches the next page of an existing scroll from an ES 5.x
// cluster, using the scroll id returned by NewScroll or by a previous
// NextScroll call.
func (s *ESAPIV5) NextScroll(scrollTime string, scrollId string) (*Scroll, error) {
	// wrapped in a buffer; fmt's %s invokes its String method, so this is
	// equivalent to interpolating scrollId directly
	id := bytes.NewBufferString(scrollId)

	req, err := http.NewRequest("GET", fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id), nil)
	if err != nil {
		log.Error(err)
		return nil, err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Error(err)
		return nil, err
	}
	defer resp.Body.Close()

	data, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Error(err)
		return nil, err
	}

	// decode elasticsearch scroll response
	scroll := &Scroll{}
	err = json.Unmarshal(data, &scroll)
	if err != nil {
		// log the raw payload to aid debugging malformed responses
		log.Error(string(data))
		log.Error(err)
		return nil, err
	}

	return scroll, nil
}

0 comments on commit b349996

Please sign in to comment.