Skip to content

Commit

Permalink
sidecar: send sync logs directly to postgres omitting bulker
Browse files Browse the repository at this point in the history
docs update
  • Loading branch information
absorbb committed Jan 30, 2024
1 parent 443e249 commit 0cbf1ac
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 41 deletions.
33 changes: 20 additions & 13 deletions .docs/server-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,6 @@ A list of auth tokens that authorizes user in HTTP interface separated by comma.

See above. A secret that is used for hashing tokens.

### `BULKER_MODE`

>**Note:**
> Not available yet. At the moment Bulker is always running in `two-way` mode.
*Optional, default: `two-way`*

How this particular instance of Bulker should work. Possible values:

* `producer` - this instance will only listen to [HTTP requests](./http-api.md) and send data to Kafka. It won't consume from Kafka
* `consumer` - this instance will only consume from Kafka. It won't listen to [HTTP requests](./http-api.md), except for `/ready` and `/metrics` endpoints
* `two-way` - this instance will both listen produce messages from [HTTP requests](./http-api.md) and consume from Kafka


## Connection to Kafka

Expand Down Expand Up @@ -254,6 +241,26 @@ Each environment variable `BULKER_DESTINATION_*` defines a destination. The valu
BULKER_DESTINATION_POSTGRES="{id: 'postgres', }"
```

### With HTTP Endpoint

#### `BULKER_CONFIG_SOURCE`

URL of endpoint that returns configuration of destination entities entities.

E.g. `jitsucom/console`'s export endpoint: `http://<consoles-domain>/api/admin/export/bulker-connections`

#### `BULKER_CONFIG_SOURCE_HTTP_AUTH_TOKEN`

Auth token for accessing `BULKER_CONFIG_SOURCE` endpoint.

E.g. for `jitsucom/console`'s export endpoint: `service-admin-account:CONSOLE_AUTH_TOKENS`

#### `BULKER_CONFIG_REFRESH_PERIOD_SEC`

**Default value: `5`**

Period in seconds for refreshing configuration from `BULKER_CONFIG_SOURCE` endpoint.

### With Redis

