Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FilteredStream gets "stale" after a certain period of time #37

Closed
rzo1 opened this issue Jul 15, 2022 · 25 comments
Closed

FilteredStream gets "stale" after a certain period of time #37

rzo1 opened this issue Jul 15, 2022 · 25 comments

Comments

@rzo1
Copy link

rzo1 commented Jul 15, 2022

Hi all,

I am migrating our code from twitter4j to this v2 capable library, so I am more than happy, that a official sdk is provided via GitHub!

I am using a slightly modified version of the Streamlistener example, i.e. it uses a fixed thread pool, has multiple consumers and can handle disconnects of the client as well as rate limiting.

However, I am encountering some weird behaviour, which also happens if I am using the streaming example without my modifications.

My setup is as follows:

  • I have a Java application running on Ubuntu linux with an OpenJDK 17 from the package manager
  • I am using a normal developer account (no academic access level or sth. else)
  • I am using the filtered streaming endpoint.
  • I am creating two rules: (a) follows some trending hashtags, and (b) follows some dedicated news outlets.

This setup works quite well. I am receiving lots of tweets, which are subsequently processed by my system.

However, after a certain period of time (some hours), the stream becomes "silent". I do not receive any new tweets and no exception is thrown.

If I am activley restarting the application or conduct an automatic reconnect, the tweets start flowing again.

According to this article from the documentation, disconnects may occur. However, I would expect an exception in case a disconnect happens, so the application can handle it in a nice way, i.e. conduct a reconnect.

Basically, I am wondering, if I am hitting

Full Buffer: Your App is not reading the data fast enough, or a network bottleneck is slowing data flow.

which leads silently (?) to the behaviour described above?

Any ideas / hints or pointers to related documentation are more than welcome.

Thanks in advance!

@yssoe
Copy link

yssoe commented Aug 1, 2022

I have the same problem, I upgraded to version 2.0.2 and noticed that on Friday after 6 hours I stopped receiving tweets, there were no errors, after reconnecting I started to receive tweets again.

@zacmos
Copy link
Collaborator

zacmos commented Aug 4, 2022

I tried to reproduce this issue but didn't manage to do so,
In my case it failed with the exception okhttp3.internal.http2.StreamResetException: stream was reset: CANCEL
I didn't manage to get it stuck without streaming the tweets.

Do you know maybe if it stuck on the reader.readLine(); line?

It looks like the process should identify these "timeout" cases and once it happens you need to reconnect.
Please check this branch, it includes few changes:

  1. StreamTimeoutChecker - checks if there is streaming timeout, if there is it calls shutdown.
  2. In order to avoid a bottleneck due to slow data flow, TweetsQueuer sets the tweets Json strings into the tweetsQueue and TweetsListenersExecutor creates the tweets objects.
  3. The main class is checking for an error in TweetsStreamListenersExecutor

What do you think about this proposed direction?

@andypiper
Copy link

Hi @rzo1 @yssoe - if you're able to take a look into the suggestions @zacmos has made, it would be great to know whether those updates help, or whether you're able to see if the code is getting stuck elsewhere.

Additionally as we are working on reproducing and understanding what you're seeing, it would be helpful to know a bit more about your setup:

  • where is your code running (may help to understand any kind of localised network changes related to the API)
  • what rules you have set (may help to identify anything unusual that could trip some kind of edge case, if this exists; are both of you experiencing issues with similar filter rule patterns, or completely different things)

Keep us updated, thanks!

@rzo1
Copy link
Author

rzo1 commented Aug 4, 2022

Thanks for your replies @andypiper and @zacmos - I will look at the proposed changes and will give an update.

I will also add details regarding the server environment and the rules, I used for load testing our integration.

If it is of interest, I can also share our modified version of the code (one reader, multiple consumers) + guarding and exception handling.

I might find some time tomorrow evening to test the changes and run a fresh experiment.

Big thanks for looking into it!

@yssoe
Copy link

yssoe commented Aug 5, 2022

@andypiper @zacmos Thanks for helping us out here,

My code is running locally on my desktop in a JavaFX program with the below code in it's own thread.

This is how I use it:

"-is:retweet -is:reply (from:XXXXXXX OR from:XXXXXXX OR from:XXXXXXX OR from:XXXXXXX OR from:XXXXXXX OR from:XXXXXXX OR from:XXXXXXX OR from:XXXXXXX)"

