Skip to content

Commit

Permalink
support basic auth
Browse files Browse the repository at this point in the history
  • Loading branch information
medcl committed May 19, 2016
1 parent 93363b7 commit 097b317
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 82 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ copy index `src_index` from `192.168.1.x` to `192.168.1.y:9200` and save with `d
./bin/esmove -s http://localhost:9200 -d http://localhost:9200 -x src_index -y dest_index -w=5 -b=100
```

desc es use basic auth
```
./bin/esmove -s http://localhost:9200/ -x "src_index" -y "dest_index-test" -d http://localhost:9201 -n admin:111111
```

## Compile:

1. make build
Expand All @@ -29,6 +34,8 @@ https://github.com/medcl/elasticsearch-dump/releases
```
-s, --source= source elasticsearch instance
-d, --dest= destination elasticsearch instance
-m, --source-auth basic auth of source elasticsearch instance, eg: user:pass
-n, --dest-auth basic auth of target elasticsearch instance, eg: user:pass
-c, --count= number of documents at a time: ie "size" in the scroll request (8000)
-t, --time= scroll time (1m)
--shards= set a number of shards on newly created indexes
Expand Down
9 changes: 8 additions & 1 deletion domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,12 @@ type Config struct {
Uid string // es scroll uid
SrcESAPI ESAPI
DescESAPI ESAPI

SrcAuth *Auth
DescAuth *Auth
// config options
SrcEs string `short:"s" long:"source" description:"source elasticsearch instance" required:"true"`
SrcEsAuthStr string `short:"m" long:"source_auth" description:"basic auth of source elasticsearch instance, eg: user:pass"`
DescEsAuthStr string `short:"n" long:"dest_auth" description:"basic auth of target elasticsearch instance, eg: user:pass"`
DstEs string `short:"d" long:"dest" description:"destination elasticsearch instance" required:"true"`
DocBufferCount int `short:"c" long:"count" description:"number of documents at a time: ie \"size\" in the scroll request" default:"5000"`
ScrollTime string `short:"t" long:"time" description:"scroll time" default:"1m"`
Expand All @@ -88,3 +91,7 @@ type Config struct {
LogLevel string `short:"v" long:"log" description:"setting log level,options:trace,debug,info,warn,error" default:"INFO"`
}

type Auth struct {
User string
Pass string
}
File renamed without changes.
55 changes: 41 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,21 @@ func main() {
// enough of a buffer to hold all the search results across all workers
c.DocChan = make(chan map[string]interface{}, c.DocBufferCount*c.Workers*10)


//dealing with basic auth
if(len(c.SrcEsAuthStr)>0&&strings.Contains(c.SrcEsAuthStr,":")){
authArray:=strings.Split(c.SrcEsAuthStr,":")
auth:=Auth{User:authArray[0],Pass:authArray[1]}
c.SrcAuth=&auth
}
if(len(c.DescEsAuthStr)>0&&strings.Contains(c.DescEsAuthStr,":")){
authArray:=strings.Split(c.DescEsAuthStr,":")
auth:=Auth{User:authArray[0],Pass:authArray[1]}
c.DescAuth=&auth
}

//get source es version
srcESVersion, errs := c.ClusterVersion(c.SrcEs)
srcESVersion, errs := c.ClusterVersion(c.SrcEs,c.SrcAuth)
if errs != nil {
return
}
Expand All @@ -47,16 +60,18 @@ func main() {
log.Debug("src es is V5,",srcESVersion.Version.Number)
api:=new(ESAPIV5)
api.Host=c.SrcEs
api.Auth=c.SrcAuth
c.SrcESAPI = api
} else {
log.Debug("src es is not V5,",srcESVersion.Version.Number)
api:=new(ESAPIV0)
api.Host=c.SrcEs
api.Auth=c.SrcAuth
c.SrcESAPI = api
}

//get target es version
descESVersion, errs := c.ClusterVersion(c.DstEs)
descESVersion, errs := c.ClusterVersion(c.DstEs,c.DescAuth)
if errs != nil {
return
}
Expand All @@ -65,12 +80,15 @@ func main() {
log.Debug("dest es is V5,",descESVersion.Version.Number)
api:=new(ESAPIV5)
api.Host=c.DstEs
api.Auth=c.DescAuth
c.DescESAPI = api
} else {
log.Debug("dest es is not V5,",descESVersion.Version.Number)
api:=new(ESAPIV0)
api.Host=c.DstEs
api.Auth=c.DescAuth
c.DescESAPI = api

}

// get all indexes from source
Expand Down Expand Up @@ -103,13 +121,14 @@ func main() {
if c.DestIndexName != "" {
//TODO
}else{
// create indexes on DstEs
if err := c.CreateIndexes(idxs); err != nil {
log.Error(err)
return
}
//// create indexes on DstEs
//if err := c.CreateIndexes(idxs); err != nil {
// log.Error(err)
// return
//}
}
}

// wait for cluster state to be okay before moving
timer := time.NewTimer(time.Second * 3)

Expand Down Expand Up @@ -172,6 +191,8 @@ func main() {
pool.Stop()
}

log.Info("done move..")

}

func setInitLogging(logLevel string) {
Expand Down Expand Up @@ -334,10 +355,10 @@ WORKER_DONE:
wg.Done()
}

func (c *Config)ClusterVersion(host string) (*ClusterVersion, []error) {
func (c *Config)ClusterVersion(host string,auth *Auth) (*ClusterVersion, []error) {

url := fmt.Sprintf("%s", host)
_, body, errs := Get(url)
_, body, errs := Get(url,auth)
if errs != nil {
log.Error(errs)
return nil,errs
Expand Down Expand Up @@ -460,16 +481,22 @@ func (c *Config) ClusterReady(api ESAPI) (*ClusterHealth, bool) {
}


func Get(url string) (*http.Response, string, []error) {
request := gorequest.New() //.SetBasicAuth("username", "password")
func Get(url string,auth *Auth) (*http.Response, string, []error) {
request := gorequest.New()
if(auth!=nil){
request.SetBasicAuth(auth.User,auth.Pass)
}

resp, body, errs := request.Get(url).End()
return resp, body, errs

}

func Post(url string, body []byte) {
request := gorequest.New() //.SetBasicAuth("username", "password")
request.Post(url).Send(body).End()
func Post(url string,auth *Auth, body string)(*http.Response, string, []error) {
request := gorequest.New()
if(auth!=nil){
request.SetBasicAuth(auth.User,auth.Pass)
}
return request.Post(url).Send(body).End()
}

83 changes: 44 additions & 39 deletions v0.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@ import (
"encoding/json"
"fmt"
log "github.com/cihub/seelog"
"net/http"
"io/ioutil"
"bytes"
"strings"
"regexp"
"net/http"
"io/ioutil"
)

type ESAPIV0 struct{
Host string //eg:http://localhost:9200
Host string //eg: http://localhost:9200
Auth *Auth //eg: user:pass
}


func (s *ESAPIV0) ClusterHealth() *ClusterHealth {

url := fmt.Sprintf("%s/_cluster/health", s.Host)
_, body, errs := Get(url)
_, body, errs := Get(url,s.Auth)

if errs != nil {
return &ClusterHealth{Name: s.Host, Status: "unreachable"}
Expand All @@ -58,7 +58,20 @@ func (s *ESAPIV0) Bulk(data *bytes.Buffer){
return
}
data.WriteRune('\n')
resp, err := http.Post(fmt.Sprintf("%s/_bulk", s.Host), "", data)
url:=fmt.Sprintf("%s/_bulk", s.Host)

client := &http.Client{}
reqest, _ := http.NewRequest("POST", url, data)
if(s.Auth!=nil){
reqest.SetBasicAuth(s.Auth.User,s.Auth.Pass)
}
resp,errs := client.Do(reqest)
if errs != nil {
log.Error(errs)
return
}

body,err:=ioutil.ReadAll(resp.Body)
if err != nil {
log.Error(err)
return
Expand All @@ -67,22 +80,26 @@ func (s *ESAPIV0) Bulk(data *bytes.Buffer){
defer resp.Body.Close()
defer data.Reset()
if resp.StatusCode != 200 {
b, _ := ioutil.ReadAll(resp.Body)
log.Errorf("bad bulk response: %s %s", string(b), resp.StatusCode)
log.Errorf("bad bulk response: %s %s", body, resp.StatusCode)
return
}
}

func (s *ESAPIV0) GetIndexSettings(copyAllIndexes bool,indexNames string)(string,*Indexes,error){
resp, err := http.Get(fmt.Sprintf("%s/%s/_mapping", s.Host, indexNames))
if err != nil {
return "",nil,err
url:=fmt.Sprintf("%s/%s/_mapping", s.Host, indexNames)
resp,body, errs := Get(url,s.Auth)
if errs != nil {
log.Error(errs)
return "",nil,errs[0]
}
defer resp.Body.Close()

idxs := Indexes{}
dec := json.NewDecoder(resp.Body)
err = dec.Decode(&idxs)
er := json.Unmarshal([]byte(body),&idxs)

if er != nil {
return "",nil,er
}

// remove indexes that start with . if user asked for it
if copyAllIndexes == false {
Expand Down Expand Up @@ -132,7 +149,7 @@ func (s *ESAPIV0) GetIndexSettings(copyAllIndexes bool,indexNames string)(string
}
}

return indexNames,&idxs,err
return indexNames,&idxs,nil
}

func (s *ESAPIV0) UpdateIndexSettings(){}
Expand All @@ -141,24 +158,22 @@ func (s *ESAPIV0) NewScroll(indexNames string,scrollTime string,docBufferCount i

// curl -XGET 'http://es-0.9:9200/_search?search_type=scan&scroll=10m&size=50'
url := fmt.Sprintf("%s/%s/_search?search_type=scan&scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount)
resp, err := http.Get(url)
resp,body, errs := Get(url,s.Auth)
if err != nil {
log.Error(err)
return nil,err
log.Error(errs)
return nil,errs[0]
}
defer resp.Body.Close()

body,err:=ioutil.ReadAll(resp.Body)

log.Debug("new scroll,",string(body))
log.Debug("new scroll,",body)

if err != nil {
log.Error(err)
return nil,err
}

scroll = &Scroll{}
err = json.Unmarshal(body,scroll)
err = json.Unmarshal([]byte(body),scroll)
if err != nil {
log.Error(err)
return nil,err
Expand All @@ -170,30 +185,20 @@ func (s *ESAPIV0) NewScroll(indexNames string,scrollTime string,docBufferCount i
func (s *ESAPIV0) NextScroll(scrollTime string,scrollId string)(*Scroll,error) {
// curl -XGET 'http://es-0.9:9200/_search/scroll?scroll=5m'
id := bytes.NewBufferString(scrollId)

req, err := http.NewRequest("GET", fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id), nil)
if err != nil {
log.Error(err)
return nil,err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Error(err)
return nil,err
url:=fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id)
resp,body, errs := Get(url,s.Auth)
if errs != nil {
log.Error(errs)
return nil,errs[0]
}
defer resp.Body.Close()

data, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Error(err)
return nil,err
}
defer resp.Body.Close()

// decode elasticsearch scroll response
scroll := &Scroll{}
err = json.Unmarshal(data, &scroll)
err := json.Unmarshal([]byte(body), &scroll)
if err != nil {
log.Error(string(data))
log.Error(body)
log.Error(err)
return nil,err
}
Expand Down
Loading

0 comments on commit 097b317

Please sign in to comment.