Michael Evans

A bunch of technobabble.

Turning Any Android Callback Into a Flow With callbackFlow

| Comments

If you’ve been working with Android for any amount of time, you’ve probably run into APIs that expose their results using callbacks or listeners. Whether it’s something like LocationManager, a custom SDK, or a third-party service like Firebase, you’re often stuck adapting old-school async patterns into your modern reactive code.

This approach is especially helpful in apps using Jetpack Compose, coroutines, or unidirectional data flow.

Fortunately, Kotlin’s callbackFlow makes this much easier. In this post, we’ll show how to wrap a listener-based API using callbackFlow, so you can collect updates as a Flow. We’ll use the Firebase Realtime Database as an example, but this pattern works for nearly anything.

The Problem: Listeners Aren’t Reactive

Here’s the classic way of listening to changes in the Firebase Realtime Database:

1
2
3
4
5
6
7
8
9
10
11
val postListener = object : ValueEventListener {
    override fun onDataChange(dataSnapshot: DataSnapshot) {
        val post = dataSnapshot.getValue<Post>()
        // update UI
    }

    override fun onCancelled(error: DatabaseError) {
        Log.w(TAG, "loadPost:onCancelled", error.toException())
    }
}
postReference.addValueEventListener(postListener)

This works fine — but it’s imperative and not easily composable with things like StateFlow, LiveData, or Jetpack Compose. Let’s fix that.

The Fix: Wrap It with callbackFlow

Kotlin’s callbackFlow is designed for exactly this kind of situation — where you need to bridge a listener-based API into a reactive stream.

Here’s what it looks like for Firebase:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fun Query.asFlow(): Flow<DataSnapshot> = callbackFlow {
    val listener = object : ValueEventListener {
        override fun onDataChange(snapshot: DataSnapshot) {
            trySend(snapshot).isSuccess // emit each snapshot into the Flow
        }

        override fun onCancelled(error: DatabaseError) {
            close(error.toException()) // cancel the flow on error
        }
    }

    // Start listening for updates
    addValueEventListener(listener)

    // Suspend until the flow is closed or cancelled, then clean up
    awaitClose { removeEventListener(listener) }
}

Let’s break that down:

  • callbackFlow { ... }: Gives you a coroutine-safe way to emit values (send / trySend) from a callback-based API.
  • trySend(snapshot): Emits a value into the flow. This is non-blocking and returns a ChannelResult, which we’re ignoring here.
  • close(error.toException()): If the Firebase listener is cancelled, we close the flow with the exception. This cancels any active collectors.
  • awaitClose { ... }: Suspends the coroutine until the collector stops collecting (i.e., it’s cancelled or the flow completes), and is the right place to clean up listeners or resources.

The result is a clean Flow<DataSnapshot> that behaves just like you’d want: it emits every time Firebase sends an update and automatically stops listening when the consumer stops collecting.

Consuming the Flow

You can now collect this from anywhere in your app. Here’s a quick example:

1
2
3
4
5
6
lifecycleScope.launch {
    postReference.asFlow().collect { snapshot ->
        val post = snapshot.getValue<Post>()
        // Update UI or state
    }
}

This could also be collected in a ViewModel, transformed with map, or turned into Compose State. But that’s outside the scope of this post — we just wanted to show the full round trip here.

A Note on .snapshots

If you’re using the Firebase Realtime Database, you might have seen the (relatively new) snapshots extension, which does exactly this out of the box:

1
postReference.snapshots.collect { snapshot -> /* ... */ }

So if you’re only dealing with Firebase, you might not need your own wrapper. But the callbackFlow approach is still incredibly useful for any API that doesn’t have a ready-made Flow extension.

Wrap-Up

callbackFlow is a powerful tool for adapting listener-based or callback-heavy APIs into clean, composable Kotlin Flows. Once you wrap something like this, you can:

  • Collect updates in your ViewModel or UI layer
  • Combine it with other Flows
  • Debounce, map, retry, or buffer updates

Firebase was just the example here — but this trick works great for things like:

  • SensorManager
  • TextWatcher
  • LocationManager
  • WebSocket libraries
  • Custom event systems

Once you get used to wrapping listeners this way, it’s hard to go back.

Comments