Skip to content

Commit

Permalink
bulker: increase topic partitions count when config changes
Browse files Browse the repository at this point in the history
ingest: browser mode: don't require slug or write key when only 1 site is configured
build: added allbuild.sh script
  • Loading branch information
absorbb committed Jan 25, 2024
1 parent 2fb2a0d commit 053dc8e
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 47 deletions.
20 changes: 13 additions & 7 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,16 @@
# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories (remove the comment below to include it)
# vendor/
/bulkerapp/bulker.env
/go.work
/go.work.sum
/sync-controller/syncctl.env
/bulkerapp/bulkerapp
.idea/*

**/.logs

#Compiled binaries
bulkerapp/bulkerapp
ingest/ingest
sync-controller/syncctl
sync-sidecar/sidecar

#Go workspace files
go.work
go.work.sum
48 changes: 48 additions & 0 deletions admin/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Microsoft/hcsshim v0.11.0 h1:7EFNIY4igHEXUdj1zXgAyU3fLc7QfOKHbkldRVTBdiM=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts=
github.com/containerd/containerd v1.7.6 h1:oNAVsnhPoy4BTPQivLgTzI9Oleml9l/+eYIDYXRCYo8=
github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8=
github.com/docker/docker v24.0.6+incompatible h1:hceabKCtUgDqPu+qm0NgsaXf28Ljf4/pWFL7xjWWDgE=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/hjson/hjson-go/v4 v4.3.1 h1:wfmDwHGxjzmYKXRFL0Qr9nonY/Xxe5y7IalwjlY7ekA=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk=
github.com/moby/sys/sequential v0.5.0 h1:OPvI35Lzn9K04PBbCLW0g4LcFAJgHsvXsRyewg5lXtc=
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/image-spec v1.1.0-rc5 h1:Ygwkfw9bpDvs+c9E34SdgGOj41dX/cbdlwvlWt0pnFI=
github.com/opencontainers/runc v1.1.7 h1:y2EZDS8sNng4Ksf0GUYNhKbTShZJPJg1FiXJNH/uoCk=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
github.com/shirou/gopsutil/v3 v3.23.8 h1:xnATPiybo6GgdRoC4YoGnxXZFRc3dqQTGi73oLvvBrE=
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/testcontainers/testcontainers-go v0.25.0 h1:erH6cQjsaJrH+rJDU9qIf89KFdhK0Bft0aEZHlYC3Vs=
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb h1:c0vyKkb6yr3KR7jEfJaOSv4lG7xPkbN6r52aJz1d8a8=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/tools v0.16.0 h1:GO788SKMRunPIBCXiQyo2AaexLstOrVhuAL5YwsckQM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 h1:AB/lmRny7e2pLhFEYIbl5qkDAUt2h0ZRO4wGPhZf+ik=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
70 changes: 70 additions & 0 deletions all.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
FROM debian:bookworm-slim as base

RUN apt-get update -y
RUN apt-get install -y ca-certificates curl

ENV TZ=UTC

WORKDIR /app

FROM golang:1.21.6-bookworm as builder

ARG VERSION
ENV VERSION $VERSION
ARG BUILD_TIMESTAMP
ENV BUILD_TIMESTAMP $BUILD_TIMESTAMP

RUN apt-get install gcc libc6-dev

WORKDIR /app

RUN mkdir jitsubase kafkabase eventslog bulkerlib bulkerapp ingest sync-sidecar sync-controller
RUN mkdir connectors connectors/airbytecdk connectors/firebase

COPY jitsubase/go.* ./jitsubase/
COPY kafkabase/go.* ./kafkabase/
COPY eventslog/go.* ./eventslog/
COPY bulkerlib/go.* ./bulkerlib/
COPY bulkerapp/go.* ./bulkerapp/
COPY ingest/go.* ./ingest/
COPY sync-sidecar/go.* ./sync-sidecar/
COPY sync-controller/go.* ./sync-controller/

COPY admin/go.* ./admin/
COPY connectors/airbytecdk/go.* ./connectors/airbytecdk/
COPY connectors/firebase/go.* ./connectors/firebase/
COPY go.work ./go.work
COPY go.work.sum ./go.work.sum

