Skip to content

Commit

Permalink
Fix bytes race (#981)
Browse files Browse the repository at this point in the history
**Background**

When sending a lot of bytes to flipper, some of them may be missed. This
is happening because of a bug inside new
`FlipperSerialOverflowThrottler`

**Changes**

- Fix receive for bytearray
- Fix long-polling request

**Test plan**

- Open sample (probably a 10 times to see that flipper now connecting
normally)
- Open file manager and upload at least >1024KiB-sized file
- (or) open some key in text editor and save it as another file
- See uploaded/saved file has the same size as original
  • Loading branch information
makeevrserg authored Nov 6, 2024
1 parent adb0297 commit c0779d0
Show file tree
Hide file tree
Showing 22 changed files with 224 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ Attention: don't forget to add the flag for F-Droid before release
- [FIX] Fix disappeared file manager
- [FIX] Add deeplink fallback for flipper scheme uri
- [FIX] Change description of remotes library card
- [FIX] Fix bytes race in new api
- [CI] Fix merge-queue files diff
- [CI] Add https://github.com/LionZXY/detekt-decompose-rule
- [CI] Enabling detekt module for android and kmp modules
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/build
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
plugins {
id("flipper.multiplatform")
id("flipper.multiplatform-dependencies")
}

android.namespace = "com.flipperdevices.bridge.connection.feature.actionnotifier.api"

commonDependencies {
implementation(libs.kotlin.coroutines)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.flipperdevices.bridge.connection.feature.actionnotifier.api

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow

interface FlipperActionNotifier {
fun getActionFlow(): Flow<Unit>

fun notifyAboutAction()

fun interface Factory {
fun invoke(scope: CoroutineScope): FlipperActionNotifier
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ android.namespace = "com.flipperdevices.bridge.connection.feature.seriallagsdete

commonDependencies {
implementation(projects.components.bridge.connection.feature.lagsdetector.api)
implementation(projects.components.bridge.connection.feature.actionnotifier.api)

implementation(projects.components.core.di)
implementation(projects.components.core.log)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.flipperdevices.bridge.connection.feature.common.api.FDeviceFeatureQua
import com.flipperdevices.bridge.connection.feature.common.api.FUnsafeDeviceFeatureApi
import com.flipperdevices.bridge.connection.feature.restartrpc.api.FRestartRpcFeatureApi
import com.flipperdevices.bridge.connection.transport.common.api.FConnectedDeviceApi
import com.flipperdevices.bridge.connection.transport.common.api.serial.FSerialDeviceApi
import com.flipperdevices.core.di.AppGraph
import com.squareup.anvil.annotations.ContributesMultibinding
import kotlinx.coroutines.CoroutineScope
Expand All @@ -23,9 +24,13 @@ class FLagsDetectorFeatureFactoryImpl @Inject constructor(
): FDeviceFeatureApi? {
val restartRpcFeature =
unsafeFeatureDeviceApi.getUnsafe(FRestartRpcFeatureApi::class) ?: return null
val flipperActionNotifier = (connectedDevice as? FSerialDeviceApi)
?.getActionNotifier()
?: return null
return lagsDetectorFeatureFactory(
scope = scope,
restartRpcFeatureApi = restartRpcFeature
restartRpcFeatureApi = restartRpcFeature,
flipperActionNotifier = flipperActionNotifier
)
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.flipperdevices.bridge.connection.feature.seriallagsdetector.impl

import com.flipperdevices.bridge.connection.feature.actionnotifier.api.FlipperActionNotifier
import com.flipperdevices.bridge.connection.feature.restartrpc.api.FRestartRpcFeatureApi
import com.flipperdevices.bridge.connection.feature.rpc.model.FlipperRequest
import com.flipperdevices.bridge.connection.feature.seriallagsdetector.api.FLagsDetectorFeature
Expand All @@ -20,13 +21,12 @@ import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch

class FLagsDetectorFeatureImpl @AssistedInject constructor(
@Assisted scope: CoroutineScope,
@Assisted private val scope: CoroutineScope,
@Assisted restartRpcFeatureApi: FRestartRpcFeatureApi,
@Assisted private val flipperActionNotifier: FlipperActionNotifier
) : FLagsDetectorFeature, LogTagProvider {
override val TAG = "FlipperLagsDetector-${hashCode()}"

private val flipperActionNotifier = FlipperActionNotifierImpl(scope = scope)

private val pendingResponseCounter = PendingResponseCounter(
onAction = flipperActionNotifier::notifyAboutAction
)
Expand All @@ -49,7 +49,9 @@ class FLagsDetectorFeatureImpl @AssistedInject constructor(
}
}

override fun notifyAboutAction() = flipperActionNotifier.notifyAboutAction()
override fun notifyAboutAction() {
flipperActionNotifier.notifyAboutAction()
}

override suspend fun <T> wrapPendingAction(
request: FlipperRequest?,
Expand Down Expand Up @@ -79,6 +81,7 @@ class FLagsDetectorFeatureImpl @AssistedInject constructor(
operator fun invoke(
scope: CoroutineScope,
restartRpcFeatureApi: FRestartRpcFeatureApi,
flipperActionNotifier: FlipperActionNotifier
): FLagsDetectorFeatureImpl
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
package com.flipperdevices.bridge.connection.feature.seriallagsdetector.impl

import com.flipperdevices.bridge.connection.feature.actionnotifier.api.FlipperActionNotifier
import com.flipperdevices.core.di.AppGraph
import com.flipperdevices.core.ktx.jre.FlipperDispatchers
import dagger.assisted.Assisted
import dagger.assisted.AssistedInject
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import me.gulya.anvil.assisted.ContributesAssistedFactory

class FlipperActionNotifierImpl(
private val scope: CoroutineScope
) {
@ContributesAssistedFactory(AppGraph::class, FlipperActionNotifier.Factory::class)
class FlipperActionNotifierImpl @AssistedInject constructor(
@Assisted private val scope: CoroutineScope
) : FlipperActionNotifier {
private val actionFlow = MutableSharedFlow<Unit>()

fun getActionFlow(): Flow<Unit> = actionFlow
override fun getActionFlow(): Flow<Unit> = actionFlow

fun notifyAboutAction() {
override fun notifyAboutAction() {
scope.launch(FlipperDispatchers.workStealingDispatcher) {
actionFlow.emit(Unit)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class FLagsDetectorFeatureTest {

underTest = FLagsDetectorFeatureImpl(
scope = childScope,
restartRpcFeatureApi = restartRpcFeatureApi
restartRpcFeatureApi = restartRpcFeatureApi,
flipperActionNotifier = FlipperActionNotifierImpl(childScope)
)
val backgroundJob = backgroundScope.launch {
underTest.wrapPendingAction(mockk(relaxed = true)) {
Expand All @@ -65,7 +66,8 @@ class FLagsDetectorFeatureTest {

underTest = FLagsDetectorFeatureImpl(
scope = childScope,
restartRpcFeatureApi = restartRpcFeatureApi
restartRpcFeatureApi = restartRpcFeatureApi,
flipperActionNotifier = FlipperActionNotifierImpl(childScope)
)
val backgroundJob = backgroundScope.launch {
underTest.wrapPendingAction(mockk()) {
Expand Down
1 change: 1 addition & 0 deletions components/bridge/connection/sample/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ dependencies {
implementation(projects.components.bridge.connection.feature.getinfo.impl)
implementation(projects.components.bridge.connection.feature.lagsdetector.api)
implementation(projects.components.bridge.connection.feature.lagsdetector.impl)
implementation(projects.components.bridge.connection.feature.actionnotifier.api)
implementation(projects.components.bridge.connection.feature.protocolversion.api)
implementation(projects.components.bridge.connection.feature.protocolversion.impl)
implementation(projects.components.bridge.connection.feature.provider.api)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ androidDependencies {
implementation(projects.components.core.ktx)

implementation(projects.components.bridge.connection.transport.common.api)
implementation(projects.components.bridge.connection.feature.actionnotifier.api)

implementation(libs.ble.kotlin.scanner)
implementation(libs.ble.kotlin.client)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.flipperdevices.bridge.connection.transport.ble.impl.api

import com.flipperdevices.bridge.connection.feature.actionnotifier.api.FlipperActionNotifier
import com.flipperdevices.bridge.connection.transport.ble.api.FBleDeviceSerialConfig
import com.flipperdevices.bridge.connection.transport.ble.api.GATTCharacteristicAddress
import com.flipperdevices.bridge.connection.transport.ble.impl.serial.FSerialDeviceApiWrapper
Expand All @@ -13,7 +14,8 @@ import javax.inject.Inject

class FBleApiWithSerialFactory @Inject constructor(
private val serialDeviceApiWrapperFactory: FSerialDeviceApiWrapper.Factory,
private val fSerialRestartApiFactory: FSerialRestartApiImpl.Factory
private val fSerialRestartApiFactory: FSerialRestartApiImpl.Factory,
private val flipperActionNotifierFactory: FlipperActionNotifier.Factory
) {
fun build(
scope: CoroutineScope,
Expand All @@ -22,7 +24,13 @@ class FBleApiWithSerialFactory @Inject constructor(
metaInfoGattMap: ImmutableMap<TransportMetaInfoKey, GATTCharacteristicAddress>,
statusListener: FTransportConnectionStatusListener
): FBleApiWithSerial {
val serialDeviceApi = serialDeviceApiWrapperFactory(scope, serialConfig, client.services)
val flipperActionNotifier = flipperActionNotifierFactory.invoke(scope)
val serialDeviceApi = serialDeviceApiWrapperFactory(
scope = scope,
config = serialConfig,
services = client.services,
flipperActionNotifier = flipperActionNotifier
)
val restartApi = fSerialRestartApiFactory(
services = client.services,
serialServiceUuid = serialConfig.serialServiceUuid,
Expand All @@ -34,7 +42,7 @@ class FBleApiWithSerialFactory @Inject constructor(
metaInfoGattMap = metaInfoGattMap,
statusListener = statusListener,
serialDeviceApi = serialDeviceApi,
serialRestartApi = restartApi
serialRestartApi = restartApi,
)
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.flipperdevices.bridge.connection.transport.ble.impl.serial

import com.flipperdevices.bridge.connection.feature.actionnotifier.api.FlipperActionNotifier
import com.flipperdevices.bridge.connection.transport.ble.api.FBleDeviceSerialConfig
import com.flipperdevices.bridge.connection.transport.common.api.serial.FSerialDeviceApi
import com.flipperdevices.core.ktx.jre.FlipperDispatchers
Expand All @@ -22,13 +23,16 @@ class FSerialDeviceApiWrapper @AssistedInject constructor(
@Assisted scope: CoroutineScope,
@Assisted config: FBleDeviceSerialConfig,
@Assisted serviceFlow: StateFlow<ClientBleGattServices?>,
@Assisted private val flipperActionNotifier: FlipperActionNotifier,
serialApiFactory: SerialApiFactory
) : FSerialDeviceApi, LogTagProvider {
override val TAG = "FSerialDeviceApiWrapper"
private var serialApiScope: CoroutineScope? = null
private var delegateSerialApi: FSerialDeviceApi? = null
private val lock = WaitNotifyLock()

override fun getActionNotifier(): FlipperActionNotifier = flipperActionNotifier

init {
scope.launch {
serviceFlow.collect { services ->
Expand All @@ -46,7 +50,8 @@ class FSerialDeviceApiWrapper @AssistedInject constructor(
val serialApi = serialApiFactory.build(
config = config,
services = services,
scope = newSerialApiScope
scope = newSerialApiScope,
flipperActionNotifier = flipperActionNotifier
)
if (serialApi == null) {
delegateSerialApi = null
Expand Down Expand Up @@ -83,7 +88,8 @@ class FSerialDeviceApiWrapper @AssistedInject constructor(
operator fun invoke(
scope: CoroutineScope,
config: FBleDeviceSerialConfig,
services: StateFlow<ClientBleGattServices?>
services: StateFlow<ClientBleGattServices?>,
flipperActionNotifier: FlipperActionNotifier
): FSerialDeviceApiWrapper
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.flipperdevices.bridge.connection.transport.ble.impl.serial
import android.Manifest
import android.content.Context
import android.content.pm.PackageManager
import com.flipperdevices.bridge.connection.feature.actionnotifier.api.FlipperActionNotifier
import com.flipperdevices.bridge.connection.transport.ble.impl.model.BLEConnectionPermissionException
import com.flipperdevices.bridge.connection.transport.common.api.serial.FSerialDeviceApi
import com.flipperdevices.core.log.LogTagProvider
Expand Down Expand Up @@ -32,6 +33,7 @@ class FSerialOverflowThrottler @AssistedInject constructor(
@Assisted private val serialApi: FSerialDeviceApi,
@Assisted private val scope: CoroutineScope,
@Assisted private val overflowCharacteristic: ClientBleGattCharacteristic,
@Assisted private val flipperActionNotifier: FlipperActionNotifier,
private val context: Context
) : FSerialDeviceApi,
LogTagProvider {
Expand All @@ -41,6 +43,8 @@ class FSerialOverflowThrottler @AssistedInject constructor(

private var pendingBytes: ByteArray? = null

override fun getActionNotifier() = flipperActionNotifier

/**
* Bytes waiting to be sent to the device
*/
Expand Down Expand Up @@ -149,8 +153,10 @@ class FSerialOverflowThrottler @AssistedInject constructor(
remainBufferSize = 0 // here we end the while cycle
}

bytesToSend = withTimeoutOrNull(SERIAL_SEND_WAIT_TIMEOUT_MS) {
channel.receive()
if (remainBufferSize > 0) {
bytesToSend = withTimeoutOrNull(SERIAL_SEND_WAIT_TIMEOUT_MS) {
channel.receive()
}
}
}

Expand All @@ -175,7 +181,8 @@ class FSerialOverflowThrottler @AssistedInject constructor(
operator fun invoke(
serialApi: FSerialDeviceApi,
scope: CoroutineScope,
overflowCharacteristic: ClientBleGattCharacteristic
overflowCharacteristic: ClientBleGattCharacteristic,
flipperActionNotifier: FlipperActionNotifier
): FSerialOverflowThrottler
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.flipperdevices.bridge.connection.transport.ble.impl.serial
import android.Manifest
import android.content.Context
import android.content.pm.PackageManager
import com.flipperdevices.bridge.connection.feature.actionnotifier.api.FlipperActionNotifier
import com.flipperdevices.bridge.connection.transport.ble.impl.model.BLEConnectionPermissionException
import com.flipperdevices.bridge.connection.transport.common.api.serial.FSerialDeviceApi
import com.flipperdevices.bridge.connection.transport.common.api.serial.FlipperSerialSpeed
Expand Down Expand Up @@ -31,7 +32,8 @@ class FSerialUnsafeApiImpl @AssistedInject constructor(
@Assisted(DAGGER_ID_CHARACTERISTIC_RX) val rxCharacteristic: ClientBleGattCharacteristic,
@Assisted(DAGGER_ID_CHARACTERISTIC_TX) val txCharacteristic: ClientBleGattCharacteristic,
@Assisted scope: CoroutineScope,
private val context: Context
@Assisted private val flipperActionNotifier: FlipperActionNotifier,
private val context: Context,
) : FSerialDeviceApi, LogTagProvider {
override val TAG = "FSerialUnsafeApiImpl"

Expand All @@ -41,6 +43,8 @@ class FSerialUnsafeApiImpl @AssistedInject constructor(
private val rxSpeed = SpeedMeter(scope)
private val speedFlowState = MutableStateFlow(FlipperSerialSpeed())

override fun getActionNotifier() = flipperActionNotifier

init {
scope.launch {
rxCharacteristic.getNotifications(
Expand All @@ -54,6 +58,7 @@ class FSerialUnsafeApiImpl @AssistedInject constructor(
rxSpeed.getSpeed(),
txSpeed.getSpeed()
) { rxBPS, txBPS ->
flipperActionNotifier.notifyAboutAction()
speedFlowState.emit(
FlipperSerialSpeed(receiveBytesInSec = rxBPS, transmitBytesInSec = txBPS)
)
Expand All @@ -80,7 +85,8 @@ class FSerialUnsafeApiImpl @AssistedInject constructor(
operator fun invoke(
@Assisted(DAGGER_ID_CHARACTERISTIC_RX) rxCharacteristic: ClientBleGattCharacteristic,
@Assisted(DAGGER_ID_CHARACTERISTIC_TX) txCharacteristic: ClientBleGattCharacteristic,
scope: CoroutineScope
scope: CoroutineScope,
flipperActionNotifier: FlipperActionNotifier
): FSerialUnsafeApiImpl
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.flipperdevices.bridge.connection.transport.ble.impl.serial

import com.flipperdevices.bridge.connection.feature.actionnotifier.api.FlipperActionNotifier
import com.flipperdevices.bridge.connection.transport.ble.api.FBleDeviceSerialConfig
import com.flipperdevices.bridge.connection.transport.common.api.serial.FSerialDeviceApi
import com.flipperdevices.core.log.LogTagProvider
Expand All @@ -11,13 +12,14 @@ import javax.inject.Inject

class SerialApiFactory @Inject constructor(
private val unsafeApiImplFactory: FSerialUnsafeApiImpl.Factory,
private val throttlerApiFactory: FSerialOverflowThrottler.Factory
private val throttlerApiFactory: FSerialOverflowThrottler.Factory,
) : LogTagProvider {
override val TAG = "SerialApiCombinedFactory"
fun build(
config: FBleDeviceSerialConfig,
services: ClientBleGattServices,
scope: CoroutineScope
scope: CoroutineScope,
flipperActionNotifier: FlipperActionNotifier
): FSerialDeviceApi? {
val serialService = services.findService(config.serialServiceUuid)
val rxCharacteristic = serialService?.findCharacteristic(config.rxServiceCharUuid)
Expand All @@ -32,7 +34,8 @@ class SerialApiFactory @Inject constructor(
var deviceApi: FSerialDeviceApi = unsafeApiImplFactory(
rxCharacteristic = rxCharacteristic,
txCharacteristic = txCharacteristic,
scope = scope
scope = scope,
flipperActionNotifier = flipperActionNotifier
)

val overflowControlConfig = config.overflowControl
Expand All @@ -47,7 +50,8 @@ class SerialApiFactory @Inject constructor(
deviceApi = throttlerApiFactory(
serialApi = deviceApi,
scope = scope,
overflowCharacteristic = overflowCharacteristic
overflowCharacteristic = overflowCharacteristic,
flipperActionNotifier = flipperActionNotifier
)
}

Expand Down
Loading

0 comments on commit c0779d0

Please sign in to comment.