Skip to content

Commit

Permalink
MQTT3 clean shutdown (#535)
Browse files Browse the repository at this point in the history
* Add MqttConnectionCore: an auxiliary class handling lifetimes of the MqttConnection object and the underlying C connection.

* Extract MqttConnection class from MqttClient.cpp into separate source files.

* Add MqttConnectionOptions class to pass all possible options on MqttConnection object creation uniformly.

* Add MqttTypes.h with MQTT3-related types used in multiple places.
  • Loading branch information
sfod authored Sep 11, 2023
1 parent 12a1575 commit 609b34c
Show file tree
Hide file tree
Showing 12 changed files with 2,662 additions and 1,379 deletions.
5 changes: 3 additions & 2 deletions include/aws/crt/http/HttpRequestResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ namespace Aws
namespace Mqtt
{
class MqttConnection;
}
class MqttConnectionCore;
} // namespace Mqtt
namespace Mqtt5
{
class Mqtt5ClientCore;
Expand Down Expand Up @@ -107,7 +108,7 @@ namespace Aws
*/
class AWS_CRT_CPP_API HttpRequest : public HttpMessage
{
friend class Mqtt::MqttConnection;
friend class Mqtt::MqttConnectionCore;
friend class Mqtt5::Mqtt5ClientCore;

public:
Expand Down
519 changes: 1 addition & 518 deletions include/aws/crt/mqtt/MqttClient.h

Large diffs are not rendered by default.

452 changes: 452 additions & 0 deletions include/aws/crt/mqtt/MqttConnection.h

Large diffs are not rendered by default.

130 changes: 130 additions & 0 deletions include/aws/crt/mqtt/MqttTypes.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#pragma once
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/crt/Types.h>
#include <aws/crt/io/SocketOptions.h>
#include <aws/crt/io/TlsOptions.h>

#include <aws/mqtt/client.h>
#include <aws/mqtt/v5/mqtt5_client.h>

#include <functional>

namespace Aws
{
namespace Crt
{
namespace Mqtt
{
class MqttConnection;

/**
* Options required to create an MqttConnection.
*/
struct MqttConnectionOptions
{
const char *hostName = nullptr;
uint16_t port = 0;
Io::SocketOptions socketOptions;
Crt::Io::TlsContext tlsContext;
Crt::Io::TlsConnectionOptions tlsConnectionOptions;
bool useWebsocket = false;
bool useTls = false;
Allocator *allocator = nullptr;
};

/**
* Invoked upon receipt of a Publish message on a subscribed topic.
*
* @param connection The connection object.
* @param topic The information channel to which the payload data was published.
* @param payload The payload data.
* @param dup DUP flag. If true, this might be re-delivery of an earlier attempt to send the message.
* @param qos Quality of Service used to deliver the message.
* @param retain Retain flag. If true, the message was sent as a result of a new subscription being made by
* the client.
*/
using OnMessageReceivedHandler = std::function<void(
MqttConnection &connection,
const String &topic,
const ByteBuf &payload,
bool dup,
QOS qos,
bool retain)>;

/**
* Invoked when a suback message is received.
*
* @param connection The connection object.
* @param packetId Packet ID of the corresponding subscribe request.
* @param topic The information channel to which the payload data was published.
* @param qos Quality of Service used to deliver the message.
* @param errorCode Indicating if an error occurred.
*/
using OnSubAckHandler = std::function<
void(MqttConnection &connection, uint16_t packetId, const String &topic, QOS qos, int errorCode)>;

/**
* Invoked when a suback message for multiple topics is received.
*
* @param connection The connection object.
* @param packetId Packet ID of the corresponding subscribe request.
* @param topics The information channels to which the payload data was published.
* @param qos Quality of Service used to deliver the message.
* @param errorCode Indicating if an error occurred.
*/
using OnMultiSubAckHandler = std::function<void(
MqttConnection &connection,
uint16_t packetId,
const Vector<String> &topics,
QOS qos,
int errorCode)>;

/**
* Invoked when an operation completes.
*
* For QoS 0, this is when the packet is passed to the tls layer. For QoS 1 (and 2, in theory) this is when
* the final ACK packet is received from the server.
*
* @param connection The connection object.
* @param packetId Packet ID of the corresponding subscribe request.
* @param errorCode Indicating if an error occurred.
*/
using OnOperationCompleteHandler =
std::function<void(MqttConnection &connection, uint16_t packetId, int errorCode)>;

/**
* Simple statistics about the current state of the client's queue of operations.
*/
struct AWS_CRT_CPP_API MqttConnectionOperationStatistics
{
/*
* Total number of operations submitted to the connection that have not yet been completed. Unacked
* operations are a subset of this.
*/
uint64_t incompleteOperationCount;

/*
* Total packet size of operations submitted to the connection that have not yet been completed. Unacked
* operations are a subset of this.
*/
uint64_t incompleteOperationSize;

/*
* Total number of operations that have been sent to the server and are waiting for a corresponding ACK
* before they can be completed.
*/
uint64_t unackedOperationCount;

/*
* Total packet size of operations that have been sent to the server and are waiting for a corresponding
* ACK before they can be completed.
*/
uint64_t unackedOperationSize;
};
} // namespace Mqtt
} // namespace Crt
} // namespace Aws
Loading

0 comments on commit 609b34c

Please sign in to comment.