본문 바로가기

Kotlin

ChannelFlow, CallbackFlow 제대로 알기

Unsplash, Chris Stenger.

 

ChannelFlow

ProducerScope 를 통해 빌더 코드 블록에 sendChannel 이 제공됩니다. 해당 Channel 로 보내지는 원소들로 Cold Flow 인스턴스를 생성, 이를 반환합니다.

서로 다른 Context 나 동시에 실행되는 코드에서 요소를 방출합니다.

 

해당 빌더는 Thread Safe 이므로, 다른 Context 에서 동시에 사용될 수 있습니다.

기본적인 용법은 다음과 같습니다.

 

CoroutineScope(Dispatchers.Default).launch {
    channelFlow {
        CoroutineScope(Dispatchers.IO).launch {
            send(3) // 네트워킹
        }

        CoroutineScope(Dispatchers.IO).launch {
            send(5) // 로컬 DB
        }
        awaitClose()
    }.onEach {
        println(it)
    }.collect()
}

// Console

I/System.out: 3
I/System.out: 5

 

기본 버퍼 크기의 채널이 사용되며, 반환되는 Flow 에 버퍼 연산자를 사용하여 사용자의 임의값을 정의하고

데이터가 소비되는 것보다 빠르게 생성될 때의 문제를 조절할 수 있습니다.

당연히 BufferOverflow 에 대한 전략 역시 제공합니다.

 

간단히 말해, Channel 을 기본적으로 제공받는 Flow 라고 보시면 될 것 같습니다. 해당 Channel 에 

여러 Context 에서 반환되는 값을 전송하여, 이를 하나의 ColdFlow 로 반환하는 것입니다.

 


 

CallbackFlow

ProducerScope 를 통해 제공된 sendChannel 에 원소를 보내어 Cold Flow 인스턴스를 생성하는 것까진 같습니다. 다만, 이름에서도 알 수 있듯, Callback 을 베이스로한 API 등에서 편하게 사용할 수 있습니다.

즉, 비동기 통신 등을 통해 Callback 으로 전송된 데이터를 Flow 로 반환합니다.

 

CoroutineScope(Dispatchers.IO).launch {
    callbackFlow {
        client.getPosts().enqueue(object : Callback<JsonObject> {
            override fun onResponse(
                call: Call<JsonObject>,
                response: Response<JsonObject>
            ) {
                trySend(response.body())
                close()
            }

            override fun onFailure(call: Call<JsonObject>, t: Throwable) {

            }
        })

        awaitClose {
            println("Networking is done.")
        }

    }.onEach { json ->
        println(json?.get("title"))
    }.collect()
}

// Console

I/System.out: "sunt aut facere repellat provident occaecati excepturi..." // 비동기로 받아 온 값
I/System.out: Networking is done. // Channel 닫힌 후 awaitClose { } 에서 출력

 

 

정말 유용한 API 입니다. 비동기 통신 Handler 코드가 잘 짜여진 상태라면, CallbackFlow 만 사용해도 꽤 깔끔한 코드를 쓸 수 있다고 생각합니다.

 

Flow 수집이 취소될 때는 메모리 누수 방지를 위해 꼭 awaitClose() 를 사용해야 합니다. 그렇지 않으면

수집이 모두 완료되어도 계속 콜백이 실행될 수 있습니다.

 

버퍼에 관한 특징은 ChannelFlow 와 정확히 같습니다.

 

Callback 으로 들어오는 데이터를 Flow 로 반환하고 싶을 때 사용하면 됩니다.

'Kotlin' 카테고리의 다른 글

by 를 사용한 Kotlin 의 Delegation Pattern  (0) 2022.11.17
확장 함수 간단 정리  (0) 2022.11.15
Channel 제대로 알기  (0) 2022.11.02
StateFlow, SharedFlow 제대로 알기  (0) 2022.11.01
Sealed Class, Enum 제대로 알기  (0) 2022.10.28