diff --git a/lib/flutter_client_sse.dart b/lib/flutter_client_sse.dart index 57db0b3..dccb58f 100644 --- a/lib/flutter_client_sse.dart +++ b/lib/flutter_client_sse.dart @@ -77,70 +77,97 @@ class SSEClient { Future response = _client.send(request); /// Listening to the response as a stream - response.asStream().listen((data) { + late StreamSubscription + responseStreamSubscription; + responseStreamSubscription = response.asStream().listen((data) async { + if (data.headers["content-type"] != "text/event-stream") { + print("---ERROR---"); + if (data.headers["connection"] != "keep-alive") { + final responseText = + await data.stream.transform(Utf8Decoder()).reduce( + (previous, element) => previous + element, + ); + print(responseText); + } + _retryConnection( + method: method, + url: url, + header: header, + streamController: streamController, + ); + responseStreamSubscription.cancel(); + return; + } + /// Applying transforms and listening to it - data.stream - ..transform(Utf8Decoder()).transform(LineSplitter()).listen( - (dataLine) { - if (dataLine.isEmpty) { - /// This means that the complete event set has been read. - /// We then add the event to the stream - streamController.add(currentSSEModel); - currentSSEModel = SSEModel(data: '', id: '', event: ''); - return; - } + late StreamSubscription transformedResponseStreamSubscription; + transformedResponseStreamSubscription = data.stream + .transform(Utf8Decoder()) + .transform(LineSplitter()) + .listen( + (dataLine) { + if (dataLine.isEmpty) { + /// This means that the complete event set has been read. + /// We then add the event to the stream + streamController.add(currentSSEModel); + currentSSEModel = SSEModel(data: '', id: '', event: ''); + return; + } - /// Get the match of each line through the regex - Match match = lineRegex.firstMatch(dataLine)!; - var field = match.group(1); - if (field!.isEmpty) { - return; - } - var value = ''; - if (field == 'data') { - // If the field is data, we get the data through the substring - value = dataLine.substring( - 5, - ); - } else { - value = match.group(2) ?? ''; - } - switch (field) { - case 'event': - currentSSEModel.event = value; - break; - case 'data': - currentSSEModel.data = - (currentSSEModel.data ?? '') + value + '\n'; - break; - case 'id': - currentSSEModel.id = value; - break; - case 'retry': - break; - default: - print('---ERROR---'); - print(dataLine); - _retryConnection( - method: method, - url: url, - header: header, - streamController: streamController, - ); - } - }, - onError: (e, s) { - print('---ERROR---'); - print(e); - _retryConnection( - method: method, - url: url, - header: header, - body: body, - streamController: streamController, + /// Get the match of each line through the regex + Match match = lineRegex.firstMatch(dataLine)!; + var field = match.group(1); + if (field!.isEmpty) { + return; + } + var value = ''; + if (field == 'data') { + // If the field is data, we get the data through the substring + value = dataLine.substring( + 5, ); - }, - ); + } else { + value = match.group(2) ?? ''; + } + switch (field) { + case 'event': + currentSSEModel.event = value; + break; + case 'data': + currentSSEModel.data = + (currentSSEModel.data ?? '') + value + '\n'; + break; + case 'id': + currentSSEModel.id = value; + break; + case 'retry': + break; + default: + print("---ERROR---"); + print(dataLine); + transformedResponseStreamSubscription.cancel(); + responseStreamSubscription.cancel(); + + _retryConnection( + method: method, + url: url, + header: header, + streamController: streamController, + ); + } + }, + onError: (e, s) { + print('---ERROR---'); + print(e); + _retryConnection( + method: method, + url: url, + header: header, + body: body, + streamController: streamController, + ); + }, + ); }, onError: (e, s) { print('---ERROR---'); print(e);