Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RHCLOUD-33514 Implement blocklist #374

Merged
merged 11 commits into from
Sep 25, 2024
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ sample_upload.xz:
sample_rhc_sat_upload:
curl -v -F "file=@examples/rhcsat-success.jsonl;type=application/vnd.redhat.playbook-sat.v3+jsonl" -H "x-rh-identity: eyJpZGVudGl0eSI6IHsiYWNjb3VudF9udW1iZXIiOiAiMDAwMDAwMSIsICJ0eXBlIjogIlN5c3RlbSIsICJpbnRlcm5hbCI6IHsib3JnX2lkIjogIjAwMDAwMSJ9fX0=" -H "x-rh-request_id: 380b4a04-7eae-4dff-a0b8-6e1af9186df0" http://localhost:8080/api/ingress/v1/upload

sample_blocked_upload:
curl -v -F "file=@examples/events-success.jsonl;type=application/vnd.redhat.playbook.v1+jsonl" -H "x-rh-identity: eyJpZGVudGl0eSI6IHsiYWNjb3VudF9udW1iZXIiOiAiMDAwMDAwMSIsICJ0eXBlIjogIlN5c3RlbSIsICJpbnRlcm5hbCI6IHsib3JnX2lkIjogIjEzMzcifX19" -H "x-rh-request_id: 380b4a04-7eae-4dff-a0b8-6e1af9186df0" http://localhost:8080/api/ingress/v1/upload

sample: sample_request sample_upload

connector_create:
Expand Down
8 changes: 8 additions & 0 deletions deploy/clowdapp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ objects:
- name: SOURCES_PORT
value: ${SOURCES_CONNECTOR_PORT}

- name: BLOCKLIST_ORG_IDS
value: ${BLOCKLIST_ORG_IDS}

resources:
limits:
cpu: ${CPU_LIMIT}
Expand Down Expand Up @@ -266,6 +269,8 @@ objects:
value: ${STORAGE_MAX_CONCURRENCY}
- name: ARTIFACT_MAX_SIZE
value: ${ARTIFACT_MAX_SIZE}
- name: BLOCKLIST_ORG_IDS
value: ${BLOCKLIST_ORG_IDS}
resources:
limits:
cpu: ${CPU_LIMIT}
Expand Down Expand Up @@ -378,6 +383,9 @@ parameters:
- name: SOURCES_CONNECTOR_PORT
value: '8080'

- name: BLOCKLIST_ORG_IDS
value: ""

# Used for testing in ephemeral environments only.
- name: PSK_AUTH_TEST
value: "" # If a value is not provided the principal is ignored.
Expand Down
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ services:
CLOWDER_ENABLED: "false"
DB_HOST: "db"
PSK_AUTH_TEST: "xwKhCUzgJ8"
BLOCKLIST_ORG_IDS: "1337,7331"
restart: unless-stopped

zookeeper:
Expand Down Expand Up @@ -77,7 +78,7 @@ services:
- '8080:3000'
environment:
- INGRESS_STAGEBUCKET=insights-upload-perma
- INGRESS_VALIDTOPICS=playbook,playbook-sat
- INGRESS_VALID_UPLOAD_TYPES=playbook,playbook-sat
- OPENSHIFT_BUILD_COMMIT=somestring
- INGRESS_MAXSIZE=104857600
- INGRESS_MINIODEV=true
Expand Down
33 changes: 33 additions & 0 deletions internal/api/controllers/private/private_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"

"playbook-dispatcher/internal/common/config"
"playbook-dispatcher/internal/common/utils"
)

func TestConfig(t *testing.T) {
Expand Down Expand Up @@ -55,3 +58,33 @@ var _ = Describe("Validation", func() {
),
)
})

var _ = Describe("Blocklisted OrgIDs", func() {
DescribeTable("validateFields",
func(orgID string, result bool) {
cfg := config.Get()

cfg.Set("blocklist.org.ids", "1337,1234")

isBlocked := utils.IsOrgIdBlocklisted(cfg, orgID)

Expect(isBlocked).To(Equal(result))
},

Entry(
"unblocked orgid",
"01234",
false,
),
Entry(
"blocked org_id - 1",
"1337",
true,
),
Entry(
"blocked org_id - 2",
"1234",
true,
),
)
})
5 changes: 5 additions & 0 deletions internal/api/controllers/private/runsCreate.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ func (this *controllers) ApiInternalRunsCreate(ctx echo.Context) error {
return handleRunCreateError(err)
}

if utils.IsOrgIdBlocklisted(this.config, orgIdString) {
utils.GetLogFromEcho(ctx).Debugw("Rejecting request because the org_id is blocklisted")
return handleRunCreateError(&utils.BlocklistedOrgIdError{OrgID: orgIdString})
}

hosts := parseRunHosts(runInputV1.Hosts)

context = utils.WithOrgId(context, orgIdString)
Expand Down
4 changes: 4 additions & 0 deletions internal/api/controllers/private/runsCreateActions.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ func handleRunCreateError(err error) *RunCreated {
return runCreateError(http.StatusNotFound)
}