so 2 issues:

  1. I never receive tweets from the 4th tweeter i subscribe to in the above list
  2. after a few hours (between 3 and 6 hours) tweets stop rolling in without any errors logged, reconnecting fixes it

and for completeness this is my retrieve code:

public void getTweets() {
        try {
            InputStream streamResults = apiInstance.tweets().searchStream()
                    .backfillMinutes(0)
                    .tweetFields(tweetFields)
                    .expansions(expansions)
                    .userFields(userFields)
                    .execute();

            Responder responder = new Responder();
            tsle = new TweetsStreamListenersExecutor(streamResults);
            tsle.addListener(responder);
            tsle.executeListeners();

        } catch (ApiException e) {
            logger.error("Status code: " + e.getCode());
            logger.error("Reason: " + e.getResponseBody());
            logger.error("Response headers: " + e.getResponseHeaders());
        }

    }

    public void setRules(ArrayList<TwitterFollowing> twitterFollowings) {
        RulesLookupResponse rulesResponse = null;
        try {
            rulesResponse = apiInstance.tweets().getRules().execute();
        } catch (Exception e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
        if (rulesResponse != null) {
            try {
                if (!rulesResponse.getData().isEmpty()) {
                    ArrayList<String> ids = new ArrayList<>();
                    for (Rule rule : rulesResponse.getData()) {
                        ids.add(rule.getId());
                    }
                    deleteRule(ids);
                }
            } catch (Exception e) {
                logger.error(e.getMessage());
            }

        }

        if (twitterFollowings.size() > 0) {
            String ruleValue = generateRuleValue(twitterFollowings);
            if (ruleValue != null) {
                addRule(ruleValue);
            }

        }

openjdk 18 2022-03-22
OpenJDK Runtime Environment (build 18+36-2087)
OpenJDK 64-Bit Server VM (build 18+36-2087, mixed mode, sharing)

@rzo1
Copy link
Author

rzo1 commented Aug 5, 2022

Hi,

as promised here are some more details.

System Environment

We are running an Jakarta EE 8 Enterprise Application with TomEE 8.0.12 on an OpenJDK 17

openjdk version "17.0.4" 2022-07-19
OpenJDK Runtime Environment (build 17.0.4+8-Ubuntu-120.04)
OpenJDK 64-Bit Server VM (build 17.0.4+8-Ubuntu-120.04, mixed mode, sharing

The container is run on a virtual machine (Ubuntu 20.04 LTS Server) with

Linux 5.4.0-122-generic #138-Ubuntu SMP Wed Jun 22 15:00:31 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux

under a German locale

LANG=de_DE.UTF-8
LANGUAGE=
LC_CTYPE="de_DE.UTF-8"
LC_NUMERIC="de_DE.UTF-8"
LC_TIME="de_DE.UTF-8"
LC_COLLATE="de_DE.UTF-8"
LC_MONETARY="de_DE.UTF-8"
LC_MESSAGES="de_DE.UTF-8"
LC_PAPER="de_DE.UTF-8"
LC_NAME="de_DE.UTF-8"
LC_ADDRESS="de_DE.UTF-8"
LC_TELEPHONE="de_DE.UTF-8"
LC_MEASUREMENT="de_DE.UTF-8"
LC_IDENTIFICATION="de_DE.UTF-8"
LC_ALL=

The server is a virtual machine and is hosted in our university data center.

Note: I am well aware of the implications of spawning threads in a Jakarat EE context, so don't start a witch hunt ;)

Rules

We are creating two rules

    1. We look for hashtags
    1. We look for user accounts
{
  "data": [
    {
      "id": "1553796404811890689",
      "value": "(from:Bitkom_Media OR from:Bitkom_Startup OR from:Bitkom_Bildung OR from:SmartCountryCon OR from:Bitkom_Software OR from:Bitkom_Health OR from:Bitkom) -(is:retweet)",
      "tag": "FollowedUser-Query-0"
    },
    {
      "id": "1553796404811890690",
      "value": "#ENGGER OR #WEuro2022Final OR #GERENG OR #WEURO2022 -(is:retweet)",
      "tag": "TrackedTerms-Query-0"
    }
  ],
  "meta": {
    "sent": "2022-08-05T11:06:26.742Z",
    "result_count": 2
  }
}

The real hashtags or twitter accounts do not really matter. It happens regardless of the twitter accounts or hashtags used. For our experiments, we are using trending high volume hashtags.

TweetsQueuer

Our current code is basically a modified version of the original TweetsStreamListenersExecutor.

Basically it looks like

    private class TweetsQueuer implements Runnable {
        private static final org.slf4j.Logger innerLogger = org.slf4j.LoggerFactory.getLogger(TweetsQueuer.class);
        private final TwitterApi client;
        private final int retries;

        public TweetsQueuer(TwitterApi client, int connectionRetries) {
            this.client = client;
            this.retries = connectionRetries;
        }

        @Override
        public void run() {
            queueTweets();
        }

        private void queueTweets() {
            String line;
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(connect()))) {
                while (isRunning.get()) {
                    line = reader.readLine();
                    if (line == null || line.isBlank()) {
                        innerLogger.debug("Waiting to receive a tweet...");
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        continue;
                    }
                    try {
                        innerLogger.debug("Queuing a tweet...");
                        tweetsQueue.put(StreamingTweetResponse.fromJson(line));
                    } catch (Exception e) {
                        innerLogger.warn(e.getLocalizedMessage(), e);
                    }
                }
            } catch (Exception e) {
                innerLogger.warn(e.getLocalizedMessage(), e);
                innerLogger.warn("Lost connection to the Twitter API v2 endpoint due to an unexpected error. Try to reconnect...");
                if(e instanceof SocialServiceException) {
                    try {
                        int waitingTimeInMs = 15*1000;
                        innerLogger.debug("Twitter API V2: Waiting {} seconds before trying to reconnect.", waitingTimeInMs / 1000);
                        Thread.sleep(waitingTimeInMs);
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                }
                //restart
                startQueuer();
            }
            watch.countDown();
        }


        private InputStream getConnection() throws ApiException {
            return client.tweets().searchStream()
                    .backfillMinutes(0) //we do not have academic access, so no possibility for backfill, see https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/recovery-and-redundancy-features
                    .expansions(TwitterEntityFields.getExpansions())
                    .mediaFields(TwitterEntityFields.getMediaFields())
                    .placeFields(TwitterEntityFields.getPlaceFields())
                    .userFields(TwitterEntityFields.getUserFields())
                    .tweetFields(TwitterEntityFields.getTweetFields())
                    .execute(retries);

        }

        private InputStream connect() {
            boolean connected = false;
            while (!connected && isRunning.get()) {
                try {
                    InputStream stream = getConnection();
                    connected = true;
                    innerLogger.info("Twitter API V2: Successfully reconnected ...");
                    return stream;
                } catch (Exception e) {
                    long waitingTimeInMs = 30 * 1000;
                    innerLogger.info("Twitter API V2: Reconnect failed due to: '{}'.", e.getLocalizedMessage(), e);
                    innerLogger.info("Twitter API V2: Trying to reconnect...");
                    if (e instanceof ApiException ae) {
                        if (ae.getCode() == 429) {
                            final Map<String, List<String>> headers = ae.getResponseHeaders();
                            //we tried it too often, we will wait 15 min (reset window) before we try to re-connect ;)
                            waitingTimeInMs = 15 * 60 * 1000;
                            //let's check if we can make this windows more accurate!
                            if (headers != null) {
                                // the remaining window before the rate limit resets, in UTC epoch seconds
                                List<String> xRateLimitReset = headers.get("x-rate-limit-reset");
                                if (xRateLimitReset != null) {
                                    if (!xRateLimitReset.isEmpty()) {
                                        try {
                                            final long now = Instant.now(Clock.systemUTC()).getEpochSecond();
                                            final long reset = Long.parseLong(xRateLimitReset.get(0));
                                            final long wait = 1000 * (reset - now);

                                            waitingTimeInMs = wait <= 0 ? 1000 : wait;

                                        } catch (NumberFormatException ignored) {
                                        }
                                    }
                                }
                            }
                        }
                    }
                    try {
                        innerLogger.debug("Twitter API V2: Waiting {} seconds before trying to reconnect.", waitingTimeInMs / 1000);
                        Thread.sleep(waitingTimeInMs);
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            throw new SocialServiceException("Could not connect to Twitter API V2");
        }
    }

After some hours I am seeing

2022-08-05 10:50:58,338 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-05 10:50:58,438 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-05 10:50:58,538 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-05 10:50:58,639 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-05 10:50:58,739 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-05 10:50:58,839 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...

which indicates that

 line = reader.readLine();

returns null or an empty String.

From the JavaDoc, I can see that readLine() return NULL

A String containing the contents of the line, not including any line-termination characters, or null if the end of the stream has been reached

if the end of the stream is reached. Shouldn't we do a re-connect in case we reached the end of the stream instead of active waiting?

Nevertheless, I get no exception, which should lead to an automatic re-connect in such a situation.

However, while digging through hundred of debug logs, I found this one:

2022-08-01 15:18:49,213 WARN  [pool-5-thread-1] TwitterStatusListener(44): Caught an error: 'class OperationalDisconnectProblem {
    class Problem {
        detail: This stream has been disconnected for operational reasons.
        status: null
        title: operational-disconnect
        type: https://api.twitter.com/2/problems/operational-disconnect
    }
    disconnectType: OperationalDisconnect
}'.

which correlates with the timestamp with:

2022-08-01 15:18:49,212 DEBUG [pool-5-thread-1] LinkedBlockingTweetQueue(34): Returning Tweet from Queue. Current size: 0
2022-08-01 15:18:49,214 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:49,317 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:49,417 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:49,518 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:49,618 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:49,718 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:49,819 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:49,919 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:50,019 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:50,119 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:50,220 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:50,321 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:50,421 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:50,521 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:50,621 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:50,722 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:50,822 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:50,922 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:51,023 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:51,123 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:51,223 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:51,324 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:51,424 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:51,524 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:51,624 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:51,725 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:51,825 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:51,925 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:52,026 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:52,126 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:52,226 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:52,327 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:52,427 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:52,527 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:52,628 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...
2022-08-01 15:18:52,728 DEBUG [pool-5-thread-11] TweetsStreamListenersExecutor$TweetsQueuer(188): Waiting to receive a tweet...

The related code of the listener is

    public void onStatus(StreamingTweetResponse status) {

        if (status.getErrors() == null) {
            logger.debug("Received a tweet - starting to process in pipeline...");
            twitterStatusHandler.storeStatus(new StatusT4JImpl(status.getData(), status.getIncludes()), getTrackedTerms(), getSystemBlockedTerms());
        } else {
            for (Problem problem : status.getErrors()) {
                logger.warn("Caught an error: '{}'.", problem.toString());
            }
        }

    }

That could be the reason for the issue we are seeing? The stream doesn't get terminated and becomes stale due to an "operational disconnected", which is hidden in a StreamingTweetResponse?

Guess, we need to check the Problem in the Queuer and conduct a reconnect, if needed?

Proposed Code Changes

I had a look at https://github.com/twitterdev/twitter-api-java-sdk/tree/stream-timeout by @zacmos. Overall, it looks good to me.

Moving the JSON parsing away from the queuer is surely a good thing. However, I didn't see any bottlenecks here.
I was able of processing thousands of Tweets without encountering issues with it.

As no exception is raised in the case the stream becomes stale, I am wondering, if we need to check the StreamingTweetResponse for an "operational disconnect", which doesn't raise an exception at SDK level?

Hope it helps & thanks for your time @andypiper and @zacmos !

@andypiper
Copy link

Thanks lots to go through here...

@yssoe "I never receive tweets from the 4th tweeter i subscribe to in the above list"

Is that a protected account? It may be that you can see Tweets from that account yourself if it follows you and you are using the Twitter app or site, but from the perspective of the streaming API, private/protected accounts are not available there. Could that explain this specific case?

Thank you both for sharing your code examples this is super useful in understanding and potentially reproducing. I'm going to step back for @zacmos to comment on the code internals as he is more familiar with the SDK side than I am 🙂 👍🏻

@rzo1
Copy link
Author

rzo1 commented Aug 6, 2022

I slightly modified my code to re-connect in case an operational disconnect is encountered. I forgot to enable detailed log output yesterday, so I will now re-start my experiment and will report back in ~ 24h ;)

@zacmos
Copy link
Collaborator

zacmos commented Aug 7, 2022

@rzo1
It’s really good that you found the exception about “disconnected for operational reasons”, this can be a clue for investigating it on the server side.

The provided solution related to "timeouts" will probably not contribute so much since lines are being streamed (even if they are empty lines). Maybe resetTweetStreamedTime() should consider these empty lines in order to end up with a timeout.

“Guess, we need to check the Problem in the Queuer and conduct a reconnect, if needed?” - Yes, I think so.

Looking forward to hear from you about your results.

By the way regarding to

if (line == null || line.isBlank()) {
                        innerLogger.debug("Waiting to receive a tweet...");
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        continue;
                    }

Shouldn’t you throw InterruptedException here in order to reach the startQueuer()?

@rzo1
Copy link
Author

rzo1 commented Aug 8, 2022

@zacmos

Currently, the stream is still active and the system is processing tweets as expected. Perhaps, there was no "operational disconnect" on the backend side yet to trigger the state.

At the moment, I am doing

  final StreamingTweetResponse str = StreamingTweetResponse.fromJson(line);

                    // https://github.com/twitterdev/twitter-api-java-sdk/issues/37
                    if (str.getErrors() != null) {
                        for (Problem problem : str.getErrors()) {
                            if (problem instanceof OperationalDisconnectProblem) {
                                logger.warn("Experiencing an operational disconnect: {}", problem);
                                //trigger a re-connect
                                throw new RuntimeException(problem.getDetail());
                            }
                        }
                    }

in order to re-connect if an operational disconnect happens. I made some tweaks and will do a re-deploy now. Let's see, if it is related by doing a longer 48-72h run on some trending hashtags.

I hope, that my app does not hit the maximum tweet limit soon.

Maybe resetTweetStreamedTime() should consider these empty lines in order to end up with a timeout.

That might break user expectations. If you are listening to the filtered stream and have "slient" stream rules, i.e. not really active hashtags or users, you will get an timeout, which isn't really expected.

Shouldn’t you throw InterruptedException here in order to reach the startQueuer()?

Yes. I think you are right here. Missed that one! Thx.

Side note

FYI: I found an unrelated exception - maybe we can make the sdk more robust against such things (if it is better trackable, I can open a new issue for it). Nevermind. it is already tracked with #30

```bash 2022-08-08 02:12:15,010 WARN [pool-5-thread-9] TweetsStreamListenersExecutor$TweetsQueuer(214): java.net.MalformedURLException: no protocol: com.google.gson.JsonIOException: java.net.MalformedURLException: no protocol: at com.google.gson.TypeAdapter.fromJsonTree(TypeAdapter.java:287) at com.twitter.clientlib.model.User$CustomTypeAdapterFactory$1.read(User.java:623) at com.twitter.clientlib.model.User$CustomTypeAdapterFactory$1.read(User.java:612) at com.google.gson.TypeAdapter$1.read(TypeAdapter.java:199) at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.read(TypeAdapterRuntimeTypeWrapper.java:41) at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:82) at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:61) at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:130) at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:221) at com.google.gson.TypeAdapter.fromJsonTree(TypeAdapter.java:285) at com.twitter.clientlib.model.Expansions$CustomTypeAdapterFactory$1.read(Expansions.java:459) at com.twitter.clientlib.model.Expansions$CustomTypeAdapterFactory$1.read(Expansions.java:448) at com.google.gson.TypeAdapter$1.read(TypeAdapter.java:199) at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:130) at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:221) at com.google.gson.TypeAdapter.fromJsonTree(TypeAdapter.java:285) at com.twitter.clientlib.model.StreamingTweetResponse$CustomTypeAdapterFactory$1.read(StreamingTweetResponse.java:274) at com.twitter.clientlib.model.StreamingTweetResponse$CustomTypeAdapterFactory$1.read(StreamingTweetResponse.java:263) at com.google.gson.TypeAdapter$1.read(TypeAdapter.java:199) at com.google.gson.Gson.fromJson(Gson.java:991) at com.google.gson.Gson.fromJson(Gson.java:956) at com.google.gson.Gson.fromJson(Gson.java:905) at com.google.gson.Gson.fromJson(Gson.java:876) at com.twitter.clientlib.model.StreamingTweetResponse.fromJson(StreamingTweetResponse.java:289) at de.tweetpoint.service.twitter.streams.TweetsStreamListenersExecutor$TweetsQueuer.queueTweets(TweetsStreamListenersExecutor.java:199) at de.tweetpoint.service.twitter.streams.TweetsStreamListenersExecutor$TweetsQueuer.run(TweetsStreamListenersExecutor.java:180) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: java.net.MalformedURLException: no protocol: at java.base/java.net.URL.(URL.java:674) at java.base/java.net.URL.(URL.java:569) at java.base/java.net.URL.(URL.java:516) at com.google.gson.internal.bind.TypeAdapters$21.read(TypeAdapters.java:502) at com.google.gson.internal.bind.TypeAdapters$21.read(TypeAdapters.java:494) at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:130) at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:221) at com.google.gson.TypeAdapter.fromJsonTree(TypeAdapter.java:285) ... 30 more ```

@zacmos
Copy link
Collaborator

zacmos commented Aug 8, 2022

Thanks for the update @rzo1 , I think that the fix you described relates to how to handle server maintenance

@yssoe , did you manage to find if you also got OperationalDisconnectProblem?

@yssoe
Copy link

yssoe commented Aug 9, 2022

@zacmos I will modify my code and look for it. Is there no way you guys can monitor this on the server side? I mean this issue is 100% reproducable, just subscribe to a twitter feed like we do, let it run for a few hours and it will stop. Thanks

@zacmos
Copy link
Collaborator

zacmos commented Aug 9, 2022

Please check this link
A reconnect is expected when server is restarted. Most common errors are OperationalDisconnectProblem or ConnectionExceptionProblem

@yssoe , can you please try it after modifying your code? It will be interesting what errors do you get.

@rzo1
Copy link
Author

rzo1 commented Aug 11, 2022

Here is my update after 48h:

The first error happened was a stream reset, which the application successfully survived.

2022-08-10 06:16:25,158 WARN  [pool-5-thread-7] TweetsStreamListenersExecutor$TweetsQueuer(211): stream was reset: NO_ERROR
okhttp3.internal.http2.StreamResetException: stream was reset: NO_ERROR
	at okhttp3.internal.http2.Http2Stream$FramingSource.read(Http2Stream.kt:355)
	at okhttp3.internal.connection.Exchange$ResponseBodySource.read(Exchange.kt:276)
	at okio.RealBufferedSource.read(RealBufferedSource.kt:189)
	at okio.RealBufferedSource.exhausted(RealBufferedSource.kt:197)
	at okio.InflaterSource.refill(InflaterSource.kt:112)
	at okio.InflaterSource.readOrInflate(InflaterSource.kt:76)
	at okio.InflaterSource.read(InflaterSource.kt:49)
	at okio.GzipSource.read(GzipSource.kt:69)
	at okio.RealBufferedSource$inputStream$1.read(RealBufferedSource.kt:158)
	at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:270)
	at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:313)
	at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:188)
	at java.base/java.io.InputStreamReader.read(InputStreamReader.java:177)
	at java.base/java.io.BufferedReader.fill(BufferedReader.java:162)
	at java.base/java.io.BufferedReader.readLine(BufferedReader.java:329)
	at java.base/java.io.BufferedReader.readLine(BufferedReader.java:396)
	at de.tweetpoint.service.twitter.streams.TweetsStreamListenersExecutor$TweetsQueuer.queueTweets(TweetsStreamListenersExecutor.java:188)
	at de.tweetpoint.service.twitter.streams.TweetsStreamListenersExecutor$TweetsQueuer.run(TweetsStreamListenersExecutor.java:181)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
