본문 바로가기

Kotlin

Flow Cancellation (feat. Exception Handling)

Unsplash, Zack Dutra.

Flow

Flow 는 순차적으로 값을 보내고 성공 또는 예외와 함께 완료되는 비동기 데이터 스트림입니다.

Flow 에 대해서는 이전에 포스팅한 글이 있으니, Flow 에 대한 이해가 부족하시다면 다음 글을 참고해주세요.

 

 

Flow 제대로 알기

안드로이드 개발을 진지하게 하고 계시다면, 앱 아키텍처에 대해 잘 알고 계시리라 생각합니다. 특히 MVVM 의 개념적 설명에 대해서는 정말 여기 저기서 많이 보셨을 겁니다. 저는 처음 MVVM 을 데

blothhundr.tistory.com

 

Flow 는 데이터를 방출, 가공, 수집할 수 있도록 합니다. 수집은 필히 CoroutineScope 내에서 이루어져야 하는데, 이는 Flow 가 비동기 데이터를 처리하기 위해 설계 및 구현되었기 때문입니다. 

 

'방출한 데이터를 가공하고 수집할 수 있다'는 특징 덕에, 멀티 모듈로 레이어를 나눈 프로젝트에서 각 레이어의 데이터 모델에 맞게 데이터를 매핑하는 과정에 Flow 를 사용하기 적합합니다. 이는 곧, 보다 견고한 아키텍처를 구성하는 데에 큰 도움이 되고요.

 

아키텍처가 견고하다는 것은 예사로운 예외에 대한 대응이 잘 이루어져 있다는 것인데, Flow 의 내부에는 기본적인 예외 처리에 대한 로직이 잘 마련되어 있습니다. 이는 개발자가 예외 처리에 큰 리소스를 투입하지 않아도 된다는 것을 의미합니다.

 

저는 꽤 많은 프로젝트에서 Flow 를 사용해 왔지만, Flow 의 취소에 대해서는 정확히 알고 있지 않았습니다. 그럼에도 불구하고, Flow 를 사용하는 데에는 큰 문제나 이슈가 없었습니다. 문득, 내부 설계가 얼마나 안정적인지 궁금해졌습니다.

 


Flow Completion / Cancellation

기본적으로 Flow 가 완료되거나 취소되는 경우는 다음과 같습니다.

 

  • 모든 요소가 성공적으로 방출 되었을 때
  • 에러가 발생했을 때
  • 수동으로 취소하였을 때
  • 조건에 따라 중지시킬 때

모든 요소가 성공적으로 방출 되었을 때

CoroutineScope 내의 Flow 에서 더 이상 방출할 데이터가 없을 때, Flow 는 완료됩니다. 이는 Coroutines 의 완료 조건에 의한 것인데, CoroutineScope 는 내부에 더 이상 실행할 코드가 없을 때 완료됩니다. 

 

lifecycleScope.launch {
    flow {
    	emit(1) 
    }.onCompletion { throwable ->
        println("Flow is Complete: $throwable")
    }.collect {
        println(it)
    }
}.invokeOnCompletion { throwable ->
    println("Coroutine is Complete: $throwable")
}

// Console

1
Flow is Complete: null
Coroutine is Complete: null

 

lifecycleScope 내에 flow 빌더에 의해 생성된 Flow 가 모든 값을 방출하고 난 뒤에는 lifecycleScope 내에 실행할 코드가 더 이상 존재하지 않으므로, collect { } 내의 코드가 실행된 뒤, Flow onCompletion { } 이 호출되고, 그 뒤에 lifecycleScopeinvokeOnCompletion { } 이 호출됩니다.

에러가 발생했을 때

에러는 3개의 포인트에서 발생할 수 있습니다. CoroutineScope 내부와 Flow 사이, Upstream, Downstream 입니다.

1. CoroutineScope 내부와 Flow 사이

val coroutineExceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
    println("CoroutineExceptionHandler : $throwable")
}

lifecycleScope.launch {
	throw Exception("Aiming Driven Development")
    flow {
        emit(1)
    }.onCompletion { throwable ->
        println("Flow is Complete: $throwable")
    }.collect {
        println(it)
    }
}.invokeOnCompletion { throwable ->
    println("Coroutine is Complete: $throwable")
}

// Console

