From 982a5a2446994f3f25d074b60d4f1499ad80e076 Mon Sep 17 00:00:00 2001 From: Joshua Deare Date: Tue, 20 Sep 2016 15:54:22 -0700 Subject: [PATCH] Add API to governor --- api/api.go | 55 +++++++++++++++++++++ api/fsm_api.go | 117 +++++++++++++++++++++++++++++++++++++++++++++ api/ha_api.go | 28 +++++++++++ api/service_api.go | 51 ++++++++++++++++++++ configuration.go | 2 +- fsm/fsm.go | 14 +++--- governor.go | 11 +++++ ha/ha.go | 11 +++-- postgres0.yml | 3 +- postgres1.yml | 3 +- postgres2.yml | 3 +- 11 files changed, 283 insertions(+), 15 deletions(-) create mode 100644 api/api.go create mode 100644 api/fsm_api.go create mode 100644 api/ha_api.go create mode 100644 api/service_api.go diff --git a/api/api.go b/api/api.go new file mode 100644 index 00000000..a76832d9 --- /dev/null +++ b/api/api.go @@ -0,0 +1,55 @@ +package api + +import ( + "encoding/json" + "github.com/compose/governor/fsm" + "github.com/compose/governor/ha" + "github.com/compose/governor/service" + "github.com/gorilla/mux" + "github.com/pkg/errors" + "net/http" +) + +func Router(singleFSM fsm.SingleLeaderFSM, singleHA *ha.SingleLeaderHA, singleService service.SingleLeaderService) (*mux.Router, error) { + r := mux.NewRouter() + if err := registerFSMRouter(singleFSM, singleService, r.PathPrefix("/fsm").Subrouter()); err != nil { + return nil, errors.Wrap(err, "Error registering FSM subrouter for API") + } + if err := registerServiceRouter(singleService, r.PathPrefix("/service").Subrouter()); err != nil { + return nil, errors.Wrap(err, "Error registering Service subrouter for API") + } + if err := registerHARouter(singleHA, r.PathPrefix("/ha").Subrouter()); err != nil { + return nil, errors.Wrap(err, "Error registering HA subrouter for API") + } + return r, nil +} + +type apiSuccessResponse struct { + Data interface{} `json:"data,omitempty"` +} +type apiErrorResponse struct { + Errors []error `json:"errors"` +} + +func sendResponse(code int, jsonData interface{}, errorMsgs []error, w http.ResponseWriter) error { + var resp interface{} + if len(errorMsgs) > 0 { + resp = apiErrorResponse{Errors: errorMsgs} + } else if jsonData != nil { + resp = apiSuccessResponse{Data: jsonData} + } else { + return errors.New("Must supply either errorsMsgs or jsonDATA to response") + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + + if resp == nil { + return nil + } + + if err := json.NewEncoder(w).Encode(resp); err != nil { + return errors.Wrap(err, "Error encoding JSON into response body") + } + return nil +} diff --git a/api/fsm_api.go b/api/fsm_api.go new file mode 100644 index 00000000..95a649b2 --- /dev/null +++ b/api/fsm_api.go @@ -0,0 +1,117 @@ +package api + +import ( + log "github.com/Sirupsen/logrus" + "github.com/compose/governor/fsm" + "github.com/compose/governor/service" + "github.com/gorilla/mux" + "github.com/pkg/errors" + "net/http" +) + +// TODO: Add long listener for updates +func registerFSMRouter(singleFSM fsm.SingleLeaderFSM, singleService service.SingleLeaderService, r *mux.Router) error { + r.HandleFunc("/id", singleFSMIDHandler(singleFSM)).Methods("GET") + r.HandleFunc("/leader", singleFSMLeaderHandler(singleFSM, singleService)).Methods("GET") + r.HandleFunc("/member/{id}", singleFSMMemberHandler(singleFSM, singleService)).Methods("GET") + r.HandleFunc("/members", singleFSMMembersHandler(singleFSM, singleService)).Methods("GET") + return nil +} + +func singleFSMIDHandler(singleFSM fsm.SingleLeaderFSM) http.HandlerFunc { + type idAPIResp struct { + ID uint64 `json:"id"` + } + return func(w http.ResponseWriter, req *http.Request) { + id := singleFSM.UniqueID() + if err := sendResponse(200, idAPIResp{ID: id}, []error{}, w); err != nil { + log.Error("Error sending response for ID request") + } + } +} + +func singleFSMLeaderHandler(singleFSM fsm.SingleLeaderFSM, singleService service.SingleLeaderService) http.HandlerFunc { + type leaderAPIResp struct { + Leader fsm.Leader `json:"leader"` + Exists bool `json:"exists"` + } + return func(w http.ResponseWriter, req *http.Request) { + leaderData, exists, err := singleFSM.Leader() + if err != nil { + if err := sendResponse(500, nil, []error{err}, w); err != nil { + log.Error("Error sending error response") + } + } + leader, err := singleService.FSMLeaderFromBytes(leaderData) + if err != nil { + if err := sendResponse(500, nil, []error{err}, w); err != nil { + log.Error("Error sending error response") + } + } + if err := sendResponse(200, leaderAPIResp{Leader: leader, Exists: exists}, []error{}, w); err != nil { + log.Error("Error sending leader response") + + } + } +} + +func singleFSMMemberHandler(singleFSM fsm.SingleLeaderFSM, singleService service.SingleLeaderService) http.HandlerFunc { + type memberAPIResp struct { + Member fsm.Member `json:"member"` + Exists bool `json:"exists"` + } + return func(w http.ResponseWriter, req *http.Request) { + vars := mux.Vars(req) + id, ok := vars["id"] + if !ok { + if err := sendResponse(400, nil, []error{errors.New("ID Not provided in request for member")}, w); err != nil { + log.Error("Error sending error response") + } + } + + memberData, exists, err := singleFSM.Member(id) + if err != nil { + if err := sendResponse(500, nil, []error{err}, w); err != nil { + log.Error("Error sending error response") + } + } + member, err := singleService.FSMMemberFromBytes(memberData) + if err != nil { + if err := sendResponse(500, nil, []error{err}, w); err != nil { + log.Error("Error sending error response") + } + } + if err := sendResponse(200, memberAPIResp{Member: member, Exists: exists}, []error{}, w); err != nil { + log.Error("Error sending leader response") + + } + } +} + +func singleFSMMembersHandler(singleFSM fsm.SingleLeaderFSM, singleService service.SingleLeaderService) http.HandlerFunc { + type membersAPIResp struct { + Members []fsm.Member `json:"members"` + } + return func(w http.ResponseWriter, req *http.Request) { + membersData, err := singleFSM.Members() + members := []fsm.Member{} + for _, memberData := range membersData { + member, err := singleService.FSMMemberFromBytes(memberData) + if err != nil { + if err := sendResponse(500, nil, []error{err}, w); err != nil { + log.Error("Error sending error response") + } + } + members = append(members, member) + } + if err != nil { + if err := sendResponse(500, nil, []error{err}, w); err != nil { + log.Error("Error sending error response") + } + } + if err := sendResponse(200, membersAPIResp{Members: members}, []error{}, w); err != nil { + log.Error("Error sending leader response") + + } + } +} diff --git a/api/ha_api.go b/api/ha_api.go new file mode 100644 index 00000000..b5b9d155 --- /dev/null +++ b/api/ha_api.go @@ -0,0 +1,28 @@ +package api + +import ( + log "github.com/Sirupsen/logrus" + "github.com/compose/governor/ha" + "github.com/gorilla/mux" + "net/http" +) + +func registerHARouter(singleHA *ha.SingleLeaderHA, r *mux.Router) error { + r.HandleFunc("/is_leader", singleHAIsLeaderHandler(singleHA)).Methods("GET") + return nil +} + +func singleHAIsLeaderHandler(singleHA *ha.SingleLeaderHA) http.HandlerFunc { + type isLeaderAPIResp struct { + IsLeader bool `json:"is_leader"` + } + return func(w http.ResponseWriter, req *http.Request) { + isLeader, err := singleHA.IsLeader() + if err != nil { + sendResponse(500, nil, []error{err}, w) + } + if err := sendResponse(200, isLeaderAPIResp{IsLeader: isLeader}, []error{}, w); err != nil { + log.Error("Error sending response for request") + } + } +} diff --git a/api/service_api.go b/api/service_api.go new file mode 100644 index 00000000..b01005d8 --- /dev/null +++ b/api/service_api.go @@ -0,0 +1,51 @@ +package api + +import ( + log "github.com/Sirupsen/logrus" + "github.com/compose/governor/service" + "github.com/gorilla/mux" + "net/http" +) + +func registerServiceRouter(singleService service.SingleLeaderService, r *mux.Router) error { + r.HandleFunc("/running_as_leader", singleServiceRunningAsLeaderHandler(singleService)).Methods("GET") + r.HandleFunc("/is_running", singleServiceIsRunningHandler(singleService)).Methods("GET") + r.HandleFunc("/is_healthy", singleServiceIsHealthyHandler(singleService)).Methods("GET") + return nil +} + +func singleServiceRunningAsLeaderHandler(singleService service.SingleLeaderService) http.HandlerFunc { + type isLeaderAPIResp struct { + RunningAsLeader bool `json:"running_as_leader"` + } + return func(w http.ResponseWriter, req *http.Request) { + runningAsLeader := singleService.RunningAsLeader() + if err := sendResponse(200, isLeaderAPIResp{RunningAsLeader: runningAsLeader}, []error{}, w); err != nil { + log.Error("Error sending response for ID request") + } + } +} + +func singleServiceIsRunningHandler(singleService service.SingleLeaderService) http.HandlerFunc { + type isRunningAPIResp struct { + IsRunning bool `json:"is_running"` + } + return func(w http.ResponseWriter, req *http.Request) { + running := singleService.IsRunning() + if err := sendResponse(200, isRunningAPIResp{IsRunning: running}, []error{}, w); err != nil { + log.Error("Error sending response for ID request") + } + } +} + +func singleServiceIsHealthyHandler(singleService service.SingleLeaderService) http.HandlerFunc { + type isHealthyAPIResp struct { + IsHealthy bool `json:"is_healthy"` + } + return func(w http.ResponseWriter, req *http.Request) { + healthy := singleService.IsHealthy() + if err := sendResponse(200, isHealthyAPIResp{IsHealthy: healthy}, []error{}, w); err != nil { + log.Error("Error sending response for ID request") + } + } +} diff --git a/configuration.go b/configuration.go index b9dabefd..a75ca7a9 100644 --- a/configuration.go +++ b/configuration.go @@ -13,7 +13,7 @@ type Configuration struct { DataDir string `yaml:"data_dir"` FSM *fsm.Config `yaml:"fsm"` Postgresql *service.PostgresqlConfig `yaml:"postgresql"` - HAHealth string `yaml:"haproxy_health_endpoint"` + APIPort int `yaml:"api_port"` } func LoadConfiguration(path string) (Configuration, error) { diff --git a/fsm/fsm.go b/fsm/fsm.go index b787a199..26c22d2e 100644 --- a/fsm/fsm.go +++ b/fsm/fsm.go @@ -106,12 +106,12 @@ func (f *fsm) RaceForInit(timeout time.Duration) (bool, error) { // TODO: allow custom logger to be passed in type Config struct { - RaftPort int `yaml:"raft_port"` - APIPort int `yaml:"api_port"` - BootstrapPeers []string `yaml:"bootstrap_peers"` - BootstrapNode bool `yaml:"is_bootstrap"` - DataDir string `yaml:"data_dir"` - ClusterID uint64 `yaml:"cluster_id"` + RaftPort int `yaml:"raft_port"` + ClusterConfigPort int `yaml:"cluster_config_port"` + BootstrapPeers []string `yaml:"bootstrap_peers"` + BootstrapNode bool `yaml:"is_bootstrap"` + DataDir string `yaml:"data_dir"` + ClusterID uint64 `yaml:"cluster_id"` // LeaderTTL in milliseconds LeaderTTL int `yaml:"leader_ttl"` // MemberTTL in milliseconds @@ -138,7 +138,7 @@ func NewGovernorFSM(config *Config) (SingleLeaderFSM, error) { FSM: newFSM, ClusterID: config.ClusterID, RaftPort: config.RaftPort, - APIPort: config.APIPort, + APIPort: config.ClusterConfigPort, BootstrapPeers: config.BootstrapPeers, BootstrapNode: config.BootstrapNode, DataDir: config.DataDir, diff --git a/governor.go b/governor.go index a7510ad8..42baf143 100644 --- a/governor.go +++ b/governor.go @@ -5,9 +5,11 @@ import ( //"os/exec" "flag" log "github.com/Sirupsen/logrus" + "github.com/compose/governor/api" "github.com/compose/governor/fsm" "github.com/compose/governor/ha" "github.com/compose/governor/service" + "net/http" "os" "os/signal" "path/filepath" @@ -102,6 +104,15 @@ func main() { }).Info("Clean Shutdown Finished") } }(singleHA, singleLeaderState, pg) + + go func() { + router, err := api.Router(singleLeaderState, singleHA, pg) + if err != nil { + log.Error("Could not start API") + } + http.ListenAndServe(fmt.Sprintf(":%d", configuration.APIPort), router) + }() + if err := singleHA.Run(); err != nil { log.Fatalf("Error Running HA, %+v", err) } diff --git a/ha/ha.go b/ha/ha.go index 1822b09d..3863c5b6 100644 --- a/ha/ha.go +++ b/ha/ha.go @@ -303,7 +303,7 @@ func (ha *SingleLeaderHA) RunCycle() error { } } - isLeader, err := ha.isLeader() + isLeader, err := ha.IsLeader() if err != nil { return err } @@ -469,14 +469,17 @@ func (ha *SingleLeaderHA) leaderIsMe(leader fsm.Leader) (bool, error) { } -func (ha *SingleLeaderHA) isLeader() (bool, error) { - curLeader := ha.service.FSMLeaderTemplate() - exists, err := ha.fsm.Leader(curLeader) +func (ha *SingleLeaderHA) IsLeader() (bool, error) { + leaderData, exists, err := ha.fsm.Leader() if err != nil { return false, err } else if !exists { return false, nil } + curLeader, err := ha.service.FSMLeaderFromBytes(leaderData) + if err != nil { + return false, errors.Wrap(err, "Error getting leader from bytes") + } meAsLeader, err := ha.service.AsFSMLeader() if err != nil { diff --git a/postgres0.yml b/postgres0.yml index 9346b196..4c448d4e 100644 --- a/postgres0.yml +++ b/postgres0.yml @@ -1,8 +1,9 @@ loop_wait: 1000 #milliseconds data_dir: "data/postgres0" #canoe and pg data will go here +api_port: 5000 fsm: raft_port: 1234 - api_port: 1244 + cluster_config_port: 1244 #bootstrap_peers: #- http://localhost:1245 #TODO: Alter canoe to allow set-list of bootstrap peers diff --git a/postgres1.yml b/postgres1.yml index be4dd6a9..7a297878 100644 --- a/postgres1.yml +++ b/postgres1.yml @@ -1,8 +1,9 @@ loop_wait: 1000 #milliseconds data_dir: "data/postgres1" #canoe and pg data will go here +api_port: 5001 fsm: raft_port: 1235 - api_port: 1245 + cluster_config_port: 1245 bootstrap_peers: - http://localhost:1244 #TODO: Alter canoe to allow set-list of bootstrap peers diff --git a/postgres2.yml b/postgres2.yml index b7d0df7a..4336c2c8 100644 --- a/postgres2.yml +++ b/postgres2.yml @@ -1,8 +1,9 @@ loop_wait: 1000 #milliseconds data_dir: "data/postgres2" #canoe and pg data will go here +api_port: 5002 fsm: raft_port: 1236 - api_port: 1246 + cluster_config_port: 1246 bootstrap_peers: - http://localhost:1244 - http://localhost:1245