2022-08-10 06:16:25,192 WARN  [pool-5-thread-7] TweetsStreamListenersExecutor$TweetsQueuer(212): Lost connection to the Twitter API v2 endpoint due to an unexpected error. Try to reconnect...

More interesting is this one:

2022-08-10 15:19:21,906 WARN  [pool-5-thread-7] TwitterStatusListener(201): Experiencing an operational disconnect: class OperationalDisconnectProblem {
    class Problem {
        detail: This stream has been disconnected for operational reasons.
        status: null
        title: operational-disconnect
        type: https://api.twitter.com/2/problems/operational-disconnect
    }
    disconnectType: OperationalDisconnect
}
2022-08-10 15:19:21,907 WARN  [pool-5-thread-7] TweetsStreamListenersExecutor$TweetsQueuer(211): This stream has been disconnected for operational reasons.
java.lang.RuntimeException: This stream has been disconnected for operational reasons.
	at d.t.service.twitter.streams.TweetsStreamListenersExecutor$TweetsQueuer.queueTweets(TweetsStreamListenersExecutor.java:203)
	at d.t.service.twitter.streams.TweetsStreamListenersExecutor$TweetsQueuer.run(TweetsStreamListenersExecutor.java:181)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
2022-08-10 15:19:21,908 WARN  [pool-5-thread-7] TweetsStreamListenersExecutor$TweetsQueuer(212): Lost connection to the Twitter API v2 endpoint due to an unexpected error. Try to reconnect...

