From 64e97285fb46d01718fe9ecf23e1c18c7382fff4 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Wed, 11 Dec 2024 12:26:28 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=8C=8A=20Streams:=20Processing=20for=20cl?= =?UTF-8?q?assic=20streams=20(#202339)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR allows to add processing to classic streams. When defining processing for a classic stream, it will add a reference to the `@stream.processing` pipeline to the default pipeline of the stream (or a `@custom` sub pipeline if it exists). Then, it will write processing rules like for wired streams. On the UI, the processing tab is not implemented yet, but this PR also prepares that there are two tabs (overview and enrich) shown for classic streams, not just an overview page. --------- Co-authored-by: Chris Cowan Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com> --- .../plugins/streams/common/types.ts | 17 +- .../generate_ingest_pipeline.ts | 33 +- .../streams/server/lib/streams/stream_crud.ts | 299 ++++++++++++++++-- .../streams/server/routes/streams/delete.ts | 27 +- .../streams/server/routes/streams/edit.ts | 13 +- .../streams/server/routes/streams/enable.ts | 10 +- .../streams/server/routes/streams/fork.ts | 6 +- .../streams/server/routes/streams/list.ts | 38 +-- .../plugins/streams/tsconfig.json | 3 +- .../stream_detail_management/classic.tsx | 142 +++++++++ .../stream_detail_management/index.tsx | 193 +---------- .../stream_detail_management/wired.tsx | 68 ++++ .../stream_detail_management/wrapper.tsx | 57 ++++ .../api_integration/apis/streams/classic.ts | 163 ++++++++++ .../apis/streams/helpers/requests.ts | 12 + .../api_integration/apis/streams/index.ts | 1 + 16 files changed, 796 insertions(+), 286 deletions(-) create mode 100644 x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/classic.tsx create mode 100644 x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/wired.tsx create mode 100644 x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/wrapper.tsx create mode 100644 x-pack/test/api_integration/apis/streams/classic.ts diff --git a/x-pack/solutions/observability/plugins/streams/common/types.ts b/x-pack/solutions/observability/plugins/streams/common/types.ts index 59cdd1cf9c4b9..3d8e0fc0d390c 100644 --- a/x-pack/solutions/observability/plugins/streams/common/types.ts +++ b/x-pack/solutions/observability/plugins/streams/common/types.ts @@ -87,22 +87,21 @@ export type StreamChild = z.infer; export const streamWithoutIdDefinitonSchema = z.object({ processing: z.array(processingDefinitionSchema).default([]), fields: z.array(fieldDefinitionSchema).default([]), + managed: z.boolean().default(true), children: z.array(streamChildSchema).default([]), }); export type StreamWithoutIdDefinition = z.infer; +export const unmanagedElasticsearchAsset = z.object({ + type: z.enum(['ingest_pipeline', 'component_template', 'index_template', 'data_stream']), + id: z.string(), +}); +export type UnmanagedElasticsearchAsset = z.infer; + export const streamDefinitonSchema = streamWithoutIdDefinitonSchema.extend({ id: z.string(), - managed: z.boolean().default(true), - unmanaged_elasticsearch_assets: z.optional( - z.array( - z.object({ - type: z.enum(['ingest_pipeline', 'component_template', 'index_template', 'data_stream']), - id: z.string(), - }) - ) - ), + unmanaged_elasticsearch_assets: z.optional(z.array(unmanagedElasticsearchAsset)), }); export type StreamDefinition = z.infer; diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts index eb09df8831304..e7c9c784a8123 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts @@ -12,20 +12,24 @@ import { logsDefaultPipelineProcessors } from './logs_default_pipeline'; import { isRoot } from '../helpers/hierarchy'; import { getProcessingPipelineName } from './name'; +function generateProcessingSteps(definition: StreamDefinition) { + return definition.processing.map((processor) => { + const { type, ...config } = processor.config; + return { + [type]: { + ...config, + if: processor.condition ? conditionToPainless(processor.condition) : undefined, + }, + }; + }); +} + export function generateIngestPipeline(id: string, definition: StreamDefinition) { return { id: getProcessingPipelineName(id), processors: [ ...(isRoot(definition.id) ? logsDefaultPipelineProcessors : []), - ...definition.processing.map((processor) => { - const { type, ...config } = processor.config; - return { - [type]: { - ...config, - if: processor.condition ? conditionToPainless(processor.condition) : undefined, - }, - }; - }), + ...generateProcessingSteps(definition), { pipeline: { name: `${id}@stream.reroutes`, @@ -40,3 +44,14 @@ export function generateIngestPipeline(id: string, definition: StreamDefinition) version: ASSET_VERSION, }; } + +export function generateClassicIngestPipelineBody(definition: StreamDefinition) { + return { + processors: generateProcessingSteps(definition), + _meta: { + description: `Stream-managed pipeline for the ${definition.id} stream`, + managed: true, + }, + version: ASSET_VERSION, + }; +} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/stream_crud.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/stream_crud.ts index c158280bec161..5066ecd61a601 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/stream_crud.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/stream_crud.ts @@ -7,6 +7,8 @@ import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; import { Logger } from '@kbn/logging'; +import { IngestPipeline, IngestProcessorContainer } from '@elastic/elasticsearch/lib/api/types'; +import { set } from '@kbn/safer-lodash-set'; import { IndicesDataStream } from '@elastic/elasticsearch/lib/api/types'; import { STREAMS_INDEX } from '../../../common/constants'; import { FieldDefinition, StreamDefinition } from '../../../common/types'; @@ -24,7 +26,10 @@ import { getAncestors } from './helpers/hierarchy'; import { generateIndexTemplate } from './index_templates/generate_index_template'; import { deleteTemplate, upsertTemplate } from './index_templates/manage_index_templates'; import { getIndexTemplateName } from './index_templates/name'; -import { generateIngestPipeline } from './ingest_pipelines/generate_ingest_pipeline'; +import { + generateClassicIngestPipelineBody, + generateIngestPipeline, +} from './ingest_pipelines/generate_ingest_pipeline'; import { generateReroutePipeline } from './ingest_pipelines/generate_reroute_pipeline'; import { deleteIngestPipeline, @@ -45,6 +50,63 @@ interface DeleteStreamParams extends BaseParams { logger: Logger; } +export async function deleteUnmanagedStreamObjects({ + id, + scopedClusterClient, + logger, +}: DeleteStreamParams) { + const unmanagedAssets = await getUnmanagedElasticsearchAssets({ + name: id, + scopedClusterClient, + }); + const pipelineName = unmanagedAssets.find((asset) => asset.type === 'ingest_pipeline')?.id; + if (pipelineName) { + const { targetPipelineName, targetPipeline, referencesStreamManagedPipeline } = + await findStreamManagedPipelineReference(scopedClusterClient, pipelineName, id); + if (referencesStreamManagedPipeline) { + const streamManagedPipelineName = getProcessingPipelineName(id); + const updatedProcessors = targetPipeline.processors!.filter( + (processor) => + !(processor.pipeline && processor.pipeline.name === streamManagedPipelineName) + ); + await scopedClusterClient.asCurrentUser.ingest.putPipeline({ + id: targetPipelineName, + body: { + processors: updatedProcessors, + }, + }); + } + } + await deleteDataStream({ + esClient: scopedClusterClient.asCurrentUser, + name: id, + logger, + }); + try { + await deleteIngestPipeline({ + esClient: scopedClusterClient.asCurrentUser, + id: getProcessingPipelineName(id), + logger, + }); + } catch (e) { + // if the pipeline doesn't exist, we don't need to delete it + if (!(e.meta?.statusCode === 404)) { + throw e; + } + } + try { + await scopedClusterClient.asInternalUser.delete({ + id, + index: STREAMS_INDEX, + refresh: 'wait_for', + }); + } catch (e) { + if (e.meta?.statusCode !== 404) { + throw e; + } + } +} + export async function deleteStreamObjects({ id, scopedClusterClient, logger }: DeleteStreamParams) { await deleteDataStream({ esClient: scopedClusterClient.asCurrentUser, @@ -82,7 +144,7 @@ async function upsertInternalStream({ definition, scopedClusterClient }: BasePar return scopedClusterClient.asInternalUser.index({ id: definition.id, index: STREAMS_INDEX, - document: { ...definition, managed: true }, + document: { ...definition }, refresh: 'wait_for', }); } @@ -103,14 +165,20 @@ export async function listStreams({ }); const dataStreams = await listDataStreamsAsStreams({ scopedClusterClient }); - let definitions = response.hits.hits.map((hit) => ({ ...hit._source!, managed: true })); + let definitions = response.hits.hits.map((hit) => ({ ...hit._source! })); const hasAccess = await Promise.all( definitions.map((definition) => checkReadAccess({ id: definition.id, scopedClusterClient })) ); definitions = definitions.filter((_, index) => hasAccess[index]); + const definitionMap = new Map(definitions.map((definition) => [definition.id, definition])); + dataStreams.forEach((dataStream) => { + if (!definitionMap.has(dataStream.id)) { + definitionMap.set(dataStream.id, dataStream); + } + }); return { - definitions: [...definitions, ...dataStreams], + definitions: Array.from(definitionMap.values()), }; } @@ -158,7 +226,6 @@ export async function readStream({ return { definition: { ...definition, - managed: true, }, }; } catch (e) { @@ -169,35 +236,42 @@ export async function readStream({ } } -export async function readDataStreamAsStream({ - id, +export async function readDataStreamAsStream({ id, scopedClusterClient }: ReadStreamParams) { + const definition: StreamDefinition = { + id, + managed: false, + children: [], + fields: [], + processing: [], + }; + + definition.unmanaged_elasticsearch_assets = await getUnmanagedElasticsearchAssets({ + name: id, + scopedClusterClient, + }); + + return { definition }; +} + +interface ReadUnmanagedAssetsParams extends BaseParams { + name: string; +} + +async function getUnmanagedElasticsearchAssets({ + name, scopedClusterClient, - skipAccessCheck, -}: ReadStreamParams) { +}: ReadUnmanagedAssetsParams) { let dataStream: IndicesDataStream; try { - const response = await scopedClusterClient.asInternalUser.indices.getDataStream({ name: id }); + const response = await scopedClusterClient.asInternalUser.indices.getDataStream({ name }); dataStream = response.data_streams[0]; } catch (e) { if (e.meta?.statusCode === 404) { - throw new DefinitionNotFound(`Stream definition for ${id} not found.`); + throw new DefinitionNotFound(`Stream definition for ${name} not found.`); } throw e; } - const definition: StreamDefinition = { - id, - managed: false, - children: [], - fields: [], - processing: [], - }; - if (!skipAccessCheck) { - const hasAccess = await checkReadAccess({ id, scopedClusterClient }); - if (!hasAccess) { - throw new DefinitionNotFound(`Stream definition for ${id} not found.`); - } - } // retrieve linked index template, component template and ingest pipeline const templateName = dataStream.template; const componentTemplates: string[] = []; @@ -215,9 +289,9 @@ export async function readDataStreamAsStream({ }); const ingestPipelineId = currentIndex[writeIndexName].settings?.index?.default_pipeline!; - definition.unmanaged_elasticsearch_assets = [ + return [ { - type: 'ingest_pipeline', + type: 'ingest_pipeline' as const, id: ingestPipelineId, }, ...componentTemplates.map((componentTemplateName) => ({ @@ -225,16 +299,14 @@ export async function readDataStreamAsStream({ id: componentTemplateName, })), { - type: 'index_template', + type: 'index_template' as const, id: templateName, }, { - type: 'data_stream', - id, + type: 'data_stream' as const, + id: name, }, ]; - - return { definition }; } interface ReadAncestorsParams extends BaseParams { @@ -378,8 +450,12 @@ export async function syncStream({ logger, }: SyncStreamParams) { if (!definition.managed) { - // TODO For now, we just don't allow reads at all - later on we will relax this to allow certain operations, but they will use a completely different syncing logic - throw new Error('Cannot sync an unmanaged stream'); + await syncUnmanagedStream({ scopedClusterClient, definition, logger }); + await upsertInternalStream({ + scopedClusterClient, + definition, + }); + return; } const componentTemplate = generateLayer(definition.id, definition); await upsertComponent({ @@ -432,6 +508,163 @@ export async function syncStream({ }); } +interface ExecutionPlanStep { + method: string; + path: string; + body?: Record; +} + +async function syncUnmanagedStream({ scopedClusterClient, definition, logger }: SyncStreamParams) { + if (definition.managed) { + throw new Error('Got an unmanaged stream that is marked as managed'); + } + if (definition.fields.length) { + throw new Error( + 'Unmanaged streams cannot have managed fields, please edit the component templates directly' + ); + } + if (definition.children.length) { + throw new Error('Unmanaged streams cannot have managed children, coming soon'); + } + const unmanagedAssets = await getUnmanagedElasticsearchAssets({ + name: definition.id, + scopedClusterClient, + }); + const executionPlan: ExecutionPlanStep[] = []; + const streamManagedPipelineName = getProcessingPipelineName(definition.id); + const pipelineName = unmanagedAssets.find((asset) => asset.type === 'ingest_pipeline')?.id; + if (!pipelineName) { + throw new Error('Unmanaged stream needs a default ingest pipeline'); + } + if (pipelineName === streamManagedPipelineName) { + throw new Error('Unmanaged stream cannot have the @stream pipeline as the default pipeline'); + } + await ensureStreamManagedPipelineReference( + scopedClusterClient, + pipelineName, + definition, + executionPlan + ); + + if (definition.processing.length) { + // if the stream has processing, we need to create or update the stream managed pipeline + executionPlan.push({ + method: 'PUT', + path: `/_ingest/pipeline/${streamManagedPipelineName}`, + body: generateClassicIngestPipelineBody(definition), + }); + } else { + const pipelineExists = Boolean( + await tryGettingPipeline({ scopedClusterClient, id: streamManagedPipelineName }) + ); + // no processing, just delete the pipeline if it exists. The reference to the pipeline won't break anything + if (pipelineExists) { + executionPlan.push({ + method: 'DELETE', + path: `/_ingest/pipeline/${streamManagedPipelineName}`, + }); + } + } + + await executePlan(executionPlan, scopedClusterClient); +} + +async function executePlan( + executionPlan: ExecutionPlanStep[], + scopedClusterClient: IScopedClusterClient +) { + for (const step of executionPlan) { + await scopedClusterClient.asCurrentUser.transport.request({ + method: step.method, + path: step.path, + body: step.body, + }); + } +} + +async function findStreamManagedPipelineReference( + scopedClusterClient: IScopedClusterClient, + pipelineName: string, + streamId: string +): Promise<{ + targetPipelineName: string; + targetPipeline: IngestPipeline; + referencesStreamManagedPipeline: boolean; +}> { + const streamManagedPipelineName = getProcessingPipelineName(streamId); + const pipeline = (await tryGettingPipeline({ scopedClusterClient, id: pipelineName })) || { + processors: [], + }; + const streamProcessor = pipeline.processors?.find( + (processor) => processor.pipeline && processor.pipeline.name === streamManagedPipelineName + ); + const customProcessor = pipeline.processors?.findLast( + (processor) => processor.pipeline && processor.pipeline.name.endsWith('@custom') + ); + if (streamProcessor) { + return { + targetPipelineName: pipelineName, + targetPipeline: pipeline, + referencesStreamManagedPipeline: true, + }; + } + if (customProcessor) { + // go one level deeper, find the latest @custom leaf pipeline + return await findStreamManagedPipelineReference( + scopedClusterClient, + customProcessor.pipeline!.name, + streamId + ); + } + return { + targetPipelineName: pipelineName, + targetPipeline: pipeline, + referencesStreamManagedPipeline: false, + }; +} + +async function ensureStreamManagedPipelineReference( + scopedClusterClient: IScopedClusterClient, + pipelineName: string, + definition: StreamDefinition, + executionPlan: ExecutionPlanStep[] +) { + const streamManagedPipelineName = getProcessingPipelineName(definition.id); + const { targetPipelineName, targetPipeline, referencesStreamManagedPipeline } = + await findStreamManagedPipelineReference(scopedClusterClient, pipelineName, definition.id); + if (!referencesStreamManagedPipeline) { + const callStreamManagedPipelineProcessor: IngestProcessorContainer = { + pipeline: { + name: streamManagedPipelineName, + if: `ctx._index == '${definition.id}'`, + ignore_missing_pipeline: true, + description: + "Call the stream's managed pipeline - do not change this manually but instead use the streams UI or API", + }, + }; + executionPlan.push({ + method: 'PUT', + path: `/_ingest/pipeline/${targetPipelineName}`, + body: set( + { ...targetPipeline }, + 'processors', + (targetPipeline.processors || []).concat(callStreamManagedPipelineProcessor) + ), + }); + } +} + +async function tryGettingPipeline({ scopedClusterClient, id }: ReadStreamParams) { + try { + return (await scopedClusterClient.asCurrentUser.ingest.getPipeline({ id }))[id]; + } catch (e) { + if (e.meta?.statusCode === 404) { + return; + } + throw e; + } +} + export async function streamsEnabled({ scopedClusterClient }: BaseParams) { return await scopedClusterClient.asInternalUser.indices.exists({ index: STREAMS_INDEX, diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/delete.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/delete.ts index a2092838792cf..d6bf5fbb84d8f 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/delete.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/delete.ts @@ -16,7 +16,12 @@ import { SecurityException, } from '../../lib/streams/errors'; import { createServerRoute } from '../create_server_route'; -import { syncStream, readStream, deleteStreamObjects } from '../../lib/streams/stream_crud'; +import { + syncStream, + readStream, + deleteStreamObjects, + deleteUnmanagedStreamObjects, +} from '../../lib/streams/stream_crud'; import { MalformedStreamId } from '../../lib/streams/errors/malformed_stream_id'; import { getParentId } from '../../lib/streams/helpers/hierarchy'; @@ -47,14 +52,6 @@ export const deleteStreamRoute = createServerRoute({ try { const { scopedClusterClient } = await getScopedClients({ request }); - const parentId = getParentId(params.path.id); - if (!parentId) { - throw new MalformedStreamId('Cannot delete root stream'); - } - - // need to update parent first to cut off documents streaming down - await updateParentStream(scopedClusterClient, params.path.id, parentId, logger); - await deleteStream(scopedClusterClient, params.path.id, logger); return { acknowledged: true }; @@ -83,6 +80,18 @@ export async function deleteStream( ) { try { const { definition } = await readStream({ scopedClusterClient, id }); + if (!definition.managed) { + await deleteUnmanagedStreamObjects({ scopedClusterClient, id, logger }); + return; + } + + const parentId = getParentId(id); + if (!parentId) { + throw new MalformedStreamId('Cannot delete root stream'); + } + + // need to update parent first to cut off documents streaming down + await updateParentStream(scopedClusterClient, id, parentId, logger); for (const child of definition.children) { await deleteStream(scopedClusterClient, child.id, logger); } diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts index e280796bc9780..19867018ce25f 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts @@ -50,6 +50,17 @@ export const editStreamRoute = createServerRoute({ handler: async ({ response, params, logger, request, getScopedClients }) => { try { const { scopedClusterClient } = await getScopedClients({ request }); + const streamDefinition = { ...params.body, id: params.path.id }; + + if (!streamDefinition.managed) { + await syncStream({ + scopedClusterClient, + definition: { ...streamDefinition, id: params.path.id }, + rootDefinition: undefined, + logger, + }); + return { acknowledged: true }; + } await validateStreamChildren(scopedClusterClient, params.path.id, params.body.children); await validateAncestorFields(scopedClusterClient, params.path.id, params.body.fields); @@ -58,8 +69,6 @@ export const editStreamRoute = createServerRoute({ const parentId = getParentId(params.path.id); let parentDefinition: StreamDefinition | undefined; - const streamDefinition = { ...params.body, id: params.path.id }; - // always need to go from the leaves to the parent when syncing ingest pipelines, otherwise data // will be routed before the data stream is ready diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/enable.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/enable.ts index cfcb97f9b3581..ee49a93aaacbb 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/enable.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/enable.ts @@ -31,12 +31,12 @@ export const enableStreamsRoute = createServerRoute({ response, logger, getScopedClients, - }): Promise<{ acknowledged: true }> => { + }): Promise<{ acknowledged: true; message: string }> => { try { const { scopedClusterClient } = await getScopedClients({ request }); const alreadyEnabled = await streamsEnabled({ scopedClusterClient }); if (alreadyEnabled) { - return { acknowledged: true }; + return { acknowledged: true, message: 'Streams was already enabled' }; } await createStreamsIndex(scopedClusterClient); await syncStream({ @@ -44,7 +44,11 @@ export const enableStreamsRoute = createServerRoute({ definition: rootStreamDefinition, logger, }); - return { acknowledged: true }; + return { + acknowledged: true, + message: + 'Streams enabled - reload your browser window to show the streams UI in the navigation', + }; } catch (e) { if (e instanceof SecurityException) { throw badRequest(e); diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/fork.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/fork.ts index 070dc66b9ab10..9ec61d27619e2 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/fork.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/fork.ts @@ -58,7 +58,11 @@ export const forkStreamsRoute = createServerRoute({ id: params.path.id, }); - const childDefinition = { ...params.body.stream, children: [], managed: true }; + if (rootDefinition.managed === false) { + throw new MalformedStreamId('Cannot fork a stream that is not managed'); + } + + const childDefinition = { ...params.body.stream, children: [] }; // check whether root stream has a child of the given name already if (rootDefinition.children.some((child) => child.id === childDefinition.id)) { diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/list.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/list.ts index 774a256e5ba4a..f98c635830bda 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/list.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/list.ts @@ -29,14 +29,14 @@ export const listStreamsRoute = createServerRoute({ response, request, getScopedClients, - }): Promise<{ definitions: StreamDefinition[]; trees: StreamTree[] }> => { + }): Promise<{ definitions: StreamDefinition[] }> => { try { const { scopedClusterClient } = await getScopedClients({ request }); const { definitions } = await listStreams({ scopedClusterClient }); - const trees = asTrees(definitions); - - return { definitions, trees }; + return { + definitions, + }; } catch (e) { if (e instanceof DefinitionNotFound) { throw notFound(e); @@ -46,33 +46,3 @@ export const listStreamsRoute = createServerRoute({ } }, }); - -export interface StreamTree { - id: string; - children: StreamTree[]; -} - -function asTrees(definitions: Array<{ id: string; managed?: boolean }>) { - const trees: StreamTree[] = []; - const ids = definitions - .filter((definition) => definition.managed) - .map((definition) => definition.id); - - ids.sort((a, b) => a.split('.').length - b.split('.').length); - - ids.forEach((id) => { - let currentTree = trees; - let existingNode: StreamTree | undefined; - // traverse the tree following the prefix of the current id. - // once we reach the leaf, the current id is added as child - this works because the ids are sorted by depth - while ((existingNode = currentTree.find((node) => id.startsWith(node.id)))) { - currentTree = existingNode.children; - } - if (!existingNode) { - const newNode = { id, children: [] }; - currentTree.push(newNode); - } - }); - - return trees; -} diff --git a/x-pack/solutions/observability/plugins/streams/tsconfig.json b/x-pack/solutions/observability/plugins/streams/tsconfig.json index 4cec623f68286..08ed4e1648af7 100644 --- a/x-pack/solutions/observability/plugins/streams/tsconfig.json +++ b/x-pack/solutions/observability/plugins/streams/tsconfig.json @@ -29,6 +29,7 @@ "@kbn/licensing-plugin", "@kbn/server-route-repository-client", "@kbn/observability-utils-server", - "@kbn/observability-utils-common" + "@kbn/observability-utils-common", + "@kbn/safer-lodash-set" ] } diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/classic.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/classic.tsx new file mode 100644 index 0000000000000..1664b322b5a8e --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/classic.tsx @@ -0,0 +1,142 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import React from 'react'; +import { i18n } from '@kbn/i18n'; +import { ReadStreamDefinition, StreamDefinition } from '@kbn/streams-plugin/common'; +import { EuiFlexGroup, EuiListGroup, EuiText } from '@elastic/eui'; +import { useStreamsAppParams } from '../../hooks/use_streams_app_params'; +import { RedirectTo } from '../redirect_to'; +import { StreamDetailEnriching } from '../stream_detail_enriching'; +import { useKibana } from '../../hooks/use_kibana'; +import { Wrapper } from './wrapper'; + +type ManagementSubTabs = 'enrich' | 'overview'; + +function isValidManagementSubTab(value: string): value is ManagementSubTabs { + return ['enrich', 'overview'].includes(value); +} + +export function ClassicStreamDetailManagement({ + definition, + refreshDefinition, +}: { + definition: ReadStreamDefinition; + refreshDefinition: () => void; +}) { + const { + path: { key, subtab }, + } = useStreamsAppParams('/{key}/management/{subtab}'); + + const tabs = { + overview: { + content: , + label: i18n.translate('xpack.streams.streamDetailView.overviewTab', { + defaultMessage: 'Overview', + }), + }, + enrich: { + content: ( + + ), + label: i18n.translate('xpack.streams.streamDetailView.enrichingTab', { + defaultMessage: 'Extract field', + }), + }, + }; + + if (!isValidManagementSubTab(subtab)) { + return ( + + ); + } + + return ; +} + +function UnmanagedStreamOverview({ definition }: { definition: StreamDefinition }) { + const { + core: { + http: { basePath }, + }, + } = useKibana(); + const groupedAssets = (definition.unmanaged_elasticsearch_assets ?? []).reduce((acc, asset) => { + const title = assetToTitle(asset); + if (title) { + acc[title] = acc[title] ?? []; + acc[title].push(asset); + } + return acc; + }, {} as Record>); + return ( + + +

+ {i18n.translate('xpack.streams.streamDetailView.unmanagedStreamOverview', { + defaultMessage: + 'This stream is not managed. Follow the links to stack management to change the related Elasticsearch objects.', + })} +

+
+ {Object.entries(groupedAssets).map(([title, assets]) => ( +
+ +

{title}

+
+ ({ + label: asset.id, + href: basePath.prepend(assetToLink(asset)), + iconType: 'index', + target: '_blank', + }))} + /> +
+ ))} +
+ ); +} + +function assetToLink(asset: { type: string; id: string }) { + switch (asset.type) { + case 'index_template': + return `/app/management/data/index_management/templates/${asset.id}`; + case 'component_template': + return `/app/management/data/index_management/component_templates/${asset.id}`; + case 'data_stream': + return `/app/management/data/index_management/data_streams/${asset.id}`; + case 'ingest_pipeline': + return `/app/management/ingest/ingest_pipelines?pipeline=${asset.id}`; + default: + return ''; + } +} + +function assetToTitle(asset: { type: string; id: string }) { + switch (asset.type) { + case 'index_template': + return i18n.translate('xpack.streams.streamDetailView.indexTemplate', { + defaultMessage: 'Index template', + }); + case 'component_template': + return i18n.translate('xpack.streams.streamDetailView.componentTemplate', { + defaultMessage: 'Component template', + }); + case 'data_stream': + return i18n.translate('xpack.streams.streamDetailView.dataStream', { + defaultMessage: 'Data stream', + }); + case 'ingest_pipeline': + return i18n.translate('xpack.streams.streamDetailView.ingestPipeline', { + defaultMessage: 'Ingest pipeline', + }); + default: + return ''; + } +} diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/index.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/index.tsx index 4f3d207c91789..c093f05c03210 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/index.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/index.tsx @@ -5,23 +5,9 @@ * 2.0. */ import React from 'react'; -import { i18n } from '@kbn/i18n'; -import { ReadStreamDefinition, StreamDefinition } from '@kbn/streams-plugin/common'; -import { css } from '@emotion/css'; -import { EuiButtonGroup, EuiFlexGroup, EuiFlexItem, EuiListGroup, EuiText } from '@elastic/eui'; -import { useStreamsAppParams } from '../../hooks/use_streams_app_params'; -import { RedirectTo } from '../redirect_to'; -import { useStreamsAppRouter } from '../../hooks/use_streams_app_router'; -import { StreamDetailRouting } from '../stream_detail_routing'; -import { StreamDetailEnriching } from '../stream_detail_enriching'; -import { StreamDetailSchemaEditor } from '../stream_detail_schema_editor'; -import { useKibana } from '../../hooks/use_kibana'; - -type ManagementSubTabs = 'route' | 'enrich' | 'schemaEditor'; - -function isValidManagementSubTab(value: string): value is ManagementSubTabs { - return ['route', 'enrich', 'schemaEditor'].includes(value); -} +import { ReadStreamDefinition } from '@kbn/streams-plugin/common'; +import { WiredStreamDetailManagement } from './wired'; +import { ClassicStreamDetailManagement } from './classic'; export function StreamDetailManagement({ definition, @@ -30,180 +16,17 @@ export function StreamDetailManagement({ definition?: ReadStreamDefinition; refreshDefinition: () => void; }) { - const { - path: { key, subtab }, - } = useStreamsAppParams('/{key}/management/{subtab}'); - const router = useStreamsAppRouter(); - - if (subtab === 'overview') { - if (!definition) { - return null; - } - if (definition.managed) { - return ( - - ); - } - return ; - } - - const tabs = { - route: { - content: ( - - ), - label: i18n.translate('xpack.streams.streamDetailView.routingTab', { - defaultMessage: 'Streams Partitioning', - }), - }, - enrich: { - content: ( - - ), - label: i18n.translate('xpack.streams.streamDetailView.enrichingTab', { - defaultMessage: 'Extract field', - }), - }, - schemaEditor: { - content: ( - - ), - label: i18n.translate('xpack.streams.streamDetailView.schemaEditorTab', { - defaultMessage: 'Schema editor', - }), - }, - }; - - if (!isValidManagementSubTab(subtab)) { - return ( - - ); + if (!definition) { + return null; } - if (definition && !definition.managed) { + if (definition.managed) { return ( - + ); } - const selectedTabObject = tabs[subtab]; - - return ( - - - { - router.push('/{key}/management/{subtab}', { - path: { key, subtab: optionId }, - query: {}, - }); - }} - options={Object.keys(tabs).map((id) => ({ - id, - label: tabs[id as ManagementSubTabs].label, - }))} - /> - - - {selectedTabObject.content} - - - ); -} - -function assetToLink(asset: { type: string; id: string }) { - switch (asset.type) { - case 'index_template': - return `/app/management/data/index_management/templates/${asset.id}`; - case 'component_template': - return `/app/management/data/index_management/component_templates/${asset.id}`; - case 'data_stream': - return `/app/management/data/index_management/data_streams/${asset.id}`; - case 'ingest_pipeline': - return `/app/management/ingest/ingest_pipelines?pipeline=${asset.id}`; - default: - return ''; - } -} - -function assetToTitle(asset: { type: string; id: string }) { - switch (asset.type) { - case 'index_template': - return i18n.translate('xpack.streams.streamDetailView.indexTemplate', { - defaultMessage: 'Index template', - }); - case 'component_template': - return i18n.translate('xpack.streams.streamDetailView.componentTemplate', { - defaultMessage: 'Component template', - }); - case 'data_stream': - return i18n.translate('xpack.streams.streamDetailView.dataStream', { - defaultMessage: 'Data stream', - }); - case 'ingest_pipeline': - return i18n.translate('xpack.streams.streamDetailView.ingestPipeline', { - defaultMessage: 'Ingest pipeline', - }); - default: - return ''; - } -} - -function UnmanagedStreamOverview({ definition }: { definition: StreamDefinition }) { - const { - core: { - http: { basePath }, - }, - } = useKibana(); - const groupedAssets = (definition.unmanaged_elasticsearch_assets ?? []).reduce((acc, asset) => { - const title = assetToTitle(asset); - if (title) { - acc[title] = acc[title] ?? []; - acc[title].push(asset); - } - return acc; - }, {} as Record>); return ( - - -

- {i18n.translate('xpack.streams.streamDetailView.unmanagedStreamOverview', { - defaultMessage: - 'This stream is not managed. Follow the links to stack management to change the related Elasticsearch objects.', - })} -

-
- {Object.entries(groupedAssets).map(([title, assets]) => ( -
- -

{title}

-
- ({ - label: asset.id, - href: basePath.prepend(assetToLink(asset)), - iconType: 'index', - target: '_blank', - }))} - /> -
- ))} -
+ ); } diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/wired.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/wired.tsx new file mode 100644 index 0000000000000..5f8c4e57bf7d1 --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/wired.tsx @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import React from 'react'; +import { i18n } from '@kbn/i18n'; +import { ReadStreamDefinition } from '@kbn/streams-plugin/common'; +import { useStreamsAppParams } from '../../hooks/use_streams_app_params'; +import { RedirectTo } from '../redirect_to'; +import { StreamDetailRouting } from '../stream_detail_routing'; +import { StreamDetailEnriching } from '../stream_detail_enriching'; +import { StreamDetailSchemaEditor } from '../stream_detail_schema_editor'; +import { Wrapper } from './wrapper'; + +type ManagementSubTabs = 'route' | 'enrich' | 'schemaEditor'; + +function isValidManagementSubTab(value: string): value is ManagementSubTabs { + return ['route', 'enrich', 'schemaEditor'].includes(value); +} + +export function WiredStreamDetailManagement({ + definition, + refreshDefinition, +}: { + definition?: ReadStreamDefinition; + refreshDefinition: () => void; +}) { + const { + path: { key, subtab }, + } = useStreamsAppParams('/{key}/management/{subtab}'); + + const tabs = { + route: { + content: ( + + ), + label: i18n.translate('xpack.streams.streamDetailView.routingTab', { + defaultMessage: 'Streams Partitioning', + }), + }, + enrich: { + content: ( + + ), + label: i18n.translate('xpack.streams.streamDetailView.enrichingTab', { + defaultMessage: 'Extract field', + }), + }, + schemaEditor: { + content: ( + + ), + label: i18n.translate('xpack.streams.streamDetailView.schemaEditorTab', { + defaultMessage: 'Schema editor', + }), + }, + }; + + if (!isValidManagementSubTab(subtab)) { + return ( + + ); + } + + return ; +} diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/wrapper.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/wrapper.tsx new file mode 100644 index 0000000000000..92d80924298fc --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/wrapper.tsx @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { EuiButtonGroup, EuiFlexGroup, EuiFlexItem } from '@elastic/eui'; +import React from 'react'; +import { css } from '@emotion/css'; +import { useStreamsAppRouter } from '../../hooks/use_streams_app_router'; + +export function Wrapper({ + tabs, + streamId, + subtab, +}: { + tabs: Record; + streamId: string; + subtab: string; +}) { + const router = useStreamsAppRouter(); + return ( + + + { + router.push('/{key}/management/{subtab}', { + path: { key: streamId, subtab: optionId }, + query: {}, + }); + }} + options={Object.keys(tabs).map((id) => ({ + id, + label: tabs[id].label, + }))} + /> + + + {tabs[subtab].content} + + + ); +} diff --git a/x-pack/test/api_integration/apis/streams/classic.ts b/x-pack/test/api_integration/apis/streams/classic.ts new file mode 100644 index 0000000000000..25a7238a757ca --- /dev/null +++ b/x-pack/test/api_integration/apis/streams/classic.ts @@ -0,0 +1,163 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import { JsonObject } from '@kbn/utility-types'; +import { + deleteStream, + enableStreams, + fetchDocument, + getStream, + indexDocument, + listStreams, + putStream, +} from './helpers/requests'; +import { FtrProviderContext } from '../../ftr_provider_context'; +import { waitForDocumentInIndex } from '../../../alerting_api_integration/observability/helpers/alerting_wait_for_helpers'; +import { cleanUpRootStream } from './helpers/cleanup'; + +export default function ({ getService }: FtrProviderContext) { + const supertest = getService('supertest'); + const esClient = getService('es'); + const retryService = getService('retry'); + const logger = getService('log'); + + describe('Classic streams', () => { + after(async () => { + await cleanUpRootStream(esClient); + }); + + before(async () => { + await enableStreams(supertest); + }); + + it('Shows non-wired data streams', async () => { + const doc = { + message: '2023-01-01T00:00:10.000Z error test', + }; + const response = await indexDocument(esClient, 'logs-test-default', doc); + expect(response.result).to.eql('created'); + const streams = await listStreams(supertest); + const classicStream = streams.definitions.find( + (stream: JsonObject) => stream.id === 'logs-test-default' + ); + expect(classicStream).to.eql({ + id: 'logs-test-default', + managed: false, + children: [], + fields: [], + processing: [], + }); + }); + + it('Allows setting processing on classic streams', async () => { + const response = await putStream(supertest, 'logs-test-default', { + managed: false, + children: [], + fields: [], + processing: [ + { + config: { + type: 'grok', + field: 'message', + patterns: [ + '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', + ], + }, + }, + ], + }); + expect(response).to.have.property('acknowledged', true); + const streamBody = await getStream(supertest, 'logs-test-default'); + expect(streamBody).to.eql({ + id: 'logs-test-default', + managed: false, + children: [], + inheritedFields: [], + fields: [], + processing: [ + { + config: { + type: 'grok', + field: 'message', + patterns: [ + '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', + ], + }, + }, + ], + }); + }); + + it('Executes processing on classic streams', async () => { + const doc = { + '@timestamp': '2024-01-01T00:00:10.000Z', + message: '2023-01-01T00:00:10.000Z error test', + }; + const response = await indexDocument(esClient, 'logs-test-default', doc); + expect(response.result).to.eql('created'); + await waitForDocumentInIndex({ + esClient, + indexName: 'logs-test-default', + retryService, + logger, + docCountTarget: 2, + }); + const result = await fetchDocument(esClient, 'logs-test-default', response._id); + expect(result._source).to.eql({ + '@timestamp': '2024-01-01T00:00:10.000Z', + message: '2023-01-01T00:00:10.000Z error test', + inner_timestamp: '2023-01-01T00:00:10.000Z', + message2: 'test', + log: { + level: 'error', + }, + }); + }); + + it('Allows removing processing on classic streams', async () => { + const response = await putStream(supertest, 'logs-test-default', { + managed: false, + children: [], + fields: [], + processing: [], + }); + expect(response).to.have.property('acknowledged', true); + }); + + it('Executes processing on classic streams after removing processing', async () => { + const doc = { + // default logs pipeline fills in timestamp with current date if not set + message: '2023-01-01T00:00:10.000Z info mylogger this is the message', + }; + const response = await indexDocument(esClient, 'logs-test-default', doc); + expect(response.result).to.eql('created'); + await waitForDocumentInIndex({ + esClient, + indexName: 'logs-test-default', + retryService, + logger, + docCountTarget: 3, + }); + const result = await fetchDocument(esClient, 'logs-test-default', response._id); + expect(result._source).to.eql({ + // accept any date + '@timestamp': (result._source as { [key: string]: unknown })['@timestamp'], + message: '2023-01-01T00:00:10.000Z info mylogger this is the message', + }); + }); + + it('Allows deleting classic streams', async () => { + await deleteStream(supertest, 'logs-test-default'); + const streams = await listStreams(supertest); + const classicStream = streams.definitions.find( + (stream: JsonObject) => stream.id === 'logs-test-default' + ); + expect(classicStream).to.eql(undefined); + }); + }); +} diff --git a/x-pack/test/api_integration/apis/streams/helpers/requests.ts b/x-pack/test/api_integration/apis/streams/helpers/requests.ts index 7d656e4aacf5e..43e7f02b7a750 100644 --- a/x-pack/test/api_integration/apis/streams/helpers/requests.ts +++ b/x-pack/test/api_integration/apis/streams/helpers/requests.ts @@ -42,6 +42,18 @@ export async function putStream(supertest: Agent, name: string, body: JsonObject return response.body; } +export async function getStream(supertest: Agent, name: string) { + const req = supertest.get(`/api/streams/${name}`).set('kbn-xsrf', 'xxx'); + const response = await req.send().expect(200); + return response.body; +} + +export async function listStreams(supertest: Agent) { + const req = supertest.get(`/api/streams`).set('kbn-xsrf', 'xxx'); + const response = await req.send().expect(200); + return response.body; +} + export async function deleteStream(supertest: Agent, id: string) { const req = supertest.delete(`/api/streams/${id}`).set('kbn-xsrf', 'xxx'); const response = await req.send().expect(200); diff --git a/x-pack/test/api_integration/apis/streams/index.ts b/x-pack/test/api_integration/apis/streams/index.ts index 4ef3fce385248..14decb2400196 100644 --- a/x-pack/test/api_integration/apis/streams/index.ts +++ b/x-pack/test/api_integration/apis/streams/index.ts @@ -11,6 +11,7 @@ export default function ({ loadTestFile }: FtrProviderContext) { describe('Streams Endpoints', () => { loadTestFile(require.resolve('./full_flow')); loadTestFile(require.resolve('./enrichment')); + loadTestFile(require.resolve('./classic')); loadTestFile(require.resolve('./flush_config')); }); }