forked from vert-x3/vertx-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
FeedExample.java
77 lines (59 loc) · 2.44 KB
/
FeedExample.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package io.vertx.example.camel.feed;
import io.vertx.camel.CamelBridge;
import io.vertx.camel.CamelBridgeOptions;
import io.vertx.core.Future;
import io.vertx.core.VerticleBase;
import io.vertx.core.Vertx;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.SimpleRegistry;
import java.util.Collections;
import static io.vertx.camel.InboundMapping.fromCamel;
/**
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
public class FeedExample extends VerticleBase {
private final static String VERTX_BLOG_ATOM = "http://vertx.io/feed.xml";
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(FeedExample.class.getName());
}
@Override
public Future<?> start() throws Exception {
vertx.eventBus().consumer("announce", message -> {
System.out.println("ANNOUNCE >> " + message.body());
});
vertx.eventBus().consumer("errors", message -> {
System.out.println("ERROR >> " + message.body());
});
SimpleRegistry registry = new SimpleRegistry();
registry.put("filterService", Collections.singletonMap(ReleasePostFilter.class, new ReleasePostFilter()));
CamelContext camelContext = new DefaultCamelContext(registry);
camelContext.addRoutes(createMyRoutes());
camelContext.start();
return CamelBridge.create(vertx, new CamelBridgeOptions(camelContext)
.addInboundMapping(fromCamel("seda:announce").toVertx("announce"))
.addInboundMapping(fromCamel("seda:errors").toVertx("errors")))
.start();
}
private RouteBuilder createMyRoutes() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
errorHandler(deadLetterChannel("seda:errors"));
// We pool the atom feeds from the source for further processing in the seda queue
// we set the delay to 1 second for each pool.
// Using splitEntries=true will during polling only fetch one RSS Entry at any given time.
from("rss:" + VERTX_BLOG_ATOM +
"?splitEntries=true&consumer.delay=100").to("seda:feeds");
from("seda:feeds")
// Filter
.filter().method("filterService", "isRelease")
// Transform (extract)
.transform(simple("${body.entries[0].title}"))
// Output
.to("seda:announce");
}
};
}
}