Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - multi table gen #1832

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use client';
import {
SINGLE_TABLE_SCHEMA_FORM_SCHEMA,
SingleTableSchemaFormValues,
MULTI_TABLE_SCHEMA_FORM_SCHEMA,
MultiTableSchemaFormValues,
} from '@/app/(mgmt)/[account]/new/job/schema';
import { SchemaTable } from '@/components/jobs/SchemaTable/SchemaTable';
import { getSchemaConstraintHandler } from '@/components/jobs/SchemaTable/schema-constraint-handler';
Expand Down Expand Up @@ -87,8 +87,8 @@ export default function DataGenConnectionCard({ jobId }: Props): ReactElement {
fkSourceConnectionId ?? ''
);

const form = useForm<SingleTableSchemaFormValues>({
resolver: yupResolver(SINGLE_TABLE_SCHEMA_FORM_SCHEMA),
const form = useForm<MultiTableSchemaFormValues>({
resolver: yupResolver(MULTI_TABLE_SCHEMA_FORM_SCHEMA),
values: getJobSource(data?.job),
});

Expand All @@ -107,7 +107,7 @@ export default function DataGenConnectionCard({ jobId }: Props): ReactElement {
return <SchemaPageSkeleton />;
}

async function onSubmit(values: SingleTableSchemaFormValues) {
async function onSubmit(values: MultiTableSchemaFormValues) {
const job = data?.job;
if (!job) {
return;
Expand Down Expand Up @@ -182,7 +182,7 @@ export default function DataGenConnectionCard({ jobId }: Props): ReactElement {
);
}

function getJobSource(job?: Job): SingleTableSchemaFormValues {
function getJobSource(job?: Job): MultiTableSchemaFormValues {
if (!job) {
return {
mappings: [],
Expand All @@ -200,7 +200,7 @@ function getJobSource(job?: Job): SingleTableSchemaFormValues {
}
}

const mappings: SingleTableSchemaFormValues['mappings'] = (
const mappings: MultiTableSchemaFormValues['mappings'] = (
job.mappings ?? []
).map((mapping) => {
return {
Expand All @@ -221,10 +221,31 @@ function getJobSource(job?: Job): SingleTableSchemaFormValues {
async function updateJobConnection(
accountId: string,
job: Job,
values: SingleTableSchemaFormValues
values: MultiTableSchemaFormValues
): Promise<UpdateJobSourceConnectionResponse> {
const schema = values.mappings.length > 0 ? values.mappings[0].schema : null;
const table = values.mappings.length > 0 ? values.mappings[0].table : null;
const schemas = values.mappings.reduce(
(prev, curr) => {
const prevTables = prev[curr.schema] || {};
return {
...prev,
[curr.schema]: { ...prevTables, [curr.table]: curr.table },
};
},
{} as Record<string, Record<string, string>>
);
const schemaRecords = Object.entries(schemas).map(([s, tables]) => {
return new GenerateSourceSchemaOption({
schema: s,
tables: Object.keys(tables).map(
(t) =>
new GenerateSourceTableOption({
rowCount: BigInt(values.numRows),
table: t,
})
),
});
});

const res = await fetch(
`/api/accounts/${accountId}/jobs/${job.id}/source-connection`,
{
Expand Down Expand Up @@ -252,20 +273,7 @@ async function updateJobConnection(
case: 'generate',
value: new GenerateSourceOptions({
fkSourceConnectionId: getFkIdFromGenerateSource(job.source),
schemas:
schema && table
? [
new GenerateSourceSchemaOption({
schema: schema,
tables: [
new GenerateSourceTableOption({
table: table,
rowCount: BigInt(values.numRows),
}),
],
}),
]
: [],
schemas: schemaRecords,
}),
},
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ import { useSessionStorage } from 'usehooks-ts';
import JobsProgressSteps, { DATA_GEN_STEPS } from '../../../JobsProgressSteps';
import {
DefineFormValues,
SINGLE_TABLE_SCHEMA_FORM_SCHEMA,
MULTI_TABLE_SCHEMA_FORM_SCHEMA,
MultiTableSchemaFormValues,
SingleTableConnectFormValues,
SingleTableSchemaFormValues,
} from '../../../schema';
const isBrowser = () => typeof window !== 'undefined';

Expand Down Expand Up @@ -98,7 +98,7 @@ export default function Page({ searchParams }: PageProps): ReactElement {

const formKey = `${sessionPrefix}-new-job-single-table-schema`;

const [schemaFormData] = useSessionStorage<SingleTableSchemaFormValues>(
const [schemaFormData] = useSessionStorage<MultiTableSchemaFormValues>(
formKey,
{
mappings: [],
Expand All @@ -114,8 +114,8 @@ export default function Page({ searchParams }: PageProps): ReactElement {

const form = useForm({
mode: 'onChange',
resolver: yupResolver<SingleTableSchemaFormValues>(
SINGLE_TABLE_SCHEMA_FORM_SCHEMA
resolver: yupResolver<MultiTableSchemaFormValues>(
MULTI_TABLE_SCHEMA_FORM_SCHEMA
),
values: schemaFormData,
});
Expand All @@ -130,7 +130,7 @@ export default function Page({ searchParams }: PageProps): ReactElement {
setIsClient(true);
}, []);

async function onSubmit(values: SingleTableSchemaFormValues) {
async function onSubmit(values: MultiTableSchemaFormValues) {
if (!account) {
return;
}
Expand Down Expand Up @@ -296,7 +296,7 @@ export default function Page({ searchParams }: PageProps): ReactElement {
async function createNewJob(
define: DefineFormValues,
connect: SingleTableConnectFormValues,
schema: SingleTableSchemaFormValues,
schema: MultiTableSchemaFormValues,
accountId: string,
connections: Connection[]
): Promise<CreateJobResponse> {
Expand Down Expand Up @@ -328,9 +328,29 @@ async function createNewJob(
}),
});
}
const tableSchema =
schema.mappings.length > 0 ? schema.mappings[0].schema : null;
const table = schema.mappings.length > 0 ? schema.mappings[0].table : null;
const schemas = schema.mappings.reduce(
(prev, curr) => {
const prevTables = prev[curr.schema] || {};
return {
...prev,
[curr.schema]: { ...prevTables, [curr.table]: curr.table },
};
},
{} as Record<string, Record<string, string>>
);
const schemaRecords = Object.entries(schemas).map(([s, tables]) => {
return new GenerateSourceSchemaOption({
schema: s,
tables: Object.keys(tables).map(
(t) =>
new GenerateSourceTableOption({
rowCount: BigInt(schema.numRows),
table: t,
})
),
});
});

const body = new CreateJobRequest({
accountId,
jobName: define.jobName,
Expand All @@ -352,20 +372,7 @@ async function createNewJob(
case: 'generate',
value: new GenerateSourceOptions({
fkSourceConnectionId: connect.connectionId,
schemas:
tableSchema && table
? [
new GenerateSourceSchemaOption({
schema: tableSchema,
tables: [
new GenerateSourceTableOption({
rowCount: BigInt(schema.numRows),
table: table,
}),
],
}),
]
: [],
schemas: schemaRecords,
}),
},
}),
Expand Down
8 changes: 8 additions & 0 deletions frontend/apps/web/app/(mgmt)/[account]/new/job/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,14 @@ export type SingleTableSchemaFormValues = Yup.InferType<
typeof SINGLE_TABLE_SCHEMA_FORM_SCHEMA
>;

export const MULTI_TABLE_SCHEMA_FORM_SCHEMA = Yup.object({
mappings: Yup.array().of(JOB_MAPPING_SCHEMA).required(),
numRows: Yup.number().required().min(1),
});
export type MultiTableSchemaFormValues = Yup.InferType<
typeof MULTI_TABLE_SCHEMA_FORM_SCHEMA
>;

export const SUBSET_FORM_SCHEMA = Yup.object({
subsets: Yup.array(SINGLE_SUBSET_FORM_SCHEMA).required(),
subsetOptions: Yup.object({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ export function SchemaTable(props: Props): ReactElement {
options={getDualListBoxOptions(schema, data)}
selected={selectedItems}
onChange={toggleItem}
mode={jobType === 'generate' ? 'single' : 'many'}
mode={'many'}
/>
</CardContent>
</Card>
Expand Down
21 changes: 16 additions & 5 deletions worker/internal/benthos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,19 @@ type InputConfig struct {
}

type Inputs struct {
SqlSelect *SqlSelect `json:"sql_select,omitempty" yaml:"sql_select,omitempty"`
PooledSqlRaw *InputPooledSqlRaw `json:"pooled_sql_raw,omitempty" yaml:"pooled_sql_raw,omitempty"`
Generate *Generate `json:"generate,omitempty" yaml:"generate,omitempty"`
SqlSelect *SqlSelect `json:"sql_select,omitempty" yaml:"sql_select,omitempty"`
PooledSqlRaw *InputPooledSqlRaw `json:"pooled_sql_raw,omitempty" yaml:"pooled_sql_raw,omitempty"`
Generate *Generate `json:"generate,omitempty" yaml:"generate,omitempty"`
GenerateSqlSelect *GenerateSqlSelect `json:"generate_sql_select,omitempty" yaml:"generate_sql_select,omitempty"`
}

type GenerateSqlSelect struct {
Mapping string `json:"mapping,omitempty" yaml:"mapping,omitempty"`
Count int `json:"count" yaml:"count"`
Driver string `json:"driver" yaml:"driver"`
Dsn string `json:"dsn" yaml:"dsn"`
TableColumnsMap map[string][]string `json:"table_columns_map" yaml:"table_columns_map"`
ColumnNameMap map[string]string `json:"column_name_map,omitempty" yaml:"column_name_map,omitempty"`
}

type Generate struct {
Expand Down Expand Up @@ -186,8 +196,9 @@ type SwitchOutputCase struct {
Output Outputs `json:"output,omitempty" yaml:"output,omitempty"`
}
type ErrorOutputConfig struct {
ErrorMsg string `json:"error_msg" yaml:"error_msg"`
Batching *Batching `json:"batching,omitempty" yaml:"batching,omitempty"`
ErrorMsg string `json:"error_msg" yaml:"error_msg"`
MaxRetries *int `json:"max_retries,omitempty" yaml:"max_retries,omitempty"`
Batching *Batching `json:"batching,omitempty" yaml:"batching,omitempty"`
}

type RedisHashOutputConfig struct {
Expand Down
17 changes: 16 additions & 1 deletion worker/internal/benthos/error/output_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func errorOutputSpec() *service.ConfigSpec {
return service.NewConfigSpec().
Summary(`Sends stop Activity signal`).
Field(service.NewStringField("error_msg")).
Field(service.NewIntField("max_retries").Optional()).
Field(service.NewIntField("max_in_flight").Default(64)).
Field(service.NewBatchPolicyField("batching"))
}
Expand All @@ -31,6 +32,7 @@ func RegisterErrorOutput(env *service.Environment, stopActivityChannel chan erro
if err != nil {
return nil, service.BatchPolicy{}, -1, err
}

out, err := newErrorOutput(conf, mgr, stopActivityChannel)
if err != nil {
return nil, service.BatchPolicy{}, -1, err
Expand All @@ -44,17 +46,27 @@ func newErrorOutput(conf *service.ParsedConfig, mgr *service.Resources, channel
if err != nil {
return nil, err
}
var retries *int
if conf.Contains("max_retries") {
maxRetries, err := conf.FieldInt("max_retries")
if err != nil {
return nil, err
}
retries = &maxRetries
}
return &errorOutput{
logger: mgr.Logger(),
stopActivityChannel: channel,
errorMsg: errMsg,
retries: retries,
}, nil
}

type errorOutput struct {
logger *service.Logger
stopActivityChannel chan error
errorMsg *service.InterpolatedString
retries *int
}

func (e *errorOutput) Connect(ctx context.Context) error {
Expand All @@ -67,7 +79,10 @@ func (e *errorOutput) WriteBatch(ctx context.Context, batch service.MessageBatch
if err != nil {
return fmt.Errorf("error message interpolation error: %w", err)
}
if neosync_benthos.IsMaxConnectionError(errMsg) {
if !neosync_benthos.ShouldTerminate(errMsg) || (e.retries != nil && *e.retries > 0) {
if e.retries != nil {
*e.retries--
}
// throw error so that benthos retries
return errors.New(errMsg)
}
Expand Down
Loading
Loading