Skip to content

Commit

Permalink
Fixed a bunch of file loading/namespace/detection issues
Browse files Browse the repository at this point in the history
  • Loading branch information
frikky committed Jan 20, 2025
1 parent c15c71c commit a8634d2
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 85 deletions.
8 changes: 8 additions & 0 deletions db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9566,6 +9566,9 @@ func GetDisabledRules(ctx context.Context, orgId string) (*DisabledRules, error)

defer res.Body.Close()
if res.StatusCode == 404 {
// Index empty
//log.Printf("[DEBUG] No disabled rules for org %s. Should auto-index?", orgId)

return disabledRules, nil
}

Expand All @@ -9584,6 +9587,11 @@ func GetDisabledRules(ctx context.Context, orgId string) (*DisabledRules, error)
} else {
key := datastore.NameKey(nameKey, orgId, nil)
if err := project.Dbclient.Get(ctx, key, disabledRules); err != nil {
if strings.Contains(err.Error(), "no such entity") {
//log.Printf("[DEBUG] No disabled rules for org %s. Should auto-index?", orgId)
return disabledRules, nil
}

log.Printf("[WARNING] Error getting disabled for org %s: %s", orgId, err)
return disabledRules, err
}
Expand Down
34 changes: 22 additions & 12 deletions detection.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,11 @@ func HandleGetDetectionRules(resp http.ResponseWriter, request *http.Request) {
continue
}

var fileContent []byte
if file.Status != "active" {
continue
}

var fileContent []byte
if file.Encrypted {
if project.Environment == "cloud" || file.StorageArea == "google_storage" {
log.Printf("[ERROR] No namespace handler for cloud decryption (detection)!")
Expand Down Expand Up @@ -329,19 +332,21 @@ func HandleFolderToggle(resp http.ResponseWriter, request *http.Request) {
}

location := strings.Split(request.URL.String(), "/")
if location[1] != "api" || len(location) < 6 {
log.Printf("Path too short or incorrect: %s", request.URL.String())
if location[1] != "api" || len(location) < 7 {
log.Printf("[ERROR] Path too short or incorrect for detection toggle (2): %s", request.URL.String())
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false}`))
return
}

ctx := GetContext(request)
action := location[5]

detectionType := location[4]
_ = detectionType
action := location[6]

rules, err := GetDisabledRules(ctx, user.ActiveOrg.Id)
if err != nil {
log.Printf("[WARNING] Cannot get the rules, reason %s", err)
resp.WriteHeader(404)
resp.Write([]byte(`{"success": false}`))
return
Expand Down Expand Up @@ -467,8 +472,8 @@ func HandleGetSelectedRules(resp http.ResponseWriter, request *http.Request) {
var triggerId string
location := strings.Split(request.URL.String(), "/")
if len(location) < 5 || location[1] != "api" {
log.Printf("[INFO] Path too short or incorrect: %d", len(location))
resp.WriteHeader(401)
log.Printf("[ERROR] Path too short or incorrect: %d", len(location))
resp.WriteHeader(400)
resp.Write([]byte(`{"success": false}`))
return
}
Expand Down Expand Up @@ -520,8 +525,8 @@ func HandleSaveSelectedRules(resp http.ResponseWriter, request *http.Request) {

location := strings.Split(request.URL.String(), "/")
if len(location) < 5 || location[1] != "api" {
log.Printf("[INFO] Path too short or incorrect: %d", len(location))
resp.WriteHeader(http.StatusBadRequest)
log.Printf("[INFO] Path too short or incorrect (1): %d", len(location))
resp.WriteHeader(400)
resp.Write([]byte(`{"success": false}`))
return
}
Expand Down Expand Up @@ -604,7 +609,7 @@ func HandleDetectionAutoConnect(resp http.ResponseWriter, request *http.Request)
log.Printf("[ERROR] Failed to create Sigma handling workflow: %s", err)
}

log.Printf("[DEBUG] Sending orborus request to start Sigma handling workflow")
log.Printf("[DEBUG] Sending orborus request to start Sigma handling IF an available environment is found.")

execType := "START_TENZIR"
err = SetDetectionOrborusRequest(ctx, user.ActiveOrg.Id, execType, "", "SIGMA", "SHUFFLE_DISCOVER")
Expand Down Expand Up @@ -646,6 +651,8 @@ func HandleDetectionAutoConnect(resp http.ResponseWriter, request *http.Request)
}

} else {
log.Printf("[ERROR] Detection Type '%s' not implemented", detectionType)

resp.WriteHeader(400)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Detection Type '%s' not implemented"}`, detectionType)))
return
Expand Down Expand Up @@ -689,6 +696,8 @@ func SetDetectionOrborusRequest(ctx context.Context, orgId, execType, fileName,
continue
}

// Validates if the environment already has a lake running
/*
cacheKey := fmt.Sprintf("queueconfig-%s-%s", env.Name, env.OrgId)
cache, err := GetCache(ctx, cacheKey)
if err == nil {
Expand All @@ -702,6 +711,7 @@ func SetDetectionOrborusRequest(ctx context.Context, orgId, execType, fileName,
}
}
}
*/

selectedEnvironments = append(selectedEnvironments, env)
}
Expand All @@ -711,11 +721,11 @@ func SetDetectionOrborusRequest(ctx context.Context, orgId, execType, fileName,
log.Printf("[ERROR] No environments needing a lake. Found lake nodes: %d", lakeNodes)
return nil
} else {
return fmt.Errorf("No valid environments found")
return fmt.Errorf("No valid environments found for detection distribution")
}
}

