Skip to content

Commit

Permalink
RHCLOUD-33514 Implement blocklist (#374)
Browse files Browse the repository at this point in the history
* implementing blocklist to block uploads and new runs from orgids

---------

Co-authored-by: Derek Horton <[email protected]>
  • Loading branch information
tahmidefaz and dehort authored Sep 25, 2024
1 parent f42e6c9 commit 22853a4
Show file tree
Hide file tree
Showing 12 changed files with 125 additions and 1 deletion.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ sample_upload.xz:
sample_rhc_sat_upload:
curl -v -F "file=@examples/rhcsat-success.jsonl;type=application/vnd.redhat.playbook-sat.v3+jsonl" -H "x-rh-identity: eyJpZGVudGl0eSI6IHsiYWNjb3VudF9udW1iZXIiOiAiMDAwMDAwMSIsICJ0eXBlIjogIlN5c3RlbSIsICJpbnRlcm5hbCI6IHsib3JnX2lkIjogIjAwMDAwMSJ9fX0=" -H "x-rh-request_id: 380b4a04-7eae-4dff-a0b8-6e1af9186df0" http://localhost:8080/api/ingress/v1/upload

sample_blocked_upload:
curl -v -F "file=@examples/events-success.jsonl;type=application/vnd.redhat.playbook.v1+jsonl" -H "x-rh-identity: eyJpZGVudGl0eSI6IHsiYWNjb3VudF9udW1iZXIiOiAiMDAwMDAwMSIsICJ0eXBlIjogIlN5c3RlbSIsICJpbnRlcm5hbCI6IHsib3JnX2lkIjogIjEzMzcifX19" -H "x-rh-request_id: 380b4a04-7eae-4dff-a0b8-6e1af9186df0" http://localhost:8080/api/ingress/v1/upload

sample: sample_request sample_upload

connector_create:
Expand Down
8 changes: 8 additions & 0 deletions deploy/clowdapp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ objects:
- name: SOURCES_PORT
value: ${SOURCES_CONNECTOR_PORT}

- name: BLOCKLIST_ORG_IDS
value: ${BLOCKLIST_ORG_IDS}

resources:
limits:
cpu: ${CPU_LIMIT}
Expand Down Expand Up @@ -266,6 +269,8 @@ objects:
value: ${STORAGE_MAX_CONCURRENCY}
- name: ARTIFACT_MAX_SIZE
value: ${ARTIFACT_MAX_SIZE}
- name: BLOCKLIST_ORG_IDS
value: ${BLOCKLIST_ORG_IDS}
resources:
limits:
cpu: ${CPU_LIMIT}
Expand Down Expand Up @@ -378,6 +383,9 @@ parameters:
- name: SOURCES_CONNECTOR_PORT
value: '8080'

- name: BLOCKLIST_ORG_IDS
value: ""

# Used for testing in ephemeral environments only.
- name: PSK_AUTH_TEST
value: "" # If a value is not provided the principal is ignored.
Expand Down
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ services:
CLOWDER_ENABLED: "false"
DB_HOST: "db"
PSK_AUTH_TEST: "xwKhCUzgJ8"
BLOCKLIST_ORG_IDS: "1337,7331"
restart: unless-stopped

zookeeper:
Expand Down Expand Up @@ -77,7 +78,7 @@ services:
- '8080:3000'
environment:
- INGRESS_STAGEBUCKET=insights-upload-perma
- INGRESS_VALIDTOPICS=playbook,playbook-sat
- INGRESS_VALID_UPLOAD_TYPES=playbook,playbook-sat
- OPENSHIFT_BUILD_COMMIT=somestring
- INGRESS_MAXSIZE=104857600
- INGRESS_MINIODEV=true
Expand Down
33 changes: 33 additions & 0 deletions internal/api/controllers/private/private_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"

"playbook-dispatcher/internal/common/config"
"playbook-dispatcher/internal/common/utils"
)

func TestConfig(t *testing.T) {
Expand Down Expand Up @@ -55,3 +58,33 @@ var _ = Describe("Validation", func() {
),
)
})

var _ = Describe("Blocklisted OrgIDs", func() {
DescribeTable("validateFields",
func(orgID string, result bool) {
cfg := config.Get()

cfg.Set("blocklist.org.ids", "1337,1234")

isBlocked := utils.IsOrgIdBlocklisted(cfg, orgID)

Expect(isBlocked).To(Equal(result))
},

Entry(
"unblocked orgid",
"01234",
false,
),
Entry(
"blocked org_id - 1",
"1337",
true,
),
Entry(
"blocked org_id - 2",
"1234",
true,
),
)
})
5 changes: 5 additions & 0 deletions internal/api/controllers/private/runsCreate.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ func (this *controllers) ApiInternalRunsCreate(ctx echo.Context) error {
return handleRunCreateError(err)
}

if utils.IsOrgIdBlocklisted(this.config, orgIdString) {
utils.GetLogFromEcho(ctx).Debugw("Rejecting request because the org_id is blocklisted")
return handleRunCreateError(&utils.BlocklistedOrgIdError{OrgID: orgIdString})
}

hosts := parseRunHosts(runInputV1.Hosts)

context = utils.WithOrgId(context, orgIdString)
Expand Down
4 changes: 4 additions & 0 deletions internal/api/controllers/private/runsCreateActions.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ func handleRunCreateError(err error) *RunCreated {
return runCreateError(http.StatusNotFound)
}