Set `BULKER_CONFIG_SOURCE` to `redis://...` or `rediss://...` and Bulker will read destinations from Redis `enrichedConnections` key.
Expand Down
2 changes: 0 additions & 2 deletions ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ type Config struct {
// # REPOSITORY CONFIG - settings for loading streams from repository
RepositoryConfig `mapstructure:",squash"`

DatabaseURL string `mapstructure:"DATABASE_URL"`

DataDomain string `mapstructure:"DATA_DOMAIN"`

// For ingest endpoint only
Expand Down
2 changes: 0 additions & 2 deletions sync-controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ type Config struct {
BulkerURL string `mapstructure:"BULKER_URL" default:"http://localhost:3042"`
BulkerAuthToken string `mapstructure:"BULKER_AUTH_TOKEN"`

BulkerLogsConnectionId string `mapstructure:"BULKER_LOGS_CONNECTION_ID"`

// # Kubernetes

// KubernetesNamespace namespace of bulker app. Default: `default`
Expand Down
23 changes: 11 additions & 12 deletions sync-controller/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,18 +302,17 @@ func (j *JobRunner) createPod(podName string, task TaskDescriptor, configuration
}
databaseURL := utils.NvlString(j.config.SidecarDatabaseURL, j.config.DatabaseURL)
sideCarEnv := map[string]string{
"STDOUT_PIPE_FILE": "/pipes/stdout",
"STDERR_PIPE_FILE": "/pipes/stderr",
"BULKER_URL": j.config.BulkerURL,
"BULKER_AUTH_TOKEN": j.config.BulkerAuthToken,
"LOGS_CONNECTION_ID": j.config.BulkerLogsConnectionId,
"PACKAGE": task.Package,
"PACKAGE_VERSION": task.PackageVersion,
"COMMAND": task.TaskType,
"TABLE_NAME_PREFIX": task.TableNamePrefix,
"DATABASE_URL": databaseURL,
"STARTED_BY": task.StartedBy,
"STARTED_AT": task.StartedAt,
"STDOUT_PIPE_FILE": "/pipes/stdout",
"STDERR_PIPE_FILE": "/pipes/stderr",
"BULKER_URL": j.config.BulkerURL,
"BULKER_AUTH_TOKEN": j.config.BulkerAuthToken,
"PACKAGE": task.Package,
"PACKAGE_VERSION": task.PackageVersion,
"COMMAND": task.TaskType,
"TABLE_NAME_PREFIX": task.TableNamePrefix,
"DATABASE_URL": databaseURL,
"STARTED_BY": task.StartedBy,
"STARTED_AT": task.StartedAt,
}
if task.SyncID != "" {
sideCarEnv["SYNC_ID"] = task.SyncID
Expand Down
7 changes: 7 additions & 0 deletions sync-sidecar/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ ON CONFLICT ON CONSTRAINT source_check_pkey DO UPDATE SET status = $4, descripti

insertCheckErrorSQL = `INSERT INTO source_check (package, version, key, status, description, timestamp) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT ON CONSTRAINT source_check_pkey DO NOTHING`

insertIntoTaskLog = `INSERT INTO task_log (id, level, logger, message, sync_id, task_id, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7)`
)

func UpsertSpec(dbpool *pgxpool.Pool, packageName, packageVersion, specs any, timestamp time.Time, error string) error {
Expand Down Expand Up @@ -94,3 +96,8 @@ func InsertCheckError(dbpool *pgxpool.Pool, packageName, packageVersion, storage
_, err := dbpool.Exec(context.Background(), insertCheckErrorSQL, packageName, packageVersion, storageKey, status, description, timestamp)
return err
}

func InsertTaskLog(dbpool *pgxpool.Pool, id, level, logger, message, syncId, taskId string, timestamp time.Time) error {
_, err := dbpool.Exec(context.Background(), insertIntoTaskLog, id, level, logger, message, syncId, taskId, timestamp)
return err
}
14 changes: 2 additions & 12 deletions sync-sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jitsucom/bulker/sync-sidecar/db"
"io"
"net/http"
"net/url"
Expand All @@ -26,7 +27,6 @@ type AbstractSideCar struct {

stdOutPipeFile string
stdErrPipeFile string
logsConnection string

bulkerURL string
bulkerAuthToken string
Expand Down Expand Up @@ -57,7 +57,6 @@ func main() {
packageVersion: os.Getenv("PACKAGE_VERSION"),
stdOutPipeFile: os.Getenv("STDOUT_PIPE_FILE"),
stdErrPipeFile: os.Getenv("STDERR_PIPE_FILE"),
logsConnection: os.Getenv("LOGS_CONNECTION_ID"),
bulkerURL: os.Getenv("BULKER_URL"),
bulkerAuthToken: os.Getenv("BULKER_AUTH_TOKEN"),
databaseURL: os.Getenv("DATABASE_URL"),
Expand Down Expand Up @@ -143,16 +142,7 @@ func (s *AbstractSideCar) _log(logger, level, message string) {
}

func (s *AbstractSideCar) sendLog(logger, level string, message string) error {
logMessage := map[string]any{
"id": uuid.New().String(),
"timestamp": time.Now().Format(time.RFC3339Nano),
"sync_id": s.syncId,
"task_id": s.taskId,
"logger": logger,
"level": level,
"message": message,
}
return s.bulkerEvent(s.logsConnection, "task_log", logMessage)
return db.InsertTaskLog(s.dbpool, uuid.New().String(), level, logger, message, s.syncId, s.taskId, time.Now())
}

func (s *AbstractSideCar) bulkerEvent(connection, tableName string, payload any) error {
Expand Down

0 comments on commit 0cbf1ac

Please sign in to comment.