After this event and the successful re-connect, the stream become more silent with occasionally retrieving one tweet. This looks like

2022-08-10 19:22:28,765 DEBUG [pool-5-thread-7] TweetsStreamListenersExecutor$TweetsQueuer(190): Waiting to receive a tweet...
2022-08-10 19:22:48,774 DEBUG [pool-5-thread-7] TweetsStreamListenersExecutor$TweetsQueuer(190): Waiting to receive a tweet...
2022-08-10 19:23:08,788 DEBUG [pool-5-thread-7] TweetsStreamListenersExecutor$TweetsQueuer(190): Waiting to receive a tweet...
2022-08-10 19:23:28,795 DEBUG [pool-5-thread-7] TweetsStreamListenersExecutor$TweetsQueuer(190): Waiting to receive a tweet...
2022-08-10 19:23:48,808 DEBUG [pool-5-thread-7] TweetsStreamListenersExecutor$TweetsQueuer(190): Waiting to receive a tweet...
2022-08-10 19:24:08,817 DEBUG [pool-5-thread-7] TweetsStreamListenersExecutor$TweetsQueuer(190): Waiting to receive a tweet...
2022-08-10 19:24:28,825 DEBUG [pool-5-thread-7] TweetsStreamListenersExecutor$TweetsQueuer(190): Waiting to receive a tweet...
2022-08-10 19:24:48,835 DEBUG [pool-5-thread-7] TweetsStreamListenersExecutor$TweetsQueuer(190): Waiting to receive a tweet...
2022-08-10 19:25:08,853 DEBUG [pool-5-thread-7] TweetsStreamListenersExecutor$TweetsQueuer(190): Waiting to receive a tweet...
2022-08-10 19:25:28,854 DEBUG [pool-5-thread-7] TweetsStreamListenersExecutor$TweetsQueuer(190): Waiting to receive a tweet...
2022-08-10 19:25:48,865 DEBUG [pool-5-thread-7] TweetsStreamListenersExecutor$TweetsQueuer(190): Waiting to receive a tweet...
2022-08-10 19:26:08,877 DEBUG [pool-5-thread-7] TweetsStreamListenersExecutor$TweetsQueuer(190): Waiting to receive a tweet...
2022-08-10 19:26:18,967 DEBUG [pool-5-thread-7] TweetsStreamListenersExecutor$TweetsQueuer(194): Queuing a tweet...
2022-08-10 19:26:18,969 DEBUG [pool-5-thread-9] LinkedBlockingTweetQueue(34): Returning Tweet from Queue. Current size: 1