if _, ok := err.(*utils.BlocklistedOrgIdError); ok {
return runCreateError(http.StatusBadRequest)
}

return runCreateError(http.StatusInternalServerError)
}

Expand Down
5 changes: 5 additions & 0 deletions internal/api/controllers/private/runsCreateV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (this *controllers) ApiInternalV2RunsCreate(ctx echo.Context) error {
context := utils.WithOrgId(ctx.Request().Context(), string(runInputV2.OrgId))
context = utils.WithRequestType(context, getRequestTypeLabel(runInputV2))

if utils.IsOrgIdBlocklisted(this.config, string(runInputV2.OrgId)) {
utils.GetLogFromEcho(ctx).Debugw("Rejecting request because the org_id is blocklisted")
return handleRunCreateError(&utils.BlocklistedOrgIdError{OrgID: string(runInputV2.OrgId)})
}

recipient := parseValidatedUUID(string(runInputV2.Recipient))

hosts := parseRunHosts(runInputV2.Hosts)
Expand Down
2 changes: 2 additions & 0 deletions internal/common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ func Get() *viper.Viper {

options.SetDefault("db.sslmode", "disable")

options.SetDefault("blocklist.org.ids", "")

if clowder.IsClowderEnabled() {

cfg := clowder.LoadedConfig
Expand Down
8 changes: 8 additions & 0 deletions internal/common/utils/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ import (
"net/http"
)

type BlocklistedOrgIdError struct {
OrgID string
}

func UnexpectedResponse(res *http.Response) error {
return fmt.Errorf(`unexpected status code "%d" or content type "%s"`, res.StatusCode, res.Header.Get("content-type"))
}

func (this *BlocklistedOrgIdError) Error() string {
return fmt.Sprintf("This org_id (%s) is blocklisted.", this.OrgID)
}
11 changes: 11 additions & 0 deletions internal/common/utils/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,14 @@ func LoadSchemas(cfg *viper.Viper, schemaNames []string) (schemas []*jsonschema.
}
return
}

func IsOrgIdBlocklisted(cfg *viper.Viper, orgId string) bool {
blocklistedOrgIds := strings.Split(cfg.GetString("blocklist.org.ids"), ",")
for _, blockedOrgId := range blocklistedOrgIds {
if blockedOrgId == orgId {
return true
}
}

return false
}
6 changes: 6 additions & 0 deletions internal/validator/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,18 @@ func (this *handler) onMessage(ctx context.Context, msg *kafka.Message) {
ctx = utils.SetLog(ctx, utils.GetLogFromContext(ctx).With("url", request.URL))
utils.GetLogFromContext(ctx).Debugw("Processing request",
"account", request.Account,
"org_id", request.OrgID,
"topic", *msg.TopicPartition.Topic,
"partition", msg.TopicPartition.Partition,
"offset", msg.TopicPartition.Offset.String(),
"size", request.Size,
)

if utils.IsOrgIdBlocklisted(cfg, request.OrgID) {
utils.GetLogFromContext(ctx).Debugw("Rejecting payload because the org_id is blocklisted")
return
}

if err := this.validateRequest(&request); err != nil {
this.validationFailed(ctx, err, requestType, &request)
return
Expand Down
38 changes: 38 additions & 0 deletions internal/validator/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ package validator
import (
"bytes"
"encoding/base64"
"encoding/json"
"io/ioutil"
"playbook-dispatcher/internal/common/constants"
kafkaUtils "playbook-dispatcher/internal/common/kafka"
messageModel "playbook-dispatcher/internal/common/model/message"
"playbook-dispatcher/internal/common/utils/test"

k "github.com/confluentinc/confluent-kafka-go/kafka"

"github.com/ghodss/yaml"
. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
Expand Down Expand Up @@ -56,6 +61,22 @@ var _ = Describe("Handler", func() {
})
})

Describe("Blocklisted OrgIDs", func() {
It("Rejects archives if org_id is blocklisted", func() {
cfg.Set("blocklist.org.ids", "1337")

req := &messageModel.IngressValidationRequest{
OrgID: "1337",
Size: 1024,
RequestID: "1234-56789",
}

kafkaMessage := newKafkaMessage(req, playbookPayloadHeaderValue)

instance.onMessage(test.TestContext(), kafkaMessage)
})
})

Describe("Validation", func() {

DescribeTable("Rejects invalid files",
Expand Down Expand Up @@ -185,3 +206,20 @@ fdqPl7IwpOzJmfqrZ1duqTJ62NbTeDDPjOvQ6F70PsJi4KXiLSqngthpIkJLtF3l

// TODO: test parsing (timestamps, etc.)
})

func newKafkaMessage(value interface{}, requestType string) *k.Message {
marshalled, err := json.Marshal(value)
Expect(err).ToNot(HaveOccurred())

topic := "platform.upload.announce"

return &k.Message{
Value: marshalled,
Headers: kafkaUtils.Headers(constants.HeaderRequestId, "test", constants.HeaderRequestType, requestType),
TopicPartition: k.TopicPartition{
Topic: &topic,
Partition: 0,
Offset: k.Offset(0),
},
}
}

0 comments on commit 22853a4

Please sign in to comment.