From bce421fa5e63a1777de58eb9b2957f02decb0351 Mon Sep 17 00:00:00 2001 From: Jan Romann Date: Sat, 18 Feb 2023 14:34:00 +0100 Subject: [PATCH] WIP: Implement CoAP over TCP logic --- example/tcp_test.dart | 47 +++ lib/src/coap_message.dart | 13 +- lib/src/codec/tcp/message_decoder.dart | 427 +++++++++++++++++++++++++ lib/src/codec/tcp/message_encoder.dart | 183 +++++++++++ lib/src/network/coap_inetwork.dart | 14 + lib/src/network/coap_network_tcp.dart | 48 +-- lib/src/option/option.dart | 6 +- 7 files changed, 703 insertions(+), 35 deletions(-) create mode 100644 example/tcp_test.dart create mode 100644 lib/src/codec/tcp/message_decoder.dart create mode 100644 lib/src/codec/tcp/message_encoder.dart diff --git a/example/tcp_test.dart b/example/tcp_test.dart new file mode 100644 index 00000000..b801c181 --- /dev/null +++ b/example/tcp_test.dart @@ -0,0 +1,47 @@ +// ignore_for_file: avoid_print + +import 'dart:core'; +import 'dart:io'; + +import 'package:coap/coap.dart'; + +Future startServer() async { + final server = await ServerSocket.bind(InternetAddress.anyIPv4, 5683); + server.listen((final connection) async { + await connection.forEach((final frame) { + print(frame); + + const responseCode = (2 << 5) + 5; + + const tokenLength = 8; + const tokenOffset = 2; + final token = frame.sublist(tokenOffset, tokenOffset + tokenLength); + + final response = [tokenLength, responseCode, ...token]; + + connection.add(response); + }); + }); +} + +/// Tests the basic functionality of the TCP network. +/// Will be replaced with a "real" example later. +Future main() async { + await startServer(); + await connect(); +} + +Future connect() async { + final coapClient = CoapClient(Uri.parse('coap+tcp://127.0.0.1')); + + final response = await coapClient.get( + 'test', + options: [ContentFormatOption(40)], + ); + // TODO(JKRhb): Responses can't be matched at the moment, as the current + // implementation requires a message ID which is not defined in + // CoAP over TCP. + print(response); + + coapClient.close(); +} diff --git a/lib/src/coap_message.dart b/lib/src/coap_message.dart index 99190325..11ee8bb4 100644 --- a/lib/src/coap_message.dart +++ b/lib/src/coap_message.dart @@ -17,6 +17,7 @@ import 'coap_code.dart'; import 'coap_media_type.dart'; import 'coap_message_type.dart'; import 'coap_response.dart'; +import 'codec/tcp/message_encoder.dart'; import 'codec/udp/message_decoder.dart'; import 'codec/udp/message_encoder.dart'; import 'event/coap_event_bus.dart'; @@ -657,18 +658,8 @@ abstract class CoapMessage { /// Is also used for DTLS. Uint8Buffer toUdpPayload() => serializeUdpMessage(this); - /// Serializes this CoAP message from the TCP message format. - /// - /// Is also used for TLS. - static CoapMessage? fromTcpPayload(final Uint8Buffer data) => - throw UnimplementedError( - 'TCP segment deserialization is not implemented yet.', - ); - /// Serializes this CoAP message into the TCP message format. /// /// Is also used for TLS. - Uint8Buffer toTcpPayload() => throw UnimplementedError( - 'TCP segment serialization is not implemented yet.', - ); + Uint8Buffer toTcpPayload() => serializeTcpMessage(this); } diff --git a/lib/src/codec/tcp/message_decoder.dart b/lib/src/codec/tcp/message_decoder.dart new file mode 100644 index 00000000..dba012a0 --- /dev/null +++ b/lib/src/codec/tcp/message_decoder.dart @@ -0,0 +1,427 @@ +// SPDX-FileCopyrightText: © 2023 Jan Romann + +// SPDX-License-Identifier: MIT + +import 'dart:async'; +import 'dart:io'; +import 'dart:typed_data'; + +import 'package:typed_data/typed_data.dart'; + +import '../../coap_code.dart'; +import '../../coap_empty_message.dart'; +import '../../coap_message.dart'; +import '../../coap_message_type.dart'; +import '../../coap_request.dart'; +import '../../coap_response.dart'; +import '../../option/coap_option_type.dart'; +import '../../option/option.dart'; +import '../../option/uri_converters.dart'; +import '../udp/datagram_reader.dart'; +import '../udp/message_format.dart' as message_format; + +enum TcpState { + initialState, + extendedLength, + extendedTokenLength, + code, + token, + optionsAndPayload, +} + +final toByteStream = + StreamTransformer((final input, final cancelOnError) { + final controller = StreamController(); + + controller.onListen = () { + final subscription = input.listen( + (final bytes) => bytes.forEach(controller.add), + onDone: controller.close, + onError: controller.addError, + cancelOnError: cancelOnError, + ); + controller + ..onPause = subscription.pause + ..onResume = subscription.resume + ..onCancel = subscription.cancel; + }; + + return controller.stream.listen(null); +}); + +class RawCoapTcpMessage { + RawCoapTcpMessage({ + required this.code, + required this.optionsAndPayload, + required this.token, + }); + + final int code; + + final Uint8List optionsAndPayload; + + final Uint8List token; + + @override + String toString() => + 'Code: $code\nToken:$token\nOptions and Payload:$optionsAndPayload'; +} + +final toRawCoapTcpStream = StreamTransformer( + (final input, final cancelOnError) { + // TODO(JKRhb): Connections must be aborted on error + final controller = StreamController(); + + var state = TcpState.initialState; + var length = 0; + var extendedLengthBytes = 0; + final extendedLengthBuffer = Uint8Buffer(); + var tokenLength = 0; + final token = Uint8Buffer(); + var extendedTokenLengthBytes = 0; + final extendedTokenLengthBuffer = Uint8Buffer(); + var code = 0; + final optionsAndPayload = Uint8Buffer(); + + controller.onListen = () { + final subscription = input.listen( + (final byte) async { + switch (state) { + case TcpState.initialState: + token.clear(); + extendedLengthBuffer.clear(); + optionsAndPayload.clear(); + extendedTokenLengthBuffer.clear(); + + // TODO(JKRhb): Handle WebSockets case with length = 0 + length = (byte >> 4) & 15; + tokenLength = byte & 15; + + if (const [13, 14, 15].contains(length)) { + state = TcpState.extendedLength; + extendedLengthBytes = determineExtendedLength(length); + break; + } + + state = TcpState.code; + break; + case TcpState.extendedLength: + extendedLengthBuffer.add(byte); + if (extendedLengthBytes-- <= 0) { + length = _readExtendedMessageLength( + length, + DatagramReader(extendedLengthBuffer), + ); + state = TcpState.code; + break; + } + + break; + case TcpState.code: + code = byte; + if (const [13, 14].contains(tokenLength)) { + state = TcpState.extendedTokenLength; + extendedTokenLengthBytes = determineExtendedLength(length); + break; + } else if (tokenLength == 15) { + throw const FormatException(); + } + state = TcpState.token; + break; + case TcpState.extendedTokenLength: + extendedTokenLengthBuffer.add(byte); + extendedTokenLengthBytes--; + + if (extendedTokenLengthBytes < 1) { + length = _readExtendedMessageLength( + length, + DatagramReader(extendedLengthBuffer), + ); + + state = TcpState.code; + break; + } + + break; + case TcpState.token: + token.add(byte); + tokenLength--; + + if (tokenLength >= 1) { + break; + } + + // TODO(JKRhb): Refactor + if (length < 1) { + state = TcpState.initialState; + controller.add( + RawCoapTcpMessage( + code: code, + token: Uint8List.fromList(token.toList(growable: false)), + optionsAndPayload: Uint8List.fromList( + optionsAndPayload.toList(growable: false), + ), + ), + ); + } else { + state = TcpState.optionsAndPayload; + } + + break; + case TcpState.optionsAndPayload: + optionsAndPayload.add(byte); + length--; + + if (length < 1) { + state = TcpState.initialState; + controller.add( + RawCoapTcpMessage( + code: code, + token: Uint8List.fromList(token.toList(growable: false)), + optionsAndPayload: Uint8List.fromList( + optionsAndPayload.toList(growable: false), + ), + ), + ); + } + + break; + } + }, + onDone: controller.close, + onError: controller.addError, + cancelOnError: cancelOnError, + ); + controller + ..onPause = subscription.pause + ..onResume = subscription.resume + ..onCancel = subscription.cancel; + }; + + return controller.stream.listen(null); +}); + +int determineExtendedLength(final int length) { + switch (length) { + case 13: + return 1; + case 14: + return 2; + case 15: + return 4; + } + + throw const FormatException('message'); +} + +/// Decodes a CoAP UDP or DTLS message from a bytes array. +/// +/// Returns the deserialized message, or `null` if the message can not be +/// decoded, i.e. the bytes do not represent a [CoapRequest], a [CoapResponse] +/// or a [CoapEmptyMessage]. +final deserializeTcpMessage = StreamTransformer( + (final input, final cancelOnError) { + final controller = StreamController(); + + controller.onListen = () { + final subscription = input.listen( + (final coapTcpMessage) { + final code = CoapCode.decode(coapTcpMessage.code); + + if (code == null) { + throw const FormatException('Encountered unknown CoapCode'); + } + + final token = coapTcpMessage.token; + + final reader = DatagramReader( + Uint8Buffer()..addAll(coapTcpMessage.optionsAndPayload), + ); + + try { + final options = readOptions(reader); + final payload = reader.readBytesLeft(); + final tokenBuffer = Uint8Buffer()..addAll(token); + final CoapMessage coapMessage; + + // TODO(JKRhb): Probably not really needed for TCP, since connections + // are simply closed on error + const hasUnknownCriticalOption = false; + const hasFormatError = false; + + if (code.isRequest) { + final method = RequestMethod.fromCoapCode(code); + if (method == null) { + return; + } + + final uri = optionsToUri( + options.where((final option) => option.isUriOption).toList(), + scheme: 'coap+tcp', // TODO(JKRhb): Replace + destinationAddress: + InternetAddress('127.0.0.1'), // TODO(JKRhb): Replace + ); + + coapMessage = CoapRequest.fromParsed( + uri, + method, + // id and type are not defined for CoAP over TCP + id: 0, + type: CoapMessageType.ack, + token: tokenBuffer, + options: options, + payload: payload, + hasUnknownCriticalOption: hasUnknownCriticalOption, + hasFormatError: hasFormatError, + ); + } else if (code.isResponse) { + final responseCode = ResponseCode.fromCoapCode(code); + if (responseCode == null) { + return; + } + + final location = optionsToUri( + options.where((final option) => option.isLocationOption).toList(), + ); + + coapMessage = CoapResponse.fromParsed( + responseCode, + id: 0, + type: CoapMessageType.ack, + token: tokenBuffer, + options: options, + payload: payload, + location: location, + hasUnknownCriticalOption: hasUnknownCriticalOption, + hasFormatError: hasFormatError, + ); + } else if (code.isEmpty) { + coapMessage = CoapEmptyMessage.fromParsed( + id: 0, + type: CoapMessageType.ack, + token: tokenBuffer, + options: options, + payload: payload, + hasUnknownCriticalOption: hasUnknownCriticalOption, + hasFormatError: hasFormatError, + ); + } else { + return; + } + + controller.add(coapMessage); + } on UnknownCriticalOptionException { + // Should something be done here? + return; + } on FormatException { + // Should something be done here? + return; + } + }, + onDone: controller.close, + onError: controller.addError, + cancelOnError: cancelOnError, + ); + controller + ..onPause = subscription.pause + ..onResume = subscription.resume + ..onCancel = subscription.cancel; + }; + + return controller.stream.listen(null); +}); + +List> readOptions(final DatagramReader reader) { + final options = >[]; + var currentOption = 0; + while (reader.bytesAvailable) { + final nextByte = reader.readNextByte(); + if (nextByte == message_format.payloadMarker) { + if (!reader.bytesAvailable) { + throw const FormatException('Illegal format'); + // The presence of a marker followed by a zero-length payload + // must be processed as a message format error + } + } else { + // The first 4 bits of the byte represent the option delta + final optionDeltaNibble = (0xF0 & nextByte) >> 4; + final deltaValue = _getValueFromOptionNibble( + optionDeltaNibble, + reader, + ); + + if (deltaValue == null) { + throw const FormatException('Illegal format'); + } + + currentOption += deltaValue; + + // The second 4 bits represent the option length + final optionLengthNibble = 0x0F & nextByte; + final optionLength = _getValueFromOptionNibble( + optionLengthNibble, + reader, + ); + + if (optionLength == null) { + throw const FormatException('Illegal format'); + } + + // Read option + try { + final optionType = OptionType.fromTypeNumber(currentOption); + var optionBytes = reader.readBytes(optionLength); + if (Endian.host == Endian.little && + optionType.optionFormat is OptionFormat) { + optionBytes = Uint8Buffer()..addAll(optionBytes.reversed); + } + final option = optionType.parse(optionBytes); + options.add(option); + } on UnknownElectiveOptionException catch (_) { + // Unknown elective options must be silently ignored + continue; + } + } + } + + return options; +} + +/// Calculates the value used in the extended option fields as specified +/// in RFC 7252, section 3.1. +int? _getValueFromOptionNibble( + final int nibble, + final DatagramReader datagram, +) => + _readExtendedLength(nibble, datagram); + +int? _readExtendedLength( + final int value, + final DatagramReader datagram, +) { + if (value < 13) { + return value; + } else if (value == 13) { + return datagram.read(8) + 13; + } else if (value == 14) { + return datagram.read(16) + 269; + } + + return null; +} + +int _readExtendedMessageLength( + final int value, + final DatagramReader datagramReader, +) { + switch (value) { + case 13: + return datagramReader.read(8) + 13; + case 14: + return datagramReader.read(16) + 269; + case 15: + return datagramReader.read(32) + 65805; + } + + throw StateError('Illegal value read'); +} diff --git a/lib/src/codec/tcp/message_encoder.dart b/lib/src/codec/tcp/message_encoder.dart new file mode 100644 index 00000000..804272f6 --- /dev/null +++ b/lib/src/codec/tcp/message_encoder.dart @@ -0,0 +1,183 @@ +// SPDX-FileCopyrightText: © 2023 Jan Romann + +// SPDX-License-Identifier: MIT + +import 'package:typed_data/typed_data.dart'; + +import '../../coap_code.dart'; +import '../../coap_message.dart'; +import '../../option/coap_option_type.dart'; +import '../udp/datagram_writer.dart'; +import '../udp/message_format.dart' as message_format; + +/// Encodes a CoAP TCP or WebSockets message into a bytes array. +/// Returns the encoded bytes, or null if the message can not be encoded, +/// i.e. the message is not a Request, a Response or an EmptyMessage. +Uint8Buffer serializeTcpMessage(final CoapMessage message) { + final writer = DatagramWriter(); + + final token = message.token; + final tokenLength = _getTokenLength(token); + final options = _serializeOptions(message); + + final payload = message.payload; + const payloadMarkerLength = 1; + final payloadLength = + payload.isNotEmpty ? payload.length + payloadMarkerLength : 0; + + final messageLength = options.lengthInBytes + payloadLength; + + // TODO(JKRhb): Refactor + final lengthField = _getOptionNibble(messageLength); + + writer + ..write(lengthField, 4) + ..write(tokenLength, message_format.tokenLengthBits); + + if (lengthField == 13) { + writer.write(messageLength - 13, 8); + } else if (lengthField == 14) { + writer.write(messageLength - 269, 16); + } + + writer.write(message.code.code, CoapCode.bitLength); + + if (token != null) { + _writeExtendedTokenLength(writer, tokenLength, token); + } + + // Write token, which may be 0 to 8 bytes or have an extended token length, + // given by token length and the extended token length field. + writer + ..writeBytes(token) + ..writeBytes(options); + + if (payload.isNotEmpty) { + // If payload is present and of non-zero length, it is prefixed by + // an one-byte Payload Marker (0xFF) which indicates the end of + // options and the start of the payload + writer.writeByte(message_format.payloadMarker); + } + // Write payload + writer.writeBytes(payload); + + return writer.toByteArray(); +} + +Uint8Buffer _serializeOptions(final CoapMessage message) { + final writer = DatagramWriter(); + + var lastOptionNumber = 0; + final options = message.getAllOptions()..sort(); + + for (final opt in options) { + if (opt.type == OptionType.uriHost || opt.type == OptionType.uriPort) { + continue; + } + + // Write 4-bit option delta + final optNum = opt.type.optionNumber; + final optionDelta = optNum - lastOptionNumber; + final optionDeltaNibble = _getOptionNibble(optionDelta); + writer.write(optionDeltaNibble, message_format.optionDeltaBits); + + // Write 4-bit option length + final optionLength = opt.length; + final optionLengthNibble = _getOptionNibble(optionLength); + writer.write(optionLengthNibble, message_format.optionLengthBits); + + // Write extended option delta field (0 - 2 bytes) + if (optionDeltaNibble == 13) { + writer.write(optionDelta - 13, 8); + } else if (optionDeltaNibble == 14) { + writer.write(optionDelta - 269, 16); + } + + // Write extended option length field (0 - 2 bytes) + if (optionLengthNibble == 13) { + writer.write(optionLength - 13, 8); + } else if (optionLengthNibble == 14) { + writer.write(optionLength - 269, 16); + } + + // Write option value, reverse byte order for numeric options + if (opt.type.optionFormat == OptionFormat.integer) { + final reversedBuffer = Uint8Buffer()..addAll(opt.byteValue.reversed); + writer.writeBytes(reversedBuffer); + } else { + writer.writeBytes(opt.byteValue); + } + + lastOptionNumber = optNum; + } + + return writer.toByteArray(); +} + +/// Determine the token length. +/// +/// The token length can either be of zero to eight bytes or be extended, +/// following [RFC 8974]. +/// +/// [RFC 8974]: https://datatracker.ietf.org/doc/html/rfc8974 +int _getTokenLength(final Uint8Buffer? token) { + final tokenLength = token?.length ?? 0; + if (tokenLength <= 12) { + return tokenLength; + } else if (tokenLength <= 255 + 13) { + return 13; + } else if (tokenLength <= 65535 + 269) { + return 14; + } else { + throw FormatException('Unsupported token length delta $tokenLength'); + } +} + +/// Write a potentially extended token length as specified in [RFC 8974]. +/// +/// [RFC 8974]: https://datatracker.ietf.org/doc/html/rfc8974 +void _writeExtendedTokenLength( + final DatagramWriter writer, + final int tokenLength, + final Uint8Buffer token, +) { + final extendedTokenLength = _getExtendedTokenLength(tokenLength, token); + + switch (tokenLength) { + case 13: + writer.write(extendedTokenLength, 8); + break; + case 14: + writer.write(extendedTokenLength, 16); + } +} + +/// Determine a potentially extended token length as specified in [RFC 8974]. +/// +/// [RFC 8974]: https://datatracker.ietf.org/doc/html/rfc8974 +int _getExtendedTokenLength( + final int tokenLength, + final Uint8Buffer token, +) { + switch (tokenLength) { + case 13: + return token.length - 13; + case 14: + return token.length - 269; + } + + return 0; +} + +/// Returns the 4-bit option header value. +int _getOptionNibble(final int optionValue) { + if (optionValue <= 12) { + return optionValue; + } else if (optionValue <= 255 + 13) { + return 13; + } else if (optionValue <= 65535 + 269) { + return 14; + } else { + throw FormatException('Unsupported option delta $optionValue'); + } +} diff --git a/lib/src/network/coap_inetwork.dart b/lib/src/network/coap_inetwork.dart index e1e203dd..07baa65b 100644 --- a/lib/src/network/coap_inetwork.dart +++ b/lib/src/network/coap_inetwork.dart @@ -11,6 +11,7 @@ import '../coap_config.dart'; import '../coap_constants.dart'; import '../coap_message.dart'; import 'coap_network_openssl.dart'; +import 'coap_network_tcp.dart'; import 'coap_network_udp.dart'; import 'credentials/psk_credentials.dart'; @@ -93,6 +94,19 @@ abstract class CoapINetwork { libSsl: config.libSslInstance, hostName: uri.host, ); + case 'coap+tcp': + return CoapNetworkTCP( + address, + port ?? config.defaultPort, + bindAddress ?? defaultBindAddress, + ); + case 'coaps+tcp': + return CoapNetworkTCP( + address, + port ?? config.defaultSecurePort, + bindAddress ?? defaultBindAddress, + isTls: true, + ); default: throw UnsupportedProtocolException(uri.scheme); } diff --git a/lib/src/network/coap_network_tcp.dart b/lib/src/network/coap_network_tcp.dart index 117ab723..143d5c0c 100644 --- a/lib/src/network/coap_network_tcp.dart +++ b/lib/src/network/coap_network_tcp.dart @@ -8,9 +8,8 @@ import 'dart:async'; import 'dart:io'; -import 'package:typed_data/typed_data.dart'; - import '../coap_message.dart'; +import '../codec/tcp/message_decoder.dart'; import '../event/coap_event_bus.dart'; import 'coap_inetwork.dart'; @@ -98,26 +97,29 @@ class CoapNetworkTCP implements CoapINetwork { } void _receive() { - socket?.listen( - (final data) { - final message = CoapMessage.fromTcpPayload(Uint8Buffer()..addAll(data)); - eventBus.fire(CoapMessageReceivedEvent(message, address)); - }, - // ignore: avoid_types_on_closure_parameters - onError: (final Object e, final StackTrace s) => - eventBus.fire(CoapSocketErrorEvent(e, s)), - // Socket stream is done and can no longer be listened to - onDone: () { - isClosed = true; - Timer.periodic(CoapINetwork.reinitPeriod, (final timer) async { - try { - await init(); - timer.cancel(); - } on Exception catch (_) { - // Ignore errors, retry until successful - } - }); - }, - ); + socket + ?.transform(toByteStream) + .transform(toRawCoapTcpStream) + .transform(deserializeTcpMessage) + .listen( + (final message) { + eventBus.fire(CoapMessageReceivedEvent(message, address)); + }, + // ignore: avoid_types_on_closure_parameters + onError: (final Object e, final StackTrace s) => + eventBus.fire(CoapSocketErrorEvent(e, s)), + // Socket stream is done and can no longer be listened to + onDone: () { + isClosed = true; + Timer.periodic(CoapINetwork.reinitPeriod, (final timer) async { + try { + await init(); + timer.cancel(); + } on Exception catch (_) { + // Ignore errors, retry until successful + } + }); + }, + ); } } diff --git a/lib/src/option/option.dart b/lib/src/option/option.dart index 87ef980a..9b77059a 100644 --- a/lib/src/option/option.dart +++ b/lib/src/option/option.dart @@ -8,7 +8,7 @@ import 'string_option.dart'; /// This class describes the options of the CoAP messages. @immutable -abstract class Option { +abstract class Option implements Comparable> { Option() { _validate(); } @@ -96,6 +96,10 @@ abstract class Option { bool get isLocationOption => this is LocationPathOption || this is LocationQueryOption; + + @override + int compareTo(final Option other) => + this.optionNumber - other.optionNumber; } /// Mixin for an Oscore class E option (encrypted and integrity protected).