
PacketException when too many messages arrive #31

Open
ghost opened this issue May 8, 2018 · 11 comments
ghost commented May 8, 2018

Hello,

The issue was first reported here: jurek7/logstash-input-mqtt#5

I ran into this problem while using logstash-input-mqtt, which uses this library. According to the plugin's author, it is related to the paho.mqtt.ruby library and not to the plugin itself.

So I hope you can help me find out what's going on.

Let me know if I can provide any more details.

Thanks in advance!


p-goudet commented May 9, 2018

Hello,

I merged the fix for the logger reference from @tomc78.
A new minor version (1.0.8) was released today; it includes that fix.
Unfortunately, I don't think it will solve your issue. The raised exception is due to a mismatch in the id of a SUBACK packet.

Could you give me more details about the test sequence, especially the subscribe process?
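To make the failure mode concrete: the exception happens when the id carried by an incoming SUBACK does not match any pending subscribe request. The sketch below illustrates that kind of check; the class and method names are made up for illustration and are not paho.mqtt.ruby internals.

```ruby
# Minimal illustration of a SUBACK packet-id mismatch.
# Names here are hypothetical, not the library's actual code.
class PacketException < StandardError; end

class SubscriptionTracker
  def initialize
    @pending = {} # packet id => topics awaiting a SUBACK
  end

  # Remember which packet id was used for a SUBSCRIBE request.
  def register(id, topics)
    @pending[id] = topics
  end

  # Called when a SUBACK arrives; raises if its id is unknown,
  # which is the kind of error reported in this issue.
  def handle_suback(id)
    topics = @pending.delete(id)
    raise PacketException, "unexpected SUBACK id #{id}" if topics.nil?
    topics
  end
end
```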


ghost commented May 9, 2018

Hi @p-goudet,

Ok, I will try to provide more information.

The test case is simple and easy to reproduce.

I'm using the paho-mqtt Java client to produce the messages with qos=1; a total of 10k messages are published. The consumer is turned off while the producer is running.

After producing the 10k messages, I start the consumer. I'm using Logstash for that, with the mqtt-input plugin, which is very simple (the source code can be viewed here: https://github.com/jurek7/logstash-input-mqtt/blob/master/lib/logstash/inputs/mqtt.rb) and relies on this library.

The following configuration is given to the plugin, which forwards it to this library:

               topic           => "/teste"
               host            => "localhost"
               username        => "aaaa"
               password        => "bbbb"
               client_id       => "teste"
               clean_session   => false
               qos             => 1

Here's how the configuration is forwarded:

@client = RecoverableMqttClient.new({
  :host => @host,
  :port => @port,
  :persistent => @persistent, # keep connection persistent
  :mqtt_version => @mqtt_version,
  :clean_session => @clean_session,
  :client_id => @client_id,
  :username => @username,
  :password => @password,
  :ssl => @ssl,
  :will_topic => @will_topic,
  :will_payload => @will_payload,
  :will_qos => @will_qos,
  :will_retain => @will_retain,
  :retry_reconnection_max_count => @reconnect_retries,
  :retry_reconnection_sleep_time => @reconnect_sleep_time,
})

When a message is received, I just print its content (JSON). This is done by an output plugin in Logstash, as simple as this:

output {
    stdout { codec => rubydebug }
}

Mosquitto is being used as the MQTT server, running in Docker. That's it; there's nothing special about the setup. I can reproduce the error with 100% reliability.

The consumer consumes all the messages (I'm about 90% sure of that), and when it finishes, the exception is thrown. Maybe the MQTT client is trying to "commit" (ack) all the messages together and the packet ends up being bigger than expected? If I send fewer messages, say 1k, I don't see the error. The content of every message looks like this:

{"data": 0}

Each message is between 11 and 15 bytes, so around 150,000 bytes are sent/received in total.
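For reference, the arithmetic behind that estimate can be checked quickly (a rough sketch counting payload bytes only; MQTT fixed and variable headers are extra):

```ruby
# Back-of-the-envelope traffic estimate for the scenario above.
payload  = '{"data": 0}'   # smallest reported message
messages = 10_000

min_total = payload.bytesize * messages  # 11 bytes per message
max_total = 15 * messages                # largest reported payload size
# min_total..max_total spans roughly 110,000 to 150,000 bytes
```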

Let me know if you need any more detailed information.

Thanks very much for looking that up.

Best Regards.


ghost commented May 15, 2018

Hi guys,

Any thoughts on that?

@p-goudet

Hello @casmeiron,

Yes, I found the reason why some packets might not make it through. When the client requests a publish, the packets are not sent directly: they are first buffered, and then sent by a background process that handles the writing and the reading on the socket. The current implementation might drop packets if the sending buffer is full.
I am changing this approach: when the sending buffer is full, the client will wait for a short delay (currently 2 ms) before retrying to send.
This solution is being implemented; the results I could observe from samples seem to fix the issue (correctly sending 10000 publish packets with qos = 1). As soon as it passes the tests I will publish a new version, which should be 1.0.10.
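The wait-and-retry approach described above can be sketched like this. This is a minimal illustration using Ruby's `SizedQueue` as the sending buffer, not the library's actual implementation; all names here are made up:

```ruby
BUFFER_SIZE = 4       # small buffer to force the "full" case
RETRY_DELAY = 0.002   # 2 ms, the delay mentioned above

# Instead of dropping a packet when the sending buffer is full,
# wait briefly and retry until there is room.
def push_with_retry(buffer, packet, delay = RETRY_DELAY)
  begin
    buffer.push(packet, true) # non-blocking push; raises ThreadError when full
  rescue ThreadError
    sleep(delay)
    retry
  end
end

buffer = SizedQueue.new(BUFFER_SIZE)
sent = []

# Background writer, standing in for the socket-writing thread.
writer = Thread.new do
  while (packet = buffer.pop)
    break if packet == :stop
    sent << packet
  end
end

# Push far more packets than the buffer can hold at once;
# none are dropped because full pushes are retried.
100.times { |i| push_with_retry(buffer, i) }
push_with_retry(buffer, :stop)
writer.join
```

With the earlier drop-on-full behavior, most of these 100 packets would be lost; with the retry loop they all reach the writer, in order.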


ghost commented May 16, 2018

Hi @p-goudet ,

That's great, thanks very much for the help.
I understand why the error is happening, and the workaround isn't a problem either.
Yes, I'll wait until the tests pass.

Best Regards.

@p-goudet

Hello @casmeiron,

I have just submitted version 1.0.10. The client should now be able to send any packet; some errors might appear in the log if the sending frequency is too high. As said previously, the current version waits for a short delay when the sending buffer is full, and then retries to push into the sending buffer.
Could you tell me whether version 1.0.10 fixes this issue, or whether other points remain unsolved?


ghost commented Jun 7, 2018

Hi @p-goudet ,

Sorry, I didn't see your comment earlier.
No, it did not fix it; even with version 1.0.12 the problem appears, in a different guise: jurek7/logstash-input-mqtt#5 (comment)


ghost commented Jul 4, 2018

Hi @p-goudet ,

Any news on this? I'm currently unable to use this Paho implementation because of this problem.
I would love to see any updates.

Thanks.


p-goudet commented Jul 4, 2018

I'm sorry, I haven't been very active in development recently. I will try to tackle this issue again soon.


ghost commented Aug 23, 2018

Hi @p-goudet , any news about this one?

Thanks!

@daviduebelacker
Ping! ;-) Any news?

We ran into the same problem: sometimes we get an invalid SUBACK for a QoS 1 subscription.
