Synchronization on OutputCollector #7

Open
ponythewhite opened this issue Dec 6, 2013 · 3 comments

Comments

@ponythewhite

Hello,

Calls on the OutputCollector must be synchronized, e.g.:

synchronized (collector) {
    collector.ack(tuple);
}

and

synchronized (collector) {
    collector.emit(finalEventType.getStreamId(), toTuple(newEvent, finalEventType.getFields()));
}
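The same idea can be packaged as a small decorator, so that `emit` and `ack` always take the same lock. This is a self-contained sketch, not Storm code: the `Collector` interface, `SynchronizedCollector` and `RecordingCollector` are hypothetical stand-ins for `OutputCollector`, and the lock is the wrapped collector, matching the `synchronized (collector)` blocks above. It only helps if every piece of code that touches the collector goes through the same wrapper instance.

```java
import java.util.ArrayList;
import java.util.List;

public class SynchronizedCollectorSketch {

    /** Hypothetical stand-in for the two OutputCollector calls used in the issue. */
    interface Collector {
        void emit(String streamId, List<?> values);
        void ack(Object tuple);
    }

    /** Decorator that runs every call while holding the wrapped collector's monitor,
     *  like the synchronized (collector) blocks above. */
    static final class SynchronizedCollector implements Collector {
        private final Collector delegate;

        SynchronizedCollector(Collector delegate) {
            this.delegate = delegate;
        }

        @Override
        public void emit(String streamId, List<?> values) {
            synchronized (delegate) {
                delegate.emit(streamId, values);
            }
        }

        @Override
        public void ack(Object tuple) {
            synchronized (delegate) {
                delegate.ack(tuple);
            }
        }
    }

    /** Toy collector backed by a plain ArrayList, so it is NOT thread-safe on its own. */
    static final class RecordingCollector implements Collector {
        final List<String> calls = new ArrayList<>();

        @Override
        public void emit(String streamId, List<?> values) {
            calls.add("emit:" + streamId + values);
        }

        @Override
        public void ack(Object tuple) {
            calls.add("ack:" + tuple);
        }
    }

    /** Emits from one thread and acks from another through the wrapper; returns the number of recorded calls. */
    static int runDemo(int callsPerThread) {
        RecordingCollector raw = new RecordingCollector();
        Collector safe = new SynchronizedCollector(raw);

        Thread emitter = new Thread(() -> {
            for (int i = 0; i < callsPerThread; i++) safe.emit("out", List.of(i));
        });
        Thread acker = new Thread(() -> {
            for (int i = 0; i < callsPerThread; i++) safe.ack(i);
        });
        emitter.start();
        acker.start();
        try {
            emitter.join();
            acker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return raw.calls.size();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(10_000)); // 20000: no call is lost
    }
}
```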

Otherwise, exceptions like the following occur:

java.lang.RuntimeException: java.lang.NullPointerException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56)
    at backtype.storm.disruptor$consume_loop_STAR_$fn__1596.invoke(disruptor.clj:67)
    at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NullPointerException
    at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:24)
    at backtype.storm.daemon.worker$mk_transfer_fn$fn__4126$fn__4130.invoke(worker.clj:99)
    at backtype.storm.util$fast_list_map.invoke(util.clj:771)
    at backtype.storm.daemon.worker$mk_transfer_fn$fn__4126.invoke(worker.clj:99)
    at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3904.invoke(executor.clj:205)
    at backtype.storm.disruptor$clojure_handler$reify__1584.onEvent(disruptor.clj:43)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:81)
    ... 6 more
@tomdz
Owner

tomdz commented Dec 6, 2013

Could you post the code that led to this error?

@ponythewhite
Author

Sure, but one thing first:
I'm using the project with Storm 0.8.2 and Esper 4.10.0; that might cause different behavior, but I'm not sure.

Here is the code that constructs the failing bolt. Only this bolt experienced the exception.

        final EsperBolt.StatementsBuilder statBu = inBu
                .outputs()
                .onStream("alphaGroupBetaGroupGammaStream")
                .fromEventType("AlphaGroupBetaGroupGamma")
                .emit("gamma", "alphaGroup", "betaGroup", "vol", "sigmaAmount",
                        "deltaAmount", "calculatedValue", "sideValue")
                .outputs()
                .onStream("alphaGroupBetaGroupGammaRedisStream")
                .fromEventType("AlphaGroupBetaGroupGammaRedis")
                .emit("gamma", "alphaGroup", "betaGroup", "vol", "sigmaAmount",
                        "deltaAmount", "calculatedValue", "sideValue", "measurementTimestamp", "receivedTimestamp")
                .statements()
                .add("insert into AlphaOccurenceCasted select cast(identifier,String) as identifier, cast(alphaGroup,String) as alphaGroup, cast(betaGroup,String) as betaGroup, cast(beta,String) as beta, "
                        + "cast(server,String) as server, cast(gamma, int) as gamma, cast(vol, double) as vol, cast(minusModifier, double) as minusModifier, "
                        + "cast(plusModifier, double) as plusModifier,  cast(sigmaAmount, double) as sigmaAmount, cast(deltaAmount, double) as deltaAmount, cast(measurementTimestamp, long) as measurementTimestamp, cast(receivedTimestamp, long) as receivedTimestamp from AlphaOccurence")

                .add("insert into BetaAspectCasted select cast(betaGroup,String) as betaGroup, "
                        + "cast(plusMultiplier, double) as plusMultiplier, "
                        + "cast(minusMultiplier, double) as minusMultiplier, cast(theta_tickvalue,double) as theta, cast(kappa_tickvalue,double) as kappa from BetaAspect group by betaGroup output last every 1 sec")
                .add("insert into DeleteStream select * from AlphaOccurenceCasted where identifier='-9999'")
                .add("create window AlphaOccurenceWindow.std:unique(identifier, beta, server, gamma) as select * from AlphaOccurenceCasted")
                .add("insert into AlphaOccurenceWindow select * from AlphaOccurenceCasted where identifier!='-9999'")
                .add("on DeleteStream as del update AlphaOccurenceWindow as win set vol = 0, minusModifier=0, plusModifier=0 where win.server=del.server and win.gamma=del.gamma")
                .add("on DeleteStream as del insert into aux"
                        + " select win.identifier           as identifier, "
                        + "      win.beta                   as beta, "
                        + "      win.betaGroup              as betaGroup, "
                        + "      win.alphaGroup             as alphaGroup, "
                        + "      win.server                 as server, "
                        + "      win.gamma                  as gamma, "
                        + "      win.vol                    as vol, "
                        + "      win.minusModifier          as minusModifier, "
                        + "      win.plusModifier           as plusModifier,"
                        + "      win.sigmaAmount            as sigmaAmount,"
                        + "      win.deltaAmount            as deltaAmount,"
                        + "      win.measurementTimestamp   as measurementTimestamp,"
                        + "      win.receivedTimestamp      as receivedTimestamp "
                        + " from AlphaOccurenceWindow as win"
                        + " where win.server=del.server and win.gamma=del.gamma")
                .add("insert into aux select * from AlphaOccurenceWindow")
                .add("insert into AlphaGroupBetaGroupGamma "
                        + " select  occurence.betaGroup                     as betaGroup, "
                        + "         occurence.gamma                         as gamma, "
                        + "         occurence.alphaGroup                    as alphaGroup, "
                        + "         sum(occurence.vol)                      as vol,"
                        + "         sum(occurence.sigmaAmount)              as sigmaAmount,"
                        + "         sum(occurence.deltaAmount)              as deltaAmount,"
                        + " (-1)*( (case when (sum (case when (occurence.vol >= 0) then (occurence.vol * aspect.kappa) else (occurence.vol * aspect.theta) end) - sum(occurence.minusModifier)) > 0 then ((sum (case when (occurence.vol >= 0) then (occurence.vol * aspect.kappa) else (occurence.vol * aspect.theta) end) - sum(occurence.minusModifier)) * aspect.plusMultiplier) else ((sum (case when (occurence.vol >= 0) then (occurence.vol * aspect.kappa) else (occurence.vol * aspect.theta) end) - sum(occurence.minusModifier)) * aspect.minusMultiplier) end) + sum(occurence.plusModifier) ) as calculatedValue, "
                        + " (-1) * sum(occurence.plusModifier)              as sideValue, "
                        + "         max(occurence.measurementTimestamp)     as measurementTimestamp, "
                        + "         max(occurence.receivedTimestamp)        as receivedTimestamp "
                        + " from aux.std:unique(identifier, alphaGroup, beta, server, gamma) as occurence,"
                        + " BetaAspectCasted.std:unique(betaGroup) as aspect"
                        // last entry per beta
                        + " where occurence.betaGroup=aspect.betaGroup"
                        + " group by occurence.alphaGroup, occurence.betaGroup, occurence.gamma ")
                .add("insert into AlphaGroupBetaGroupGammaRedis select * from AlphaGroupBetaGroupGamma where alphaGroup <> '-7777'");
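The `calculatedValue` expression is hard to read inline. Written out, my reading of it (an interpretation of the EPL, not something stated in the thread) for one group g = (alphaGroup, betaGroup, gamma) is below. Each i is the latest `aux` event per `(identifier, alphaGroup, beta, server, gamma)`, and \kappa, \theta, plusMultiplier and minusMultiplier come from the single joined `aspect` row for that betaGroup (`aspect.kappa`, `aspect.theta`, ...).

```latex
w_i = \begin{cases}
  \mathit{vol}_i \, \kappa & \text{if } \mathit{vol}_i \ge 0 \\
  \mathit{vol}_i \, \theta & \text{otherwise}
\end{cases}
\qquad
S_g = \sum_{i \in g} w_i \;-\; \sum_{i \in g} \mathit{minusModifier}_i

\mathit{calculatedValue}_g = -\left( \begin{cases}
  S_g \cdot \mathit{plusMultiplier} & \text{if } S_g > 0 \\
  S_g \cdot \mathit{minusMultiplier} & \text{otherwise}
\end{cases} \;+\; \sum_{i \in g} \mathit{plusModifier}_i \right)

\mathit{sideValue}_g = -\sum_{i \in g} \mathit{plusModifier}_i
```

The remaining outputs (vol, sigmaAmount, deltaAmount) are plain sums over the group, and the timestamps are maxima.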

It is basically a simple join of two streams: AlphaOccurence and BetaAspect.

The only difference from the other bolts is the creation of the "aux" window and the updating of entries on DeleteStream.
The idea is that when an event with identifier "-9999" is encountered, all existing entries should be updated with zeros and then published.
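To make the delete path concrete, here is a plain-Java model of what the `DeleteStream` statements appear to do. It is neither Esper code nor the author's implementation; the trimmed `Entry` class, the `onEvent` method and the in-memory map are hypothetical. Per the EPL above, the window keeps the newest event per `(identifier, beta, server, gamma)`, and a `-9999` event zeroes `vol`, `minusModifier` and `plusModifier` only on entries with the same `server` and `gamma`, which are then re-published into `aux`. The model assumes the update is applied before the re-publish.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DeleteStreamModel {

    /** Hypothetical, trimmed-down AlphaOccurenceCasted row: only the fields the delete path touches. */
    static final class Entry {
        final String identifier;
        final String beta;
        final String server;
        final int gamma;
        double vol;
        double minusModifier;
        double plusModifier;

        Entry(String identifier, String beta, String server, int gamma,
              double vol, double minusModifier, double plusModifier) {
            this.identifier = identifier;
            this.beta = beta;
            this.server = server;
            this.gamma = gamma;
            this.vol = vol;
            this.minusModifier = minusModifier;
            this.plusModifier = plusModifier;
        }
    }

    // Models std:unique(identifier, beta, server, gamma): a newer event with the same key replaces the older one.
    private final Map<List<Object>, Entry> window = new LinkedHashMap<>();

    /** Processes one event and returns the rows that would be re-published into "aux" (empty for normal events). */
    List<Entry> onEvent(Entry e) {
        if (!"-9999".equals(e.identifier)) {
            window.put(List.of(e.identifier, e.beta, e.server, e.gamma), e);
            return List.of();
        }
        // Delete marker: zero the matching rows (same server and gamma) and collect them for re-publishing.
        List<Entry> zeroed = new ArrayList<>();
        for (Entry w : window.values()) {
            if (w.server.equals(e.server) && w.gamma == e.gamma) {
                w.vol = 0;
                w.minusModifier = 0;
                w.plusModifier = 0;
                zeroed.add(w);
            }
        }
        return zeroed;
    }

    public static void main(String[] args) {
        DeleteStreamModel model = new DeleteStreamModel();
        model.onEvent(new Entry("id-1", "b1", "srv-A", 1, 5.0, 1.0, 2.0));
        model.onEvent(new Entry("id-2", "b1", "srv-B", 1, 3.0, 0.5, 1.0));
        List<Entry> republished = model.onEvent(new Entry("-9999", "b1", "srv-A", 1, 0, 0, 0));
        System.out.println(republished.size() + " " + republished.get(0).identifier
                + " vol=" + republished.get(0).vol); // 1 id-1 vol=0.0
    }
}
```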

This bolt also experiences the highest load.

Best,

Jacek

@tomdz
Owner

tomdz commented Dec 12, 2013

I asked for the code because the error indicates concurrent access to the output collector, which shouldn't happen unless it comes from Esper. With the code, I may be able to create a test case.
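If the extra thread does come from Esper (one plausible source, not confirmed in this thread, is Esper's internal timer thread, which drives time-based clauses such as `output last every 1 sec`), an alternative to locking is to let Esper callbacks only enqueue results and have the bolt's own thread make every collector call. The sketch below is plain Java with hypothetical names (`HandOffSketch`, `onEsperOutput`, `drainPending`, and a toy `Collector`); in a real bolt, `drainPending` would run inside `execute()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class HandOffSketch {

    /** Hypothetical stand-in for a non-thread-safe collector; only the executor thread may call it. */
    static final class Collector {
        final List<String> emitted = new ArrayList<>();

        void emit(String streamId, Object value) {
            emitted.add(streamId + ":" + value);
        }
    }

    // Results produced on any thread (e.g. an Esper listener callback) are parked here.
    private final ConcurrentLinkedQueue<Object> pending = new ConcurrentLinkedQueue<>();
    final Collector collector = new Collector();

    /** Safe to call from any thread: it only touches the thread-safe queue. */
    void onEsperOutput(Object result) {
        pending.add(result);
    }

    /** Must be called on the bolt's executor thread only, e.g. at the start of execute(). */
    void drainPending() {
        Object r;
        while ((r = pending.poll()) != null) {
            collector.emit("out", r);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        HandOffSketch bolt = new HandOffSketch();
        // Simulated foreign thread producing results.
        Thread timer = new Thread(() -> {
            for (int i = 0; i < 1000; i++) bolt.onEsperOutput(i);
        });
        timer.start();
        timer.join();
        bolt.drainPending(); // the "executor" thread emits everything that was queued
        System.out.println(bolt.collector.emitted.size()); // 1000
    }
}
```

The trade-off is latency: queued output is only emitted when the executor thread next drains the queue, so timer-driven results can sit there until the next tuple arrives.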
