Sending: limit to 3 retry before failing

This commit is contained in:
ganfra 2020-07-22 11:12:37 +02:00
parent aa5de1896f
commit d7558902f7
10 changed files with 69 additions and 98 deletions

View File

@ -19,7 +19,7 @@ import im.vector.matrix.android.api.session.crypto.CryptoService
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.room.send.SendState
import im.vector.matrix.android.internal.crypto.model.MXEncryptEventContentResult
import im.vector.matrix.android.internal.session.room.send.LocalEchoUpdater
import im.vector.matrix.android.internal.session.room.send.LocalEchoRepository
import im.vector.matrix.android.internal.task.Task
import im.vector.matrix.android.internal.util.awaitCallback
import javax.inject.Inject
@ -35,7 +35,7 @@ internal interface EncryptEventTask : Task<EncryptEventTask.Params, Event> {
internal class DefaultEncryptEventTask @Inject constructor(
// private val crypto: CryptoService
private val localEchoUpdater: LocalEchoUpdater
private val localEchoRepository: LocalEchoRepository
) : EncryptEventTask {
override suspend fun execute(params: EncryptEventTask.Params): Event {
if (!params.crypto.isRoomEncrypted(params.roomId)) return params.event
@ -44,7 +44,7 @@ internal class DefaultEncryptEventTask @Inject constructor(
throw IllegalArgumentException()
}
localEchoUpdater.updateSendState(localEvent.eventId, SendState.ENCRYPTING)
localEchoRepository.updateSendState(localEvent.eventId, SendState.ENCRYPTING)
val localMutableContent = localEvent.content?.toMutableMap() ?: mutableMapOf()
params.keepKeys?.forEach {

View File

@ -20,7 +20,7 @@ import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.room.send.SendState
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.room.RoomAPI
import im.vector.matrix.android.internal.session.room.send.LocalEchoUpdater
import im.vector.matrix.android.internal.session.room.send.LocalEchoRepository
import im.vector.matrix.android.internal.session.room.send.SendResponse
import im.vector.matrix.android.internal.task.Task
import org.greenrobot.eventbus.EventBus
@ -34,7 +34,7 @@ internal interface SendEventTask : Task<SendEventTask.Params, String> {
}
internal class DefaultSendEventTask @Inject constructor(
private val localEchoUpdater: LocalEchoUpdater,
private val localEchoRepository: LocalEchoRepository,
private val encryptEventTask: DefaultEncryptEventTask,
private val roomAPI: RoomAPI,
private val eventBus: EventBus) : SendEventTask {
@ -44,7 +44,7 @@ internal class DefaultSendEventTask @Inject constructor(
val localId = event.eventId!!
try {
localEchoUpdater.updateSendState(localId, SendState.SENDING)
localEchoRepository.updateSendState(localId, SendState.SENDING)
val executeRequest = executeRequest<SendResponse>(eventBus) {
apiCall = roomAPI.send(
localId,
@ -53,10 +53,10 @@ internal class DefaultSendEventTask @Inject constructor(
eventType = event.type
)
}
localEchoUpdater.updateSendState(localId, SendState.SENT)
localEchoRepository.updateSendState(localId, SendState.SENT)
return executeRequest.eventId
} catch (e: Throwable) {
localEchoUpdater.updateSendState(localId, SendState.UNDELIVERED)
localEchoRepository.updateSendState(localId, SendState.UNDELIVERED)
throw e
}
}

View File

@ -20,7 +20,7 @@ import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.room.send.SendState
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.room.RoomAPI
import im.vector.matrix.android.internal.session.room.send.LocalEchoUpdater
import im.vector.matrix.android.internal.session.room.send.LocalEchoRepository
import im.vector.matrix.android.internal.session.room.send.SendResponse
import im.vector.matrix.android.internal.task.Task
import org.greenrobot.eventbus.EventBus
@ -34,7 +34,7 @@ internal interface SendVerificationMessageTask : Task<SendVerificationMessageTas
}
internal class DefaultSendVerificationMessageTask @Inject constructor(
private val localEchoUpdater: LocalEchoUpdater,
private val localEchoRepository: LocalEchoRepository,
private val encryptEventTask: DefaultEncryptEventTask,
private val roomAPI: RoomAPI,
private val eventBus: EventBus) : SendVerificationMessageTask {
@ -44,7 +44,7 @@ internal class DefaultSendVerificationMessageTask @Inject constructor(
val localId = event.eventId!!
try {
localEchoUpdater.updateSendState(localId, SendState.SENDING)
localEchoRepository.updateSendState(localId, SendState.SENDING)
val executeRequest = executeRequest<SendResponse>(eventBus) {
apiCall = roomAPI.send(
localId,
@ -53,10 +53,10 @@ internal class DefaultSendVerificationMessageTask @Inject constructor(
eventType = event.type
)
}
localEchoUpdater.updateSendState(localId, SendState.SENT)
localEchoRepository.updateSendState(localId, SendState.SENT)
return executeRequest.eventId
} catch (e: Throwable) {
localEchoUpdater.updateSendState(localId, SendState.UNDELIVERED)
localEchoRepository.updateSendState(localId, SendState.UNDELIVERED)
throw e
}
}

View File

@ -128,6 +128,7 @@ internal class DefaultSendService @AssistedInject constructor(
override fun resendTextMessage(localEcho: TimelineEvent): Cancelable? {
if (localEcho.root.isTextMessage() && localEcho.root.sendState.hasFailed()) {
localEchoRepository.updateSendState(localEcho.eventId, SendState.UNSENT)
return sendEvent(localEcho.root)
}
return null

View File

@ -52,7 +52,7 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
) : SessionWorkerParams
@Inject lateinit var crypto: CryptoService
@Inject lateinit var localEchoUpdater: LocalEchoUpdater
@Inject lateinit var localEchoRepository: LocalEchoRepository
override suspend fun doWork(): Result {
Timber.v("Start Encrypt work")
@ -74,7 +74,7 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
if (localEvent.eventId == null) {
return Result.success()
}
localEchoUpdater.updateSendState(localEvent.eventId, SendState.ENCRYPTING)
localEchoRepository.updateSendState(localEvent.eventId, SendState.ENCRYPTING)
val localMutableContent = localEvent.content?.toMutableMap() ?: mutableMapOf()
params.keepKeys?.forEach {
@ -116,7 +116,7 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
senderCurve25519Key = result.eventContent["sender_key"] as? String,
claimedEd25519Key = crypto.getMyDevice().fingerprint()
)
localEchoUpdater.updateEncryptedEcho(localEvent.eventId, safeResult.eventContent, decryptionLocalEcho)
localEchoRepository.updateEncryptedEcho(localEvent.eventId, safeResult.eventContent, decryptionLocalEcho)
}
val nextWorkerParams = SendEventWorker.Params(params.sessionId, encryptedEvent)
@ -126,7 +126,7 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
is Failure.CryptoError -> SendState.FAILED_UNKNOWN_DEVICES
else -> SendState.UNDELIVERED
}
localEchoUpdater.updateSendState(localEvent.eventId, sendState)
localEchoRepository.updateSendState(localEvent.eventId, sendState)
// always return success, or the chain will be stuck for ever!
val nextWorkerParams = SendEventWorker.Params(params.sessionId, localEvent, error?.localizedMessage
?: "Error")

View File

@ -17,6 +17,7 @@
package im.vector.matrix.android.internal.session.room.send
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.events.model.Content
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.toModel
@ -24,7 +25,9 @@ import im.vector.matrix.android.api.session.room.model.message.MessageContent
import im.vector.matrix.android.api.session.room.model.message.MessageType
import im.vector.matrix.android.api.session.room.send.SendState
import im.vector.matrix.android.api.session.room.timeline.TimelineEvent
import im.vector.matrix.android.internal.crypto.MXEventDecryptionResult
import im.vector.matrix.android.internal.database.helper.nextId
import im.vector.matrix.android.internal.database.mapper.ContentMapper
import im.vector.matrix.android.internal.database.mapper.TimelineEventMapper
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.mapper.toEntity
@ -36,8 +39,8 @@ import im.vector.matrix.android.internal.database.model.TimelineEventEntity
import im.vector.matrix.android.internal.database.query.findAllInRoomWithSendStates
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.session.room.summary.RoomSummaryUpdater
import im.vector.matrix.android.internal.session.room.membership.RoomMemberHelper
import im.vector.matrix.android.internal.session.room.summary.RoomSummaryUpdater
import im.vector.matrix.android.internal.session.room.timeline.DefaultTimeline
import im.vector.matrix.android.internal.util.awaitTransaction
import io.realm.Realm
@ -83,6 +86,31 @@ internal class LocalEchoRepository @Inject constructor(@SessionDatabase private
}
}
fun updateSendState(eventId: String, sendState: SendState) {
Timber.v("Update local state of $eventId to ${sendState.name}")
monarchy.writeAsync { realm ->
val sendingEventEntity = EventEntity.where(realm, eventId).findFirst()
if (sendingEventEntity != null) {
if (sendState == SendState.SENT && sendingEventEntity.sendState == SendState.SYNCED) {
// If already synced, do not put as sent
} else {
sendingEventEntity.sendState = sendState
}
}
}
}
fun updateEncryptedEcho(eventId: String, encryptedContent: Content, mxEventDecryptionResult: MXEventDecryptionResult) {
monarchy.writeAsync { realm ->
val sendingEventEntity = EventEntity.where(realm, eventId).findFirst()
if (sendingEventEntity != null) {
sendingEventEntity.type = EventType.ENCRYPTED
sendingEventEntity.content = ContentMapper.map(encryptedContent)
sendingEventEntity.setDecryptionResult(mxEventDecryptionResult)
}
}
}
suspend fun deleteFailedEcho(roomId: String, localEcho: TimelineEvent) {
monarchy.awaitTransaction { realm ->
TimelineEventEntity.where(realm, roomId = roomId, eventId = localEcho.root.eventId ?: "").findFirst()?.deleteFromRealm()
@ -92,11 +120,11 @@ internal class LocalEchoRepository @Inject constructor(@SessionDatabase private
suspend fun clearSendingQueue(roomId: String) {
monarchy.awaitTransaction { realm ->
RoomEntity.where(realm, roomId).findFirst()?.let { room ->
room.sendingTimelineEvents.forEach {
it.root?.sendState = SendState.UNDELIVERED
}
}
TimelineEventEntity
.findAllInRoomWithSendStates(realm, roomId, SendState.IS_SENDING_STATES)
.forEach {
it.root?.sendState = SendState.UNSENT
}
}
}

View File

@ -1,57 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.room.send
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.events.model.Content
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.room.send.SendState
import im.vector.matrix.android.internal.crypto.MXEventDecryptionResult
import im.vector.matrix.android.internal.database.mapper.ContentMapper
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.di.SessionDatabase
import timber.log.Timber
import javax.inject.Inject
internal class LocalEchoUpdater @Inject constructor(@SessionDatabase private val monarchy: Monarchy) {
fun updateSendState(eventId: String, sendState: SendState) {
Timber.v("Update local state of $eventId to ${sendState.name}")
monarchy.writeAsync { realm ->
val sendingEventEntity = EventEntity.where(realm, eventId).findFirst()
if (sendingEventEntity != null) {
if (sendState == SendState.SENT && sendingEventEntity.sendState == SendState.SYNCED) {
// If already synced, do not put as sent
} else {
sendingEventEntity.sendState = sendState
}
}
}
}
fun updateEncryptedEcho(eventId: String, encryptedContent: Content, mxEventDecryptionResult: MXEventDecryptionResult) {
monarchy.writeAsync { realm ->
val sendingEventEntity = EventEntity.where(realm, eventId).findFirst()
if (sendingEventEntity != null) {
sendingEventEntity.type = EventType.ENCRYPTED
sendingEventEntity.content = ContentMapper.map(encryptedContent)
sendingEventEntity.setDecryptionResult(mxEventDecryptionResult)
}
}
}
}

View File

@ -54,7 +54,7 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
@Inject lateinit var workManagerProvider: WorkManagerProvider
@Inject lateinit var timelineSendEventWorkCommon: TimelineSendEventWorkCommon
@Inject lateinit var localEchoUpdater: LocalEchoUpdater
@Inject lateinit var localEchoRepository: LocalEchoRepository
override suspend fun doWork(): Result {
Timber.v("Start dispatch sending multiple event work")
@ -67,7 +67,7 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
if (params.lastFailureMessage != null) {
params.events.forEach { event ->
event.eventId?.let { localEchoUpdater.updateSendState(it, SendState.UNDELIVERED) }
event.eventId?.let { localEchoRepository.updateSendState(it, SendState.UNDELIVERED) }
}
// Transmit the error if needed?
return Result.success(inputData)

View File

@ -33,6 +33,8 @@ import org.greenrobot.eventbus.EventBus
import timber.log.Timber
import javax.inject.Inject
private const val MAX_NUMBER_OF_RETRY_BEFORE_FAILING = 3
/**
* Possible previous worker: [EncryptEventWorker] or first worker
* Possible next worker : None
@ -63,7 +65,7 @@ internal class SendEventWorker(context: Context,
)
}
@Inject lateinit var localEchoUpdater: LocalEchoUpdater
@Inject lateinit var localEchoRepository: LocalEchoRepository
@Inject lateinit var roomAPI: RoomAPI
@Inject lateinit var eventBus: EventBus
@ -74,16 +76,15 @@ internal class SendEventWorker(context: Context,
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this)
if (params.eventId == null || params.roomId == null || params.type == null) {
// compat with old params, make it fail if any
if (params.event?.eventId != null) {
localEchoUpdater.updateSendState(params.event.eventId, SendState.UNDELIVERED)
localEchoRepository.updateSendState(params.event.eventId, SendState.UNDELIVERED)
}
return Result.success()
}
if (params.lastFailureMessage != null) {
localEchoUpdater.updateSendState(params.eventId, SendState.UNDELIVERED)
localEchoRepository.updateSendState(params.eventId, SendState.UNDELIVERED)
// Transmit the error
return Result.success(inputData)
.also { Timber.e("Work cancelled due to input error from parent") }
@ -92,21 +93,22 @@ internal class SendEventWorker(context: Context,
sendEvent(params.eventId, params.roomId, params.type, params.contentStr)
Result.success()
} catch (exception: Throwable) {
if (exception.shouldBeRetried()) {
Result.retry()
// It does start from 0, we want it to stop if it fails the third time
val currentAttemptCount = runAttemptCount + 1
if (currentAttemptCount >= MAX_NUMBER_OF_RETRY_BEFORE_FAILING || !exception.shouldBeRetried()) {
localEchoRepository.updateSendState(params.eventId, SendState.UNDELIVERED)
return Result.success()
} else {
localEchoUpdater.updateSendState(params.eventId, SendState.UNDELIVERED)
// always return success, or the chain will be stuck for ever!
Result.success()
Result.retry()
}
}
}
private suspend fun sendEvent(eventId: String, roomId: String, type: String, contentStr: String?) {
localEchoUpdater.updateSendState(eventId, SendState.SENDING)
localEchoRepository.updateSendState(eventId, SendState.SENDING)
executeRequest<SendResponse>(eventBus) {
apiCall = roomAPI.send(eventId, roomId, type, contentStr)
}
localEchoUpdater.updateSendState(eventId, SendState.SENT)
localEchoRepository.updateSendState(eventId, SendState.SENT)
}
}

View File

@ -342,21 +342,18 @@ internal class DefaultTimeline(
private fun updateLoadingStates(results: RealmResults<TimelineEventEntity>) {
val lastCacheEvent = results.lastOrNull()
val lastBuiltEvent = builtEvents.lastOrNull()
val firstCacheEvent = results.firstOrNull()
val firstBuiltEvent = builtEvents.firstOrNull()
val chunkEntity = getLiveChunk()
updateState(Timeline.Direction.FORWARDS) {
it.copy(
hasMoreInCache = firstBuiltEvent != null && firstBuiltEvent.displayIndex < firstCacheEvent?.displayIndex ?: Int.MIN_VALUE,
hasMoreInCache = !builtEventsIdMap.containsKey(firstCacheEvent?.eventId),
hasReachedEnd = chunkEntity?.isLastForward ?: false
)
}
updateState(Timeline.Direction.BACKWARDS) {
it.copy(
hasMoreInCache = lastBuiltEvent == null || lastBuiltEvent.displayIndex > lastCacheEvent?.displayIndex ?: Int.MAX_VALUE,
hasMoreInCache = !builtEventsIdMap.containsKey(lastCacheEvent?.eventId),
hasReachedEnd = chunkEntity?.isLastBackward ?: false || lastCacheEvent?.root?.type == EventType.STATE_ROOM_CREATE
)
}