diff --git a/infrastructure/main.bicep b/infrastructure/main.bicep index b343fa1c..cb6ffaa3 100644 --- a/infrastructure/main.bicep +++ b/infrastructure/main.bicep @@ -12,8 +12,8 @@ targetScope = 'subscription' param location string = 'uksouth' param envName string param domainName string = 'cfc' -param orgName string = 'debug' -param uniqueIdentifier string = '02' +param orgName string = 'demo' +param uniqueIdentifier string = '01' param datalakeName string = 'dls' //Storage account name prefix param functionBlobName string = 'st' //Function app storage name prefix @@ -23,14 +23,14 @@ param deploymentTimestamp string = utcNow('yy-MM-dd-HHmm') param firstDeployment bool = true param deployADF bool = true param deployWorkers bool = false -param deployVM bool = false +param deployVM bool = true param deploySQL bool = true param deployFunction bool = true param deployNetworking bool = true param deployADBWorkspace bool = true param deployADBCluster bool = false // Controls ADB Cluster creation - TODO param deployPAT bool = false // - TODO -param setRoleAssignments bool = false +param setRoleAssignments bool = true diff --git a/src/metadata.ingest/ingest/Stored Procedures/AddIngestPayloadPipeline.sql b/src/metadata.common/common/Stored Procedures/AddIngestOrTransformPayloadPipeline.sql similarity index 73% rename from src/metadata.ingest/ingest/Stored Procedures/AddIngestPayloadPipeline.sql rename to src/metadata.common/common/Stored Procedures/AddIngestOrTransformPayloadPipeline.sql index 994a98ef..6cbd3a27 100644 --- a/src/metadata.ingest/ingest/Stored Procedures/AddIngestPayloadPipeline.sql +++ b/src/metadata.common/common/Stored Procedures/AddIngestOrTransformPayloadPipeline.sql @@ -1,13 +1,23 @@ -CREATE PROCEDURE [ingest].[AddIngestPayloadPipeline] ( +CREATE PROCEDURE [common].[AddIngestOrTransformPayloadPipeline] ( + @ComponentName VARCHAR(25), @StageName VARCHAR(25), @PipelineName VARCHAR(50), @DatasetDisplayName VARCHAR(50), @OrchestratorName VARCHAR(50) ) AS + + +-- defensive check component in ('Ingest', 'Transform') +IF @ComponentName NOT IN ('Ingest', 'Transform') +BEGIN + RAISERROR('This Functionality may only be used for adding Datasets from either Ingest or Transform schemas to the Control Pipeline. If you require different functionality to be added to control.pipelines, please proceed with a manual INSERT statement.',16,1) + RETURN 0; +END + +-- defensive check stage exists DECLARE @StageId INT DECLARE @StageCount INT --- defensive check stage exists SELECT @StageCount = COUNT(*) FROM [control].[Stages] @@ -33,10 +43,21 @@ WHERE StageName = @StageName -- defensive checks only 1 dataset id returned DECLARE @DatasetCount INT -SELECT @DatasetCount = COUNT(*) -FROM [ingest].[Datasets] AS ds -WHERE ds.Enabled = 1 -AND ds.DatasetDisplayName = @DatasetDisplayName +IF @ComponentName = 'Ingest' +BEGIN + SELECT @DatasetCount = COUNT(*) + FROM [ingest].[Datasets] AS ids + WHERE ids.Enabled = 1 + AND ids.DatasetDisplayName = @DatasetDisplayName +END + +IF @ComponentName = 'Transform' +BEGIN + SELECT @DatasetCount = COUNT(*) + FROM [transform].[Datasets] AS tds + WHERE tds.Enabled = 1 + AND tds.DatasetName = @DatasetDisplayName +END IF @DatasetCount = 0 BEGIN @@ -56,11 +77,23 @@ DECLARE @Datasets TABLE ( ) -INSERT INTO @Datasets -SELECT - DatasetId, Enabled -FROM [ingest].[Datasets] -WHERE DatasetDisplayName = @DatasetDisplayName +IF @ComponentName = 'Ingest' +BEGIN + INSERT INTO @Datasets + SELECT + DatasetId, Enabled + FROM [ingest].[Datasets] + WHERE DatasetDisplayName = @DatasetDisplayName +END + +IF @ComponentName = 'Transform' +BEGIN + INSERT INTO @Datasets + SELECT + DatasetId, Enabled + FROM [transform].[Datasets] + WHERE DatasetName = @DatasetDisplayName +END DECLARE @PipelineId INT @@ -81,7 +114,7 @@ WHERE OrchestratorName = @OrchestratorName IF @OrchestratorId IS NULL BEGIN DECLARE @OrchestratorErrorMsg VARCHAR(150) - SET @OrchestratorErrorMsg = 'No Orchestrator Registered to the name ' + @OrchestratorName + '. Please confirm the correct Data Factory name is provided, and exists within this environment.' + SET @OrchestratorErrorMsg = 'No Orchestrator registered to the name ' + @OrchestratorName + '. Please confirm the correct Data Factory name is provided, and exists within this environment.' RAISERROR(@OrchestratorErrorMsg, 16,1) RETURN 0; END diff --git a/src/metadata.control/control/Views/PipelineSummary.sql b/src/metadata.control/control/Views/PipelineSummary.sql new file mode 100644 index 00000000..b1d43d1f --- /dev/null +++ b/src/metadata.control/control/Views/PipelineSummary.sql @@ -0,0 +1,37 @@ +CREATE VIEW [control].[PipelineSummary] AS +WITH cte AS ( +SELECT + CASE + WHEN ids.DatasetDisplayname IS NOT NULL THEN 'Ingest' + WHEN tds.DatasetName IS NOT NULL THEN 'Transform' + ELSE 'Unassigned' + END AS ComponentName, + CASE + WHEN ids.DatasetDisplayname IS NOT NULL THEN ids.DatasetDisplayname + WHEN tds.DatasetName IS NOT NULL THEN tds.DatasetName + ELSE p.PipelineName + END AS DatasetName, + p.PipelineName, + pp.ParameterName, + pp.ParameterValue, + p.PipelineId, + s.StageName, + pd.DependantPipelineId +FROM control.pipelines AS p +INNER JOIN control.pipelineparameters AS pp +ON p.PipelineId = pp.PipelineId +LEFT JOIN control.pipelinedependencies AS pd +ON p.PipelineId = pd.PipelineId +INNER JOIN control.stages AS s +ON p.StageId = s.StageId +LEFT JOIN ingest.Datasets AS ids +ON pp.parametervalue = CAST(ids.datasetid AS VARCHAR(4)) +AND p.pipelineName LIKE 'Ingest_PL_%' +LEFT JOIN transform.Datasets as tds +ON pp.parametervalue = CAST(tds.datasetid AS VARCHAR(4)) +AND p.pipelineName LIKE 'Transform_PL_%' +) +SELECT cte.*, cte2.DatasetName AS DependsOnDataset, cte2.PipelineName AS DependsOnPipelineName, cte2.PipelineId AS DependsOnPipelineId +FROM cte +LEFT JOIN cte AS cte2 +ON cte.PipelineId = cte2.DependantPipelineId diff --git a/src/metadata.transform/transform/Stored Procedures/AddTransformPayloadPipelineDependencies.sql b/src/metadata.transform/transform/Stored Procedures/AddTransformPayloadPipelineDependencies.sql new file mode 100644 index 00000000..69905819 --- /dev/null +++ b/src/metadata.transform/transform/Stored Procedures/AddTransformPayloadPipelineDependencies.sql @@ -0,0 +1,194 @@ +CREATE PROCEDURE [transform].[AddTransformPayloadPipelineDependencies] ( + @StageName VARCHAR(25), + @DatasetName VARCHAR(50), + @DependsOnStageName VARCHAR(25), + @DependsOnDatasetName VARCHAR(50) +) AS + + +-- defensive checks stage name in ('Dimensions', 'Facts') +IF @StageName NOT IN ('Dimensions', 'Facts') +BEGIN + RAISERROR('This Functionality may only be used for adding Dimensions or Facts from the Transform schema to the Control Pipeline. If you require different functionality to be added to control.pipelines, please proceed with a manual INSERT statement.',16,1) + RETURN 0; +END + +-- defensive checks stage name in ('Dimensions', 'Facts') +IF @DependsOnStageName NOT IN ('Cleansed','Dimensions', 'Facts') +BEGIN + RAISERROR('This Functionality may only be used for adding Dimensions or Facts from the Transform schema to the Control Pipeline. Supported Dependencies include Cleansed, Dimensions and Facts. If you require different functionality to be added to control.pipelines, please proceed with a manual INSERT statement.',16,1) + RETURN 0; +END + +-- defensive checks only 1 dataset id returned +DECLARE @DatasetId INT +DECLARE @DatasetCount INT + +SELECT @DatasetCount = COUNT(*) +FROM [transform].[Datasets] AS ds +WHERE ds.Enabled = 1 +AND ds.DatasetName = @DatasetName + +IF @DatasetCount = 0 +BEGIN + RAISERROR('No rows returned. Please review the Dataset Id provided and confirm this is enabled.',16,1) + RETURN 0; +END +IF @DatasetCount > 1 +BEGIN + RAISERROR('More than 1 row returned. Please review there is 1 active Dataset for the provided Dataset Id.',16,1) + RETURN 0; +END + +-- defensive checks only 1 dataset id returned for dependency +DECLARE @DependsOnDatasetId INT +DECLARE @DependsOnDatasetCount INT + +IF @DependsOnStageName in ('Dimensions', 'Facts') +BEGIN + SELECT @DependsOnDatasetCount = COUNT(*) + FROM [transform].[Datasets] AS ds + WHERE ds.Enabled = 1 + AND ds.DatasetName = @DependsOnDatasetName +END +IF @DependsOnStageName in ('Cleansed') +BEGIN + SELECT @DependsOnDatasetCount = COUNT(*) + FROM [ingest].[Datasets] AS ds + WHERE ds.Enabled = 1 + AND ds.DatasetDisplayName = @DependsOnDatasetName +END + +IF @DependsOnDatasetCount = 0 +BEGIN + RAISERROR('No rows returned. Please review the Dataset Id provided and confirm this is enabled.',16,1) + RETURN 0; +END +IF @DependsOnDatasetCount > 1 +BEGIN + RAISERROR('More than 1 row returned. Please review there is 1 active Dataset for the provided Dataset Id.',16,1) + RETURN 0; +END + + +-- Get Dataset Id +SELECT + @DatasetId = DatasetId +FROM [transform].[Datasets] +WHERE DatasetName = @DatasetName +AND Enabled = 1 + + +-- Get Depends On Dataset Id +IF @DependsOnStageName in ('Dimensions', 'Facts') +BEGIN + SELECT @DependsOnDatasetId = DatasetId + FROM [transform].[Datasets] AS ds + WHERE ds.Enabled = 1 + AND ds.DatasetName = @DependsOnDatasetName +END +IF @DependsOnStageName in ('Cleansed') +BEGIN + SELECT @DependsOnDatasetId = DatasetId + FROM [ingest].[Datasets] AS ds + WHERE ds.Enabled = 1 + AND ds.DatasetDisplayName = @DependsOnDatasetName +END + +-- Get Stage Id +DECLARE @StageId INT +SELECT @StageId = StageId +FROM [control].[Stages] +WHERE StageName = @StageName + +-- Get Depends On Stage Id +DECLARE @DependsOnStageId INT +SELECT @DependsOnStageId = StageId +FROM [control].[Stages] +WHERE StageName = @DependsOnStageName + + +DECLARE @Dependencies TABLE ( + PipelineId INT, -- Raw + DependantPipelineId INT -- Cleansed +) + +DECLARE @DependenciesStagingTable TABLE ( + PipelineId INT, + StageId INT, + ParameterValue INT +) +DECLARE @PipelineIdResult INT; +DECLARE @DependantPipelineIdResult INT; + +INSERT INTO @DependenciesStagingTable (PipelineId,StageId,ParameterValue) +SELECT + p.PipelineId, p.StageId, CAST(pp.ParameterValue AS INT) AS ParameterValue --,* +FROM control.Pipelines AS p +INNER JOIN control.PipelineParameters AS pp +ON p.PipelineId = pp.PipelineId +--WHERE p.PipelineName LIKE 'Transform_PL_%' + + +SELECT @DependantPipelineIdResult = PipelineId +FROM @DependenciesStagingTable +INNER JOIN transform.Datasets AS d +ON ParameterValue = d.DatasetId +WHERE ParameterValue = @DatasetId +AND StageId = @StageId + +-- Get Dependency Dataset Id +IF @DependsOnStageName in ('Dimensions', 'Facts') +BEGIN + SELECT @PipelineIdResult = PipelineId + FROM @DependenciesStagingTable + INNER JOIN transform.Datasets AS d + ON ParameterValue = d.DatasetId + WHERE ParameterValue = @DependsOnDatasetId + AND StageId = @DependsOnStageId +END +IF @DependsOnStageName in ('Cleansed') +BEGIN + SELECT @PipelineIdResult = PipelineId + FROM @DependenciesStagingTable + INNER JOIN ingest.Datasets AS d + ON ParameterValue = d.DatasetId + WHERE ParameterValue = @DependsOnDatasetId + AND StageId = @DependsOnStageId +END + + +INSERT INTO @Dependencies (PipelineId,DependantPipelineId) +SELECT @PipelineIdResult, @DependantPipelineIdResult + +IF @PipelineIdResult IS NULL AND @DependantPipelineIdResult IS NULL +BEGIN + RAISERROR('Missing Pipeline Ids for both Transform Dataset and the Dataset it depends on (Cleansed Merge Pipeline or Transform Dataset Pipeline)',16,1) + RETURN 0; +END +ELSE IF @PipelineIdResult IS NULL AND @DependantPipelineIdResult IS NOT NULL +BEGIN + RAISERROR('Missing PipelineId ',16,1) + RETURN 0; +END +ELSE IF @PipelineIdResult IS NOT NULL AND @DependantPipelineIdResult IS NULL +BEGIN + RAISERROR('Missing DependantPipelineId (Cleansed Merge Pipeline or Transform Dataset Pipeline)',16,1) + RETURN 0; +END +ELSE IF @PipelineIdResult IS NOT NULL AND @DependantPipelineIdResult IS NOT NULL +BEGIN + MERGE INTO control.PipelineDependencies AS target + USING @Dependencies AS source + ON target.PipelineId = source.PipelineId + AND target.DependantPipelineId = source.DependantPipelineId + WHEN NOT MATCHED THEN + INSERT (PipelineId, DependantPipelineId) VALUES (source.PipelineId, source.DependantPipelineId); + PRINT 'Dependencies merged into control.PipelineDependencies' +END +ELSE +BEGIN + RAISERROR('Unexpected Error',16,1) + RETURN 0; +END +