The "funny" thing is, that after manually restarting the application (i.e. refreshing the connection) the tweets starting to flow at high volume again.

The issue seems related to an "operational disconnect" but I am not sure, how we should handle it on the client side if an immediate re-connect isn't sufficient. Perhaps we need to wait a bit before doing a re-connect?

@zacmos
Copy link
Collaborator

zacmos commented Aug 11, 2022

@rzo1 , thank you for the update.
Can you please share your code? Just to follow the code in case of the operational exception.
Regarding waiting before a reconnect, please check this.

@rzo1
Copy link
Author

rzo1 commented Aug 12, 2022

@zacmos Yes. I extracted the related code from our JakartaEE application to a standalone, standard Java project and pushed it here: https://github.com/rzo1/twitter-api-v2-streams

The relevant code regarding the operational disconnect is located here.

Main just contains some glue code to create rules, start it up and wait (forever). Maybe it is possible to reproduce the issue, we are seeing with it...

Regarding waiting before a reconnect, please check this.

Thanks for the docu link. We do not get an HTTP error code or an error response during the re-connect. It just works "fine". So the question would be, if this docu section also applies for a successful re-connect after an operational disconnect?

@zacmos
Copy link
Collaborator

zacmos commented Aug 15, 2022

As far as I checked, you should back off when reconnecting for best practice.
I'll continue checking it.
In addition to this I've updated the new branch