if _, ok := err.(*utils.BlocklistedOrgIdError); ok {
return runCreateError(http.StatusBadRequest)
}

return runCreateError(http.StatusInternalServerError)
}

Expand Down
5 changes: 5 additions & 0 deletions internal/api/controllers/private/runsCreateV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (this *controllers) ApiInternalV2RunsCreate(ctx echo.Context) error {
context := utils.WithOrgId(ctx.Request().Context(), string(runInputV2.OrgId))
context = utils.WithRequestType(context, getRequestTypeLabel(runInputV2))

if utils.IsOrgIdBlocklisted(this.config, string(runInputV2.OrgId)) {
utils.GetLogFromEcho(ctx).Debugw("Rejecting request because the org_id is blocklisted")
return handleRunCreateError(&utils.BlocklistedOrgIdError{OrgID: string(runInputV2.OrgId)})
}

recipient := parseValidatedUUID(string(runInputV2.Recipient))

hosts := parseRunHosts(runInputV2.Hosts)
Expand Down
2 changes: 2 additions & 0 deletions internal/common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ func Get() *viper.Viper {

options.SetDefault("db.sslmode", "disable")

options.SetDefault("blocklist.org.ids", "")

if clowder.IsClowderEnabled() {

cfg := clowder.LoadedConfig
Expand Down
8 changes: 8 additions & 0 deletions internal/common/utils/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ import (
"net/http"
)

type BlocklistedOrgIdError struct {
OrgID string
}

func UnexpectedResponse(res *http.Response) error {
return fmt.Errorf(`unexpected status code "%d" or content type "%s"`, res.StatusCode, res.Header.Get("content-type"))
}

func (this *BlocklistedOrgIdError) Error() string {
return fmt.Sprintf("This org_id (%s) is blocklisted.", this.OrgID)
}
11 changes: 11 additions & 0 deletions internal/common/utils/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,14 @@ func LoadSchemas(cfg *viper.Viper, schemaNames []string) (schemas []*jsonschema.
}
return
}

func IsOrgIdBlocklisted(cfg *viper.Viper, orgId string) bool {
blocklistedOrgIds := strings.Split(cfg.GetString("blocklist.org.ids"), ",")
for _, blockedOrgId := range blocklistedOrgIds {
if blockedOrgId == orgId {
return true
}
}

return false
}
6 changes: 6 additions & 0 deletions internal/validator/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,18 @@ func (this *handler) onMessage(ctx context.Context, msg *kafka.Message) {
ctx = utils.SetLog(ctx, utils.GetLogFromContext(ctx).With("url", request.URL))
utils.GetLogFromContext(ctx).Debugw("Processing request",
"account", request.Account,
"org_id", request.OrgID,
"topic", *msg.TopicPartition.Topic,
"partition", msg.TopicPartition.Partition,
"offset", msg.TopicPartition.Offset.String(),
"size", request.Size,
)

if utils.IsOrgIdBlocklisted(cfg, request.OrgID) {
utils.GetLogFromContext(ctx).Debugw("Rejecting payload because the org_id is blocklisted")
return
}

if err := this.validateRequest(&request); err != nil {
this.validationFailed(ctx, err, requestType, &request)
return
Expand Down
38 changes: 38 additions & 0 deletions internal/validator/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ package validator
import (
"bytes"
"encoding/base64"
"encoding/json"
"io/ioutil"
"playbook-dispatcher/internal/common/constants"
kafkaUtils "playbook-dispatcher/internal/common/kafka"
messageModel "playbook-dispatcher/internal/common/model/message"
"playbook-dispatcher/internal/common/utils/test"

k "github.com/confluentinc/confluent-kafka-go/kafka"

"github.com/ghodss/yaml"
. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
Expand Down Expand Up @@ -56,6 +61,22 @@ var _ = Describe("Handler", func() {
})
})

Describe("Blocklisted OrgIDs", func() {
It("Rejects archives if org_id is blocklisted", func() {
cfg.Set("blocklist.org.ids", "1337")

req := &messageModel.IngressValidationRequest{
OrgID: "1337",
Size: 1024,
RequestID: "1234-56789",
}

kafkaMessage := newKafkaMessage(req, playbookPayloadHeaderValue)

instance.onMessage(test.TestContext(), kafkaMessage)
})
})

Describe("Validation", func() {

DescribeTable("Rejects invalid files",
Expand Down Expand Up @@ -185,3 +206,20 @@ fdqPl7IwpOzJmfqrZ1duqTJ62NbTeDDPjOvQ6F70PsJi4KXiLSqngthpIkJLtF3l

// TODO: test parsing (timestamps, etc.)
})

func newKafkaMessage(value interface{}, requestType string) *k.Message {
marshalled, err := json.Marshal(value)
Expect(err).ToNot(HaveOccurred())

topic := "platform.upload.announce"

return &k.Message{
Value: marshalled,
Headers: kafkaUtils.Headers(constants.HeaderRequestId, "test", constants.HeaderRequestType, requestType),
TopicPartition: k.TopicPartition{
Topic: &topic,
Partition: 0,
Offset: k.Offset(0),
},
}
}
Loading