Skip to content

Commit

Permalink
mapping migration support
Browse files Browse the repository at this point in the history
  • Loading branch information
medcl committed May 23, 2016
1 parent f23dbf7 commit 883cfc9
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 6 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ if download version is not fill you environment,you may try to compile it yourse
-c, --count= number of documents at a time: ie "size" in the scroll request (10000)
-t, --time= scroll time (1m)
--shards= set a number of shards on newly created indexes
--copy_settings copy index settings/mappings from source
--copy_settings copy index settings from source
--copy_mappings copy mappings mappings from source
-f, --force delete destination index before copying, default:false
-x, --src_indexes= list of indexes to copy, comma separated (_all), support wildcard match(*)
-y, --dest_index= indexes name to save, allow only one indexname, original indexname will be used if not specified
Expand Down
3 changes: 2 additions & 1 deletion domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ type Config struct {
ScrollTime string `short:"t" long:"time" description:"scroll time" default:"1m"`
RecreateIndex bool `short:"f" long:"force" description:"delete destination index before copying" default:"false"`
CopyAllIndexes bool `short:"a" long:"all" description:"copy indexes starting with . and _"`
CopyIndexSettings bool `long:"copy_settings" description:"copy index settings/mappings from source" default:"false"`
CopyIndexSettings bool `long:"copy_settings" description:"copy index settings from source" default:"false"`
CopyIndexMappings bool `long:"copy_mappings" description:"copy index mappings from source" default:"false"`
ShardsCount int `long:"shards" description:"set a number of shards on newly created indexes"`
SourceIndexNames string `short:"x" long:"src_indexes" description:"indexes name to copy,support regex and comma separated list" default:"_all"`
TargetIndexName string `short:"y" long:"dest_index" description:"indexes name to save, allow only one indexname, original indexname will be used if not specified" default:""`
Expand Down
1 change: 1 addition & 0 deletions esapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ESAPI interface{
CreateIndex(name string,settings map[string]interface{}) (error)
GetIndexMappings(copyAllIndexes bool,indexNames string)(string,int,*Indexes,error)
UpdateIndexSettings(indexName string,settings map[string]interface{})(error)
UpdateIndexMapping(indexName string,mappings map[string]interface{})(error)
NewScroll(indexNames string,scrollTime string,docBufferCount int)(*Scroll, error)
NextScroll(scrollTime string,scrollId string)(*Scroll,error)
}
40 changes: 36 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func main() {
return
}

if(c.SourceEs==c.TargetEs&&c.SourceIndexNames==c.TargetIndexName){
log.Error("migration output is the same as the output")
return
}

// enough of a buffer to hold all the search results across all workers
c.DocChan = make(chan map[string]interface{}, c.DocBufferCount*c.Workers*10)
Expand Down Expand Up @@ -119,7 +123,7 @@ func main() {


// get all indexes from source
indexNames,indexCount, srcIndexMappings,err := c.SourceESAPI.GetIndexMappings(c.CopyAllIndexes,c.SourceIndexNames);
indexNames,indexCount, sourceIndexMappings,err := c.SourceESAPI.GetIndexMappings(c.CopyAllIndexes,c.SourceIndexNames);
if(err!=nil){
log.Error(err)
return
Expand Down Expand Up @@ -222,18 +226,42 @@ func main() {
if err != nil {
log.Error(err)
}


}

//TODO c.CreateMappings(srcIndexMappings)
log.Debug(srcIndexMappings)

}

if(c.CopyIndexMappings){
log.Debug("start process with mappings")
if(descESVersion.Version.Number[0]!=srcESVersion.Version.Number[0]){
log.Error(srcESVersion.Version,"=>",descESVersion.Version,",cross-big-version mapping migration not avaiable, please update mapping manually :(")
return
}

//if there is only one index and we specify the dest indexname
if((c.SourceIndexNames !=c.TargetIndexName)&&(indexCount==1||(len(c.TargetIndexName)>0))){
log.Debug("only one index,so we can rewrite indexname")
(*sourceIndexMappings)[c.TargetIndexName]=(*sourceIndexMappings)[c.SourceIndexNames]
delete(*sourceIndexMappings,c.SourceIndexNames)
log.Debug(sourceIndexMappings)
}

for name, mapping := range *sourceIndexMappings {
err:=c.TargetESAPI.UpdateIndexMapping(name,mapping.(map[string]interface{})["mappings"].(map[string]interface{}))
if(err!=nil){
log.Error(err)
}
}
}

log.Info("settings/mappings migration finished.")
}


}else{
log.Error("Index not exists,",c.SourceIndexNames)
log.Error("index not exists,",c.SourceIndexNames)
return
}

Expand All @@ -248,6 +276,10 @@ func main() {
}

if scroll != nil && scroll.Hits.Docs != nil {
if(scroll.Hits.Total==0){
log.Error("can't find documents from source.")
return
}
// create a progressbar and start a docCount
fetchBar := pb.New(scroll.Hits.Total).Prefix("Pull ")
bulkBar := pb.New(scroll.Hits.Total).Prefix("Push ")
Expand Down
23 changes: 23 additions & 0 deletions v0.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,27 @@ func (s *ESAPIV0) UpdateIndexSettings(name string, settings map[string]interface
return err
}

func (s *ESAPIV0) UpdateIndexMapping(indexName string, settings map[string]interface{}) error {

log.Debug("start update mapping: ", indexName,settings)

for name, mapping := range settings {

log.Debug("start update mapping: ", indexName,name,mapping)

url := fmt.Sprintf("%s/%s/%s/_mapping", s.Host, indexName, name)

body := bytes.Buffer{}
enc := json.NewEncoder(&body)
enc.Encode(mapping)
res, err := Request("POST", url, s.Auth, &body)
if(err!=nil){
log.Error(err,res)
}
}
return nil
}

func (s *ESAPIV0) DeleteIndex(name string) (err error) {

log.Debug("start delete index: ", name)
Expand Down Expand Up @@ -324,3 +345,5 @@ func (s *ESAPIV0) NextScroll(scrollTime string, scrollId string) (*Scroll, error

return scroll, nil
}


5 changes: 5 additions & 0 deletions v5.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func (s *ESAPIV5) CreateIndex(name string,settings map[string]interface{}) (err
return s.ESAPIV0.CreateIndex(name,settings)
}

func (s *ESAPIV5) UpdateIndexMapping(indexName string,settings map[string]interface{}) error {
return s.ESAPIV0.UpdateIndexMapping(indexName,settings)
}


func (s *ESAPIV5) NewScroll(indexNames string,scrollTime string,docBufferCount int)(scroll *Scroll, err error){
url := fmt.Sprintf("%s/%s/_search?scroll=%s&size=%d", s.Host, indexNames, scrollTime,docBufferCount)
resp,body, errs := Get(url,s.Auth)
Expand Down

0 comments on commit 883cfc9

Please sign in to comment.