CoroutineExceptionHandler : java.lang.Exception: Aiming Driven Development
Coroutine is Complete: java.lang.Exception: Aiming Driven Development

 

lifecycleScopeCoroutineContext CoroutineExceptionHandler 를 전달해주었습니다. Flow 의 데이터 방출 및 수집이 시작되기 전에 예외가 발생하였고, CoroutineExceptionHandler 가 이를 핸들링합니다. 물론 CoroutineExceptionHandler 를 전달하지 않는다면 CoroutineScope 는 예외를 핸들링하지 못하고 프로그램이 종료됩니다. 즉, CoroutineScope 내부와 Flow 사이에서 발생한 예외는 별도로 예외 처리를 수행해주어야만 합니다.

2. Upstream

FlowUpstream Downstream 로 나뉩니다. 보통 Upstream 은 데이터의 흐름에서 상위 또는 시작 지점을 나타냅니다. 데이터 소스로부터 데이터가 생성되는 부분을 의미합니다. 반대로 Downstream 은 데이터가 가공 및 소비되는 부분을 지칭합니다. 먼저, Upstream 에서 예외가 발생하는 경우입니다.

 

lifecycleScope.launch {
    flow {
        throw Exception("Aiming Driven Development")
        emit(1)
    }.onCompletion { throwable ->
        println("Flow is Complete: $throwable")
    }.onEach {
        println(it)
    }.catch { throwable ->
        println("Catch in Flow: $throwable")
    }.collect()
}.invokeOnCompletion { throwable ->
    println("Coroutine is Complete: $throwable")
}

// Console

Flow is Complete: java.lang.Exception: Aiming Driven Development
Catch in Flow: java.lang.Exception: Aiming Driven Development
Coroutine is Complete: null

 

Upstream 예외는 Flow 의 catch { } 블록에서 핸들링할 수 있습니다. 물론, Flow 내에서 발생한 예외는 해당 Flow 가 실행되는 CoroutineScope 로 전파되기 때문에, CoroutineExceptionHandler 를 적용한 경우에는 catch { } 없이도 핸들링할 수 있습니다.

 

위 예제의 경우 Flow catch { } 내부에서 핸들링 하였으므로, CoroutineScope 는 별도의 예외없이 종료됩니다.

 

3. Downstream

Downstream 예외의 경우, catch { } 에서 핸들링할 수 없으므로, try-catch 문 또는 runCatching { } 등을 활용하여야 합니다. 개인적으로는 CoroutineExceptionHandler 의 사용을 좋아합니다.

 

val coroutineExceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
    println("CoroutineExceptionHandler : $throwable")
}

lifecycleScope.launch(coroutineExceptionHandler) {
    flow {
        emit(1)
    }.onCompletion { throwable ->
        println("Flow is Complete: $throwable")
    }.catch { throwable ->
        println("Catch in Flow: $throwable")
    }.collect {
        throw Exception("Aiming Driven Development")
        println(it)
    }
}.invokeOnCompletion { throwable ->
    println("Coroutine is Complete: $throwable")
}

// Console

Flow is Complete: java.lang.Exception: Aiming Driven Development
CoroutineExceptionHandler : java.lang.Exception: Aiming Driven Development
Coroutine is Complete: java.lang.Exception: Aiming Driven Development

 

앞서 기술하였듯, catch { } 에서는 Downstream 예외를 처리하지 못합니다. CoroutineExceptionHandler 에 의해 처리되어 예외가 처리됩니다.

수동으로 취소하였을 때

여기서 취소launch() 메서드를 통해 반환되는 Job 의 취소를 말합니다. cancel() 메서드를 통해 취소할 수 있습니다.

 

val coroutineExceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
    println("CoroutineExceptionHandler : $throwable")
}

lifecycleScope.launch(coroutineExceptionHandler) {
    flow {
        emit(1)
    }.onCompletion { throwable ->
        println("Flow is Complete: $throwable")
    }.catch { throwable ->
        println("Catch in Flow: $throwable")
    }.collect {
        println(it)
        cancel() // Job 취소
    }
}.invokeOnCompletion { throwable ->
    println("Coroutine is Complete: $throwable")
}

// Console
1
Flow is Complete: null
Coroutine is Complete: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelled}@5db6cd3

조건에 따라 중지시킬 때

