forked from nikepan/clickhouse-bulk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.go
79 lines (67 loc) · 1.94 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package main
import (
	"context"
	"io/ioutil"
	"log"
	"net/http"
	"net/url"

	"github.com/labstack/echo"
)
// Server - main server object. It bundles the listen address, the collector
// that requests are handed to, and the echo instance that serves HTTP.
// NewServer fills the fields positionally, so their order must not change.
type Server struct {
	// Listen is the address passed to echo.Start when the server is started.
	Listen string
	// Collector parses incoming queries, receives inserts via Push, and
	// exposes the Sender used to forward every other query.
	Collector *Collector
	// Debug enables logging of each request's query string and body.
	Debug bool
	// echo is the HTTP framework instance that owns routing, Start and Shutdown.
	echo *echo.Echo
}
// Status - response status struct, serialized to JSON using the tags below.
type Status struct {
	// Status is the state string; statusHandler in this file sets it to "ok".
	Status string `json:"status"`
	// SendQueue is left out of the JSON when zero.
	// NOTE(review): not populated anywhere in this file; presumably the number
	// of queued sends - confirm against the code that fills it.
	SendQueue int `json:"send_queue,omitempty"`
	// Servers is left out of the JSON when empty.
	// NOTE(review): not populated anywhere in this file; meaning of the map
	// key is not visible here - confirm.
	Servers map[string]*ClickhouseServer `json:"servers,omitempty"`
	// Tables is left out of the JSON when empty.
	// NOTE(review): not populated anywhere in this file; meaning of the map
	// key is not visible here - confirm.
	Tables map[string]*Table `json:"tables,omitempty"`
}
// NewServer - create server. The returned Server carries the given listen
// address, collector and debug flag together with a fresh echo instance;
// no routes are registered here.
func NewServer(listen string, collector *Collector, debug bool) *Server {
	srv := &Server{
		Listen:    listen,
		Collector: collector,
		Debug:     debug,
		echo:      echo.New(),
	}
	return srv
}
// writeHandler handles a query request. It reads the whole request body,
// merges any HTTP Basic Auth credentials into the URL query string, and hands
// both to Collector.ParseQuery. When ParseQuery reports an insert, the data is
// pushed to the collector in a separate goroutine and an empty 200 response is
// returned at once; any other query is forwarded through
// Collector.Sender.SendQuery and its response text and status are relayed to
// the client. A body that cannot be read completely yields a 400 response.
func (server *Server) writeHandler(c echo.Context) error {
	body, err := ioutil.ReadAll(c.Request().Body)
	if err != nil {
		// A truncated body must not be parsed or queued: it could end up
		// being pushed as a partial insert. Report the failure instead.
		return c.String(http.StatusBadRequest, "read request body: "+err.Error())
	}
	query := string(body)

	qs := c.QueryString()
	if server.Debug {
		// Logged before the Basic Auth credentials are merged in, so they
		// are never written to the log by this statement.
		log.Printf("query %+v %+v\n", qs, query)
	}

	if user, password, ok := c.Request().BasicAuth(); ok {
		// qs is the raw, still URL-encoded query string, so the credentials
		// have to be escaped as well; otherwise '&', '=', '%', '+' or '#'
		// inside a user name or password would corrupt or inject parameters.
		auth := "user=" + url.QueryEscape(user) + "&password=" + url.QueryEscape(password)
		if qs == "" {
			qs = auth
		} else {
			qs = auth + "&" + qs
		}
	}

	params, content, insert := server.Collector.ParseQuery(qs, query)
	if insert {
		// Fire-and-forget: the client is acknowledged without waiting for
		// Push to finish.
		go server.Collector.Push(params, content)
		return c.String(http.StatusOK, "")
	}

	resp, status := server.Collector.Sender.SendQuery(params, content)
	return c.String(status, resp)
}
// statusHandler answers with HTTP 200 and a JSON-encoded Status whose Status
// field is "ok"; all other fields are left at their zero values.
func (server *Server) statusHandler(c echo.Context) error {
	payload := Status{Status: "ok"}
	return c.JSON(http.StatusOK, payload)
}
// Start - start http server. It asks the embedded echo instance to serve on
// server.Listen and returns whatever error echo reports.
func (server *Server) Start() error {
	addr := server.Listen
	return server.echo.Start(addr)
}
// Shutdown - stop http server. The context is passed straight through to the
// embedded echo instance, and the error it reports is returned unchanged.
func (server *Server) Shutdown(ctx context.Context) error {
	e := server.echo
	return e.Shutdown(ctx)
}
// InitServer - build a server via NewServer and register its routes:
// POST "/" is handled by writeHandler and GET "/status" by statusHandler.
// The server is returned without being started.
func InitServer(listen string, collector *Collector, debug bool) *Server {
	srv := NewServer(listen, collector, debug)

	router := srv.echo
	router.POST("/", srv.writeHandler)
	router.GET("/status", srv.statusHandler)

	return srv
}