Skip to content

Commit

Permalink
🌊 Streams: Processing for classic streams (elastic#202339)
Browse files Browse the repository at this point in the history
This PR allows to add processing to classic streams.

When defining processing for a classic stream, it will add a reference
to the `<stream name>@stream.processing` pipeline to the default
pipeline of the stream (or a `@custom` sub pipeline if it exists). Then,
it will write processing rules like for wired streams.

On the UI, the processing tab is not implemented yet, but this PR also
prepares that there are two tabs (overview and enrich) shown for classic
streams, not just an overview page.

---------

Co-authored-by: Chris Cowan <[email protected]>
Co-authored-by: kibanamachine <[email protected]>
  • Loading branch information
3 people authored Dec 11, 2024
1 parent 87b2a12 commit 64e9728
Show file tree
Hide file tree
Showing 16 changed files with 796 additions and 286 deletions.
17 changes: 8 additions & 9 deletions x-pack/solutions/observability/plugins/streams/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,21 @@ export type StreamChild = z.infer<typeof streamChildSchema>;
export const streamWithoutIdDefinitonSchema = z.object({
processing: z.array(processingDefinitionSchema).default([]),
fields: z.array(fieldDefinitionSchema).default([]),
managed: z.boolean().default(true),
children: z.array(streamChildSchema).default([]),
});

export type StreamWithoutIdDefinition = z.infer<typeof streamDefinitonSchema>;

export const unmanagedElasticsearchAsset = z.object({
type: z.enum(['ingest_pipeline', 'component_template', 'index_template', 'data_stream']),
id: z.string(),
});
export type UnmanagedElasticsearchAsset = z.infer<typeof unmanagedElasticsearchAsset>;

export const streamDefinitonSchema = streamWithoutIdDefinitonSchema.extend({
id: z.string(),
managed: z.boolean().default(true),
unmanaged_elasticsearch_assets: z.optional(
z.array(
z.object({
type: z.enum(['ingest_pipeline', 'component_template', 'index_template', 'data_stream']),
id: z.string(),
})
)
),
unmanaged_elasticsearch_assets: z.optional(z.array(unmanagedElasticsearchAsset)),
});

export type StreamDefinition = z.infer<typeof streamDefinitonSchema>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,24 @@ import { logsDefaultPipelineProcessors } from './logs_default_pipeline';
import { isRoot } from '../helpers/hierarchy';
import { getProcessingPipelineName } from './name';

function generateProcessingSteps(definition: StreamDefinition) {
return definition.processing.map((processor) => {
const { type, ...config } = processor.config;
return {
[type]: {
...config,
if: processor.condition ? conditionToPainless(processor.condition) : undefined,
},
};
});
}

export function generateIngestPipeline(id: string, definition: StreamDefinition) {
return {
id: getProcessingPipelineName(id),
processors: [
...(isRoot(definition.id) ? logsDefaultPipelineProcessors : []),
...definition.processing.map((processor) => {
const { type, ...config } = processor.config;
return {
[type]: {
...config,
if: processor.condition ? conditionToPainless(processor.condition) : undefined,
},
};
}),
...generateProcessingSteps(definition),
{
pipeline: {
name: `${id}@stream.reroutes`,
Expand All @@ -40,3 +44,14 @@ export function generateIngestPipeline(id: string, definition: StreamDefinition)
version: ASSET_VERSION,
};
}

export function generateClassicIngestPipelineBody(definition: StreamDefinition) {
return {
processors: generateProcessingSteps(definition),
_meta: {
description: `Stream-managed pipeline for the ${definition.id} stream`,
managed: true,
},
version: ASSET_VERSION,
};
}
Loading

0 comments on commit 64e9728

Please sign in to comment.