Skip to content

Commit

Permalink
expose bulk errors; support renaming fields
Browse files Browse the repository at this point in the history
  • Loading branch information
medcl committed Sep 4, 2019
1 parent 67e10cd commit 12ed896
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 47 deletions.
25 changes: 24 additions & 1 deletion bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"bytes"
"gopkg.in/cheggaaa/pb.v1"
"time"
"strings"
)

func (c *Migrator) NewBulkWorker(docCount *int, pb *pb.ProgressBar, wg *sync.WaitGroup) {
Expand Down Expand Up @@ -70,13 +71,35 @@ func (c *Migrator) NewBulkWorker(docCount *int, pb *pb.ProgressBar, wg *sync.Wai
tempTargetTypeName = c.Config.OverrideTypeName
}


doc := Document{
Index: tempDestIndexName,
Type: tempTargetTypeName,
source: docI["_source"].(map[string]interface{}),
Id: docI["_id"].(string),
}



if c.Config.RenameFields != "" {
kvs:=strings.Split(c.Config.RenameFields,",")
//fmt.Println(kvs)
for _,i:=range kvs{
fvs:=strings.Split(i,":")
oldField:=strings.TrimSpace(fvs[0])
newField:=strings.TrimSpace(fvs[1])
if oldField=="_type"{
doc.source[newField]=docI["_type"].(string)
}else{
v:=doc.source[oldField]
doc.source[newField]=v
delete(doc.source,oldField)
}
}
}


//fmt.Println(doc.Index,",",doc.Type,",",doc.Id)

// add doc "_routing"
if _, ok := docI["_routing"]; ok {
str,ok:=docI["_routing"].(string)
Expand Down
103 changes: 58 additions & 45 deletions domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,75 +21,88 @@ import "sync"
type Indexes map[string]interface{}

type Document struct {
Index string `json:"_index,omitempty"`
Type string `json:"_type,omitempty"`
Id string `json:"_id,omitempty"`
source map[string]interface{} `json:"_source,omitempty"`
Routing string `json:"_routing,omitempty"`
Index string `json:"_index,omitempty"`
Type string `json:"_type,omitempty"`
Id string `json:"_id,omitempty"`
source map[string]interface{} `json:"_source,omitempty"`
Routing string `json:"_routing,omitempty"`
}

// Scroll models one page of an Elasticsearch scroll response.
// ScrollId is echoed back to fetch the next page; Hits.Docs holds the
// raw hit objects, left untyped because their _source schema is
// unknown to this tool.
type Scroll struct {
	Took     int    `json:"took,omitempty"`
	ScrollId string `json:"_scroll_id,omitempty"`
	TimedOut bool   `json:"timed_out,omitempty"`
	Hits     struct {
		MaxScore float32       `json:"max_score,omitempty"`
		Total    int           `json:"total,omitempty"`
		Docs     []interface{} `json:"hits,omitempty"`
	} `json:"hits"`
	Shards struct {
		Total      int `json:"total,omitempty"`
		Successful int `json:"successful,omitempty"`
		Failed     int `json:"failed,omitempty"`
		// Failures carries per-shard error details when Failed > 0.
		Failures []struct {
			Shard  int         `json:"shard,omitempty"`
			Index  string      `json:"index,omitempty"`
			Status int         `json:"status,omitempty"`
			Reason interface{} `json:"reason,omitempty"`
		} `json:"failures,omitempty"`
	} `json:"_shards,omitempty"`
}

// ClusterVersion is the subset of the Elasticsearch root ("/")
// response used to pick the API version (v0/v5/...) to talk to a
// cluster.
type ClusterVersion struct {
	Name        string `json:"name,omitempty"`
	ClusterName string `json:"cluster_name,omitempty"`
	Version     struct {
		Number        string `json:"number,omitempty"` // e.g. "6.8.2"
		LuceneVersion string `json:"lucene_version,omitempty"`
	} `json:"version,omitempty"`
}

// ClusterHealth is the subset of the _cluster/health response this
// tool reads; Status ("green"/"yellow"/"red") is presumably what the
// --green wait option checks — verify against the caller.
type ClusterHealth struct {
	Name string `json:"cluster_name,omitempty"`
	Status string `json:"status,omitempty"`
}

type Migrator struct{
//{"took":23,"errors":true,"items":[{"create":{"_index":"mybank3","_type":"my_doc2","_id":"AWz8rlgUkzP-cujdA_Fv","status":409,"error":{"type":"version_conflict_engine_exception","reason":"[AWz8rlgUkzP-cujdA_Fv]: version conflict, document already exists (current version [1])","index_uuid":"w9JZbJkfSEWBI-uluWorgw","shard":"0","index":"mybank3"}}},{"create":{"_index":"mybank3","_type":"my_doc4","_id":"AWz8rpF2kzP-cujdA_Fx","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc4]"}}},{"create":{"_index":"mybank3","_type":"my_doc1","_id":"AWz8rjpJkzP-cujdA_Fu","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc1]"}}},{"create":{"_index":"mybank3","_type":"my_doc3","_id":"AWz8rnbckzP-cujdA_Fw","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc3]"}}},{"create":{"_index":"mybank3","_type":"my_doc5","_id":"AWz8rrsEkzP-cujdA_Fy","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc5]"}}},{"create":{"_index":"mybank3","_type":"doc","_id":"3","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, doc]"}}}]}
type BulkResponse struct {
Took int `json:"took,omitempty"`
Errors bool `json:"errors,omitempty"`
Items []map[string]Action `json:"items,omitempty"`
}

FlushLock sync.Mutex
DocChan chan map[string]interface{}
SourceESAPI ESAPI
TargetESAPI ESAPI
SourceAuth *Auth
TargetAuth *Auth
Config *Config
type Action struct {
Index string `json:"_index,omitempty"`
Type string `json:"_type,omitempty"`
Id string `json:"_id,omitempty"`
Status int `json:"status,omitempty"`
Error interface{} `json:"error,omitempty"`
}

type Migrator struct {
FlushLock sync.Mutex
DocChan chan map[string]interface{}
SourceESAPI ESAPI
TargetESAPI ESAPI
SourceAuth *Auth
TargetAuth *Auth
Config *Config
}

type Config struct {

// config options
SourceEs string `short:"s" long:"source" description:"source elasticsearch instance, ie: http://localhost:9200"`
Query string `short:"q" long:"query" description:"query against source elasticsearch instance, filter data before migrate, ie: name:medcl"`
TargetEs string `short:"d" long:"dest" description:"destination elasticsearch instance, ie: http://localhost:9201"`
SourceEsAuthStr string `short:"m" long:"source_auth" description:"basic auth of source elasticsearch instance, ie: user:pass"`
TargetEsAuthStr string `short:"n" long:"dest_auth" description:"basic auth of target elasticsearch instance, ie: user:pass"`
SourceEs string `short:"s" long:"source" description:"source elasticsearch instance, ie: http://localhost:9200"`
Query string `short:"q" long:"query" description:"query against source elasticsearch instance, filter data before migrate, ie: name:medcl"`
TargetEs string `short:"d" long:"dest" description:"destination elasticsearch instance, ie: http://localhost:9201"`
SourceEsAuthStr string `short:"m" long:"source_auth" description:"basic auth of source elasticsearch instance, ie: user:pass"`
TargetEsAuthStr string `short:"n" long:"dest_auth" description:"basic auth of target elasticsearch instance, ie: user:pass"`
DocBufferCount int `short:"c" long:"count" description:"number of documents at a time: ie \"size\" in the scroll request" default:"10000"`
Workers int `short:"w" long:"workers" description:"concurrency number for bulk workers" default:"1"`
BulkSizeInMB int `short:"b" long:"bulk_size" description:"bulk size in MB" default:"5"`
ScrollTime string `short:"t" long:"time" description:"scroll time" default:"1m"`
ScrollSliceSize int `long:"sliced_scroll_size" description:"size of sliced scroll, to make it work, the size should be > 1" default:"1"`
RecreateIndex bool `short:"f" long:"force" description:"delete destination index before copying"`
RecreateIndex bool `short:"f" long:"force" description:"delete destination index before copying"`
CopyAllIndexes bool `short:"a" long:"all" description:"copy indexes starting with . and _"`
CopyIndexSettings bool `long:"copy_settings" description:"copy index settings from source"`
CopyIndexMappings bool `long:"copy_mappings" description:"copy index mappings from source"`
Expand All @@ -100,12 +113,12 @@ type Config struct {
WaitForGreen bool `long:"green" description:"wait for both hosts cluster status to be green before dump. otherwise yellow is okay"`
LogLevel string `short:"v" long:"log" description:"setting log level,options:trace,debug,info,warn,error" default:"INFO"`
DumpOutFile string `short:"o" long:"output_file" description:"output documents of source index into local file" `
DumpInputFile string `short:"i" long:"input_file" description:"indexing from local dump file" `
SourceProxy string `long:"source_proxy" description:"set proxy to source http connections, ie: http://127.0.0.1:8080"`
TargetProxy string `long:"dest_proxy" description:"set proxy to target http connections, ie: http://127.0.0.1:8080"`
Refresh bool `long:"refresh" description:"refresh after migration finished"`
DumpInputFile string `short:"i" long:"input_file" description:"indexing from local dump file" `
SourceProxy string `long:"source_proxy" description:"set proxy to source http connections, ie: http://127.0.0.1:8080"`
TargetProxy string `long:"dest_proxy" description:"set proxy to target http connections, ie: http://127.0.0.1:8080"`
Refresh bool `long:"refresh" description:"refresh after migration finished"`
Fields string `long:"fields" description:"output fields, comma separated, ie: col1,col2,col3,..." `

RenameFields string `long:"rename" description:"rename source fields, comma separated, ie: _type:type, name:myname" `
}

type Auth struct {
Expand Down
15 changes: 14 additions & 1 deletion v0.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,23 @@ func (s *ESAPIV0) Bulk(data *bytes.Buffer) {
body,err:=Request("POST",url,s.Auth,data,s.HttpProxy)

if err != nil {
fmt.Println(err)
log.Error(err)
return
}
log.Trace(url,string(body))
//log.Trace(url,string(body))
//fmt.Println(string(body))
response:=BulkResponse{}
json.Unmarshal([]byte(body), &response)
//v,_:=json.MarshalIndent(&response,""," ")
//fmt.Println(string(v))
if err == nil {
// fmt.Println(response)
if response.Errors{
fmt.Println(body)
}
}

data.Reset()
}

Expand Down

0 comments on commit 12ed896

Please sign in to comment.