From 67b59046eccd64916a142d16c8bddaa764504d12 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Fri, 5 Apr 2024 12:59:18 -0400 Subject: [PATCH] add BigTable example --- pom.xml | 2 +- solace-apache-beam-samples/pom.xml | 42 +++--- .../beam/examples/SolaceBeamBigTable.java | 129 +++++++++--------- 3 files changed, 92 insertions(+), 81 deletions(-) diff --git a/pom.xml b/pom.xml index 762cc41..f637ab5 100644 --- a/pom.xml +++ b/pom.xml @@ -30,7 +30,7 @@ SolaceProducts - 2.35.0 + 2.55.0 10.13.0 1.7.25 6.37.0 diff --git a/solace-apache-beam-samples/pom.xml b/solace-apache-beam-samples/pom.xml index 14041cf..edc0655 100644 --- a/solace-apache-beam-samples/pom.xml +++ b/solace-apache-beam-samples/pom.xml @@ -30,8 +30,8 @@ Samples for the Apache Beam I/O Component for Solace PubSub+ - 2.35.0 - 1.3.0-SNAPSHOT + 2.55.0 + 1.2.0 1.6.0 3.0.2 @@ -61,19 +61,6 @@ beam-sdks-java-io-solace ${solace-beam.version} - - - com.google.cloud.bigtable - bigtable-hbase-beam - 2.12.0 - - - - com.google.cloud - google-cloud-bigtable - 2.37.0 - - @@ -122,6 +109,18 @@ + + com.google.cloud.bigtable + bigtable-hbase-beam + 2.12.0 + + + + com.google.cloud + google-cloud-bigtable + 2.37.0 + + com.solace.test.integration pubsubplus-testcontainer @@ -146,6 +145,17 @@ 3.12.0 test + + org.junit.jupiter + junit-jupiter-api + test + + + org.apache.beam + beam-vendor-guava-26_0-jre + 0.1 + compile + @@ -159,7 +169,6 @@ org.apache.beam beam-runners-direct-java - import @@ -171,7 +180,6 @@ org.apache.beam beam-runners-google-cloud-dataflow-java - import diff --git a/solace-apache-beam-samples/src/main/java/com/solace/connector/beam/examples/SolaceBeamBigTable.java b/solace-apache-beam-samples/src/main/java/com/solace/connector/beam/examples/SolaceBeamBigTable.java index 7870c45..7a5f380 100644 --- a/solace-apache-beam-samples/src/main/java/com/solace/connector/beam/examples/SolaceBeamBigTable.java +++ b/solace-apache-beam-samples/src/main/java/com/solace/connector/beam/examples/SolaceBeamBigTable.java @@ -2,9 +2,9 @@ import com.google.cloud.bigtable.beam.CloudBigtableIO; import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration; -import com.google.cloud.bigtable.hbase.BigtableOptionsFactory; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -43,63 +43,7 @@ public class SolaceBeamBigTable { private static final Logger LOG = LoggerFactory.getLogger(SolaceRecordTest.class); - public static void main(String[] args) { - - BigtableOptions options = - PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class); - - List queues = Arrays.asList(options.getSql().split(",")); - boolean useSenderMsgId = options.getSmi(); - - /** Create pipeline **/ - Pipeline p = Pipeline.create(options); - - /** Set Solace connection properties **/ - JCSMPProperties jcsmpProperties = new JCSMPProperties(); - jcsmpProperties.setProperty(JCSMPProperties.HOST, options.getCip()); - jcsmpProperties.setProperty(JCSMPProperties.VPN_NAME, options.getVpn()); - jcsmpProperties.setProperty(JCSMPProperties.USERNAME, options.getCu()); - jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, options.getCp()); - - /** Create object for BigTable table configuration to be used later to run the pipeline **/ - CloudBigtableTableConfiguration bigtableTableConfig = - new CloudBigtableTableConfiguration.Builder() - .withProjectId(options.getBigtableProjectId()) - .withInstanceId(options.getBigtableInstanceId()) - .withTableId(options.getBigtableTableId()) - .build(); - - /* The pipeline consists of three components: - * 1. Reading message from Solace queue - * 2. Doing any necessary transformation and creating a BigTable row - * 3. Writing the row to BigTable - */ - p.apply(SolaceIO.read(jcsmpProperties, queues, SolaceTextRecord.getCoder(), SolaceTextRecord.getMapper()) - .withUseSenderTimestamp(options.getSts()) - .withAdvanceTimeoutInMillis(options.getTimeout())) - .apply(ParDo.of( - new DoFn() { - @ProcessElement - public void processElement(ProcessContext c) { - - String uniqueID = UUID.randomUUID().toString(); - - Put row = new Put(Bytes.toBytes(uniqueID)); - - /** Create row that will be written to BigTable **/ - row.addColumn( - Bytes.toBytes("stats"), - null, - c.element().getPayload().getBytes(StandardCharsets.UTF_8)); - c.output(row); - } - })) - .apply(CloudBigtableIO.writeToTable(bigtableTableConfig)); - - p.run().waitUntilFinish(); - } - - public interface BigtableOptions extends DataflowPipelineOptions { + public interface Options extends DataflowPipelineOptions { @Description("IP and port of the client appliance. (e.g. -cip=192.168.160.101)") String getCip(); @@ -164,16 +108,75 @@ public interface BigtableOptions extends DataflowPipelineOptions { void setBigtableTableId(String bigtableTableId); } - public static CloudBigtableTableConfiguration batchWriteFlowControlExample( - BigtableOptions options) { + private static void WriteToBigTable(Options options) throws Exception { + + List queues = Arrays.asList(options.getSql().split(",")); + boolean useSenderMsgId = options.getSmi(); + + /** Create pipeline **/ + Pipeline pipeline = Pipeline.create(options); + + /** Set Solace connection properties **/ + JCSMPProperties jcsmpProperties = new JCSMPProperties(); + jcsmpProperties.setProperty(JCSMPProperties.HOST, options.getCip()); + jcsmpProperties.setProperty(JCSMPProperties.VPN_NAME, options.getVpn()); + jcsmpProperties.setProperty(JCSMPProperties.USERNAME, options.getCu()); + jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, options.getCp()); + + /** Create object for BigTable table configuration to be used later to run the pipeline **/ CloudBigtableTableConfiguration bigtableTableConfig = new CloudBigtableTableConfiguration.Builder() .withProjectId(options.getBigtableProjectId()) .withInstanceId(options.getBigtableInstanceId()) .withTableId(options.getBigtableTableId()) - .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL, - "true") .build(); - return bigtableTableConfig; + + /* The pipeline consists of three components: + * 1. Reading message from Solace queue + * 2. Doing any necessary transformation and creating a BigTable row + * 3. Writing the row to BigTable + */ + pipeline.apply(SolaceIO.read(jcsmpProperties, queues, SolaceTextRecord.getCoder(), SolaceTextRecord.getMapper()) + .withUseSenderTimestamp(options.getSts()) + .withAdvanceTimeoutInMillis(options.getTimeout())) + .apply("Map to BigTable row", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + + String uniqueID = UUID.randomUUID().toString(); + + Put row = new Put(Bytes.toBytes(uniqueID)); + + /** Create row that will be written to BigTable **/ + row.addColumn( + Bytes.toBytes("stats"), + null, + c.element().getPayload().getBytes(StandardCharsets.UTF_8)); + c.output(row); + } + })) + .apply("Write to BigTable", + CloudBigtableIO.writeToTable(bigtableTableConfig)); + + PipelineResult result = pipeline.run(); + + try { + result.waitUntilFinish(); + } catch (Exception exc) { + result.cancel(); + } + + } + + public static void main(String[] args) { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(SolaceBeamBigTable.Options.class); + + try { + WriteToBigTable(options); + } catch (Exception e) { + e.printStackTrace(); + } } } \ No newline at end of file