diff --git a/x-pack/solutions/observability/plugins/streams/common/types.ts b/x-pack/solutions/observability/plugins/streams/common/types.ts index 59cdd1cf9c4b9..3d8e0fc0d390c 100644 --- a/x-pack/solutions/observability/plugins/streams/common/types.ts +++ b/x-pack/solutions/observability/plugins/streams/common/types.ts @@ -87,22 +87,21 @@ export type StreamChild = z.infer; export const streamWithoutIdDefinitonSchema = z.object({ processing: z.array(processingDefinitionSchema).default([]), fields: z.array(fieldDefinitionSchema).default([]), + managed: z.boolean().default(true), children: z.array(streamChildSchema).default([]), }); export type StreamWithoutIdDefinition = z.infer; +export const unmanagedElasticsearchAsset = z.object({ + type: z.enum(['ingest_pipeline', 'component_template', 'index_template', 'data_stream']), + id: z.string(), +}); +export type UnmanagedElasticsearchAsset = z.infer; + export const streamDefinitonSchema = streamWithoutIdDefinitonSchema.extend({ id: z.string(), - managed: z.boolean().default(true), - unmanaged_elasticsearch_assets: z.optional( - z.array( - z.object({ - type: z.enum(['ingest_pipeline', 'component_template', 'index_template', 'data_stream']), - id: z.string(), - }) - ) - ), + unmanaged_elasticsearch_assets: z.optional(z.array(unmanagedElasticsearchAsset)), }); export type StreamDefinition = z.infer; diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts index eb09df8831304..e7c9c784a8123 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts @@ -12,20 +12,24 @@ import { logsDefaultPipelineProcessors } from './logs_default_pipeline'; import { isRoot } from '../helpers/hierarchy'; import { getProcessingPipelineName } from './name'; +function generateProcessingSteps(definition: StreamDefinition) { + return definition.processing.map((processor) => { + const { type, ...config } = processor.config; + return { + [type]: { + ...config, + if: processor.condition ? conditionToPainless(processor.condition) : undefined, + }, + }; + }); +} + export function generateIngestPipeline(id: string, definition: StreamDefinition) { return { id: getProcessingPipelineName(id), processors: [ ...(isRoot(definition.id) ? logsDefaultPipelineProcessors : []), - ...definition.processing.map((processor) => { - const { type, ...config } = processor.config; - return { - [type]: { - ...config, - if: processor.condition ? conditionToPainless(processor.condition) : undefined, - }, - }; - }), + ...generateProcessingSteps(definition), { pipeline: { name: `${id}@stream.reroutes`, @@ -40,3 +44,14 @@ export function generateIngestPipeline(id: string, definition: StreamDefinition) version: ASSET_VERSION, }; } + +export function generateClassicIngestPipelineBody(definition: StreamDefinition) { + return { + processors: generateProcessingSteps(definition), + _meta: { + description: `Stream-managed pipeline for the ${definition.id} stream`, + managed: true, + }, + version: ASSET_VERSION, + }; +} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/stream_crud.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/stream_crud.ts index c158280bec161..5066ecd61a601 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/stream_crud.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/stream_crud.ts @@ -7,6 +7,8 @@ import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; import { Logger } from '@kbn/logging'; +import { IngestPipeline, IngestProcessorContainer } from '@elastic/elasticsearch/lib/api/types'; +import { set } from '@kbn/safer-lodash-set'; import { IndicesDataStream } from '@elastic/elasticsearch/lib/api/types'; import { STREAMS_INDEX } from '../../../common/constants'; import { FieldDefinition, StreamDefinition } from '../../../common/types'; @@ -24,7 +26,10 @@ import { getAncestors } from './helpers/hierarchy'; import { generateIndexTemplate } from './index_templates/generate_index_template'; import { deleteTemplate, upsertTemplate } from './index_templates/manage_index_templates'; import { getIndexTemplateName } from './index_templates/name'; -import { generateIngestPipeline } from './ingest_pipelines/generate_ingest_pipeline'; +import { + generateClassicIngestPipelineBody, + generateIngestPipeline, +} from './ingest_pipelines/generate_ingest_pipeline'; import { generateReroutePipeline } from './ingest_pipelines/generate_reroute_pipeline'; import { deleteIngestPipeline, @@ -45,6 +50,63 @@ interface DeleteStreamParams extends BaseParams { logger: Logger; } +export async function deleteUnmanagedStreamObjects({ + id, + scopedClusterClient, + logger, +}: DeleteStreamParams) { + const unmanagedAssets = await getUnmanagedElasticsearchAssets({ + name: id, + scopedClusterClient, + }); + const pipelineName = unmanagedAssets.find((asset) => asset.type === 'ingest_pipeline')?.id; + if (pipelineName) { + const { targetPipelineName, targetPipeline, referencesStreamManagedPipeline } = + await findStreamManagedPipelineReference(scopedClusterClient, pipelineName, id); + if (referencesStreamManagedPipeline) { + const streamManagedPipelineName = getProcessingPipelineName(id); + const updatedProcessors = targetPipeline.processors!.filter( + (processor) => + !(processor.pipeline && processor.pipeline.name === streamManagedPipelineName) + ); + await scopedClusterClient.asCurrentUser.ingest.putPipeline({ + id: targetPipelineName, + body: { + processors: updatedProcessors, + }, + }); + } + } + await deleteDataStream({ + esClient: scopedClusterClient.asCurrentUser, + name: id, + logger, + }); + try { + await deleteIngestPipeline({ + esClient: scopedClusterClient.asCurrentUser, + id: getProcessingPipelineName(id), + logger, + }); + } catch (e) { + // if the pipeline doesn't exist, we don't need to delete it + if (!(e.meta?.statusCode === 404)) { + throw e; + } + } + try { + await scopedClusterClient.asInternalUser.delete({ + id, + index: STREAMS_INDEX, + refresh: 'wait_for', + }); + } catch (e) { + if (e.meta?.statusCode !== 404) { + throw e; + } + } +} + export async function deleteStreamObjects({ id, scopedClusterClient, logger }: DeleteStreamParams) { await deleteDataStream({ esClient: scopedClusterClient.asCurrentUser, @@ -82,7 +144,7 @@ async function upsertInternalStream({ definition, scopedClusterClient }: BasePar return scopedClusterClient.asInternalUser.index({ id: definition.id, index: STREAMS_INDEX, - document: { ...definition, managed: true }, + document: { ...definition }, refresh: 'wait_for', }); } @@ -103,14 +165,20 @@ export async function listStreams({ }); const dataStreams = await listDataStreamsAsStreams({ scopedClusterClient }); - let definitions = response.hits.hits.map((hit) => ({ ...hit._source!, managed: true })); + let definitions = response.hits.hits.map((hit) => ({ ...hit._source! })); const hasAccess = await Promise.all( definitions.map((definition) => checkReadAccess({ id: definition.id, scopedClusterClient })) ); definitions = definitions.filter((_, index) => hasAccess[index]); + const definitionMap = new Map(definitions.map((definition) => [definition.id, definition])); + dataStreams.forEach((dataStream) => { + if (!definitionMap.has(dataStream.id)) { + definitionMap.set(dataStream.id, dataStream); + } + }); return { - definitions: [...definitions, ...dataStreams], + definitions: Array.from(definitionMap.values()), }; } @@ -158,7 +226,6 @@ export async function readStream({ return { definition: { ...definition, - managed: true, }, }; } catch (e) { @@ -169,35 +236,42 @@ export async function readStream({ } } -export async function readDataStreamAsStream({ - id, +export async function readDataStreamAsStream({ id, scopedClusterClient }: ReadStreamParams) { + const definition: StreamDefinition = { + id, + managed: false, + children: [], + fields: [], + processing: [], + }; + + definition.unmanaged_elasticsearch_assets = await getUnmanagedElasticsearchAssets({ + name: id, + scopedClusterClient, + }); + + return { definition }; +} + +interface ReadUnmanagedAssetsParams extends BaseParams { + name: string; +} + +async function getUnmanagedElasticsearchAssets({ + name, scopedClusterClient, - skipAccessCheck, -}: ReadStreamParams) { +}: ReadUnmanagedAssetsParams) { let dataStream: IndicesDataStream; try { - const response = await scopedClusterClient.asInternalUser.indices.getDataStream({ name: id }); + const response = await scopedClusterClient.asInternalUser.indices.getDataStream({ name }); dataStream = response.data_streams[0]; } catch (e) { if (e.meta?.statusCode === 404) { - throw new DefinitionNotFound(`Stream definition for ${id} not found.`); + throw new DefinitionNotFound(`Stream definition for ${name} not found.`); } throw e; } - const definition: StreamDefinition = { - id, - managed: false, - children: [], - fields: [], - processing: [], - }; - if (!skipAccessCheck) { - const hasAccess = await checkReadAccess({ id, scopedClusterClient }); - if (!hasAccess) { - throw new DefinitionNotFound(`Stream definition for ${id} not found.`); - } - } // retrieve linked index template, component template and ingest pipeline const templateName = dataStream.template; const componentTemplates: string[] = []; @@ -215,9 +289,9 @@ export async function readDataStreamAsStream({ }); const ingestPipelineId = currentIndex[writeIndexName].settings?.index?.default_pipeline!; - definition.unmanaged_elasticsearch_assets = [ + return [ { - type: 'ingest_pipeline', + type: 'ingest_pipeline' as const, id: ingestPipelineId, }, ...componentTemplates.map((componentTemplateName) => ({ @@ -225,16 +299,14 @@ export async function readDataStreamAsStream({ id: componentTemplateName, })), { - type: 'index_template', + type: 'index_template' as const, id: templateName, }, { - type: 'data_stream', - id, + type: 'data_stream' as const, + id: name, }, ]; - - return { definition }; } interface ReadAncestorsParams extends BaseParams { @@ -378,8 +450,12 @@ export async function syncStream({ logger, }: SyncStreamParams) { if (!definition.managed) { - // TODO For now, we just don't allow reads at all - later on we will relax this to allow certain operations, but they will use a completely different syncing logic - throw new Error('Cannot sync an unmanaged stream'); + await syncUnmanagedStream({ scopedClusterClient, definition, logger }); + await upsertInternalStream({ + scopedClusterClient, + definition, + }); + return; } const componentTemplate = generateLayer(definition.id, definition); await upsertComponent({ @@ -432,6 +508,163 @@ export async function syncStream({ }); } +interface ExecutionPlanStep { + method: string; + path: string; + body?: Record; +} + +async function syncUnmanagedStream({ scopedClusterClient, definition, logger }: SyncStreamParams) { + if (definition.managed) { + throw new Error('Got an unmanaged stream that is marked as managed'); + } + if (definition.fields.length) { + throw new Error( + 'Unmanaged streams cannot have managed fields, please edit the component templates directly' + ); + } + if (definition.children.length) { + throw new Error('Unmanaged streams cannot have managed children, coming soon'); + } + const unmanagedAssets = await getUnmanagedElasticsearchAssets({ + name: definition.id, + scopedClusterClient, + }); + const executionPlan: ExecutionPlanStep[] = []; + const streamManagedPipelineName = getProcessingPipelineName(definition.id); + const pipelineName = unmanagedAssets.find((asset) => asset.type === 'ingest_pipeline')?.id; + if (!pipelineName) { + throw new Error('Unmanaged stream needs a default ingest pipeline'); + } + if (pipelineName === streamManagedPipelineName) { + throw new Error('Unmanaged stream cannot have the @stream pipeline as the default pipeline'); + } + await ensureStreamManagedPipelineReference( + scopedClusterClient, + pipelineName, + definition, + executionPlan + ); + + if (definition.processing.length) { + // if the stream has processing, we need to create or update the stream managed pipeline + executionPlan.push({ + method: 'PUT', + path: `/_ingest/pipeline/${streamManagedPipelineName}`, + body: generateClassicIngestPipelineBody(definition), + }); + } else { + const pipelineExists = Boolean( + await tryGettingPipeline({ scopedClusterClient, id: streamManagedPipelineName }) + ); + // no processing, just delete the pipeline if it exists. The reference to the pipeline won't break anything + if (pipelineExists) { + executionPlan.push({ + method: 'DELETE', + path: `/_ingest/pipeline/${streamManagedPipelineName}`, + }); + } + } + + await executePlan(executionPlan, scopedClusterClient); +} + +async function executePlan( + executionPlan: ExecutionPlanStep[], + scopedClusterClient: IScopedClusterClient +) { + for (const step of executionPlan) { + await scopedClusterClient.asCurrentUser.transport.request({ + method: step.method, + path: step.path, + body: step.body, + }); + } +} + +async function findStreamManagedPipelineReference( + scopedClusterClient: IScopedClusterClient, + pipelineName: string, + streamId: string +): Promise<{ + targetPipelineName: string; + targetPipeline: IngestPipeline; + referencesStreamManagedPipeline: boolean; +}> { + const streamManagedPipelineName = getProcessingPipelineName(streamId); + const pipeline = (await tryGettingPipeline({ scopedClusterClient, id: pipelineName })) || { + processors: [], + }; + const streamProcessor = pipeline.processors?.find( + (processor) => processor.pipeline && processor.pipeline.name === streamManagedPipelineName + ); + const customProcessor = pipeline.processors?.findLast( + (processor) => processor.pipeline && processor.pipeline.name.endsWith('@custom') + ); + if (streamProcessor) { + return { + targetPipelineName: pipelineName, + targetPipeline: pipeline, + referencesStreamManagedPipeline: true, + }; + } + if (customProcessor) { + // go one level deeper, find the latest @custom leaf pipeline + return await findStreamManagedPipelineReference( + scopedClusterClient, + customProcessor.pipeline!.name, + streamId + ); + } + return { + targetPipelineName: pipelineName, + targetPipeline: pipeline, + referencesStreamManagedPipeline: false, + }; +} + +async function ensureStreamManagedPipelineReference( + scopedClusterClient: IScopedClusterClient, + pipelineName: string, + definition: StreamDefinition, + executionPlan: ExecutionPlanStep[] +) { + const streamManagedPipelineName = getProcessingPipelineName(definition.id); + const { targetPipelineName, targetPipeline, referencesStreamManagedPipeline } = + await findStreamManagedPipelineReference(scopedClusterClient, pipelineName, definition.id); + if (!referencesStreamManagedPipeline) { + const callStreamManagedPipelineProcessor: IngestProcessorContainer = { + pipeline: { + name: streamManagedPipelineName, + if: `ctx._index == '${definition.id}'`, + ignore_missing_pipeline: true, + description: + "Call the stream's managed pipeline - do not change this manually but instead use the streams UI or API", + }, + }; + executionPlan.push({ + method: 'PUT', + path: `/_ingest/pipeline/${targetPipelineName}`, + body: set( + { ...targetPipeline }, + 'processors', + (targetPipeline.processors || []).concat(callStreamManagedPipelineProcessor) + ), + }); + } +} + +async function tryGettingPipeline({ scopedClusterClient, id }: ReadStreamParams) { + try { + return (await scopedClusterClient.asCurrentUser.ingest.getPipeline({ id }))[id]; + } catch (e) { + if (e.meta?.statusCode === 404) { + return; + } + throw e; + } +} + export async function streamsEnabled({ scopedClusterClient }: BaseParams) { return await scopedClusterClient.asInternalUser.indices.exists({ index: STREAMS_INDEX, diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/delete.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/delete.ts index a2092838792cf..d6bf5fbb84d8f 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/delete.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/delete.ts @@ -16,7 +16,12 @@ import { SecurityException, } from '../../lib/streams/errors'; import { createServerRoute } from '../create_server_route'; -import { syncStream, readStream, deleteStreamObjects } from '../../lib/streams/stream_crud'; +import { + syncStream, + readStream, + deleteStreamObjects, + deleteUnmanagedStreamObjects, +} from '../../lib/streams/stream_crud'; import { MalformedStreamId } from '../../lib/streams/errors/malformed_stream_id'; import { getParentId } from '../../lib/streams/helpers/hierarchy'; @@ -47,14 +52,6 @@ export const deleteStreamRoute = createServerRoute({ try { const { scopedClusterClient } = await getScopedClients({ request }); - const parentId = getParentId(params.path.id); - if (!parentId) { - throw new MalformedStreamId('Cannot delete root stream'); - } - - // need to update parent first to cut off documents streaming down - await updateParentStream(scopedClusterClient, params.path.id, parentId, logger); - await deleteStream(scopedClusterClient, params.path.id, logger); return { acknowledged: true }; @@ -83,6 +80,18 @@ export async function deleteStream( ) { try { const { definition } = await readStream({ scopedClusterClient, id }); + if (!definition.managed) { + await deleteUnmanagedStreamObjects({ scopedClusterClient, id, logger }); + return; + } + + const parentId = getParentId(id); + if (!parentId) { + throw new MalformedStreamId('Cannot delete root stream'); + } + + // need to update parent first to cut off documents streaming down + await updateParentStream(scopedClusterClient, id, parentId, logger); for (const child of definition.children) { await deleteStream(scopedClusterClient, child.id, logger); } diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts index e280796bc9780..19867018ce25f 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts @@ -50,6 +50,17 @@ export const editStreamRoute = createServerRoute({ handler: async ({ response, params, logger, request, getScopedClients }) => { try { const { scopedClusterClient } = await getScopedClients({ request }); + const streamDefinition = { ...params.body, id: params.path.id }; + + if (!streamDefinition.managed) { + await syncStream({ + scopedClusterClient, + definition: { ...streamDefinition, id: params.path.id }, + rootDefinition: undefined, + logger, + }); + return { acknowledged: true }; + } await validateStreamChildren(scopedClusterClient, params.path.id, params.body.children); await validateAncestorFields(scopedClusterClient, params.path.id, params.body.fields); @@ -58,8 +69,6 @@ export const editStreamRoute = createServerRoute({ const parentId = getParentId(params.path.id); let parentDefinition: StreamDefinition | undefined; - const streamDefinition = { ...params.body, id: params.path.id }; - // always need to go from the leaves to the parent when syncing ingest pipelines, otherwise data // will be routed before the data stream is ready diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/enable.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/enable.ts index cfcb97f9b3581..ee49a93aaacbb 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/enable.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/enable.ts @@ -31,12 +31,12 @@ export const enableStreamsRoute = createServerRoute({ response, logger, getScopedClients, - }): Promise<{ acknowledged: true }> => { + }): Promise<{ acknowledged: true; message: string }> => { try { const { scopedClusterClient } = await getScopedClients({ request }); const alreadyEnabled = await streamsEnabled({ scopedClusterClient }); if (alreadyEnabled) { - return { acknowledged: true }; + return { acknowledged: true, message: 'Streams was already enabled' }; } await createStreamsIndex(scopedClusterClient); await syncStream({ @@ -44,7 +44,11 @@ export const enableStreamsRoute = createServerRoute({ definition: rootStreamDefinition, logger, }); - return { acknowledged: true }; + return { + acknowledged: true, + message: + 'Streams enabled - reload your browser window to show the streams UI in the navigation', + }; } catch (e) { if (e instanceof SecurityException) { throw badRequest(e); diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/fork.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/fork.ts index 070dc66b9ab10..9ec61d27619e2 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/fork.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/fork.ts @@ -58,7 +58,11 @@ export const forkStreamsRoute = createServerRoute({ id: params.path.id, }); - const childDefinition = { ...params.body.stream, children: [], managed: true }; + if (rootDefinition.managed === false) { + throw new MalformedStreamId('Cannot fork a stream that is not managed'); + } + + const childDefinition = { ...params.body.stream, children: [] }; // check whether root stream has a child of the given name already if (rootDefinition.children.some((child) => child.id === childDefinition.id)) { diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/list.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/list.ts index 774a256e5ba4a..f98c635830bda 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/list.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/list.ts @@ -29,14 +29,14 @@ export const listStreamsRoute = createServerRoute({ response, request, getScopedClients, - }): Promise<{ definitions: StreamDefinition[]; trees: StreamTree[] }> => { + }): Promise<{ definitions: StreamDefinition[] }> => { try { const { scopedClusterClient } = await getScopedClients({ request }); const { definitions } = await listStreams({ scopedClusterClient }); - const trees = asTrees(definitions); - - return { definitions, trees }; + return { + definitions, + }; } catch (e) { if (e instanceof DefinitionNotFound) { throw notFound(e); @@ -46,33 +46,3 @@ export const listStreamsRoute = createServerRoute({ } }, }); - -export interface StreamTree { - id: string; - children: StreamTree[]; -} - -function asTrees(definitions: Array<{ id: string; managed?: boolean }>) { - const trees: StreamTree[] = []; - const ids = definitions - .filter((definition) => definition.managed) - .map((definition) => definition.id); - - ids.sort((a, b) => a.split('.').length - b.split('.').length); - - ids.forEach((id) => { - let currentTree = trees; - let existingNode: StreamTree | undefined; - // traverse the tree following the prefix of the current id. - // once we reach the leaf, the current id is added as child - this works because the ids are sorted by depth - while ((existingNode = currentTree.find((node) => id.startsWith(node.id)))) { - currentTree = existingNode.children; - } - if (!existingNode) { - const newNode = { id, children: [] }; - currentTree.push(newNode); - } - }); - - return trees; -} diff --git a/x-pack/solutions/observability/plugins/streams/tsconfig.json b/x-pack/solutions/observability/plugins/streams/tsconfig.json index 4cec623f68286..08ed4e1648af7 100644 --- a/x-pack/solutions/observability/plugins/streams/tsconfig.json +++ b/x-pack/solutions/observability/plugins/streams/tsconfig.json @@ -29,6 +29,7 @@ "@kbn/licensing-plugin", "@kbn/server-route-repository-client", "@kbn/observability-utils-server", - "@kbn/observability-utils-common" + "@kbn/observability-utils-common", + "@kbn/safer-lodash-set" ] } diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/classic.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/classic.tsx new file mode 100644 index 0000000000000..1664b322b5a8e --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/classic.tsx @@ -0,0 +1,142 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import React from 'react'; +import { i18n } from '@kbn/i18n'; +import { ReadStreamDefinition, StreamDefinition } from '@kbn/streams-plugin/common'; +import { EuiFlexGroup, EuiListGroup, EuiText } from '@elastic/eui'; +import { useStreamsAppParams } from '../../hooks/use_streams_app_params'; +import { RedirectTo } from '../redirect_to'; +import { StreamDetailEnriching } from '../stream_detail_enriching'; +import { useKibana } from '../../hooks/use_kibana'; +import { Wrapper } from './wrapper'; + +type ManagementSubTabs = 'enrich' | 'overview'; + +function isValidManagementSubTab(value: string): value is ManagementSubTabs { + return ['enrich', 'overview'].includes(value); +} + +export function ClassicStreamDetailManagement({ + definition, + refreshDefinition, +}: { + definition: ReadStreamDefinition; + refreshDefinition: () => void; +}) { + const { + path: { key, subtab }, + } = useStreamsAppParams('/{key}/management/{subtab}'); + + const tabs = { + overview: { + content: , + label: i18n.translate('xpack.streams.streamDetailView.overviewTab', { + defaultMessage: 'Overview', + }), + }, + enrich: { + content: ( + + ), + label: i18n.translate('xpack.streams.streamDetailView.enrichingTab', { + defaultMessage: 'Extract field', + }), + }, + }; + + if (!isValidManagementSubTab(subtab)) { + return ( + + ); + } + + return ; +} + +function UnmanagedStreamOverview({ definition }: { definition: StreamDefinition }) { + const { + core: { + http: { basePath }, + }, + } = useKibana(); + const groupedAssets = (definition.unmanaged_elasticsearch_assets ?? []).reduce((acc, asset) => { + const title = assetToTitle(asset); + if (title) { + acc[title] = acc[title] ?? []; + acc[title].push(asset); + } + return acc; + }, {} as Record>); + return ( + + +

+ {i18n.translate('xpack.streams.streamDetailView.unmanagedStreamOverview', { + defaultMessage: + 'This stream is not managed. Follow the links to stack management to change the related Elasticsearch objects.', + })} +

+
+ {Object.entries(groupedAssets).map(([title, assets]) => ( +
+ +

{title}

+
+ ({ + label: asset.id, + href: basePath.prepend(assetToLink(asset)), + iconType: 'index', + target: '_blank', + }))} + /> +
+ ))} +
+ ); +} + +function assetToLink(asset: { type: string; id: string }) { + switch (asset.type) { + case 'index_template': + return `/app/management/data/index_management/templates/${asset.id}`; + case 'component_template': + return `/app/management/data/index_management/component_templates/${asset.id}`; + case 'data_stream': + return `/app/management/data/index_management/data_streams/${asset.id}`; + case 'ingest_pipeline': + return `/app/management/ingest/ingest_pipelines?pipeline=${asset.id}`; + default: + return ''; + } +} + +function assetToTitle(asset: { type: string; id: string }) { + switch (asset.type) { + case 'index_template': + return i18n.translate('xpack.streams.streamDetailView.indexTemplate', { + defaultMessage: 'Index template', + }); + case 'component_template': + return i18n.translate('xpack.streams.streamDetailView.componentTemplate', { + defaultMessage: 'Component template', + }); + case 'data_stream': + return i18n.translate('xpack.streams.streamDetailView.dataStream', { + defaultMessage: 'Data stream', + }); + case 'ingest_pipeline': + return i18n.translate('xpack.streams.streamDetailView.ingestPipeline', { + defaultMessage: 'Ingest pipeline', + }); + default: + return ''; + } +} diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/index.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/index.tsx index 4f3d207c91789..c093f05c03210 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/index.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/index.tsx @@ -5,23 +5,9 @@ * 2.0. */ import React from 'react'; -import { i18n } from '@kbn/i18n'; -import { ReadStreamDefinition, StreamDefinition } from '@kbn/streams-plugin/common'; -import { css } from '@emotion/css'; -import { EuiButtonGroup, EuiFlexGroup, EuiFlexItem, EuiListGroup, EuiText } from '@elastic/eui'; -import { useStreamsAppParams } from '../../hooks/use_streams_app_params'; -import { RedirectTo } from '../redirect_to'; -import { useStreamsAppRouter } from '../../hooks/use_streams_app_router'; -import { StreamDetailRouting } from '../stream_detail_routing'; -import { StreamDetailEnriching } from '../stream_detail_enriching'; -import { StreamDetailSchemaEditor } from '../stream_detail_schema_editor'; -import { useKibana } from '../../hooks/use_kibana'; - -type ManagementSubTabs = 'route' | 'enrich' | 'schemaEditor'; - -function isValidManagementSubTab(value: string): value is ManagementSubTabs { - return ['route', 'enrich', 'schemaEditor'].includes(value); -} +import { ReadStreamDefinition } from '@kbn/streams-plugin/common'; +import { WiredStreamDetailManagement } from './wired'; +import { ClassicStreamDetailManagement } from './classic'; export function StreamDetailManagement({ definition, @@ -30,180 +16,17 @@ export function StreamDetailManagement({ definition?: ReadStreamDefinition; refreshDefinition: () => void; }) { - const { - path: { key, subtab }, - } = useStreamsAppParams('/{key}/management/{subtab}'); - const router = useStreamsAppRouter(); - - if (subtab === 'overview') { - if (!definition) { - return null; - } - if (definition.managed) { - return ( - - ); - } - return ; - } - - const tabs = { - route: { - content: ( - - ), - label: i18n.translate('xpack.streams.streamDetailView.routingTab', { - defaultMessage: 'Streams Partitioning', - }), - }, - enrich: { - content: ( - - ), - label: i18n.translate('xpack.streams.streamDetailView.enrichingTab', { - defaultMessage: 'Extract field', - }), - }, - schemaEditor: { - content: ( - - ), - label: i18n.translate('xpack.streams.streamDetailView.schemaEditorTab', { - defaultMessage: 'Schema editor', - }), - }, - }; - - if (!isValidManagementSubTab(subtab)) { - return ( - - ); + if (!definition) { + return null; } - if (definition && !definition.managed) { + if (definition.managed) { return ( - + ); } - const selectedTabObject = tabs[subtab]; - - return ( - - - { - router.push('/{key}/management/{subtab}', { - path: { key, subtab: optionId }, - query: {}, - }); - }} - options={Object.keys(tabs).map((id) => ({ - id, - label: tabs[id as ManagementSubTabs].label, - }))} - /> - - - {selectedTabObject.content} - - - ); -} - -function assetToLink(asset: { type: string; id: string }) { - switch (asset.type) { - case 'index_template': - return `/app/management/data/index_management/templates/${asset.id}`; - case 'component_template': - return `/app/management/data/index_management/component_templates/${asset.id}`; - case 'data_stream': - return `/app/management/data/index_management/data_streams/${asset.id}`; - case 'ingest_pipeline': - return `/app/management/ingest/ingest_pipelines?pipeline=${asset.id}`; - default: - return ''; - } -} - -function assetToTitle(asset: { type: string; id: string }) { - switch (asset.type) { - case 'index_template': - return i18n.translate('xpack.streams.streamDetailView.indexTemplate', { - defaultMessage: 'Index template', - }); - case 'component_template': - return i18n.translate('xpack.streams.streamDetailView.componentTemplate', { - defaultMessage: 'Component template', - }); - case 'data_stream': - return i18n.translate('xpack.streams.streamDetailView.dataStream', { - defaultMessage: 'Data stream', - }); - case 'ingest_pipeline': - return i18n.translate('xpack.streams.streamDetailView.ingestPipeline', { - defaultMessage: 'Ingest pipeline', - }); - default: - return ''; - } -} - -function UnmanagedStreamOverview({ definition }: { definition: StreamDefinition }) { - const { - core: { - http: { basePath }, - }, - } = useKibana(); - const groupedAssets = (definition.unmanaged_elasticsearch_assets ?? []).reduce((acc, asset) => { - const title = assetToTitle(asset); - if (title) { - acc[title] = acc[title] ?? []; - acc[title].push(asset); - } - return acc; - }, {} as Record>); return ( - - -

- {i18n.translate('xpack.streams.streamDetailView.unmanagedStreamOverview', { - defaultMessage: - 'This stream is not managed. Follow the links to stack management to change the related Elasticsearch objects.', - })} -

-
- {Object.entries(groupedAssets).map(([title, assets]) => ( -
- -

{title}

-
- ({ - label: asset.id, - href: basePath.prepend(assetToLink(asset)), - iconType: 'index', - target: '_blank', - }))} - /> -
- ))} -
+ ); } diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/wired.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/wired.tsx new file mode 100644 index 0000000000000..5f8c4e57bf7d1 --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/wired.tsx @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import React from 'react'; +import { i18n } from '@kbn/i18n'; +import { ReadStreamDefinition } from '@kbn/streams-plugin/common'; +import { useStreamsAppParams } from '../../hooks/use_streams_app_params'; +import { RedirectTo } from '../redirect_to'; +import { StreamDetailRouting } from '../stream_detail_routing'; +import { StreamDetailEnriching } from '../stream_detail_enriching'; +import { StreamDetailSchemaEditor } from '../stream_detail_schema_editor'; +import { Wrapper } from './wrapper'; + +type ManagementSubTabs = 'route' | 'enrich' | 'schemaEditor'; + +function isValidManagementSubTab(value: string): value is ManagementSubTabs { + return ['route', 'enrich', 'schemaEditor'].includes(value); +} + +export function WiredStreamDetailManagement({ + definition, + refreshDefinition, +}: { + definition?: ReadStreamDefinition; + refreshDefinition: () => void; +}) { + const { + path: { key, subtab }, + } = useStreamsAppParams('/{key}/management/{subtab}'); + + const tabs = { + route: { + content: ( + + ), + label: i18n.translate('xpack.streams.streamDetailView.routingTab', { + defaultMessage: 'Streams Partitioning', + }), + }, + enrich: { + content: ( + + ), + label: i18n.translate('xpack.streams.streamDetailView.enrichingTab', { + defaultMessage: 'Extract field', + }), + }, + schemaEditor: { + content: ( + + ), + label: i18n.translate('xpack.streams.streamDetailView.schemaEditorTab', { + defaultMessage: 'Schema editor', + }), + }, + }; + + if (!isValidManagementSubTab(subtab)) { + return ( + + ); + } + + return ; +} diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/wrapper.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/wrapper.tsx new file mode 100644 index 0000000000000..92d80924298fc --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/wrapper.tsx @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { EuiButtonGroup, EuiFlexGroup, EuiFlexItem } from '@elastic/eui'; +import React from 'react'; +import { css } from '@emotion/css'; +import { useStreamsAppRouter } from '../../hooks/use_streams_app_router'; + +export function Wrapper({ + tabs, + streamId, + subtab, +}: { + tabs: Record; + streamId: string; + subtab: string; +}) { + const router = useStreamsAppRouter(); + return ( + + + { + router.push('/{key}/management/{subtab}', { + path: { key: streamId, subtab: optionId }, + query: {}, + }); + }} + options={Object.keys(tabs).map((id) => ({ + id, + label: tabs[id].label, + }))} + /> + + + {tabs[subtab].content} + + + ); +} diff --git a/x-pack/test/api_integration/apis/streams/classic.ts b/x-pack/test/api_integration/apis/streams/classic.ts new file mode 100644 index 0000000000000..25a7238a757ca --- /dev/null +++ b/x-pack/test/api_integration/apis/streams/classic.ts @@ -0,0 +1,163 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import { JsonObject } from '@kbn/utility-types'; +import { + deleteStream, + enableStreams, + fetchDocument, + getStream, + indexDocument, + listStreams, + putStream, +} from './helpers/requests'; +import { FtrProviderContext } from '../../ftr_provider_context'; +import { waitForDocumentInIndex } from '../../../alerting_api_integration/observability/helpers/alerting_wait_for_helpers'; +import { cleanUpRootStream } from './helpers/cleanup'; + +export default function ({ getService }: FtrProviderContext) { + const supertest = getService('supertest'); + const esClient = getService('es'); + const retryService = getService('retry'); + const logger = getService('log'); + + describe('Classic streams', () => { + after(async () => { + await cleanUpRootStream(esClient); + }); + + before(async () => { + await enableStreams(supertest); + }); + + it('Shows non-wired data streams', async () => { + const doc = { + message: '2023-01-01T00:00:10.000Z error test', + }; + const response = await indexDocument(esClient, 'logs-test-default', doc); + expect(response.result).to.eql('created'); + const streams = await listStreams(supertest); + const classicStream = streams.definitions.find( + (stream: JsonObject) => stream.id === 'logs-test-default' + ); + expect(classicStream).to.eql({ + id: 'logs-test-default', + managed: false, + children: [], + fields: [], + processing: [], + }); + }); + + it('Allows setting processing on classic streams', async () => { + const response = await putStream(supertest, 'logs-test-default', { + managed: false, + children: [], + fields: [], + processing: [ + { + config: { + type: 'grok', + field: 'message', + patterns: [ + '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', + ], + }, + }, + ], + }); + expect(response).to.have.property('acknowledged', true); + const streamBody = await getStream(supertest, 'logs-test-default'); + expect(streamBody).to.eql({ + id: 'logs-test-default', + managed: false, + children: [], + inheritedFields: [], + fields: [], + processing: [ + { + config: { + type: 'grok', + field: 'message', + patterns: [ + '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', + ], + }, + }, + ], + }); + }); + + it('Executes processing on classic streams', async () => { + const doc = { + '@timestamp': '2024-01-01T00:00:10.000Z', + message: '2023-01-01T00:00:10.000Z error test', + }; + const response = await indexDocument(esClient, 'logs-test-default', doc); + expect(response.result).to.eql('created'); + await waitForDocumentInIndex({ + esClient, + indexName: 'logs-test-default', + retryService, + logger, + docCountTarget: 2, + }); + const result = await fetchDocument(esClient, 'logs-test-default', response._id); + expect(result._source).to.eql({ + '@timestamp': '2024-01-01T00:00:10.000Z', + message: '2023-01-01T00:00:10.000Z error test', + inner_timestamp: '2023-01-01T00:00:10.000Z', + message2: 'test', + log: { + level: 'error', + }, + }); + }); + + it('Allows removing processing on classic streams', async () => { + const response = await putStream(supertest, 'logs-test-default', { + managed: false, + children: [], + fields: [], + processing: [], + }); + expect(response).to.have.property('acknowledged', true); + }); + + it('Executes processing on classic streams after removing processing', async () => { + const doc = { + // default logs pipeline fills in timestamp with current date if not set + message: '2023-01-01T00:00:10.000Z info mylogger this is the message', + }; + const response = await indexDocument(esClient, 'logs-test-default', doc); + expect(response.result).to.eql('created'); + await waitForDocumentInIndex({ + esClient, + indexName: 'logs-test-default', + retryService, + logger, + docCountTarget: 3, + }); + const result = await fetchDocument(esClient, 'logs-test-default', response._id); + expect(result._source).to.eql({ + // accept any date + '@timestamp': (result._source as { [key: string]: unknown })['@timestamp'], + message: '2023-01-01T00:00:10.000Z info mylogger this is the message', + }); + }); + + it('Allows deleting classic streams', async () => { + await deleteStream(supertest, 'logs-test-default'); + const streams = await listStreams(supertest); + const classicStream = streams.definitions.find( + (stream: JsonObject) => stream.id === 'logs-test-default' + ); + expect(classicStream).to.eql(undefined); + }); + }); +} diff --git a/x-pack/test/api_integration/apis/streams/helpers/requests.ts b/x-pack/test/api_integration/apis/streams/helpers/requests.ts index 7d656e4aacf5e..43e7f02b7a750 100644 --- a/x-pack/test/api_integration/apis/streams/helpers/requests.ts +++ b/x-pack/test/api_integration/apis/streams/helpers/requests.ts @@ -42,6 +42,18 @@ export async function putStream(supertest: Agent, name: string, body: JsonObject return response.body; } +export async function getStream(supertest: Agent, name: string) { + const req = supertest.get(`/api/streams/${name}`).set('kbn-xsrf', 'xxx'); + const response = await req.send().expect(200); + return response.body; +} + +export async function listStreams(supertest: Agent) { + const req = supertest.get(`/api/streams`).set('kbn-xsrf', 'xxx'); + const response = await req.send().expect(200); + return response.body; +} + export async function deleteStream(supertest: Agent, id: string) { const req = supertest.delete(`/api/streams/${id}`).set('kbn-xsrf', 'xxx'); const response = await req.send().expect(200); diff --git a/x-pack/test/api_integration/apis/streams/index.ts b/x-pack/test/api_integration/apis/streams/index.ts index 4ef3fce385248..14decb2400196 100644 --- a/x-pack/test/api_integration/apis/streams/index.ts +++ b/x-pack/test/api_integration/apis/streams/index.ts @@ -11,6 +11,7 @@ export default function ({ loadTestFile }: FtrProviderContext) { describe('Streams Endpoints', () => { loadTestFile(require.resolve('./full_flow')); loadTestFile(require.resolve('./enrichment')); + loadTestFile(require.resolve('./classic')); loadTestFile(require.resolve('./flush_config')); }); }