Управление жизненным циклом реактивных потоков: от Callback API до SharedFlow с автоматической подпиской
В современной разработке под Android и JVM реактивное программирование на основе Kotlin Coroutines и Flow является стандартом. Однако часто приходится иметь дело с традиционными Callback-интерфейсами (слушателями событий) или необходимо распределять один поток данных между несколькими подписчиками с жестким контролем ресурсов.
В этой статье мы подробно разберем, как связать асинхронный генератор событий на основе колбэков, обернуть его в callbackFlow, преобразовать в горячий SharedFlow с политикой WhileSubscribed и управлять подпиской так, чтобы ресурсы выделялись только тогда, когда есть реальные слушатели.
Архитектурная задача
Представим систему, которая генерирует события (например, гео-координаты, сетевые сокеты или показания датчиков). Нам нужно:
- Обернуть этот источник в поток
Flow. - Сделать так, чтобы несколько подписчиков могли слушать один и тот же поток событий одновременно (SharedFlow).
- Избежать утечек ресурсов: если подписчиков нет (все отписались), генератор событий должен быть полностью остановлен.
- При повторном подключении подписчиков генератор должен автоматически возобновить работу.
Для реализации этого в Kotlin используются два мощных механизма:
callbackFlow— мост между callback-based API и реактивными потоками.SharingStarted.WhileSubscribed— стратегия, управляющая временем жизни горячего потока в зависимости от количества активных коллекторов.
Схема взаимодействия элементов
sequenceDiagram
autonumber
actor Sub1 as Подписчик 1
actor Sub2 as Подписчик 2
participant SF as SharedFlow (shareIn)
participant CBF as callbackFlow
participant EG as EventGenerator
Note over Sub1, EG: Фаза 1: Появление подписчиков
Sub1->>SF: collect() (1-й подписчик)
SF->>CBF: Запуск сбора данных
CBF->>EG: registerListener()
EG->>EG: Запуск фонового треда/корутины генерации
Sub2->>SF: collect() (2-й подписчик)
EG->>Sub1: Отправка события
EG->>Sub2: Отправка события
Note over Sub1, EG: Фаза 2: Отмена подписок и освобождение ресурсов
Sub1->>SF: Отмена корутины сбора
Sub2->>SF: Отмена корутины сбора
Note over SF: Число подписчиков = 0
SF->>CBF: Отмена upstream-корутины
CBF->>EG: unregisterListener() (awaitClose)
EG->>EG: Остановка генерации событий (Ресурсы свободны)
Полный код реализации
Ниже приведен готовый к запуску пример кода на Kotlin. Для наглядности логов мы используем масштабируемый интервал времени в 10 секунд (в реальном приложении это может быть минута или любой другой промежуток).
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*
import java.time.LocalTime
import java.time.format.DateTimeFormatter
// Вспомогательный логгер с выводом точного времени
fun log(message: String) {
val time = LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss.SSS"))
println("[$time] $message")
}
// 1. Интерфейс традиционного слушателя событий
interface EventListener {
fun onEvent(event: String)
}
// 2. Обвязка для генерации событий (Эмуляция внешнего источника)
class EventGenerator(private val scope: CoroutineScope) {
private val listeners = mutableListOf<EventListener>()
private var job: Job? = null
private var eventCounter = 1
@Synchronized
fun registerListener(listener: EventListener) {
listeners.add(listener)
log("[EventGenerator] Слушатель зарегистрирован. Активных слушателей: ${listeners.size}")
if (job == null) {
startGenerating()
}
}
@Synchronized
fun unregisterListener(listener: EventListener) {
listeners.remove(listener)
log("[EventGenerator] Слушатель удален. Активных слушателей: ${listeners.size}")
if (listeners.isEmpty()) {
stopGenerating()
}
}
private fun startGenerating() {
log("[EventGenerator] Запуск фоновой генерации событий...")
job = scope.launch {
try {
while (isActive) {
delay(1000) // Генерация события каждую секунду
val event = "Event #${eventCounter++}"
log("[EventGenerator] Сгенерировано: $event")
// Уведомляем зарегистрированных слушателей
val currentListeners = synchronized(this@EventGenerator) {
ArrayList(listeners)
}
currentListeners.forEach { it.onEvent(event) }
}
} catch (e: CancellationException) {
log("[EventGenerator] Генерация событий отменена (корутина остановлена).")
}
}
}
private fun stopGenerating() {
log("[EventGenerator] Останавливаем генерацию событий (0 слушателей)...")
job?.cancel()
job = null
}
}
// 3. Преобразование Callback API во Flow с помощью callbackFlow
fun eventFlow(eventGenerator: EventGenerator): Flow<String> = callbackFlow {
log("[callbackFlow] Активация. Подписываемся на EventGenerator...")
val listener = object : EventListener {
override fun onEvent(event: String) {
log("[callbackFlow] Получено событие от Generator: $event -> передаем во Flow")
trySend(event) // Безопасно отправляем событие в поток
}
}
// Регистрируем колбэк
eventGenerator.registerListener(listener)
// awaitClose приостанавливает выполнение callbackFlow и ждет закрытия канала.
// Когда сборщик отменяет подписку, управление переходит сюда.
awaitClose {
log("[callbackFlow] Деактивация. Отписываемся от EventGenerator в блоке awaitClose...")
eventGenerator.unregisterListener(listener)
}
}
fun main() = runBlocking {
log("=== ЗАПУСК ДЕМОНСТРАЦИИ FLOW ===")
val DEMO_PERIOD_MS = 10_000L // 10 секунд работы
log("Период активности подписчиков: ${DEMO_PERIOD_MS / 1000} сек.")
val generatorScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
val eventGenerator = EventGenerator(generatorScope)
// 4. Превращаем callbackFlow в горячий SharedFlow
val sharedFlow = eventFlow(eventGenerator)
.shareIn(
scope = this, // Область видимости для шаринга
started = SharingStarted.WhileSubscribed(
stopTimeoutMillis = 0, // Мгновенно останавливать сбор при отсутствии подписчиков
replayExpirationMillis = 0
),
replay = 0 // Новые подписчики не получают старые события
)
// --- ФАЗА 1: Подписка первых слушателей ---
log("\n--- ФАЗА 1: Подписка подписчиков 1 и 2 ---")
val sub1Job = launch {
sharedFlow.collect { event ->
log(" [Подписчик 1] Получил: $event")
}
}
delay(500) // Небольшой сдвиг по времени
val sub2Job = launch {
sharedFlow.collect { event ->
log(" [Подписчик 2] Получил: $event")
}
}
// Собираем события в течение DEMO_PERIOD_MS
delay(DEMO_PERIOD_MS)
// --- ФАЗА 2: Подписчики завершают работу (отписываются) ---
log("\n--- ФАЗА 2: Отписка подписчиков 1 и 2 ---")
log("Отменяем работу Подписчика 1...")
sub1Job.cancel()
log("Отменяем работу Подписчика 2...")
sub2Job.cancel()
// Ожидаем завершения фоновых процессов для наглядности логов
delay(2000)
log("\n--- ПЕРИОД ТИШИНЫ: Нет активных подписчиков. Генератор должен молчать. ---")
delay(DEMO_PERIOD_MS)
// --- ФАЗА 3: Подписчики снова подписываются спустя время ---
log("\n--- ФАЗА 3: Повторная подписка подписчиков 1 и 2 ---")
val sub1JobSecond = launch {
sharedFlow.collect { event ->
log(" [Подписчик 1 (Переподключен)] Получил: $event")
}
}
delay(500)
val sub2JobSecond = launch {
sharedFlow.collect { event ->
log(" [Подписчик 2 (Переподключен)] Получил: $event")
}
}
// Собираем события повторно
delay(DEMO_PERIOD_MS)
// --- ФИНАЛЬНАЯ ОЧИСТКА ---
log("\n--- ЗАВЕРШЕНИЕ: Финальная отписка ---")
sub1JobSecond.cancel()
sub2JobSecond.cancel()
delay(1000)
generatorScope.cancel() // Полностью завершаем фоновые ресурсы генератора
log("=== ДЕМОНСТРАЦИЯ FLOW УСПЕШНО ЗАВЕРШЕНА ===")
}
Разбор Жизненного Цикла по Шагам
Запустив этот код, мы увидим детальную трассировку всех внутренних процессов.
1. Активация и регистрация (Фаза 1)
Когда первый подписчик вызывает .collect{} на нашем SharedFlow, политика SharingStarted.WhileSubscribed понимает, что количество подписчиков изменилось со 0 на 1, и немедленно инициирует сбор данных из вышележащего callbackFlow.
[15:59:07.533] [callbackFlow] Активация. Подписываемся на EventGenerator...
[15:59:07.535] [EventGenerator] Слушатель зарегистрирован. Активных слушателей: 1
[15:59:07.535] [EventGenerator] Запуск фоновой генерации событий...
callbackFlow регистрирует свой EventListener внутри EventGenerator, а тот, обнаружив появление первого слушателя, запускает фоновую генерацию событий раз в секунду.
2. Мультивещание (Multicasting)
Каждое событие, отправленное в генератор, перенаправляется в callbackFlow через trySend и распределяется между подписчиками SharedFlow. Оба подписчика получают идентичные сообщения одновременно:
[15:59:08.545] [EventGenerator] Сгенерировано: Event #1
[15:59:08.548] [callbackFlow] Получено событие от Generator: Event #1 -> передаем во Flow
[15:59:08.549] [Подписчик 1] Получил: Event #1
[15:59:08.550] [Подписчик 2] Получил: Event #1
3. Освобождение ресурсов (Фаза 2)
По истечении времени мы отменяем корутины подписчиков. Количество активных слушателей SharedFlow падает до 0.
[15:59:18.038] Отменяем работу Подписчика 1...
[15:59:18.040] Отменяем работу Подписчика 2...
[15:59:18.043] [callbackFlow] Деактивация. Отписываемся от EventGenerator в блоке awaitClose...
[15:59:18.044] [EventGenerator] Слушатель удален. Активных слушателей: 0
[15:59:18.044] [EventGenerator] Останавливаем генерацию событий (0 слушателей)...
[15:59:18.044] [EventGenerator] Генерация событий отменена (корутина остановлена).
WhileSubscribed(stopTimeoutMillis = 0) мгновенно прекращает сбор данных из callbackFlow. Сборщик отменяется, что провоцирует выполнение блока awaitClose внутри callbackFlow. Колбэк безопасно удаляется из EventGenerator. Генератор видит 0 слушателей и останавливает свой фоновый поток эмуляции.
4. Период тишины
На протяжении периода тишины в консоли не появляется ни одной записи: генерация остановлена, системные ресурсы (процессор, батарея мобильного устройства) находятся в полной безопасности.
5. Повторный запуск (Фаза 3)
Когда новые подписчики вновь подключаются к тому же SharedFlow, жизненный цикл запускается заново без необходимости пересоздавать поток вручную:
--- ФАЗА 3: Повторная подписка подписчиков 1 и 2 ---
[15:59:30.054] [callbackFlow] Активация. Подписываемся на EventGenerator...
[15:59:30.054] [EventGenerator] Слушатель зарегистрирован. Активных слушателей: 1
[15:59:30.054] [EventGenerator] Запуск фоновой генерации событий...
Генератор успешно стартует снова и продолжает выработку событий (начиная с Event #11), распределяя их между новыми подписчиками.
Важные особенности проектирования
SharingStarted.WhileSubscribed:- Параметр
stopTimeoutMillisпозволяет настроить задержку перед закрытием верхнего потока. Значение0означает моментальную остановку. Если выставить, например,5000(5 секунд), то кратковременный перезапуск экрана (например, при повороте экрана устройства) не приведет к полной переподписке и остановке генератора. Это крайне полезно при разработке UI под Android.
- Параметр
- Безопасность
trySend:- Внутри
callbackFlowдля передачи данных используетсяtrySend(event). Он возвращает объект результата и не блокирует выполнение, в отличие от приостанавливающего методаsend(event). Если буфер канала переполнится, событие будет безопасно отброшено или обработано в соответствии с логикой.
- Внутри
- Гарантированная очистка в
awaitClose:- Блок
awaitCloseвыполнится всегда, вне зависимости от того, отменился ли сборщик штатно, упал ли с ошибкой или корутина была принудительно остановлена. Это лучшее место для закрытия сокетов, удаления слушателей, отписки от системных служб.
- Блок
Заключение
Связка callbackFlow + shareIn с политикой WhileSubscribed предоставляет элегантное и надежное решение для адаптации любых асинхронных Callback-источников данных в реактивную парадигму Kotlin. Она решает проблему утечки ресурсов “из коробки” и обеспечивает бесшовное переиспользование потоков данных между неограниченным числом потребителей.