From bf77458ddcedf1aba591b5075140f6b20539fc76 Mon Sep 17 00:00:00 2001 From: Sabyasachi Patra Date: Mon, 20 Dec 2021 10:17:15 +0530 Subject: [PATCH] websocket (#162) * websocket initialised, user and pod commands added (#152) * addressing #119, uploading same file multiple times * addressing #127, server requires postageBlockId to run * addressing #119, backup old file in case same name * code cleanup * addressing #117 and #118, issues related to pod deletion (#1) * addressing #117 and #118, issues related to pod deletion * fix reploading deleted file * fix #124, handle removed root directory * fixed #19, pod deletion requires password * mod tidy (#146) * mod tidy * manually restore vendor * manually restore vendor * fix #86, shared pod removed from list * websocket initialised, user and pod commands added * fixing lint * fixing lint * file related events added * download working * most of the kv and doc commands added * loadcsv, loadjson, indexjson * minor type changes * whitelist origins * add stream functionality * add streaming for loadcsv and loadjson * require centent_length for upload stream * lint fixes * fix #165 (#166) * cli can connect to remote dfs server (#164) * cli can connect to remote dfs server * lint fixes * bump version --- cmd/common/websocket_request.go | 138 +++++ cmd/dfs-cli/cmd/fdfs-api.go | 19 +- cmd/dfs-cli/cmd/pod.go | 4 +- cmd/dfs-cli/cmd/prompt.go | 2 +- cmd/dfs-cli/cmd/root.go | 24 +- cmd/dfs/cmd/server.go | 6 +- go.mod | 1 + pkg/api/doc_indexjson.go | 2 +- pkg/api/doc_new.go | 2 +- pkg/api/file_download.go | 2 +- pkg/api/kv_new.go | 2 +- pkg/api/login_middleware.go | 2 +- pkg/api/ws.go | 968 ++++++++++++++++++++++++++++++++ pkg/collection/document.go | 14 +- pkg/dir/rmdir_test.go | 1 - pkg/file/reader.go | 2 +- vendor/modules.txt | 1 + version.go | 2 +- 18 files changed, 1154 insertions(+), 38 deletions(-) create mode 100644 cmd/common/websocket_request.go create mode 100644 pkg/api/ws.go diff --git a/cmd/common/websocket_request.go b/cmd/common/websocket_request.go new file mode 100644 index 00000000..4193bc72 --- /dev/null +++ b/cmd/common/websocket_request.go @@ -0,0 +1,138 @@ +package common + +import ( + "bytes" + "encoding/json" + "net/http" +) + +type Event string + +var ( + UserSignup Event = "/user/signup" + UserLogin Event = "/user/login" + UserImport Event = "/user/import" + UserPresent Event = "/user/present" + UserIsLoggedin Event = "/user/isloggedin" + UserLogout Event = "/user/logout" + UserExport Event = "/user/export" + UserDelete Event = "/user/delete" + UserStat Event = "/user/stat" + PodNew Event = "/pod/new" + PodOpen Event = "/pod/open" + PodClose Event = "/pod/close" + PodSync Event = "/pod/sync" + PodDelete Event = "/pod/delete" + PodLs Event = "/pod/ls" + PodStat Event = "/pod/stat" + PodShare Event = "/pod/share" + PodReceive Event = "/pod/receive" + PodReceiveInfo Event = "/pod/receiveinfo" + DirIsPresent Event = "/dir/present" + DirMkdir Event = "/dir/mkdir" + DirRmdir Event = "/dir/rmdir" + DirLs Event = "/dir/ls" + DirStat Event = "/dir/stat" + FileDownload Event = "/file/download" + FileDownloadStream Event = "/file/download/stream" + FileUpload Event = "/file/upload" + FileUploadStream Event = "/file/upload/stream" + FileShare Event = "/file/share" + FileReceive Event = "/file/receive" + FileReceiveInfo Event = "/file/receiveinfo" + FileDelete Event = "/file/delete" + FileStat Event = "/file/stat" + KVCreate Event = "/kv/new" + KVList Event = "/kv/ls" + KVOpen Event = "/kv/open" + KVDelete Event = "/kv/delete" + KVCount Event = "/kv/count" + KVEntryPut Event = "/kv/entry/put" + KVEntryGet Event = "/kv/entry/get" + KVEntryDelete Event = "/kv/entry/del" + KVLoadCSV Event = "/kv/loadcsv" + KVLoadCSVStream Event = "/kv/loadcsv/stream" + KVSeek Event = "/kv/seek" + KVSeekNext Event = "/kv/seek/next" + DocCreate Event = "/doc/new" + DocList Event = "/doc/ls" + DocOpen Event = "/doc/open" + DocCount Event = "/doc/count" + DocDelete Event = "/doc/delete" + DocFind Event = "/doc/find" + DocEntryPut Event = "/doc/entry/put" + DocEntryGet Event = "/doc/entry/get" + DocEntryDel Event = "/doc/entry/del" + DocLoadJson Event = "/doc/loadjson" + DocLoadJsonStream Event = "/doc/loadjson/stream" + DocIndexJson Event = "/doc/indexjson" +) + +type WebsocketRequest struct { + Event Event `json:"event"` + Params interface{} `json:"params,omitempty"` +} + +type FileRequest struct { + PodName string `json:"pod_name,omitempty"` + TableName string `json:"table_name,omitempty"` + DirPath string `json:"dir_path,omitempty"` + BlockSize string `json:"block_size,omitempty"` + FileName string `json:"file_name,omitempty"` +} + +type FileDownloadRequest struct { + PodName string `json:"pod_name,omitempty"` + Filepath string `json:"file_path,omitempty"` +} + +type WebsocketResponse struct { + Event Event `json:"event"` + StatusCode int `json:"code"` + Params interface{} `json:"params,omitempty"` + header http.Header + buf bytes.Buffer +} + +func NewWebsocketResponse() *WebsocketResponse { + return &WebsocketResponse{ + header: map[string][]string{}, + } +} + +func (w *WebsocketResponse) Header() http.Header { + return w.header +} + +func (w *WebsocketResponse) Write(bytes []byte) (int, error) { + if w.Header().Get("Content-Type") == "application/json; charset=utf-8" || + w.Header().Get("Content-Type") == "application/json" { + body := map[string]interface{}{} + err := json.Unmarshal(bytes, &body) + if err != nil { + return 0, err + } + w.Params = body + return len(bytes), nil + } + if w.Header().Get("Content-Length") != "" || w.Header().Get("Content-Length") != "0" { + return w.buf.Write(bytes) + } + return 0, nil +} + +func (w *WebsocketResponse) WriteHeader(statusCode int) { + w.StatusCode = statusCode +} + +func (w *WebsocketResponse) Marshal() []byte { + if w.Header().Get("Content-Type") == "application/json; charset=utf-8" || + w.Header().Get("Content-Type") == "application/json" { + data, _ := json.Marshal(w) + return data + } + if w.Header().Get("Content-Length") != "" { + return w.buf.Bytes() + } + return nil +} diff --git a/cmd/dfs-cli/cmd/fdfs-api.go b/cmd/dfs-cli/cmd/fdfs-api.go index f7139db8..3073b63a 100644 --- a/cmd/dfs-cli/cmd/fdfs-api.go +++ b/cmd/dfs-cli/cmd/fdfs-api.go @@ -47,13 +47,13 @@ type FdfsClient struct { cookie *http.Cookie } -func NewFdfsClient(host, port string) (*FdfsClient, error) { +func NewFdfsClient(fdfsServer string) (*FdfsClient, error) { client, err := createHTTPClient() if err != nil { return nil, err } return &FdfsClient{ - url: fmt.Sprintf("http://" + host + ":" + port), + url: fdfsServer, client: client, }, nil } @@ -84,24 +84,15 @@ func (s *FdfsClient) CheckConnection() bool { if err != nil { return false } + defer response.Body.Close() req.Close = true if response.StatusCode != http.StatusOK { return false } - data, err := ioutil.ReadAll(response.Body) - if err != nil { - return false - } - err = response.Body.Close() - if err != nil { - return false - } - if !strings.HasPrefix(string(data), "FairOS-dfs") { - return false - } - return true + _, err = ioutil.ReadAll(response.Body) + return err == nil } func (s *FdfsClient) postReq(method, urlPath string, jsonBytes []byte) ([]byte, error) { diff --git a/cmd/dfs-cli/cmd/pod.go b/cmd/dfs-cli/cmd/pod.go index f6fb6d69..cedd6521 100644 --- a/cmd/dfs-cli/cmd/pod.go +++ b/cmd/dfs-cli/cmd/pod.go @@ -48,8 +48,10 @@ func podNew(podName string) { } func deletePod(podName string) { + password := getPassword() delPod := common.PodRequest{ - PodName: podName, + PodName: podName, + Password: password, } jsonData, err := json.Marshal(delPod) if err != nil { diff --git a/cmd/dfs-cli/cmd/prompt.go b/cmd/dfs-cli/cmd/prompt.go index 1629d42b..99c1a10b 100644 --- a/cmd/dfs-cli/cmd/prompt.go +++ b/cmd/dfs-cli/cmd/prompt.go @@ -109,7 +109,7 @@ type Message struct { // NewPrompt spawns dfs-client and checks if the it is connected to it. func NewPrompt() { var err error - fdfsAPI, err = NewFdfsClient(fdfsHost, fdfsPort) + fdfsAPI, err = NewFdfsClient(fdfsServer) if err != nil { fmt.Println("could not create fdfs client") os.Exit(1) diff --git a/cmd/dfs-cli/cmd/root.go b/cmd/dfs-cli/cmd/root.go index 1fa30c0d..301af5cb 100644 --- a/cmd/dfs-cli/cmd/root.go +++ b/cmd/dfs-cli/cmd/root.go @@ -29,9 +29,8 @@ import ( ) var ( - cfgFile string - fdfsHost string - fdfsPort string + cfgFile string + fdfsServer string ) // rootCmd represents the base command when called without any subcommands @@ -42,8 +41,7 @@ var rootCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { fmt.Println("version : ", dfs.Version) - fmt.Println("fdfsHost : ", fdfsHost) - fmt.Println("fdfsPort : ", fdfsPort) + fmt.Println("fdfsServer : ", fdfsServer) NewPrompt() initPrompt() }, @@ -88,8 +86,20 @@ func init() { // when this action is called directly. rootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") - rootCmd.PersistentFlags().StringVar(&fdfsHost, "fdfsHost", "127.0.0.1", "fdfs host") - rootCmd.PersistentFlags().StringVar(&fdfsPort, "fdfsPort", "9090", "fdfs port") + rootCmd.PersistentFlags().String("fdfsHost", "127.0.0.1", "fdfs host") + rootCmd.PersistentFlags().String("fdfsPort", "9090", "fdfs port") + + rootCmd.PersistentFlags().StringVar(&fdfsServer, "fdfsServer", "http://localhost:9090", "fdfs server api endpoint") + + if err := rootCmd.PersistentFlags().MarkDeprecated("fdfsHost", "run --fdfsServer, fdfs server api endpoint"); err != nil { + fmt.Println(err) + os.Exit(1) + } + + if err := rootCmd.PersistentFlags().MarkDeprecated("fdfsPort", "run --fdfsServer, fdfs server api endpoint"); err != nil { + fmt.Println(err) + os.Exit(1) + } } // initConfig reads in config file and ENV variables if set. diff --git a/cmd/dfs/cmd/server.go b/cmd/dfs/cmd/server.go index 9f2e0eff..2691306c 100644 --- a/cmd/dfs/cmd/server.go +++ b/cmd/dfs/cmd/server.go @@ -23,7 +23,6 @@ import ( "strings" dfs "github.com/fairdatasociety/fairOS-dfs" - "github.com/fairdatasociety/fairOS-dfs/pkg/api" "github.com/fairdatasociety/fairOS-dfs/pkg/logging" "github.com/gorilla/mux" @@ -161,8 +160,11 @@ func startHttpService(logger logging.Logger) { return } }) - apiVersion := "v1" + + wsRouter := router.PathPrefix("/ws/" + apiVersion).Subrouter() + wsRouter.HandleFunc("/", handler.WebsocketHandler) + baseRouter := router.PathPrefix("/" + apiVersion).Subrouter() baseRouter.HandleFunc("/robots.txt", func(w http.ResponseWriter, r *http.Request) { _, err := fmt.Fprintln(w, "User-agent: *\nDisallow: /") diff --git a/go.mod b/go.mod index d43def3f..f41a1f14 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 github.com/gorilla/mux v1.7.4 github.com/gorilla/securecookie v1.1.1 + github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 github.com/klauspost/pgzip v1.2.5 github.com/mattn/go-tty v0.0.3 // indirect diff --git a/pkg/api/doc_indexjson.go b/pkg/api/doc_indexjson.go index 1a729c3f..d6113d52 100644 --- a/pkg/api/doc_indexjson.go +++ b/pkg/api/doc_indexjson.go @@ -49,7 +49,7 @@ func (h *Handler) DocIndexJsonHandler(w http.ResponseWriter, r *http.Request) { return } - podName := docReq.TableName + podName := docReq.PodName if podName == "" { h.logger.Errorf("doc indexjson: \"pod_name\" argument missing") jsonhttp.BadRequest(w, "doc indexjson: \"pod_name\" argument missing") diff --git a/pkg/api/doc_new.go b/pkg/api/doc_new.go index b2d1df1e..b6153682 100644 --- a/pkg/api/doc_new.go +++ b/pkg/api/doc_new.go @@ -116,5 +116,5 @@ func (h *Handler) DocCreateHandler(w http.ResponseWriter, r *http.Request) { return } - jsonhttp.OK(w, "document db created") + jsonhttp.Created(w, "document db created") } diff --git a/pkg/api/file_download.go b/pkg/api/file_download.go index c2e740c4..4a780803 100644 --- a/pkg/api/file_download.go +++ b/pkg/api/file_download.go @@ -83,7 +83,7 @@ func (h *Handler) FileDownloadHandler(w http.ResponseWriter, r *http.Request) { if err != nil { h.logger.Errorf("download: %v", err) w.Header().Set("Content-Type", " application/json") - jsonhttp.InternalServerError(w, "stat dir: "+err.Error()) + jsonhttp.InternalServerError(w, "download: "+err.Error()) } _ = reader.Close() } diff --git a/pkg/api/kv_new.go b/pkg/api/kv_new.go index 6c2697ca..156e987a 100644 --- a/pkg/api/kv_new.go +++ b/pkg/api/kv_new.go @@ -101,5 +101,5 @@ func (h *Handler) KVCreateHandler(w http.ResponseWriter, r *http.Request) { jsonhttp.InternalServerError(w, "kv create: "+err.Error()) return } - jsonhttp.OK(w, "kv store created") + jsonhttp.Created(w, "kv store created") } diff --git a/pkg/api/login_middleware.go b/pkg/api/login_middleware.go index 9ea906b9..f57efc91 100644 --- a/pkg/api/login_middleware.go +++ b/pkg/api/login_middleware.go @@ -25,7 +25,7 @@ import ( ) // LoginMiddleware is a middleware that gets called before executing any of the protected handlers. -// this check if a there is a valid session id the request and then alows the request handler to +// this check if a there is a valid session id the request and then allows the request handler to // proceed for execution. func (h *Handler) LoginMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/api/ws.go b/pkg/api/ws.go new file mode 100644 index 00000000..b15b51dc --- /dev/null +++ b/pkg/api/ws.go @@ -0,0 +1,968 @@ +package api + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/fairdatasociety/fairOS-dfs/cmd/common" + "github.com/fairdatasociety/fairOS-dfs/pkg/logging" + "github.com/gorilla/websocket" + "github.com/sirupsen/logrus" +) + +const ( + wsChunkLimit = 1000000 +) + +var ( + readDeadline = 4 * time.Second + + writeDeadline = 4 * time.Second +) + +func (h *Handler) WebsocketHandler(w http.ResponseWriter, r *http.Request) { + upgrader := websocket.Upgrader{} // use default options + upgrader.CheckOrigin = func(r *http.Request) bool { + // TODO: whitelist origins + return true + } + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + h.logger.Errorf("Error during connection upgrade:", err) + return + } + defer conn.Close() + + err = h.handleEvents(conn) + if err != nil { + h.logger.Errorf("Error during handling event:", err) + return + } +} + +func (h *Handler) handleEvents(conn *websocket.Conn) error { + defer conn.Close() + + err := conn.SetReadDeadline(time.Now().Add(readDeadline)) + if err != nil { + h.logger.Debugf("ws event handler: set read deadline failed on connection : %v", err) + h.logger.Error("ws event handler: set read deadline failed on connection") + return err + } + + // keep pinging to check pong + go func() { + pingPeriod := (readDeadline * 9) / 10 + ticker := time.NewTicker(pingPeriod) + defer ticker.Stop() + + for range ticker.C { + if err := conn.SetWriteDeadline(time.Now().Add(writeDeadline)); err != nil { + return + } + if err := conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + h.logger.Debugf("ws event handler: upload: failed to send ping: %v", err) + h.logger.Error("ws event handler: upload: failed to send ping") + return + } + } + }() + + // add read deadline in pong + conn.SetPongHandler(func(message string) error { + if err := conn.SetReadDeadline(time.Now().Add(readDeadline)); err != nil { + h.logger.Debugf("ws event handler: set read deadline failed on connection : %v", err) + h.logger.Error("ws event handler: set read deadline failed on connection") + return err + } + return nil + }) + + var cookie []string + + // create a http request for feeding the http handler + newRequest := func(method, url string, buf []byte) (*http.Request, error) { + httpReq, err := http.NewRequest(method, url, bytes.NewBuffer(buf)) + if err != nil { + return nil, err + } + httpReq.Header.Add("Content-Type", "application/json") + httpReq.Header.Add("Content-Length", strconv.Itoa(len(buf))) + if cookie != nil { + httpReq.Header.Set("Cookie", cookie[0]) + } + return httpReq, nil + } + + // create a file upload request for feeding the http handler + newMultipartRequestWithBinaryMessage := func(params interface{}, formField, method, url string, streaming bool) (*http.Request, error) { + jsonBytes, _ := json.Marshal(params) + args := make(map[string]string) + if err := json.Unmarshal(jsonBytes, &args); err != nil { + h.logger.Debugf("ws event handler: multipart rqst w/ body: failed to read params: %v", err) + h.logger.Error("ws event handler: multipart rqst w/ body: failed to read params") + return nil, err + } + + if err != nil { + return nil, err + } + body := new(bytes.Buffer) + writer := multipart.NewWriter(body) + fileName := "" + compression := "" + contentLength := "0" + // Add parameters + for k, v := range args { + if k == "file_name" { + fileName = v + } else if k == "content_length" { + contentLength = v + } else if k == "compression" { + compression = strings.ToLower(compression) + } + err := writer.WriteField(k, v) + if err != nil { + h.logger.Debugf("ws event handler: multipart rqst w/ body: failed to write fields in form: %v", err) + h.logger.Error("ws event handler: multipart rqst w/ body: failed to write fields in form") + return nil, err + } + } + + part, err := writer.CreateFormFile(formField, fileName) + if err != nil { + h.logger.Debugf("ws event handler: multipart rqst w/ body: failed to create files field in form: %v", err) + h.logger.Error("ws event handler: multipart rqst w/ body: failed to create files field in form") + return nil, err + } + if streaming { + if contentLength == "" || contentLength == "0" { + h.logger.Warning("streaming needs \"content_length\"") + return nil, fmt.Errorf("streaming needs \"content_length\"") + } + var totalRead int64 = 0 + for { + mt, reader, err := conn.NextReader() + if err != nil { + h.logger.Debugf("ws event handler: multipart rqst w/ body: failed to read next message: %v", err) + h.logger.Error("ws event handler: multipart rqst w/ body: failed to read next message") + return nil, err + } + if mt != websocket.BinaryMessage { + h.logger.Warning("non binary message", mt) + return nil, fmt.Errorf("received non binary message inside upload stream aborting") + } + n, err := io.Copy(part, reader) + if err != nil { + h.logger.Debugf("ws event handler: multipart rqst w/ body: failed to read file: %v", err) + h.logger.Error("ws event handler: multipart rqst w/ body: failed to read file") + return nil, err + } + totalRead += n + if fmt.Sprintf("%d", totalRead) == contentLength { + h.logger.Debug("streamed full content") + break + } + } + } else { + mt, reader, err := conn.NextReader() + if err != nil { + h.logger.Debugf("ws event handler: multipart rqst w/ body: failed to read next message: %v", err) + h.logger.Error("ws event handler: multipart rqst w/ body: failed to read next message") + return nil, err + } + if mt != websocket.BinaryMessage { + h.logger.Warning("non binary message", mt) + return nil, fmt.Errorf("file content should be as binary message") + } + _, err = io.Copy(part, reader) + if err != nil { + h.logger.Debugf("ws event handler: multipart rqst w/ body: failed to read file: %v", err) + h.logger.Error("ws event handler: multipart rqst w/ body: failed to read file") + return nil, err + } + } + + err = writer.Close() + if err != nil { + h.logger.Debugf("ws event handler: multipart rqst w/ body: failed to close writer: %v", err) + h.logger.Error("ws event handler: multipart rqst w/ body: failed to close writer") + return nil, err + } + + httpReq, err := http.NewRequest(method, url, body) + if err != nil { + h.logger.Debugf("ws event handler: multipart rqst w/ body: failed to create http request: %v", err) + h.logger.Error("ws event handler: multipart rqst w/ body: failed to create http request") + return nil, err + } + contentType := fmt.Sprintf("multipart/form-data;boundary=%v", writer.Boundary()) + httpReq.Header.Set("Content-Type", contentType) + if cookie != nil { + httpReq.Header.Set("Cookie", cookie[0]) + } + if compression != "" { + httpReq.Header.Set(CompressionHeader, compression) + } + return httpReq, nil + } + + // create a http request for file download + newMultipartRequest := func(method, url, boundary string, r io.Reader) (*http.Request, error) { + httpReq, err := http.NewRequest(method, url, r) + if err != nil { + return nil, err + } + contentType := fmt.Sprintf("multipart/form-data;boundary=%v", boundary) + httpReq.Header.Set("Content-Type", contentType) + if cookie != nil { + httpReq.Header.Set("Cookie", cookie[0]) + } + return httpReq, nil + } + + respondWithError := func(response *common.WebsocketResponse, originalErr error) { + if originalErr == nil { + return + } + if err := conn.SetReadDeadline(time.Now().Add(readDeadline)); err != nil { + return + } + + message := map[string]interface{}{} + message["message"] = originalErr.Error() + response.Params = &message + response.StatusCode = http.StatusInternalServerError + + if err := conn.SetWriteDeadline(time.Now().Add(writeDeadline)); err != nil { + return + } + if err := conn.WriteMessage(websocket.TextMessage, response.Marshal()); err != nil { + h.logger.Debugf("ws event handler: upload: failed to write error response: %v", err) + h.logger.Error("ws event handler: upload: failed to write error response") + return + } + } + + makeQueryParams := func(base string, params interface{}) string { + paramsMap := params.(map[string]interface{}) + url := base + "?" + for i, v := range paramsMap { + url = fmt.Sprintf("%s%s=%s&", url, i, v) + } + return url + } + + logEventDescription := func(url string, startTime time.Time, status int, logger logging.Logger) { + fields := logrus.Fields{ + "uri": url, + "duration": time.Since(startTime).String(), + "status": status, + } + logger.WithFields(fields).Log(logrus.DebugLevel, "ws event response: ") + } + + for { + res := common.NewWebsocketResponse() + messageType, message, err := conn.ReadMessage() + if err != nil { + h.logger.Debugf("ws event handler: read message error: %v", err) + h.logger.Error("ws event handler: read message error") + if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + return err + } + return nil + } + to := time.Now() + req := &common.WebsocketRequest{} + err = json.Unmarshal(message, req) + if err != nil { + h.logger.Debugf("ws event handler: failed to read request: %v", err) + h.logger.Error("ws event handler: failed to read request") + if err := conn.SetWriteDeadline(time.Now().Add(writeDeadline)); err != nil { + return err + } + return conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, err.Error())) + } + res.Event = req.Event + if err := conn.SetReadDeadline(time.Time{}); err != nil { + continue + } + switch req.Event { + // user related events + case common.UserSignup: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.UserSignup), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.UserSignupHandler(res, httpReq) + cookie = res.Header()["Set-Cookie"] + logEventDescription(string(common.UserSignup), to, res.StatusCode, h.logger) + case common.UserLogin: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.UserLogin), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.UserLoginHandler(res, httpReq) + cookie = res.Header()["Set-Cookie"] + logEventDescription(string(common.UserLogin), to, res.StatusCode, h.logger) + case common.UserImport: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.UserImport), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.ImportUserHandler(res, httpReq) + logEventDescription(string(common.UserImport), to, res.StatusCode, h.logger) + case common.UserPresent: + url := makeQueryParams(string(common.UserPresent), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.UserPresentHandler(res, httpReq) + logEventDescription(string(common.UserPresent), to, res.StatusCode, h.logger) + case common.UserIsLoggedin: + url := makeQueryParams(string(common.UserIsLoggedin), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.IsUserLoggedInHandler(res, httpReq) + logEventDescription(string(common.UserIsLoggedin), to, res.StatusCode, h.logger) + case common.UserLogout: + httpReq, err := newRequest(http.MethodPost, string(common.UserLogout), nil) + if err != nil { + respondWithError(res, err) + continue + } + h.UserLogoutHandler(res, httpReq) + logEventDescription(string(common.UserLogout), to, res.StatusCode, h.logger) + case common.UserExport: + httpReq, err := newRequest(http.MethodPost, string(common.UserExport), nil) + if err != nil { + respondWithError(res, err) + continue + } + h.ExportUserHandler(res, httpReq) + logEventDescription(string(common.UserExport), to, res.StatusCode, h.logger) + case common.UserDelete: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodDelete, string(common.UserDelete), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.UserDeleteHandler(res, httpReq) + logEventDescription(string(common.UserDelete), to, res.StatusCode, h.logger) + case common.UserStat: + httpReq, err := newRequest(http.MethodGet, string(common.UserStat), nil) + if err != nil { + respondWithError(res, err) + continue + } + h.UserStatHandler(res, httpReq) + logEventDescription(string(common.UserStat), to, res.StatusCode, h.logger) + // pod related events + case common.PodReceive: + url := makeQueryParams(string(common.PodReceive), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.PodReceiveHandler(res, httpReq) + logEventDescription(string(common.PodReceive), to, res.StatusCode, h.logger) + case common.PodReceiveInfo: + url := makeQueryParams(string(common.PodReceiveInfo), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.PodReceiveInfoHandler(res, httpReq) + logEventDescription(string(common.PodReceiveInfo), to, res.StatusCode, h.logger) + case common.PodNew: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.PodNew), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.PodCreateHandler(res, httpReq) + logEventDescription(string(common.PodNew), to, res.StatusCode, h.logger) + case common.PodOpen: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.PodOpen), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.PodOpenHandler(res, httpReq) + logEventDescription(string(common.PodOpen), to, res.StatusCode, h.logger) + case common.PodClose: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.PodClose), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.PodCloseHandler(res, httpReq) + logEventDescription(string(common.PodClose), to, res.StatusCode, h.logger) + case common.PodSync: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.PodSync), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.PodSyncHandler(res, httpReq) + logEventDescription(string(common.PodSync), to, res.StatusCode, h.logger) + case common.PodShare: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.PodShare), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.PodShareHandler(res, httpReq) + logEventDescription(string(common.PodShare), to, res.StatusCode, h.logger) + case common.PodDelete: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodDelete, string(common.PodDelete), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.PodDeleteHandler(res, httpReq) + logEventDescription(string(common.PodDelete), to, res.StatusCode, h.logger) + case common.PodLs: + httpReq, err := newRequest(http.MethodGet, string(common.PodLs), nil) + if err != nil { + respondWithError(res, err) + continue + } + h.PodListHandler(res, httpReq) + logEventDescription(string(common.PodLs), to, res.StatusCode, h.logger) + case common.PodStat: + url := makeQueryParams(string(common.UserPresent), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.PodStatHandler(res, httpReq) + logEventDescription(string(common.PodStat), to, res.StatusCode, h.logger) + + // file related events + case common.DirMkdir: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.DirMkdir), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.DirectoryMkdirHandler(res, httpReq) + logEventDescription(string(common.DirMkdir), to, res.StatusCode, h.logger) + case common.DirRmdir: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodDelete, string(common.DirRmdir), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.DirectoryRmdirHandler(res, httpReq) + logEventDescription(string(common.DirRmdir), to, res.StatusCode, h.logger) + case common.DirLs: + url := makeQueryParams(string(common.DirLs), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.DirectoryLsHandler(res, httpReq) + logEventDescription(string(common.DirLs), to, res.StatusCode, h.logger) + case common.DirStat: + url := makeQueryParams(string(common.DirStat), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.DirectoryStatHandler(res, httpReq) + logEventDescription(string(common.DirStat), to, res.StatusCode, h.logger) + case common.DirIsPresent: + url := makeQueryParams(string(common.DirIsPresent), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.DirectoryPresentHandler(res, httpReq) + logEventDescription(string(common.DirIsPresent), to, res.StatusCode, h.logger) + case common.FileDownloadStream: + jsonBytes, _ := json.Marshal(req.Params) + args := make(map[string]string) + if err := json.Unmarshal(jsonBytes, &args); err != nil { + h.logger.Debugf("ws event handler: download: failed to read params: %v", err) + h.logger.Error("ws event handler: download: failed to read params") + respondWithError(res, err) + continue + } + body := new(bytes.Buffer) + writer := multipart.NewWriter(body) + for k, v := range args { + err := writer.WriteField(k, v) + if err != nil { + h.logger.Debugf("ws event handler: download: failed to write fields in form: %v", err) + h.logger.Error("ws event handler: download: failed to write fields in form") + respondWithError(res, err) + continue + } + } + err = writer.Close() + if err != nil { + h.logger.Debugf("ws event handler: download: failed to close writer: %v", err) + h.logger.Error("ws event handler: download: failed to close writer") + respondWithError(res, err) + continue + } + httpReq, err := newMultipartRequest(http.MethodPost, string(common.FileDownload), writer.Boundary(), body) + if err != nil { + respondWithError(res, err) + continue + } + h.FileDownloadHandler(res, httpReq) + if res.StatusCode != 0 { + errMessage := res.Params.(map[string]interface{}) + respondWithError(res, fmt.Errorf("%s", errMessage["message"])) + continue + } + downloadConfirmResponse := common.NewWebsocketResponse() + downloadConfirmResponse.Event = common.FileDownloadStream + downloadConfirmResponse.Header().Set("Content-Type", "application/json; charset=utf-8") + if res.Header().Get("Content-Length") != "" { + dlMessage := map[string]string{} + dlMessage["content_length"] = res.Header().Get("Content-Length") + dlMessage["file_name"] = filepath.Base(args["file_path"]) + data, _ := json.Marshal(dlMessage) + _, err = downloadConfirmResponse.Write(data) + if err != nil { + h.logger.Debugf("ws event handler: download: failed to send download confirm: %v", err) + h.logger.Error("ws event handler: download: failed to send download confirm") + continue + } + } + downloadConfirmResponse.WriteHeader(http.StatusOK) + if err := conn.WriteMessage(messageType, downloadConfirmResponse.Marshal()); err != nil { + h.logger.Debugf("ws event handler: download: failed to write in connection: %v", err) + h.logger.Error("ws event handler: download: failed to write in connection") + continue + } + if res.StatusCode == 0 { + messageType = websocket.BinaryMessage + data := res.Marshal() + head := 0 + tail := len(data) + for head+wsChunkLimit < tail { + if err := conn.WriteMessage(messageType, data[head:(head+wsChunkLimit)]); err != nil { + h.logger.Debugf("ws event handler: response: failed to write in connection: %v", err) + h.logger.Error("ws event handler: response: failed to write in connection") + return err + } + head += wsChunkLimit + } + if err := conn.WriteMessage(messageType, data[head:tail]); err != nil { + h.logger.Debugf("ws event handler: response: failed to write in connection: %v", err) + h.logger.Error("ws event handler: response: failed to write in connection") + return err + } + } + messageType = websocket.TextMessage + res.Header().Set("Content-Type", "application/json; charset=utf-8") + if res.Header().Get("Content-Length") != "" { + dlFinishedMessage := map[string]string{} + dlFinishedMessage["message"] = "download finished" + data, _ := json.Marshal(dlFinishedMessage) + _, err = res.Write(data) + if err != nil { + h.logger.Debugf("ws event handler: download: failed to send download confirm: %v", err) + h.logger.Error("ws event handler: download: failed to send download confirm") + continue + } + res.WriteHeader(http.StatusOK) + } + logEventDescription(string(common.FileDownloadStream), to, res.StatusCode, h.logger) + case common.FileDownload: + jsonBytes, _ := json.Marshal(req.Params) + args := make(map[string]string) + if err := json.Unmarshal(jsonBytes, &args); err != nil { + h.logger.Debugf("ws event handler: download: failed to read params: %v", err) + h.logger.Error("ws event handler: download: failed to read params") + respondWithError(res, err) + continue + } + body := new(bytes.Buffer) + writer := multipart.NewWriter(body) + for k, v := range args { + err := writer.WriteField(k, v) + if err != nil { + h.logger.Debugf("ws event handler: download: failed to write fields in form: %v", err) + h.logger.Error("ws event handler: download: failed to write fields in form") + respondWithError(res, err) + continue + } + } + err = writer.Close() + if err != nil { + h.logger.Debugf("ws event handler: download: failed to close writer: %v", err) + h.logger.Error("ws event handler: download: failed to close writer") + respondWithError(res, err) + continue + } + httpReq, err := newMultipartRequest(http.MethodPost, string(common.FileDownload), writer.Boundary(), body) + if err != nil { + respondWithError(res, err) + continue + } + h.FileDownloadHandler(res, httpReq) + if res.StatusCode != 0 { + errMessage := res.Params.(map[string]interface{}) + respondWithError(res, fmt.Errorf("%s", errMessage["message"])) + continue + } + downloadConfirmResponse := common.NewWebsocketResponse() + downloadConfirmResponse.Event = common.FileDownload + downloadConfirmResponse.Header().Set("Content-Type", "application/json; charset=utf-8") + if res.Header().Get("Content-Length") != "" { + dlMessage := map[string]string{} + dlMessage["content_length"] = res.Header().Get("Content-Length") + dlMessage["file_name"] = filepath.Base(args["file_path"]) + data, _ := json.Marshal(dlMessage) + _, err = downloadConfirmResponse.Write(data) + if err != nil { + h.logger.Debugf("ws event handler: download: failed to send download confirm: %v", err) + h.logger.Error("ws event handler: download: failed to send download confirm") + continue + } + } + downloadConfirmResponse.WriteHeader(http.StatusOK) + if err := conn.WriteMessage(messageType, downloadConfirmResponse.Marshal()); err != nil { + h.logger.Debugf("ws event handler: download: failed to write in connection: %v", err) + h.logger.Error("ws event handler: download: failed to write in connection") + continue + } + messageType = websocket.BinaryMessage + if err := conn.WriteMessage(messageType, res.Marshal()); err != nil { + h.logger.Debugf("ws event handler: response: failed to write in connection: %v", err) + h.logger.Error("ws event handler: response: failed to write in connection") + return err + } + messageType = websocket.TextMessage + res.Header().Set("Content-Type", "application/json; charset=utf-8") + if res.Header().Get("Content-Length") != "" { + dlFinishedMessage := map[string]string{} + dlFinishedMessage["message"] = "download finished" + data, _ := json.Marshal(dlFinishedMessage) + _, err = res.Write(data) + if err != nil { + h.logger.Debugf("ws event handler: download: failed to send download confirm: %v", err) + h.logger.Error("ws event handler: download: failed to send download confirm") + continue + } + res.WriteHeader(http.StatusOK) + } + logEventDescription(string(common.FileDownload), to, res.StatusCode, h.logger) + case common.FileUpload, common.FileUploadStream: + streaming := false + if req.Event == common.FileUploadStream { + streaming = true + } + httpReq, err := newMultipartRequestWithBinaryMessage(req.Params, "files", http.MethodPost, string(req.Event), streaming) + if err != nil { + respondWithError(res, err) + continue + } + h.FileUploadHandler(res, httpReq) + logEventDescription(string(common.FileUpload), to, res.StatusCode, h.logger) + case common.FileShare: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.FileShare), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.FileShareHandler(res, httpReq) + logEventDescription(string(common.FileShare), to, res.StatusCode, h.logger) + case common.FileReceive: + url := makeQueryParams(string(common.FileReceive), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.FileReceiveHandler(res, httpReq) + logEventDescription(string(common.FileReceive), to, res.StatusCode, h.logger) + case common.FileReceiveInfo: + url := makeQueryParams(string(common.FileReceiveInfo), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.FileReceiveInfoHandler(res, httpReq) + logEventDescription(string(common.FileReceiveInfo), to, res.StatusCode, h.logger) + case common.FileDelete: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodDelete, string(common.FileDelete), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.FileDeleteHandler(res, httpReq) + logEventDescription(string(common.FileDelete), to, res.StatusCode, h.logger) + case common.FileStat: + url := makeQueryParams(string(common.FileStat), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.FileStatHandler(res, httpReq) + logEventDescription(string(common.FileStat), to, res.StatusCode, h.logger) + + // kv related events + case common.KVCreate: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.KVCreate), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.KVCreateHandler(res, httpReq) + logEventDescription(string(common.KVCreate), to, res.StatusCode, h.logger) + case common.KVList: + url := makeQueryParams(string(common.KVList), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.KVListHandler(res, httpReq) + logEventDescription(string(common.KVList), to, res.StatusCode, h.logger) + case common.KVOpen: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.KVOpen), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.KVOpenHandler(res, httpReq) + logEventDescription(string(common.KVOpen), to, res.StatusCode, h.logger) + case common.KVCount: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.KVCount), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.KVCountHandler(res, httpReq) + logEventDescription(string(common.KVCount), to, res.StatusCode, h.logger) + case common.KVDelete: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodDelete, string(common.KVDelete), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.KVDeleteHandler(res, httpReq) + logEventDescription(string(common.KVDelete), to, res.StatusCode, h.logger) + case common.KVEntryPut: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.KVEntryPut), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.KVPutHandler(res, httpReq) + logEventDescription(string(common.KVEntryPut), to, res.StatusCode, h.logger) + case common.KVEntryGet: + url := makeQueryParams(string(common.KVEntryGet), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.KVGetHandler(res, httpReq) + logEventDescription(string(common.KVEntryGet), to, res.StatusCode, h.logger) + case common.KVEntryDelete: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodDelete, string(common.KVEntryDelete), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.KVDelHandler(res, httpReq) + logEventDescription(string(common.KVEntryDelete), to, res.StatusCode, h.logger) + case common.KVLoadCSV, common.KVLoadCSVStream: + streaming := false + if req.Event == common.KVLoadCSVStream { + streaming = true + } + httpReq, err := newMultipartRequestWithBinaryMessage(req.Params, "csv", http.MethodPost, string(req.Event), streaming) + if err != nil { + respondWithError(res, err) + continue + } + + h.KVLoadCSVHandler(res, httpReq) + logEventDescription(string(common.KVLoadCSV), to, res.StatusCode, h.logger) + case common.KVSeek: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.KVSeek), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.KVSeekHandler(res, httpReq) + logEventDescription(string(common.KVSeek), to, res.StatusCode, h.logger) + case common.KVSeekNext: + url := makeQueryParams(string(common.KVSeekNext), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.KVGetNextHandler(res, httpReq) + logEventDescription(string(common.KVSeekNext), to, res.StatusCode, h.logger) + + // doc related events + case common.DocCreate: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.DocCreate), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.DocCreateHandler(res, httpReq) + logEventDescription(string(common.DocCreate), to, res.StatusCode, h.logger) + case common.DocList: + url := makeQueryParams(string(common.DocList), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.DocListHandler(res, httpReq) + logEventDescription(string(common.DocList), to, res.StatusCode, h.logger) + case common.DocOpen: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.DocOpen), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.DocOpenHandler(res, httpReq) + logEventDescription(string(common.DocOpen), to, res.StatusCode, h.logger) + case common.DocCount: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.DocCount), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.DocCountHandler(res, httpReq) + logEventDescription(string(common.DocCount), to, res.StatusCode, h.logger) + case common.DocDelete: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodDelete, string(common.DocDelete), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.DocDeleteHandler(res, httpReq) + logEventDescription(string(common.DocDelete), to, res.StatusCode, h.logger) + case common.DocFind: + url := makeQueryParams(string(common.DocFind), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.DocFindHandler(res, httpReq) + logEventDescription(string(common.DocFind), to, res.StatusCode, h.logger) + case common.DocEntryPut: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.DocEntryPut), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.DocPutHandler(res, httpReq) + logEventDescription(string(common.DocEntryPut), to, res.StatusCode, h.logger) + case common.DocEntryGet: + url := makeQueryParams(string(common.DocEntryGet), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.DocGetHandler(res, httpReq) + logEventDescription(string(common.DocEntryGet), to, res.StatusCode, h.logger) + case common.DocEntryDel: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodDelete, string(common.DocEntryDel), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.DocDelHandler(res, httpReq) + logEventDescription(string(common.DocEntryDel), to, res.StatusCode, h.logger) + case common.DocLoadJson, common.DocLoadJsonStream: + streaming := false + if req.Event == common.DocLoadJsonStream { + streaming = true + } + httpReq, err := newMultipartRequestWithBinaryMessage(req.Params, "json", http.MethodPost, string(req.Event), streaming) + if err != nil { + respondWithError(res, err) + continue + } + + h.DocLoadJsonHandler(res, httpReq) + logEventDescription(string(common.DocLoadJson), to, res.StatusCode, h.logger) + case common.DocIndexJson: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.DocIndexJson), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.DocIndexJsonHandler(res, httpReq) + logEventDescription(string(common.DocIndexJson), to, res.StatusCode, h.logger) + } + if err := conn.SetWriteDeadline(time.Now().Add(readDeadline)); err != nil { + return err + } + if err := conn.WriteMessage(messageType, res.Marshal()); err != nil { + h.logger.Debugf("ws event handler: response: failed to write in connection: %v", err) + h.logger.Error("ws event handler: response: failed to write in connection") + return err + } + } +} diff --git a/pkg/collection/document.go b/pkg/collection/document.go index 13b6d5f4..d25916cd 100644 --- a/pkg/collection/document.go +++ b/pkg/collection/document.go @@ -285,6 +285,7 @@ func (d *Document) DeleteDocumentDB(dbName string) error { } defer d.removeFromOpenedDB(dbName) } + docDB := d.getOpenedDb(dbName) //TODO: before deleting the indexes, unpin all the documents referenced in the ID index for _, si := range docDB.simpleIndexes { @@ -315,12 +316,15 @@ func (d *Document) DeleteDocumentDB(dbName string) error { // delete the document db from the DB file delete(docTables, dbName) - // store the rest of the document db - err = d.storeDocumentDBSchemas(docTables) - if err != nil { - d.logger.Errorf("deleting document db: ", err.Error()) - return err + if len(docTables) > 0 { + // store the rest of the document db + err = d.storeDocumentDBSchemas(docTables) + if err != nil { + d.logger.Errorf("deleting document db: ", err.Error()) + return err + } } + d.logger.Info("deleted document db: ", dbName) return nil } diff --git a/pkg/dir/rmdir_test.go b/pkg/dir/rmdir_test.go index 11942e0c..c9fdaf7b 100644 --- a/pkg/dir/rmdir_test.go +++ b/pkg/dir/rmdir_test.go @@ -205,5 +205,4 @@ func TestRmRootDir(t *testing.T) { t.Fatalf("could not delete directory") } }) - } diff --git a/pkg/file/reader.go b/pkg/file/reader.go index 3945f7c2..2f6903d0 100644 --- a/pkg/file/reader.go +++ b/pkg/file/reader.go @@ -231,7 +231,7 @@ func (r *Reader) ReadLine() ([]byte, error) { n, err := r.Read(buf) if err != nil { if errors.Is(err, io.EOF) { - if buf[n-1] != '\n' { + if n == 0 || buf[n-1] != '\n' { return nil, err } else { goto SUCC diff --git a/vendor/modules.txt b/vendor/modules.txt index 8961c140..676393e3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -126,6 +126,7 @@ github.com/gorilla/mux ## explicit github.com/gorilla/securecookie # github.com/gorilla/websocket v1.4.2 +## explicit github.com/gorilla/websocket # github.com/hashicorp/golang-lru v0.5.4 ## explicit diff --git a/version.go b/version.go index b42f7914..99cbbc09 100644 --- a/version.go +++ b/version.go @@ -5,7 +5,7 @@ package dfs var ( - version = "0.7.0" // manually set semantic version number + version = "0.7.1" // manually set semantic version number commit string // automatically set git commit hash Version = func() string {