diff --git a/README.md b/README.md index af7f569..c9c180e 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,8 @@ if download version is not fill you environment,you may try to compile it yourse -c, --count= number of documents at a time: ie "size" in the scroll request (10000) -t, --time= scroll time (1m) --shards= set a number of shards on newly created indexes - --copy_settings copy index settings/mappings from source + --copy_settings copy index settings from source + --copy_mappings copy mappings mappings from source -f, --force delete destination index before copying, default:false -x, --src_indexes= list of indexes to copy, comma separated (_all), support wildcard match(*) -y, --dest_index= indexes name to save, allow only one indexname, original indexname will be used if not specified diff --git a/domain.go b/domain.go index f73dca8..eed504b 100644 --- a/domain.go +++ b/domain.go @@ -82,7 +82,8 @@ type Config struct { ScrollTime string `short:"t" long:"time" description:"scroll time" default:"1m"` RecreateIndex bool `short:"f" long:"force" description:"delete destination index before copying" default:"false"` CopyAllIndexes bool `short:"a" long:"all" description:"copy indexes starting with . and _"` - CopyIndexSettings bool `long:"copy_settings" description:"copy index settings/mappings from source" default:"false"` + CopyIndexSettings bool `long:"copy_settings" description:"copy index settings from source" default:"false"` + CopyIndexMappings bool `long:"copy_mappings" description:"copy index mappings from source" default:"false"` ShardsCount int `long:"shards" description:"set a number of shards on newly created indexes"` SourceIndexNames string `short:"x" long:"src_indexes" description:"indexes name to copy,support regex and comma separated list" default:"_all"` TargetIndexName string `short:"y" long:"dest_index" description:"indexes name to save, allow only one indexname, original indexname will be used if not specified" default:""` diff --git a/esapi.go b/esapi.go index ea3722c..28f0e08 100644 --- a/esapi.go +++ b/esapi.go @@ -26,6 +26,7 @@ type ESAPI interface{ CreateIndex(name string,settings map[string]interface{}) (error) GetIndexMappings(copyAllIndexes bool,indexNames string)(string,int,*Indexes,error) UpdateIndexSettings(indexName string,settings map[string]interface{})(error) + UpdateIndexMapping(indexName string,mappings map[string]interface{})(error) NewScroll(indexNames string,scrollTime string,docBufferCount int)(*Scroll, error) NextScroll(scrollTime string,scrollId string)(*Scroll,error) } diff --git a/main.go b/main.go index b95f3ba..1df3704 100644 --- a/main.go +++ b/main.go @@ -40,6 +40,10 @@ func main() { return } + if(c.SourceEs==c.TargetEs&&c.SourceIndexNames==c.TargetIndexName){ + log.Error("migration output is the same as the output") + return + } // enough of a buffer to hold all the search results across all workers c.DocChan = make(chan map[string]interface{}, c.DocBufferCount*c.Workers*10) @@ -119,7 +123,7 @@ func main() { // get all indexes from source - indexNames,indexCount, srcIndexMappings,err := c.SourceESAPI.GetIndexMappings(c.CopyAllIndexes,c.SourceIndexNames); + indexNames,indexCount, sourceIndexMappings,err := c.SourceESAPI.GetIndexMappings(c.CopyAllIndexes,c.SourceIndexNames); if(err!=nil){ log.Error(err) return @@ -222,10 +226,34 @@ func main() { if err != nil { log.Error(err) } + + } - //TODO c.CreateMappings(srcIndexMappings) - log.Debug(srcIndexMappings) + + } + + if(c.CopyIndexMappings){ + log.Debug("start process with mappings") + if(descESVersion.Version.Number[0]!=srcESVersion.Version.Number[0]){ + log.Error(srcESVersion.Version,"=>",descESVersion.Version,",cross-big-version mapping migration not avaiable, please update mapping manually :(") + return + } + + //if there is only one index and we specify the dest indexname + if((c.SourceIndexNames !=c.TargetIndexName)&&(indexCount==1||(len(c.TargetIndexName)>0))){ + log.Debug("only one index,so we can rewrite indexname") + (*sourceIndexMappings)[c.TargetIndexName]=(*sourceIndexMappings)[c.SourceIndexNames] + delete(*sourceIndexMappings,c.SourceIndexNames) + log.Debug(sourceIndexMappings) + } + + for name, mapping := range *sourceIndexMappings { + err:=c.TargetESAPI.UpdateIndexMapping(name,mapping.(map[string]interface{})["mappings"].(map[string]interface{})) + if(err!=nil){ + log.Error(err) + } + } } log.Info("settings/mappings migration finished.") @@ -233,7 +261,7 @@ func main() { }else{ - log.Error("Index not exists,",c.SourceIndexNames) + log.Error("index not exists,",c.SourceIndexNames) return } @@ -248,6 +276,10 @@ func main() { } if scroll != nil && scroll.Hits.Docs != nil { + if(scroll.Hits.Total==0){ + log.Error("can't find documents from source.") + return + } // create a progressbar and start a docCount fetchBar := pb.New(scroll.Hits.Total).Prefix("Pull ") bulkBar := pb.New(scroll.Hits.Total).Prefix("Push ") diff --git a/v0.go b/v0.go index 4f4857d..f77f74e 100644 --- a/v0.go +++ b/v0.go @@ -236,6 +236,27 @@ func (s *ESAPIV0) UpdateIndexSettings(name string, settings map[string]interface return err } +func (s *ESAPIV0) UpdateIndexMapping(indexName string, settings map[string]interface{}) error { + + log.Debug("start update mapping: ", indexName,settings) + + for name, mapping := range settings { + + log.Debug("start update mapping: ", indexName,name,mapping) + + url := fmt.Sprintf("%s/%s/%s/_mapping", s.Host, indexName, name) + + body := bytes.Buffer{} + enc := json.NewEncoder(&body) + enc.Encode(mapping) + res, err := Request("POST", url, s.Auth, &body) + if(err!=nil){ + log.Error(err,res) + } + } + return nil +} + func (s *ESAPIV0) DeleteIndex(name string) (err error) { log.Debug("start delete index: ", name) @@ -324,3 +345,5 @@ func (s *ESAPIV0) NextScroll(scrollTime string, scrollId string) (*Scroll, error return scroll, nil } + + diff --git a/v5.go b/v5.go index f4c961e..7e42e2c 100644 --- a/v5.go +++ b/v5.go @@ -55,6 +55,11 @@ func (s *ESAPIV5) CreateIndex(name string,settings map[string]interface{}) (err return s.ESAPIV0.CreateIndex(name,settings) } +func (s *ESAPIV5) UpdateIndexMapping(indexName string,settings map[string]interface{}) error { + return s.ESAPIV0.UpdateIndexMapping(indexName,settings) +} + + func (s *ESAPIV5) NewScroll(indexNames string,scrollTime string,docBufferCount int)(scroll *Scroll, err error){ url := fmt.Sprintf("%s/%s/_search?scroll=%s&size=%d", s.Host, indexNames, scrollTime,docBufferCount) resp,body, errs := Get(url,s.Auth)