refactoring, add buffer_count to control memory consumption
medcl committed Dec 21, 2020
1 parent d6402a7 commit e41616d
Showing 15 changed files with 263 additions and 304 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,4 +1,5 @@
# Created by .ignore support plugin (hsz.mobi)
esm.log
.DS_Store

### Go template
75 changes: 43 additions & 32 deletions README.md
@@ -2,38 +2,39 @@

Elasticsearch cross version data migration.

[Dec 3rd, 2020: [EN] Cross version Elasticsearch data migration with ESM](https://discuss.elastic.co/t/dec-3rd-2020-en-cross-version-elasticsearch-data-migration-with-esm/256516)

## Features:

* Cross version migration supported

* Overwrite index name

* Copy index settings and mapping

* Support http basic auth

* Support dump index to local file

* Support loading index from local file

* Support http proxy

* Support sliced scroll (Elasticsearch 5.0+)

* Support running in background

* Generate testing data by randomizing the source document id

* Support renaming field names

* Support unifying document type names

* Support specifying which _source fields to return from the source

* Support query string queries to filter the data source

* Support renaming source fields during bulk indexing
* Load generation with --repeat_times and --regenerate_id

## ESM is fast!

A 3-node cluster (3 * c5d.4xlarge: 16C, 32GB, 10Gbps)

```
root@ip-172-31-13-181:/tmp# ./esm -s https://localhost:8000 -d https://localhost:8000 -x logs1kw -y logs122 -m elastic:medcl123 -n elastic:medcl123 -w 40 --sliced_scroll_size=60 -b 5 --buffer_count=2000000 --regenerate_id
[12-19 06:31:20] [INF] [main.go:506,main] start data migration..
Scroll 10064570 / 10064570 [=================================================] 100.00% 55s
Bulk 10062602 / 10064570 [==================================================] 99.98% 55s
[12-19 06:32:15] [INF] [main.go:537,main] data migration finished.
```
Migrated 10,000,000 documents within a minute; the Nginx logs were generated from kibana_sample_data_logs.


## Example:
@@ -123,8 +124,13 @@ rename fields while do bulk indexing
./bin/esm -i dump.json -d http://localhost:9201 -y target-index41 --rename=title:newtitle
```

use buffer_count to control the memory used by ESM, and use gzip to compress network traffic
```
./esm -s https://localhost:8000 -d https://localhost:8000 -x logs1kw -y logs122 -m elastic:medcl123 -n elastic:medcl123 --regenerate_id -w 20 --sliced_scroll_size=60 -b 5 --buffer_count=1000000 --compress false
```
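
The flag name, together with the `DocChan` reads in bulk.go and file.go, suggests a bounded producer/consumer queue. The sketch below is not ESM's actual wiring (that code is outside this diff); it only shows how a buffered channel caps the number of documents held in memory at once, which is what --buffer_count is documented to control:

```go
package main

import "fmt"

// Document stands in for the records ESM moves from the scroll reader
// to the bulk workers (illustrative only, not the real type).
type Document map[string]interface{}

func main() {
	bufferCount := 1000 // hypothetical value; --buffer_count defaults to 1000000

	// A buffered channel holds at most bufferCount in-flight documents,
	// so the producer blocks instead of loading the whole index into RAM.
	docChan := make(chan Document, bufferCount)

	go func() {
		for i := 0; i < 5000; i++ {
			docChan <- Document{"_id": i} // blocks once the buffer is full
		}
		close(docChan)
	}()

	consumed := 0
	for range docChan {
		consumed++
	}
	fmt.Println("consumed", consumed, "documents")
}
```

If --buffer_count does size such a channel, peak memory is roughly that many buffered documents plus whatever the bulk workers are currently flushing, which matches the commit's goal of controlling memory consumption.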

## Download
https://github.com/medcl/elasticsearch-dump/releases
https://github.com/medcl/esm/releases


## Compile:
@@ -146,6 +152,7 @@ Application Options:
-m, --source_auth= basic auth of source elasticsearch instance, ie: user:pass
-n, --dest_auth= basic auth of target elasticsearch instance, ie: user:pass
-c, --count= number of documents at a time: ie "size" in the scroll request (10000)
--buffer_count= number of buffered documents in memory (1000000)
-w, --workers= concurrency number for bulk workers (1)
-b, --bulk_size= bulk size in MB (5)
-t, --time= scroll time (10m)
@@ -166,10 +173,14 @@ Application Options:
--source_proxy= set proxy to source http connections, ie: http://127.0.0.1:8080
--dest_proxy= set proxy to target http connections, ie: http://127.0.0.1:8080
--refresh refresh after migration finished
--fields= output fields, comma separated, ie: col1,col2,col3,...
--rename= rename source fields while do bulk indexing, comma separated, ie: _type:type, name:myname
--fields= filter source fields, comma separated, ie: col1,col2,col3,...
--rename= rename source fields, comma separated, ie: _type:type, name:myname
-l, --logstash_endpoint= target logstash tcp endpoint, ie: 127.0.0.1:5055
--secured_logstash_endpoint target logstash tcp endpoint was secured by TLS
--repeat_times= repeat the data from source N times to dest output, use along with parameter regenerate_id to amplify the data size
-r, --regenerate_id regenerate id for documents, this will override the existing document id in the data source
--compress use gzip to compress traffic
-p, --sleep= sleep N seconds after each bulk request (-1)
Help Options:
-h, --help Show this help message
@@ -193,27 +204,27 @@ From | To
-----------|-----------
1.x | 1.x
1.x | 2.x
1.x | 5.0
1.x | 6.0
1.x | 7.0
1.x | 5.x
1.x | 6.x
1.x | 7.x
2.x | 1.x
2.x | 2.x
2.x | 5.0
2.x | 6.0
2.x | 7.0
5.0 | 1.x
5.0 | 2.x
5.0 | 5.0
2.x | 5.x
2.x | 6.x
2.x | 7.x
5.x | 1.x
5.x | 2.x
5.x | 5.x
5.x | 6.x
5.x | 7.x
6.0 | 1.x
6.0 | 2.x
6.0 | 5.0
6.x | 1.x
6.x | 2.x
6.x | 5.0
6.x | 6.x
6.x | 7.x
7.0 | 1.x
7.0 | 2.x
7.0 | 5.0
7.x | 1.x
7.x | 2.x
7.x | 5.x
7.x | 6.x
7.x | 7.x

16 changes: 6 additions & 10 deletions bulk.go
@@ -19,13 +19,12 @@ package main
import (
"bytes"
"encoding/json"
"github.com/cheggaaa/pb"
"strings"
"sync"
"time"

log "github.com/cihub/seelog"
"github.com/infinitbyte/framework/core/util"
"gopkg.in/cheggaaa/pb.v1"
)

func (c *Migrator) NewBulkWorker(docCount *int, pb *pb.ProgressBar, wg *sync.WaitGroup) {
@@ -54,8 +53,6 @@ READ_DOCS:
// sanity check
for _, key := range []string{"_index", "_type", "_source", "_id"} {
if _, ok := docI[key]; !ok {
//json,_:=json.Marshal(docI)
//log.Errorf("failed parsing document: %v", string(json))
break READ_DOCS
}
}
@@ -81,12 +78,11 @@
}

if c.Config.RegenerateID {
doc.Id = util.GetUUID()
doc.Id = ""
}

if c.Config.RenameFields != "" {
kvs := strings.Split(c.Config.RenameFields, ",")
//fmt.Println(kvs)
for _, i := range kvs {
fvs := strings.Split(i, ":")
oldField := strings.TrimSpace(fvs[0])
@@ -101,8 +97,6 @@
}
}

//fmt.Println(doc.Index,",",doc.Type,",",doc.Id)

// add doc "_routing" if exists
if _, ok := docI["_routing"]; ok {
str, ok := docI["_routing"].(string)
@@ -117,7 +111,7 @@
}

// sanity check
if len(doc.Index) == 0 || len(doc.Id) == 0 || len(doc.Type) == 0 {
if len(doc.Index) == 0 || len(doc.Type) == 0 {
log.Errorf("failed decoding document: %+v", doc)
continue
}
@@ -159,7 +153,9 @@ READ_DOCS:
log.Trace("clean buffer, and execute bulk insert")
pb.Add(bulkItemSize)
bulkItemSize = 0

if c.Config.SleepSecondsAfterEachBulk >0{
time.Sleep(time.Duration(c.Config.SleepSecondsAfterEachBulk) * time.Second)
}
}
WORKER_DONE:
if docBuf.Len() > 0 {
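
With `--regenerate_id` the worker now leaves `doc.Id` empty instead of calling `util.GetUUID()`, and the sanity check above no longer requires `_id`. Assuming the bulk writer simply omits an empty `_id` (a detail outside this hunk), Elasticsearch assigns auto-generated IDs on the server side, which index faster than client-supplied ones. A standalone illustration of the resulting bulk metadata, using a hypothetical `bulkMeta` type:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bulkMeta models the action line of a bulk request. With omitempty,
// an empty ID is dropped and Elasticsearch generates one itself.
type bulkMeta struct {
	Index string `json:"_index"`
	Type  string `json:"_type"`
	Id    string `json:"_id,omitempty"`
}

func main() {
	withID, _ := json.Marshal(map[string]bulkMeta{"index": {Index: "logs122", Type: "doc", Id: "abc123"}})
	autoID, _ := json.Marshal(map[string]bulkMeta{"index": {Index: "logs122", Type: "doc"}})

	fmt.Println(string(withID)) // {"index":{"_index":"logs122","_type":"doc","_id":"abc123"}}
	fmt.Println(string(autoID)) // {"index":{"_index":"logs122","_type":"doc"}}
}
```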
11 changes: 6 additions & 5 deletions domain.go
@@ -111,9 +111,10 @@ type Config struct {
SourceEsAuthStr string `short:"m" long:"source_auth" description:"basic auth of source elasticsearch instance, ie: user:pass"`
TargetEsAuthStr string `short:"n" long:"dest_auth" description:"basic auth of target elasticsearch instance, ie: user:pass"`
DocBufferCount int `short:"c" long:"count" description:"number of documents at a time: ie \"size\" in the scroll request" default:"10000"`
BufferCount int `long:"buffer_count" description:"number of buffered documents in memory" default:"1000000"`
Workers int `short:"w" long:"workers" description:"concurrency number for bulk workers" default:"1"`
BulkSizeInMB int `short:"b" long:"bulk_size" description:"bulk size in MB" default:"5"`
ScrollTime string `short:"t" long:"time" description:"scroll time" default:"1m"`
ScrollTime string `short:"t" long:"time" description:"scroll time" default:"10m"`
ScrollSliceSize int `long:"sliced_scroll_size" description:"size of sliced scroll, to make it work, the size should be > 1" default:"1"`
RecreateIndex bool `short:"f" long:"force" description:"delete destination index before copying"`
CopyAllIndexes bool `short:"a" long:"all" description:"copy indexes starting with . and _"`
@@ -135,11 +136,11 @@ type Config struct {
RenameFields string `long:"rename" description:"rename source fields, comma separated, ie: _type:type, name:myname" `
LogstashEndpoint string `short:"l" long:"logstash_endpoint" description:"target logstash tcp endpoint, ie: 127.0.0.1:5055" `
LogstashSecEndpoint bool `long:"secured_logstash_endpoint" description:"target logstash tcp endpoint was secured by TLS" `
//TestLevel string `long:"test_level" description:"target logstash tcp endpoint was secured by TLS" `
//TestEnvironment string `long:"test_environment" description:"target logstash tcp endpoint was secured by TLS" `

RepeatOutputTimes int `long:"repeat_times" description:"repeat the data from source N times to dest output, use align with parameter regenerate_id to amplify the data size "`
RegenerateID bool `short:"r" long:"regenerate_id" description:"regenerate id for documents, this will override the exist document id in data source"`
RepeatOutputTimes int `long:"repeat_times" description:"repeat the data from source N times to dest output, use align with parameter regenerate_id to amplify the data size "`
RegenerateID bool `short:"r" long:"regenerate_id" description:"regenerate id for documents, this will override the exist document id in data source"`
Compress bool `long:"compress" description:"use gzip to compress traffic"`
SleepSecondsAfterEachBulk int `short:"p" long:"sleep" description:"sleep N seconds after each bulk request" default:"-1"`
}

type Auth struct {
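
The struct tags above match the github.com/jessevdk/go-flags convention — an assumption here, since the parser setup is outside this diff. A minimal standalone sketch of how the new buffer_count, compress, and sleep fields would surface as command-line options:

```go
package main

import (
	"fmt"
	"os"

	flags "github.com/jessevdk/go-flags"
)

// Options mirrors a few fields of ESM's Config struct purely for
// illustration; it is not the real type.
type Options struct {
	BufferCount int  `long:"buffer_count" description:"number of buffered documents in memory" default:"1000000"`
	Compress    bool `long:"compress" description:"use gzip to compress traffic"`
	Sleep       int  `short:"p" long:"sleep" description:"sleep N seconds after each bulk request" default:"-1"`
}

func main() {
	var opts Options
	// e.g. ./demo --buffer_count=500000 --compress -p 1
	if _, err := flags.Parse(&opts); err != nil {
		os.Exit(1)
	}
	fmt.Printf("buffer_count=%d compress=%v sleep=%d\n",
		opts.BufferCount, opts.Compress, opts.Sleep)
}
```

With go-flags, a bool field such as Compress acts as a switch (present or absent) rather than taking a value.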
5 changes: 1 addition & 4 deletions file.go
@@ -18,7 +18,7 @@ package main

import (
"sync"
"gopkg.in/cheggaaa/pb.v1"
"github.com/cheggaaa/pb"
log "github.com/cihub/seelog"
"os"
"bufio"
@@ -92,7 +92,6 @@ func (c *Migrator) NewFileDumpWorker(pb *pb.ProgressBar, wg *sync.WaitGroup) {
READ_DOCS:
for {
docI, open := <-c.DocChan

// this check is in case the document is an error with scroll stuff
if status, ok := docI["status"]; ok {
if status.(int) == 404 {
Expand All @@ -104,8 +103,6 @@ func (c *Migrator) NewFileDumpWorker(pb *pb.ProgressBar, wg *sync.WaitGroup) {
// sanity check
for _, key := range []string{"_index", "_type", "_source", "_id"} {
if _, ok := docI[key]; !ok {
//json,_:=json.Marshal(docI)
//log.Errorf("failed parsing document: %v", string(json))
break READ_DOCS
}
}
