동기
친분이 있던 안드로이드 개발자분과 만나 대화를 나누던 중, 과거에 봤던 어느 기업의 과제 테스트 이야기가 나왔습니다. 해당 과제의 요구 사항 중, 단시간 내 연속적 네트워크 요청에 의해 발생할 수 있는 리소스의 낭비를 막아야 하는 것이 있었는데, 이에 대해 이야기를 나누다 'RxJava 에 그러한 기능을 쉽게 구현할 수 있는 기능이 있다'고 말씀해주셨습니다. '동일한 기능이 Flow<T> 에도 있지 않을까?' 싶어서 찾아보다보니, 생각보다 다양한 연산자들이 존재해서, 오늘은 이에 대해 다뤄보려 합니다.
연산자의 종류
Flow<T> 는 데이터 스트림을 처리하기 위해 만들어졌기 때문에, 시간의 흐름에 따라 데이터를 처리할 수 있는 다양한 연산자가 마련되어 있습니다. 데이터 스트림 처리를 시작하기 위해 사용되는 생성 연산자, 데이터를 변형하고 조작할 수 있는 중간 연산자, 전달된 데이터를 수집하여 사용할 수 있도록 하는 종단 연산자로 크게 분류됩니다. 연산자의 종류가 다양한 만큼, 원하는 기능 동작에 따라 연산자를 다양하게 조합하여 구현이 가능하므로, 여러 상황에 대응하기 좋습니다.
생성 연산자
이름 그대로, Flow<T> 구현체를 생성하기 위해 사용됩니다. flow {}, flowOf(), asFlow(), channelFlow {} 등이 있습니다. 모두 콜드 스트림을 반환하며, 사용법이 매우 단순합니다.
val numberFlow = flowOf(1, 2, 3, 4, 5)
val listFlow = listOf("a", "b", "c").asFlow()
val customFlow = flow {
emit("Start")
delay(100)
emit("End")
}
val channelFlow = channelFlow {
send(1)
send(2)
launch {
send(3)
}
}
그 중 성격이 조금 다른 건 channelFlow {} 인데, 블럭 내부가 ProducerScope 이기 때문에 내부에서 추가적인 Coroutines 생성이 굉장히 간편합니다.
callbackFlow {} 도 있는데, 비동기적으로 동작하는 콜백 이벤트에서 데이터를 얻는 경우, 이를 Flow<T> 형태로 전달하여 수집할 수 있도록 합니다.
fun getStringFlow(): Flow<String> = callbackFlow {
val callback = object : Callback {
override fun onResult(data: String) {
trySend(data)
}
}
someApi.setCallback(callback)
awaitClose {
someApi.removeCallback(callback) // 종료 시 콜백 정리
}
}
중간 연산자
List<T> 나 Array<T> 와 같은 Iterable 들에 사용되는 확장 함수들, 그리고 그 외 다른 다양한 메서드들이 존재하는 연산자 묶음입니다. map {}, filter {}, take {}, takeWhile {} 등은 그 이름도 직관적이고 용례도 많기 때문에 구태여 설명할 필요는 없어 보입니다.
transform {}
각 요소에 다양한 액션을 취할 수 있습니다. map {} 이나 여타 다른 메서드들도 같은 역할을 수행할 수 있지만, 내부에서 추가적으로 emit() 메서드를 호출할 수 있다는 점이 달라 상당히 유연한 메서드입니다.
val flow = flowOf(1, 2, 3)
flow.transform {
emit(it - 1)
emit(it)
emit(it + 1)
}
flatMapConcat {}, flatMapMerge {}, flatMapLatest {}
'flat' 이 붙는 메서드들은 모두 평탄화 작업을 수행합니다. 즉, 여러 Flow<T> 에서 들어오는 데이터들을 하나의 Flow<T> 로 수집할 수 있습니다. flatMapConcat {} 은 여러 Flow<T> 를 순차적으로 받아 낼 수 있고, flatMapMerge {} 는 flatMapConcat {} 과 다르게 순서 상관 없이 받아 낼 수 있습니다. flatMapLatest {} 는 다른 'Latest' 가 붙는 메서드들과 똑같이 최신의 값을 받아내면 이전 처리 작업이 취소되고 새로운 값에 대한 처리가 진행되어 값을 받아 볼 수 있습니다.
특이한 점은, 처리가 진행되는 각 원소마다 블럭 내부에서 새로운 Flow<T> 를 생성한다는 점인데요. 이는 여러 비동기 작업을 통해 전달되는 데이터를 활용한 추가적인 비동기 작업에 대한 지원을 가능하게 합니다. 즉, 전달되는 값을 활용해 또 다른 비동기 작업이 진행될 수 있고, 이 작업들의 결과를 하나의 Flow<T> 로 간편하게 처리할수 있습니다.
suspend fun asynchronousJob() {
// ...
}
val flow = flowOf(1, 2, 3, 4, 5)
lifecycleScope.launch {
flow.flatMapConcat { value ->
flow {
emit(asynchronousJob())
}
}.collect {
println("Value $it")
}
}
이와 같이, 값으로부터 파생되는 비동기 작업들의 결과를 간편하게 받아 볼 수 있도록 하는 매우 유용한 메서드입니다.
이어 기술할 combine() 이나 zip() 으로도 구현할 수는 있겠지만, 공수나 구현 복잡도를 생각하면 이러한 메서드를 쓰는 쪽이 훨씬 나은 방향일 겁니다.
combine(), zip()
두 메서드 모두 Flow<T> 를 다른 Flow<T> 와 결합하는 역할을 수행합니다. 다만, 하는 모양새가 조금 다릅니다.
combine() 메서드의 경우, 원소의 개수와 상관없이 데이터가 방출될 때의 값을 기준으로 묶어 방출됩니다. 반대로 zip() 메서드의 경우, 꼭 한 쌍을 지어 값을 방출합니다.
flowA.combine(flowB) { int, char ->
println("Flow Combine $int, $char")
}.collect()
flowA.zip(flowB) { int, char ->
println("Flow Zip $int, $char")
}.collect()
combine(flowA, flowB, flowC) { int, char, float ->
println("Combine $int, $char, $float")
}.collect()
//
Flow Combine 1, A
Flow Combine 2, A
Flow Combine 2, B
Flow Combine 3, B
Flow Combine 3, C
Flow Combine 4, C
Flow Zip 1, A
Flow Zip 2, B
Flow Zip 3, C
Combine 1, A, 0.1
Combine 2, A, 0.1
Combine 2, B, 0.1
Combine 2, B, 0.2
Combine 3, B, 0.2
Combine 3, C, 0.2
Combine 3, C, 0.3
Combine 4, C, 0.3
Flow Zip 의 경우, 묶이는 Flow<T> 의 원소 개수가 다르므로 세 쌍만 방출되는 모습입니다.
Flow<T> 의 확장 함수인 combine() 메서드와 combine() 퍼블릭 메서드의 차이도 있는데, Flow<T> 의 확장 함수의 경우 하나의 Flow<T> 만 결합 가능하고, 퍼블릭 메서드의 경우 여러 개의 Flow<T> 를 결합할 수 있습니다. 각 메서드들 모두, 결합에 동원된 Flow<T> 들이 모두 한 개 이상의 원소를 방출해야만 실행됩니다.
다양한 흐름 제어 연산자
흐름 제어 연산자 역시 중간 연산자에 속하며, 포스팅의 동기가 된 debounce() 를 비롯하여 sample(), distinctUntilChanged(), onEach {}, retry(), catch {} 등 다양합니다. 값을 직접 수정하는 기능보다는 한 Flow<T> 객체의 거시적 흐름을 통제하거나, 값마다의 별도 액션을 취해주기 위해 사용됩니다.
debounce()
Long 값 하나를 전달하여 값의 연속 방출 중 일정 시간 멈춘 뒤에만 값을 처리하도록 설정합니다. 수행했던 사전 과제의 요구 사항이 검색 기능 구현이었는데요. 타이핑 후 바로 검색이 되어서는 안 되고, 연속적인 타이핑이 있을 시 직전 요청을 취소하고 새로운 요청을 보내야했습니다. 이러한 기능이 있는 줄 모르고 Coroutine Job 을 취소했다가, 또 재할당했다가... 구현 복잡도는 물론이고 코드 가독성까지 엉망인 코드를 제출했었습니다. 이와 같은 기능은 debounce + CollectLatest(또는 FlatMapLatest) 로 구현을 하는 편이 훨씬 나은 방향이었죠. 다음과 같이 말입니다.
searchQueryFlow
.debounce(300)
.flatMapLatest { query ->
flow {
val result = api.search(query)
emit(result)
}
}
.collect { showResults(it) }
sample()
debounce() 메서드와 같이 Long 값 하나를 넘깁니다. 정해진 주기마다 최신의 값을 하나씩 방출하는 방식입니다. 해당 메서드 역시 사용할 곳이 무궁무진한데, Coroutines 를 사용하여 구현하려면 어마무시한 공수가 들 수 있는 구현입니다.
val flow = flow {
emit("A") // 0ms
delay(100)
emit("B") // 100ms
delay(100)
emit("C") // 200ms
delay(100)
emit("D") // 300ms
delay(100)
emit("E") // 400ms
}
flow.sample(300)
.collect { println(it) }
//
C
E
onEach {}, retry(), catch() 는 다소 직관적입니다. 값을 변경하기 위해 사용되는 메서드들은 아니고, Flow<T> 자체의 거시적 흐름에 대한 통제가 가능합니다. onEach {} 의 경우, 방출되는 값마다 부수 효과를 발생시키기에 적절하고(특히 로깅에 적합하다고 생각합니다.), retry() 메서드는 이름처럼 재시도에 사용되는데, 내부적으로 Throwable 을 전달하기 때문에 발생하는 오류에 따라 분기 처리하기가 좋습니다. 마지막으로 catch {} 역시, 일반적인 catch 관련 메서드와 같이 발생하는 오류를 캐치하는 기능을 수행합니다. 이후 액션은 개발자가 구현하기 나름이고요.
종단 연산자
Flow<T> 를 최종 소비할 수 있도록 하는 연산자 모음입니다. 이름과 같이 더 이상 연결될 연산자가 없는 종점입니다. 이 역시 다양한 연산자가 존재하는데, 살펴 볼만한 것은 collect {}, launchIn() 입니다. 그 외 reduce {}, fold {} 와 같은 익숙한 연산자들은 Iterable<t> 의 그것들과 같은 메서드입니다.
collect {}
Flow<T> 수집의 가장 기본이 되는 연산자입니다. Flow<T> 는 결국 종단 연산자를 통해 수집되어야만 값을 방출하기 때문에, collect() 되지 않으면 동작하지 않습니다. 즉, 원하는 값을 받을 수 있으니 보내달라고 요청하는 겁니다.
val flowA = flowOf(1, 2, 3, 4)
val flowB = flowOf('A', 'B', 'C')
flowA.onEach { println(it) }
flowB.onEach { println(it) }.collect()
//
A
B
C
위와 같이, collect() 메서드를 호출하지 않으면 아무런 일도 일어나지 않습니다.
launchIn()
CoroutineScope 를 인자로 넘겨줄 수 있는 종단 연산자입니다. 넘겨주는 CoroutineScope 내에서 Flow<T> 를 수집하도록 하기 위해 사용됩니다. 저는 보통 ViewModel 에서 Flow<T> 를 수집할 때, 수집과 관계 없이 ViewModel 의 생명주기에 따라 자동 취소되도록 하여 메모리 누수를 방지하기 위해 사용합니다.
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
collect() // tail-call
}
내부 구현은 위와 같이 심플합니다. 전달한 CoroutineScope 에서 launch {} 합니다.
이와 비슷한 메서드로 stateIn(), shareIn() 이 있는데, 전달한 CoroutineScope 에서 각각 StateFlow<T>, SharedFlow<T> 로 생성하여 값을 방출하도록 합니다.
다양한 상황에서 사용되는 컴포넌트이다 보니, 유연한 활용을 위해 사용되는 메서드가 굉장히 많습니다. 적재적소에 적절한 형태로 활용하기 좋아보이는데, 또 그만큼 복잡할 수 있기에 꾸준히 사용하면서 체화할 수 있도록 해야겠네요.
'Kotlin' 카테고리의 다른 글
Generics 의 out, in 다시 보기 (0) | 2025.01.15 |
---|---|
Effective Kotlin, 좋았던 부분들. (0) | 2023.11.04 |
Coroutines 파헤치기 (1) | 2023.10.06 |
Flow Cancellation (feat. Exception Handling) (0) | 2023.09.24 |
Coroutine Details (0) | 2023.06.22 |