Skip to content

Commit

Permalink
Merge pull request #117 from CloudFormations:develop_matt
Browse files Browse the repository at this point in the history
Add descriptive view of control pipelines and additional pipeline population functionality
  • Loading branch information
MattPCollins authored Dec 3, 2024
2 parents dd3e0cb + 9034000 commit b45f9dc
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 16 deletions.
8 changes: 4 additions & 4 deletions infrastructure/main.bicep
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ targetScope = 'subscription'
param location string = 'uksouth'
param envName string
param domainName string = 'cfc'
param orgName string = 'debug'
param uniqueIdentifier string = '02'
param orgName string = 'demo'
param uniqueIdentifier string = '01'
param datalakeName string = 'dls' //Storage account name prefix
param functionBlobName string = 'st' //Function app storage name prefix

Expand All @@ -23,14 +23,14 @@ param deploymentTimestamp string = utcNow('yy-MM-dd-HHmm')
param firstDeployment bool = true
param deployADF bool = true
param deployWorkers bool = false
param deployVM bool = false
param deployVM bool = true
param deploySQL bool = true
param deployFunction bool = true
param deployNetworking bool = true
param deployADBWorkspace bool = true
param deployADBCluster bool = false // Controls ADB Cluster creation - TODO
param deployPAT bool = false // - TODO
param setRoleAssignments bool = false
param setRoleAssignments bool = true



Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
CREATE PROCEDURE [ingest].[AddIngestPayloadPipeline] (
CREATE PROCEDURE [common].[AddIngestOrTransformPayloadPipeline] (
@ComponentName VARCHAR(25),
@StageName VARCHAR(25),
@PipelineName VARCHAR(50),
@DatasetDisplayName VARCHAR(50),
@OrchestratorName VARCHAR(50)
) AS


-- defensive check component in ('Ingest', 'Transform')
IF @ComponentName NOT IN ('Ingest', 'Transform')
BEGIN
RAISERROR('This Functionality may only be used for adding Datasets from either Ingest or Transform schemas to the Control Pipeline. If you require different functionality to be added to control.pipelines, please proceed with a manual INSERT statement.',16,1)
RETURN 0;
END

-- defensive check stage exists
DECLARE @StageId INT
DECLARE @StageCount INT

-- defensive check stage exists
SELECT
@StageCount = COUNT(*)
FROM [control].[Stages]
Expand All @@ -33,10 +43,21 @@ WHERE StageName = @StageName
-- defensive checks only 1 dataset id returned
DECLARE @DatasetCount INT

SELECT @DatasetCount = COUNT(*)
FROM [ingest].[Datasets] AS ds
WHERE ds.Enabled = 1
AND ds.DatasetDisplayName = @DatasetDisplayName
IF @ComponentName = 'Ingest'
BEGIN
SELECT @DatasetCount = COUNT(*)
FROM [ingest].[Datasets] AS ids
WHERE ids.Enabled = 1
AND ids.DatasetDisplayName = @DatasetDisplayName
END

IF @ComponentName = 'Transform'
BEGIN
SELECT @DatasetCount = COUNT(*)
FROM [transform].[Datasets] AS tds
WHERE tds.Enabled = 1
AND tds.DatasetName = @DatasetDisplayName
END

IF @DatasetCount = 0
BEGIN
Expand All @@ -56,11 +77,23 @@ DECLARE @Datasets TABLE (
)


INSERT INTO @Datasets
SELECT
DatasetId, Enabled
FROM [ingest].[Datasets]
WHERE DatasetDisplayName = @DatasetDisplayName
IF @ComponentName = 'Ingest'
BEGIN
INSERT INTO @Datasets
SELECT
DatasetId, Enabled
FROM [ingest].[Datasets]
WHERE DatasetDisplayName = @DatasetDisplayName
END

IF @ComponentName = 'Transform'
BEGIN
INSERT INTO @Datasets
SELECT
DatasetId, Enabled
FROM [transform].[Datasets]
WHERE DatasetName = @DatasetDisplayName
END

DECLARE @PipelineId INT

Expand All @@ -81,7 +114,7 @@ WHERE OrchestratorName = @OrchestratorName
IF @OrchestratorId IS NULL
BEGIN
DECLARE @OrchestratorErrorMsg VARCHAR(150)
SET @OrchestratorErrorMsg = 'No Orchestrator Registered to the name ' + @OrchestratorName + '. Please confirm the correct Data Factory name is provided, and exists within this environment.'
SET @OrchestratorErrorMsg = 'No Orchestrator registered to the name ' + @OrchestratorName + '. Please confirm the correct Data Factory name is provided, and exists within this environment.'
RAISERROR(@OrchestratorErrorMsg, 16,1)
RETURN 0;
END
Expand Down
37 changes: 37 additions & 0 deletions src/metadata.control/control/Views/PipelineSummary.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
-- Descriptive summary of the control pipelines.
--
-- Produces one row per pipeline / pipeline parameter / dependency-row
-- combination, labelled with:
--   * ComponentName  - 'Ingest', 'Transform' or 'Unassigned', derived from
--                      which Datasets table the parameter value matched.
--   * DatasetName    - the matched dataset name, falling back to the
--                      pipeline name when no dataset matched.
--   * DependsOn*     - the dataset / pipeline that this pipeline depends on,
--                      resolved through control.pipelinedependencies.
--
-- NOTE(review): a dataset is matched purely on ParameterValue = DatasetId
-- (plus the pipeline-name prefix); ParameterName is not filtered, so any
-- parameter whose value happens to equal a dataset id will match - confirm
-- this is intended.
-- NOTE(review): both sides of the final self-join carry one row per
-- parameter and per dependency row, so pipelines with several parameters
-- produce repeated DependsOn* rows. Left unchanged so consumers see the
-- same row shape as before.
CREATE VIEW [control].[PipelineSummary] AS
WITH cte AS (
    SELECT
        CASE
            WHEN ids.DatasetDisplayname IS NOT NULL THEN 'Ingest'
            WHEN tds.DatasetName IS NOT NULL THEN 'Transform'
            ELSE 'Unassigned'
        END AS ComponentName,
        CASE
            WHEN ids.DatasetDisplayname IS NOT NULL THEN ids.DatasetDisplayname
            WHEN tds.DatasetName IS NOT NULL THEN tds.DatasetName
            ELSE p.PipelineName
        END AS DatasetName,
        p.PipelineName,
        pp.ParameterName,
        pp.ParameterValue,
        p.PipelineId,
        s.StageName,
        pd.DependantPipelineId
    FROM control.pipelines AS p
    INNER JOIN control.pipelineparameters AS pp
        ON p.PipelineId = pp.PipelineId
    LEFT JOIN control.pipelinedependencies AS pd
        ON p.PipelineId = pd.PipelineId
    INNER JOIN control.stages AS s
        ON p.StageId = s.StageId
    -- The dataset id is cast to VARCHAR(20) rather than VARCHAR(4): casting an
    -- int that needs more than 4 characters to VARCHAR(4) yields '*', which
    -- would silently stop ids of 10000 and above from ever matching.
    LEFT JOIN ingest.Datasets AS ids
        ON pp.parametervalue = CAST(ids.datasetid AS VARCHAR(20))
        AND p.pipelineName LIKE 'Ingest_PL_%'
    LEFT JOIN transform.Datasets AS tds
        ON pp.parametervalue = CAST(tds.datasetid AS VARCHAR(20))
        AND p.pipelineName LIKE 'Transform_PL_%'
)
SELECT
    -- Explicit column list (same columns and order as the former cte.*).
    cte.ComponentName,
    cte.DatasetName,
    cte.PipelineName,
    cte.ParameterName,
    cte.ParameterValue,
    cte.PipelineId,
    cte.StageName,
    cte.DependantPipelineId,
    cte2.DatasetName AS DependsOnDataset,
    cte2.PipelineName AS DependsOnPipelineName,
    cte2.PipelineId AS DependsOnPipelineId
FROM cte
-- cte2 rows list this pipeline as their dependant, i.e. cte2 is the
-- pipeline that the current row depends on.
LEFT JOIN cte AS cte2
    ON cte.PipelineId = cte2.DependantPipelineId
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
-- Registers a dependency in control.PipelineDependencies between the
-- pipeline of a Transform dataset and the pipeline of the dataset it
-- depends on.
--
-- Parameters:
--   @StageName            Stage of the dependant Transform dataset;
--                         must be 'Dimensions' or 'Facts'.
--   @DatasetName          transform.Datasets.DatasetName of the dependant.
--   @DependsOnStageName   Stage of the upstream dataset; must be
--                         'Cleansed', 'Dimensions' or 'Facts'.
--   @DependsOnDatasetName Upstream dataset name. Looked up in
--                         ingest.Datasets.DatasetDisplayName for 'Cleansed',
--                         otherwise in transform.Datasets.DatasetName.
--
-- Behaviour:
--   Every validation failure raises an error (severity 16) and then
--   executes RETURN 0. On success a (PipelineId, DependantPipelineId) row is
--   merged into control.PipelineDependencies (inserted only when that pair
--   is not already present) and a confirmation message is printed.
CREATE PROCEDURE [transform].[AddTransformPayloadPipelineDependencies] (
    @StageName VARCHAR(25),
    @DatasetName VARCHAR(50),
    @DependsOnStageName VARCHAR(25),
    @DependsOnDatasetName VARCHAR(50)
) AS


-- defensive check: dependant stage name in ('Dimensions', 'Facts')
IF @StageName NOT IN ('Dimensions', 'Facts')
BEGIN
    RAISERROR('This Functionality may only be used for adding Dimensions or Facts from the Transform schema to the Control Pipeline. If you require different functionality to be added to control.pipelines, please proceed with a manual INSERT statement.',16,1)
    RETURN 0;
END

-- defensive check: depends-on stage name in ('Cleansed', 'Dimensions', 'Facts')
IF @DependsOnStageName NOT IN ('Cleansed','Dimensions', 'Facts')
BEGIN
    RAISERROR('This Functionality may only be used for adding Dimensions or Facts from the Transform schema to the Control Pipeline. Supported Dependencies include Cleansed, Dimensions and Facts. If you require different functionality to be added to control.pipelines, please proceed with a manual INSERT statement.',16,1)
    RETURN 0;
END

-- defensive check: exactly 1 enabled Transform dataset matches @DatasetName
DECLARE @DatasetId INT
DECLARE @DatasetCount INT

SELECT @DatasetCount = COUNT(*)
FROM [transform].[Datasets] AS ds
WHERE ds.Enabled = 1
AND ds.DatasetName = @DatasetName

IF @DatasetCount = 0
BEGIN
    RAISERROR('No rows returned. Please review the Dataset Id provided and confirm this is enabled.',16,1)
    RETURN 0;
END
IF @DatasetCount > 1
BEGIN
    RAISERROR('More than 1 row returned. Please review there is 1 active Dataset for the provided Dataset Id.',16,1)
    RETURN 0;
END

-- defensive check: exactly 1 enabled dataset matches @DependsOnDatasetName.
-- The source table depends on the upstream stage: Transform datasets for
-- 'Dimensions'/'Facts', Ingest datasets for 'Cleansed'.
DECLARE @DependsOnDatasetId INT
DECLARE @DependsOnDatasetCount INT

IF @DependsOnStageName in ('Dimensions', 'Facts')
BEGIN
    SELECT @DependsOnDatasetCount = COUNT(*)
    FROM [transform].[Datasets] AS ds
    WHERE ds.Enabled = 1
    AND ds.DatasetName = @DependsOnDatasetName
END
IF @DependsOnStageName in ('Cleansed')
BEGIN
    SELECT @DependsOnDatasetCount = COUNT(*)
    FROM [ingest].[Datasets] AS ds
    WHERE ds.Enabled = 1
    AND ds.DatasetDisplayName = @DependsOnDatasetName
END

IF @DependsOnDatasetCount = 0
BEGIN
    RAISERROR('No rows returned. Please review the Dataset Id provided and confirm this is enabled.',16,1)
    RETURN 0;
END
IF @DependsOnDatasetCount > 1
BEGIN
    RAISERROR('More than 1 row returned. Please review there is 1 active Dataset for the provided Dataset Id.',16,1)
    RETURN 0;
END


-- Get Dataset Id
SELECT
    @DatasetId = DatasetId
FROM [transform].[Datasets]
WHERE DatasetName = @DatasetName
AND Enabled = 1


-- Get Depends On Dataset Id
IF @DependsOnStageName in ('Dimensions', 'Facts')
BEGIN
    SELECT @DependsOnDatasetId = DatasetId
    FROM [transform].[Datasets] AS ds
    WHERE ds.Enabled = 1
    AND ds.DatasetName = @DependsOnDatasetName
END
IF @DependsOnStageName in ('Cleansed')
BEGIN
    SELECT @DependsOnDatasetId = DatasetId
    FROM [ingest].[Datasets] AS ds
    WHERE ds.Enabled = 1
    AND ds.DatasetDisplayName = @DependsOnDatasetName
END

-- Get Stage Id
DECLARE @StageId INT
SELECT @StageId = StageId
FROM [control].[Stages]
WHERE StageName = @StageName

-- Get Depends On Stage Id
DECLARE @DependsOnStageId INT
SELECT @DependsOnStageId = StageId
FROM [control].[Stages]
WHERE StageName = @DependsOnStageName


DECLARE @Dependencies TABLE (
    PipelineId INT,          -- upstream pipeline (the one being depended on)
    DependantPipelineId INT  -- pipeline of the Transform dataset that depends on it
)

DECLARE @DependenciesStagingTable TABLE (
    PipelineId INT,
    StageId INT,
    ParameterValue INT
)
DECLARE @PipelineIdResult INT;
DECLARE @DependantPipelineIdResult INT;

-- Stage every pipeline parameter with its value as an INT so it can be
-- compared with dataset ids. TRY_CAST is used because this reads the
-- parameters of ALL pipelines: a plain CAST would abort the whole procedure
-- as soon as any parameter holds a non-numeric value. Such values become
-- NULL here and therefore never match a dataset id below.
INSERT INTO @DependenciesStagingTable (PipelineId,StageId,ParameterValue)
SELECT
    p.PipelineId, p.StageId, TRY_CAST(pp.ParameterValue AS INT) AS ParameterValue
FROM control.Pipelines AS p
INNER JOIN control.PipelineParameters AS pp
    ON p.PipelineId = pp.PipelineId


-- Pipeline of the dependant Transform dataset (within @StageName)
SELECT @DependantPipelineIdResult = PipelineId
FROM @DependenciesStagingTable
INNER JOIN transform.Datasets AS d
    ON ParameterValue = d.DatasetId
WHERE ParameterValue = @DatasetId
AND StageId = @StageId

-- Pipeline of the upstream (depends-on) dataset (within @DependsOnStageName)
IF @DependsOnStageName in ('Dimensions', 'Facts')
BEGIN
    SELECT @PipelineIdResult = PipelineId
    FROM @DependenciesStagingTable
    INNER JOIN transform.Datasets AS d
        ON ParameterValue = d.DatasetId
    WHERE ParameterValue = @DependsOnDatasetId
    AND StageId = @DependsOnStageId
END
IF @DependsOnStageName in ('Cleansed')
BEGIN
    SELECT @PipelineIdResult = PipelineId
    FROM @DependenciesStagingTable
    INNER JOIN ingest.Datasets AS d
        ON ParameterValue = d.DatasetId
    WHERE ParameterValue = @DependsOnDatasetId
    AND StageId = @DependsOnStageId
END


INSERT INTO @Dependencies (PipelineId,DependantPipelineId)
SELECT @PipelineIdResult, @DependantPipelineIdResult

-- The four branches below cover every NULL / NOT NULL combination of the two
-- resolved pipeline ids, so no trailing catch-all branch is needed.
IF @PipelineIdResult IS NULL AND @DependantPipelineIdResult IS NULL
BEGIN
    RAISERROR('Missing Pipeline Ids for both Transform Dataset and the Dataset it depends on (Cleansed Merge Pipeline or Transform Dataset Pipeline)',16,1)
    RETURN 0;
END
ELSE IF @PipelineIdResult IS NULL AND @DependantPipelineIdResult IS NOT NULL
BEGIN
    RAISERROR('Missing PipelineId ',16,1)
    RETURN 0;
END
ELSE IF @PipelineIdResult IS NOT NULL AND @DependantPipelineIdResult IS NULL
BEGIN
    RAISERROR('Missing DependantPipelineId (Cleansed Merge Pipeline or Transform Dataset Pipeline)',16,1)
    RETURN 0;
END
ELSE IF @PipelineIdResult IS NOT NULL AND @DependantPipelineIdResult IS NOT NULL
BEGIN
    -- Insert-only merge: an already-registered pair is left untouched.
    MERGE INTO control.PipelineDependencies AS target
    USING @Dependencies AS source
        ON target.PipelineId = source.PipelineId
        AND target.DependantPipelineId = source.DependantPipelineId
    WHEN NOT MATCHED THEN
        INSERT (PipelineId, DependantPipelineId) VALUES (source.PipelineId, source.DependantPipelineId);
    PRINT 'Dependencies merged into control.PipelineDependencies'
END

0 comments on commit b45f9dc

Please sign in to comment.