본문 바로가기

Kotlin

Channel 제대로 알기

Unsplash, Ryoji Iwata.

 

Channel 이 왜 필요한가?

채널은 코루틴과 코루틴이 서로 통신하기 위해 필요합니다. 전역 변수 등을 통해서 전달할 수야 있겠지만, 동시성 문제도 고려하여야 합니다. 이를 해결하기 위해 Channel 이 탄생했습니다. 

간단한 사용법은 다음과 같습니다.

 

var num = 0
val channel = Channel<Int>()

findViewById<TextView>(R.id.textView).setOnClickListener { // textView 클릭시
    CoroutineScope(Dispatchers.Default).launch {
        num++
        channel.send(num) // channel 에 num 값 보냄
    }
}

CoroutineScope(Dispatchers.Main).launch {
    repeat(5) { // 다섯 번 까지 받아 낼 수 있음
        channel.receive().print() // channel 에서 값 받아 출력
    }
}

// Console

I/System.out: 1
I/System.out: 2
I/System.out: 3
I/System.out: 4
I/System.out: 5

 

채널은 BlockingQueue 와 개념이 비슷합니다. BlockingQueue 는 특정 상황에 스레드를 대기시킵니다. poll() 하려고 큐가 비었거나, offer() 하려고 하는데 큐에 공간이 없다거나 할 때 그렇습니다. 덕분에 ThreadSafe 합니다.

 


 

큐와 달리 채널은 더 이상 요소가 들어오지 않음을 나타내기 위해 이를 닫을 수 있습니다.

수신자 쪽에서는 일반 for loop 를 이용하여 채널로부터 간편하게 요소를 받아 올 수 있습니다.

 

개념적으로, 채널을 닫는다는 것은 채널에 토큰을 보내는 것과 같습니다. 이 닫기 토큰이 수신되는 즉시

반복이 중지되므로, 닫기 전에 보낸 모든 요소는 수신됨을 보장받습니다.

 

val channel = Channel<Int>()
        MainScope().launch {
            channel.send(7)
            channel.send(8)
            channel.send(6)
            channel.send(5)
            channel.send(0)
            channel.send(17)
            channel.send(28)
            channel.close()
        }

        CoroutineScope(Dispatchers.Default).launch {
            while (true) {
                channel.receive().print()
                if (channel.isClosedForReceive) {
                    break
                }
            }
        }.invokeOnCompletion {
            println("channel and coroutine is complete")
        }


// Console

I/System.out: 7
I/System.out: 8
I/System.out: 6
I/System.out: 5
I/System.out: 0
I/System.out: 17
I/System.out: 28
I/System.out: channel and coroutine is complete

 

위 예제에서는 적절히 채널을 닫았습니다. 채널이 닫혀있을 때 값을 받거나 보내면 다음과 같은 예외를 던집니다.

 

kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed

 


 

채널 생산자

코루틴이 시퀀스를 생성하는 패턴은 굉장히 일반적입니다. 이는 Producer-Consumer 패턴의 일부입니다.

채널 생산자를 '채널을 파라미터로 받는 함수' 로 추상화할 수 있습니다. 하지만 이 방식은 결과가 함수에서 반환되어야 한다는 상식에 어긋납니다.

 

생산자 측에서 바로 수행할 수 있도록 produce { } 라는 편리한 코루틴 빌더와 

소비자 측의 for loop 를 대체하는 consumeEach { } 가 있습니다.

 

val channel = CoroutineScope(Dispatchers.IO).produce {
    send(1)
    send(2)
    send(3)
}

CoroutineScope(Dispatchers.Main).launch {
    channel.consumeEach {
        println("Receiver : I received : $it")
    }
}

// Console

I/System.out: Receiver : I received : 1
I/System.out: Receiver : I received : 2
I/System.out: Receiver : I received : 3

 


 

파이프라인

파이프라인은 하나의 코루틴이 끊임없는 값을 생성해내는 패턴입니다. 다른 코루틴 또는 여러 코루틴이 해당 스트림을 소비하고, 임의의 연산을 진행하고 또 다른 새로운 결과값들을 생산합니다.

끊임없이 데이터를 가져와 유저에게 전달하는 방식 등에 사용합니다.

 

여러 코루틴이 하나의 채널에서 서로 작업을 받을 수도 있고,

여러 코루틴이 하나의 채널에게 작업을 보낼 수도 있습니다.

 


 

버퍼 선언이 가능합니다. 팩토리 함수와 produce 빌더는 버퍼 크기를 지정하기 위해 매개변수를 사용합니다. SharedFlow 와 비슷합니다.

 


 

이런 게 있는 줄도 몰랐는데, 자주 사용하면 좋을 것 같습니다. withContext() 대신 사용할 수 있을 듯 합니다.

'Kotlin' 카테고리의 다른 글

확장 함수 간단 정리  (0) 2022.11.15
ChannelFlow, CallbackFlow 제대로 알기  (0) 2022.11.03
StateFlow, SharedFlow 제대로 알기  (0) 2022.11.01
Sealed Class, Enum 제대로 알기  (0) 2022.10.28
out, in 제대로 알기  (0) 2022.10.26