Multiplexing Asynchronous Coroutines with Kotlin Select Expressions

Kotlin's select expression enables a coroutine to wait on several suspending operations simultaneously and proceed with whichever becomes ready first.

Receiving from Multiple Channels

Consider two producers emitting distinct events at different intervals. The first generates a "Tick" every 300 milliseconds:

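import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

// Emits "Tick" every 300 ms until the producer is cancelled.
fun CoroutineScope.ticker() = produce<String> {
    while (isActive) {
        delay(300)
        send("Tick")
    }
}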

The second emits a "Tock" every 500 milliseconds:

fun CoroutineScope.tocker() = produce<String> {
    while (isActive) {
        delay(500)
        send("Tock")
    }
}

Instead of suspending on a single channel's receive, select waits on both channels at once through onReceive clauses and runs the block of whichever clause is selected:

suspend fun raceChannels(tick: ReceiveChannel<String>, tock: ReceiveChannel<String>) {
    select<Unit> {
        tick.onReceive { value ->
            println("tick -> '$value'")
        }
        tock.onReceive { value ->
            println("tock -> '$value'")
        }
    }
}

Running this seven times (inside a coroutine scope such as runBlocking) shows the faster producer winning more often, with the slower one interleaved whenever its 500 ms delay elapses:

val tick = ticker()
val tock = tocker()
repeat(7) {
    raceChannels(tick, tock)
}
coroutineContext.cancelChildren()

Output:

tick -> 'Tick'
tock -> 'Tock'
tick -> 'Tick'
tick -> 'Tick'
tock -> 'Tock'
tick -> 'Tick'
tock -> 'Tock'

Handling Closed Channels

When a channel is closed, its onReceive clause fails and the enclosing select throws an exception. The onReceiveOrNull clause handles closure gracefully by delivering null instead, and the example below also illustrates that select can return a value:

suspend fun chooseFirst(primary: ReceiveChannel<String>, fallback: ReceiveChannel<String>): String =
    select<String> {
        primary.onReceiveOrNull { value ->
            if (value == null) "Primary channel closed"
            else "primary -> '$value'"
        }
        fallback.onReceiveOrNull { value ->
            if (value == null) "Fallback channel closed"
            else "fallback -> '$value'"
        }
    }

Note that onReceiveOrNull is available only on channels of non-nullable types to avoid ambiguity between a closed channel and a null element.
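
In kotlinx.coroutines 1.5 and later, onReceiveOrNull and receiveOrNull are deprecated in favor of onReceiveCatching and receiveCatching. For readers on such a release, roughly the same function can be sketched as follows. The name chooseFirstCatching and the small driver in main are illustrative assumptions, not part of the original example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Hypothetical variant of chooseFirst built on onReceiveCatching.
suspend fun chooseFirstCatching(
    primary: ReceiveChannel<String>,
    fallback: ReceiveChannel<String>
): String = select<String> {
    primary.onReceiveCatching { result ->
        // getOrNull() returns null when the channel is closed or failed.
        val value = result.getOrNull()
        if (value == null) "Primary channel closed" else "primary -> '$value'"
    }
    fallback.onReceiveCatching { result ->
        val value = result.getOrNull()
        if (value == null) "Fallback channel closed" else "fallback -> '$value'"
    }
}

fun main() = runBlocking<Unit> {
    // Buffered channels so the driver can send without a waiting receiver.
    val primary = Channel<String>(capacity = 1)
    val fallback = Channel<String>(capacity = 1)
    primary.send("Message 0")
    println(chooseFirstCatching(primary, fallback)) // primary -> 'Message 0'
    primary.close()
    println(chooseFirstCatching(primary, fallback)) // Primary channel closed
}
```

Because the clause delivers a ChannelResult rather than a nullable element, this form also works for channels whose element type is nullable, although the null check above would then need to use result.isClosed instead.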

Suppose the primary channel delivers four numbered messages, the fallback delivers another four, and chooseFirst is called eight times:

val primary = produce<String> {
    repeat(4) { send("Message $it") }
}
val fallback = produce<String> {
    repeat(4) { send("Signal $it") }
}
repeat(8) {
    println(chooseFirst(primary, fallback))
}
coroutineContext.cancelChildren()

The result shows two important characteristics:

primary -> 'Message 0'
primary -> 'Message 1'
fallback -> 'Signal 0'
primary -> 'Message 2'
primary -> 'Message 3'
fallback -> 'Signal 1'
Primary channel closed
Primary channel closed

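First, select is biased toward the first clause: when several clauses are ready at the same time, the first one listed is selected. Because both channels are unbuffered, the primary producer is occasionally suspended in send, which gives the fallback a chance to deliver. Second, once a channel is closed, its onReceiveOrNull clause is selected immediately, which is why the closed message is printed twice even though the fallback still has elements to send.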
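
The bias can be observed directly when both channels already hold elements. The following is a minimal sketch, assuming buffered channels so that both clauses are ready on every call; the helper name takeThree is hypothetical. It also shows selectUnbiased from kotlinx.coroutines.selects, which shuffles the clauses before each selection, so its result is not predictable.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.selects.selectUnbiased

// Hypothetical helper: fills two buffered channels, then selects three times.
suspend fun takeThree(biased: Boolean): List<String> {
    val first = Channel<String>(capacity = 3)
    val second = Channel<String>(capacity = 3)
    repeat(3) { i ->
        first.send("first $i")
        second.send("second $i")
    }
    val picked = mutableListOf<String>()
    repeat(3) {
        picked += if (biased) {
            // Both clauses are ready; the first listed clause is chosen.
            select<String> {
                first.onReceive { v -> v }
                second.onReceive { v -> v }
            }
        } else {
            // Clauses are shuffled, so either channel may be chosen.
            selectUnbiased<String> {
                first.onReceive { v -> v }
                second.onReceive { v -> v }
            }
        }
    }
    return picked
}

fun main() = runBlocking<Unit> {
    println(takeThree(biased = true))  // [first 0, first 1, first 2]
    println(takeThree(biased = false)) // some mix of both channels
}
```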

Sending with Select

The onSend clause combines neatly with this bias. The following producer attempts to deliver integers to its main consumer first, routing a value to a secondary channel when the main consumer is not ready to receive it:

fun CoroutineScope.routeValues(spill: SendChannel<Int>) = produce<Int> {
    for (num in 1..10) {
        delay(100)
        select<Unit> {
            onSend(num) { }
            spill.onSend(num) { }
        }
    }
}

A slow consumer processes each item in 250 milliseconds:

val spill = Channel<Int>()
launch {
    spill.consumeEach { println("Spill received $it") }
}
routeValues(spill).consumeEach {
    println("Main received $it")
    delay(250)
}
println("Processing complete")
coroutineContext.cancelChildren()

Because the producer offers a value every 100 ms while the main consumer needs 250 ms per item, two out of every three values are diverted to the spill channel:

Main received 1
Spill received 2
Spill received 3
Main received 4
Spill received 5
Spill received 6
Main received 7
Spill received 8
Spill received 9
Main received 10
Processing complete
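
The pattern in this output follows from the two delays alone. As a rough check, the timing can be modelled without coroutines. The function below is an idealized sketch, not a description of the real scheduler: it assumes the producer offers item n at exactly n * produceMs, that the main consumer is busy for consumeMs after each item it accepts, and that the spill consumer is always ready. The name simulateRouting is hypothetical.

```kotlin
// Idealized timing model of the routing example (no real coroutines involved).
fun simulateRouting(items: Int, produceMs: Long, consumeMs: Long): List<String> {
    var mainFreeAt = 0L // time at which the main consumer can receive again
    return (1..items).map { n ->
        val now = n * produceMs
        if (now >= mainFreeAt) {
            // The main consumer is waiting, so the first (biased) clause wins.
            mainFreeAt = now + consumeMs
            "Main received $n"
        } else {
            // The main consumer is still busy, so the value goes to the spill channel.
            "Spill received $n"
        }
    }
}

fun main() {
    simulateRouting(items = 10, produceMs = 100, consumeMs = 250).forEach { println(it) }
}
```

With 100 ms and 250 ms the model reproduces the listing above: items 1, 4, 7 and 10 reach the main consumer. Real runs can deviate slightly when timings are close, since delay only guarantees a minimum wait.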

Awaiting Deferred Values

Deferred values can be selected with the onAwait clause. Start with an async computation that finishes after a specified delay:

fun CoroutineScope.delayedText(duration: Int) = async {
    delay(duration.toLong())
    "Finished after ${duration}ms"
}

Launch a dozen such computations with random durations:

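// Random is assumed to be kotlin.random.Random (import kotlin.random.Random);
// java.util.Random accepts the same calls but yields a different sequence.
fun CoroutineScope.launchDelayedJobs(): List<Deferred<String>> {
    val rnd = Random(7)
    return List(12) { delayedText(rnd.nextInt(1000)) }
}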

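Because select is a Kotlin DSL, its clauses can be registered with ordinary code. Here a loop adds an onAwait clause for every deferred, and select resumes with whichever completes first: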

val jobs = launchDelayedJobs()
val winner = select<String> {
    jobs.withIndex().forEach { (idx, deferred) ->
        deferred.onAwait { result ->
            "Job $idx won with '$result'"
        }
    }
}
println(winner)
val running = jobs.count { it.isActive }
println("$running coroutines are still running")

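Output has the following form (the winning index and duration depend on the seed and on which Random implementation is in use):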

Job 4 won with 'Finished after 128ms'
11 coroutines are still running
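
The losing coroutines keep running after select returns. If their results are not needed, they can be cancelled explicitly. The following is a minimal sketch under that assumption; the helper name firstOf and the fixed delays in main are hypothetical and chosen only to make the winner predictable.

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Hypothetical helper: returns the first completed result, then cancels the rest.
suspend fun <T> firstOf(candidates: List<Deferred<T>>): T {
    val winner = select<T> {
        candidates.forEach { deferred ->
            deferred.onAwait { value -> value }
        }
    }
    // cancel() has no effect on a deferred that has already completed.
    candidates.forEach { it.cancel() }
    return winner
}

fun main() = runBlocking<Unit> {
    val jobs = listOf(300L, 100L, 200L).map { ms ->
        async {
            delay(ms)
            "Finished after ${ms}ms"
        }
    }
    println(firstOf(jobs)) // Finished after 100ms
    println("${jobs.count { it.isActive }} coroutines are still running") // 0
}
```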

Switching Between Deferred Values

You can also replace the current deferred whenever a new one arrives on a channel. The following function waits on the most recently received deferred, yielding its string once ready, but switches immediately if the channel delivers a replacement:

fun CoroutineScope.observeLatest(source: ReceiveChannel<Deferred<String>>) =
    produce<String> {
        var pending = source.receive()
        while (isActive) {
            val incoming = select<Deferred<String>?> {
                source.onReceiveOrNull { candidate ->
                    candidate
                }
                pending.onAwait { value ->
                    send(value)
                    source.receiveOrNull()
                }
            }
            if (incoming == null) {
                println("Source closed")
                break
            } else {
                pending = incoming
            }
        }
    }

A helper for testing:

fun CoroutineScope.delayedResult(text: String, ms: Long) = async {
    delay(ms)
    text
}

The driver sends overlapping tasks:

val updates = Channel<Deferred<String>>()
launch {
    observeLatest(updates).consumeEach { println(it) }
}
updates.send(delayedResult("INIT", 100))
delay(200)
updates.send(delayedResult("LONG", 500))
delay(100)
updates.send(delayedResult("SHORT", 100))
delay(500)
updates.send(delayedResult("LAST", 500))
delay(1000)
updates.close()
delay(500)

Output:

INIT
SHORT
LAST
Source closed

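INIT is printed because it completes after 100 ms, well before the next deferred arrives at 200 ms. LONG needs 500 ms but is superseded by SHORT after only 100 ms, so its result is never emitted; SHORT and LAST both finish before anything replaces them. A superseded deferred is not cancelled, only ignored, so select here combines onReceiveOrNull and onAwait into cancellation-free switching.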

Tags: kotlin Coroutines SELECT channels deferred

Posted on Mon, 29 Jun 2026 17:23:47 +0000 by hashim