Skip to content

Commit

Permalink
docs: Add ingestion from Kafka samples and tests (#2009)
Browse files Browse the repository at this point in the history
* docs: Add ingestion from Kafka samples and tests

* chore: Fix formatting in samples
  • Loading branch information
michaelpri10 authored Jan 31, 2025
1 parent 506301b commit c1bed9e
Show file tree
Hide file tree
Showing 7 changed files with 624 additions and 0 deletions.
90 changes: 90 additions & 0 deletions samples/createTopicWithAwsMskIngestion.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This is a generated sample, using the typeless sample bot. Please
// look for the source TypeScript sample (.ts) for modifications.
'use strict';

/**
* This sample demonstrates how to perform basic operations on topics with
* the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Create Topic With AWS MSK Ingestion
// description: Creates a new topic, with AWS MSK ingestion enabled.
// usage: node createTopicWithAwsMskIngestion.js <topic-name> <cluster-arn> <msk-topic> <role-arn> <gcp-service-account>

// [START pubsub_create_topic_with_aws_msk_ingestion]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const clusterArn = 'arn:aws:kafka:...';
// const mskTopic = 'YOUR_MSK_TOPIC';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithAwsMskIngestion(
topicNameOrId,
clusterArn,
mskTopic,
awsRoleArn,
gcpServiceAccount
) {
// Creates a new topic with AWS MSK ingestion.
await pubSubClient.createTopic({
name: topicNameOrId,
ingestionDataSourceSettings: {
awsMsk: {
clusterArn,
topic: mskTopic,
awsRoleArn,
gcpServiceAccount,
},
},
});
console.log(`Topic ${topicNameOrId} created with AWS MSK ingestion.`);
}
// [END pubsub_create_topic_with_aws_msk_ingestion]

function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
clusterArn = 'arn:aws:kafka:...',
mskTopic = 'YOUR_MSK_TOPIC',
roleArn = 'arn:aws:iam:...',
gcpServiceAccount = 'ingestion-account@...'
) {
createTopicWithAwsMskIngestion(
topicNameOrId,
clusterArn,
mskTopic,
roleArn,
gcpServiceAccount
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
107 changes: 107 additions & 0 deletions samples/createTopicWithAzureEventHubsIngestion.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This is a generated sample, using the typeless sample bot. Please
// look for the source TypeScript sample (.ts) for modifications.
'use strict';

/**
* This sample demonstrates how to perform basic operations on topics with
* the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Create Topic With Azure Event Hubs Ingestion
// description: Creates a new topic, with Azure Event Hubs ingestion enabled.
// usage: node createTopicWithAzureEventHubsIngestion.js <topic-name> <cluster-arn> <msk-topic> <role-arn> <gcp-service-account>

// [START pubsub_create_topic_with_azure_event_hubs_ingestion]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const resourceGroup = 'YOUR_RESOURCE_GROUP';
// const namespace = 'YOUR_NAMESPACE';
// const eventHub = 'YOUR_EVENT_HUB';
// const clientId = 'YOUR_CLIENT_ID';
// const tenantId = 'YOUR_TENANT_ID';
// const subscriptionId = 'YOUR_SUBSCRIPTION_ID';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithAzureEventHubsIngestion(
topicNameOrId,
resourceGroup,
namespace,
eventHub,
clientId,
tenantId,
subscriptionId,
gcpServiceAccount
) {
// Creates a new topic with Azure Event Hubs ingestion.
await pubSubClient.createTopic({
name: topicNameOrId,
ingestionDataSourceSettings: {
azureEventHubs: {
resourceGroup,
namespace,
eventHub,
clientId,
tenantId,
subscriptionId,
gcpServiceAccount,
},
},
});
console.log(
`Topic ${topicNameOrId} created with Azure Event Hubs ingestion.`
);
}
// [END pubsub_create_topic_with_azure_event_hubs_ingestion]

function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
resourceGroup = 'YOUR_RESOURCE_GROUP',
namespace = 'YOUR_NAMESPACE',
eventHub = 'YOUR_EVENT_HUB',
clientId = 'YOUR_CLIENT_ID',
tenantId = 'YOUR_TENANT_ID',
subscriptionId = 'YOUR_SUBSCRIPTION_ID',
gcpServiceAccount = 'ingestion-account@...'
) {
createTopicWithAzureEventHubsIngestion(
topicNameOrId,
resourceGroup,
namespace,
eventHub,
clientId,
tenantId,
subscriptionId,
gcpServiceAccount
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
95 changes: 95 additions & 0 deletions samples/createTopicWithConfluentCloudIngestion.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This is a generated sample, using the typeless sample bot. Please
// look for the source TypeScript sample (.ts) for modifications.
'use strict';

/**
* This sample demonstrates how to perform basic operations on topics with
* the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Create Topic With Confluent Cloud Ingestion
// description: Creates a new topic, with Confluent Cloud ingestion enabled.
// usage: node createTopicWithConfluentCloudIngestion.js <topic-name> <bootstrap-server> <cluster-id> <confluent-topic> <identity-pool-id> <gcp-service-account>

// [START pubsub_create_topic_with_confluent_cloud_ingestion]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bootstrapServer = 'url:port';
// const clusterId = 'YOUR_CLUSTER_ID';
// const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
// const identityPoolId = 'pool-ID';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithConfluentCloudIngestion(
topicNameOrId,
bootstrapServer,
clusterId,
confluentTopic,
identityPoolId,
gcpServiceAccount
) {
// Creates a new topic with Confluent Cloud ingestion.
await pubSubClient.createTopic({
name: topicNameOrId,
ingestionDataSourceSettings: {
confluentCloud: {
bootstrapServer,
clusterId,
topic: confluentTopic,
identityPoolId,
gcpServiceAccount,
},
},
});
console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
}
// [END pubsub_create_topic_with_confluent_cloud_ingestion]

function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
bootstrapServer = 'url:port',
clusterId = 'YOUR_CLUSTER_ID',
confluentTopic = 'YOUR_CONFLUENT_TOPIC',
identityPoolId = 'pool-ID',
gcpServiceAccount = 'ingestion-account@...'
) {
createTopicWithConfluentCloudIngestion(
topicNameOrId,
bootstrapServer,
clusterId,
confluentTopic,
identityPoolId,
gcpServiceAccount
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
52 changes: 52 additions & 0 deletions samples/system-test/topics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,58 @@ describe('topics', () => {
}
});

it('should create a topic with aws msk ingestion', async () => {
const testId = 'create-aws-msk-ingestion';
const name = topicName(testId);
const clusterArn = 'arn:aws:kafka:us-east-1:111111111111:cluster/fake-cluster-name/11111111-1111-1';
const mskTopic = 'fake-msk-topic';
const roleArn = 'arn:aws:iam::111111111111:role/fake-role-name';
const gcpServiceAccount = '[email protected]'

const output = execSync(
`${commandFor('createTopicWithAwsMskIngestion')} ${name} ${clusterArn} ${mskTopic} ${roleArn} ${gcpServiceAccount}`);
assert.include(output, `Topic ${name} created with AWS MSK ingestion.`)
const [topics] = await pubsub.getTopics();
const exists = topics.some(t => t.name === fullTopicName(name));
assert.ok(exists, 'Topic was created');
});

it('should create a topic with confluent cloud ingestion', async () => {
const testId = 'create-confluent-cloud-ingestion';
const name = topicName(testId);
const bootstrapServer = 'fake-bootstrap-server-id.us-south1.gcp.confluent.cloud:9092';
const clusterId = 'fake-cluster-id';
const confluentTopic = 'fake-confluent-topic';
const identityPoolId = 'fake-pool-id';
const gcpServiceAccount = '[email protected]'

const output = execSync(
`${commandFor('createTopicWithConfluentCloudIngestion')} ${name} ${bootstrapServer} ${clusterId} ${confluentTopic} ${identityPoolId} ${gcpServiceAccount}`);
assert.include(output, `Topic ${name} created with Confluent Cloud ingestion.`)
const [topics] = await pubsub.getTopics();
const exists = topics.some(t => t.name === fullTopicName(name));
assert.ok(exists, 'Topic was created');
});

it('should create a topic with azure event hubs ingestion', async () => {
const testId = 'create-azure-event-hubs-ingestion';
const name = topicName(testId);
const resourceGroup = 'fake-resource-group';
const namespace = 'fake-namespace';
const eventHub = 'fake-event-hub';
const clientId = 'fake-client-id';
const tenantId = 'fake-tenant-id';
const subscriptionId = 'fake-subscription-id';
const gcpServiceAccount = '[email protected]'

const output = execSync(
`${commandFor('createTopicWithAzureEventHubsIngestion')} ${name} ${resourceGroup} ${namespace} ${eventHub} ${clientId} ${tenantId} ${subscriptionId} ${gcpServiceAccount}`);
assert.include(output, `Topic ${name} created with Azure Event Hubs ingestion.`)
const [topics] = await pubsub.getTopics();
const exists = topics.some(t => t.name === fullTopicName(name));
assert.ok(exists, 'Topic was created');
});

it('should update a topic with kinesis integration', async () => {
const pair = await createPair('update-kinesis');
const output = execSync(
Expand Down
Loading

0 comments on commit c1bed9e

Please sign in to comment.