diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/di/MatrixModule.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/di/MatrixModule.kt index c17864b82b..8b37d01b70 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/di/MatrixModule.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/di/MatrixModule.kt @@ -38,8 +38,7 @@ internal object MatrixModule { return MatrixCoroutineDispatchers(io = Dispatchers.IO, computation = Dispatchers.Default, main = Dispatchers.Main, - crypto = createBackgroundHandler("Crypto_Thread").asCoroutineDispatcher(), - sync = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + crypto = createBackgroundHandler("Crypto_Thread").asCoroutineDispatcher() ) } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/DefaultSession.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/DefaultSession.kt index 32126fb78d..ac86d437ea 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/DefaultSession.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/DefaultSession.kt @@ -44,6 +44,7 @@ import im.vector.matrix.android.api.session.sync.SyncState import im.vector.matrix.android.api.session.user.UserService import im.vector.matrix.android.internal.crypto.DefaultCryptoService import im.vector.matrix.android.internal.database.LiveEntityObserver +import im.vector.matrix.android.internal.session.sync.SyncTaskSequencer import im.vector.matrix.android.internal.session.sync.SyncTokenStore import im.vector.matrix.android.internal.session.sync.job.SyncThread import im.vector.matrix.android.internal.session.sync.job.SyncWorker @@ -74,6 +75,7 @@ internal class DefaultSession @Inject constructor(override val sessionParams: Se private val syncThreadProvider: Provider, private val contentUrlResolver: ContentUrlResolver, private val syncTokenStore: SyncTokenStore, + private val syncTaskSequencer: SyncTaskSequencer, private val contentUploadProgressTracker: ContentUploadStateTracker, private val initialSyncProgressService: Lazy, private val homeServerCapabilitiesService: Lazy) @@ -143,6 +145,7 @@ internal class DefaultSession @Inject constructor(override val sessionParams: Se cryptoService.get().close() isOpen = false EventBus.getDefault().unregister(this) + syncTaskSequencer.close() } override fun syncState(): LiveData { diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTask.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTask.kt index 1082fa835b..688f159fb0 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTask.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTask.kt @@ -45,10 +45,15 @@ internal class DefaultSyncTask @Inject constructor(private val syncAPI: SyncAPI, private val initialSyncProgressService: DefaultInitialSyncProgressService, private val syncTokenStore: SyncTokenStore, private val getHomeServerCapabilitiesTask: GetHomeServerCapabilitiesTask, - private val userStore: UserStore + private val userStore: UserStore, + private val syncTaskSequencer: SyncTaskSequencer ) : SyncTask { - override suspend fun execute(params: SyncTask.Params) { + override suspend fun execute(params: SyncTask.Params) = syncTaskSequencer.post { + doSync(params) + } + + private suspend fun doSync(params: SyncTask.Params) { Timber.v("Sync task started on Thread: ${Thread.currentThread().name}") // Maybe refresh the home server capabilities data we know getHomeServerCapabilitiesTask.execute(Unit) diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTaskSequencer.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTaskSequencer.kt new file mode 100644 index 0000000000..bfa49b7af5 --- /dev/null +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTaskSequencer.kt @@ -0,0 +1,24 @@ +/* + * Copyright 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package im.vector.matrix.android.internal.session.sync + +import im.vector.matrix.android.internal.session.SessionScope +import im.vector.matrix.android.internal.task.ChannelCoroutineSequencer +import javax.inject.Inject + +@SessionScope +internal class SyncTaskSequencer @Inject constructor() : ChannelCoroutineSequencer() diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt index 8a91186e07..ce46531045 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt @@ -63,7 +63,7 @@ abstract class SyncService : Service() { Timber.i("Received a start while was already syncing... ignore") } else { isRunning.set(true) - serviceScope.launch(coroutineDispatchers.sync) { + serviceScope.launch(coroutineDispatchers.io) { doSync() } } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt index 01923b103f..1acb7dbb4a 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt @@ -30,8 +30,7 @@ import im.vector.matrix.android.internal.task.TaskExecutor import im.vector.matrix.android.internal.task.TaskThread import im.vector.matrix.android.internal.task.configureWith import im.vector.matrix.android.internal.util.BackgroundDetectionObserver -import kotlinx.coroutines.delay -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.* import timber.log.Timber import java.net.SocketTimeoutException import java.util.concurrent.CountDownLatch @@ -42,14 +41,13 @@ private const val DEFAULT_LONG_POOL_TIMEOUT = 30_000L internal class SyncThread @Inject constructor(private val syncTask: SyncTask, private val networkConnectivityChecker: NetworkConnectivityChecker, - private val backgroundDetectionObserver: BackgroundDetectionObserver, - private val taskExecutor: TaskExecutor -) : Thread(), NetworkConnectivityChecker.Listener, BackgroundDetectionObserver.Listener { + private val backgroundDetectionObserver: BackgroundDetectionObserver) + : Thread(), NetworkConnectivityChecker.Listener, BackgroundDetectionObserver.Listener { private var state: SyncState = SyncState.IDLE private var liveState = MutableLiveData() private val lock = Object() - private var cancelableTask: Cancelable? = null + private val syncScope = CoroutineScope(SupervisorJob()) private var isStarted = false @@ -74,14 +72,14 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, if (isStarted) { Timber.v("Pause sync...") isStarted = false - cancelableTask?.cancel() + syncScope.coroutineContext.cancelChildren() } } fun kill() = synchronized(lock) { Timber.v("Kill sync...") updateStateTo(SyncState.KILLING) - cancelableTask?.cancel() + syncScope.coroutineContext.cancelChildren() lock.notify() } @@ -101,7 +99,6 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, isStarted = true networkConnectivityChecker.register(this) backgroundDetectionObserver.register(this) - while (state != SyncState.KILLING) { Timber.v("Entering loop, state: $state") if (!networkConnectivityChecker.hasInternetAccess()) { @@ -122,9 +119,12 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, val timeout = state.let { if (it is SyncState.RUNNING && it.afterPause) 0 else DEFAULT_LONG_POOL_TIMEOUT } Timber.v("Execute sync request with timeout $timeout") val params = SyncTask.Params(timeout) - runBlocking { + val sync = syncScope.launch { doSync(params) } + runBlocking { + sync.join() + } Timber.v("...Continue") } } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncWorker.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncWorker.kt index 804102913d..eb4f2ff7c2 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncWorker.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncWorker.kt @@ -18,19 +18,14 @@ package im.vector.matrix.android.internal.session.sync.job import android.content.Context import androidx.work.* import com.squareup.moshi.JsonClass -import im.vector.matrix.android.api.failure.Failure -import im.vector.matrix.android.api.failure.MatrixError import im.vector.matrix.android.api.failure.isTokenError import im.vector.matrix.android.internal.network.NetworkConnectivityChecker import im.vector.matrix.android.internal.session.sync.SyncTask import im.vector.matrix.android.internal.task.TaskExecutor -import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers import im.vector.matrix.android.internal.worker.WorkManagerUtil import im.vector.matrix.android.internal.worker.WorkManagerUtil.matrixOneTimeWorkRequestBuilder import im.vector.matrix.android.internal.worker.WorkerParamsFactory import im.vector.matrix.android.internal.worker.getSessionComponent -import kotlinx.coroutines.delay -import kotlinx.coroutines.withContext import timber.log.Timber import java.util.concurrent.TimeUnit import javax.inject.Inject @@ -50,7 +45,6 @@ internal class SyncWorker(context: Context, @Inject lateinit var syncTask: SyncTask @Inject lateinit var taskExecutor: TaskExecutor - @Inject lateinit var coroutineDispatchers: MatrixCoroutineDispatchers @Inject lateinit var networkConnectivityChecker: NetworkConnectivityChecker override suspend fun doWork(): Result { @@ -72,7 +66,7 @@ internal class SyncWorker(context: Context, ) } - private suspend fun doSync(timeout: Long) = withContext(coroutineDispatchers.sync) { + private suspend fun doSync(timeout: Long) { val taskParams = SyncTask.Params(timeout) syncTask.execute(taskParams) } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/CoroutineSequencer.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/CoroutineSequencer.kt index 0f8439b144..1df0dae901 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/CoroutineSequencer.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/CoroutineSequencer.kt @@ -16,32 +16,36 @@ package im.vector.matrix.android.internal.task -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.cancelChildren +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.intrinsics.startCoroutineCancellable -import kotlinx.coroutines.launch import java.util.concurrent.Executors +/** + * This class intends to be used for ensure suspendable methods are played sequentially all the way long. + */ internal interface CoroutineSequencer { + /** + * @param block the suspendable block to execute + * @return the result of the block + */ suspend fun post(block: suspend () -> T): T - fun cancel() + + /** + * Cancel all and close, so you won't be able to post anything else after + */ fun close() } -internal class ChannelCoroutineSequencer : CoroutineSequencer { +internal open class ChannelCoroutineSequencer : CoroutineSequencer { private data class Message( val block: suspend () -> T, val deferred: CompletableDeferred ) - private val messageChannel: Channel> = Channel() + private var messageChannel: Channel> = Channel() private val coroutineScope = CoroutineScope(SupervisorJob()) + // This will ensure private val singleDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() init { @@ -62,13 +66,8 @@ internal class ChannelCoroutineSequencer : CoroutineSequencer { } override fun close() { - messageChannel.cancel() coroutineScope.coroutineContext.cancelChildren() - } - - override fun cancel() { - close() - launchCoroutine() + messageChannel.close() } override suspend fun post(block: suspend () -> T): T { @@ -78,6 +77,8 @@ internal class ChannelCoroutineSequencer : CoroutineSequencer { return try { deferred.await() } catch (cancellation: CancellationException) { + // In case of cancellation, we stop the current coroutine context + // and relaunch one to consume next messages coroutineScope.coroutineContext.cancelChildren() launchCoroutine() throw cancellation diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskExecutor.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskExecutor.kt index d5392779d1..244cc83901 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskExecutor.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskExecutor.kt @@ -85,6 +85,5 @@ internal class TaskExecutor @Inject constructor(private val coroutineDispatchers TaskThread.IO -> coroutineDispatchers.io TaskThread.CALLER -> EmptyCoroutineContext TaskThread.CRYPTO -> coroutineDispatchers.crypto - TaskThread.SYNC -> coroutineDispatchers.sync } } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskThread.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskThread.kt index 16ed93662c..c04e9fbce6 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskThread.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskThread.kt @@ -21,6 +21,5 @@ internal enum class TaskThread { COMPUTATION, IO, CALLER, - CRYPTO, - SYNC + CRYPTO } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/util/MatrixCoroutineDispatchers.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/util/MatrixCoroutineDispatchers.kt index 23201c084e..d15389f703 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/util/MatrixCoroutineDispatchers.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/util/MatrixCoroutineDispatchers.kt @@ -22,6 +22,5 @@ internal data class MatrixCoroutineDispatchers( val io: CoroutineDispatcher, val computation: CoroutineDispatcher, val main: CoroutineDispatcher, - val crypto: CoroutineDispatcher, - val sync: CoroutineDispatcher + val crypto: CoroutineDispatcher ) diff --git a/matrix-sdk-android/src/test/java/im/vector/matrix/android/internal/task/MatrixCoroutineSequencersTest.kt b/matrix-sdk-android/src/test/java/im/vector/matrix/android/internal/task/CoroutineSequencersTest.kt similarity index 99% rename from matrix-sdk-android/src/test/java/im/vector/matrix/android/internal/task/MatrixCoroutineSequencersTest.kt rename to matrix-sdk-android/src/test/java/im/vector/matrix/android/internal/task/CoroutineSequencersTest.kt index c69146c3dc..a367632d9f 100644 --- a/matrix-sdk-android/src/test/java/im/vector/matrix/android/internal/task/MatrixCoroutineSequencersTest.kt +++ b/matrix-sdk-android/src/test/java/im/vector/matrix/android/internal/task/CoroutineSequencersTest.kt @@ -26,7 +26,7 @@ import org.junit.Assert.assertEquals import org.junit.Test import java.util.concurrent.Executors -class MatrixCoroutineSequencersTest { +class CoroutineSequencersTest { private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()