#RUN go work init jitsubase kafkabase eventslog bulkerlib bulkerapp ingest sync-sidecar sync-controller
RUN --mount=type=cache,id=go_mod,mode=0755,target=/go/pkg/mod go mod download

WORKDIR /app

COPY . .

RUN --mount=type=cache,id=go_mod,mode=0755,target=/go/pkg/mod go build -ldflags="-X main.Commit=$VERSION -X main.Timestamp=$BUILD_TIMESTAMP" -o bulker ./bulkerapp
RUN --mount=type=cache,id=go_mod,mode=0755,target=/go/pkg/mod go build -ldflags="-X main.Commit=$VERSION -X main.Timestamp=$BUILD_TIMESTAMP" -o ingest ./ingest
RUN --mount=type=cache,id=go_mod,mode=0755,target=/go/pkg/mod go build -ldflags="-X main.Commit=$VERSION -X main.Timestamp=$BUILD_TIMESTAMP" -o sidecar ./sync-sidecar
RUN --mount=type=cache,id=go_mod,mode=0755,target=/go/pkg/mod go build -ldflags="-X main.Commit=$VERSION -X main.Timestamp=$BUILD_TIMESTAMP" -o syncctl ./sync-controller


FROM base as bulker

COPY --from=builder /app/bulker ./
CMD ["/app/bulker"]

FROM base as ingest

COPY --from=builder /app/ingest ./
CMD ["/app/ingest"]

FROM base as sidecar

COPY --from=builder /app/sidecar ./
CMD ["/app/sidecar"]

FROM base as syncctl

COPY --from=builder /app/syncctl ./
CMD ["/app/syncctl"]
9 changes: 9 additions & 0 deletions allbuild.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/env bash

VERSION=$(git log -1 --pretty=%h)
BUILD_TIMESTAMP=$( date '+%F_%H:%M:%S' )

docker buildx build --build-arg VERSION="$VERSION" --build-arg BUILD_TIMESTAMP="$BUILD_TIMESTAMP" --platform linux/amd64 -f all.Dockerfile --target bulker -t jitsucom/bulker:"$VERSION" -t jitsucom/bulker:beta --push .
docker buildx build --build-arg VERSION="$VERSION" --build-arg BUILD_TIMESTAMP="$BUILD_TIMESTAMP" --platform linux/amd64 -f all.Dockerfile --target ingest -t jitsucom/ingest:"$VERSION" -t jitsucom/ingest:beta --push .
docker buildx build --build-arg VERSION="$VERSION" --build-arg BUILD_TIMESTAMP="$BUILD_TIMESTAMP" --platform linux/amd64 -f all.Dockerfile --target sidecar -t jitsucom/sidecar:"$VERSION" -t jitsucom/sidecar:beta --push .
docker buildx build --build-arg VERSION="$VERSION" --build-arg BUILD_TIMESTAMP="$BUILD_TIMESTAMP" --platform linux/amd64 -f all.Dockerfile --target syncctl -t jitsucom/syncctl:"$VERSION" -t jitsucom/syncctl:beta --push .
64 changes: 45 additions & 19 deletions bulkerapp/app/topic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ type TopicManager struct {
repository *Repository
cron *Cron
// consumedTopics by destinationId. Consumed topics are topics that have consumer started
consumedTopics map[string]utils.Set[string]
abandonedTopics utils.Set[string]
staleTopics utils.Set[string]
allTopics utils.Set[string]
consumedTopics map[string]utils.Set[string]
topicsLastMessageDates map[string]time.Time
abandonedTopics utils.Set[string]
staleTopics utils.Set[string]
allTopics utils.Set[string]

//batch consumers by destinationId
batchConsumers map[string][]BatchConsumer
Expand Down Expand Up @@ -142,20 +143,20 @@ func (tm *TopicManager) LoadMetadata() {
}
}
}
//start := time.Now()
//res, err := tm.kaftaAdminClient.ListOffsets(context.Background(), topicPartitionOffsets)
//if err != nil {
// tm.Errorf("Error getting topic offsets: %v", err)
//} else {
// for tp, offset := range res.ResultInfos {
// if offset.Offset >= 0 {
// topicsLastMessageDates[*tp.Topic] = time.UnixMilli(offset.Timestamp)
// } else {
// topicsLastMessageDates[*tp.Topic] = time.Time{}
// }
// }
//}
//tm.Infof("Got topic offsets in %v", time.Since(start))
start := time.Now()
res, err := tm.kaftaAdminClient.ListOffsets(context.Background(), topicPartitionOffsets)
if err != nil {
tm.Errorf("Error getting topic offsets: %v", err)
} else {
for tp, offset := range res.ResultInfos {
if offset.Offset >= 0 {
topicsLastMessageDates[*tp.Topic] = time.UnixMilli(offset.Timestamp)
} else {
topicsLastMessageDates[*tp.Topic] = time.Time{}
}
}
tm.Debugf("Got topic offsets for %d topics in %v", len(topicsLastMessageDates), time.Since(start))
}

