1
votes

I have got the following method:

    operator fun invoke(query: String): Flow<MutableList<JobDomainModel>> = flow {
        val jobDomainModelList = mutableListOf<JobDomainModel>()
        jobListingRepository.searchJobs(sanitizeSearchQuery(query))
            .collect { jobEntityList: List<JobEntity> ->
                for (jobEntity in jobEntityList) {
                    categoriesRepository.getCategoryById(jobEntity.categoryId)
                        .collect { categoryEntity ->
                            if (categoryEntity.categoryId == jobEntity.categoryId) {
                                jobDomainModelList.add(jobEntity.toDomainModel(categoryEntity))
                            }
                        }
                }
                emit(jobDomainModelList)
            }
    }

It searches in a repository calling the search method that returns a Flow<List<JobEntity>>. Then for every JobEntity in the flow, I need to fetch from the DB the category to which that job belongs. Once I have that category and the job, I can convert the job to a domain model object (JobDomainModel) and add it to a list, which will be returned in a flow as the return object of the method.

The problem I'm having is that nothing is ever emitted. I'm not sure if I'm missing something about working with flows in Kotlin, but if I don't fetch the category by ID (categoriesRepository.getCategoryById(jobEntity.categoryId)), it works fine and the list is emitted.

Thanks a lot in advance!

1
Just to make sure: is searchJobs() actually a stream of values (e.g. a WebSocket) or a single-shot request? - Alex Krupa
It is also a Flow, as it searches the DB using Room via a DAO. - noloman
I have also tried a map instead of collect (when fetching the category for a job), but it still doesn't work. - noloman

1 Answer

0
votes

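I think the problem is that you're collecting infinite-length Flows, so the inner collect never returns: the for loop never gets past the first job, and emit(jobDomainModelList) is never reached. You should use .take(1) to get a finite Flow before collecting it, or use first().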

The Flows returned by your DAO never complete on their own. The first value is the result of the initial query, and each later value is the result of a new query made when the contents of the database change. The Flow stays active until it is cancelled.

Something like this:

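    operator fun invoke(query: String): Flow<MutableList<JobDomainModel>> =
        jobListingRepository.searchJobs(sanitizeSearchQuery(query))
            .map { jobEntityList: List<JobEntity> ->
                jobEntityList.mapNotNull { jobEntity ->
                    // first() suspends until the first emission, then cancels the inner Flow
                    categoriesRepository.getCategoryById(jobEntity.categoryId)
                        .first()
                        .takeIf { it.categoryId == jobEntity.categoryId }
                        // convert the job using its category, as in the original code
                        ?.let { categoryEntity -> jobEntity.toDomainModel(categoryEntity) }
                }.toMutableList() // keep the MutableList return type of the original method
            }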
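
To see the difference between collect and first() in isolation, here is a small self-contained sketch. observeCategory() is a hypothetical stand-in for a DAO query Flow (it is not Room code): it emits one value and then stays open, which is roughly how an observable query behaves. It assumes kotlinx-coroutines-core is on the classpath.

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical stand-in for a DAO query Flow: emits the current result,
// then stays open forever, as if waiting for table changes.
fun observeCategory(id: Int): Flow<String> = flow {
    emit("category-$id")
    awaitCancellation() // never completes on its own
}

// collect() only returns once the Flow completes, so here it hits the timeout
// and withTimeoutOrNull returns null.
suspend fun collectFinishes(): Boolean =
    withTimeoutOrNull(200) { observeCategory(1).collect { } } != null

// first() returns the first emitted value and cancels the upstream Flow.
suspend fun firstCategory(): String = observeCategory(1).first()

fun main() = runBlocking {
    println(collectFinishes()) // false: collect() was cut off by the timeout
    println(firstCategory())   // category-1
}
```

In the question's code, the inner collect plays the role of collectFinishes() without the timeout, so it suspends forever.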

Alternatively, in your DAO you could make a suspend function version of getCategoryById() that simply returns the category once instead of a Flow.
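
A rough sketch of that suspend-function variant follows. Everything here is an assumption made for illustration: the entity and model fields, the name getCategoryByIdOnce(), and the toDomainModels() helper are hypothetical, and the DAO is shown as a plain interface with an in-memory fake so the example runs without a database. With Room, the method would instead be a suspend function on the @Dao interface with a @Query annotation.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical minimal types; the real entities will have more fields.
data class CategoryEntity(val categoryId: Int, val name: String)
data class JobEntity(val title: String, val categoryId: Int)
data class JobDomainModel(val title: String, val categoryName: String)

// Plain interface standing in for a Room @Dao (assumption for this sketch).
interface CategoriesDao {
    // One-shot lookup: returns the category (or null) and is done.
    suspend fun getCategoryByIdOnce(id: Int): CategoryEntity?
}

// In-memory fake so the sketch runs without a database.
class FakeCategoriesDao(private val rows: List<CategoryEntity>) : CategoriesDao {
    override suspend fun getCategoryByIdOnce(id: Int): CategoryEntity? =
        rows.firstOrNull { it.categoryId == id }
}

// Maps each emitted job list to domain models; jobs without a category are dropped.
fun Flow<List<JobEntity>>.toDomainModels(dao: CategoriesDao): Flow<List<JobDomainModel>> =
    map { jobs ->
        jobs.mapNotNull { job ->
            dao.getCategoryByIdOnce(job.categoryId)
                ?.let { category -> JobDomainModel(job.title, category.name) }
        }
    }

fun main() = runBlocking {
    val dao = FakeCategoriesDao(listOf(CategoryEntity(1, "Engineering")))
    val jobs = flowOf(listOf(JobEntity("Android dev", 1), JobEntity("Orphan", 2)))
    println(jobs.toDomainModels(dao).toList())
}
```

The outer Flow of jobs stays observable, while each category lookup is a single suspending call, so there is no inner Flow left to hang on.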