Coroutine sequencer: use semaphore

This commit is contained in:
Ganard 2020-02-11 12:08:47 +01:00 committed by ganfra
parent b04ee7153a
commit a305ce302e
3 changed files with 16 additions and 61 deletions

View File

@ -17,8 +17,8 @@
package im.vector.matrix.android.internal.session.sync
import im.vector.matrix.android.internal.session.SessionScope
import im.vector.matrix.android.internal.task.ChannelCoroutineSequencer
import im.vector.matrix.android.internal.task.SemaphoreCoroutineSequencer
import javax.inject.Inject
@SessionScope
internal class SyncTaskSequencer @Inject constructor() : ChannelCoroutineSequencer<Unit>()
internal class SyncTaskSequencer @Inject constructor() : SemaphoreCoroutineSequencer()

View File

@ -16,72 +16,27 @@
package im.vector.matrix.android.internal.task
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.Executors
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
/**
* This class intends to be used for ensure suspendable methods are played sequentially all the way long.
*/
internal interface CoroutineSequencer<T> {
internal interface CoroutineSequencer {
/**
* @param block the suspendable block to execute
* @return the result of the block
*/
suspend fun post(block: suspend () -> T): T
/**
* Cancel all and close, so you won't be able to post anything else after
*/
fun close()
suspend fun <T> post(block: suspend () -> T): T
}
internal open class ChannelCoroutineSequencer<T> : CoroutineSequencer<T> {
internal open class SemaphoreCoroutineSequencer : CoroutineSequencer {
private data class Message<T>(
val block: suspend () -> T,
val deferred: CompletableDeferred<T>
)
private val semaphore = Semaphore(1) // Permits 1 suspend function at a time.
private var messageChannel: Channel<Message<T>> = Channel()
private val coroutineScope = CoroutineScope(SupervisorJob())
// This will ensure
private val singleDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
init {
launchCoroutine()
}
private fun launchCoroutine() {
coroutineScope.launch(singleDispatcher) {
for (message in messageChannel) {
try {
val result = message.block()
message.deferred.complete(result)
} catch (exception: Throwable) {
message.deferred.completeExceptionally(exception)
}
}
}
}
override fun close() {
coroutineScope.coroutineContext.cancelChildren()
messageChannel.close()
}
override suspend fun post(block: suspend () -> T): T {
val deferred = CompletableDeferred<T>()
val message = Message(block, deferred)
messageChannel.send(message)
return try {
deferred.await()
} catch (cancellation: CancellationException) {
// In case of cancellation, we stop the current coroutine context
// and relaunch one to consume next messages
coroutineScope.coroutineContext.cancelChildren()
launchCoroutine()
throw cancellation
override suspend fun <T> post(block: suspend () -> T): T {
return semaphore.withPermit {
block()
}
}
}

View File

@ -32,7 +32,7 @@ class CoroutineSequencersTest {
@Test
fun sequencer_should_run_sequential() {
val sequencer = ChannelCoroutineSequencer<String>()
val sequencer = SemaphoreCoroutineSequencer()
val results = ArrayList<String>()
val jobs = listOf(
@ -63,9 +63,9 @@ class CoroutineSequencersTest {
@Test
fun sequencer_should_run_parallel() {
val sequencer1 = ChannelCoroutineSequencer<String>()
val sequencer2 = ChannelCoroutineSequencer<String>()
val sequencer3 = ChannelCoroutineSequencer<String>()
val sequencer1 = SemaphoreCoroutineSequencer()
val sequencer2 = SemaphoreCoroutineSequencer()
val sequencer3 = SemaphoreCoroutineSequencer()
val results = ArrayList<String>()
val jobs = listOf(
GlobalScope.launch(dispatcher) {
@ -92,7 +92,7 @@ class CoroutineSequencersTest {
@Test
fun sequencer_should_jump_to_next_when_current_job_canceled() {
val sequencer = ChannelCoroutineSequencer<String>()
val sequencer = SemaphoreCoroutineSequencer()
val results = ArrayList<String>()
val jobs = listOf(
GlobalScope.launch(dispatcher) {