Async Fetch Data from N Sources and Combine In Kotlin

Fetch data from multiple sources asynchronously using Kotlin coroutines and combine the results

Use case

Let’s consider a use case where user profile information comes from multiple sources:

  1. User bio information from an HTTP API endpoint
  2. User profile pictures from an S3 bucket
  3. User documents from an FTP server

We want to use Kotlin coroutines to fetch the profile of 10 users from these 3 different sources asynchronously and return the combined result.

Program

We will do the following things to achieve this:

  1. We create 3 functions to fetch the bio, pictures, and documents asynchronously using GlobalScope.async, each of which returns a Deferred result that completes some time in the future. These 3 functions are:
    fetchBioOverHttpAsync
    fetchPicturesFromS3BucketAsync
    fetchDocumentsFromFtpServerAsync
  2. Next, we create a function fetchUserProfileAsync to fetch a user profile asynchronously. It starts the 3 fetches above, waits for their results using await(), combines them, and returns a Deferred result.
  3. Last, we create a function fetchUserProfiles to fetch multiple user profiles. It starts a fetch for every profile id, then calls await() on each one and returns the list of user profiles.
  4. We also create a coroutine dispatcher backed by a fixed thread pool of size 10, using the Java executor service: Executors.newFixedThreadPool(10).asCoroutineDispatcher(). All our asynchronous operations run on the threads of this pool.

We add the suffix Async to the names of functions that return a Deferred result, which is a good practice for identifying asynchronous functions.
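The steps above rely on GlobalScope.async, which kotlinx.coroutines marks as a delicate API because coroutines launched there are not tied to their caller. As an alternative, and not the approach this article takes, the same fan-out could be sketched with coroutineScope and awaitAll. The names loadProfile and loadProfiles and the simplified SimpleProfile type are hypothetical, and the short delays are placeholders for real I/O.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical, simplified profile type used only for this sketch.
data class SimpleProfile(val bio: String, val pictures: List<String>, val documents: List<String>)

// Fetch the three parts of one profile concurrently inside the caller's scope.
suspend fun loadProfile(id: String): SimpleProfile = coroutineScope {
    val bio = async { delay(10); "bio of $id" }
    val pictures = async { delay(20); listOf("picture of $id") }
    val documents = async { delay(50); listOf("document for $id") }
    SimpleProfile(bio.await(), pictures.await(), documents.await())
}

// Fetch many profiles concurrently; awaitAll returns results in input order.
suspend fun loadProfiles(ids: List<String>): List<SimpleProfile> = coroutineScope {
    ids.map { id -> async { loadProfile(id) } }.awaitAll()
}

fun main() = runBlocking {
    loadProfiles(listOf("andrew", "billy")).forEach(::println)
}
```

With this shape, a failure or cancellation in one fetch cancels its siblings, and the functions are plain suspend functions, so they would not need the Async suffix.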

package com.example.concurrency

import kotlinx.coroutines.*
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

val executor = Executors.newFixedThreadPool(10).asCoroutineDispatcher()

data class Profile(val bio: Bio, val picture: List<String>, val documents: List<String>)

data class Bio(val name: String, val age: Int, val gender: String, val location: String)

fun main() {
    runBlocking {
        val timeElapsed = measureTimeMillis {
            // fetch profile for 10 users
            fetchUserProfiles(listOf("andrew", "billy", "charlie", "david", "emma", "flora", "gavin", "harry", "idris", "jack"))
                .forEach(::println)
        }

        println("Time elapsed: $timeElapsed")
    }
    // shut down the pool so its non-daemon threads do not keep the JVM alive
    executor.close()
}

suspend fun fetchUserProfiles(profileIds: List<String>): List<Profile> =
    profileIds.map { fetchUserProfileAsync(it) }.map { it.await() }

fun fetchUserProfileAsync(profileId: String): Deferred<Profile> = GlobalScope.async(executor) {
    // start all three fetches first so they run concurrently
    val bio = fetchBioOverHttpAsync(profileId)
    val picture = fetchPicturesFromS3BucketAsync(profileId)
    val documents = fetchDocumentsFromFtpServerAsync(profileId)

    Profile(bio.await(), picture.await(), documents.await())
}

fun fetchBioOverHttpAsync(id: String): Deferred<Bio> = GlobalScope.async(executor) {
    println("fetchBioOverHttpAsync ${Thread.currentThread().name}")
    delay(1000)
    // delay to simulate 1 sec to fetch bio
    // Here write code to fetch bio from API
    Bio(id, (1..100).random(), listOf("male", "female", "na").random(), "location ${('a'..'z').random()}")
}


fun fetchPicturesFromS3BucketAsync(id: String): Deferred<List<String>> = GlobalScope.async(executor) {
    println("fetchPictureFromDBAsync ${Thread.currentThread().name}")
    delay(2000)
    // delay to simulate 2 sec to fetch pictures
    // Here write code to fetch pictures from S3 bucket
    listOf("picture of $id")
}

fun fetchDocumentsFromFtpServerAsync(id: String): Deferred<List<String>> = GlobalScope.async(executor) {
    println("fetchDocumentsFromFtpAsync ${Thread.currentThread().name}")
    delay(5000)
    // delay to simulate 5 sec to fetch documents
    // Here write code to fetch documents from FTP server
    listOf("document for $id")
}

Run

/jdk-11.0.10.jdk/Contents/Home/bin/java com.example.concurrency.FetchProfileCoroutineAsyncKt
fetchBioOverHttpAsync pool-1-thread-6
fetchBioOverHttpAsync pool-1-thread-8
fetchBioOverHttpAsync pool-1-thread-9
fetchBioOverHttpAsync pool-1-thread-10
fetchBioOverHttpAsync pool-1-thread-2
fetchBioOverHttpAsync pool-1-thread-4
fetchPictureFromDBAsync pool-1-thread-5
fetchPictureFromDBAsync pool-1-thread-3
fetchPictureFromDBAsync pool-1-thread-7
fetchPictureFromDBAsync pool-1-thread-3
fetchPictureFromDBAsync pool-1-thread-5
fetchPictureFromDBAsync pool-1-thread-7
fetchDocumentsFromFtpAsync pool-1-thread-3
fetchDocumentsFromFtpAsync pool-1-thread-7
fetchDocumentsFromFtpAsync pool-1-thread-5
fetchDocumentsFromFtpAsync pool-1-thread-7
fetchDocumentsFromFtpAsync pool-1-thread-3
fetchDocumentsFromFtpAsync pool-1-thread-5
fetchBioOverHttpAsync pool-1-thread-7
fetchBioOverHttpAsync pool-1-thread-3
fetchBioOverHttpAsync pool-1-thread-5
fetchBioOverHttpAsync pool-1-thread-8
fetchDocumentsFromFtpAsync pool-1-thread-5
fetchPictureFromDBAsync pool-1-thread-9
fetchPictureFromDBAsync pool-1-thread-2
fetchDocumentsFromFtpAsync pool-1-thread-10
fetchDocumentsFromFtpAsync pool-1-thread-3
fetchDocumentsFromFtpAsync pool-1-thread-4
fetchPictureFromDBAsync pool-1-thread-7
fetchPictureFromDBAsync pool-1-thread-1
Profile(bio=Bio(name=andrew, age=46, gender=male, location=location o), picture=[picture of andrew], documents=[document for andrew])
Profile(bio=Bio(name=billy, age=78, gender=na, location=location b), picture=[picture of billy], documents=[document for billy])
Profile(bio=Bio(name=charlie, age=69, gender=male, location=location n), picture=[picture of charlie], documents=[document for charlie])
Profile(bio=Bio(name=david, age=18, gender=female, location=location u), picture=[picture of david], documents=[document for david])
Profile(bio=Bio(name=emma, age=66, gender=female, location=location p), picture=[picture of emma], documents=[document for emma])
Profile(bio=Bio(name=flora, age=65, gender=female, location=location v), picture=[picture of flora], documents=[document for flora])
Profile(bio=Bio(name=gavin, age=76, gender=na, location=location v), picture=[picture of gavin], documents=[document for gavin])
Profile(bio=Bio(name=harry, age=32, gender=female, location=location i), picture=[picture of harry], documents=[document for harry])
Profile(bio=Bio(name=idris, age=18, gender=female, location=location b), picture=[picture of idris], documents=[document for idris])
Profile(bio=Bio(name=jack, age=42, gender=na, location=location p), picture=[picture of jack], documents=[document for jack])
Time elapsed: 5038

We simulated fetching the user bio, pictures, and documents with delays of 1s, 2s, and 5s respectively, so fetching a single user profile one step at a time would take 8s in total, and 10 profiles about 80s. With asynchronous programming using coroutines, we were able to fetch all 10 user profiles in around 5s.

As you can see, the total time taken (~5s) is almost equal to the delay of the slowest function, i.e. fetching documents (5s). Isn’t it amazing?
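The arithmetic behind that observation (sum of the delays versus the longest delay) can be checked with a small sketch. It assumes delays scaled down to 100, 200, and 500 ms, and the helper names fakeFetch, fetchSequentially, and fetchConcurrently are hypothetical; exact timings will vary from machine to machine.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for a remote call that takes `millis` to answer.
suspend fun fakeFetch(label: String, millis: Long): String {
    delay(millis)
    return label
}

// One call after another: the delays add up (about 100 + 200 + 500 ms).
suspend fun fetchSequentially(): List<String> =
    listOf(fakeFetch("bio", 100), fakeFetch("pictures", 200), fakeFetch("documents", 500))

// All three are started first, then awaited: bounded by the slowest call (about 500 ms).
suspend fun fetchConcurrently(): List<String> = coroutineScope {
    val bio = async { fakeFetch("bio", 100) }
    val pictures = async { fakeFetch("pictures", 200) }
    val documents = async { fakeFetch("documents", 500) }
    listOf(bio.await(), pictures.await(), documents.await())
}

fun main() = runBlocking {
    val sequential = measureTimeMillis { fetchSequentially() }
    val concurrent = measureTimeMillis { fetchConcurrently() }
    println("sequential: $sequential ms, concurrent: $concurrent ms")
}
```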

In Kotlin, each async operation started with GlobalScope.async is a coroutine which returns a Deferred result. Unlike threads, coroutines are not bound to any particular thread. A coroutine can start executing on one thread, suspend execution, and resume on a different thread.
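One way to observe this is to record the thread name before and after a suspension point. The sketch below assumes a small pool of 4 threads, and the function name threadsAroundDelay is hypothetical. Whether the two names differ on a given run depends on scheduling, so no particular output is promised.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors

// Returns the names of the threads that ran the coroutine before and after it suspended.
suspend fun threadsAroundDelay(): Pair<String, String> {
    val before = Thread.currentThread().name
    delay(100) // suspension point: the thread is released here
    val after = Thread.currentThread().name
    return before to after
}

fun main() {
    // use {} closes the dispatcher so the pool threads do not keep the JVM alive.
    Executors.newFixedThreadPool(4).asCoroutineDispatcher().use { pool ->
        runBlocking {
            repeat(5) {
                val (before, after) = withContext(pool) { threadsAroundDelay() }
                println("started on $before, resumed on $after")
            }
        }
    }
}
```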

Thread vs Coroutines

Kotlin coroutines are often described as lightweight threads. Coroutines still run on threads behind the scenes (in our program, a Java thread pool), but there is a difference in how they perform tasks.

Thread

When a thread is performing a task that blocks for some time (an IO operation or a network call), the thread has to wait. Other threads in the thread pool can take turns performing other tasks while that thread is waiting. The operating system determines the scheduling of threads and the context switching between them, which is extra overhead.

Coroutines

When a coroutine running on a thread reaches a suspending operation that takes some time (such as delay, or a non-blocking IO or network call), the coroutine is suspended and the same thread can run other coroutines while it waits. Switching between coroutines is handled by the Kotlin coroutine machinery rather than the operating system, which makes it lightweight.

In our program, while the coroutine started by fetchDocumentsFromFtpServerAsync waits for 5s, it is suspended, and the current thread can run other coroutines in that period, say fetchBioOverHttpAsync and fetchPicturesFromS3BucketAsync, which take 1s and 2s. The thread then resumes the documents coroutine once its delay is over. In this way, coroutines are able to fetch the bio, pictures, and documents all within 5s using a single thread. This is just an illustration; in practice these coroutines might be run by different threads, but you get the gist. Coroutines provide a very high level of concurrency because they do not block threads and switching between them costs less than switching threads.
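The single-thread scenario can be sketched by starting all three coroutines inside runBlocking without any extra dispatcher, so they share the calling thread. The delays are scaled down to 500, 100, and 200 ms, and the names simulate and runOnOneThread are hypothetical.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Simulated fetch that reports which thread finished it.
suspend fun simulate(label: String, millis: Long): String {
    delay(millis) // suspends the coroutine; the thread is free to run others
    return "$label finished on ${Thread.currentThread().name}"
}

// Runs all three simulated fetches on the single thread that calls runBlocking.
fun runOnOneThread(): Pair<List<String>, Long> {
    var lines = emptyList<String>()
    val elapsed = measureTimeMillis {
        lines = runBlocking {
            listOf(
                async { simulate("documents", 500) },
                async { simulate("bio", 100) },
                async { simulate("pictures", 200) },
            ).awaitAll()
        }
    }
    return lines to elapsed
}

fun main() {
    val (lines, elapsed) = runOnOneThread()
    lines.forEach(::println)
    println("elapsed: $elapsed ms") // roughly the longest delay, not the sum
}
```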

If you use plain threads without coroutines, as in Java, a single thread would take 8s (1s + 2s + 5s) to fetch the bio, pictures, and documents.
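For contrast, here is a minimal sketch of the blocking case, assuming a single-thread executor and Thread.sleep in place of real I/O, with the delays scaled down to 100, 200, and 500 ms. The function name runBlockingTasksOnOneThread is hypothetical. Because each task holds the thread while it sleeps, the elapsed time should be close to the sum of the delays.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

// Runs three blocking tasks on one thread and returns the elapsed time in milliseconds.
fun runBlockingTasksOnOneThread(): Long {
    val executor = Executors.newSingleThreadExecutor()
    try {
        return measureTimeMillis {
            val tasks = listOf(100L, 200L, 500L).map { millis ->
                Callable { Thread.sleep(millis); millis } // sleep blocks the thread
            }
            // invokeAll waits until every task has completed.
            executor.invokeAll(tasks)
        }
    } finally {
        executor.shutdown()
    }
}

fun main() {
    println("elapsed: ${runBlockingTasksOnOneThread()} ms")
}
```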