@rzo1
Copy link
Author

rzo1 commented Aug 16, 2022

As far as I checked, you should back off when reconnecting for best practice.

Ok - I will look into it soon. Due to my extensive tests in the last weeks, I exceeded my monthly tweet cap, so I am now forced to pause for 2 weeks. I will then implement backing off before attempting a re-connect.

Or do you or @andypiper have the authority to reset the cap earlier, so I can continue testing?

I'll continue checking it.

Thanks for looking into it. It is very much appreciated.

In addition to this I've updated the new branch

I will have a deeper look soon, but seems like we iterativly enhance the example to a more robust solution.

My only concern is the following:

If we do not get an exception or any other information during a re-connect, we cannot truely decide, if we encounter the "stale stream" thing or if there aren't any tweets matching the given rules.

Of course, we can implement a timeout-based solution but this may result in a lot of useless re-connects if we are listening to a stream with rules, which doesn't match very often. I don't know, if this would be intended behaviourr.

BTW: The interesting thing is, that - I just remembered it - we encountered a similar silent behaviour for the https://github.com/redouane59/twittered v2 api binding but didn't had time to dig into it.

@andypiper
Copy link

I’m really sorry about the cap thing but we are not able to perform resets there. I appreciate that is frustrating, especially when you’ve been testing like this.

