Coroutines: introduce a sequencer

This commit is contained in:
ganfra 2019-12-11 20:39:07 +01:00
parent 3a269be2ef
commit 6b61c95843
3 changed files with 190 additions and 47 deletions

View File

@ -30,6 +30,8 @@ import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.TaskThread
import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.util.BackgroundDetectionObserver
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import timber.log.Timber
import java.net.SocketTimeoutException
import java.util.concurrent.CountDownLatch
@ -99,9 +101,9 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
isStarted = true
networkConnectivityChecker.register(this)
backgroundDetectionObserver.register(this)
while (state != SyncState.KILLING) {
Timber.v("Entering loop, state: $state")
if (!networkConnectivityChecker.hasInternetAccess()) {
Timber.v("No network. Waiting...")
updateStateTo(SyncState.NO_NETWORK)
@ -116,24 +118,26 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
if (state !is SyncState.RUNNING) {
updateStateTo(SyncState.RUNNING(afterPause = true))
}
// No timeout after a pause
val timeout = state.let { if (it is SyncState.RUNNING && it.afterPause) 0 else DEFAULT_LONG_POOL_TIMEOUT }
Timber.v("Execute sync request with timeout $timeout")
val latch = CountDownLatch(1)
val params = SyncTask.Params(timeout)
cancelableTask = syncTask.configureWith(params) {
this.callbackThread = TaskThread.SYNC
this.executionThread = TaskThread.SYNC
this.callback = object : MatrixCallback<Unit> {
override fun onSuccess(data: Unit) {
Timber.v("onSuccess")
latch.countDown()
runBlocking {
doSync(params)
}
Timber.v("...Continue")
}
}
Timber.v("Sync killed")
updateStateTo(SyncState.KILLED)
backgroundDetectionObserver.unregister(this)
networkConnectivityChecker.unregister(this)
}
override fun onFailure(failure: Throwable) {
private suspend fun doSync(params: SyncTask.Params) {
try {
syncTask.execute(params)
} catch (failure: Throwable) {
if (failure is Failure.NetworkConnection && failure.cause is SocketTimeoutException) {
// Timeout are not critical
Timber.v("Timeout")
@ -146,35 +150,20 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
updateStateTo(SyncState.KILLING)
} else {
Timber.e(failure)
if (failure !is Failure.NetworkConnection || failure.cause is JsonEncodingException) {
// Wait 10s before retrying
Timber.v("Wait 10s")
sleep(RETRY_WAIT_TIME_MS)
delay(RETRY_WAIT_TIME_MS)
}
}
latch.countDown()
}
}
}
.executeBy(taskExecutor)
latch.await()
} finally {
state.let {
if (it is SyncState.RUNNING && it.afterPause) {
updateStateTo(SyncState.RUNNING(afterPause = false))
}
}
Timber.v("...Continue")
}
}
Timber.v("Sync killed")
updateStateTo(SyncState.KILLED)
backgroundDetectionObserver.unregister(this)
networkConnectivityChecker.unregister(this)
}
private fun updateStateTo(newState: SyncState) {
Timber.v("Update state from $state to $newState")

View File

@ -0,0 +1,93 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.task
import im.vector.matrix.android.internal.di.MatrixScope
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.Executors
import javax.inject.Inject
@MatrixScope
internal class MatrixCoroutineSequencers @Inject constructor() {
private val sequencers = HashMap<String, CoroutineSequencer>()
suspend fun post(name: String, block: suspend CoroutineScope.() -> Any): Any {
val sequencer = sequencers.getOrPut(name) {
ChannelCoroutineSequencer()
}
return sequencer.post(block)
}
fun cancel(name: String) {
sequencers.remove(name)?.cancel()
}
fun cancelAll() {
sequencers.values.forEach {
it.cancel()
}
sequencers.clear()
}
}
internal interface CoroutineSequencer {
suspend fun post(block: suspend CoroutineScope.() -> Any): Any
fun cancel()
}
internal class ChannelCoroutineSequencer : CoroutineSequencer {
private data class Message(
val block: suspend CoroutineScope.() -> Any,
val deferred: CompletableDeferred<Any>
)
private val messageChannel: Channel<Message> = Channel()
private val coroutineScope = CoroutineScope(SupervisorJob())
private val singleDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
init {
coroutineScope.launch(singleDispatcher) {
for (message in messageChannel) {
try {
val result = message.block(this)
message.deferred.complete(result)
} catch (exception: Throwable) {
message.deferred.completeExceptionally(exception)
}
}
}
}
override fun cancel() {
messageChannel.cancel()
coroutineScope.coroutineContext.cancelChildren()
}
override suspend fun post(block: suspend CoroutineScope.() -> Any): Any {
val deferred = CompletableDeferred<Any>()
val message = Message(block, deferred)
messageChannel.send(message)
return deferred.await()
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.task
import kotlinx.coroutines.*
import org.junit.Test
import java.util.concurrent.Executors
class MatrixCoroutineSequencersTest {
@Test
fun sequencer_should_run_sequential() {
val sequencer = MatrixCoroutineSequencers()
val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
val jobs = listOf(
GlobalScope.launch(dispatcher) {
sequencer.post("Sequencer1") { suspendingMethod("#3") }
},
GlobalScope.launch(dispatcher) {
sequencer.post("Sequencer1") { suspendingMethod("#4") }
},
GlobalScope.launch(dispatcher) {
sequencer.post("Sequencer2") { suspendingMethod("#5") }
},
GlobalScope.launch(dispatcher) {
sequencer.post("Sequencer2") { suspendingMethod("#6") }
},
GlobalScope.launch(dispatcher) {
sequencer.post("Sequencer2") { suspendingMethod("#7") }
}
)
Thread.sleep(5500)
sequencer.cancelAll()
runBlocking {
jobs.joinAll()
}
}
private suspend fun suspendingMethod(name: String): String = withContext(Dispatchers.Default) {
println("BLOCKING METHOD $name STARTS")
delay(3000)
println("BLOCKING METHOD $name ENDS")
name
}
}