0
votes

I am trying to launch an Coroutine inside my PagingSource in order to watch how long my paging source is already trying to get my data. The only problem I have here is, that my Coroutine is still somehow collecting some data, even after I stopped my shopPagingWatcher Flow. Because of this, it throws IOException("No Intenet Exception) even when it should not.

I am launching a Coroutine because watching the state should not block the main flow of my paging source

PagingSource

class ShopRemoteMediator @Inject constructor(
    private val db: FirebaseFirestore,
    private val shopPagingWatcher: ShopPagingWatcher,
) : PagingSource<QuerySnapshot, Product>() {

    @InternalCoroutinesApi
    override suspend fun load(params: LoadParams<QuerySnapshot>): LoadResult<QuerySnapshot, Product> {
        return try {
            // Launch Async Coroutine, Observe State, throw IO Exception when not loaded within 5 seconds
            shopPagingWatcher.start()
            CoroutineScope(Dispatchers.IO).launch {
                shopPagingWatcher.observeMaxTimeReached().collect { maxTimeReached ->
                    if (maxTimeReached) {
                        Timber.d("Mediator failed")
                        throw IOException("No Internet Exception")
                    }
                }
            }

            val currentPage = params.key ?: db.collection(FIREBASE_PRODUCTS)
                .limit(SHOP_LIST_LIMIT)
                .get()
                .await()

            val lastDocumentSnapShot = currentPage.documents[currentPage.size() - 1]

            val nextPage = db.collection(FIREBASE_PRODUCTS)
                .limit(SHOP_LIST_LIMIT)
                .startAfter(lastDocumentSnapShot)
                .get()
                .await()

            // When PagingSource is here, it successfully loaded currentPage and nextPage, therefore stop Watcher
            Timber.d("Mediator Sucessfull")
            shopPagingWatcher.stop()

            LoadResult.Page(
                data = currentPage.toObjects(),
                prevKey = null,
                nextKey = nextPage
            )

        } catch (e: Exception) {
            // IOException should be caught here, but it is not! The app crashed instead!
            Timber.d("Mediator Exception ist $e")
            LoadResult.Error(e)
        }
    }
}

ShopPagingWatcher

@Singleton
class ShopPagingWatcher @Inject constructor() : Workwatcher()

Abstract WorkWatcher

abstract class Workwatcher {
    private companion object {
        private val dispatcher = Dispatchers.IO
        private var timeStamp by Delegates.notNull<Long>()

        private var running = false
        private var manuallyStopped = false
        private var finished = false

        private const val maxTime: Long = 5000000000L
    }

    // Push the current timestamp, set running to true
    // I don't know if it is necessary to use "synchronized"
    @InternalCoroutinesApi
    fun start() = synchronized(dispatcher) {
        timeStamp = System.nanoTime()
        running = true
        manuallyStopped = false
        finished = false
    }


    // Manually stop the WorkerHelper 
    // I don't know if it is necessary to use "synchronized"
    @InternalCoroutinesApi
    fun stop()  = synchronized(dispatcher) {
        running = false
        manuallyStopped = true
        finished = true
        Timber.d("Mediator stopped")
    }

    // Function that observes the time
    fun observeMaxTimeReached(): Flow<Boolean> = flow {
        // Check if maxTime is not passed with → (System.nanoTime() - timeStamp) <= maxTime
        while (running && !finished && !manuallyStopped && (System.nanoTime() - timeStamp) <= maxTime) {
            emit(false)
            Timber.d("Currenttime is smaller, everything fine")
        }
        // This will be executed only when the Worker is running longer than maxTime
        if (!manuallyStopped && !finished) {
            Timber.d("Currenttime bigger, yikes. Stop worker")
            emit(true)
            running = false
            finished = true
            return@flow
        } else if (finished || manuallyStopped) {
            return@flow
        }
    }.flowOn(dispatcher)
}

How should I change my Coroutine inside my PagingSource in order to achieve my goal? Timber.d("Mediator stopped) gets called.

I appreciate every help, thank you.

1
You haven't really cancelled the flow. You have to cancel the coroutine that is collecting the flow, meaning cancelling the Job returned from launch.Louis Wasserman
@LouisWasserman I don't understand. I have stopped my flow with shopPagingWatcher.stop. Wouldn't that mean, that my flow is cancelled?Andrew
No. Flows aren't really things that can be stopped, only collections of flows can be stopped.Louis Wasserman
@LouisWasserman Then what does my stop() function do?Andrew

1 Answers

1
votes

Do you need to measure duration? Time is already passing everywhere, you don't need another thread or coroutine to track that. There's measureNanoTime {} that measures how long a code block took to execute.

Do you need to apply a timeout inside a suspending function? There's withTimeout exactly for that. Example:

class ShopRemoteMediator @Inject constructor(
    private val db: FirebaseFirestore,
    private val shopPagingWatcher: ShopPagingWatcher,
) : PagingSource<QuerySnapshot, Product>() {

    @InternalCoroutinesApi
    override suspend fun load(
        params: LoadParams<QuerySnapshot>
    ): LoadResult<QuerySnapshot, Product> {
        return try {
            withTimeout(5, TimeUnit.SECONDS) {         // <<<<<<<<<<
                val currentPage = ...
                val nextPage = ...
                LoadResult.Page(
                    data = currentPage.toObjects(),
                    prevKey = null,
                    nextKey = nextPage
                )
            }
        } catch (e: IOException) {
            LoadResult.Error(e)
        } catch (e: TimeoutCancellationException) {    // <<<<<<<<<<
            LoadResult.Error(e)
        }
    }
}