bug fix and support sliced scroll (only es5)
medcl committed Oct 26, 2016
1 parent 88c6341 commit 81caba1
Showing 6 changed files with 95 additions and 41 deletions.
README.md: 12 changes (10 additions & 2 deletions)
@@ -20,6 +20,8 @@ Support cross version and http basic auth.

* Support http proxy

* Support sliced scroll (only for elasticsearch 5.0)


## Example:

@@ -67,6 +69,11 @@ support proxy
./bin/esm -d http://123345.ap-northeast-1.aws.found.io:9200/ -y "dest_index" -n admin:111111 -c 5000 -b 1 --refresh -i dump.bin --dest_proxy=http://127.0.0.1:9743
```

use sliced scroll (only available in elasticsearch v5) to speed up scrolling, and update the shard number
```
./bin/esm -s=http://192.168.3.206:9200 -d=http://localhost:9200 -n=elastic:changeme -f --copy_settings --copy_mappings -x=bestbuykaggle --sliced_scroll_size=5 --shards=50 --refresh
```

## Download
https://github.com/medcl/elasticsearch-dump/releases

@@ -87,6 +94,7 @@ if the downloaded version does not fit your environment, you may try to compile it yourself
-m, --source_auth basic auth of source elasticsearch instance, ie: user:pass
-n, --dest_auth basic auth of target elasticsearch instance, ie: user:pass
-c, --count= number of documents at a time: ie "size" in the scroll request (10000)
--sliced_scroll_size= size of sliced scroll; to take effect, the size should be > 1, default:"1"
-t, --time= scroll time (1m)
--shards= set a number of shards on newly created indexes
--copy_settings copy index settings from source
@@ -95,14 +103,14 @@ if the downloaded version does not fit your environment, you may try to compile it yourself
-x, --src_indexes= list of indexes to copy, comma separated (_all), support wildcard match(*)
-y, --dest_index= indexes name to save, allow only one indexname, original indexname will be used if not specified
-a, --all copy indexes starting with . and _ (false)
- -w, --workers= concurrency (1)
+ -w, --workers= concurrency number for bulk workers, default is: "1"
-b --bulk_size bulk size in MB, default: 5
-v --log setting log level,options:trace,debug,info,warn,error
-i --input_file indexing from local dump file
-o --output_file output documents of source index into local file
--source_proxy set proxy to source http connections, ie: http://127.0.0.1:8080
--dest_proxy set proxy to destination http connections, ie: http://127.0.0.1:8080
--refresh refresh after migration finished
```

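As a rough sketch of what the `--sliced_scroll_size=N` option above translates to on the wire: each slice issues its own scroll request carrying a `slice` object, so N scrolls can drain the same index in parallel (host, index name, and sizes below are illustrative):
```
# slice 0 of 5; run the same request with "id": 1..4 concurrently
curl -XGET 'http://localhost:9200/myindex/_search?scroll=1m&size=5000' -d '
{
  "slice": { "id": 0, "max": 5 }
}'
```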
domain.go: 3 changes (2 additions & 1 deletion)
@@ -84,9 +84,10 @@ type Config struct {
SourceEsAuthStr string `short:"m" long:"source_auth" description:"basic auth of source elasticsearch instance, ie: user:pass"`
TargetEsAuthStr string `short:"n" long:"dest_auth" description:"basic auth of target elasticsearch instance, ie: user:pass"`
DocBufferCount int `short:"c" long:"count" description:"number of documents at a time: ie \"size\" in the scroll request" default:"10000"`
- Workers int `short:"w" long:"workers" description:"concurrency" default:"1"`
+ Workers int `short:"w" long:"workers" description:"concurrency number for bulk workers" default:"1"`
BulkSizeInMB int `short:"b" long:"bulk_size" description:"bulk size in MB" default:"5"`
ScrollTime string `short:"t" long:"time" description:"scroll time" default:"1m"`
ScrollSliceSize int `long:"sliced_scroll_size" description:"size of sliced scroll; to take effect, the size should be > 1" default:"1"`
RecreateIndex bool `short:"f" long:"force" description:"delete destination index before copying" default:"false"`
CopyAllIndexes bool `short:"a" long:"all" description:"copy indexes starting with . and _"`
CopyIndexSettings bool `long:"copy_settings" description:"copy index settings from source" default:"false"`
esapi.go: 2 changes (1 addition & 1 deletion)
@@ -27,7 +27,7 @@ type ESAPI interface{
GetIndexMappings(copyAllIndexes bool,indexNames string)(string,int,*Indexes,error)
UpdateIndexSettings(indexName string,settings map[string]interface{})(error)
UpdateIndexMapping(indexName string,mappings map[string]interface{})(error)
- NewScroll(indexNames string,scrollTime string,docBufferCount int,query string)(*Scroll, error)
+ NewScroll(indexNames string,scrollTime string,docBufferCount int,query string, slicedId,maxSlicedCount int)(*Scroll, error)
NextScroll(scrollTime string,scrollId string)(*Scroll,error)
Refresh(name string) (err error)
}
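As a sketch of the two-step protocol this interface models: `NewScroll` starts a scroll and the response carries a `_scroll_id`, which `NextScroll` then uses to pull subsequent pages. In Elasticsearch 5 the follow-up request looks roughly like this (the scroll id value is illustrative):
```
curl -XPOST 'http://localhost:9200/_search/scroll' -d '
{
  "scroll": "1m",
  "scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAD4W..."
}'
```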
main.go: 88 changes (59 additions & 29 deletions)
@@ -53,7 +53,8 @@ func main() {

var srcESVersion *ClusterVersion
// create a progressbar and start a docCount
- var fetchBar, outputBar *pb.ProgressBar
+ var outputBar *pb.ProgressBar
+ var fetchBar = pb.New(1).Prefix("Scroll")

wg := sync.WaitGroup{}

@@ -87,36 +88,59 @@ func main() {
migrator.SourceESAPI = api
}

- scroll, err := migrator.SourceESAPI.NewScroll(c.SourceIndexNames, c.ScrollTime, c.DocBufferCount, c.Query)
- if err != nil {
-     log.Error(err)
-     return
- }
- if scroll != nil && scroll.Hits.Docs != nil {
+ if(c.ScrollSliceSize<1){c.ScrollSliceSize=1}

+ fetchBar.ShowBar=false

-     if scroll.Hits.Total == 0 {
-         log.Error("can't find documents from source.")
+ totalSize:=0;
+ finishedSlice:=0
+ for slice:=0;slice<c.ScrollSliceSize ;slice++ {
+     scroll, err := migrator.SourceESAPI.NewScroll(c.SourceIndexNames, c.ScrollTime, c.DocBufferCount, c.Query,slice,c.ScrollSliceSize)
+     if err != nil {
+         log.Error(err)
          return
      }
+     totalSize+=scroll.Hits.Total

+     if scroll != nil && scroll.Hits.Docs != nil {

+         if scroll.Hits.Total == 0 {
+             log.Error("can't find documents from source.")
+             return
+         }

-     fetchBar = pb.New(scroll.Hits.Total).Prefix("Scroll ")
-     outputBar = pb.New(scroll.Hits.Total).Prefix("Output ")
- }

- go func() {
-     wg.Add(1)
-     //process input
-     // start scroll
-     scroll.ProcessScrollResult(&migrator, fetchBar)
+         go func() {
+             wg.Add(1)
+             //process input
+             // start scroll
+             scroll.ProcessScrollResult(&migrator, fetchBar)

-     // loop scrolling until done
-     for scroll.Next(&migrator, fetchBar) == false {
+             // loop scrolling until done
+             for scroll.Next(&migrator, fetchBar) == false {
              }
+             fetchBar.Finish()
+             // finished, close doc chan and wait for goroutines to be done
+             wg.Done()
+             finishedSlice++

+             //clean up final results
+             if(finishedSlice==c.ScrollSliceSize){
+                 log.Debug("closing doc chan")
+                 close(migrator.DocChan)
+             }
+         }()
+     }
- fetchBar.Finish()
- // finished, close doc chan and wait for goroutines to be done
- wg.Done()
- close(migrator.DocChan)
- }()
+ }

+ if(totalSize>0){
+     fetchBar.Total=int64(totalSize)
+     fetchBar.ShowBar=true
+     outputBar = pb.New(totalSize).Prefix("Output ")
+ }

} else if len(c.DumpInputFile) > 0 {
//read file stream
wg.Add(1)
@@ -137,7 +161,7 @@ func main() {
lineCount += 1
}
log.Trace("file line,", lineCount)
- fetchBar = pb.New(lineCount).Prefix("Read")
+ fetchBar := pb.New(lineCount).Prefix("Read")
outputBar = pb.New(lineCount).Prefix("Output ")
f.Close()

@@ -220,6 +244,8 @@ func main() {

sourceIndexRefreshSettings := map[string]interface{}{}

log.Debugf("indexCount: %d",indexCount)

if indexCount > 0 {
//override indexnames to be copy
c.SourceIndexNames = indexNames
@@ -246,8 +272,8 @@ log.Debug("target IndexSettings", targetIndexSettings)
log.Debug("target IndexSettings", targetIndexSettings)

//if there is only one index and we specify the dest indexname
- if (c.SourceIndexNames != c.TargetIndexName) && (indexCount == 1 || (len(c.TargetIndexName) > 0)) {
-     log.Debug("only one index,so we can rewrite indexname")
+ if (c.SourceIndexNames != c.TargetIndexName && (len(c.TargetIndexName) > 0) && indexCount == 1 ) {
+     log.Debugf("only one index,so we can rewrite indexname, src:%v, dest:%v ,indexCount:%d",c.SourceIndexNames,c.TargetIndexName,indexCount)
(*sourceIndexSettings)[c.TargetIndexName] = (*sourceIndexSettings)[c.SourceIndexNames]
delete(*sourceIndexSettings, c.SourceIndexNames)
log.Debug(sourceIndexSettings)
@@ -299,6 +325,10 @@ func main() {
//copy indexsettings and mappings
if targetIndexExist {
log.Debug("update index with settings,", name, tempIndexSettings)
//override shard settings
if c.ShardsCount > 0 {
tempIndexSettings["settings"].(map[string]interface{})["index"].(map[string]interface{})["number_of_shards"] = c.ShardsCount
}
err := migrator.TargetESAPI.UpdateIndexSettings(name, tempIndexSettings)
if err != nil {
log.Error(err)
@@ -323,8 +353,8 @@ if c.CopyIndexMappings {
if c.CopyIndexMappings {

//if there is only one index and we specify the dest indexname
- if (c.SourceIndexNames != c.TargetIndexName) && (indexCount == 1 || (len(c.TargetIndexName) > 0)) {
-     log.Debug("only one index,so we can rewrite indexname")
+ if (c.SourceIndexNames != c.TargetIndexName && (len(c.TargetIndexName) > 0) && indexCount == 1 ) {
+     log.Debugf("only one index,so we can rewrite indexname, src:%v, dest:%v ,indexCount:%d",c.SourceIndexNames,c.TargetIndexName,indexCount)
(*sourceIndexMappings)[c.TargetIndexName] = (*sourceIndexMappings)[c.SourceIndexNames]
delete(*sourceIndexMappings, c.SourceIndexNames)
log.Debug(sourceIndexMappings)
v0.go: 11 changes (8 additions & 3 deletions)
@@ -107,6 +107,7 @@ func (s *ESAPIV0) GetIndexSettings(indexNames string) (*Indexes, error) {

err := json.Unmarshal([]byte(body), allSettings)
if err != nil {
panic(err)
return nil, err
}

@@ -219,6 +220,7 @@ func (s *ESAPIV0) UpdateIndexSettings(name string, settings map[string]interface
bodyStr, err := Request("PUT", url, s.Auth, &body,s.HttpProxy)
if err != nil {
log.Error(bodyStr, err)
panic(err)
return err
}
delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "analysis")
@@ -251,7 +253,10 @@ func (s *ESAPIV0) UpdateIndexMapping(indexName string, settings map[string]inter
enc.Encode(mapping)
res, err := Request("POST", url, s.Auth, &body,s.HttpProxy)
if(err!=nil){
log.Error(url)
log.Error(body.String())
log.Error(err,res)
panic(err)
}
}
return nil
@@ -280,8 +285,8 @@ func (s *ESAPIV0) CreateIndex(name string, settings map[string]interface{}) (err

url := fmt.Sprintf("%s/%s", s.Host, name)

resp, err := Request("POST", url, s.Auth, &body,s.HttpProxy)
log.Debug(resp)
resp, err := Request("PUT", url, s.Auth, &body,s.HttpProxy)
log.Debugf("response: %s",resp)

return err
}
@@ -298,7 +303,7 @@ func (s *ESAPIV0) Refresh(name string) (err error) {
return nil
}

- func (s *ESAPIV0) NewScroll(indexNames string, scrollTime string, docBufferCount int,query string) (scroll *Scroll, err error) {
+ func (s *ESAPIV0) NewScroll(indexNames string, scrollTime string, docBufferCount int,query string, slicedId,maxSlicedCount int) (scroll *Scroll, err error) {

// curl -XGET 'http://es-0.9:9200/_search?search_type=scan&scroll=10m&size=50'
url := fmt.Sprintf("%s/%s/_search?search_type=scan&scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount)
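The POST-to-PUT change in `CreateIndex` above lines up with how Elasticsearch documents index creation; a rough sketch of the equivalent request (host, index name, and settings are illustrative):
```
curl -XPUT 'http://localhost:9200/myindex' -d '
{
  "settings": { "index": { "number_of_shards": 50 } }
}'
```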
v5.go: 20 changes (15 additions & 5 deletions)
@@ -63,15 +63,25 @@ func (s *ESAPIV5) Refresh(name string) (err error) {
return s.ESAPIV0.Refresh(name)
}

- func (s *ESAPIV5) NewScroll(indexNames string,scrollTime string,docBufferCount int,query string)(scroll *Scroll, err error){
+ func (s *ESAPIV5) NewScroll(indexNames string,scrollTime string,docBufferCount int,query string, slicedId,maxSlicedCount int)(scroll *Scroll, err error){
url := fmt.Sprintf("%s/%s/_search?scroll=%s&size=%d", s.Host, indexNames, scrollTime,docBufferCount)

jsonBody:=""
- if(len(query)>0) {
+ if(len(query)>0||maxSlicedCount>0) {
      queryBody := map[string]interface{}{}
-     queryBody["query"] = map[string]interface{}{}
-     queryBody["query"].(map[string]interface{})["query_string"] = map[string]interface{}{}
-     queryBody["query"].(map[string]interface{})["query_string"].(map[string]interface{})["query"] = query

+     if(len(query)>0){
+         queryBody["query"] = map[string]interface{}{}
+         queryBody["query"].(map[string]interface{})["query_string"] = map[string]interface{}{}
+         queryBody["query"].(map[string]interface{})["query_string"].(map[string]interface{})["query"] = query
+     }

+     if(maxSlicedCount>1){
+         log.Tracef("sliced scroll, %d of %d",slicedId,maxSlicedCount)
+         queryBody["slice"] = map[string]interface{}{}
+         queryBody["slice"].(map[string]interface{})["id"] = slicedId
+         queryBody["slice"].(map[string]interface{})["max"]= maxSlicedCount
+     }

jsonArray, err := json.Marshal(queryBody)
if (err != nil) {
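When both a query string and slicing are in play, the body built above marshals to roughly the following (query and slice values are illustrative):
```
{
  "query": { "query_string": { "query": "title:elasticsearch" } },
  "slice": { "id": 0, "max": 5 }
}
```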
