Skip to content
This repository has been archived by the owner on Apr 13, 2024. It is now read-only.

Commit

Permalink
Merge pull request #246 from ramindu90/master
Browse files Browse the repository at this point in the history
changes to custom xml sample
  • Loading branch information
dnwick authored Jul 6, 2017
2 parents 96a61a2 + 754b620 commit c995800
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 9 deletions.
2 changes: 1 addition & 1 deletion modules/samples/artifacts/0001/README.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ Kafka libs to be added and converted to OSGI from {KafkaHome}/libs are as follow

6. Navigate to {WSO2DASHome}/bin and start using ./carbon.sh
7. Navigate to {WSO2DASHome}/samples/sample-clients/kafka-consumer and run ant command without arguments
8. Navigate to {WSO2DASHome}/samples/sample-clients/kafka-producer and run ant command without arguments
8. Navigate to {WSO2DASHome}/samples/sample-clients/kafka-producer and run ant -Dbroker=localhost:9092 -DtopicName=kafka_topic -Dtype=xxxx -DsampleNo=0001 -DfileName=kafka_sample -DpartitionNo=0

Published values should be printed on the kafka-consumer console.
2 changes: 1 addition & 1 deletion modules/samples/artifacts/0001/kafka-sample.siddhi
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
@source(type='kafka', topic.list='kafka_topic', partition.no.list='0', threading.option='single.thread', group.id="group", bootstrap.servers='localhost:9092', @map(type='json'))
define stream StockStream(symbol string, price float, volume long);

@sink(type='kafka', topic='kafka_result_topic', bootstrap.servers='localhost:9092', partition.no='0', @map(type='json'))
@sink(type='kafka', topic='kafka_result_topic', bootstrap.servers='localhost:9092', partition.no='0', @map(type='xml'))
define stream ResultStream(symbol string, price float, totalVolume long);

from StockStream#window.length(5)
Expand Down
2 changes: 1 addition & 1 deletion modules/samples/artifacts/0005/xml-default-sample.siddhi
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
@App:name('xml-default-sample')

@source(type='kafka', topic='kafka_topic', partition.no.list='0', threading.option='single.thread', group.id="group",
@source(type='kafka', topic.list='kafka_topic', partition.no.list='0', threading.option='single.thread', group.id="group",
bootstrap.servers='localhost:9092', @map(type='xml'))
define stream StockStream(symbol string, price float, volume long);

Expand Down
2 changes: 1 addition & 1 deletion modules/samples/artifacts/0006/xml-custom-sample.siddhi
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
@App:name('xml-custom-sample')

@source(type='kafka', topic='kafka_topic', partition.no.list='0', threading.option='single.thread', group.id="group", bootstrap.servers='localhost:9092', @map(type='xml', namespaces = "dt=urn:schemas-microsoft-com:datatypes", enclosing.element="//portfolio", @attributes(symbol ="symbol", price = "price", volume = "volume")))
@source(type='kafka', topic.list='kafka_topic', partition.no.list='0', threading.option='single.thread', group.id="group", bootstrap.servers='localhost:9092', @map(type='xml', namespaces = "dt=urn:schemas-microsoft-com:datatypes", enclosing.element="//portfolio", @attributes(symbol ="symbol", price = "price", volume = "volume")))
define stream StockStream(symbol string, price float, volume long);

@sink(type='kafka', topic='kafka_result_topic', bootstrap.servers='localhost:9092', partition.no='0', @map(type='xml', @payload("<StockData><Symbol>{{symbol}}</Symbol><Price>{{price}}</Price></StockData>")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,20 @@ public static void main(String[] args) {
} else {
for (int i = 0; i < 100; i++) {
if ("json".equals(type)) {
producer.send(new ProducerRecord<>("kafka_topic", "{\"event\": {\"symbol\": \"wso2symbol\", "
producer.send(new ProducerRecord<>(topicName, "{\"event\": {\"symbol\": \"wso2symbol\", "
+ "\"price\":123.123, \"volume\":100}}"));
producer.send(new ProducerRecord<>("kafka_topic", "{\"event\": {\"symbol\": \"wso2symbol\", "
producer.send(new ProducerRecord<>(topicName, "{\"event\": {\"symbol\": \"wso2symbol\", "
+ "\"price\":123.123, \"volume\":200}}"));
} else if ("xmlDefault".equals(type)) {
producer.send(new ProducerRecord<>("kafka_topic", "<events><event><symbol>WSO2" +
producer.send(new ProducerRecord<>(topicName, "<events><event><symbol>WSO2" +
"</symbol><price>55.689</price>" +
"<volume>100</volume></event></events>"));
producer.send(new ProducerRecord<>("kafka_topic", "<events><event><symbol>IBM" +
producer.send(new ProducerRecord<>(topicName, "<events><event><symbol>IBM" +
"</symbol><price>75</price>" +
"<volume>10</volume></event></events>"));
log.info("Sending message on topic: " + topicName);
} else if ("xmlCustom".equals(type)) {
producer.send(new ProducerRecord<>("kafka_topic", "<portfolio " +
producer.send(new ProducerRecord<>(topicName, "<portfolio " +
"xmlns:dt=\"urn:schemas-microsoft-com:datatypes\">" +
" <stock exchange=\"nasdaq\">" +
" <volume>100</volume>" +
Expand All @@ -122,6 +123,7 @@ public static void main(String[] args) {
" <price dt:dt=\"number\">75.6</price>" +
" </stock>" +
"</portfolio>"));
log.info("Sending message on topic: " + topicName);
}
}
}
Expand Down

0 comments on commit c995800

Please sign in to comment.