@rzo1
Copy link
Author

rzo1 commented Aug 16, 2022

@andypiper Alright, no problem.

@rzo1
Copy link
Author

rzo1 commented Sep 3, 2022

FYI: My limit is reset but I am a bit busy atm. I will continue my investigation / testing probably at the end of September.

@zacmos
Copy link
Collaborator

zacmos commented Sep 4, 2022

Hi @rzo1, I tried it several times, as far as I checked it looks that waiting before the reconnect enables to stream the tweets properly.

@zacmos zacmos closed this as completed Oct 3, 2022
@rzo1
Copy link
Author

rzo1 commented Oct 3, 2022

I updated our code base as discussed above and updated to 2.0.3. Given that my limit is now reset, I will start a new test and see, if it doesn't happen. Thanks for your support and guideance @zacmos

@rzo1
Copy link
Author

rzo1 commented Oct 5, 2022

As promised, I conducted a test. It worked from 03.10.22, 14:00 to 05.10.22, 07:00 and I was able to retrieve around 200k Tweets. After that, the stream became silent:

2022-10-05T09:02:40,167 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:03:10,589 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:03:46,012 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:04:26,425 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:05:11,860 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:06:02,287 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:06:27,704 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:06:58,124 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:07:33,542 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:08:13,958 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:08:59,385 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:09:49,801 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:10:15,208 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:10:45,637 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:11:21,051 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:12:01,451 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:12:46,877 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:13:37,313 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:14:02,723 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:14:33,140 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:15:08,568 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:15:48,989 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:16:34,426 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:17:24,871 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:17:50,284 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:18:20,697 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:18:56,109 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:19:36,518 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:20:21,931 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:21:12,344 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:21:37,767 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:22:08,195 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:22:43,612 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:23:24,044 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:26:09,518 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:26:59,924 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:27:25,344 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:27:55,751 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:28:31,167 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:29:11,583 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:29:57,010 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:32:07,609 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:32:33,162 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:33:03,588 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:33:39,134 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:34:19,696 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:35:05,271 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:35:55,835 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:37:21,445 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:37:51,985 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:38:27,542 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:40:08,139 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:40:53,684 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:41:44,247 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:42:09,816 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:42:40,388 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:43:15,920 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:43:56,472 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:44:42,044 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:45:32,564 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:45:58,162 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.
2022-10-05T09:46:28,713 DEBUG [pool-2-thread-6] TweetsStreamListenersExecutor$TweetsQueuer(204): We did not receive a new tweet for 60000 ms. Stream might be stale. Trigger a reconnect.

The reconnect succeedes, but no streams are retrieved. After doing a force restart of the java application (terminating the JVM), tweets were flowing again.

I used the following rule (#Bitcoin OR #Ukrain OR #Russia) -(is:retweet) to avoid being on a silent hashtag.

I can / will try to test the official example (without my code) as a next step. However, it will take some time as I am a bit busy this week. Stay tuned.

@rzo1
Copy link
Author

rzo1 commented Oct 12, 2022

Short update: It seems to be solved. My code was running for 7+ days now and it just works as expected. I was able to pull 1.647.620 in that time span.

Thanks for your time, guideance and help: @andypiper @zacmos

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants
@andypiper @rzo1 @yssoe @zacmos and others