diff --git a/README.md b/README.md index 5eddfe2..47d896f 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,11 @@ copy index `src_index` from `192.168.1.x` to `192.168.1.y:9200` and save with `d ./bin/esmove -s http://localhost:9200 -d http://localhost:9200 -x src_index -y dest_index -w=5 -b=100 ``` +desc es use basic auth +``` +./bin/esmove -s http://localhost:9200/ -x "src_index" -y "dest_index-test" -d http://localhost:9201 -n admin:111111 +``` + ## Compile: 1. make build @@ -29,6 +34,8 @@ https://github.com/medcl/elasticsearch-dump/releases ``` -s, --source= source elasticsearch instance -d, --dest= destination elasticsearch instance + -m, --source-auth basic auth of source elasticsearch instance, eg: user:pass + -n, --dest-auth basic auth of target elasticsearch instance, eg: user:pass -c, --count= number of documents at a time: ie "size" in the scroll request (8000) -t, --time= scroll time (1m) --shards= set a number of shards on newly created indexes diff --git a/domain.go b/domain.go index ac0bed0..5819dd4 100644 --- a/domain.go +++ b/domain.go @@ -69,9 +69,12 @@ type Config struct { Uid string // es scroll uid SrcESAPI ESAPI DescESAPI ESAPI - + SrcAuth *Auth + DescAuth *Auth // config options SrcEs string `short:"s" long:"source" description:"source elasticsearch instance" required:"true"` + SrcEsAuthStr string `short:"m" long:"source_auth" description:"basic auth of source elasticsearch instance, eg: user:pass"` + DescEsAuthStr string `short:"n" long:"dest_auth" description:"basic auth of target elasticsearch instance, eg: user:pass"` DstEs string `short:"d" long:"dest" description:"destination elasticsearch instance" required:"true"` DocBufferCount int `short:"c" long:"count" description:"number of documents at a time: ie \"size\" in the scroll request" default:"5000"` ScrollTime string `short:"t" long:"time" description:"scroll time" default:"1m"` @@ -88,3 +91,7 @@ type Config struct { LogLevel string `short:"v" long:"log" description:"setting log level,options:trace,debug,info,warn,error" default:"INFO"` } +type Auth struct { + User string + Pass string +} diff --git a/interface.go b/esapi.go similarity index 100% rename from interface.go rename to esapi.go diff --git a/main.go b/main.go index e25a1ab..af0417c 100644 --- a/main.go +++ b/main.go @@ -37,8 +37,21 @@ func main() { // enough of a buffer to hold all the search results across all workers c.DocChan = make(chan map[string]interface{}, c.DocBufferCount*c.Workers*10) + + //dealing with basic auth + if(len(c.SrcEsAuthStr)>0&&strings.Contains(c.SrcEsAuthStr,":")){ + authArray:=strings.Split(c.SrcEsAuthStr,":") + auth:=Auth{User:authArray[0],Pass:authArray[1]} + c.SrcAuth=&auth + } + if(len(c.DescEsAuthStr)>0&&strings.Contains(c.DescEsAuthStr,":")){ + authArray:=strings.Split(c.DescEsAuthStr,":") + auth:=Auth{User:authArray[0],Pass:authArray[1]} + c.DescAuth=&auth + } + //get source es version - srcESVersion, errs := c.ClusterVersion(c.SrcEs) + srcESVersion, errs := c.ClusterVersion(c.SrcEs,c.SrcAuth) if errs != nil { return } @@ -47,16 +60,18 @@ func main() { log.Debug("src es is V5,",srcESVersion.Version.Number) api:=new(ESAPIV5) api.Host=c.SrcEs + api.Auth=c.SrcAuth c.SrcESAPI = api } else { log.Debug("src es is not V5,",srcESVersion.Version.Number) api:=new(ESAPIV0) api.Host=c.SrcEs + api.Auth=c.SrcAuth c.SrcESAPI = api } //get target es version - descESVersion, errs := c.ClusterVersion(c.DstEs) + descESVersion, errs := c.ClusterVersion(c.DstEs,c.DescAuth) if errs != nil { return } @@ -65,12 +80,15 @@ func main() { log.Debug("dest es is V5,",descESVersion.Version.Number) api:=new(ESAPIV5) api.Host=c.DstEs + api.Auth=c.DescAuth c.DescESAPI = api } else { log.Debug("dest es is not V5,",descESVersion.Version.Number) api:=new(ESAPIV0) api.Host=c.DstEs + api.Auth=c.DescAuth c.DescESAPI = api + } // get all indexes from source @@ -103,13 +121,14 @@ func main() { if c.DestIndexName != "" { //TODO }else{ - // create indexes on DstEs - if err := c.CreateIndexes(idxs); err != nil { - log.Error(err) - return - } + //// create indexes on DstEs + //if err := c.CreateIndexes(idxs); err != nil { + // log.Error(err) + // return + //} } } + // wait for cluster state to be okay before moving timer := time.NewTimer(time.Second * 3) @@ -172,6 +191,8 @@ func main() { pool.Stop() } + log.Info("done move..") + } func setInitLogging(logLevel string) { @@ -334,10 +355,10 @@ WORKER_DONE: wg.Done() } -func (c *Config)ClusterVersion(host string) (*ClusterVersion, []error) { +func (c *Config)ClusterVersion(host string,auth *Auth) (*ClusterVersion, []error) { url := fmt.Sprintf("%s", host) - _, body, errs := Get(url) + _, body, errs := Get(url,auth) if errs != nil { log.Error(errs) return nil,errs @@ -460,16 +481,22 @@ func (c *Config) ClusterReady(api ESAPI) (*ClusterHealth, bool) { } -func Get(url string) (*http.Response, string, []error) { - request := gorequest.New() //.SetBasicAuth("username", "password") +func Get(url string,auth *Auth) (*http.Response, string, []error) { + request := gorequest.New() + if(auth!=nil){ + request.SetBasicAuth(auth.User,auth.Pass) + } resp, body, errs := request.Get(url).End() return resp, body, errs } -func Post(url string, body []byte) { - request := gorequest.New() //.SetBasicAuth("username", "password") - request.Post(url).Send(body).End() +func Post(url string,auth *Auth, body string)(*http.Response, string, []error) { + request := gorequest.New() + if(auth!=nil){ + request.SetBasicAuth(auth.User,auth.Pass) + } + return request.Post(url).Send(body).End() } diff --git a/v0.go b/v0.go index 446da64..87ddd0e 100644 --- a/v0.go +++ b/v0.go @@ -20,22 +20,22 @@ import ( "encoding/json" "fmt" log "github.com/cihub/seelog" - "net/http" - "io/ioutil" "bytes" "strings" "regexp" + "net/http" + "io/ioutil" ) type ESAPIV0 struct{ - Host string //eg:http://localhost:9200 + Host string //eg: http://localhost:9200 + Auth *Auth //eg: user:pass } - func (s *ESAPIV0) ClusterHealth() *ClusterHealth { url := fmt.Sprintf("%s/_cluster/health", s.Host) - _, body, errs := Get(url) + _, body, errs := Get(url,s.Auth) if errs != nil { return &ClusterHealth{Name: s.Host, Status: "unreachable"} @@ -58,7 +58,20 @@ func (s *ESAPIV0) Bulk(data *bytes.Buffer){ return } data.WriteRune('\n') - resp, err := http.Post(fmt.Sprintf("%s/_bulk", s.Host), "", data) + url:=fmt.Sprintf("%s/_bulk", s.Host) + + client := &http.Client{} + reqest, _ := http.NewRequest("POST", url, data) + if(s.Auth!=nil){ + reqest.SetBasicAuth(s.Auth.User,s.Auth.Pass) + } + resp,errs := client.Do(reqest) + if errs != nil { + log.Error(errs) + return + } + + body,err:=ioutil.ReadAll(resp.Body) if err != nil { log.Error(err) return @@ -67,22 +80,26 @@ func (s *ESAPIV0) Bulk(data *bytes.Buffer){ defer resp.Body.Close() defer data.Reset() if resp.StatusCode != 200 { - b, _ := ioutil.ReadAll(resp.Body) - log.Errorf("bad bulk response: %s %s", string(b), resp.StatusCode) + log.Errorf("bad bulk response: %s %s", body, resp.StatusCode) return } } func (s *ESAPIV0) GetIndexSettings(copyAllIndexes bool,indexNames string)(string,*Indexes,error){ - resp, err := http.Get(fmt.Sprintf("%s/%s/_mapping", s.Host, indexNames)) - if err != nil { - return "",nil,err + url:=fmt.Sprintf("%s/%s/_mapping", s.Host, indexNames) + resp,body, errs := Get(url,s.Auth) + if errs != nil { + log.Error(errs) + return "",nil,errs[0] } defer resp.Body.Close() idxs := Indexes{} - dec := json.NewDecoder(resp.Body) - err = dec.Decode(&idxs) + er := json.Unmarshal([]byte(body),&idxs) + + if er != nil { + return "",nil,er + } // remove indexes that start with . if user asked for it if copyAllIndexes == false { @@ -132,7 +149,7 @@ func (s *ESAPIV0) GetIndexSettings(copyAllIndexes bool,indexNames string)(string } } - return indexNames,&idxs,err + return indexNames,&idxs,nil } func (s *ESAPIV0) UpdateIndexSettings(){} @@ -141,16 +158,14 @@ func (s *ESAPIV0) NewScroll(indexNames string,scrollTime string,docBufferCount i // curl -XGET 'http://es-0.9:9200/_search?search_type=scan&scroll=10m&size=50' url := fmt.Sprintf("%s/%s/_search?search_type=scan&scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount) - resp, err := http.Get(url) + resp,body, errs := Get(url,s.Auth) if err != nil { - log.Error(err) - return nil,err + log.Error(errs) + return nil,errs[0] } defer resp.Body.Close() - body,err:=ioutil.ReadAll(resp.Body) - - log.Debug("new scroll,",string(body)) + log.Debug("new scroll,",body) if err != nil { log.Error(err) @@ -158,7 +173,7 @@ func (s *ESAPIV0) NewScroll(indexNames string,scrollTime string,docBufferCount i } scroll = &Scroll{} - err = json.Unmarshal(body,scroll) + err = json.Unmarshal([]byte(body),scroll) if err != nil { log.Error(err) return nil,err @@ -170,30 +185,20 @@ func (s *ESAPIV0) NewScroll(indexNames string,scrollTime string,docBufferCount i func (s *ESAPIV0) NextScroll(scrollTime string,scrollId string)(*Scroll,error) { // curl -XGET 'http://es-0.9:9200/_search/scroll?scroll=5m' id := bytes.NewBufferString(scrollId) - - req, err := http.NewRequest("GET", fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id), nil) - if err != nil { - log.Error(err) - return nil,err - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - log.Error(err) - return nil,err + url:=fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id) + resp,body, errs := Get(url,s.Auth) + if errs != nil { + log.Error(errs) + return nil,errs[0] } - defer resp.Body.Close() - data, err := ioutil.ReadAll(resp.Body) - if err != nil { - log.Error(err) - return nil,err - } + defer resp.Body.Close() // decode elasticsearch scroll response scroll := &Scroll{} - err = json.Unmarshal(data, &scroll) + err := json.Unmarshal([]byte(body), &scroll) if err != nil { - log.Error(string(data)) + log.Error(body) log.Error(err) return nil,err } diff --git a/v5.go b/v5.go index 0cbe996..02cdfd4 100644 --- a/v5.go +++ b/v5.go @@ -20,16 +20,13 @@ import ( "bytes" log "github.com/cihub/seelog" "encoding/json" - "io/ioutil" "fmt" - "net/http" ) type ESAPIV5 struct{ ESAPIV0 } - func (s *ESAPIV5) ClusterHealth() *ClusterHealth { return s.ESAPIV0.ClusterHealth() } @@ -41,18 +38,17 @@ func (s *ESAPIV5) GetIndexSettings(copyAllIndexes bool,indexNames string)(string return s.ESAPIV0.GetIndexSettings(copyAllIndexes,indexNames) } func (s *ESAPIV5) UpdateIndexSettings(){} + func (s *ESAPIV5) NewScroll(indexNames string,scrollTime string,docBufferCount int)(scroll *Scroll, err error){ url := fmt.Sprintf("%s/%s/_search?scroll=%s&size=%d", s.Host, indexNames, scrollTime,docBufferCount) - resp, err := http.Get(url) - if err != nil { - log.Error(err) - return nil,err + resp,body, errs := Get(url,s.Auth) + if errs != nil { + log.Error(errs) + return nil,errs[0] } defer resp.Body.Close() - body,err:=ioutil.ReadAll(resp.Body) - - log.Debug("new scroll,",string(body)) + log.Debug("new scroll,",body) if err != nil { log.Error(err) @@ -60,7 +56,7 @@ func (s *ESAPIV5) NewScroll(indexNames string,scrollTime string,docBufferCount i } scroll = &Scroll{} - err = json.Unmarshal(body,scroll) + err = json.Unmarshal([]byte(body),scroll) if err != nil { log.Error(err) return nil,err @@ -71,29 +67,19 @@ func (s *ESAPIV5) NewScroll(indexNames string,scrollTime string,docBufferCount i func (s *ESAPIV5) NextScroll(scrollTime string,scrollId string)(*Scroll,error) { id := bytes.NewBufferString(scrollId) - req, err := http.NewRequest("GET", fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id), nil) - if err != nil { - log.Error(err) - return nil,err - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - log.Error(err) - return nil,err + url:=fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id) + resp,body, errs := Get(url,s.Auth) + if errs != nil { + log.Error(errs) + return nil,errs[0] } defer resp.Body.Close() - data, err := ioutil.ReadAll(resp.Body) - if err != nil { - log.Error(err) - return nil,err - } - // decode elasticsearch scroll response scroll := &Scroll{} - err = json.Unmarshal(data, &scroll) + err:= json.Unmarshal([]byte(body), &scroll) if err != nil { - log.Error(string(data)) + log.Error(body) log.Error(err) return nil,err }