diff --git a/db-connector.go b/db-connector.go index a1fed67..af128c4 100755 --- a/db-connector.go +++ b/db-connector.go @@ -9566,6 +9566,9 @@ func GetDisabledRules(ctx context.Context, orgId string) (*DisabledRules, error) defer res.Body.Close() if res.StatusCode == 404 { + // Index empty + //log.Printf("[DEBUG] No disabled rules for org %s. Should auto-index?", orgId) + return disabledRules, nil } @@ -9584,6 +9587,11 @@ func GetDisabledRules(ctx context.Context, orgId string) (*DisabledRules, error) } else { key := datastore.NameKey(nameKey, orgId, nil) if err := project.Dbclient.Get(ctx, key, disabledRules); err != nil { + if strings.Contains(err.Error(), "no such entity") { + //log.Printf("[DEBUG] No disabled rules for org %s. Should auto-index?", orgId) + return disabledRules, nil + } + log.Printf("[WARNING] Error getting disabled for org %s: %s", orgId, err) return disabledRules, err } diff --git a/detection.go b/detection.go index f6045a3..35e82a1 100644 --- a/detection.go +++ b/detection.go @@ -75,8 +75,11 @@ func HandleGetDetectionRules(resp http.ResponseWriter, request *http.Request) { continue } - var fileContent []byte + if file.Status != "active" { + continue + } + var fileContent []byte if file.Encrypted { if project.Environment == "cloud" || file.StorageArea == "google_storage" { log.Printf("[ERROR] No namespace handler for cloud decryption (detection)!") @@ -329,19 +332,21 @@ func HandleFolderToggle(resp http.ResponseWriter, request *http.Request) { } location := strings.Split(request.URL.String(), "/") - if location[1] != "api" || len(location) < 6 { - log.Printf("Path too short or incorrect: %s", request.URL.String()) + if location[1] != "api" || len(location) < 7 { + log.Printf("[ERROR] Path too short or incorrect for detection toggle (2): %s", request.URL.String()) resp.WriteHeader(401) resp.Write([]byte(`{"success": false}`)) return } ctx := GetContext(request) - action := location[5] + + detectionType := location[4] + _ = detectionType + action := location[6] rules, err := GetDisabledRules(ctx, user.ActiveOrg.Id) if err != nil { - log.Printf("[WARNING] Cannot get the rules, reason %s", err) resp.WriteHeader(404) resp.Write([]byte(`{"success": false}`)) return @@ -467,8 +472,8 @@ func HandleGetSelectedRules(resp http.ResponseWriter, request *http.Request) { var triggerId string location := strings.Split(request.URL.String(), "/") if len(location) < 5 || location[1] != "api" { - log.Printf("[INFO] Path too short or incorrect: %d", len(location)) - resp.WriteHeader(401) + log.Printf("[ERROR] Path too short or incorrect: %d", len(location)) + resp.WriteHeader(400) resp.Write([]byte(`{"success": false}`)) return } @@ -520,8 +525,8 @@ func HandleSaveSelectedRules(resp http.ResponseWriter, request *http.Request) { location := strings.Split(request.URL.String(), "/") if len(location) < 5 || location[1] != "api" { - log.Printf("[INFO] Path too short or incorrect: %d", len(location)) - resp.WriteHeader(http.StatusBadRequest) + log.Printf("[INFO] Path too short or incorrect (1): %d", len(location)) + resp.WriteHeader(400) resp.Write([]byte(`{"success": false}`)) return } @@ -604,7 +609,7 @@ func HandleDetectionAutoConnect(resp http.ResponseWriter, request *http.Request) log.Printf("[ERROR] Failed to create Sigma handling workflow: %s", err) } - log.Printf("[DEBUG] Sending orborus request to start Sigma handling workflow") + log.Printf("[DEBUG] Sending orborus request to start Sigma handling IF an available environment is found.") execType := "START_TENZIR" err = SetDetectionOrborusRequest(ctx, user.ActiveOrg.Id, execType, "", "SIGMA", "SHUFFLE_DISCOVER") @@ -646,6 +651,8 @@ func HandleDetectionAutoConnect(resp http.ResponseWriter, request *http.Request) } } else { + log.Printf("[ERROR] Detection Type '%s' not implemented", detectionType) + resp.WriteHeader(400) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Detection Type '%s' not implemented"}`, detectionType))) return @@ -689,6 +696,8 @@ func SetDetectionOrborusRequest(ctx context.Context, orgId, execType, fileName, continue } + // Validates if the environment already has a lake running + /* cacheKey := fmt.Sprintf("queueconfig-%s-%s", env.Name, env.OrgId) cache, err := GetCache(ctx, cacheKey) if err == nil { @@ -702,6 +711,7 @@ func SetDetectionOrborusRequest(ctx context.Context, orgId, execType, fileName, } } } + */ selectedEnvironments = append(selectedEnvironments, env) } @@ -711,11 +721,11 @@ func SetDetectionOrborusRequest(ctx context.Context, orgId, execType, fileName, log.Printf("[ERROR] No environments needing a lake. Found lake nodes: %d", lakeNodes) return nil } else { - return fmt.Errorf("No valid environments found") + return fmt.Errorf("No valid environments found for detection distribution") } } - log.Printf("[DEBUG] Found %d potentially valid environment(s)", len(selectedEnvironments)) + log.Printf("[DEBUG] Found %d potentially valid environment for detection distribution (s)", len(selectedEnvironments)) deployedToActiveEnv := false for _, env := range selectedEnvironments { diff --git a/files.go b/files.go index 249e267..d89fe7f 100755 --- a/files.go +++ b/files.go @@ -515,16 +515,21 @@ func HandleGetFileNamespace(resp http.ResponseWriter, request *http.Request) { namespace = strings.Split(namespace, "?")[0] } - namespace = strings.Replace(namespace, "%20", " ", -1) + // URL decode namespace + namespace, err := url.QueryUnescape(namespace) + if err != nil { + log.Printf("[WARNING] Failed to decode namespace value '%s': %s", namespace, err) + } // 1. Check user directly // 2. Check workflow execution authorization user, err := HandleApiAuthentication(resp, request) if err != nil { //log.Printf("[AUDIT] INITIAL Api authentication failed in file download: %s", err) - orgId, err := fileAuthentication(request) - if err != nil { - log.Printf("[WARNING] Bad file authentication in get namespace %s: %s", namespace, err) + + orgId, fileerr := fileAuthentication(request) + if fileerr != nil { + log.Printf("[WARNING] Bad authentication in get namespace AFTER trying normal user auth %s: %s & %s", namespace, err, fileerr) resp.WriteHeader(401) resp.Write([]byte(`{"success": false}`)) return @@ -771,62 +776,23 @@ func HandleGetFileNamespace(resp http.ResponseWriter, request *http.Request) { } } - //zipfile := fmt.Sprintf("%s.zip", namespace) buf := new(bytes.Buffer) zipWriter := zip.NewWriter(buf) + // FIXME: Goroutine this + Cache it for future requests + packed := 0 for _, file := range fileResponse.Files { - var filedata = []byte{} - if file.Encrypted { - if project.Environment == "cloud" || file.StorageArea == "google_storage" { - log.Printf("[ERROR] No namespace handler for cloud decryption (files)!") - } else { - Openfile, err := os.Open(file.DownloadPath) - defer Openfile.Close() //Close after function return - - allText := []byte{} - buf := make([]byte, 1024) - for { - n, err := Openfile.Read(buf) - if err == io.EOF { - break - } - - if err != nil { - continue - } - - if n > 0 { - //fmt.Println(string(buf[:n])) - allText = append(allText, buf[:n]...) - } - } - - passphrase := fmt.Sprintf("%s_%s", user.ActiveOrg.Id, file.Id) - if len(file.ReferenceFileId) > 0 { - passphrase = fmt.Sprintf("%s_%s", user.ActiveOrg.Id, file.ReferenceFileId) - } - - data, err := HandleKeyDecryption(allText, passphrase) - if err != nil { - log.Printf("[ERROR] Failed decrypting file (3): %s", err) - } else { - //log.Printf("[DEBUG] File size of %s reduced from %d to %d after decryption (1)", file.Id, len(allText), len(data)) - allText = []byte(data) - } - - filedata = allText - } - } else { - filedata, err = ioutil.ReadFile(file.DownloadPath) - if err != nil { - log.Printf("Filereading failed for %s create zip file : %v", file.Filename, err) - continue - } + // Goroutine this get file section + filedata, err := GetFileContent(ctx, &file, nil) + if err != nil { + log.Printf("[ERROR] Failed getting file content for %s (%s): %s", file.Filename, file.Id, err) + continue } - //log.Printf("DATA: %s", string(filedata)) + if len(filedata) == 0 { + log.Printf("[ERROR] No data found for file %s (%s)", file.Filename, file.Id) + } zipFile, err := zipWriter.Create(file.Filename) if err != nil { @@ -834,7 +800,6 @@ func HandleGetFileNamespace(resp http.ResponseWriter, request *http.Request) { continue } - // Have to use Fprintln otherwise it tries to parse all strings etc. if _, err := fmt.Fprintln(zipFile, string(filedata)); err != nil { log.Printf("[WARNING] Datapasting failed for %s when creating zip file from bucket: %v", file.Filename, err) continue @@ -846,19 +811,20 @@ func HandleGetFileNamespace(resp http.ResponseWriter, request *http.Request) { err = zipWriter.Close() if err != nil { log.Printf("[WARNING] Packing failed to close zip file writer: %v", err) - resp.WriteHeader(401) + resp.WriteHeader(500) resp.Write([]byte(`{"success": false}`)) return } if packed == 0 { log.Printf("[WARNING] Couldn't find anything for namespace %s in org %s", namespace, user.ActiveOrg.Id) - resp.WriteHeader(401) + resp.WriteHeader(500) resp.Write([]byte(`{"success": false}`)) return } log.Printf("[DEBUG] Packed %d files from namespace %s into the zip for %s (%s)", packed, namespace, user.Username, user.Id) + FileHeader := make([]byte, 512) FileContentType := http.DetectContentType(FileHeader) resp.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s.zip", namespace)) @@ -981,6 +947,7 @@ func GetFileContent(ctx context.Context, file *File, resp http.ResponseWriter) ( resp.WriteHeader(500) resp.Write([]byte(`{"success": false, "reason": "Failed setting file to deleted"}`)) } + return []byte{}, err } diff --git a/go.mod b/go.mod index 97b8ec0..6aa5eee 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/shuffle/shuffle-shared -go 1.22.7 +go 1.22.2 toolchain go1.23.1 diff --git a/pipelines.go b/pipelines.go index c98efcd..c76b9a3 100644 --- a/pipelines.go +++ b/pipelines.go @@ -55,19 +55,49 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request) log.Printf("[AUDIT] User %s in org %s (%s) is creating a new pipeline with command '%s' in environment '%s'", user.Username, user.ActiveOrg.Name, user.ActiveOrg.Id, pipeline.Type, pipeline.Environment) if len(pipeline.Name) < 1 { + pipeline.Name = pipeline.Command + + /* log.Printf("[WARNING] Name is required for new pipelines") resp.WriteHeader(400) resp.Write([]byte(`{"success": false, "reason": "Name is required"}`)) return + */ } - if len(pipeline.Environment) < 1 { - log.Printf("[WARNING] Environment is required for new pipelines") - resp.WriteHeader(400) - resp.Write([]byte(`{"success": false, "reason": "Environment is required"}`)) + ctx := GetContext(request) + environments, err := GetEnvironments(ctx, user.ActiveOrg.Id) + if err != nil { + log.Printf("[WARNING] Error getting environments: %s", err) + resp.WriteHeader(500) + resp.Write([]byte(`{"success": false}`)) return } + if len(pipeline.Environment) < 1 { + for _, env := range environments { + if env.Archived { + continue + } + + if strings.ToLower(env.Type) == "cloud" { + continue + } + + pipeline.Environment = env.Name + if env.DataLake.Enabled { + break + } + } + + if len(pipeline.Environment) < 1 { + log.Printf("[WARNING] Environment is required for new pipelines") + resp.WriteHeader(400) + resp.Write([]byte(`{"success": false, "reason": "No environment found"}`)) + return + } + } + pipeline.Environment = strings.TrimSpace(pipeline.Environment) if strings.ToLower(pipeline.Environment) == "cloud" { log.Printf("[WARNING] Cloud is not a valid environment") @@ -76,14 +106,6 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request) return } - ctx := GetContext(request) - environments, err := GetEnvironments(ctx, user.ActiveOrg.Id) - if err != nil { - log.Printf("[WARNING] Error getting environments: %s", err) - resp.WriteHeader(500) - resp.Write([]byte(`{"success": false}`)) - return - } envFound := false for _, env := range environments { @@ -154,7 +176,9 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request) } } - //log.Printf("[INFO] Pipeline type: %s", formattedType) + if len(pipeline.TriggerId) < 1 { + pipeline.TriggerId = uuid.New().String() + } // 2. Send to environment queue execRequest := ExecutionRequest{ @@ -210,7 +234,8 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request) resp.Write([]byte(`{"success": false}`)) return } - log.Printf("[INFO] Set up pipeline with trigger ID %s and environment %s", pipeline.TriggerId, pipeline.Environment) + + log.Printf("[INFO] Set up pipeline '%s' with trigger ID '%s' and environment '%s'", pipeline.Command, pipeline.TriggerId, pipeline.Environment) } err = SetWorkflowQueue(ctx, execRequest, parsedEnv) @@ -222,7 +247,7 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request) } resp.WriteHeader(200) - resp.Write([]byte(`{"success": true, "reason": "Pipeline handled successfully."}`)) + resp.Write([]byte(fmt.Sprintf(`{"success": true, "reason": "Pipeline queued to be deployed in environment '%s'."}`, pipeline.Environment))) } func deletePipeline(ctx context.Context, pipeline Pipeline) error { diff --git a/shared.go b/shared.go index 2b587f5..a2c87ed 100755 --- a/shared.go +++ b/shared.go @@ -22790,7 +22790,7 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h if strings.ToLower(action.Environment) == "cloud" && project.Environment == "cloud" { //log.Printf("[DEBUG] Couldn't find environment %s in cloud for some reason.", action.Environment) } else { - log.Printf("[WARNING][%s] Couldn't find environment %s when running workflow '%s'. Maybe it's inactive?", workflowExecution.Id, action.Environment, workflowExecution.Workflow.ID) + log.Printf("[WARNING][%s] Couldn't find environment %s when running workflow '%s'. Maybe it's inactive?", workflowExecution.ExecutionId, action.Environment, workflowExecution.Workflow.ID) return workflowExecution, ExecInfo{}, "Couldn't find the environment", errors.New(fmt.Sprintf("Couldn't find env '%s' in org '%s'", action.Environment, workflowExecution.ExecutionOrg)) } }