При использовании корутин в Kotlin рано или поздно появляется вопрос о том, какие примитивы необходимо использовать для передачи потоков данных между несколькими корутинами. Для этого в библиотеке kotlinx.coroutines имеется специальный механизм, который так и называется - Channel. Посмотрим на него поподробнее.
Что такое каналы в Kotlin?
Kotlin Channel - это примитив для общения и передачи потоков данных между корутинами. То есть если необходимо передавать какие нибудь данные из одной корутины другой в виде потока/стрима данных, то для этого необходимо использовать класс kotlinx.coroutines.channels.Channel
. Как пишут в документации (для тех кто знает Java Core) каналы можно сравнить с классом очереди java.util.concurrent.BlockingQueue
, но в отличие от очереди каналы имеют ключевую особенность - они не блокируют поток в момент вызова операторов для записи или чтения, а лишь приостанавливают выполнение корутины, имеют неблокирующую природу. Поэтому для простоты можно рассматривать каналы именно как очередь со всеми вытекающими особенностями их использования.
Интерфейс Channel
С точки зрения кода Channel
- это интерфейс, который наследуется от двух других интерфейсов: SendChannel
и ReceiveChannel
.
Сначала рассмотрим родительские интерфейсы SendChannel
и ReceiveChannel
. Интерфейс SendChannel
имеет две основные функции отправки данных в канал:
Функция send
объявлена с модификатором suspend
, поэтому выполнение корутины может быть приостановлено если при попытке отправки буфер канала заполнен или он просто отсутствует (тогда приостановка будет до тех пор, пока другая корутина не начнет читать значения из канала).
Функция offer
объявлена без модификатора suspend
и выполняет отправку элемента в канал моментально без блокировки. В случае успешно выполненной отправки (если значение поступило в буфер канала), то функция вернет true
, иначе будет возвращен false
. В случае, если канал ранее был закрыт для отправки (значение поля канала isClosedForSend
имеет значение true
), то будет сгенерировано исключение.
Также в интерфейсе объявлена функция для выполнения закрытия канала:
Выставляет значение true
для поля isClosedForSend
и отправляет в канал (складывает в конец очереди) специальный токен, обозначающий его закрытие. Аргументом функции является nullable исключение, которое может означать причину закрытия. Если при вызове close
в качестве аргумента передать null
, то это означает штатное закрытие канала и при последующей попытке вызвать send
или offer
будет генерироваться исключение типа ClosedSendChannelException
. Если же в качестве аргумента передавать какое либо конкретное исключение, это будет означать закрытие канала с ошибкой и это исключение будет генерироваться при последующих попытках вызвать send
или offer
. Интерфейс ReceiveChannel
также имеет две основные функции для чтения данных из канала:
Функция receive
предназначена для чтения значения и последующего его удаления из канала. Имеет модификатор suspend
и корутина приостанавливает свое выполнение при чтении из пустого канала до тех пор, пока не появится новое значение. При попытке чтения из закрытого канала будет сгенерировано исключение либо с типом ClosedReceiveChannelException
, либо с другим классом, если таковой был передан в функцию close.
Функция poll
, по аналогии с функцией offer
для производителя, является неблокирующей и не имеет модификатор suspend
. Выполняет получение и удаление значения из канала или возвращает null
, если канал на момент чтения пустой или был корректно закрыт без указания причины закрытия. Если ранее при закрытии канала был передан объект исключения, то при вызове poll
будет сгенерировано исключение соответствующего типа.
Также в данном интерфейсе объявлен оператор iterator
, который позволяет писать красивые конструкции на манер чтения из очереди примерно так:
При таком чтении из канала, в случае вызова close
у SendChannel
(корректное закрытие канала со стороны отправителя) все успешно завершится и исключение выброшено не будет. Еще в интерфейсе имеется функция cancel
:
Вызов этой функции без передачи аргумента корректно завершает канал с удалением всех оставшихся данных из буфера. Последующие попытки отправки или чтения данных из ранее отмененного канала будут приводить к исключениям CancellationException
. Если со стороны корутины-потребителя отпала потребность в последующем чтении значений из канала, то необходимо отменять канал посредством функции cancel.
Стратегии буферизации данных у каналов
Итак, оба выше рассмотренных интерфейса наследуются другим основным интерфейсом - Channel
, который сам по себе не добавляет дополнительных функций или абстрактных полей. Но внутри него объявлен интересный объект-компаньон Factory
, внутри которого перечислены константы всех возможных типов буфера данных для каналов. Эти константы используются в момент создания канала при вызове глобальной функции-фабрики Channel()
и указывают какой размер буфера у него будет, тем самым по сути определяя необходимое поведение. Ниже перечислены константы, определяющих размер буфера при создании каналов:
- RENDEZVOUS =
0
, определяет рандеву-канал с размером буфера равным 0, и по-умолчанию фабрикой создаются такие рандеву-каналы. То есть буфер у канала отсутсвует совсем, поэтому когда корутина вызываетsend
, она приостанавливается до тех пор, пока другая корутина не вызовет функцию receive и наоборот. Для этого типа буфера фабрика возвращае объект классаRendezvousChannel
. - CONFLATED =
-1
, создает канал с размером буфера равный 1. Повторный вызовoffer
илиsend
перезаписывает текущее значение в буфере, при этом приостановка корутины не происходит. Поэтому ресивер будет считывать всегда самое последнее значение из канала. Для этого типа буфера фабрика возвращает объект классаConflatedChannel
. - BUFFERED =
-2
, определяет канал с наличием буфера с фиксированным размером по-умолчанию, который равен 64. Для JVM этот размер может быть переопределен. Корутина отправителя приостанавливает работу только тогда, когда буфер канал полн, а корутина получатель приостанавливает работу только при пустом буфере. Для этого типа буфера фабрика возвращает объект классаArrayChannel
. - UNLIMITED =
Int.MAX_VALUE
, создает канал с бесконечным размером буфера, который ограничен только размером памяти, поэтому при очередной отправке даных можно получить исключениеOutOfMemoryError
. При этом корутина отправителя никогда не будет приостановлена при вызове send. Для этого типа буфера фабрика возвращает объект классаLinkedListChannel
.
Создание канала Channel
Теперь посмотрим как создать объект канала. Для этого необходимо вызвать глобальную функцию-фабрику kotlinx.coroutines.channels.Channel()
:
Аргументом функции является целочисленное значение, определяющее размер буфера. То есть можно либо передать одну из констант интерфейса Channel
, которые были описаны выше в данной статье, либо указать конкретное значение размера буфера, тогда фабрика вернет объект класса ArrayChannel
с заданной вместимостью канала. Также функции необходимо передать тип объектов, с которыми канал будет работать.
Создание канала производителя через produce
На момент написания статьи функция помечена аннотацией ExperimentalCoroutinesApi
.
Есть еще один способ создать канал - использовать фабрику produce
:
Сама функция является расширением для CoroutineScope
, создает канал и запускает в скоупе корутину, тело которой задается в аргументе block
. Эта лямбда, в свою очередь, является расширением для класса ProducerScope
. То есть внутри блока имеется доступ к созданному объекту SendChannel
, поэтому можно писать код, который будет генерировать и потом отправлять данные в канал. Собственно, в основном для таких задач и используется эта фабрика. Также в качестве аргумента можно передать контекст для корутины канала (аргумент context
), и можно задать размер буфера либо используя константы интерфейса Channel
или задать конкретное значение (аргумент capacity
). Билдер в результате возвращает объект типа ReceiveChannel
из которого можно читать значения. Примерно так можно создавать каналы с использованием фабрики produce
:
Когда лямбда полностью выполняется, канал автоматические закрывается (у SendChannel
вызывается функция close
). А точнее когда корутина, которая внутри себя выполняет лямбду block
, завершает свое выполнение (у корутины вызывается функция onCompleted
), в этот момент закрывается канал.
Широковещательный канал BroadcastChannel
На момент написания статьи интерфейс помечен аннотацией ExperimentalCoroutinesApi
.
В случае если имеется один производитель данных и несколько потребителей/подписчиков, которым всем надо передавать новое значение (а не первому кто вызвал receive), то для таких задач надо использовать специальный канал BroadcastChannel
, наследника SendChannel
, который имеет следующие функции в своем интерфейсе:
Функция openSubscription
создает нового подписчика и возвращает ссылку на объект типа ReceiveChannel
. То есть в результате создается новый канал, который слушает поступление данных в исходный BroadcastChannel
. Когда необходимость в прослушивании канала отпадает, то обязательно нужно выполнять процесс отписки. Для этого у подписчика ReceiveChannel
необходимо вызывать функцию cancel
, чтобы корректно закрыть этот дополнительный канал и освободить ресурсы. Для завершения всего BroadcastChannel
необходимо вызывать функцию cancel
, которая завершает все каналы подписчиков и освобождает ресурсы, очищая буфер. Для создания объекта BroadcastChannel
надо использовать глобальную функцию-фабрику:
Работает также, как фабрика у Channel
, за исключением одного отличия - нельзя задать неограниченный размер буфера или равный 0
(рандеву-канал). Если задать размер буфера равным 1
, то будет создан ConflatedBroadcastChannel
, в любом другом случае создается ArrayBroadcastChannel
.
На этом пока что всё. В последующих статьях будет рассмотрен оператор Select
, который помогает сделать механизм ожидания выполнения для параллельных suspend функций и будет рассмотрена реализация холодного потока данных Flow
.
Интересные статьи по теме, которые стоит почитать:
- Официальная документация по Kotlin Channels
- Roman Elizarov: Cold flows, hot channels
- Jag Saund: Kotlin: Diving in to Coroutines and Channels
- Evan Fang: Kotlin Coroutines in Android — Channel