From 41cc6e46df3c9c913fbcb46130c23774f057641f Mon Sep 17 00:00:00 2001 From: Sabyasachi Patra Date: Mon, 13 Dec 2021 18:10:34 +0530 Subject: [PATCH 1/6] websocket initialised, user and pod commands added (#152) * addressing #119, uploading same file multiple times * addressing #127, server requires postageBlockId to run * addressing #119, backup old file in case same name * code cleanup * addressing #117 and #118, issues related to pod deletion (#1) * addressing #117 and #118, issues related to pod deletion * fix reploading deleted file * fix #124, handle removed root directory * fixed #19, pod deletion requires password * mod tidy (#146) * mod tidy * manually restore vendor * manually restore vendor * fix #86, shared pod removed from list * websocket initialised, user and pod commands added * fixing lint * fixing lint * file related events added * download working * most of the kv and doc commands added * loadcsv, loadjson, indexjson * minor type changes * whitelist origins * add stream functionality * add streaming for loadcsv and loadjson * require centent_length for upload stream * lint fixes --- cmd/common/websocket_request.go | 138 +++++ cmd/dfs/cmd/server.go | 6 +- go.mod | 1 + pkg/api/doc_indexjson.go | 2 +- pkg/api/doc_new.go | 2 +- pkg/api/file_download.go | 2 +- pkg/api/kv_new.go | 2 +- pkg/api/login_middleware.go | 2 +- pkg/api/ws.go | 968 ++++++++++++++++++++++++++++++++ pkg/collection/document.go | 14 +- pkg/dir/rmdir_test.go | 1 - pkg/file/reader.go | 2 +- vendor/modules.txt | 1 + 13 files changed, 1127 insertions(+), 14 deletions(-) create mode 100644 cmd/common/websocket_request.go create mode 100644 pkg/api/ws.go diff --git a/cmd/common/websocket_request.go b/cmd/common/websocket_request.go new file mode 100644 index 00000000..4193bc72 --- /dev/null +++ b/cmd/common/websocket_request.go @@ -0,0 +1,138 @@ +package common + +import ( + "bytes" + "encoding/json" + "net/http" +) + +type Event string + +var ( + UserSignup Event = "/user/signup" + UserLogin Event = "/user/login" + UserImport Event = "/user/import" + UserPresent Event = "/user/present" + UserIsLoggedin Event = "/user/isloggedin" + UserLogout Event = "/user/logout" + UserExport Event = "/user/export" + UserDelete Event = "/user/delete" + UserStat Event = "/user/stat" + PodNew Event = "/pod/new" + PodOpen Event = "/pod/open" + PodClose Event = "/pod/close" + PodSync Event = "/pod/sync" + PodDelete Event = "/pod/delete" + PodLs Event = "/pod/ls" + PodStat Event = "/pod/stat" + PodShare Event = "/pod/share" + PodReceive Event = "/pod/receive" + PodReceiveInfo Event = "/pod/receiveinfo" + DirIsPresent Event = "/dir/present" + DirMkdir Event = "/dir/mkdir" + DirRmdir Event = "/dir/rmdir" + DirLs Event = "/dir/ls" + DirStat Event = "/dir/stat" + FileDownload Event = "/file/download" + FileDownloadStream Event = "/file/download/stream" + FileUpload Event = "/file/upload" + FileUploadStream Event = "/file/upload/stream" + FileShare Event = "/file/share" + FileReceive Event = "/file/receive" + FileReceiveInfo Event = "/file/receiveinfo" + FileDelete Event = "/file/delete" + FileStat Event = "/file/stat" + KVCreate Event = "/kv/new" + KVList Event = "/kv/ls" + KVOpen Event = "/kv/open" + KVDelete Event = "/kv/delete" + KVCount Event = "/kv/count" + KVEntryPut Event = "/kv/entry/put" + KVEntryGet Event = "/kv/entry/get" + KVEntryDelete Event = "/kv/entry/del" + KVLoadCSV Event = "/kv/loadcsv" + KVLoadCSVStream Event = "/kv/loadcsv/stream" + KVSeek Event = "/kv/seek" + KVSeekNext Event = "/kv/seek/next" + DocCreate Event = "/doc/new" + DocList Event = "/doc/ls" + DocOpen Event = "/doc/open" + DocCount Event = "/doc/count" + DocDelete Event = "/doc/delete" + DocFind Event = "/doc/find" + DocEntryPut Event = "/doc/entry/put" + DocEntryGet Event = "/doc/entry/get" + DocEntryDel Event = "/doc/entry/del" + DocLoadJson Event = "/doc/loadjson" + DocLoadJsonStream Event = "/doc/loadjson/stream" + DocIndexJson Event = "/doc/indexjson" +) + +type WebsocketRequest struct { + Event Event `json:"event"` + Params interface{} `json:"params,omitempty"` +} + +type FileRequest struct { + PodName string `json:"pod_name,omitempty"` + TableName string `json:"table_name,omitempty"` + DirPath string `json:"dir_path,omitempty"` + BlockSize string `json:"block_size,omitempty"` + FileName string `json:"file_name,omitempty"` +} + +type FileDownloadRequest struct { + PodName string `json:"pod_name,omitempty"` + Filepath string `json:"file_path,omitempty"` +} + +type WebsocketResponse struct { + Event Event `json:"event"` + StatusCode int `json:"code"` + Params interface{} `json:"params,omitempty"` + header http.Header + buf bytes.Buffer +} + +func NewWebsocketResponse() *WebsocketResponse { + return &WebsocketResponse{ + header: map[string][]string{}, + } +} + +func (w *WebsocketResponse) Header() http.Header { + return w.header +} + +func (w *WebsocketResponse) Write(bytes []byte) (int, error) { + if w.Header().Get("Content-Type") == "application/json; charset=utf-8" || + w.Header().Get("Content-Type") == "application/json" { + body := map[string]interface{}{} + err := json.Unmarshal(bytes, &body) + if err != nil { + return 0, err + } + w.Params = body + return len(bytes), nil + } + if w.Header().Get("Content-Length") != "" || w.Header().Get("Content-Length") != "0" { + return w.buf.Write(bytes) + } + return 0, nil +} + +func (w *WebsocketResponse) WriteHeader(statusCode int) { + w.StatusCode = statusCode +} + +func (w *WebsocketResponse) Marshal() []byte { + if w.Header().Get("Content-Type") == "application/json; charset=utf-8" || + w.Header().Get("Content-Type") == "application/json" { + data, _ := json.Marshal(w) + return data + } + if w.Header().Get("Content-Length") != "" { + return w.buf.Bytes() + } + return nil +} diff --git a/cmd/dfs/cmd/server.go b/cmd/dfs/cmd/server.go index 9f2e0eff..2691306c 100644 --- a/cmd/dfs/cmd/server.go +++ b/cmd/dfs/cmd/server.go @@ -23,7 +23,6 @@ import ( "strings" dfs "github.com/fairdatasociety/fairOS-dfs" - "github.com/fairdatasociety/fairOS-dfs/pkg/api" "github.com/fairdatasociety/fairOS-dfs/pkg/logging" "github.com/gorilla/mux" @@ -161,8 +160,11 @@ func startHttpService(logger logging.Logger) { return } }) - apiVersion := "v1" + + wsRouter := router.PathPrefix("/ws/" + apiVersion).Subrouter() + wsRouter.HandleFunc("/", handler.WebsocketHandler) + baseRouter := router.PathPrefix("/" + apiVersion).Subrouter() baseRouter.HandleFunc("/robots.txt", func(w http.ResponseWriter, r *http.Request) { _, err := fmt.Fprintln(w, "User-agent: *\nDisallow: /") diff --git a/go.mod b/go.mod index d43def3f..f41a1f14 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 github.com/gorilla/mux v1.7.4 github.com/gorilla/securecookie v1.1.1 + github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 github.com/klauspost/pgzip v1.2.5 github.com/mattn/go-tty v0.0.3 // indirect diff --git a/pkg/api/doc_indexjson.go b/pkg/api/doc_indexjson.go index 1a729c3f..d6113d52 100644 --- a/pkg/api/doc_indexjson.go +++ b/pkg/api/doc_indexjson.go @@ -49,7 +49,7 @@ func (h *Handler) DocIndexJsonHandler(w http.ResponseWriter, r *http.Request) { return } - podName := docReq.TableName + podName := docReq.PodName if podName == "" { h.logger.Errorf("doc indexjson: \"pod_name\" argument missing") jsonhttp.BadRequest(w, "doc indexjson: \"pod_name\" argument missing") diff --git a/pkg/api/doc_new.go b/pkg/api/doc_new.go index b2d1df1e..b6153682 100644 --- a/pkg/api/doc_new.go +++ b/pkg/api/doc_new.go @@ -116,5 +116,5 @@ func (h *Handler) DocCreateHandler(w http.ResponseWriter, r *http.Request) { return } - jsonhttp.OK(w, "document db created") + jsonhttp.Created(w, "document db created") } diff --git a/pkg/api/file_download.go b/pkg/api/file_download.go index c2e740c4..4a780803 100644 --- a/pkg/api/file_download.go +++ b/pkg/api/file_download.go @@ -83,7 +83,7 @@ func (h *Handler) FileDownloadHandler(w http.ResponseWriter, r *http.Request) { if err != nil { h.logger.Errorf("download: %v", err) w.Header().Set("Content-Type", " application/json") - jsonhttp.InternalServerError(w, "stat dir: "+err.Error()) + jsonhttp.InternalServerError(w, "download: "+err.Error()) } _ = reader.Close() } diff --git a/pkg/api/kv_new.go b/pkg/api/kv_new.go index 6c2697ca..156e987a 100644 --- a/pkg/api/kv_new.go +++ b/pkg/api/kv_new.go @@ -101,5 +101,5 @@ func (h *Handler) KVCreateHandler(w http.ResponseWriter, r *http.Request) { jsonhttp.InternalServerError(w, "kv create: "+err.Error()) return } - jsonhttp.OK(w, "kv store created") + jsonhttp.Created(w, "kv store created") } diff --git a/pkg/api/login_middleware.go b/pkg/api/login_middleware.go index 9ea906b9..f57efc91 100644 --- a/pkg/api/login_middleware.go +++ b/pkg/api/login_middleware.go @@ -25,7 +25,7 @@ import ( ) // LoginMiddleware is a middleware that gets called before executing any of the protected handlers. -// this check if a there is a valid session id the request and then alows the request handler to +// this check if a there is a valid session id the request and then allows the request handler to // proceed for execution. func (h *Handler) LoginMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/api/ws.go b/pkg/api/ws.go new file mode 100644 index 00000000..b15b51dc --- /dev/null +++ b/pkg/api/ws.go @@ -0,0 +1,968 @@ +package api + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/fairdatasociety/fairOS-dfs/cmd/common" + "github.com/fairdatasociety/fairOS-dfs/pkg/logging" + "github.com/gorilla/websocket" + "github.com/sirupsen/logrus" +) + +const ( + wsChunkLimit = 1000000 +) + +var ( + readDeadline = 4 * time.Second + + writeDeadline = 4 * time.Second +) + +func (h *Handler) WebsocketHandler(w http.ResponseWriter, r *http.Request) { + upgrader := websocket.Upgrader{} // use default options + upgrader.CheckOrigin = func(r *http.Request) bool { + // TODO: whitelist origins + return true + } + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + h.logger.Errorf("Error during connection upgrade:", err) + return + } + defer conn.Close() + + err = h.handleEvents(conn) + if err != nil { + h.logger.Errorf("Error during handling event:", err) + return + } +} + +func (h *Handler) handleEvents(conn *websocket.Conn) error { + defer conn.Close() + + err := conn.SetReadDeadline(time.Now().Add(readDeadline)) + if err != nil { + h.logger.Debugf("ws event handler: set read deadline failed on connection : %v", err) + h.logger.Error("ws event handler: set read deadline failed on connection") + return err + } + + // keep pinging to check pong + go func() { + pingPeriod := (readDeadline * 9) / 10 + ticker := time.NewTicker(pingPeriod) + defer ticker.Stop() + + for range ticker.C { + if err := conn.SetWriteDeadline(time.Now().Add(writeDeadline)); err != nil { + return + } + if err := conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + h.logger.Debugf("ws event handler: upload: failed to send ping: %v", err) + h.logger.Error("ws event handler: upload: failed to send ping") + return + } + } + }() + + // add read deadline in pong + conn.SetPongHandler(func(message string) error { + if err := conn.SetReadDeadline(time.Now().Add(readDeadline)); err != nil { + h.logger.Debugf("ws event handler: set read deadline failed on connection : %v", err) + h.logger.Error("ws event handler: set read deadline failed on connection") + return err + } + return nil + }) + + var cookie []string + + // create a http request for feeding the http handler + newRequest := func(method, url string, buf []byte) (*http.Request, error) { + httpReq, err := http.NewRequest(method, url, bytes.NewBuffer(buf)) + if err != nil { + return nil, err + } + httpReq.Header.Add("Content-Type", "application/json") + httpReq.Header.Add("Content-Length", strconv.Itoa(len(buf))) + if cookie != nil { + httpReq.Header.Set("Cookie", cookie[0]) + } + return httpReq, nil + } + + // create a file upload request for feeding the http handler + newMultipartRequestWithBinaryMessage := func(params interface{}, formField, method, url string, streaming bool) (*http.Request, error) { + jsonBytes, _ := json.Marshal(params) + args := make(map[string]string) + if err := json.Unmarshal(jsonBytes, &args); err != nil { + h.logger.Debugf("ws event handler: multipart rqst w/ body: failed to read params: %v", err) + h.logger.Error("ws event handler: multipart rqst w/ body: failed to read params") + return nil, err + } + + if err != nil { + return nil, err + } + body := new(bytes.Buffer) + writer := multipart.NewWriter(body) + fileName := "" + compression := "" + contentLength := "0" + // Add parameters + for k, v := range args { + if k == "file_name" { + fileName = v + } else if k == "content_length" { + contentLength = v + } else if k == "compression" { + compression = strings.ToLower(compression) + } + err := writer.WriteField(k, v) + if err != nil { + h.logger.Debugf("ws event handler: multipart rqst w/ body: failed to write fields in form: %v", err) + h.logger.Error("ws event handler: multipart rqst w/ body: failed to write fields in form") + return nil, err + } + } + + part, err := writer.CreateFormFile(formField, fileName) + if err != nil { + h.logger.Debugf("ws event handler: multipart rqst w/ body: failed to create files field in form: %v", err) + h.logger.Error("ws event handler: multipart rqst w/ body: failed to create files field in form") + return nil, err + } + if streaming { + if contentLength == "" || contentLength == "0" { + h.logger.Warning("streaming needs \"content_length\"") + return nil, fmt.Errorf("streaming needs \"content_length\"") + } + var totalRead int64 = 0 + for { + mt, reader, err := conn.NextReader() + if err != nil { + h.logger.Debugf("ws event handler: multipart rqst w/ body: failed to read next message: %v", err) + h.logger.Error("ws event handler: multipart rqst w/ body: failed to read next message") + return nil, err + } + if mt != websocket.BinaryMessage { + h.logger.Warning("non binary message", mt) + return nil, fmt.Errorf("received non binary message inside upload stream aborting") + } + n, err := io.Copy(part, reader) + if err != nil { + h.logger.Debugf("ws event handler: multipart rqst w/ body: failed to read file: %v", err) + h.logger.Error("ws event handler: multipart rqst w/ body: failed to read file") + return nil, err + } + totalRead += n + if fmt.Sprintf("%d", totalRead) == contentLength { + h.logger.Debug("streamed full content") + break + } + } + } else { + mt, reader, err := conn.NextReader() + if err != nil { + h.logger.Debugf("ws event handler: multipart rqst w/ body: failed to read next message: %v", err) + h.logger.Error("ws event handler: multipart rqst w/ body: failed to read next message") + return nil, err + } + if mt != websocket.BinaryMessage { + h.logger.Warning("non binary message", mt) + return nil, fmt.Errorf("file content should be as binary message") + } + _, err = io.Copy(part, reader) + if err != nil { + h.logger.Debugf("ws event handler: multipart rqst w/ body: failed to read file: %v", err) + h.logger.Error("ws event handler: multipart rqst w/ body: failed to read file") + return nil, err + } + } + + err = writer.Close() + if err != nil { + h.logger.Debugf("ws event handler: multipart rqst w/ body: failed to close writer: %v", err) + h.logger.Error("ws event handler: multipart rqst w/ body: failed to close writer") + return nil, err + } + + httpReq, err := http.NewRequest(method, url, body) + if err != nil { + h.logger.Debugf("ws event handler: multipart rqst w/ body: failed to create http request: %v", err) + h.logger.Error("ws event handler: multipart rqst w/ body: failed to create http request") + return nil, err + } + contentType := fmt.Sprintf("multipart/form-data;boundary=%v", writer.Boundary()) + httpReq.Header.Set("Content-Type", contentType) + if cookie != nil { + httpReq.Header.Set("Cookie", cookie[0]) + } + if compression != "" { + httpReq.Header.Set(CompressionHeader, compression) + } + return httpReq, nil + } + + // create a http request for file download + newMultipartRequest := func(method, url, boundary string, r io.Reader) (*http.Request, error) { + httpReq, err := http.NewRequest(method, url, r) + if err != nil { + return nil, err + } + contentType := fmt.Sprintf("multipart/form-data;boundary=%v", boundary) + httpReq.Header.Set("Content-Type", contentType) + if cookie != nil { + httpReq.Header.Set("Cookie", cookie[0]) + } + return httpReq, nil + } + + respondWithError := func(response *common.WebsocketResponse, originalErr error) { + if originalErr == nil { + return + } + if err := conn.SetReadDeadline(time.Now().Add(readDeadline)); err != nil { + return + } + + message := map[string]interface{}{} + message["message"] = originalErr.Error() + response.Params = &message + response.StatusCode = http.StatusInternalServerError + + if err := conn.SetWriteDeadline(time.Now().Add(writeDeadline)); err != nil { + return + } + if err := conn.WriteMessage(websocket.TextMessage, response.Marshal()); err != nil { + h.logger.Debugf("ws event handler: upload: failed to write error response: %v", err) + h.logger.Error("ws event handler: upload: failed to write error response") + return + } + } + + makeQueryParams := func(base string, params interface{}) string { + paramsMap := params.(map[string]interface{}) + url := base + "?" + for i, v := range paramsMap { + url = fmt.Sprintf("%s%s=%s&", url, i, v) + } + return url + } + + logEventDescription := func(url string, startTime time.Time, status int, logger logging.Logger) { + fields := logrus.Fields{ + "uri": url, + "duration": time.Since(startTime).String(), + "status": status, + } + logger.WithFields(fields).Log(logrus.DebugLevel, "ws event response: ") + } + + for { + res := common.NewWebsocketResponse() + messageType, message, err := conn.ReadMessage() + if err != nil { + h.logger.Debugf("ws event handler: read message error: %v", err) + h.logger.Error("ws event handler: read message error") + if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + return err + } + return nil + } + to := time.Now() + req := &common.WebsocketRequest{} + err = json.Unmarshal(message, req) + if err != nil { + h.logger.Debugf("ws event handler: failed to read request: %v", err) + h.logger.Error("ws event handler: failed to read request") + if err := conn.SetWriteDeadline(time.Now().Add(writeDeadline)); err != nil { + return err + } + return conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, err.Error())) + } + res.Event = req.Event + if err := conn.SetReadDeadline(time.Time{}); err != nil { + continue + } + switch req.Event { + // user related events + case common.UserSignup: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.UserSignup), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.UserSignupHandler(res, httpReq) + cookie = res.Header()["Set-Cookie"] + logEventDescription(string(common.UserSignup), to, res.StatusCode, h.logger) + case common.UserLogin: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.UserLogin), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.UserLoginHandler(res, httpReq) + cookie = res.Header()["Set-Cookie"] + logEventDescription(string(common.UserLogin), to, res.StatusCode, h.logger) + case common.UserImport: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.UserImport), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.ImportUserHandler(res, httpReq) + logEventDescription(string(common.UserImport), to, res.StatusCode, h.logger) + case common.UserPresent: + url := makeQueryParams(string(common.UserPresent), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.UserPresentHandler(res, httpReq) + logEventDescription(string(common.UserPresent), to, res.StatusCode, h.logger) + case common.UserIsLoggedin: + url := makeQueryParams(string(common.UserIsLoggedin), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.IsUserLoggedInHandler(res, httpReq) + logEventDescription(string(common.UserIsLoggedin), to, res.StatusCode, h.logger) + case common.UserLogout: + httpReq, err := newRequest(http.MethodPost, string(common.UserLogout), nil) + if err != nil { + respondWithError(res, err) + continue + } + h.UserLogoutHandler(res, httpReq) + logEventDescription(string(common.UserLogout), to, res.StatusCode, h.logger) + case common.UserExport: + httpReq, err := newRequest(http.MethodPost, string(common.UserExport), nil) + if err != nil { + respondWithError(res, err) + continue + } + h.ExportUserHandler(res, httpReq) + logEventDescription(string(common.UserExport), to, res.StatusCode, h.logger) + case common.UserDelete: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodDelete, string(common.UserDelete), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.UserDeleteHandler(res, httpReq) + logEventDescription(string(common.UserDelete), to, res.StatusCode, h.logger) + case common.UserStat: + httpReq, err := newRequest(http.MethodGet, string(common.UserStat), nil) + if err != nil { + respondWithError(res, err) + continue + } + h.UserStatHandler(res, httpReq) + logEventDescription(string(common.UserStat), to, res.StatusCode, h.logger) + // pod related events + case common.PodReceive: + url := makeQueryParams(string(common.PodReceive), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.PodReceiveHandler(res, httpReq) + logEventDescription(string(common.PodReceive), to, res.StatusCode, h.logger) + case common.PodReceiveInfo: + url := makeQueryParams(string(common.PodReceiveInfo), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.PodReceiveInfoHandler(res, httpReq) + logEventDescription(string(common.PodReceiveInfo), to, res.StatusCode, h.logger) + case common.PodNew: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.PodNew), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.PodCreateHandler(res, httpReq) + logEventDescription(string(common.PodNew), to, res.StatusCode, h.logger) + case common.PodOpen: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.PodOpen), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.PodOpenHandler(res, httpReq) + logEventDescription(string(common.PodOpen), to, res.StatusCode, h.logger) + case common.PodClose: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.PodClose), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.PodCloseHandler(res, httpReq) + logEventDescription(string(common.PodClose), to, res.StatusCode, h.logger) + case common.PodSync: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.PodSync), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.PodSyncHandler(res, httpReq) + logEventDescription(string(common.PodSync), to, res.StatusCode, h.logger) + case common.PodShare: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.PodShare), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.PodShareHandler(res, httpReq) + logEventDescription(string(common.PodShare), to, res.StatusCode, h.logger) + case common.PodDelete: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodDelete, string(common.PodDelete), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.PodDeleteHandler(res, httpReq) + logEventDescription(string(common.PodDelete), to, res.StatusCode, h.logger) + case common.PodLs: + httpReq, err := newRequest(http.MethodGet, string(common.PodLs), nil) + if err != nil { + respondWithError(res, err) + continue + } + h.PodListHandler(res, httpReq) + logEventDescription(string(common.PodLs), to, res.StatusCode, h.logger) + case common.PodStat: + url := makeQueryParams(string(common.UserPresent), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.PodStatHandler(res, httpReq) + logEventDescription(string(common.PodStat), to, res.StatusCode, h.logger) + + // file related events + case common.DirMkdir: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.DirMkdir), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.DirectoryMkdirHandler(res, httpReq) + logEventDescription(string(common.DirMkdir), to, res.StatusCode, h.logger) + case common.DirRmdir: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodDelete, string(common.DirRmdir), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.DirectoryRmdirHandler(res, httpReq) + logEventDescription(string(common.DirRmdir), to, res.StatusCode, h.logger) + case common.DirLs: + url := makeQueryParams(string(common.DirLs), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.DirectoryLsHandler(res, httpReq) + logEventDescription(string(common.DirLs), to, res.StatusCode, h.logger) + case common.DirStat: + url := makeQueryParams(string(common.DirStat), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.DirectoryStatHandler(res, httpReq) + logEventDescription(string(common.DirStat), to, res.StatusCode, h.logger) + case common.DirIsPresent: + url := makeQueryParams(string(common.DirIsPresent), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.DirectoryPresentHandler(res, httpReq) + logEventDescription(string(common.DirIsPresent), to, res.StatusCode, h.logger) + case common.FileDownloadStream: + jsonBytes, _ := json.Marshal(req.Params) + args := make(map[string]string) + if err := json.Unmarshal(jsonBytes, &args); err != nil { + h.logger.Debugf("ws event handler: download: failed to read params: %v", err) + h.logger.Error("ws event handler: download: failed to read params") + respondWithError(res, err) + continue + } + body := new(bytes.Buffer) + writer := multipart.NewWriter(body) + for k, v := range args { + err := writer.WriteField(k, v) + if err != nil { + h.logger.Debugf("ws event handler: download: failed to write fields in form: %v", err) + h.logger.Error("ws event handler: download: failed to write fields in form") + respondWithError(res, err) + continue + } + } + err = writer.Close() + if err != nil { + h.logger.Debugf("ws event handler: download: failed to close writer: %v", err) + h.logger.Error("ws event handler: download: failed to close writer") + respondWithError(res, err) + continue + } + httpReq, err := newMultipartRequest(http.MethodPost, string(common.FileDownload), writer.Boundary(), body) + if err != nil { + respondWithError(res, err) + continue + } + h.FileDownloadHandler(res, httpReq) + if res.StatusCode != 0 { + errMessage := res.Params.(map[string]interface{}) + respondWithError(res, fmt.Errorf("%s", errMessage["message"])) + continue + } + downloadConfirmResponse := common.NewWebsocketResponse() + downloadConfirmResponse.Event = common.FileDownloadStream + downloadConfirmResponse.Header().Set("Content-Type", "application/json; charset=utf-8") + if res.Header().Get("Content-Length") != "" { + dlMessage := map[string]string{} + dlMessage["content_length"] = res.Header().Get("Content-Length") + dlMessage["file_name"] = filepath.Base(args["file_path"]) + data, _ := json.Marshal(dlMessage) + _, err = downloadConfirmResponse.Write(data) + if err != nil { + h.logger.Debugf("ws event handler: download: failed to send download confirm: %v", err) + h.logger.Error("ws event handler: download: failed to send download confirm") + continue + } + } + downloadConfirmResponse.WriteHeader(http.StatusOK) + if err := conn.WriteMessage(messageType, downloadConfirmResponse.Marshal()); err != nil { + h.logger.Debugf("ws event handler: download: failed to write in connection: %v", err) + h.logger.Error("ws event handler: download: failed to write in connection") + continue + } + if res.StatusCode == 0 { + messageType = websocket.BinaryMessage + data := res.Marshal() + head := 0 + tail := len(data) + for head+wsChunkLimit < tail { + if err := conn.WriteMessage(messageType, data[head:(head+wsChunkLimit)]); err != nil { + h.logger.Debugf("ws event handler: response: failed to write in connection: %v", err) + h.logger.Error("ws event handler: response: failed to write in connection") + return err + } + head += wsChunkLimit + } + if err := conn.WriteMessage(messageType, data[head:tail]); err != nil { + h.logger.Debugf("ws event handler: response: failed to write in connection: %v", err) + h.logger.Error("ws event handler: response: failed to write in connection") + return err + } + } + messageType = websocket.TextMessage + res.Header().Set("Content-Type", "application/json; charset=utf-8") + if res.Header().Get("Content-Length") != "" { + dlFinishedMessage := map[string]string{} + dlFinishedMessage["message"] = "download finished" + data, _ := json.Marshal(dlFinishedMessage) + _, err = res.Write(data) + if err != nil { + h.logger.Debugf("ws event handler: download: failed to send download confirm: %v", err) + h.logger.Error("ws event handler: download: failed to send download confirm") + continue + } + res.WriteHeader(http.StatusOK) + } + logEventDescription(string(common.FileDownloadStream), to, res.StatusCode, h.logger) + case common.FileDownload: + jsonBytes, _ := json.Marshal(req.Params) + args := make(map[string]string) + if err := json.Unmarshal(jsonBytes, &args); err != nil { + h.logger.Debugf("ws event handler: download: failed to read params: %v", err) + h.logger.Error("ws event handler: download: failed to read params") + respondWithError(res, err) + continue + } + body := new(bytes.Buffer) + writer := multipart.NewWriter(body) + for k, v := range args { + err := writer.WriteField(k, v) + if err != nil { + h.logger.Debugf("ws event handler: download: failed to write fields in form: %v", err) + h.logger.Error("ws event handler: download: failed to write fields in form") + respondWithError(res, err) + continue + } + } + err = writer.Close() + if err != nil { + h.logger.Debugf("ws event handler: download: failed to close writer: %v", err) + h.logger.Error("ws event handler: download: failed to close writer") + respondWithError(res, err) + continue + } + httpReq, err := newMultipartRequest(http.MethodPost, string(common.FileDownload), writer.Boundary(), body) + if err != nil { + respondWithError(res, err) + continue + } + h.FileDownloadHandler(res, httpReq) + if res.StatusCode != 0 { + errMessage := res.Params.(map[string]interface{}) + respondWithError(res, fmt.Errorf("%s", errMessage["message"])) + continue + } + downloadConfirmResponse := common.NewWebsocketResponse() + downloadConfirmResponse.Event = common.FileDownload + downloadConfirmResponse.Header().Set("Content-Type", "application/json; charset=utf-8") + if res.Header().Get("Content-Length") != "" { + dlMessage := map[string]string{} + dlMessage["content_length"] = res.Header().Get("Content-Length") + dlMessage["file_name"] = filepath.Base(args["file_path"]) + data, _ := json.Marshal(dlMessage) + _, err = downloadConfirmResponse.Write(data) + if err != nil { + h.logger.Debugf("ws event handler: download: failed to send download confirm: %v", err) + h.logger.Error("ws event handler: download: failed to send download confirm") + continue + } + } + downloadConfirmResponse.WriteHeader(http.StatusOK) + if err := conn.WriteMessage(messageType, downloadConfirmResponse.Marshal()); err != nil { + h.logger.Debugf("ws event handler: download: failed to write in connection: %v", err) + h.logger.Error("ws event handler: download: failed to write in connection") + continue + } + messageType = websocket.BinaryMessage + if err := conn.WriteMessage(messageType, res.Marshal()); err != nil { + h.logger.Debugf("ws event handler: response: failed to write in connection: %v", err) + h.logger.Error("ws event handler: response: failed to write in connection") + return err + } + messageType = websocket.TextMessage + res.Header().Set("Content-Type", "application/json; charset=utf-8") + if res.Header().Get("Content-Length") != "" { + dlFinishedMessage := map[string]string{} + dlFinishedMessage["message"] = "download finished" + data, _ := json.Marshal(dlFinishedMessage) + _, err = res.Write(data) + if err != nil { + h.logger.Debugf("ws event handler: download: failed to send download confirm: %v", err) + h.logger.Error("ws event handler: download: failed to send download confirm") + continue + } + res.WriteHeader(http.StatusOK) + } + logEventDescription(string(common.FileDownload), to, res.StatusCode, h.logger) + case common.FileUpload, common.FileUploadStream: + streaming := false + if req.Event == common.FileUploadStream { + streaming = true + } + httpReq, err := newMultipartRequestWithBinaryMessage(req.Params, "files", http.MethodPost, string(req.Event), streaming) + if err != nil { + respondWithError(res, err) + continue + } + h.FileUploadHandler(res, httpReq) + logEventDescription(string(common.FileUpload), to, res.StatusCode, h.logger) + case common.FileShare: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.FileShare), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.FileShareHandler(res, httpReq) + logEventDescription(string(common.FileShare), to, res.StatusCode, h.logger) + case common.FileReceive: + url := makeQueryParams(string(common.FileReceive), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.FileReceiveHandler(res, httpReq) + logEventDescription(string(common.FileReceive), to, res.StatusCode, h.logger) + case common.FileReceiveInfo: + url := makeQueryParams(string(common.FileReceiveInfo), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.FileReceiveInfoHandler(res, httpReq) + logEventDescription(string(common.FileReceiveInfo), to, res.StatusCode, h.logger) + case common.FileDelete: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodDelete, string(common.FileDelete), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.FileDeleteHandler(res, httpReq) + logEventDescription(string(common.FileDelete), to, res.StatusCode, h.logger) + case common.FileStat: + url := makeQueryParams(string(common.FileStat), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.FileStatHandler(res, httpReq) + logEventDescription(string(common.FileStat), to, res.StatusCode, h.logger) + + // kv related events + case common.KVCreate: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.KVCreate), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.KVCreateHandler(res, httpReq) + logEventDescription(string(common.KVCreate), to, res.StatusCode, h.logger) + case common.KVList: + url := makeQueryParams(string(common.KVList), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.KVListHandler(res, httpReq) + logEventDescription(string(common.KVList), to, res.StatusCode, h.logger) + case common.KVOpen: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.KVOpen), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.KVOpenHandler(res, httpReq) + logEventDescription(string(common.KVOpen), to, res.StatusCode, h.logger) + case common.KVCount: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.KVCount), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.KVCountHandler(res, httpReq) + logEventDescription(string(common.KVCount), to, res.StatusCode, h.logger) + case common.KVDelete: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodDelete, string(common.KVDelete), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.KVDeleteHandler(res, httpReq) + logEventDescription(string(common.KVDelete), to, res.StatusCode, h.logger) + case common.KVEntryPut: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.KVEntryPut), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.KVPutHandler(res, httpReq) + logEventDescription(string(common.KVEntryPut), to, res.StatusCode, h.logger) + case common.KVEntryGet: + url := makeQueryParams(string(common.KVEntryGet), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.KVGetHandler(res, httpReq) + logEventDescription(string(common.KVEntryGet), to, res.StatusCode, h.logger) + case common.KVEntryDelete: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodDelete, string(common.KVEntryDelete), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.KVDelHandler(res, httpReq) + logEventDescription(string(common.KVEntryDelete), to, res.StatusCode, h.logger) + case common.KVLoadCSV, common.KVLoadCSVStream: + streaming := false + if req.Event == common.KVLoadCSVStream { + streaming = true + } + httpReq, err := newMultipartRequestWithBinaryMessage(req.Params, "csv", http.MethodPost, string(req.Event), streaming) + if err != nil { + respondWithError(res, err) + continue + } + + h.KVLoadCSVHandler(res, httpReq) + logEventDescription(string(common.KVLoadCSV), to, res.StatusCode, h.logger) + case common.KVSeek: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.KVSeek), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.KVSeekHandler(res, httpReq) + logEventDescription(string(common.KVSeek), to, res.StatusCode, h.logger) + case common.KVSeekNext: + url := makeQueryParams(string(common.KVSeekNext), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.KVGetNextHandler(res, httpReq) + logEventDescription(string(common.KVSeekNext), to, res.StatusCode, h.logger) + + // doc related events + case common.DocCreate: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.DocCreate), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.DocCreateHandler(res, httpReq) + logEventDescription(string(common.DocCreate), to, res.StatusCode, h.logger) + case common.DocList: + url := makeQueryParams(string(common.DocList), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.DocListHandler(res, httpReq) + logEventDescription(string(common.DocList), to, res.StatusCode, h.logger) + case common.DocOpen: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.DocOpen), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.DocOpenHandler(res, httpReq) + logEventDescription(string(common.DocOpen), to, res.StatusCode, h.logger) + case common.DocCount: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.DocCount), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.DocCountHandler(res, httpReq) + logEventDescription(string(common.DocCount), to, res.StatusCode, h.logger) + case common.DocDelete: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodDelete, string(common.DocDelete), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.DocDeleteHandler(res, httpReq) + logEventDescription(string(common.DocDelete), to, res.StatusCode, h.logger) + case common.DocFind: + url := makeQueryParams(string(common.DocFind), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.DocFindHandler(res, httpReq) + logEventDescription(string(common.DocFind), to, res.StatusCode, h.logger) + case common.DocEntryPut: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.DocEntryPut), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.DocPutHandler(res, httpReq) + logEventDescription(string(common.DocEntryPut), to, res.StatusCode, h.logger) + case common.DocEntryGet: + url := makeQueryParams(string(common.DocEntryGet), req.Params) + httpReq, err := newRequest(http.MethodGet, url, nil) + if err != nil { + respondWithError(res, err) + continue + } + h.DocGetHandler(res, httpReq) + logEventDescription(string(common.DocEntryGet), to, res.StatusCode, h.logger) + case common.DocEntryDel: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodDelete, string(common.DocEntryDel), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.DocDelHandler(res, httpReq) + logEventDescription(string(common.DocEntryDel), to, res.StatusCode, h.logger) + case common.DocLoadJson, common.DocLoadJsonStream: + streaming := false + if req.Event == common.DocLoadJsonStream { + streaming = true + } + httpReq, err := newMultipartRequestWithBinaryMessage(req.Params, "json", http.MethodPost, string(req.Event), streaming) + if err != nil { + respondWithError(res, err) + continue + } + + h.DocLoadJsonHandler(res, httpReq) + logEventDescription(string(common.DocLoadJson), to, res.StatusCode, h.logger) + case common.DocIndexJson: + jsonBytes, _ := json.Marshal(req.Params) + httpReq, err := newRequest(http.MethodPost, string(common.DocIndexJson), jsonBytes) + if err != nil { + respondWithError(res, err) + continue + } + h.DocIndexJsonHandler(res, httpReq) + logEventDescription(string(common.DocIndexJson), to, res.StatusCode, h.logger) + } + if err := conn.SetWriteDeadline(time.Now().Add(readDeadline)); err != nil { + return err + } + if err := conn.WriteMessage(messageType, res.Marshal()); err != nil { + h.logger.Debugf("ws event handler: response: failed to write in connection: %v", err) + h.logger.Error("ws event handler: response: failed to write in connection") + return err + } + } +} diff --git a/pkg/collection/document.go b/pkg/collection/document.go index 13b6d5f4..d25916cd 100644 --- a/pkg/collection/document.go +++ b/pkg/collection/document.go @@ -285,6 +285,7 @@ func (d *Document) DeleteDocumentDB(dbName string) error { } defer d.removeFromOpenedDB(dbName) } + docDB := d.getOpenedDb(dbName) //TODO: before deleting the indexes, unpin all the documents referenced in the ID index for _, si := range docDB.simpleIndexes { @@ -315,12 +316,15 @@ func (d *Document) DeleteDocumentDB(dbName string) error { // delete the document db from the DB file delete(docTables, dbName) - // store the rest of the document db - err = d.storeDocumentDBSchemas(docTables) - if err != nil { - d.logger.Errorf("deleting document db: ", err.Error()) - return err + if len(docTables) > 0 { + // store the rest of the document db + err = d.storeDocumentDBSchemas(docTables) + if err != nil { + d.logger.Errorf("deleting document db: ", err.Error()) + return err + } } + d.logger.Info("deleted document db: ", dbName) return nil } diff --git a/pkg/dir/rmdir_test.go b/pkg/dir/rmdir_test.go index 11942e0c..c9fdaf7b 100644 --- a/pkg/dir/rmdir_test.go +++ b/pkg/dir/rmdir_test.go @@ -205,5 +205,4 @@ func TestRmRootDir(t *testing.T) { t.Fatalf("could not delete directory") } }) - } diff --git a/pkg/file/reader.go b/pkg/file/reader.go index 3945f7c2..2f6903d0 100644 --- a/pkg/file/reader.go +++ b/pkg/file/reader.go @@ -231,7 +231,7 @@ func (r *Reader) ReadLine() ([]byte, error) { n, err := r.Read(buf) if err != nil { if errors.Is(err, io.EOF) { - if buf[n-1] != '\n' { + if n == 0 || buf[n-1] != '\n' { return nil, err } else { goto SUCC diff --git a/vendor/modules.txt b/vendor/modules.txt index 8961c140..676393e3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -126,6 +126,7 @@ github.com/gorilla/mux ## explicit github.com/gorilla/securecookie # github.com/gorilla/websocket v1.4.2 +## explicit github.com/gorilla/websocket # github.com/hashicorp/golang-lru v0.5.4 ## explicit From a17477376a98e2bfb32487d187aad810ff305fbd Mon Sep 17 00:00:00 2001 From: Sabyasachi Patra Date: Mon, 20 Dec 2021 09:39:48 +0530 Subject: [PATCH 2/6] fix #165 (#166) --- cmd/dfs-cli/cmd/pod.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/dfs-cli/cmd/pod.go b/cmd/dfs-cli/cmd/pod.go index f6fb6d69..cedd6521 100644 --- a/cmd/dfs-cli/cmd/pod.go +++ b/cmd/dfs-cli/cmd/pod.go @@ -48,8 +48,10 @@ func podNew(podName string) { } func deletePod(podName string) { + password := getPassword() delPod := common.PodRequest{ - PodName: podName, + PodName: podName, + Password: password, } jsonData, err := json.Marshal(delPod) if err != nil { From 0ca90c7f882ef69f612b2dfee1133c405c7e0707 Mon Sep 17 00:00:00 2001 From: Sabyasachi Patra Date: Mon, 20 Dec 2021 09:40:52 +0530 Subject: [PATCH 3/6] cli can connect to remote dfs server (#164) * cli can connect to remote dfs server * lint fixes --- cmd/dfs-cli/cmd/fdfs-api.go | 19 +++++-------------- cmd/dfs-cli/cmd/prompt.go | 2 +- cmd/dfs-cli/cmd/root.go | 24 +++++++++++++++++------- 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/cmd/dfs-cli/cmd/fdfs-api.go b/cmd/dfs-cli/cmd/fdfs-api.go index f7139db8..3073b63a 100644 --- a/cmd/dfs-cli/cmd/fdfs-api.go +++ b/cmd/dfs-cli/cmd/fdfs-api.go @@ -47,13 +47,13 @@ type FdfsClient struct { cookie *http.Cookie } -func NewFdfsClient(host, port string) (*FdfsClient, error) { +func NewFdfsClient(fdfsServer string) (*FdfsClient, error) { client, err := createHTTPClient() if err != nil { return nil, err } return &FdfsClient{ - url: fmt.Sprintf("http://" + host + ":" + port), + url: fdfsServer, client: client, }, nil } @@ -84,24 +84,15 @@ func (s *FdfsClient) CheckConnection() bool { if err != nil { return false } + defer response.Body.Close() req.Close = true if response.StatusCode != http.StatusOK { return false } - data, err := ioutil.ReadAll(response.Body) - if err != nil { - return false - } - err = response.Body.Close() - if err != nil { - return false - } - if !strings.HasPrefix(string(data), "FairOS-dfs") { - return false - } - return true + _, err = ioutil.ReadAll(response.Body) + return err == nil } func (s *FdfsClient) postReq(method, urlPath string, jsonBytes []byte) ([]byte, error) { diff --git a/cmd/dfs-cli/cmd/prompt.go b/cmd/dfs-cli/cmd/prompt.go index 1629d42b..99c1a10b 100644 --- a/cmd/dfs-cli/cmd/prompt.go +++ b/cmd/dfs-cli/cmd/prompt.go @@ -109,7 +109,7 @@ type Message struct { // NewPrompt spawns dfs-client and checks if the it is connected to it. func NewPrompt() { var err error - fdfsAPI, err = NewFdfsClient(fdfsHost, fdfsPort) + fdfsAPI, err = NewFdfsClient(fdfsServer) if err != nil { fmt.Println("could not create fdfs client") os.Exit(1) diff --git a/cmd/dfs-cli/cmd/root.go b/cmd/dfs-cli/cmd/root.go index 1fa30c0d..301af5cb 100644 --- a/cmd/dfs-cli/cmd/root.go +++ b/cmd/dfs-cli/cmd/root.go @@ -29,9 +29,8 @@ import ( ) var ( - cfgFile string - fdfsHost string - fdfsPort string + cfgFile string + fdfsServer string ) // rootCmd represents the base command when called without any subcommands @@ -42,8 +41,7 @@ var rootCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { fmt.Println("version : ", dfs.Version) - fmt.Println("fdfsHost : ", fdfsHost) - fmt.Println("fdfsPort : ", fdfsPort) + fmt.Println("fdfsServer : ", fdfsServer) NewPrompt() initPrompt() }, @@ -88,8 +86,20 @@ func init() { // when this action is called directly. rootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") - rootCmd.PersistentFlags().StringVar(&fdfsHost, "fdfsHost", "127.0.0.1", "fdfs host") - rootCmd.PersistentFlags().StringVar(&fdfsPort, "fdfsPort", "9090", "fdfs port") + rootCmd.PersistentFlags().String("fdfsHost", "127.0.0.1", "fdfs host") + rootCmd.PersistentFlags().String("fdfsPort", "9090", "fdfs port") + + rootCmd.PersistentFlags().StringVar(&fdfsServer, "fdfsServer", "http://localhost:9090", "fdfs server api endpoint") + + if err := rootCmd.PersistentFlags().MarkDeprecated("fdfsHost", "run --fdfsServer, fdfs server api endpoint"); err != nil { + fmt.Println(err) + os.Exit(1) + } + + if err := rootCmd.PersistentFlags().MarkDeprecated("fdfsPort", "run --fdfsServer, fdfs server api endpoint"); err != nil { + fmt.Println(err) + os.Exit(1) + } } // initConfig reads in config file and ENV variables if set. From 9f2603bdaadb86d972b58e9b1b240f9e55089c0a Mon Sep 17 00:00:00 2001 From: asabya Date: Mon, 20 Dec 2021 10:08:13 +0530 Subject: [PATCH 4/6] bump version --- version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.go b/version.go index b42f7914..99cbbc09 100644 --- a/version.go +++ b/version.go @@ -5,7 +5,7 @@ package dfs var ( - version = "0.7.0" // manually set semantic version number + version = "0.7.1" // manually set semantic version number commit string // automatically set git commit hash Version = func() string { From 90233cab40457a86effeaa45da4003f228cc5482 Mon Sep 17 00:00:00 2001 From: Sabyasachi Patra Date: Tue, 25 Jan 2022 11:57:06 +0530 Subject: [PATCH 5/6] handle #172 (#173) * addressing #119, uploading same file multiple times * addressing #122, file delete Matadata unmarshal * addressing #123, fix response for non-existing pod * addressing #127, server requires postageBlockId to run * addressing #119, backup old file in case same name * code cleanup * code cleanup * code cleanup * code cleanup * addressing #117 and #118, issues related to pod deletion (#1) * addressing #117 and #118, issues related to pod deletion * fix reploading deleted file * fixes #141, rmdir updates feed with magicword * fixing #139 * fix #140, accept mutable bool in request * #102 handle exception for invalid json * related to #100, doc new podname missing * fix #124, handle removed root directory * handle #96 * handle #24, use shlex to parse prompt args * fixed #19, pod deletion requires password * check for root dir presence in mkdir * mod tidy (#146) * mod tidy * manually restore vendor * manually restore vendor * fix #86, shared pod removed from list * minor changes * #92, table name added in count response * fix w/ deepsource recommendation * fix w/ deepsource recommendation * #102, add test for invalid json * server running from config vales * fix lint errors * fix flag usage * add default variables * rebase * resolve 163 * handle #172 * handle #138, new method for kv/entry/get * #130 handled, /pod/present api added * podPresent only for loggedin user * #82, add /kv/present * #157, add basic string validation * update readme * update openapi (#180) * update openapi * update version --- README.md | 41 +++++++- cmd/dfs-cli/cmd/prompt.go | 63 ++++++++++- cmd/dfs-cli/cmd/root.go | 4 - cmd/dfs-cli/cmd/user.go | 2 +- cmd/dfs/cmd/config.go | 49 +++++++++ cmd/dfs/cmd/root.go | 118 +++++++++++++++------ cmd/dfs/cmd/server.go | 63 +++++++++-- download.sh | 101 ++++++++++++++++++ go.mod | 1 + openapi/dfs.yaml | 196 ++++++++++++++++++++++++++++++++--- pkg/api/handler.go | 4 +- pkg/api/kv_put_get_del.go | 193 +++++++++++++++++++++++++++++++++- pkg/api/pod_present.go | 48 +++++++++ pkg/api/user_login.go | 4 +- pkg/api/user_present.go | 6 +- pkg/blockstore/bee/client.go | 12 +-- pkg/dfs/api.go | 4 +- pkg/dfs/pod_api.go | 8 ++ pkg/file/upload.go | 125 ++++++++++++---------- pkg/pod/utils.go | 19 ++++ vendor/modules.txt | 1 + version.go | 2 +- 22 files changed, 924 insertions(+), 140 deletions(-) create mode 100644 cmd/dfs/cmd/config.go create mode 100755 download.sh create mode 100644 pkg/api/pod_present.go diff --git a/README.md b/README.md index 0bfe2f1b..e6434911 100644 --- a/README.md +++ b/README.md @@ -31,12 +31,50 @@ The user can share files in his pod with any other user just like in other centr Pod creation is cheap. A user can create multiple pods and use it to organise his data. for ex: Personal-Pod, Applications-Pod etc. ### How to run FairOS-dfs? -Download the latest release from https://github.com/fairDataSociety/fairOS-dfs/releases. +Run the following command to download the latest release + +``` +curl -o- https://raw.githubusercontent.com/fairDataSociety/fairOS-dfs/master/download.sh | bash +``` +``` +wget -qO- https://raw.githubusercontent.com/fairDataSociety/fairOS-dfs/master/download.sh | bash +``` + +Or download the latest release from https://github.com/fairDataSociety/fairOS-dfs/releases. Or use Docker to run the project https://docs.fairos.fairdatasociety.org/docs/fairOS-dfs/docker-installation. Or build the latest version with the instruction https://docs.fairos.fairdatasociety.org/docs/fairOS-dfs/manual-installation. +### Configure FairOS-dfs +To get the most out of your FairOS-dfs it is important that you configure FairOS-dfs for your specific use case! + +##### Configuration for Bee +``` +bee: + bee-api-endpoint: http://localhost:1633 + bee-debug-api-endpoint: http://localhost:1635 + postage-batch-id: "" +``` + +##### Configuration for FairOS-dfs +``` +dfs: + data-dir: /Users/fairos/.fairOS/dfs + ports: + http-port: :9090 + pprof-port: :9091 +``` + +#### Other configuration +``` +cookie-domain: api.fairos.io +cors-allowed-origins: [] +verbosity: trace +``` + +Run `dfs config` to see all configurations + ### Introduction to Key Value Store over Swarm [![](https://j.gifs.com/6XZwvl.gif)](https://gateway.ethswarm.org/access/130dcf7d01442836bc14c8c38db32ebfc4d5771c28677438b6a2a2a078bd1414) @@ -53,3 +91,4 @@ https://docs.fairos.fairdatasociety.org/docs/fairOS-dfs/cli-reference To make binaries for all platforms run this command `./generate-exe.sh` + diff --git a/cmd/dfs-cli/cmd/prompt.go b/cmd/dfs-cli/cmd/prompt.go index 99c1a10b..6d12a215 100644 --- a/cmd/dfs-cli/cmd/prompt.go +++ b/cmd/dfs-cli/cmd/prompt.go @@ -136,6 +136,53 @@ func changeLivePrefix() (string, bool) { return currentPrompt, true } +var userSuggestions = []prompt.Suggest{ + {Text: "new", Description: "create a new user"}, + {Text: "del", Description: "delete a existing user"}, + {Text: "login", Description: "login to a existing user"}, + {Text: "logout", Description: "logout from a logged in user"}, + {Text: "present", Description: "is user present"}, + {Text: "export ", Description: "exports the user"}, + {Text: "import ", Description: "imports the user"}, + {Text: "stat ", Description: "shows information about a user"}, +} + +var podSuggestions = []prompt.Suggest{ + {Text: "new", Description: "create a new pod for a user"}, + {Text: "del", Description: "delete a existing pod of a user"}, + {Text: "open", Description: "open to a existing pod of a user"}, + {Text: "close", Description: "close a already opened pod of a user"}, + {Text: "ls", Description: "list all the existing pods of a user"}, + {Text: "stat", Description: "show the metadata of a pod of a user"}, + {Text: "sync", Description: "sync the pod from swarm"}, +} + +var kvSuggestions = []prompt.Suggest{ + {Text: "new", Description: "create new key value store"}, + {Text: "delete", Description: "delete the key value store"}, + {Text: "ls", Description: "lists all the key value stores"}, + {Text: "open", Description: "open already created key value store"}, + {Text: "get", Description: "get value from key"}, + {Text: "put", Description: "put key and value in kv store"}, + {Text: "del", Description: "delete key and value from the store"}, + {Text: "loadcsv", Description: "loads the csv file in to kv store"}, + {Text: "seek", Description: "seek to the given start prefix"}, + {Text: "getnext", Description: "get the next element"}, +} + +var docSuggestions = []prompt.Suggest{ + {Text: "new", Description: "creates a new document store"}, + {Text: "delete", Description: "deletes a document store"}, + {Text: "open", Description: "open the document store"}, + {Text: "ls", Description: "list all document dbs"}, + {Text: "count", Description: "count the docs in the table satisfying the expression"}, + {Text: "find", Description: "find the docs in the table satisfying the expression and limit"}, + {Text: "put", Description: "insert a json document in to document store"}, + {Text: "get", Description: "get the document having the id from the store"}, + {Text: "del", Description: "delete the document having the id from the store"}, + {Text: "loadjson", Description: "load the json file in to the newly created document db"}, +} + var suggestions = []prompt.Suggest{ {Text: "user new", Description: "create a new user"}, {Text: "user del", Description: "delete a existing user"}, @@ -149,7 +196,7 @@ var suggestions = []prompt.Suggest{ {Text: "pod del", Description: "delete a existing pod of a user"}, {Text: "pod open", Description: "open to a existing pod of a user"}, {Text: "pod close", Description: "close a already opened pod of a user"}, - {Text: "pod ls", Description: "list all the existing pods of auser"}, + {Text: "pod ls", Description: "list all the existing pods of a user"}, {Text: "pod stat", Description: "show the metadata of a pod of a user"}, {Text: "pod sync", Description: "sync the pod from swarm"}, {Text: "kv new", Description: "create new key value store"}, @@ -187,10 +234,20 @@ var suggestions = []prompt.Suggest{ } func completer(in prompt.Document) []prompt.Suggest { - w := in.GetWordBeforeCursor() - if w == "" { + w := in.Text + if w == "" || len(strings.Split(w, " ")) >= 3 { return []prompt.Suggest{} } + + if strings.HasPrefix(in.TextBeforeCursor(), "user") { + return prompt.FilterHasPrefix(userSuggestions, in.GetWordBeforeCursor(), true) + } else if strings.HasPrefix(in.TextBeforeCursor(), "pod") { + return prompt.FilterHasPrefix(podSuggestions, in.GetWordBeforeCursor(), true) + } else if strings.HasPrefix(in.TextBeforeCursor(), "kv") { + return prompt.FilterHasPrefix(kvSuggestions, in.GetWordBeforeCursor(), true) + } else if strings.HasPrefix(in.TextBeforeCursor(), "doc") { + return prompt.FilterHasPrefix(docSuggestions, in.GetWordBeforeCursor(), true) + } return prompt.FilterHasPrefix(suggestions, w, true) } diff --git a/cmd/dfs-cli/cmd/root.go b/cmd/dfs-cli/cmd/root.go index 301af5cb..516c16c7 100644 --- a/cmd/dfs-cli/cmd/root.go +++ b/cmd/dfs-cli/cmd/root.go @@ -82,10 +82,6 @@ func init() { defaultConfig := filepath.Join(home, ".fairOS/dfs-cli.yml") rootCmd.PersistentFlags().StringVar(&cfgFile, "config", defaultConfig, "config file") - // Cobra also supports local flags, which will only run - // when this action is called directly. - rootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") - rootCmd.PersistentFlags().String("fdfsHost", "127.0.0.1", "fdfs host") rootCmd.PersistentFlags().String("fdfsPort", "9090", "fdfs port") diff --git a/cmd/dfs-cli/cmd/user.go b/cmd/dfs-cli/cmd/user.go index 603d788c..cbd09e97 100644 --- a/cmd/dfs-cli/cmd/user.go +++ b/cmd/dfs-cli/cmd/user.go @@ -201,7 +201,7 @@ func presentUser(userName string) { fmt.Println("user present: ", err) return } - var resp api.UserPresentResponse + var resp api.PresentResponse err = json.Unmarshal(data, &resp) if err != nil { fmt.Println("import user: ", err) diff --git a/cmd/dfs/cmd/config.go b/cmd/dfs/cmd/config.go new file mode 100644 index 00000000..652dae36 --- /dev/null +++ b/cmd/dfs/cmd/config.go @@ -0,0 +1,49 @@ +package cmd + +import ( + "github.com/spf13/cobra" + "gopkg.in/yaml.v2" +) + +var ( + optionCORSAllowedOrigins = "cors-allowed-origins" + optionDFSDataDir = "dfs.data-dir" + optionDFSHttpPort = "dfs.ports.http-port" + optionDFSPprofPort = "dfs.ports.pprof-port" + optionVerbosity = "verbosity" + optionBeeApi = "bee.bee-api-endpoint" + optionBeeDebugApi = "bee.bee-debug-api-endpoint" + optionBeePostageBatchId = "bee.postage-batch-id" + optionCookieDomain = "cookie-domain" + + defaultCORSAllowedOrigins = []string{} + defaultDFSHttpPort = ":9090" + defaultDFSPprofPort = ":9091" + defaultVerbosity = "trace" + defaultBeeApi = "http://localhost:1633" + defaultBeeDebugApi = "http://localhost:1635" + defaultCookieDomain = "api.fairos.io" +) + +var configCmd = &cobra.Command{ + Use: "config", + Short: "Print default or provided configuration in yaml format", + RunE: func(cmd *cobra.Command, args []string) (err error) { + + if len(args) > 0 { + return cmd.Help() + } + + d := config.AllSettings() + ym, err := yaml.Marshal(d) + if err != nil { + return err + } + cmd.Println(string(ym)) + return nil + }, +} + +func init() { + rootCmd.AddCommand(configCmd) +} diff --git a/cmd/dfs/cmd/root.go b/cmd/dfs/cmd/root.go index eefe6b23..6aadc4a2 100644 --- a/cmd/dfs/cmd/root.go +++ b/cmd/dfs/cmd/root.go @@ -21,30 +21,53 @@ import ( "os" "path/filepath" - homedir "github.com/mitchellh/go-homedir" + "github.com/mitchellh/go-homedir" "github.com/spf13/cobra" "github.com/spf13/viper" ) var ( - cfgFile string - beeHost string - beePort string - verbosity string - dataDir string + defaultDir = filepath.Join(".fairOS", "dfs") + defaultConfig = ".dfs.yaml" + + cfgFile string + beeApi string + beeDebugApi string + verbosity string + dataDir string + + dataDirPath string + config = viper.New() ) // rootCmd represents the base command when called without any subcommands var rootCmd = &cobra.Command{ Use: "dfs", Short: "Decentralised file system over Swarm(https://ethswarm.org/)", - Long: `dfs is the file system layer of internetOS. It is a thin layer over Swarm. -It adds features to Swarm that is required by the internetOS to parallelize computation of data. + Long: `dfs is the file system layer of fairOS. It is a thin layer over Swarm. +It adds features to Swarm that is required by the fairOS to parallelize computation of data. It manages the metadata of directories and files created and expose them to higher layers. It can also be used as a standalone personal, decentralised drive over the internet`, + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + if err := config.BindPFlag(optionDFSDataDir, cmd.Flags().Lookup("dataDir")); err != nil { + return err + } + if err := config.BindPFlag(optionBeeApi, cmd.Flags().Lookup("beeApi")); err != nil { + return err + } + if err := config.BindPFlag(optionBeeDebugApi, cmd.Flags().Lookup("beeDebugApi")); err != nil { + return err + } + if err := config.BindPFlag(optionVerbosity, cmd.Flags().Lookup("verbosity")); err != nil { + return err + } - //Run: func(cmd *cobra.Command, args []string) { - //}, + dataDir = config.GetString(optionDFSDataDir) + beeApi = config.GetString(optionBeeApi) + beeDebugApi = config.GetString(optionBeeDebugApi) + verbosity = config.GetString(optionVerbosity) + return nil + }, } // Execute adds all child commands to the root command and sets flags appropriately. @@ -76,29 +99,41 @@ func init() { fmt.Println(err) os.Exit(1) } - // Here you will define your flags and configuration settings. // Cobra supports persistent flags, which, if defined here, // will be global for your application. - defaultConfig := filepath.Join(home, ".fairOS/dfs.yml") - rootCmd.PersistentFlags().StringVar(&cfgFile, "config", defaultConfig, "config file") - - // Cobra also supports local flags, which will only run - // when this action is called directly. - rootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") - - defaultDataDir := filepath.Join(home, ".fairOS/dfs") - rootCmd.PersistentFlags().StringVar(&dataDir, "dataDir", defaultDataDir, "store data in this dir") - rootCmd.PersistentFlags().StringVar(&beeHost, "beeHost", "127.0.0.1", "bee host") - rootCmd.PersistentFlags().StringVar(&beePort, "beePort", "1633", "bee port") - rootCmd.PersistentFlags().StringVar(&verbosity, "verbosity", "5", "verbosity level") + configPath := filepath.Join(home, defaultConfig) + + rootCmd.PersistentFlags().StringVar(&cfgFile, "config", configPath, "config file") + + dataDirPath = filepath.Join(home, defaultDir) + rootCmd.PersistentFlags().String("dataDir", dataDirPath, "store data in this dir") + rootCmd.PersistentFlags().String("beeApi", "localhost:1633", "full bee api endpoint") + rootCmd.PersistentFlags().String("beeDebugApi", "localhost:1635", "full bee debug api endpoint") + rootCmd.PersistentFlags().String("verbosity", "trace", "verbosity level") + + rootCmd.PersistentFlags().String("beeHost", "127.0.0.1", "bee host") + rootCmd.PersistentFlags().String("beePort", "1633", "bee port") + if err := rootCmd.PersistentFlags().MarkDeprecated("beeHost", "run --beeApi, full bee api endpoint"); err != nil { + fmt.Println(err) + os.Exit(1) + } + if err := rootCmd.PersistentFlags().MarkDeprecated("beePort", "run --beeApi, full bee api endpoint"); err != nil { + fmt.Println(err) + os.Exit(1) + } } // initConfig reads in config file and ENV variables if set. func initConfig() { if cfgFile != "" { + // check file stat + if _, err := os.Stat(cfgFile); err != nil { + // if there is no configFile, write it + writeConfig() + } // Use config file from the flag. - viper.SetConfigFile(cfgFile) + config.SetConfigFile(cfgFile) } else { // Find home dir. home, err := homedir.Dir() @@ -106,16 +141,37 @@ func initConfig() { fmt.Println(err) os.Exit(1) } + // check file stat + cfgFile = filepath.Join(home, defaultConfig) + if _, err := os.Stat(cfgFile); err != nil { + // if there is no configFile, write it + writeConfig() + } - // Search config in home dir with name ".dfs" (without extension). - viper.AddConfigPath(home) - viper.SetConfigName(".fairOS/dfs") + config.SetConfigFile(cfgFile) } - viper.AutomaticEnv() // read in environment variables that match - + config.AutomaticEnv() // read in environment variables that match // If a config file is found, read it in. - if err := viper.ReadInConfig(); err == nil { - fmt.Println("Using config file:", viper.ConfigFileUsed()) + if err := config.ReadInConfig(); err == nil { + fmt.Println("Using config file:", config.ConfigFileUsed()) + } +} + +func writeConfig() { + c := viper.New() + c.Set(optionCORSAllowedOrigins, defaultCORSAllowedOrigins) + c.Set(optionDFSDataDir, dataDirPath) + c.Set(optionDFSHttpPort, defaultDFSHttpPort) + c.Set(optionDFSPprofPort, defaultDFSPprofPort) + c.Set(optionVerbosity, defaultVerbosity) + c.Set(optionBeeApi, defaultBeeApi) + c.Set(optionBeeDebugApi, defaultBeeDebugApi) + c.Set(optionBeePostageBatchId, "") + c.Set(optionCookieDomain, defaultCookieDomain) + + if err := c.WriteConfigAs(cfgFile); err != nil { + fmt.Println("failed to write config file") + os.Exit(1) } } diff --git a/cmd/dfs/cmd/server.go b/cmd/dfs/cmd/server.go index 2691306c..b4387628 100644 --- a/cmd/dfs/cmd/server.go +++ b/cmd/dfs/cmd/server.go @@ -17,6 +17,7 @@ limitations under the License. package cmd import ( + "encoding/hex" "fmt" "io/ioutil" "net/http" @@ -46,11 +47,43 @@ var serverCmd = &cobra.Command{ Short: "starts a HTTP server for the dfs", Long: `Serves all the dfs commands through an HTTP server so that the upper layers can consume it.`, + PreRunE: func(cmd *cobra.Command, args []string) error { + if err := config.BindPFlag(optionDFSHttpPort, cmd.Flags().Lookup("httpPort")); err != nil { + return err + } + if err := config.BindPFlag(optionDFSPprofPort, cmd.Flags().Lookup("pprofPort")); err != nil { + return err + } + if err := config.BindPFlag(optionCookieDomain, cmd.Flags().Lookup("cookieDomain")); err != nil { + return err + } + if err := config.BindPFlag(optionCORSAllowedOrigins, cmd.Flags().Lookup("cors-origins")); err != nil { + return err + } + if err := config.BindPFlag(optionBeePostageBatchId, cmd.Flags().Lookup("postageBlockId")); err != nil { + return err + } + return nil + }, Run: func(cmd *cobra.Command, args []string) { + httpPort = config.GetString(optionDFSHttpPort) + pprofPort = config.GetString(optionDFSPprofPort) + cookieDomain = config.GetString(optionCookieDomain) + postageBlockId = config.GetString(optionBeePostageBatchId) + corsOrigins = config.GetStringSlice(optionCORSAllowedOrigins) + verbosity = config.GetString(optionVerbosity) if postageBlockId == "" { _ = cmd.Help() fmt.Println("\npostageBlockId is required to run server") return + } else if len(postageBlockId) != 64 { + fmt.Println("\npostageBlockId is invalid") + return + } + _, err := hex.DecodeString(postageBlockId) + if err != nil { + fmt.Println("\npostageBlockId is invalid") + return } var logger logging.Logger @@ -75,15 +108,15 @@ can consume it.`, logger.Info("configuration values") logger.Info("version : ", dfs.Version) logger.Info("dataDir : ", dataDir) - logger.Info("beeHost : ", beeHost) - logger.Info("beePort : ", beePort) + logger.Info("beeApi : ", beeApi) + logger.Info("beeDebugApi : ", beeDebugApi) logger.Info("verbosity : ", verbosity) logger.Info("httpPort : ", httpPort) logger.Info("pprofPort : ", pprofPort) logger.Info("cookieDomain : ", cookieDomain) logger.Info("postageBlockId : ", postageBlockId) logger.Info("corsOrigins : ", corsOrigins) - hdlr, err := api.NewHandler(dataDir, beeHost, beePort, cookieDomain, postageBlockId, logger) + hdlr, err := api.NewHandler(dataDir, beeApi, beeDebugApi, cookieDomain, postageBlockId, logger) if err != nil { logger.Error(err.Error()) return @@ -94,11 +127,11 @@ can consume it.`, } func init() { - serverCmd.Flags().StringVar(&httpPort, "httpPort", "9090", "http port") - serverCmd.Flags().StringVar(&pprofPort, "pprofPort", "9091", "pprof port") - serverCmd.Flags().StringVar(&cookieDomain, "cookieDomain", "api.fairos.io", "the domain to use in the cookie") - serverCmd.Flags().StringVar(&postageBlockId, "postageBlockId", "", "the postage block used to store the data in bee") - serverCmd.Flags().StringSliceVar(&corsOrigins, "cors-origins", []string{}, "allow CORS headers for the given origins") + serverCmd.Flags().String("httpPort", defaultDFSHttpPort, "http port") + serverCmd.Flags().String("pprofPort", defaultDFSPprofPort, "pprof port") + serverCmd.Flags().String("cookieDomain", defaultCookieDomain, "the domain to use in the cookie") + serverCmd.Flags().String("postageBlockId", "", "the postage block used to store the data in bee") + serverCmd.Flags().StringSlice("cors-origins", defaultCORSAllowedOrigins, "allow CORS headers for the given origins") rootCmd.AddCommand(serverCmd) } @@ -122,7 +155,12 @@ func startHttpService(logger logging.Logger) { logger.Errorf("error in API /: ", err) return } - _, err = fmt.Fprintln(w, beeHost+":"+beePort) + _, err = fmt.Fprintln(w, beeApi) + if err != nil { + logger.Errorf("error in API /: ", err) + return + } + _, err = fmt.Fprintln(w, beeDebugApi) if err != nil { logger.Errorf("error in API /: ", err) return @@ -195,6 +233,7 @@ func startHttpService(logger logging.Logger) { baseRouter.HandleFunc("/pod/receiveinfo", handler.PodReceiveInfoHandler).Methods("GET") podRouter := baseRouter.PathPrefix("/pod/").Subrouter() podRouter.Use(handler.LoginMiddleware) + podRouter.HandleFunc("/present", handler.PodPresentHandler).Methods("GET") podRouter.HandleFunc("/new", handler.PodCreateHandler).Methods("POST") podRouter.HandleFunc("/open", handler.PodOpenHandler).Methods("POST") podRouter.HandleFunc("/close", handler.PodCloseHandler).Methods("POST") @@ -233,8 +272,10 @@ func startHttpService(logger logging.Logger) { kvRouter.HandleFunc("/open", handler.KVOpenHandler).Methods("POST") kvRouter.HandleFunc("/count", handler.KVCountHandler).Methods("POST") kvRouter.HandleFunc("/delete", handler.KVDeleteHandler).Methods("DELETE") + kvRouter.HandleFunc("/present", handler.KVPresentHandler).Methods("GET") kvRouter.HandleFunc("/entry/put", handler.KVPutHandler).Methods("POST") kvRouter.HandleFunc("/entry/get", handler.KVGetHandler).Methods("GET") + kvRouter.HandleFunc("/entry/get-data", handler.KVGetDataHandler).Methods("GET") kvRouter.HandleFunc("/entry/del", handler.KVDelHandler).Methods("DELETE") kvRouter.HandleFunc("/loadcsv", handler.KVLoadCSVHandler).Methods("POST") kvRouter.HandleFunc("/seek", handler.KVSeekHandler).Methods("POST") @@ -274,7 +315,7 @@ func startHttpService(logger logging.Logger) { // starting the pprof server go func() { logger.Infof("fairOS-dfs pprof listening on port: %v", pprofPort) - err := http.ListenAndServe("localhost:"+pprofPort, nil) + err := http.ListenAndServe("localhost"+pprofPort, nil) if err != nil { logger.Errorf("pprof listenAndServe: %v ", err.Error()) return @@ -282,7 +323,7 @@ func startHttpService(logger logging.Logger) { }() logger.Infof("fairOS-dfs API server listening on port: %v", httpPort) - err := http.ListenAndServe(":"+httpPort, handler) + err := http.ListenAndServe(httpPort, handler) if err != nil { logger.Errorf("http listenAndServe: %v ", err.Error()) return diff --git a/download.sh b/download.sh new file mode 100755 index 00000000..702e28d1 --- /dev/null +++ b/download.sh @@ -0,0 +1,101 @@ +#!/usr/bin/env bash + +{ + +GH_README="https://github.com/fairDataSociety/fairOS-dfs#how-to-run-fairos-dfs" + +dfs_has() { + type "$1" > /dev/null 2>&1 +} + +dfs_echo() { + command printf %s\\n "$*" 2>/dev/null +} + +dfs_download() { + if ! dfs_has "curl"; then + dfs_echo "Error: you need to have wget installed and in your path. Use brew (mac) or apt (unix) to install curl" + exit 1 + fi + + if ! dfs_has "wget"; then + dfs_echo "Error: you need to have wget installed and in your path. Use brew (mac) or apt (unix) to install wget" + exit 1 + fi + + eval curl -s https://api.github.com/repos/fairDataSociety/fairOS-dfs/releases/latest \ +| grep "$1" \ +| cut -d : -f 2,3 \ +| tr -d \" \ +| wget -qi - + +} + +install_dfs() { + BIN_NAME="dfs-" + + if [[ "$OSTYPE" == "linux-gnu" ]]; then + DETECTED_OS="linux" # TODO (Test) + elif [[ "$OSTYPE" == "darwin"* ]]; then + DETECTED_OS="mac" + elif [[ "$OSTYPE" == "cygwin" ]]; then + DETECTED_OS="linux" # TODO (Test) + elif [[ "$OSTYPE" == "msys" ]]; then + DETECTED_OS="windows" + elif [[ "$OSTYPE" == "win32" ]]; then + DETECTED_OS="windows" # TODO (Test) + elif [[ "$OSTYPE" == "freebsd"* ]]; then + DETECTED_OS="linux" # TODO (Test) + else + dfs_echo "Error: unable to detect operating system. Please install manually by referring to $GH_README" + exit 1 + fi + + ARCH=$(uname -m) + + echo " /@@@@@@ /@@ /@@@@@@ /@@@@@@ /@@ /@@@@@@" + echo " /@@__ @@ |__/ /@@__ @@ /@@__ @@ | @@ /@@__ @@" + echo "| @@ \__//@@@@@@ /@@ /@@@@@@ | @@ \ @@| @@ \__/ /@@@@@@@| @@ \__//@@@@@@@" + echo "| @@@@ |____ @@| @@ /@@__ @@| @@ | @@| @@@@@@ /@@@@@@ /@@__ @@| @@@@ /@@_____/" + echo "| @@_/ /@@@@@@@| @@| @@ \__/| @@ | @@ \____ @@|______/| @@ | @@| @@_/ | @@@@@@" + echo "| @@ /@@__ @@| @@| @@ | @@ | @@ /@@ \ @@ | @@ | @@| @@ \____ @@" + echo "| @@ | @@@@@@@| @@| @@ | @@@@@@/| @@@@@@/ | @@@@@@@| @@ /@@@@@@@/" + echo "|__/ \_______/|__/|__/ \______/ \______/ \_______/|__/ |_______/" + + echo "========== FairOs-dfs Installation ==========" + echo "Detected OS: $DETECTED_OS" + echo "Detected Architecture: $ARCH" + echo "=====================================================" + + if [[ "$ARCH" == "arm64" && $DETECTED_OS == "mac" ]]; then + BIN_NAME="dfs-darwin-amd64" + dfs_echo $BIN_NAME + elif [[ "$ARCH" == "x86_64" && $DETECTED_OS == "windows" ]]; then + BIN_NAME="dfs-windows-amd64.exe" + dfs_echo $BIN_NAME + elif [[ "$ARCH" == "x86_32" && $DETECTED_OS == "windows" ]]; then + BIN_NAME="dfs-windows-386.exe" + dfs_echo $BIN_NAME + elif [[ "$ARCH" == "arm64" && $DETECTED_OS == "linux" ]]; then + BIN_NAME="dfs-linux-arm64.exe" + dfs_echo $BIN_NAME + elif [[ "$ARCH" == "x86_32" && $DETECTED_OS == "linux" ]]; then + BIN_NAME="dfs-linux-386.exe" + dfs_echo $BIN_NAME + elif [[ "$ARCH" == "x86_64" && $DETECTED_OS == "linux" ]]; then + BIN_NAME="dfs-linux-amd64.exe" + dfs_echo $BIN_NAME + elif [[ "$ARCH" == "amd64" && $DETECTED_OS == "linux" ]]; then + BIN_NAME="dfs-linux-amd64.exe" + dfs_echo $BIN_NAME + else + dfs_echo "Error: unable to detect architecture. Please install manually by referring to $GH_README" + exit 1 + fi + + dfs_download $BIN_NAME +} + +install_dfs + +} \ No newline at end of file diff --git a/go.mod b/go.mod index f41a1f14..e827b029 100644 --- a/go.mod +++ b/go.mod @@ -28,5 +28,6 @@ require ( github.com/tyler-smith/go-bip39 v1.0.2 golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf + gopkg.in/yaml.v2 v2.3.0 resenje.org/jsonhttp v0.2.0 ) diff --git a/openapi/dfs.yaml b/openapi/dfs.yaml index 7ec4e590..e46a0774 100644 --- a/openapi/dfs.yaml +++ b/openapi/dfs.yaml @@ -1,7 +1,7 @@ openapi: 3.0.0 info: - version: 0.6.0 + version: 0.7.3 title: FairOS-dfs API description: 'A list of the currently provided Interfaces to interact with FairOS decentralised file system(dfs), implementing user, pod, file system, key value store and document store' @@ -10,7 +10,7 @@ externalDocs: url: 'https://fairos.io' servers: - - url: 'http://{apiRoot}:{port}/v1' + - url: 'https://{apiRoot}:{port}/v1' variables: apiRoot: default: 'localhost' @@ -56,7 +56,7 @@ paths: schema: oneOf: - $ref: '#/components/schemas/UserSignupResponse' - - $ref: '#/components/schemas/UserSignupResponseWithMenonic' + - $ref: '#/components/schemas/UserSignupResponseWithMnemonic' '400': $ref: '#/components/responses/400' '500': @@ -106,8 +106,8 @@ paths: summary: 'Import user' description: 'Import a user from one dfs server to another and subsequently logs in returning the authentication cookie `fairOS-dfs`. The cookie needs to be sent n the request header in all subsequent requests. - Importing is usefull when a user switches machines or lost a old machine which was running his dfs server. - Imput contains a form object containing the login, password, ethereum address and optionally mnemonic. + Importing is useful when a user switches machines or lost a old machine which was running his dfs server. + Input contains a form object containing the login, password, ethereum address and optionally mnemonic. If mnemonic is provided a user will be created while importing.' tags: - User @@ -568,8 +568,12 @@ paths: properties: pod_name: $ref: '#/components/schemas/PodName' + password: + $ref: '#/components/schemas/Password' required: - pod_name + - password + responses: '200': description: 'Ok' @@ -586,7 +590,7 @@ paths: '/pod/ls': get: summary: 'List pod' - description: 'Lista all pods of a user' + description: 'Lists all pods of a user' tags: - Pod parameters: @@ -649,8 +653,40 @@ paths: '500': $ref: '#/components/responses/500' - '/dir/mkdir': + '/pod/present': get: + summary: 'Pod Present' + description: 'Is Pod present' + tags: + - Pod + parameters: + - in: cookie + name: fairOS-dfs + schema: + type: string + - in: query + name: fields + schema: + type: object + properties: + pod_name: + $ref: '#/components/schemas/PodName' + required: + - pod_name + responses: + '200': + description: 'Ok' + content: + application/json: + schema: + $ref: '#/components/schemas/DirPresentResponse' + '400': + $ref: '#/components/responses/400' + '500': + $ref: '#/components/responses/500' + + '/dir/mkdir': + post: summary: 'Make dir' description: 'make a new directory inside a pod' tags: @@ -686,7 +722,7 @@ paths: $ref: '#/components/responses/500' '/dir/rmdir': - get: + delete: summary: 'Remove dir' description: 'remove a directory inside a pod' tags: @@ -833,7 +869,7 @@ paths: $ref: '#/components/responses/500' '/file/upload': - get: + post: summary: 'Upload File' description: 'upload a file to dfs' tags: @@ -921,8 +957,44 @@ paths: '500': $ref: '#/components/responses/500' + post: + summary: 'Download file' + description: 'Download a file from the pod tp the local dir' + tags: + - File System + parameters: + - in: cookie + name: fairOS-dfs + schema: + type: string + requestBody: + content: + application/json: + schema: + type: object + properties: + pod_name: + $ref: '#/components/schemas/PodName' + file_path: + $ref: '#/components/schemas/DirPath' + required: + - pod_name + - file_path + responses: + '200': + description: 'Ok' + content: + application/octet-stream: + schema: + type: string + format: binary + '400': + $ref: '#/components/responses/400' + '500': + $ref: '#/components/responses/500' + '/file/share': - get: + post: summary: 'Share file' description: 'Share a file with another user' tags: @@ -1244,10 +1316,9 @@ paths: '200': description: 'Ok' content: - application/octet-stream: + application/json: schema: - type: string - format: binary + $ref: '#/components/schemas/KVCount' '400': $ref: '#/components/responses/400' '500': @@ -1375,6 +1446,51 @@ paths: '500': $ref: '#/components/responses/500' + '/kv/entry/get-data': + get: + summary: 'Get Value' + description: 'Get value given a key' + tags: + - Key Value Store + parameters: + - in: cookie + name: fairOS-dfs + schema: + type: string + - in: query + name: fields + schema: + type: object + properties: + pod_name: + $ref: '#/components/schemas/PodName' + table_name: + $ref: '#/components/schemas/KVTableName' + key: + $ref: '#/components/schemas/KVKey' + format: + type: string + enum: [ string, byte-string ] + required: + - pod_name + - table_name + - key + responses: + '200': + description: 'Ok' + content: + application/json: + schema: + properties: + keys: + $ref: '#/components/schemas/KVgetResponseKeys' + values: + $ref: '#/components/schemas/KVgetResponseValues' + '400': + $ref: '#/components/responses/400' + '500': + $ref: '#/components/responses/500' + '/kv/entry/del': delete: summary: 'Delete Value' @@ -1544,6 +1660,44 @@ paths: '500': $ref: '#/components/responses/500' + '/kv/present': + get: + summary: 'Key Present' + description: 'Is Key present' + tags: + - Key Value Store + parameters: + - in: cookie + name: fairOS-dfs + schema: + type: string + - in: query + name: fields + schema: + type: object + properties: + pod_name: + $ref: '#/components/schemas/PodName' + table_name: + $ref: '#/components/schemas/KVTableName' + key: + $ref: '#/components/schemas/KVKey' + required: + - pod_name + - table_name + - key + responses: + '200': + description: 'Ok' + content: + application/json: + schema: + $ref: '#/components/schemas/DirPresentResponse' + '400': + $ref: '#/components/responses/400' + '500': + $ref: '#/components/responses/500' + '/doc/new': post: summary: 'Create DocumentDB' @@ -1701,7 +1855,7 @@ paths: '/doc/delete': delete: summary: 'Delete DocumentDB' - description: 'Delete the given ocument DB and all its documents and indexes' + description: 'Delete the given document DB and all its documents and indexes' tags: - Document DB parameters: @@ -2052,7 +2206,7 @@ components: $ref: '#/components/schemas/UserName' password: $ref: '#/components/schemas/Password' - address: + mnemonic: $ref: '#/components/schemas/Mnemonic' @@ -2361,6 +2515,17 @@ components: type: string example: "kv_table1" + KVCount: + type: object + properties: + count: + type: integer + table_name: + $ref: '#/components/schemas/KVTableName' + required: + - file_name + - reference + KVMemoryTable: type: string example: "true" @@ -2394,7 +2559,6 @@ components: type: string example: first_name=string,age=number,tags=map - DocTableName: type: string example: "doc_table1" diff --git a/pkg/api/handler.go b/pkg/api/handler.go index 86851e86..44113214 100644 --- a/pkg/api/handler.go +++ b/pkg/api/handler.go @@ -26,8 +26,8 @@ type Handler struct { logger logging.Logger } -func NewHandler(dataDir, beeHost, beePort, cookieDomain, postageBlockId string, logger logging.Logger) (*Handler, error) { - api, err := dfs.NewDfsAPI(dataDir, beeHost, beePort, cookieDomain, postageBlockId, logger) +func NewHandler(dataDir, beeApi, beeDebugApi, cookieDomain, postageBlockId string, logger logging.Logger) (*Handler, error) { + api, err := dfs.NewDfsAPI(dataDir, beeApi, beeDebugApi, cookieDomain, postageBlockId, logger) if err != nil { return nil, dfs.ErrBeeClient } diff --git a/pkg/api/kv_put_get_del.go b/pkg/api/kv_put_get_del.go index 784022c3..dc6fd8dc 100644 --- a/pkg/api/kv_put_get_del.go +++ b/pkg/api/kv_put_get_del.go @@ -18,10 +18,11 @@ package api import ( "encoding/json" + "fmt" "net/http" "github.com/fairdatasociety/fairOS-dfs/cmd/common" - + "github.com/fairdatasociety/fairOS-dfs/pkg/collection" "github.com/fairdatasociety/fairOS-dfs/pkg/cookie" "resenje.org/jsonhttp" ) @@ -31,6 +32,11 @@ type KVResponse struct { Values []byte `json:"values"` } +type KVResponseRaw struct { + Keys []string `json:"keys,omitempty"` + Values string `json:"values"` +} + // KVPutHandler is the api handler to insert a key and value in to the kv table // it takes three arguments // - table_name: the name of the kv table @@ -104,7 +110,8 @@ func (h *Handler) KVPutHandler(w http.ResponseWriter, r *http.Request) { } // KVGetHandler is the api handler to get a value from the kv table -// it takes two arguments +// it takes three arguments +// - pod_name: the name of the pod // - table_name: the name of the kv table // - key: the key string func (h *Handler) KVGetHandler(w http.ResponseWriter, r *http.Request) { @@ -163,6 +170,10 @@ func (h *Handler) KVGetHandler(w http.ResponseWriter, r *http.Request) { columns, data, err := h.dfsAPI.KVGet(sessionId, podName, name, key) if err != nil { h.logger.Errorf("kv get: %v", err) + if err == collection.ErrEntryNotFound { + jsonhttp.NotFound(w, "kv get: "+err.Error()) + return + } jsonhttp.InternalServerError(w, "kv get: "+err.Error()) return } @@ -179,6 +190,112 @@ func (h *Handler) KVGetHandler(w http.ResponseWriter, r *http.Request) { jsonhttp.OK(w, &resp) } +// KVGetDataHandler is the api handler to get a value from the kv table +// it takes four arguments +// - pod_name: the name of the pod +// - table_name: the name of the kv table +// - key: the key string +// - format: whether the data should be string or byte-string +func (h *Handler) KVGetDataHandler(w http.ResponseWriter, r *http.Request) { + keys, ok := r.URL.Query()["pod_name"] + if !ok || len(keys[0]) < 1 { + h.logger.Errorf("kv get: \"pod_name\" argument missing") + jsonhttp.BadRequest(w, "kv get: \"pod_name\" argument missing") + return + } + podName := keys[0] + if podName == "" { + h.logger.Errorf("kv get: \"pod_name\" argument missing") + jsonhttp.BadRequest(w, "kv get: \"pod_name\" argument missing") + return + } + + keys, ok = r.URL.Query()["table_name"] + if !ok || len(keys[0]) < 1 { + h.logger.Errorf("kv get: \"table_name\" argument missing") + jsonhttp.BadRequest(w, "kv get: \"table_name\" argument missing") + return + } + name := keys[0] + if name == "" { + h.logger.Errorf("kv get: \"table_name\" argument missing") + jsonhttp.BadRequest(w, "kv get: \"table_name\" argument missing") + return + } + + keys, ok = r.URL.Query()["key"] + if !ok || len(keys[0]) < 1 { + h.logger.Errorf("kv get: \"sharing_ref\" argument missing") + jsonhttp.BadRequest(w, "kv get: \"sharing_ref\" argument missing") + return + } + key := keys[0] + if key == "" { + h.logger.Errorf("kv get: \"key\" argument missing") + jsonhttp.BadRequest(w, "kv get: \"key\" argument missing") + return + } + + formats, ok := r.URL.Query()["format"] + if !ok || len(formats[0]) < 1 { + h.logger.Errorf("kv get: \"format\" argument missing") + jsonhttp.BadRequest(w, "kv get: \"format\" argument missing") + return + } + format := formats[0] + if format == "" { + h.logger.Errorf("kv get: \"format\" argument missing") + jsonhttp.BadRequest(w, "kv get: \"format\" argument missing") + return + } + + if format != "string" && format != "byte-string" { + h.logger.Errorf("kv get: \"format\" argument is unknown") + jsonhttp.BadRequest(w, "kv get: \"format\" argument is unknown") + return + } + + // get values from cookie + sessionId, err := cookie.GetSessionIdFromCookie(r) + if err != nil { + h.logger.Errorf("kv get: invalid cookie: %v", err) + jsonhttp.BadRequest(w, ErrInvalidCookie) + return + } + if sessionId == "" { + h.logger.Errorf("kv get: \"cookie-id\" parameter missing in cookie") + jsonhttp.BadRequest(w, "kv get: \"cookie-id\" parameter missing in cookie") + return + } + + columns, data, err := h.dfsAPI.KVGet(sessionId, podName, name, key) + if err != nil { + h.logger.Errorf("kv get: %v", err) + if err == collection.ErrEntryNotFound { + jsonhttp.NotFound(w, "kv get: "+err.Error()) + return + } + jsonhttp.InternalServerError(w, "kv get: "+err.Error()) + return + } + + var resp KVResponseRaw + if columns != nil { + resp.Keys = columns + } else { + resp.Keys = []string{key} + } + + if format == "string" { + resp.Values = string(data) + } else { + resp.Values = fmt.Sprintf("%v", data) + } + + w.Header().Set("Content-Type", "application/json") + jsonhttp.OK(w, &resp) +} + // KVDelHandler is the api handler to delete a key and value from the kv table // it takes two arguments // - table_name: the name of the kv table @@ -242,3 +359,75 @@ func (h *Handler) KVDelHandler(w http.ResponseWriter, r *http.Request) { } jsonhttp.OK(w, "key deleted") } + +// KVPresentHandler is the api handler to check if a value exists in the kv table +// it takes three arguments +// - pod_name: the name of the pod +// - table_name: the name of the kv table +// - key: the key string +func (h *Handler) KVPresentHandler(w http.ResponseWriter, r *http.Request) { + keys, ok := r.URL.Query()["pod_name"] + if !ok || len(keys[0]) < 1 { + h.logger.Errorf("kv get: \"pod_name\" argument missing") + jsonhttp.BadRequest(w, "kv get: \"pod_name\" argument missing") + return + } + podName := keys[0] + if podName == "" { + h.logger.Errorf("kv get: \"pod_name\" argument missing") + jsonhttp.BadRequest(w, "kv get: \"pod_name\" argument missing") + return + } + + keys, ok = r.URL.Query()["table_name"] + if !ok || len(keys[0]) < 1 { + h.logger.Errorf("kv get: \"table_name\" argument missing") + jsonhttp.BadRequest(w, "kv get: \"table_name\" argument missing") + return + } + name := keys[0] + if name == "" { + h.logger.Errorf("kv get: \"table_name\" argument missing") + jsonhttp.BadRequest(w, "kv get: \"table_name\" argument missing") + return + } + + keys, ok = r.URL.Query()["key"] + if !ok || len(keys[0]) < 1 { + h.logger.Errorf("kv get: \"sharing_ref\" argument missing") + jsonhttp.BadRequest(w, "kv get: \"sharing_ref\" argument missing") + return + } + key := keys[0] + if key == "" { + h.logger.Errorf("kv get: \"key\" argument missing") + jsonhttp.BadRequest(w, "kv get: \"key\" argument missing") + return + } + + // get values from cookie + sessionId, err := cookie.GetSessionIdFromCookie(r) + if err != nil { + h.logger.Errorf("kv get: invalid cookie: %v", err) + jsonhttp.BadRequest(w, ErrInvalidCookie) + return + } + if sessionId == "" { + h.logger.Errorf("kv get: \"cookie-id\" parameter missing in cookie") + jsonhttp.BadRequest(w, "kv get: \"cookie-id\" parameter missing in cookie") + return + } + w.Header().Set("Content-Type", "application/json") + + _, _, err = h.dfsAPI.KVGet(sessionId, podName, name, key) + if err != nil { + jsonhttp.OK(w, &PresentResponse{ + Present: false, + }) + return + } + + jsonhttp.OK(w, &PresentResponse{ + Present: true, + }) +} diff --git a/pkg/api/pod_present.go b/pkg/api/pod_present.go new file mode 100644 index 00000000..935ceded --- /dev/null +++ b/pkg/api/pod_present.go @@ -0,0 +1,48 @@ +package api + +import ( + "net/http" + + "github.com/fairdatasociety/fairOS-dfs/pkg/cookie" + + "resenje.org/jsonhttp" +) + +// PodPresentHandler is the api handler to check if a pod is present +// it takes pod_name as query parameter +func (h *Handler) PodPresentHandler(w http.ResponseWriter, r *http.Request) { + keys, ok := r.URL.Query()["pod_name"] + if !ok || len(keys[0]) < 1 { + h.logger.Errorf("doc ls: \"pod_name\" argument missing") + jsonhttp.BadRequest(w, "doc ls: \"pod_name\" argument missing") + return + } + podName := keys[0] + if podName == "" { + h.logger.Errorf("doc ls: \"pod_name\" argument missing") + jsonhttp.BadRequest(w, "doc ls: \"pod_name\" argument missing") + return + } + + // get values from cookie + sessionId, err := cookie.GetSessionIdFromCookie(r) + if err != nil { + h.logger.Errorf("pod open: invalid cookie: %v", err) + jsonhttp.BadRequest(w, ErrInvalidCookie) + return + } + if sessionId == "" { + h.logger.Errorf("pod open: \"cookie-id\" parameter missing in cookie") + jsonhttp.BadRequest(w, "pod open: \"cookie-id\" parameter missing in cookie") + return + } + if h.dfsAPI.IsPodExist(podName, sessionId) { + jsonhttp.OK(w, &PresentResponse{ + Present: true, + }) + } else { + jsonhttp.OK(w, &PresentResponse{ + Present: false, + }) + } +} diff --git a/pkg/api/user_login.go b/pkg/api/user_login.go index 4f9816f2..e63f8788 100644 --- a/pkg/api/user_login.go +++ b/pkg/api/user_login.go @@ -49,8 +49,8 @@ func (h *Handler) UserLoginHandler(w http.ResponseWriter, r *http.Request) { user := userReq.UserName password := userReq.Password if user == "" { - h.logger.Errorf("user login: \"user\" argument missing") - jsonhttp.BadRequest(w, "user login: \"user\" argument missing") + h.logger.Errorf("user login: \"user_name\" argument missing") + jsonhttp.BadRequest(w, "user login: \"user_name\" argument missing") return } if password == "" { diff --git a/pkg/api/user_present.go b/pkg/api/user_present.go index 27520496..f9f1c343 100644 --- a/pkg/api/user_present.go +++ b/pkg/api/user_present.go @@ -22,7 +22,7 @@ import ( "resenje.org/jsonhttp" ) -type UserPresentResponse struct { +type PresentResponse struct { Present bool `json:"present"` } @@ -47,11 +47,11 @@ func (h *Handler) UserPresentHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", " application/json") // check if user is present if h.dfsAPI.IsUserNameAvailable(user) { - jsonhttp.OK(w, &UserPresentResponse{ + jsonhttp.OK(w, &PresentResponse{ Present: true, }) } else { - jsonhttp.OK(w, &UserPresentResponse{ + jsonhttp.OK(w, &PresentResponse{ Present: false, }) } diff --git a/pkg/blockstore/bee/client.go b/pkg/blockstore/bee/client.go index 3586ef64..05c13cd9 100644 --- a/pkg/blockstore/bee/client.go +++ b/pkg/blockstore/bee/client.go @@ -54,9 +54,8 @@ const ( ) type BeeClient struct { - host string - port string url string + debugUrl string client *http.Client hasher *bmtlegacy.Hasher chunkCache *lru.Cache @@ -75,7 +74,7 @@ type bytesPostResponse struct { } // NewBeeClient creates a new client which connects to the Swarm bee node to access the Swarm network. -func NewBeeClient(host, port, postageBlockId string, logger logging.Logger) *BeeClient { +func NewBeeClient(apiUrl, debugApiUrl, postageBlockId string, logger logging.Logger) *BeeClient { p := bmtlegacy.NewTreePool(hashFunc, swarm.Branches, bmtlegacy.PoolSize) cache, err := lru.New(chunkCacheSize) if err != nil { @@ -91,9 +90,8 @@ func NewBeeClient(host, port, postageBlockId string, logger logging.Logger) *Bee } return &BeeClient{ - host: host, - port: port, - url: fmt.Sprintf("http://" + host + ":" + port), + url: apiUrl, + debugUrl: debugApiUrl, client: createHTTPClient(), hasher: bmtlegacy.New(p), chunkCache: cache, @@ -462,7 +460,7 @@ func (s *BeeClient) GetNewPostageBatch() error { to := time.Now() s.logger.Infof("Trying to get new postage batch id") path := filepath.Join(postageBatchUrl, "10000000/20") - fullUrl := fmt.Sprintf(s.url + path) + fullUrl := fmt.Sprintf(s.debugUrl + path) req, err := http.NewRequest(http.MethodPost, fullUrl, nil) if err != nil { return err diff --git a/pkg/dfs/api.go b/pkg/dfs/api.go index 1ab577da..fec98fba 100644 --- a/pkg/dfs/api.go +++ b/pkg/dfs/api.go @@ -31,8 +31,8 @@ type DfsAPI struct { } // NewDfsAPI is the main entry point for the df controller. -func NewDfsAPI(dataDir, host, port, cookieDomain, postageBlockId string, logger logging.Logger) (*DfsAPI, error) { - c := bee.NewBeeClient(host, port, postageBlockId, logger) +func NewDfsAPI(dataDir, apiUrl, debugApiUrl, cookieDomain, postageBlockId string, logger logging.Logger) (*DfsAPI, error) { + c := bee.NewBeeClient(apiUrl, debugApiUrl, postageBlockId, logger) if !c.CheckConnection() { return nil, ErrBeeClient } diff --git a/pkg/dfs/pod_api.go b/pkg/dfs/pod_api.go index 89ff4203..36e929aa 100644 --- a/pkg/dfs/pod_api.go +++ b/pkg/dfs/pod_api.go @@ -245,3 +245,11 @@ func (d *DfsAPI) PodReceive(sessionId string, ref utils.Reference) (*pod.Info, e return ui.GetPod().ReceivePod(ref) } + +func (d *DfsAPI) IsPodExist(podName, sessionId string) bool { + ui := d.users.GetLoggedInUserInfo(sessionId) + if ui == nil { + return false + } + return ui.GetPod().IsPodPresent(podName) +} diff --git a/pkg/file/upload.go b/pkg/file/upload.go index bbd1780c..5a63e1e3 100644 --- a/pkg/file/upload.go +++ b/pkg/file/upload.go @@ -65,69 +65,86 @@ func (f *File) Upload(fd io.Reader, podFileName string, fileSize int64, blockSiz refMap := make(map[int]*BlockInfo) refMapMu := sync.RWMutex{} var contentBytes []byte - for { - data := make([]byte, blockSize, blockSize+1024) - r, err := reader.Read(data) - totalLength += uint64(r) - if err != nil { - if err == io.EOF { - if totalLength < uint64(fileSize) { - return fmt.Errorf("invalid file length of file data received") + wg.Add(1) + go func() { + var mainErr error + for { + if mainErr != nil { + errC <- mainErr + wg.Done() + return + } + data := make([]byte, blockSize, blockSize+1024) + r, err := reader.Read(data) + totalLength += uint64(r) + if err != nil { + if err == io.EOF { + if totalLength < uint64(fileSize) { + errC <- fmt.Errorf("invalid file length of file data received") + return + } + wg.Done() + break } - break + errC <- err + return } - return err - } - // determine the content type from the first 512 bytes of the file - if len(contentBytes) < 512 { - contentBytes = append(contentBytes, data[:r]...) - if len(contentBytes) >= 512 { - cBytes := bytes.NewReader(contentBytes[:512]) - cReader := bufio.NewReader(cBytes) - meta.ContentType = f.getContentType(cReader) + // determine the content type from the first 512 bytes of the file + if len(contentBytes) < 512 { + contentBytes = append(contentBytes, data[:r]...) + if len(contentBytes) >= 512 { + cBytes := bytes.NewReader(contentBytes[:512]) + cReader := bufio.NewReader(cBytes) + meta.ContentType = f.getContentType(cReader) + } } - } - wg.Add(1) - worker <- true - go func(counter, size int) { - blockName := fmt.Sprintf("block-%05d", counter) - defer func() { - <-worker - wg.Done() - f.logger.Info("done uploading block ", blockName) - }() - - f.logger.Info("Uploading ", blockName) - // compress the data - uploadData := data[:size] - if compression != "" { - uploadData, err = compress(data[:size], compression, blockSize) - if err != nil { - errC <- err + wg.Add(1) + worker <- true + go func(counter, size int) { + blockName := fmt.Sprintf("block-%05d", counter) + defer func() { + <-worker + wg.Done() + if mainErr != nil { + f.logger.Error("failed uploading block ", blockName) + return + } + f.logger.Info("done uploading block ", blockName) + }() + + f.logger.Info("Uploading ", blockName) + // compress the data + uploadData := data[:size] + if compression != "" { + uploadData, err = compress(data[:size], compression, blockSize) + if err != nil { + mainErr = err + return + } } - } - addr, uploadErr := f.client.UploadBlob(uploadData, true, true) - if uploadErr != nil { - errC <- uploadErr - return - } - fileBlock := &BlockInfo{ - Name: blockName, - Size: uint32(size), - CompressedSize: uint32(len(uploadData)), - Reference: utils.NewReference(addr), - } + addr, uploadErr := f.client.UploadBlob(uploadData, true, true) + if uploadErr != nil { + mainErr = uploadErr + return + } + fileBlock := &BlockInfo{ + Name: blockName, + Size: uint32(size), + CompressedSize: uint32(len(uploadData)), + Reference: utils.NewReference(addr), + } - refMapMu.Lock() - defer refMapMu.Unlock() - refMap[counter] = fileBlock - }(i, r) + refMapMu.Lock() + defer refMapMu.Unlock() + refMap[counter] = fileBlock + }(i, r) - i++ - } + i++ + } + }() go func() { wg.Wait() diff --git a/pkg/pod/utils.go b/pkg/pod/utils.go index 0fd1e257..a27d3bbb 100644 --- a/pkg/pod/utils.go +++ b/pkg/pod/utils.go @@ -33,6 +33,25 @@ func (p *Pod) IsPodOpened(podName string) bool { return false } +func (p *Pod) IsPodPresent(podName string) bool { + podName, err := CleanPodName(podName) + if err != nil { + return false + } + // check if pods is present and get free index + pods, sharedPods, err := p.loadUserPods() + if err != nil { + return false + } + if p.checkIfPodPresent(pods, podName) { + return true + } + if p.checkIfSharedPodPresent(sharedPods, podName) { + return true + } + return false +} + func (*Pod) GetPath(inode *d.Inode) string { if inode != nil { return inode.Meta.Path diff --git a/vendor/modules.txt b/vendor/modules.txt index 676393e3..6ecb3b4d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -329,6 +329,7 @@ gopkg.in/ini.v1 # gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce gopkg.in/natefinch/npipe.v2 # gopkg.in/yaml.v2 v2.3.0 +## explicit gopkg.in/yaml.v2 # resenje.org/jsonhttp v0.2.0 ## explicit diff --git a/version.go b/version.go index 99cbbc09..66db20e1 100644 --- a/version.go +++ b/version.go @@ -5,7 +5,7 @@ package dfs var ( - version = "0.7.1" // manually set semantic version number + version = "0.7.3" // manually set semantic version number commit string // automatically set git commit hash Version = func() string { From 12eb5de3dd4ad712d0ce17e6bbd389a8a5a05b6f Mon Sep 17 00:00:00 2001 From: Sabyasachi Patra Date: Tue, 25 Jan 2022 21:24:26 +0530 Subject: [PATCH 6/6] whitelist ws origins (#182) --- cmd/dfs/cmd/server.go | 2 +- openapi/dfs.yaml | 2 +- pkg/api/handler.go | 9 ++++++--- pkg/api/ws.go | 9 +++++++-- 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/cmd/dfs/cmd/server.go b/cmd/dfs/cmd/server.go index b4387628..804c6106 100644 --- a/cmd/dfs/cmd/server.go +++ b/cmd/dfs/cmd/server.go @@ -116,7 +116,7 @@ can consume it.`, logger.Info("cookieDomain : ", cookieDomain) logger.Info("postageBlockId : ", postageBlockId) logger.Info("corsOrigins : ", corsOrigins) - hdlr, err := api.NewHandler(dataDir, beeApi, beeDebugApi, cookieDomain, postageBlockId, logger) + hdlr, err := api.NewHandler(dataDir, beeApi, beeDebugApi, cookieDomain, postageBlockId, corsOrigins, logger) if err != nil { logger.Error(err.Error()) return diff --git a/openapi/dfs.yaml b/openapi/dfs.yaml index e46a0774..e1a19b51 100644 --- a/openapi/dfs.yaml +++ b/openapi/dfs.yaml @@ -10,7 +10,7 @@ externalDocs: url: 'https://fairos.io' servers: - - url: 'https://{apiRoot}:{port}/v1' + - url: 'http://{apiRoot}:{port}/v1' variables: apiRoot: default: 'localhost' diff --git a/pkg/api/handler.go b/pkg/api/handler.go index 44113214..29f7a37b 100644 --- a/pkg/api/handler.go +++ b/pkg/api/handler.go @@ -24,15 +24,18 @@ import ( type Handler struct { dfsAPI *dfs.DfsAPI logger logging.Logger + + whitelistedOrigins []string } -func NewHandler(dataDir, beeApi, beeDebugApi, cookieDomain, postageBlockId string, logger logging.Logger) (*Handler, error) { +func NewHandler(dataDir, beeApi, beeDebugApi, cookieDomain, postageBlockId string, whitelistedOrigins []string, logger logging.Logger) (*Handler, error) { api, err := dfs.NewDfsAPI(dataDir, beeApi, beeDebugApi, cookieDomain, postageBlockId, logger) if err != nil { return nil, dfs.ErrBeeClient } return &Handler{ - dfsAPI: api, - logger: logger, + dfsAPI: api, + logger: logger, + whitelistedOrigins: whitelistedOrigins, }, nil } diff --git a/pkg/api/ws.go b/pkg/api/ws.go index b15b51dc..008ae48a 100644 --- a/pkg/api/ws.go +++ b/pkg/api/ws.go @@ -31,8 +31,13 @@ var ( func (h *Handler) WebsocketHandler(w http.ResponseWriter, r *http.Request) { upgrader := websocket.Upgrader{} // use default options upgrader.CheckOrigin = func(r *http.Request) bool { - // TODO: whitelist origins - return true + origin := r.Header.Get("Origin") + for _, v := range h.whitelistedOrigins { + if origin == v { + return true + } + } + return false } conn, err := upgrader.Upgrade(w, r, nil) if err != nil {