Fetch Data from Multiple Sources Asynchronously and Combine Them Using Kotlin Coroutines
Let’s consider a use case where user profile information comes from multiple sources:
- User bio information from an HTTP API endpoint
- User profile pictures from an S3 bucket
- User documents from an FTP server
We want to use Kotlin coroutines to fetch the profiles of 10 users from these 3 different sources asynchronously and return the combined results.
We will do the following to achieve this:
- We create 3 functions that fetch the bio, pictures, and documents asynchronously using GlobalScope.async, each of which returns a Deferred result that completes sometime in the future. These 3 functions are fetchBioOverHttpAsync, fetchPicturesFromS3BucketAsync, and fetchDocumentsFromFtpServerAsync.
- Next, we create a function fetchUserProfileAsync that fetches a user profile asynchronously: it starts the 3 fetches above, waits for their results using await(), combines them, and returns the Profile.
- Last, we create a function fetchUserProfiles that fetches multiple user profiles: it starts fetchUserProfileAsync for every user, then maps each Deferred result using await() and returns the list of user profiles.
- We also create a coroutine dispatcher backed by a fixed thread pool of size 10, using the Java executor service Executors.newFixedThreadPool(10).asCoroutineDispatcher(), and use it to run all our asynchronous operations on the pool's threads.
We use the suffix Async in the names of functions that return a Deferred result, which is a good practice for identifying asynchronous functions.
package com.example.concurrency

import kotlinx.coroutines.*
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

// Dispatcher backed by a fixed pool of 10 threads; every coroutine below runs on it
val executor = Executors.newFixedThreadPool(10).asCoroutineDispatcher()

data class Profile(val bio: Bio, val picture: List<String>, val documents: List<String>)
data class Bio(val name: String, val age: Int, val gender: String, val location: String)

fun main() {
    runBlocking {
        val timeElapsed = measureTimeMillis {
            // fetch profile for 10 users
            fetchUserProfiles(listOf("andrew", "billy", "charlie", "david", "emma",
                "flora", "gavin", "harry", "idris", "jack"))
                .forEach(::println)
        }
        println("Time elapsed: $timeElapsed")
    }
    executor.close() // shut down the pool's non-daemon threads so the JVM can exit
}

// Start all profile fetches first, then await each result in order
suspend fun fetchUserProfiles(profileIds: List<String>): List<Profile> =
    profileIds.map { fetchUserProfileAsync(it) }
        .map { it.await() }

// Start the 3 source fetches concurrently, then combine their results
fun fetchUserProfileAsync(profileId: String): Deferred<Profile> = GlobalScope.async(executor) {
    val bio = fetchBioOverHttpAsync(profileId)
    val picture = fetchPicturesFromS3BucketAsync(profileId)
    val documents = fetchDocumentsFromFtpServerAsync(profileId)
    Profile(bio.await(), picture.await(), documents.await())
}

fun fetchBioOverHttpAsync(id: String): Deferred<Bio> = GlobalScope.async(executor) {
    println("fetchBioOverHttpAsync ${Thread.currentThread().name}")
    delay(1000) // delay to simulate 1 sec to fetch bio
    // Here write code to fetch bio from API
    Bio(id, (1..100).random(), listOf("male", "female", "na").random(), "location ${('a'..'z').random()}")
}

fun fetchPicturesFromS3BucketAsync(id: String): Deferred<List<String>> = GlobalScope.async(executor) {
    println("fetchPictureFromDBAsync ${Thread.currentThread().name}")
    delay(2000) // delay to simulate 2 sec to fetch pictures
    // Here write code to fetch pictures from S3 bucket
    listOf("picture of $id")
}

fun fetchDocumentsFromFtpServerAsync(id: String): Deferred<List<String>> = GlobalScope.async(executor) {
    println("fetchDocumentsFromFtpAsync ${Thread.currentThread().name}")
    delay(5000) // delay to simulate 5 sec to fetch documents
    // Here write code to fetch documents from FTP server
    listOf("document for $id")
}
/jdk-11.0.10.jdk/Contents/Home/bin/java com.example.concurrency.FetchProfileCoroutineAsyncKt
fetchBioOverHttpAsync pool-1-thread-6
fetchBioOverHttpAsync pool-1-thread-8
fetchBioOverHttpAsync pool-1-thread-9
fetchBioOverHttpAsync pool-1-thread-10
fetchBioOverHttpAsync pool-1-thread-2
fetchBioOverHttpAsync pool-1-thread-4
fetchPictureFromDBAsync pool-1-thread-5
fetchPictureFromDBAsync pool-1-thread-3
fetchPictureFromDBAsync pool-1-thread-7
fetchPictureFromDBAsync pool-1-thread-3
fetchPictureFromDBAsync pool-1-thread-5
fetchPictureFromDBAsync pool-1-thread-7
fetchDocumentsFromFtpAsync pool-1-thread-3
fetchDocumentsFromFtpAsync pool-1-thread-7
fetchDocumentsFromFtpAsync pool-1-thread-5
fetchDocumentsFromFtpAsync pool-1-thread-7
fetchDocumentsFromFtpAsync pool-1-thread-3
fetchDocumentsFromFtpAsync pool-1-thread-5
fetchBioOverHttpAsync pool-1-thread-7
fetchBioOverHttpAsync pool-1-thread-3
fetchBioOverHttpAsync pool-1-thread-5
fetchBioOverHttpAsync pool-1-thread-8
fetchDocumentsFromFtpAsync pool-1-thread-5
fetchPictureFromDBAsync pool-1-thread-9
fetchPictureFromDBAsync pool-1-thread-2
fetchDocumentsFromFtpAsync pool-1-thread-10
fetchDocumentsFromFtpAsync pool-1-thread-3
fetchDocumentsFromFtpAsync pool-1-thread-4
fetchPictureFromDBAsync pool-1-thread-7
fetchPictureFromDBAsync pool-1-thread-1
Profile(bio=Bio(name=andrew, age=46, gender=male, location=location o), picture=[picture of andrew], documents=[document for andrew])
Profile(bio=Bio(name=billy, age=78, gender=na, location=location b), picture=[picture of billy], documents=[document for billy])
Profile(bio=Bio(name=charlie, age=69, gender=male, location=location n), picture=[picture of charlie], documents=[document for charlie])
Profile(bio=Bio(name=david, age=18, gender=female, location=location u), picture=[picture of david], documents=[document for david])
Profile(bio=Bio(name=emma, age=66, gender=female, location=location p), picture=[picture of emma], documents=[document for emma])
Profile(bio=Bio(name=flora, age=65, gender=female, location=location v), picture=[picture of flora], documents=[document for flora])
Profile(bio=Bio(name=gavin, age=76, gender=na, location=location v), picture=[picture of gavin], documents=[document for gavin])
Profile(bio=Bio(name=harry, age=32, gender=female, location=location i), picture=[picture of harry], documents=[document for harry])
Profile(bio=Bio(name=idris, age=18, gender=female, location=location b), picture=[picture of idris], documents=[document for idris])
Profile(bio=Bio(name=jack, age=42, gender=na, location=location p), picture=[picture of jack], documents=[document for jack])
Time elapsed 5038
We simulated fetching the user bio, pictures, and documents in 1s, 2s, and 5s respectively, a total of 8s to fetch a single user profile sequentially (and 80s for 10 profiles). With asynchronous programming using coroutines, we were able to fetch all 10 user profiles in around 5s.
You can see that the total time taken (~5s) is almost equal to the longest delay, i.e. fetching documents (5s). Isn’t it amazing?
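To see where the ~5s comes from without waiting that long, here is a scaled-down sketch of the same fan-out and await pattern, with every delay divided by 10. It deliberately swaps in structured concurrency (suspend functions plus coroutineScope) instead of GlobalScope.async, a style the Kotlin coroutines documentation generally recommends; fetchBio, fetchPictures, fetchDocuments, and UserProfile are hypothetical stand-ins, not part of the program above.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Hypothetical stand-ins for the three sources; delays scaled down 10x
// from the article's 1s / 2s / 5s so the sketch runs quickly.
suspend fun fetchBio(id: String): String { delay(100); return "bio of $id" }
suspend fun fetchPictures(id: String): List<String> { delay(200); return listOf("picture of $id") }
suspend fun fetchDocuments(id: String): List<String> { delay(500); return listOf("document for $id") }

data class UserProfile(val bio: String, val pictures: List<String>, val documents: List<String>)

// coroutineScope ties the child coroutines to the caller: if one fetch fails,
// the siblings are cancelled and the exception propagates to the caller.
suspend fun fetchUserProfile(id: String): UserProfile = coroutineScope {
    val bio = async { fetchBio(id) }
    val pictures = async { fetchPictures(id) }
    val documents = async { fetchDocuments(id) }
    UserProfile(bio.await(), pictures.await(), documents.await())
}

// Start one child coroutine per user, then wait for all of them.
suspend fun fetchUserProfiles(ids: List<String>): List<UserProfile> = coroutineScope {
    ids.map { async { fetchUserProfile(it) } }.awaitAll()
}

fun main() {
    val ids = listOf("andrew", "billy", "charlie")
    // No dispatcher is given, so all coroutines run on runBlocking's own thread
    val elapsed = measureTimeMillis {
        runBlocking { fetchUserProfiles(ids).forEach(::println) }
    }
    println("Time elapsed: $elapsed ms")
}
```

Fetching the three users one after another would take about 3 × (100 + 200 + 500) = 2400 ms; because every fetch suspends rather than blocks, the run should finish in a little over 500 ms, the longest single delay.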
In Kotlin, each GlobalScope.async call starts a coroutine, which returns a Deferred result. Unlike threads, coroutines are not bound to any particular thread. A coroutine can start executing in one thread, suspend execution, and resume on a different thread.
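A minimal sketch of a coroutine resuming on a different thread. It uses Dispatchers.Unconfined rather than the article's executor, because that dispatcher lets the coroutine continue on whichever thread completed the suspension, which makes the switch easy to observe; observeThreads is a hypothetical helper.

```kotlin
import kotlinx.coroutines.*

// Returns the thread names observed before and after a suspension point.
fun observeThreads(): Pair<String, String> = runBlocking {
    var before = ""
    var after = ""
    launch(Dispatchers.Unconfined) {
        before = Thread.currentThread().name // the thread that called runBlocking
        delay(100)                           // suspend; this thread is released meanwhile
        after = Thread.currentThread().name  // the thread that resumed the coroutine
    }.join()
    before to after
}

fun main() {
    val (before, after) = observeThreads()
    println("started on: $before")
    println("resumed on: $after")
}
```

With the article's fixed-pool executor the same can happen, but the coroutine is always resumed on one of the pool's 10 threads, not necessarily the one it started on.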
Threads vs Coroutines
Kotlin coroutines are often described as a lightweight version of Java threads. Coroutines still run on Java threads behind the scenes (in our program, the threads of the fixed thread pool), but there is a difference in how coroutines perform tasks.
When a thread is performing a task that blocks for some time (an IO operation or network call), the thread has to wait. Other threads in the thread pool can take turns performing other tasks while the first thread is waiting. The operating system handles thread scheduling and context switching, which adds overhead.
When a coroutine running on a thread performs a task that waits for some time (for example delay() or a non-blocking IO operation or network call), the coroutine is suspended, and the same thread can run other coroutines while the first coroutine is waiting. The Kotlin coroutine machinery, not the operating system, decides when to switch coroutines, which is lightweight. Note that a truly blocking call such as Thread.sleep still blocks the thread.
In our program, while the fetchDocumentsFromFtpServerAsync coroutine waits for 5s, it is suspended, and the current thread can run other coroutines in that period, say fetchBioOverHttpAsync and fetchPicturesFromS3BucketAsync in 1s and 2s (3s in total), before it resumes fetchDocumentsFromFtpServerAsync. In this way, coroutines could fetch the bio, pictures, and documents all in 5s using a single thread. This is just an example; in practice these coroutines may be run by different threads, as the output above shows, but you get the gist. Coroutines provide a very high level of concurrency because they are non-blocking and switching between them costs less than switching threads.
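The single-thread scenario described above can be checked with a small sketch: three suspending "fetches" with delays scaled down 10x run on a dispatcher backed by exactly one thread, created the same way as the article's pool (Executors plus asCoroutineDispatcher()). fetchAllOnOneThread is a hypothetical helper that returns the thread names it observed and the elapsed time.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

// Runs three suspending "fetches" (100 / 200 / 500 ms, i.e. 1s / 2s / 5s scaled down)
// on a dispatcher with a single thread.
// Returns the names of the threads that ran them and the elapsed time in ms.
fun fetchAllOnOneThread(): Pair<Set<String>, Long> =
    Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { singleThread ->
        runBlocking {
            var names: Set<String> = emptySet()
            val elapsed = measureTimeMillis {
                names = listOf(100L, 200L, 500L)
                    .map { millis ->
                        async(singleThread) {
                            delay(millis) // suspends, so the one thread can run the other coroutines
                            Thread.currentThread().name
                        }
                    }
                    .awaitAll()
                    .toSet()
            }
            names to elapsed
        }
    }

fun main() {
    val (names, elapsed) = fetchAllOnOneThread()
    println("Threads used: $names")
    println("Time elapsed: $elapsed ms")
}
```

Expect one thread name and an elapsed time close to 500 ms, the longest delay, rather than the 800 ms sum of all three.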
If you use plain threads without coroutines, as in traditional Java code, a single thread would take 8s to fetch the bio, pictures, and documents, because it blocks on each call in turn.
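For comparison, here is a sketch of the plain-thread approach: the same three waits, scaled down 10x, implemented as blocking Thread.sleep calls submitted to a Java fixed thread pool. blockingFetch and fetchAllBlocking are hypothetical helpers.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

// Hypothetical blocking "fetch": Thread.sleep keeps the thread busy while waiting.
fun blockingFetch(millis: Long): String {
    Thread.sleep(millis)
    return Thread.currentThread().name
}

// Runs three blocking fetches (100 / 200 / 500 ms) on a fixed pool of `threads` threads.
// Returns the names of the threads that did the work and the elapsed time in ms.
fun fetchAllBlocking(threads: Int): Pair<Set<String>, Long> {
    val pool = Executors.newFixedThreadPool(threads)
    return try {
        val names = mutableSetOf<String>()
        val elapsed = measureTimeMillis {
            val futures = listOf(100L, 200L, 500L).map { millis ->
                pool.submit(Callable { blockingFetch(millis) })
            }
            futures.forEach { names += it.get() } // Future.get() blocks until the task finishes
        }
        names to elapsed
    } finally {
        pool.shutdown()
    }
}

fun main() {
    println("1 thread:  ${fetchAllBlocking(1)}")
    println("3 threads: ${fetchAllBlocking(3)}")
}
```

With one thread the sleeps run back to back, so the run takes at least 100 + 200 + 500 = 800 ms. With three threads it drops to about 500 ms, but only because each waiting task occupies a thread of its own, which is exactly the cost that suspending coroutines avoid.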