diff --git a/frontend/apps/web/app/(mgmt)/[account]/jobs/[id]/source/components/DataGenConnectionCard.tsx b/frontend/apps/web/app/(mgmt)/[account]/jobs/[id]/source/components/DataGenConnectionCard.tsx index 2ad7b7261e..7369281c7d 100644 --- a/frontend/apps/web/app/(mgmt)/[account]/jobs/[id]/source/components/DataGenConnectionCard.tsx +++ b/frontend/apps/web/app/(mgmt)/[account]/jobs/[id]/source/components/DataGenConnectionCard.tsx @@ -1,7 +1,7 @@ 'use client'; import { - SINGLE_TABLE_SCHEMA_FORM_SCHEMA, - SingleTableSchemaFormValues, + MULTI_TABLE_SCHEMA_FORM_SCHEMA, + MultiTableSchemaFormValues, } from '@/app/(mgmt)/[account]/new/job/schema'; import { SchemaTable } from '@/components/jobs/SchemaTable/SchemaTable'; import { getSchemaConstraintHandler } from '@/components/jobs/SchemaTable/schema-constraint-handler'; @@ -87,8 +87,8 @@ export default function DataGenConnectionCard({ jobId }: Props): ReactElement { fkSourceConnectionId ?? '' ); - const form = useForm({ - resolver: yupResolver(SINGLE_TABLE_SCHEMA_FORM_SCHEMA), + const form = useForm({ + resolver: yupResolver(MULTI_TABLE_SCHEMA_FORM_SCHEMA), values: getJobSource(data?.job), }); @@ -107,7 +107,7 @@ export default function DataGenConnectionCard({ jobId }: Props): ReactElement { return ; } - async function onSubmit(values: SingleTableSchemaFormValues) { + async function onSubmit(values: MultiTableSchemaFormValues) { const job = data?.job; if (!job) { return; @@ -182,7 +182,7 @@ export default function DataGenConnectionCard({ jobId }: Props): ReactElement { ); } -function getJobSource(job?: Job): SingleTableSchemaFormValues { +function getJobSource(job?: Job): MultiTableSchemaFormValues { if (!job) { return { mappings: [], @@ -200,7 +200,7 @@ function getJobSource(job?: Job): SingleTableSchemaFormValues { } } - const mappings: SingleTableSchemaFormValues['mappings'] = ( + const mappings: MultiTableSchemaFormValues['mappings'] = ( job.mappings ?? 
[] ).map((mapping) => { return { @@ -221,10 +221,31 @@ function getJobSource(job?: Job): SingleTableSchemaFormValues { async function updateJobConnection( accountId: string, job: Job, - values: SingleTableSchemaFormValues + values: MultiTableSchemaFormValues ): Promise { - const schema = values.mappings.length > 0 ? values.mappings[0].schema : null; - const table = values.mappings.length > 0 ? values.mappings[0].table : null; + const schemas = values.mappings.reduce( + (prev, curr) => { + const prevTables = prev[curr.schema] || {}; + return { + ...prev, + [curr.schema]: { ...prevTables, [curr.table]: curr.table }, + }; + }, + {} as Record> + ); + const schemaRecords = Object.entries(schemas).map(([s, tables]) => { + return new GenerateSourceSchemaOption({ + schema: s, + tables: Object.keys(tables).map( + (t) => + new GenerateSourceTableOption({ + rowCount: BigInt(values.numRows), + table: t, + }) + ), + }); + }); + const res = await fetch( `/api/accounts/${accountId}/jobs/${job.id}/source-connection`, { @@ -252,20 +273,7 @@ async function updateJobConnection( case: 'generate', value: new GenerateSourceOptions({ fkSourceConnectionId: getFkIdFromGenerateSource(job.source), - schemas: - schema && table - ? 
[ - new GenerateSourceSchemaOption({ - schema: schema, - tables: [ - new GenerateSourceTableOption({ - table: table, - rowCount: BigInt(values.numRows), - }), - ], - }), - ] - : [], + schemas: schemaRecords, }), }, }), diff --git a/frontend/apps/web/app/(mgmt)/[account]/new/job/generate/single/schema/page.tsx b/frontend/apps/web/app/(mgmt)/[account]/new/job/generate/single/schema/page.tsx index 27bdc86b45..db91344459 100644 --- a/frontend/apps/web/app/(mgmt)/[account]/new/job/generate/single/schema/page.tsx +++ b/frontend/apps/web/app/(mgmt)/[account]/new/job/generate/single/schema/page.tsx @@ -57,9 +57,9 @@ import { useSessionStorage } from 'usehooks-ts'; import JobsProgressSteps, { DATA_GEN_STEPS } from '../../../JobsProgressSteps'; import { DefineFormValues, - SINGLE_TABLE_SCHEMA_FORM_SCHEMA, + MULTI_TABLE_SCHEMA_FORM_SCHEMA, + MultiTableSchemaFormValues, SingleTableConnectFormValues, - SingleTableSchemaFormValues, } from '../../../schema'; const isBrowser = () => typeof window !== 'undefined'; @@ -98,7 +98,7 @@ export default function Page({ searchParams }: PageProps): ReactElement { const formKey = `${sessionPrefix}-new-job-single-table-schema`; - const [schemaFormData] = useSessionStorage( + const [schemaFormData] = useSessionStorage( formKey, { mappings: [], @@ -114,8 +114,8 @@ export default function Page({ searchParams }: PageProps): ReactElement { const form = useForm({ mode: 'onChange', - resolver: yupResolver( - SINGLE_TABLE_SCHEMA_FORM_SCHEMA + resolver: yupResolver( + MULTI_TABLE_SCHEMA_FORM_SCHEMA ), values: schemaFormData, }); @@ -130,7 +130,7 @@ export default function Page({ searchParams }: PageProps): ReactElement { setIsClient(true); }, []); - async function onSubmit(values: SingleTableSchemaFormValues) { + async function onSubmit(values: MultiTableSchemaFormValues) { if (!account) { return; } @@ -296,7 +296,7 @@ export default function Page({ searchParams }: PageProps): ReactElement { async function createNewJob( define: DefineFormValues, 
connect: SingleTableConnectFormValues, - schema: SingleTableSchemaFormValues, + schema: MultiTableSchemaFormValues, accountId: string, connections: Connection[] ): Promise { @@ -328,9 +328,29 @@ async function createNewJob( }), }); } - const tableSchema = - schema.mappings.length > 0 ? schema.mappings[0].schema : null; - const table = schema.mappings.length > 0 ? schema.mappings[0].table : null; + const schemas = schema.mappings.reduce( + (prev, curr) => { + const prevTables = prev[curr.schema] || {}; + return { + ...prev, + [curr.schema]: { ...prevTables, [curr.table]: curr.table }, + }; + }, + {} as Record> + ); + const schemaRecords = Object.entries(schemas).map(([s, tables]) => { + return new GenerateSourceSchemaOption({ + schema: s, + tables: Object.keys(tables).map( + (t) => + new GenerateSourceTableOption({ + rowCount: BigInt(schema.numRows), + table: t, + }) + ), + }); + }); + const body = new CreateJobRequest({ accountId, jobName: define.jobName, @@ -352,20 +372,7 @@ async function createNewJob( case: 'generate', value: new GenerateSourceOptions({ fkSourceConnectionId: connect.connectionId, - schemas: - tableSchema && table - ? 
[ - new GenerateSourceSchemaOption({ - schema: tableSchema, - tables: [ - new GenerateSourceTableOption({ - rowCount: BigInt(schema.numRows), - table: table, - }), - ], - }), - ] - : [], + schemas: schemaRecords, }), }, }), diff --git a/frontend/apps/web/app/(mgmt)/[account]/new/job/schema.ts b/frontend/apps/web/app/(mgmt)/[account]/new/job/schema.ts index 55541f93a7..5542593fc8 100644 --- a/frontend/apps/web/app/(mgmt)/[account]/new/job/schema.ts +++ b/frontend/apps/web/app/(mgmt)/[account]/new/job/schema.ts @@ -251,6 +251,14 @@ export type SingleTableSchemaFormValues = Yup.InferType< typeof SINGLE_TABLE_SCHEMA_FORM_SCHEMA >; +export const MULTI_TABLE_SCHEMA_FORM_SCHEMA = Yup.object({ + mappings: Yup.array().of(JOB_MAPPING_SCHEMA).required(), + numRows: Yup.number().required().min(1), +}); +export type MultiTableSchemaFormValues = Yup.InferType< + typeof MULTI_TABLE_SCHEMA_FORM_SCHEMA +>; + export const SUBSET_FORM_SCHEMA = Yup.object({ subsets: Yup.array(SINGLE_SUBSET_FORM_SCHEMA).required(), subsetOptions: Yup.object({ diff --git a/frontend/apps/web/components/jobs/SchemaTable/SchemaTable.tsx b/frontend/apps/web/components/jobs/SchemaTable/SchemaTable.tsx index f10b8d84d4..08b7e89aaa 100644 --- a/frontend/apps/web/components/jobs/SchemaTable/SchemaTable.tsx +++ b/frontend/apps/web/components/jobs/SchemaTable/SchemaTable.tsx @@ -142,7 +142,7 @@ export function SchemaTable(props: Props): ReactElement { options={getDualListBoxOptions(schema, data)} selected={selectedItems} onChange={toggleItem} - mode={jobType === 'generate' ? 
'single' : 'many'} + mode={'many'} /> diff --git a/worker/internal/benthos/config.go b/worker/internal/benthos/config.go index a30fd9be5e..ecc0c081a3 100644 --- a/worker/internal/benthos/config.go +++ b/worker/internal/benthos/config.go @@ -54,9 +54,19 @@ type InputConfig struct { } type Inputs struct { - SqlSelect *SqlSelect `json:"sql_select,omitempty" yaml:"sql_select,omitempty"` - PooledSqlRaw *InputPooledSqlRaw `json:"pooled_sql_raw,omitempty" yaml:"pooled_sql_raw,omitempty"` - Generate *Generate `json:"generate,omitempty" yaml:"generate,omitempty"` + SqlSelect *SqlSelect `json:"sql_select,omitempty" yaml:"sql_select,omitempty"` + PooledSqlRaw *InputPooledSqlRaw `json:"pooled_sql_raw,omitempty" yaml:"pooled_sql_raw,omitempty"` + Generate *Generate `json:"generate,omitempty" yaml:"generate,omitempty"` + GenerateSqlSelect *GenerateSqlSelect `json:"generate_sql_select,omitempty" yaml:"generate_sql_select,omitempty"` +} + +type GenerateSqlSelect struct { + Mapping string `json:"mapping,omitempty" yaml:"mapping,omitempty"` + Count int `json:"count" yaml:"count"` + Driver string `json:"driver" yaml:"driver"` + Dsn string `json:"dsn" yaml:"dsn"` + TableColumnsMap map[string][]string `json:"table_columns_map" yaml:"table_columns_map"` + ColumnNameMap map[string]string `json:"column_name_map,omitempty" yaml:"column_name_map,omitempty"` } type Generate struct { @@ -186,8 +196,9 @@ type SwitchOutputCase struct { Output Outputs `json:"output,omitempty" yaml:"output,omitempty"` } type ErrorOutputConfig struct { - ErrorMsg string `json:"error_msg" yaml:"error_msg"` - Batching *Batching `json:"batching,omitempty" yaml:"batching,omitempty"` + ErrorMsg string `json:"error_msg" yaml:"error_msg"` + MaxRetries *int `json:"max_retries,omitempty" yaml:"max_retries,omitempty"` + Batching *Batching `json:"batching,omitempty" yaml:"batching,omitempty"` } type RedisHashOutputConfig struct { diff --git a/worker/internal/benthos/error/output_error.go 
b/worker/internal/benthos/error/output_error.go index 53e7b724f3..a8d2ce5d2a 100644 --- a/worker/internal/benthos/error/output_error.go +++ b/worker/internal/benthos/error/output_error.go @@ -13,6 +13,7 @@ func errorOutputSpec() *service.ConfigSpec { return service.NewConfigSpec(). Summary(`Sends stop Activity signal`). Field(service.NewStringField("error_msg")). + Field(service.NewIntField("max_retries").Optional()). Field(service.NewIntField("max_in_flight").Default(64)). Field(service.NewBatchPolicyField("batching")) } @@ -31,6 +32,7 @@ func RegisterErrorOutput(env *service.Environment, stopActivityChannel chan erro if err != nil { return nil, service.BatchPolicy{}, -1, err } + out, err := newErrorOutput(conf, mgr, stopActivityChannel) if err != nil { return nil, service.BatchPolicy{}, -1, err @@ -44,10 +46,19 @@ func newErrorOutput(conf *service.ParsedConfig, mgr *service.Resources, channel if err != nil { return nil, err } + var retries *int + if conf.Contains("max_retries") { + maxRetries, err := conf.FieldInt("max_retries") + if err != nil { + return nil, err + } + retries = &maxRetries + } return &errorOutput{ logger: mgr.Logger(), stopActivityChannel: channel, errorMsg: errMsg, + retries: retries, }, nil } @@ -55,6 +66,7 @@ type errorOutput struct { logger *service.Logger stopActivityChannel chan error errorMsg *service.InterpolatedString + retries *int } func (e *errorOutput) Connect(ctx context.Context) error { @@ -67,7 +79,10 @@ func (e *errorOutput) WriteBatch(ctx context.Context, batch service.MessageBatch if err != nil { return fmt.Errorf("error message interpolation error: %w", err) } - if neosync_benthos.IsMaxConnectionError(errMsg) { + if !neosync_benthos.ShouldTerminate(errMsg) || (e.retries != nil && *e.retries > 0) { + if e.retries != nil { + *e.retries-- + } // throw error so that benthos retries return errors.New(errMsg) } diff --git a/worker/internal/benthos/sql/input_generate_table_records.go 
b/worker/internal/benthos/sql/input_generate_table_records.go new file mode 100644 index 0000000000..518c79cd90 --- /dev/null +++ b/worker/internal/benthos/sql/input_generate_table_records.go @@ -0,0 +1,367 @@ +package neosync_benthos_sql + +// combo of generate, sql select and mapping + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "sync" + + "github.com/benthosdev/benthos/v4/public/bloblang" + "github.com/benthosdev/benthos/v4/public/service" + "github.com/doug-martin/goqu/v9" + "github.com/doug-martin/goqu/v9/exp" + mysql_queries "github.com/nucleuscloud/neosync/backend/gen/go/db/dbschemas/mysql" + neosync_benthos "github.com/nucleuscloud/neosync/worker/internal/benthos" + "github.com/nucleuscloud/neosync/worker/internal/benthos/shutdown" + transformer_utils "github.com/nucleuscloud/neosync/worker/internal/benthos/transformers/utils" +) + +func generateTableRecordsInputSpec() *service.ConfigSpec { + return service.NewConfigSpec(). + Field(service.NewStringField("driver")). + Field(service.NewStringField("dsn")). + Field(service.NewAnyMapField("table_columns_map")). + Field(service.NewStringMapField("column_name_map").Optional().Example("{ schema.table.column: new_column_name }")). + Field(service.NewIntField("count")). 
+ Fields(service.NewBloblangField("mapping").Optional()) +} + +func RegisterGenerateTableRecordsInput(env *service.Environment, dbprovider DbPoolProvider, stopActivityChannel chan error) error { + return env.RegisterBatchInput( + "generate_sql_select", generateTableRecordsInputSpec(), + func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchInput, error) { + b, err := newGenerateReaderFromParsed(conf, mgr, dbprovider, stopActivityChannel) + if err != nil { + return nil, err + } + return service.AutoRetryNacksBatched(b), nil + }, + ) +} + +//------------------------------------------------------------------------------ + +type generateReader struct { + driver string + dsn string + tableColsMap map[string][]string + columnNameMap map[string]string + provider DbPoolProvider + logger *service.Logger + + mapping *bloblang.Executor + + db mysql_queries.DBTX + dbMut sync.Mutex + remaining int + + shutSig *shutdown.Signaller + + stopActivityChannel chan error +} + +func newGenerateReaderFromParsed(conf *service.ParsedConfig, mgr *service.Resources, dbprovider DbPoolProvider, channel chan error) (*generateReader, error) { + driver, err := conf.FieldString("driver") + if err != nil { + return nil, err + } + dsn, err := conf.FieldString("dsn") + if err != nil { + return nil, err + } + + count, err := conf.FieldInt("count") + if err != nil { + return nil, err + } + + tmpMap, err := conf.FieldAnyMap("table_columns_map") + if err != nil { + return nil, err + } + tableColsMap := map[string][]string{} + for k, v := range tmpMap { + val, err := v.FieldStringList() + if err != nil { + return nil, err + } + tableColsMap[k] = val + } + + columnNameMap := map[string]string{} + if conf.Contains("column_name_map") { + columnNameMap, err = conf.FieldStringMap("column_name_map") + if err != nil { + return nil, err + } + } + + var mapping *bloblang.Executor + if conf.Contains("mapping") { + mapping, err = conf.FieldBloblang("mapping") + if err != nil { + return nil, err + } 
+ } + + return &generateReader{ + logger: mgr.Logger(), + shutSig: shutdown.NewSignaller(), + driver: driver, + dsn: dsn, + tableColsMap: tableColsMap, + columnNameMap: columnNameMap, + mapping: mapping, + provider: dbprovider, + stopActivityChannel: channel, + remaining: count, + }, nil +} + +// Connect establishes a Bloblang reader. +func (s *generateReader) Connect(ctx context.Context) error { + s.logger.Debug("connecting to pooled input") + s.dbMut.Lock() + defer s.dbMut.Unlock() + + if s.db != nil { + return nil + } + + db, err := s.provider.GetDb(s.driver, s.dsn) + if err != nil { + return err + } + + s.db = db + + go func() { + <-s.shutSig.CloseNowChan() + + s.dbMut.Lock() + // not closing the connection here as that is managed by an outside force + s.db = nil + s.dbMut.Unlock() + + s.shutSig.ShutdownComplete() + }() + return nil +} + +// ReadBatch a new bloblang generated message. +func (s *generateReader) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) { + if s.remaining <= 0 { + return nil, nil, service.ErrEndOfInput + } + + sqlRandomStr := "RANDOM()" + if s.driver == "mysql" { + sqlRandomStr = "RAND()" + } + + tables := []string{} + for t := range s.tableColsMap { + tables = append(tables, t) + } + + randomLrgLimit, err := transformer_utils.GenerateRandomInt64InValueRange(10, 50) + if err != nil { + return nil, nil, err + } + + table := tables[0] + // need to remove self circular dependent tables + otherTables := tables[1:] + + cols := s.tableColsMap[table] + // update col names to be that of destination table or should it be handled on insert + selectColumns := make([]any, len(cols)) + for i, col := range cols { + as, ok := s.columnNameMap[fmt.Sprintf("%s.%s", table, col)] + if ok { + selectColumns[i] = goqu.I(col).As(as) + } else { + selectColumns[i] = col + } + } + rows, err := s.queryDatabase(sqlRandomStr, table, randomLrgLimit, selectColumns) + if err != nil { + return nil, nil, err + } + + batch := 
service.MessageBatch{} + for _, r := range rows { + randomSmLimit, err := transformer_utils.GenerateRandomInt64InValueRange(0, 3) + if err != nil { + return nil, nil, err + } + otherTableRows := [][]map[string]any{} + for _, t := range otherTables { + cols := s.tableColsMap[t] + selectColumns := make([]any, len(cols)) + for i, col := range cols { + tn := fmt.Sprintf("%s.%s", t, col) + as, ok := s.columnNameMap[tn] + if ok { + selectColumns[i] = goqu.I(col).As(as) + } else { + selectColumns[i] = col + } + } + newRows, err := s.queryDatabase(sqlRandomStr, t, randomSmLimit, selectColumns) + if err != nil { + return nil, nil, err + } + if len(newRows) != 0 { + // how to handle tables that don't have enough data + otherTableRows = append(otherTableRows, newRows) + } + } + combinedRows := combineRowLists(otherTableRows) + if len(combinedRows) > 0 { + for _, cr := range combinedRows { + var args map[string]any + if s.mapping != nil { + args, err = s.queryBloblangMapping() + if err != nil { + return nil, nil, err + } + } + + newRow := combineRows([]map[string]any{r, cr, args}) + rowStr, err := json.Marshal(newRow) + if err != nil { + return nil, nil, err + } + if s.remaining < 1 { + return batch, func(context.Context, error) error { return nil }, nil + } + + msg := service.NewMessage(rowStr) + batch = append(batch, msg) + s.remaining-- + } + } else { + newRow := r + if s.mapping != nil { + args, err := s.queryBloblangMapping() + if err != nil { + return nil, nil, err + } + newRow = combineRows([]map[string]any{r, args}) + } + + rowStr, err := json.Marshal(newRow) + if err != nil { + return nil, nil, err + } + if s.remaining < 1 { + return batch, func(context.Context, error) error { return nil }, nil + } + msg := service.NewMessage(rowStr) + batch = append(batch, msg) + s.remaining-- + } + } + + return batch, func(context.Context, error) error { return nil }, nil +} + +func (s *generateReader) Close(ctx context.Context) (err error) { + s.shutSig.CloseNow() + s.dbMut.Lock() 
+ isNil := s.db == nil + s.dbMut.Unlock() + if isNil { + return nil + } + select { + case <-s.shutSig.HasClosedChan(): + case <-ctx.Done(): + return ctx.Err() + } + return nil +} + +func (s *generateReader) queryBloblangMapping() (map[string]any, error) { + var iargs any + var err error + if iargs, err = s.mapping.Query(nil); err != nil { + return nil, err + } + + var ok bool + var args map[string]any + if args, ok = iargs.(map[string]any); !ok { + err = fmt.Errorf("mapping returned non-array result: %T", iargs) + return nil, err + } + return args, nil +} + +func (s *generateReader) queryDatabase(sqlRandomStr, table string, limit int64, columns []any) ([]map[string]any, error) { + orderBy := exp.NewOrderedExpression(exp.NewLiteralExpression(sqlRandomStr), exp.AscDir, exp.NullsLastSortType) + builder := goqu.Dialect(s.driver) + selectBuilder := builder.From(table).Select(columns...).Order(orderBy).Limit(uint(limit)) + selectSql, _, err := selectBuilder.ToSQL() + if err != nil { + return nil, err + } + + rows, err := s.db.QueryContext(context.Background(), selectSql) + if err != nil { + if !neosync_benthos.ShouldTerminate(err.Error()) { + s.logger.Error(fmt.Sprintf("Benthos input error - sending stop activity signal: %s ", err.Error())) + s.stopActivityChannel <- err + } + return nil, err + } + + rowObjList, err := sqlRowsToMapList(rows) + if err != nil { + _ = rows.Close() + return nil, err + } + return rowObjList, nil +} + +func sqlRowsToMapList(rows *sql.Rows) ([]map[string]any, error) { + results := []map[string]any{} + for rows.Next() { + obj, err := sqlRowToMap(rows) + if err != nil { + return nil, err + } + results = append(results, obj) + } + return results, nil +} + +func combineRows(maps []map[string]any) map[string]any { + result := make(map[string]any) + for _, m := range maps { + for k, v := range m { + result[k] = v + } + } + return result +} + +func combineRowLists(rows [][]map[string]any) []map[string]any { + if len(rows) == 0 { + return []map[string]any{} + } + 
results := []map[string]any{} + rowCount := len(rows[0]) + for i := 0; i < rowCount; i++ { + rowsToCombine := []map[string]any{} + for _, r := range rows { + rowsToCombine = append(rowsToCombine, r[i]) + } + results = append(results, combineRows(rowsToCombine)) + } + return results +} diff --git a/worker/internal/benthos/sql/input_generate_table_records_test.go b/worker/internal/benthos/sql/input_generate_table_records_test.go new file mode 100644 index 0000000000..1ead48fa47 --- /dev/null +++ b/worker/internal/benthos/sql/input_generate_table_records_test.go @@ -0,0 +1,83 @@ +package neosync_benthos_sql + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_combineRows(t *testing.T) { + tests := []struct { + name string + maps []map[string]any + expected map[string]any + }{ + { + name: "empty input", + maps: []map[string]any{}, + expected: map[string]any{}, + }, + { + name: "single map", + maps: []map[string]any{ + {"key1": "value1", "key2": 2}, + }, + expected: map[string]any{"key1": "value1", "key2": 2}, + }, + { + name: "multiple maps with unique keys", + maps: []map[string]any{ + {"key1": "value1"}, + {"key2": "value2"}, + }, + expected: map[string]any{"key1": "value1", "key2": "value2"}, + }, + { + name: "multiple maps with overlapping keys", + maps: []map[string]any{ + {"key1": "value1", "key2": "value2"}, + {"key2": "newValue2", "key3": 3}, + }, + expected: map[string]any{"key1": "value1", "key2": "newValue2", "key3": 3}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actual := combineRows(tt.maps) + assert.Equal(t, actual, tt.expected) + }) + } +} + +func TestCombineRowLists(t *testing.T) { + // Test case 1: Empty input + emptyResult := combineRowLists([][]map[string]any{}) + assert.Empty(t, emptyResult) + + // Test case 2: Single row with single map + singleRowSingleMap := [][]map[string]any{{{"a": 1, "b": 2}}} + singleResult := combineRowLists(singleRowSingleMap) + expectedSingleResult := 
[]map[string]any{{"a": 1, "b": 2}} + assert.Equal(t, expectedSingleResult, singleResult) + + // Test case 3: Single row with multiple maps + singleRowMultipleMaps := [][]map[string]any{{{"a": 1}}, {{"b": 2}}, {{"c": 3}}} + multipleResult := combineRowLists(singleRowMultipleMaps) + expectedMultipleResult := []map[string]any{{"a": 1, "b": 2, "c": 3}} + assert.Equal(t, expectedMultipleResult, multipleResult) + + // Test case 4: Multiple rows with multiple maps + multipleRowsMultipleMaps := [][]map[string]any{ + {{"a": 1}, {"b": 2}, {"c": 3}}, + {{"d": 4}, {"e": 5}, {"f": 6}}, + {{"g": 7}, {"h": 8}, {"i": 9}}, + } + multipleRowsResult := combineRowLists(multipleRowsMultipleMaps) + expectedMultipleRowsResult := []map[string]any{ + {"a": 1, "d": 4, "g": 7}, + {"b": 2, "e": 5, "h": 8}, + {"c": 3, "f": 6, "i": 9}, + } + assert.Equal(t, expectedMultipleRowsResult, multipleRowsResult) +} diff --git a/worker/internal/benthos/sql/input_sql_raw.go b/worker/internal/benthos/sql/input_sql_raw.go index 2e05a79423..4f1632abd6 100644 --- a/worker/internal/benthos/sql/input_sql_raw.go +++ b/worker/internal/benthos/sql/input_sql_raw.go @@ -3,6 +3,7 @@ package neosync_benthos_sql import ( "context" "database/sql" + "encoding/json" "fmt" "sync" @@ -118,12 +119,14 @@ func (s *pooledInput) Connect(ctx context.Context) error { rows, err := db.QueryContext(ctx, s.queryStatic, args...) 
if err != nil { - if !neosync_benthos.IsMaxConnectionError(err.Error()) { + if !neosync_benthos.ShouldTerminate(err.Error()) { s.logger.Error(fmt.Sprintf("Benthos input error - sending stop activity signal: %s ", err.Error())) s.stopActivityChannel <- err } return err } + jsonF, _ := json.MarshalIndent(rows, "", " ") + fmt.Printf("\n rows: %s \n", string(jsonF)) s.rows = rows go func() { @@ -169,6 +172,8 @@ func (s *pooledInput) Read(ctx context.Context) (*service.Message, service.AckFu s.rows = nil return nil, nil, err } + jsonF, _ := json.MarshalIndent(obj, "", " ") + fmt.Printf("\n obj: %s \n", string(jsonF)) msg := service.NewMessage(nil) msg.SetStructured(obj) diff --git a/worker/internal/benthos/sql/input_sql_select_generate.go b/worker/internal/benthos/sql/input_sql_select_generate.go new file mode 100644 index 0000000000..669deaf5d3 --- /dev/null +++ b/worker/internal/benthos/sql/input_sql_select_generate.go @@ -0,0 +1,300 @@ +package neosync_benthos_sql + +// combo of generate, sql select and mapping + +import ( + "context" + "fmt" + "sync" + + "github.com/benthosdev/benthos/v4/public/bloblang" + "github.com/benthosdev/benthos/v4/public/service" + "github.com/doug-martin/goqu/v9" + "github.com/doug-martin/goqu/v9/exp" + mysql_queries "github.com/nucleuscloud/neosync/backend/gen/go/db/dbschemas/mysql" + neosync_benthos "github.com/nucleuscloud/neosync/worker/internal/benthos" + "github.com/nucleuscloud/neosync/worker/internal/benthos/shutdown" + transformer_utils "github.com/nucleuscloud/neosync/worker/internal/benthos/transformers/utils" +) + +func sqlSelectGenerateInputSpec() *service.ConfigSpec { + return service.NewConfigSpec(). + Field(service.NewStringField("driver")). + Field(service.NewStringField("dsn")). + Field(service.NewStringField("query")). + Field(service.NewAnyMapField("table_columns_map")). + Field(service.NewIntField("count")). 
+ Field(service.NewBloblangField("args_mapping").Optional()) +} + +// Registers an input on a benthos environment called pooled_sql_raw +func RegisterSqlSelectGenerateInput(env *service.Environment, dbprovider DbPoolProvider, stopActivityChannel chan error) error { + return env.RegisterInput( + "pooled_sql_select_generate", sqlSelectGenerateInputSpec(), + func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { + input, err := newSqlSelectGenerateInput(conf, mgr, dbprovider, stopActivityChannel) + if err != nil { + return nil, err + } + return input, nil + }, + ) +} + +type sqlSelectGenerateInput struct { + driver string + dsn string + tableColsMap map[string][]string + provider DbPoolProvider + logger *service.Logger + + argsMapping *bloblang.Executor + + db mysql_queries.DBTX + dbMut sync.Mutex + // rows *sql.Rows + remaining int + index int + joinedRows []map[string]any + + shutSig *shutdown.Signaller + + stopActivityChannel chan error +} + +func newSqlSelectGenerateInput(conf *service.ParsedConfig, mgr *service.Resources, dbprovider DbPoolProvider, channel chan error) (*sqlSelectGenerateInput, error) { + driver, err := conf.FieldString("driver") + if err != nil { + return nil, err + } + dsn, err := conf.FieldString("dsn") + if err != nil { + return nil, err + } + + count, err := conf.FieldInt("count") + if err != nil { + return nil, err + } + + tmpMap, err := conf.FieldAnyMap("table_columns_map") + if err != nil { + return nil, err + } + tableColsMap := map[string][]string{} + for k, v := range tmpMap { + val, err := v.FieldStringList() + if err != nil { + return nil, err + } + tableColsMap[k] = val + } + var argsMapping *bloblang.Executor + if conf.Contains("args_mapping") { + argsMapping, err = conf.FieldBloblang("args_mapping") + if err != nil { + return nil, err + } + } + + return &sqlSelectGenerateInput{ + logger: mgr.Logger(), + shutSig: shutdown.NewSignaller(), + driver: driver, + dsn: dsn, + tableColsMap: tableColsMap, + 
argsMapping: argsMapping, + provider: dbprovider, + stopActivityChannel: channel, + remaining: count, + index: 0, + }, nil +} + +var _ service.Input = &pooledInput{} + +func (s *sqlSelectGenerateInput) Connect(ctx context.Context) error { + s.logger.Debug("connecting to pooled input") + s.dbMut.Lock() + defer s.dbMut.Unlock() + + if s.db != nil { + return nil + } + + db, err := s.provider.GetDb(s.driver, s.dsn) + if err != nil { + return err + } + + // var args []any + // if s.argsMapping != nil { + // iargs, err := s.argsMapping.Query(nil) + // if err != nil { + // return err + // } + // var ok bool + // if args, ok = iargs.([]any); !ok { + // return fmt.Errorf("mapping returned non-array result: %T", iargs) + // } + // } + + sqlRandomStr := "RANDOM()" + if s.driver == "mysql" { + sqlRandomStr = "RAND()" + } + + tables := []string{} + for t := range s.tableColsMap { + tables = append(tables, t) + } + + randomLimit, err := transformer_utils.GenerateRandomInt64InValueRange(10, 100) + if err != nil { + return err + } + + joinedRows := []map[string]any{} + + table := tables[0] + otherTables := tables[1:] + + cols := s.tableColsMap[table] + // update col names to be that of destination table or should it be handled on insert + selectColumns := make([]any, len(cols)) + for i, col := range cols { + selectColumns[i] = col + } + orderBy := exp.NewOrderedExpression(exp.NewLiteralExpression(sqlRandomStr), exp.AscDir, exp.NullsLastSortType) + builder := goqu.Dialect(s.driver) + selectBuilder := builder.From(table).Select(selectColumns...).Order(orderBy).Limit(uint(randomLimit)) + selectSql, _, err := selectBuilder.ToSQL() + if err != nil { + return err + } + + rows, err := db.QueryContext(ctx, selectSql) + if err != nil { + if !neosync_benthos.ShouldTerminate(err.Error()) { + s.logger.Error(fmt.Sprintf("Benthos input error - sending stop activity signal: %s ", err.Error())) + s.stopActivityChannel <- err + } + return err + } + + rowObjList, err := sqlRowsToMapList(rows) + if 
err != nil { + _ = rows.Close() + return err + } + + for _, r := range rowObjList { + randLimit, err := transformer_utils.GenerateRandomInt64InValueRange(0, 3) + if err != nil { + return err + } + otherTableRows := [][]map[string]any{} + for _, t := range otherTables { + cols := s.tableColsMap[t] + selectColumns := make([]any, len(cols)) + for i, col := range cols { + selectColumns[i] = col + } + selectBuilder := builder.From(t).Select(selectColumns...).Order(orderBy).Limit(uint(randLimit)) + selectSql, _, err := selectBuilder.ToSQL() + if err != nil { + return err + } + rows, err := db.QueryContext(ctx, selectSql) + if err != nil { + if !neosync_benthos.ShouldTerminate(err.Error()) { + s.logger.Error(fmt.Sprintf("Benthos input error - sending stop activity signal: %s ", err.Error())) + s.stopActivityChannel <- err + } + return err + } + rowObjList, err := sqlRowsToMapList(rows) + if err != nil { + _ = rows.Close() + return err + } + otherTableRows = append(otherTableRows, rowObjList) + } + combinedRows := combineRowLists(otherTableRows) + for _, cr := range combinedRows { + joinedRows = append(joinedRows, combineRows([]map[string]any{r, cr})) + } + } + + s.joinedRows = joinedRows + go func() { + <-s.shutSig.CloseNowChan() + + s.dbMut.Lock() + if rows != nil { + _ = rows.Close() + rows = nil + } + // not closing the connection here as that is managed by an outside force + s.db = nil + s.dbMut.Unlock() + + s.shutSig.ShutdownComplete() + }() + return nil +} + +func (s *sqlSelectGenerateInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) { + s.dbMut.Lock() + defer s.dbMut.Unlock() + + // if s.db == nil s.rows == nil { + // return nil, nil, service.ErrNotConnected + // } + // if s.rows == nil { + // return nil, nil, service.ErrEndOfInput + // } + // if !s.rows.Next() { + // err := s.rows.Err() + // if err == nil { + // err = service.ErrEndOfInput + // } + // _ = s.rows.Close() + // s.rows = nil + // return nil, nil, err + // } + // if s. 
+ + // obj, err := sqlRowToMap(s.rows) + // if err != nil { + // _ = s.rows.Close() + // s.rows = nil + // return nil, nil, err + // } + + if s.index >= 0 && s.index < len(s.joinedRows) { + msg := service.NewMessage(nil) + msg.SetStructured(s.joinedRows[s.index]) + s.index++ + s.remaining-- + return msg, emptyAck, nil + } else { + return nil, nil, service.ErrEndOfInput + } +} + +func (s *sqlSelectGenerateInput) Close(ctx context.Context) error { + s.shutSig.CloseNow() + s.dbMut.Lock() + isNil := s.db == nil + s.dbMut.Unlock() + if isNil { + return nil + } + select { + case <-s.shutSig.HasClosedChan(): + case <-ctx.Done(): + return ctx.Err() + } + return nil +} diff --git a/worker/internal/benthos/utils.go b/worker/internal/benthos/utils.go index 6f64bbed35..7787cf59b1 100644 --- a/worker/internal/benthos/utils.go +++ b/worker/internal/benthos/utils.go @@ -21,23 +21,21 @@ func ToSha256(input string) string { return fmt.Sprintf("%x", sha256.Sum256([]byte(input))) } -// checks if the error message matches a max connections error -func IsMaxConnectionError(errMsg string) bool { - // list of known error messages for when max connections are reached - maxConnErrors := []string{ +func containsIgnoreCase(s, substr string) bool { + return strings.Contains(strings.ToLower(s), strings.ToLower(substr)) +} + +// checks if the error should stop activity +func ShouldTerminate(errMsg string) bool { + // list of known error messages to terminate activity + stopErrors := []string{ "too many clients already", - "remaining connection slots are reserved", - "maximum number of connections reached", } - for _, errStr := range maxConnErrors { + for _, errStr := range stopErrors { if containsIgnoreCase(errMsg, errStr) { return true } } return false } - -func containsIgnoreCase(s, substr string) bool { - return strings.Contains(strings.ToLower(s), strings.ToLower(substr)) -} diff --git a/worker/internal/benthos/utils_test.go b/worker/internal/benthos/utils_test.go index 
bf3ce5f85e..ef0f353445 100644 --- a/worker/internal/benthos/utils_test.go +++ b/worker/internal/benthos/utils_test.go @@ -12,7 +12,7 @@ func Test_BuildBenthosTable(t *testing.T) { assert.Equal(t, BuildBenthosTable("", "users"), "users", "Handles an empty schema") } -func Test_IsMaxConnectionError(t *testing.T) { +func Test_ShouldTerminate(t *testing.T) { tests := []struct { name string errMsg string @@ -30,7 +30,7 @@ func Test_IsMaxConnectionError(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - actual := IsMaxConnectionError(tt.errMsg) + actual := ShouldTerminate(tt.errMsg) assert.Equal(t, tt.expected, actual) }) } diff --git a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder.go b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder.go index 6afa4e6d66..9a97a5196c 100644 --- a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder.go +++ b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/benthos-builder.go @@ -105,7 +105,48 @@ func (b *benthosBuilder) GenerateBenthosConfigs( case *mgmtv1alpha1.JobSourceOptions_Generate: sourceTableOpts := groupGenerateSourceOptionsByTable(jobSourceConfig.Generate.Schemas) // TODO this needs to be updated to get db schema - sourceResponses, err := buildBenthosGenerateSourceConfigResponses(ctx, b.transformerclient, groupedMappings, sourceTableOpts, map[string]*dbschemas_utils.ColumnInfo{}) + // get depenendcy configs + // split root tables vs children tables + //////////////////////////////////////////////////////////////////////// + + sourceConnection, err := b.getJobSourceConnection(ctx, job.GetSource()) + if err != nil { + return nil, fmt.Errorf("unable to get connection by id: %w", err) + } + db, err := b.sqladapter.NewSqlDb(ctx, slogger, sourceConnection) + if err != nil { + return nil, fmt.Errorf("unable to create new sql db: %w", err) + } + defer db.ClosePool() + allConstraints, err := 
db.GetAllForeignKeyConstraints(ctx, uniqueSchemas) + if err != nil { + return nil, fmt.Errorf("unable to retrieve database foreign key constraints: %w", err) + } + slogger.Info(fmt.Sprintf("found %d foreign key constraints for database", len(allConstraints))) + td := sql_manager.GetDbTableDependencies(allConstraints) + + primaryKeys, err := db.GetAllPrimaryKeyConstraints(ctx, uniqueSchemas) + if err != nil { + return nil, fmt.Errorf("unable to get all primary key constraints: %w", err) + } + primaryKeyMap := sql_manager.GetTablePrimaryKeysMap(primaryKeys) + + tables := filterNullTables(groupedMappings) + dependencyConfigs := tabledependency.GetRunConfigs(td, tables, map[string]string{}) + dependencyMap := map[string][]*tabledependency.RunConfig{} + for _, cfg := range dependencyConfigs { + _, ok := dependencyMap[cfg.Table] + if ok { + dependencyMap[cfg.Table] = append(dependencyMap[cfg.Table], cfg) + } else { + dependencyMap[cfg.Table] = []*tabledependency.RunConfig{cfg} + } + } + + // how to handle unique constraints + /////////////////////////////////////////////////////////////// + + sourceResponses, err := buildBenthosGenerateSourceConfigResponses(ctx, b.transformerclient, groupedMappings, sourceTableOpts, map[string]*dbschemas_utils.ColumnInfo{}, dependencyMap, db.Driver, sourceConnection.Id, td, primaryKeyMap) if err != nil { return nil, fmt.Errorf("unable to build benthos generate source config responses: %w", err) } @@ -315,48 +356,51 @@ func (b *benthosBuilder) GenerateBenthosConfigs( } updateResponses = append(updateResponses, updateResp) } - } else if resp.Config.Input.Generate != nil { + } else if resp.Config.Input.Generate != nil || resp.Config.Input.GenerateSqlSelect != nil { cols := buildPlainColumns(tm.Mappings) - processorConfigs := []neosync_benthos.ProcessorConfig{} - for _, pc := range resp.Processors { - processorConfigs = append(processorConfigs, *pc) - } + // processorConfigs := []neosync_benthos.ProcessorConfig{} + // for _, pc := range 
resp.Processors { + // processorConfigs = append(processorConfigs, *pc) + // } + retries := 10 resp.Config.Output.Broker.Outputs = append(resp.Config.Output.Broker.Outputs, neosync_benthos.Outputs{ Fallback: []neosync_benthos.Outputs{ { // retry processor and output several times - Retry: &neosync_benthos.RetryConfig{ - InlineRetryConfig: neosync_benthos.InlineRetryConfig{ - MaxRetries: 10, - }, - Output: neosync_benthos.OutputConfig{ - Outputs: neosync_benthos.Outputs{ - PooledSqlInsert: &neosync_benthos.PooledSqlInsert{ - Driver: driver, - Dsn: dsn, - - Schema: resp.TableSchema, - Table: resp.TableName, - Columns: cols, - OnConflictDoNothing: destOpts.OnConflictDoNothing, - TruncateOnRetry: destOpts.Truncate, - - ArgsMapping: buildPlainInsertArgs(cols), - - Batching: &neosync_benthos.Batching{ - Period: "5s", - Count: 100, - }, - }, - }, - Processors: processorConfigs, + // Retry: &neosync_benthos.RetryConfig{ + // InlineRetryConfig: neosync_benthos.InlineRetryConfig{ + // MaxRetries: 10, + // }, + // Output: neosync_benthos.OutputConfig{ + // Outputs: neosync_benthos.Outputs{ + PooledSqlInsert: &neosync_benthos.PooledSqlInsert{ + Driver: driver, + Dsn: dsn, + + Schema: resp.TableSchema, + Table: resp.TableName, + Columns: cols, + OnConflictDoNothing: destOpts.OnConflictDoNothing, + TruncateOnRetry: destOpts.Truncate, + + ArgsMapping: buildPlainInsertArgs(cols), + + Batching: &neosync_benthos.Batching{ + Period: "5s", + Count: 100, }, + // }, + // }, + // Processors: processorConfigs, + // }, }, }, // kills activity depending on error + // TODO add retry here {Error: &neosync_benthos.ErrorOutputConfig{ - ErrorMsg: `${! meta("fallback_error")}`, + ErrorMsg: `${! 
meta("fallback_error")}`, + MaxRetries: &retries, Batching: &neosync_benthos.Batching{ Period: "5s", Count: 100, @@ -520,107 +564,6 @@ func buildOutputArgs(resp *BenthosConfigResponse, tm *tableMapping) *sqlUpdateOu } } -type generateSourceTableOptions struct { - Count int -} - -func buildBenthosGenerateSourceConfigResponses( - ctx context.Context, - transformerclient mgmtv1alpha1connect.TransformersServiceClient, - mappings []*tableMapping, - sourceTableOpts map[string]*generateSourceTableOptions, - columnInfo map[string]*dbschemas_utils.ColumnInfo, -) ([]*BenthosConfigResponse, error) { - responses := []*BenthosConfigResponse{} - - for _, tableMapping := range mappings { - if shared.AreAllColsNull(tableMapping.Mappings) { - // skiping table as no columns are mapped - continue - } - - var count = 0 - tableOpt := sourceTableOpts[neosync_benthos.BuildBenthosTable(tableMapping.Schema, tableMapping.Table)] - if tableOpt != nil { - count = tableOpt.Count - } - - jsCode, err := extractJsFunctionsAndOutputs(ctx, transformerclient, tableMapping.Mappings) - if err != nil { - return nil, err - } - - mutations, err := buildMutationConfigs(ctx, transformerclient, tableMapping.Mappings, columnInfo) - if err != nil { - return nil, err - } - var processors []*neosync_benthos.ProcessorConfig - // for the generate input, benthos requires a mapping, so falling back to a - // generic empty object if the mutations are empty - if mutations == "" { - mutations = "root = {}" - } - processors = append(processors, &neosync_benthos.ProcessorConfig{Mutation: &mutations}) - - if jsCode != "" { - processors = append(processors, &neosync_benthos.ProcessorConfig{Javascript: &neosync_benthos.JavascriptConfig{Code: jsCode}}) - } - if len(processors) > 0 { - // add catch and error processor - processors = append(processors, &neosync_benthos.ProcessorConfig{Catch: []*neosync_benthos.ProcessorConfig{ - {Error: &neosync_benthos.ErrorProcessorConfig{ - ErrorMsg: `${! 
meta("fallback_error")}`, - }}, - }}) - } - - bc := &neosync_benthos.BenthosConfig{ - StreamConfig: neosync_benthos.StreamConfig{ - Input: &neosync_benthos.InputConfig{ - Inputs: neosync_benthos.Inputs{ - Generate: &neosync_benthos.Generate{ - Interval: "", - Count: count, - Mapping: "root = {}", - }, - }, - }, - Pipeline: &neosync_benthos.PipelineConfig{ - Threads: -1, - Processors: []neosync_benthos.ProcessorConfig{}, // leave empty. processors should be on output - }, - Output: &neosync_benthos.OutputConfig{ - Outputs: neosync_benthos.Outputs{ - Broker: &neosync_benthos.OutputBrokerConfig{ - Pattern: "fan_out", - Outputs: []neosync_benthos.Outputs{}, - }, - }, - }, - }, - } - - responses = append(responses, &BenthosConfigResponse{ - Name: neosync_benthos.BuildBenthosTable(tableMapping.Schema, tableMapping.Table), // todo: may need to expand on this - Config: bc, - DependsOn: []*tabledependency.DependsOn{}, - - TableSchema: tableMapping.Schema, - TableName: tableMapping.Table, - - Processors: processors, - - metriclabels: metrics.MetricLabels{ - metrics.NewEqLabel(metrics.TableSchemaLabel, tableMapping.Schema), - metrics.NewEqLabel(metrics.TableNameLabel, tableMapping.Table), - metrics.NewEqLabel(metrics.JobTypeLabel, "generate"), - }, - }) - } - - return responses, nil -} - func (b *benthosBuilder) getJobById( ctx context.Context, jobId string, @@ -1187,12 +1130,16 @@ func getSqlJobSourceOpts( } func (b *benthosBuilder) getJobSourceConnection(ctx context.Context, jobSource *mgmtv1alpha1.JobSource) (*mgmtv1alpha1.Connection, error) { + jsonF, _ := json.MarshalIndent(jobSource, "", " ") + fmt.Printf("\n %s \n", string(jsonF)) var connectionId string switch jobSourceConfig := jobSource.GetOptions().GetConfig().(type) { case *mgmtv1alpha1.JobSourceOptions_Postgres: connectionId = jobSourceConfig.Postgres.GetConnectionId() case *mgmtv1alpha1.JobSourceOptions_Mysql: connectionId = jobSourceConfig.Mysql.GetConnectionId() + case *mgmtv1alpha1.JobSourceOptions_Generate: 
+ connectionId = jobSourceConfig.Generate.GetFkSourceConnectionId() default: return nil, errors.New("unsupported job source options type") } @@ -1231,130 +1178,6 @@ type tableMapping struct { Mappings []*mgmtv1alpha1.JobMapping } -func buildProcessorConfigs( - ctx context.Context, - transformerclient mgmtv1alpha1connect.TransformersServiceClient, - cols []*mgmtv1alpha1.JobMapping, - tableColumnInfo map[string]*dbschemas_utils.ColumnInfo, - columnConstraints map[string]*dbschemas_utils.ForeignKey, - primaryKeys []string, - jobId, runId string, - redisConfig *shared.RedisConfig, -) ([]*neosync_benthos.ProcessorConfig, error) { - jsCode, err := extractJsFunctionsAndOutputs(ctx, transformerclient, cols) - if err != nil { - return nil, err - } - - mutations, err := buildMutationConfigs(ctx, transformerclient, cols, tableColumnInfo) - if err != nil { - return nil, err - } - - cacheBranches, err := buildBranchCacheConfigs(cols, columnConstraints, jobId, runId, redisConfig) - if err != nil { - return nil, err - } - - pkMapping := buildPrimaryKeyMappingConfigs(cols, primaryKeys) - - var processorConfigs []*neosync_benthos.ProcessorConfig - if pkMapping != "" { - processorConfigs = append(processorConfigs, &neosync_benthos.ProcessorConfig{Mapping: &pkMapping}) - } - if mutations != "" { - processorConfigs = append(processorConfigs, &neosync_benthos.ProcessorConfig{Mutation: &mutations}) - } - if jsCode != "" { - processorConfigs = append(processorConfigs, &neosync_benthos.ProcessorConfig{Javascript: &neosync_benthos.JavascriptConfig{Code: jsCode}}) - } - if len(cacheBranches) > 0 { - for _, config := range cacheBranches { - processorConfigs = append(processorConfigs, &neosync_benthos.ProcessorConfig{Branch: config}) - } - } - - if len(processorConfigs) > 0 { - // add catch and error processor - processorConfigs = append(processorConfigs, &neosync_benthos.ProcessorConfig{Catch: []*neosync_benthos.ProcessorConfig{ - {Error: &neosync_benthos.ErrorProcessorConfig{ - ErrorMsg: 
`${! meta("fallback_error")}`, - }}, - }}) - } - - return processorConfigs, err -} - -func extractJsFunctionsAndOutputs(ctx context.Context, transformerclient mgmtv1alpha1connect.TransformersServiceClient, cols []*mgmtv1alpha1.JobMapping) (string, error) { - var benthosOutputs []string - var jsFunctions []string - - for _, col := range cols { - if shouldProcessStrict(col.Transformer) { - if _, ok := col.Transformer.Config.Config.(*mgmtv1alpha1.TransformerConfig_UserDefinedTransformerConfig); ok { - val, err := convertUserDefinedFunctionConfig(ctx, transformerclient, col.Transformer) - if err != nil { - return "", errors.New("unable to look up user defined transformer config by id") - } - col.Transformer = val - } - switch col.Transformer.Source { - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_JAVASCRIPT: - code := col.Transformer.Config.GetTransformJavascriptConfig().Code - if code != "" { - jsFunctions = append(jsFunctions, constructJsFunction(code, col.Column, col.Transformer.Source)) - benthosOutputs = append(benthosOutputs, constructBenthosJavascriptObject(col.Column, col.Transformer.Source)) - } - case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_JAVASCRIPT: - code := col.Transformer.Config.GetGenerateJavascriptConfig().Code - if code != "" { - jsFunctions = append(jsFunctions, constructJsFunction(code, col.Column, col.Transformer.Source)) - benthosOutputs = append(benthosOutputs, constructBenthosJavascriptObject(col.Column, col.Transformer.Source)) - } - } - } - } - - if len(jsFunctions) > 0 { - return constructBenthosJsProcessor(jsFunctions, benthosOutputs), nil - } else { - return "", nil - } -} - -func buildMutationConfigs( - ctx context.Context, - transformerclient mgmtv1alpha1connect.TransformersServiceClient, - cols []*mgmtv1alpha1.JobMapping, - tableColumnInfo map[string]*dbschemas_utils.ColumnInfo, -) (string, error) { - mutations := []string{} - - for _, col := range cols { - colInfo := tableColumnInfo[col.Column] - 
if shouldProcessColumn(col.Transformer) { - if _, ok := col.Transformer.Config.Config.(*mgmtv1alpha1.TransformerConfig_UserDefinedTransformerConfig); ok { - // handle user defined transformer -> get the user defined transformer configs using the id - val, err := convertUserDefinedFunctionConfig(ctx, transformerclient, col.Transformer) - if err != nil { - return "", errors.New("unable to look up user defined transformer config by id") - } - col.Transformer = val - } - if col.Transformer.Source != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_JAVASCRIPT && col.Transformer.Source != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_JAVASCRIPT { - mutation, err := computeMutationFunction(col, colInfo) - if err != nil { - return "", fmt.Errorf("%s is not a supported transformer: %w", col.Transformer, err) - } - mutations = append(mutations, fmt.Sprintf("root.%q = %s", col.Column, mutation)) - } - } - } - - return strings.Join(mutations, "\n"), nil -} - func buildPrimaryKeyMappingConfigs(cols []*mgmtv1alpha1.JobMapping, primaryKeys []string) string { mappings := []string{} for _, col := range cols { diff --git a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/generate-builder.go b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/generate-builder.go new file mode 100644 index 0000000000..ed3425167a --- /dev/null +++ b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/generate-builder.go @@ -0,0 +1,191 @@ +package genbenthosconfigs_activity + +import ( + "context" + "fmt" + + "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect" + dbschemas_utils "github.com/nucleuscloud/neosync/backend/pkg/dbschemas" + "github.com/nucleuscloud/neosync/backend/pkg/metrics" + tabledependency "github.com/nucleuscloud/neosync/backend/pkg/table-dependency" + neosync_benthos "github.com/nucleuscloud/neosync/worker/internal/benthos" + 
"github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared" +) + +type generateSourceTableOptions struct { + Count int +} + +func buildBenthosGenerateSourceConfigResponses( + ctx context.Context, + transformerclient mgmtv1alpha1connect.TransformersServiceClient, + mappings []*tableMapping, + sourceTableOpts map[string]*generateSourceTableOptions, + columnInfo map[string]*dbschemas_utils.ColumnInfo, + dependencyMap map[string][]*tabledependency.RunConfig, + driver, dsnConnectionId string, + tableConstraintsMap map[string]*dbschemas_utils.TableConstraints, + primaryKeyMap map[string][]string, +) ([]*BenthosConfigResponse, error) { + responses := []*BenthosConfigResponse{} + + for _, tableMapping := range mappings { + if shared.AreAllColsNull(tableMapping.Mappings) { + // skiping table as no columns are mapped + continue + } + + tableName := neosync_benthos.BuildBenthosTable(tableMapping.Schema, tableMapping.Table) + runConfigs := dependencyMap[tableName] + + var count = 0 + tableOpt := sourceTableOpts[tableName] + if tableOpt != nil { + count = tableOpt.Count + } + + jsCode, err := extractJsFunctionsAndOutputs(ctx, transformerclient, tableMapping.Mappings) + if err != nil { + return nil, err + } + + mutations, err := buildMutationConfigs(ctx, transformerclient, tableMapping.Mappings, columnInfo) + if err != nil { + return nil, err + } + var processors []neosync_benthos.ProcessorConfig + // for the generate input, benthos requires a mapping, so falling back to a + // generic empty object if the mutations are empty + if mutations == "" { + mutations = "root = {}" + } + processors = append(processors, neosync_benthos.ProcessorConfig{Mutation: &mutations}) + + if jsCode != "" { + processors = append(processors, neosync_benthos.ProcessorConfig{Javascript: &neosync_benthos.JavascriptConfig{Code: jsCode}}) + } + if len(processors) > 0 { + // add catch and error processor + processors = append(processors, neosync_benthos.ProcessorConfig{Catch: 
[]*neosync_benthos.ProcessorConfig{ + {Error: &neosync_benthos.ErrorProcessorConfig{ + ErrorMsg: `${! meta("fallback_error")}`, + }}, + }}) + } + + var bc *neosync_benthos.BenthosConfig + cols := []string{} + for _, m := range tableMapping.Mappings { + cols = append(cols, m.Column) + } + if len(runConfigs) > 0 && len(runConfigs[0].DependsOn) > 0 { + columnNameMap := map[string]string{} + tableColsMaps := map[string][]string{} + + constraints := tableConstraintsMap[tableName] + for _, tc := range constraints.Constraints { + columnNameMap[fmt.Sprintf("%s.%s", tc.ForeignKey.Table, tc.ForeignKey.Column)] = tc.Column + tableColsMaps[tc.ForeignKey.Table] = append(tableColsMaps[tc.ForeignKey.Table], tc.ForeignKey.Column) + } + + bc = &neosync_benthos.BenthosConfig{ + StreamConfig: neosync_benthos.StreamConfig{ + Input: &neosync_benthos.InputConfig{ + Inputs: neosync_benthos.Inputs{ + GenerateSqlSelect: &neosync_benthos.GenerateSqlSelect{ + Count: count, + Mapping: mutations, + Driver: driver, + Dsn: "${SOURCE_CONNECTION_DSN}", + TableColumnsMap: tableColsMaps, + ColumnNameMap: columnNameMap, + }, + }, + }, + Pipeline: &neosync_benthos.PipelineConfig{ + Threads: -1, + // Processors: processors, + Processors: []neosync_benthos.ProcessorConfig{}, + }, + Output: &neosync_benthos.OutputConfig{ + Outputs: neosync_benthos.Outputs{ + Broker: &neosync_benthos.OutputBrokerConfig{ + Pattern: "fan_out_sequential_fail_fast", + Outputs: []neosync_benthos.Outputs{}, + }, + }, + }, + }, + } + } else { + bc = &neosync_benthos.BenthosConfig{ + StreamConfig: neosync_benthos.StreamConfig{ + Input: &neosync_benthos.InputConfig{ + Inputs: neosync_benthos.Inputs{ + Generate: &neosync_benthos.Generate{ + Interval: "", + Count: count, + Mapping: "root = {}", + }, + }, + }, + Pipeline: &neosync_benthos.PipelineConfig{ + Threads: -1, + Processors: processors, + }, + Output: &neosync_benthos.OutputConfig{ + Outputs: neosync_benthos.Outputs{ + Broker: &neosync_benthos.OutputBrokerConfig{ + Pattern: 
"fan_out_sequential_fail_fast", + Outputs: []neosync_benthos.Outputs{}, + }, + }, + }, + }, + } + } + + resp := &BenthosConfigResponse{ + Name: neosync_benthos.BuildBenthosTable(tableMapping.Schema, tableMapping.Table), // todo: may need to expand on this + Config: bc, + DependsOn: []*tabledependency.DependsOn{}, + BenthosDsns: []*shared.BenthosDsn{{ConnectionId: dsnConnectionId, EnvVarKey: "SOURCE_CONNECTION_DSN"}}, + + TableSchema: tableMapping.Schema, + TableName: tableMapping.Table, + Columns: cols, + + // Processors: processors, + + metriclabels: metrics.MetricLabels{ + metrics.NewEqLabel(metrics.TableSchemaLabel, tableMapping.Schema), + metrics.NewEqLabel(metrics.TableNameLabel, tableMapping.Table), + metrics.NewEqLabel(metrics.JobTypeLabel, "generate"), + }, + } + if len(runConfigs) > 1 { + // circular dependency + for _, c := range runConfigs { + if c.Columns != nil && c.Columns.Exclude != nil && len(c.Columns.Exclude) > 0 { + resp.excludeColumns = c.Columns.Exclude + resp.DependsOn = c.DependsOn + } else if c.Columns != nil && c.Columns.Include != nil && len(c.Columns.Include) > 0 { + pks := primaryKeyMap[tableName] + if len(pks) == 0 { + return nil, fmt.Errorf("no primary keys found for table (%s). 
Unable to build update query", tableName) + } + + // config for sql update + resp.updateConfig = c + resp.primaryKeys = pks + } + } + } else if len(runConfigs) == 1 { + resp.DependsOn = runConfigs[0].DependsOn + } + + responses = append(responses, resp) + } + + return responses, nil +} diff --git a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/processors.go b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/processors.go new file mode 100644 index 0000000000..8662cfcfcd --- /dev/null +++ b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/processors.go @@ -0,0 +1,138 @@ +package genbenthosconfigs_activity + +import ( + "context" + "errors" + "fmt" + "strings" + + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" + "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect" + dbschemas_utils "github.com/nucleuscloud/neosync/backend/pkg/dbschemas" + neosync_benthos "github.com/nucleuscloud/neosync/worker/internal/benthos" + "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared" +) + +func buildProcessorConfigs( + ctx context.Context, + transformerclient mgmtv1alpha1connect.TransformersServiceClient, + cols []*mgmtv1alpha1.JobMapping, + tableColumnInfo map[string]*dbschemas_utils.ColumnInfo, + columnConstraints map[string]*dbschemas_utils.ForeignKey, + primaryKeys []string, + jobId, runId string, + redisConfig *shared.RedisConfig, +) ([]*neosync_benthos.ProcessorConfig, error) { + jsCode, err := extractJsFunctionsAndOutputs(ctx, transformerclient, cols) + if err != nil { + return nil, err + } + + mutations, err := buildMutationConfigs(ctx, transformerclient, cols, tableColumnInfo) + if err != nil { + return nil, err + } + + cacheBranches, err := buildBranchCacheConfigs(cols, columnConstraints, jobId, runId, redisConfig) + if err != nil { + return nil, err + } + + pkMapping := buildPrimaryKeyMappingConfigs(cols, primaryKeys) + + var 
processorConfigs []*neosync_benthos.ProcessorConfig + if pkMapping != "" { + processorConfigs = append(processorConfigs, &neosync_benthos.ProcessorConfig{Mapping: &pkMapping}) + } + if mutations != "" { + processorConfigs = append(processorConfigs, &neosync_benthos.ProcessorConfig{Mutation: &mutations}) + } + if jsCode != "" { + processorConfigs = append(processorConfigs, &neosync_benthos.ProcessorConfig{Javascript: &neosync_benthos.JavascriptConfig{Code: jsCode}}) + } + if len(cacheBranches) > 0 { + for _, config := range cacheBranches { + processorConfigs = append(processorConfigs, &neosync_benthos.ProcessorConfig{Branch: config}) + } + } + + if len(processorConfigs) > 0 { + // add catch and error processor + processorConfigs = append(processorConfigs, &neosync_benthos.ProcessorConfig{Catch: []*neosync_benthos.ProcessorConfig{ + {Error: &neosync_benthos.ErrorProcessorConfig{ + ErrorMsg: `${! meta("fallback_error")}`, + }}, + }}) + } + + return processorConfigs, err +} + +func extractJsFunctionsAndOutputs(ctx context.Context, transformerclient mgmtv1alpha1connect.TransformersServiceClient, cols []*mgmtv1alpha1.JobMapping) (string, error) { + var benthosOutputs []string + var jsFunctions []string + + for _, col := range cols { + if shouldProcessStrict(col.Transformer) { + if _, ok := col.Transformer.Config.Config.(*mgmtv1alpha1.TransformerConfig_UserDefinedTransformerConfig); ok { + val, err := convertUserDefinedFunctionConfig(ctx, transformerclient, col.Transformer) + if err != nil { + return "", errors.New("unable to look up user defined transformer config by id") + } + col.Transformer = val + } + switch col.Transformer.Source { + case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_JAVASCRIPT: + code := col.Transformer.Config.GetTransformJavascriptConfig().Code + if code != "" { + jsFunctions = append(jsFunctions, constructJsFunction(code, col.Column, col.Transformer.Source)) + benthosOutputs = append(benthosOutputs, 
constructBenthosJavascriptObject(col.Column, col.Transformer.Source)) + } + case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_JAVASCRIPT: + code := col.Transformer.Config.GetGenerateJavascriptConfig().Code + if code != "" { + jsFunctions = append(jsFunctions, constructJsFunction(code, col.Column, col.Transformer.Source)) + benthosOutputs = append(benthosOutputs, constructBenthosJavascriptObject(col.Column, col.Transformer.Source)) + } + } + } + } + + if len(jsFunctions) > 0 { + return constructBenthosJsProcessor(jsFunctions, benthosOutputs), nil + } else { + return "", nil + } +} + +func buildMutationConfigs( + ctx context.Context, + transformerclient mgmtv1alpha1connect.TransformersServiceClient, + cols []*mgmtv1alpha1.JobMapping, + tableColumnInfo map[string]*dbschemas_utils.ColumnInfo, +) (string, error) { + mutations := []string{} + + for _, col := range cols { + if shouldProcessColumn(col.Transformer) { + if _, ok := col.Transformer.Config.Config.(*mgmtv1alpha1.TransformerConfig_UserDefinedTransformerConfig); ok { + // handle user defined transformer -> get the user defined transformer configs using the id + val, err := convertUserDefinedFunctionConfig(ctx, transformerclient, col.Transformer) + if err != nil { + return "", errors.New("unable to look up user defined transformer config by id") + } + col.Transformer = val + } + if col.Transformer.Source != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_JAVASCRIPT && col.Transformer.Source != mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_JAVASCRIPT { + colInfo := tableColumnInfo[col.Column] + mutation, err := computeMutationFunction(col, colInfo) + if err != nil { + return "", fmt.Errorf("%s is not a supported transformer: %w", col.Transformer, err) + } + mutations = append(mutations, fmt.Sprintf("root.%q = %s", col.Column, mutation)) + } + } + } + + return strings.Join(mutations, "\n"), nil +} diff --git a/worker/pkg/workflows/datasync/activities/sync/activity.go 
b/worker/pkg/workflows/datasync/activities/sync/activity.go index d9af93cb90..9586e23a26 100644 --- a/worker/pkg/workflows/datasync/activities/sync/activity.go +++ b/worker/pkg/workflows/datasync/activities/sync/activity.go @@ -245,6 +245,10 @@ func (a *Activity) Sync(ctx context.Context, req *SyncRequest, metadata *SyncMet if err != nil { return nil, fmt.Errorf("unable to register pooled_sql_raw input to benthos instance: %w", err) } + err = neosync_benthos_sql.RegisterGenerateTableRecordsInput(benthosenv, poolprovider, stopActivityChan) + if err != nil { + return nil, fmt.Errorf("unable to register generate_sql_select input to benthos instance: %w", err) + } err = neosync_benthos_error.RegisterErrorProcessor(benthosenv, stopActivityChan) if err != nil { diff --git a/worker/pkg/workflows/datasync/workflow/workflow.go b/worker/pkg/workflows/datasync/workflow/workflow.go index bd10cdbd7a..42b1f0a777 100644 --- a/worker/pkg/workflows/datasync/workflow/workflow.go +++ b/worker/pkg/workflows/datasync/workflow/workflow.go @@ -274,6 +274,7 @@ func invokeSync( settable.SetError(fmt.Errorf("unable to marshal benthos config: %w", err)) return } + fmt.Println(string(configbits)) logger.Info("scheduling Sync for execution.") var result sync_activity.SyncResponse