log.Printf("[DEBUG] Found %d potentially valid environment(s)", len(selectedEnvironments))
log.Printf("[DEBUG] Found %d potentially valid environment for detection distribution (s)", len(selectedEnvironments))

deployedToActiveEnv := false
for _, env := range selectedEnvironments {
Expand Down
79 changes: 23 additions & 56 deletions files.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,16 +515,21 @@ func HandleGetFileNamespace(resp http.ResponseWriter, request *http.Request) {
namespace = strings.Split(namespace, "?")[0]
}

namespace = strings.Replace(namespace, "%20", " ", -1)
// URL decode namespace
namespace, err := url.QueryUnescape(namespace)
if err != nil {
log.Printf("[WARNING] Failed to decode namespace value '%s': %s", namespace, err)
}

// 1. Check user directly
// 2. Check workflow execution authorization
user, err := HandleApiAuthentication(resp, request)
if err != nil {
//log.Printf("[AUDIT] INITIAL Api authentication failed in file download: %s", err)
orgId, err := fileAuthentication(request)
if err != nil {
log.Printf("[WARNING] Bad file authentication in get namespace %s: %s", namespace, err)

orgId, fileerr := fileAuthentication(request)
if fileerr != nil {
log.Printf("[WARNING] Bad authentication in get namespace AFTER trying normal user auth %s: %s & %s", namespace, err, fileerr)
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false}`))
return
Expand Down Expand Up @@ -771,70 +776,30 @@ func HandleGetFileNamespace(resp http.ResponseWriter, request *http.Request) {
}
}

//zipfile := fmt.Sprintf("%s.zip", namespace)
buf := new(bytes.Buffer)
zipWriter := zip.NewWriter(buf)

// FIXME: Goroutine this + Cache it for future requests

packed := 0
for _, file := range fileResponse.Files {
var filedata = []byte{}
if file.Encrypted {
if project.Environment == "cloud" || file.StorageArea == "google_storage" {
log.Printf("[ERROR] No namespace handler for cloud decryption (files)!")
} else {
Openfile, err := os.Open(file.DownloadPath)
defer Openfile.Close() //Close after function return

allText := []byte{}
buf := make([]byte, 1024)
for {
n, err := Openfile.Read(buf)
if err == io.EOF {
break
}

if err != nil {
continue
}

if n > 0 {
//fmt.Println(string(buf[:n]))
allText = append(allText, buf[:n]...)
}
}

passphrase := fmt.Sprintf("%s_%s", user.ActiveOrg.Id, file.Id)
if len(file.ReferenceFileId) > 0 {
passphrase = fmt.Sprintf("%s_%s", user.ActiveOrg.Id, file.ReferenceFileId)
}

data, err := HandleKeyDecryption(allText, passphrase)
if err != nil {
log.Printf("[ERROR] Failed decrypting file (3): %s", err)
} else {
//log.Printf("[DEBUG] File size of %s reduced from %d to %d after decryption (1)", file.Id, len(allText), len(data))
allText = []byte(data)
}

filedata = allText
}
} else {
filedata, err = ioutil.ReadFile(file.DownloadPath)
if err != nil {
log.Printf("Filereading failed for %s create zip file : %v", file.Filename, err)
continue
}
// Goroutine this get file section
filedata, err := GetFileContent(ctx, &file, nil)
if err != nil {
log.Printf("[ERROR] Failed getting file content for %s (%s): %s", file.Filename, file.Id, err)
continue
}

//log.Printf("DATA: %s", string(filedata))
if len(filedata) == 0 {
log.Printf("[ERROR] No data found for file %s (%s)", file.Filename, file.Id)
}

zipFile, err := zipWriter.Create(file.Filename)
if err != nil {
log.Printf("[WARNING] Packing failed for %s create zip file: %v", file.Filename, err)
continue
}

// Have to use Fprintln otherwise it tries to parse all strings etc.
if _, err := fmt.Fprintln(zipFile, string(filedata)); err != nil {
log.Printf("[WARNING] Datapasting failed for %s when creating zip file from bucket: %v", file.Filename, err)
continue
Expand All @@ -846,19 +811,20 @@ func HandleGetFileNamespace(resp http.ResponseWriter, request *http.Request) {
err = zipWriter.Close()
if err != nil {
log.Printf("[WARNING] Packing failed to close zip file writer: %v", err)
resp.WriteHeader(401)
resp.WriteHeader(500)
resp.Write([]byte(`{"success": false}`))
return
}

if packed == 0 {
log.Printf("[WARNING] Couldn't find anything for namespace %s in org %s", namespace, user.ActiveOrg.Id)
resp.WriteHeader(401)
resp.WriteHeader(500)
resp.Write([]byte(`{"success": false}`))
return
}

log.Printf("[DEBUG] Packed %d files from namespace %s into the zip for %s (%s)", packed, namespace, user.Username, user.Id)

FileHeader := make([]byte, 512)
FileContentType := http.DetectContentType(FileHeader)
resp.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s.zip", namespace))
Expand Down Expand Up @@ -981,6 +947,7 @@ func GetFileContent(ctx context.Context, file *File, resp http.ResponseWriter) (
resp.WriteHeader(500)
resp.Write([]byte(`{"success": false, "reason": "Failed setting file to deleted"}`))
}

return []byte{}, err
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/shuffle/shuffle-shared

go 1.22.7
go 1.22.2

toolchain go1.23.1

Expand Down
55 changes: 40 additions & 15 deletions pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,49 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request)
log.Printf("[AUDIT] User %s in org %s (%s) is creating a new pipeline with command '%s' in environment '%s'", user.Username, user.ActiveOrg.Name, user.ActiveOrg.Id, pipeline.Type, pipeline.Environment)

if len(pipeline.Name) < 1 {
pipeline.Name = pipeline.Command

/*
log.Printf("[WARNING] Name is required for new pipelines")
resp.WriteHeader(400)
resp.Write([]byte(`{"success": false, "reason": "Name is required"}`))
return
*/
}

if len(pipeline.Environment) < 1 {
log.Printf("[WARNING] Environment is required for new pipelines")
resp.WriteHeader(400)
resp.Write([]byte(`{"success": false, "reason": "Environment is required"}`))
ctx := GetContext(request)
environments, err := GetEnvironments(ctx, user.ActiveOrg.Id)
if err != nil {
log.Printf("[WARNING] Error getting environments: %s", err)
resp.WriteHeader(500)
resp.Write([]byte(`{"success": false}`))
return
}

if len(pipeline.Environment) < 1 {
for _, env := range environments {
if env.Archived {
continue
}

if strings.ToLower(env.Type) == "cloud" {
continue
}

pipeline.Environment = env.Name
if env.DataLake.Enabled {
break
}
}

if len(pipeline.Environment) < 1 {
log.Printf("[WARNING] Environment is required for new pipelines")
resp.WriteHeader(400)
resp.Write([]byte(`{"success": false, "reason": "No environment found"}`))
return
}
}

pipeline.Environment = strings.TrimSpace(pipeline.Environment)
if strings.ToLower(pipeline.Environment) == "cloud" {
log.Printf("[WARNING] Cloud is not a valid environment")
Expand All @@ -76,14 +106,6 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request)
return
}

ctx := GetContext(request)
environments, err := GetEnvironments(ctx, user.ActiveOrg.Id)
if err != nil {
log.Printf("[WARNING] Error getting environments: %s", err)
resp.WriteHeader(500)
resp.Write([]byte(`{"success": false}`))
return
}

envFound := false
for _, env := range environments {
Expand Down Expand Up @@ -154,7 +176,9 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request)
}
}

//log.Printf("[INFO] Pipeline type: %s", formattedType)
if len(pipeline.TriggerId) < 1 {
pipeline.TriggerId = uuid.New().String()
}

// 2. Send to environment queue
execRequest := ExecutionRequest{
Expand Down Expand Up @@ -210,7 +234,8 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request)
resp.Write([]byte(`{"success": false}`))
return
}
log.Printf("[INFO] Set up pipeline with trigger ID %s and environment %s", pipeline.TriggerId, pipeline.Environment)

log.Printf("[INFO] Set up pipeline '%s' with trigger ID '%s' and environment '%s'", pipeline.Command, pipeline.TriggerId, pipeline.Environment)
}

err = SetWorkflowQueue(ctx, execRequest, parsedEnv)
Expand All @@ -222,7 +247,7 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request)
}

resp.WriteHeader(200)
resp.Write([]byte(`{"success": true, "reason": "Pipeline handled successfully."}`))
resp.Write([]byte(fmt.Sprintf(`{"success": true, "reason": "Pipeline queued to be deployed in environment '%s'."}`, pipeline.Environment)))
}

func deletePipeline(ctx context.Context, pipeline Pipeline) error {
Expand Down
2 changes: 1 addition & 1 deletion shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -22790,7 +22790,7 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
if strings.ToLower(action.Environment) == "cloud" && project.Environment == "cloud" {
//log.Printf("[DEBUG] Couldn't find environment %s in cloud for some reason.", action.Environment)
} else {
log.Printf("[WARNING][%s] Couldn't find environment %s when running workflow '%s'. Maybe it's inactive?", workflowExecution.Id, action.Environment, workflowExecution.Workflow.ID)
log.Printf("[WARNING][%s] Couldn't find environment %s when running workflow '%s'. Maybe it's inactive?", workflowExecution.ExecutionId, action.Environment, workflowExecution.Workflow.ID)
return workflowExecution, ExecInfo{}, "Couldn't find the environment", errors.New(fmt.Sprintf("Couldn't find env '%s' in org '%s'", action.Environment, workflowExecution.ExecutionOrg))
}
}
Expand Down

0 comments on commit a8634d2

Please sign in to comment.