보통 takeWhile { }, first { } 를 많이 사용하는 것 같습니다. 예시에서는 takeWhile { } 을 사용하였는데, 특정 조건에 맞는 데이터면 수집하고, 그렇지 않은 데이터가 방출될 때 완료됩니다.

 

lifecycleScope.launch(coroutineExceptionHandler) {
    flow {
        repeat(5) {
            emit(it)
        }
    }.onCompletion { throwable ->
        println("Flow is Complete: $throwable")
    }.takeWhile {
        it < 3
    }.collect {
        println(it)
    }
}.invokeOnCompletion { throwable ->
    println("Coroutine is Complete: $throwable")
}

// Console

0
1
2
Flow is Complete: kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed
Coroutine is Complete: null

 


Cancellable()

Exception 이 발생하는 갖가지 상황에 대해 유연한 대응을 할 수 있다는 부분이 Flow 의 큰 강점임을 알아 볼 수 있었습니다. 예외를 다양하고도 일관적인 방식으로 처리할 수 있다면, 보다 안정적인 아키텍처를 구성하는 데에 도움이 될 것입니다.

 

그러나 Exception 만이 문제는 아니겠지요. 예외에 의해 프로그램이 강제 종료되지 않더라도, 개발자가 의도하지 않은 대로 동작한다면 그 것은 안정적인 아키텍처를 유지하고 있다 말할 수는 없지 않을까 싶습니다.

 

하지만, 앞서 기술한 것과 같이 Flow 를 사용한다면 원하는 시점에, 원하는 트리거를 이용해서 Flow 의 동작을 적절하게 제어할 수 있습니다. 이러한 동작이 가능한 이유는 무엇일까요?

 

그 답은 cancellable() 에 있었습니다.

 

cancellable

Returns a flow which checks cancellation status on each emission and throws the corresponding cancellation cause if flow collector was cancelled. Note that flow builder and all implementations of SharedFlow are cancellable by default. This operator provide

kotlinlang.org

 

해당 Docs 에 따르면 Flow 빌더와 SharedFlow<T> 의 모든 구현체들은 기본적으로 cancellable 합니다.

flowOf(), asFlow() 등의 Extension 들은 cancellable 하지 않은데요.

 

@PublishedApi
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
    return object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.block()
        }
    }
}
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

 

위 코드 스니펫은 Extension 들을 통해 반환되는 Flow 이며, 아래 스니펫은 기존의 Flow 빌더를 통해 반환되는 Flow 입니다.

 

Extension 에 의해 반환되는 Flow 의 경우 inline 키워드와 crossinline 키워드를 붙여 비지역 리턴을 방지한 모습입니다. 즉, FlowCollector 람다에 대한 구현으로 collect { } 의 호출 컨텍스트가 종료되는 경우를 배제한 셈입니다.

 

lifecycleScope.launch {
    listOf(1, 2, 3, 4, 5).asFlow().collect {
        if (it == 3) {
            cancel()
        } else {
            println(it)
        }
    }
}.invokeOnCompletion { throwable ->
    println("Coroutine is Complete: $throwable")
}

// Console

1
2
4
5
Coroutine is Complete: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelled}@5db6cd3

 

수집된 값이 3일 때 CoroutineScope 를 취소하였지만, 수집이 모두 끝나고 나서야 호출 컨텍스트 (CoroutineScope) 가 취소됨을 확인할 수 있습니다. 

 

즉, Flow 사용 시 정확한 시점에 적절한 제어가 필요하다면 Extension 을 사용하기보다 기본적인 Flow 빌더를 사용하는 것이 좋습니다.

 


값을 불러오고, 이를 가공하여 UI 를 업데이트하는 일에는 익숙했지만, Flow 에 대한 취소 처리나 에러 대응에 대해서는 적잖이 부족하다고 느꼈습니다. 이번 기회에 정리해 볼 수 있어 좋았습니다.

중간에 inline 키워드와 crossinline 키워드도 한 번 등장했는데, 이에 대해서도 한 번 정리해보면 좋을 것 같습니다.

'Kotlin' 카테고리의 다른 글

Effective Kotlin, 좋았던 부분들.  (0) 2023.11.04
Coroutines 파헤치기  (0) 2023.10.06
Coroutine Details  (0) 2023.06.22
Kotlin Closure  (0) 2023.05.04
Generics 와 Reified 키워드  (0) 2023.04.21