if err != nil {
metrics.TopicManagerError("load_metadata_error").Inc()
Expand All @@ -169,6 +170,9 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, lastMessageDat
tm.Lock()
defer tm.Unlock()
start := time.Now()
if len(lastMessageDates) > 0 {
tm.topicsLastMessageDates = lastMessageDates
}
staleTopicsCutOff := time.Now().Add(-1 * time.Duration(tm.config.KafkaTopicRetentionHours) * time.Hour)
var abandonedTopicsCount float64
var otherTopicsCount float64
Expand All @@ -184,7 +188,7 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, lastMessageDat
abandonedTopicsCount++
continue
}
lastMessageDate, ok := lastMessageDates[topic]
lastMessageDate, ok := tm.topicsLastMessageDates[topic]
if ok && (lastMessageDate.IsZero() || lastMessageDate.Before(staleTopicsCutOff)) {
staleTopics.Put(topic)
tm.Debugf("Topic %s is stale. Last message date: %v", topic, lastMessageDate)
Expand Down Expand Up @@ -517,6 +521,28 @@ func (tm *TopicManager) EnsureDestinationTopic(destination *Destination, topicId
func (tm *TopicManager) ensureTopic(topicId string, partitions int, config map[string]string) error {
if !tm.allTopics.Contains(topicId) {
return tm.createTopic(topicId, partitions, config)
} else if !tm.ready && partitions > 1 {
//check topic partitions count and increase when necessary
meta, err := tm.kaftaAdminClient.GetMetadata(&topicId, false, tm.config.KafkaAdminMetadataTimeoutMs)
if err != nil {
tm.SystemErrorf("Error getting metadata for topic %s: %v", topicId, err)
}
m, ok := meta.Topics[topicId]
if ok {
currentPartitionsCount := len(m.Partitions)
if partitions > currentPartitionsCount {
tm.Infof("Topic %s has %d partitions. Increasing to %d", topicId, currentPartitionsCount, partitions)
_, err = tm.kaftaAdminClient.CreatePartitions(context.Background(), []kafka.PartitionsSpecification{
{
Topic: topicId,
IncreaseTo: partitions,
},
})
if err != nil {
tm.SystemErrorf("Error increasing partitions for topic %s: %v", topicId, err)
}
}
}
}
return nil
}
Expand Down
8 changes: 0 additions & 8 deletions ingest/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ import (
"context"
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jitsucom/bulker/eventslog"
"github.com/jitsucom/bulker/jitsubase/appbase"
"github.com/jitsucom/bulker/jitsubase/logging"
"github.com/jitsucom/bulker/jitsubase/pg"
"github.com/jitsucom/bulker/jitsubase/utils"
"github.com/jitsucom/bulker/kafkabase"
"net/http"
Expand All @@ -18,7 +16,6 @@ import (
type Context struct {
config *Config
kafkaConfig *kafka.ConfigMap
dbpool *pgxpool.Pool
repository appbase.Repository[Streams]
scriptRepository appbase.Repository[Script]
producer *kafkabase.Producer
Expand All @@ -35,10 +32,6 @@ func (a *Context) InitContext(settings *appbase.AppSettings) error {
if err != nil {
return err
}
a.dbpool, err = pg.NewPGPool(a.config.DatabaseURL)
if err != nil {
return fmt.Errorf("Unable to create postgres connection pool: %v\n", err)
}
a.repository = NewStreamsRepository(a.config.RepositoryURL, a.config.RepositoryAuthToken, a.config.RepositoryRefreshPeriodSec, a.config.CacheDir)
a.scriptRepository = NewScriptRepository(a.config.ScriptOrigin, a.config.CacheDir)
a.eventsLogService = &eventslog.DummyEventsLogService{}
Expand Down Expand Up @@ -86,7 +79,6 @@ func (a *Context) Cleanup() error {
_ = a.eventsLogService.Close()
_ = a.scriptRepository.Close()
a.repository.Close()
a.dbpool.Close()
return nil
}

Expand Down
7 changes: 1 addition & 6 deletions ingest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ go 1.21
require (
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
github.com/gin-gonic/gin v1.9.1
github.com/jackc/pgx/v5 v5.5.1
github.com/json-iterator/go v1.1.12
github.com/penglongli/gin-metrics v0.1.10
github.com/prometheus/client_golang v1.17.0
github.com/spf13/viper v1.17.0
Expand All @@ -31,9 +29,7 @@ require (
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/magiconair/properties v1.8.7 // indirect
Expand Down Expand Up @@ -66,7 +62,6 @@ require (
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect
Expand Down
5 changes: 0 additions & 5 deletions ingest/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgx/v5 v5.5.1 h1:5I9etrGkLrN+2XPCsi6XLlV5DITbSL/xBZdmAxFcXPI=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
Expand Down Expand Up @@ -91,7 +87,6 @@ golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb h1:c0vyKkb6yr3KR7jEfJaOSv4lG7xPkbN6r52aJz1d8a8=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/tools v0.16.0 h1:GO788SKMRunPIBCXiQyo2AaexLstOrVhuAL5YwsckQM=
Expand Down
6 changes: 5 additions & 1 deletion ingest/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

type RepositoryConfig struct {
RepositoryURL string `mapstructure:"REPOSITORY_URL" default:"http://console:3000/api/admin/export/streams-with-destinations"`
RepositoryURL string `mapstructure:"REPOSITORY_URL"`
RepositoryAuthToken string `mapstructure:"REPOSITORY_AUTH_TOKEN"`
RepositoryRefreshPeriodSec int `mapstructure:"REPOSITORY_REFRESH_PERIOD_SEC" default:"2"`
}
Expand All @@ -35,6 +35,10 @@ func (s *Streams) GetStreamsByDomain(domain string) []*StreamWithDestinations {
return s.streamsByDomains[domain]
}

func (s *Streams) GetStreams() []*StreamWithDestinations {
return s.streams
}

type StreamsRepositoryData struct {
data atomic.Pointer[Streams]
}
Expand Down
10 changes: 9 additions & 1 deletion ingest/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (r *Router) getStream(loc *StreamCredentials) *StreamWithDestinations {
} else if loc.IngestType == IngestTypeS2S {
locators = []StreamLocator{r.WriteKeyStreamLocator, r.SlugStreamLocator, r.AmbiguousDomainStreamLocator}
} else {
locators = []StreamLocator{r.SlugStreamLocator, r.DomainStreamLocator, r.WriteKeyStreamLocator}
locators = []StreamLocator{r.SlugStreamLocator, r.DomainStreamLocator, r.WriteKeyStreamLocator, r.SlugStreamLocator}
}
for _, locator := range locators {
stream := locator(loc)
Expand Down Expand Up @@ -510,6 +510,14 @@ func (r *Router) AmbiguousDomainStreamLocator(loc *StreamCredentials) *StreamWit
return nil
}

func (r *Router) SoleStreamLocator(_ *StreamCredentials) *StreamWithDestinations {
streams := r.repository.GetData().GetStreams()
if len(streams) == 1 {
return streams[0]
}
return nil
}

func maskWriteKey(writeKey string) string {
if writeKey != "" {
parts := strings.Split(writeKey, ":")
Expand Down

0 comments on commit 053dc8e

Please sign in to comment.