Параллелизм с Cats Effect

Перевод статьи "Concurrency with Cats Effect", написанной Krzysztof Atłasik

Параллелизм — это сложно. Brian Goetz в книге "Java Concurrency in Practice" пишет:

Писать правильные программы трудно; писать правильные параллельные программы сложнее. Просто в параллельной программе может произойти больше ошибок, чем в последовательной.

К нашему преимуществу, в Scala есть много замечательных инструментов, помогающих "укротить" параллельное программирование, таких как Akka, ZIO, Finagle или Cats Effect (далее CE). В этой статье сосредоточимся на функциях, предлагаемых последней библиотекой.

Что такое fiber?

Основным строительным блоком параллелизма CE является "волокно" (fiber). В двух словах, волокно — это легковесный логический поток, который представляет собой последовательность действий. В случае CE это список операций, приостановленных с помощью монады IO и упорядоченных с помощью flatMap.

Волокна занимают меньше места по сравнению с системными потоками. Самое главное, они довольно дешевы с точки зрения использования памяти. Следовательно, они не такие дефицитные ресурсы, как потоки. Нам не нужно создавать пулы волокон, мы можем просто создать новый, когда нам это нужно.

В отличие от системных потоков, волокна не полагаются на планировщик операционной системы для переключения между контекстами. Вместо этого они используют планировщик уровня инфраструктуры, который контролирует, когда и как они выполняются. Одно волокно не привязано к конкретному потоку. Среда выполнения Cats Effect может многократно создавать десятки тысяч волокон в несколько системных потоков. В JVM среда выполнения обычно использует пул потоков фиксированного размера, привязанный к ЦП, но для однопоточных сред, таких как Scala.js, она может работать даже в одном потоке.

fibers

Управление волокнами

В CE можно разветвить вычисления с помощью метода start. Он возвращает экземпляр типа Fiber, который предоставляет доступ к методу join. Вызов join заставляет волокно стороны вызова ждать завершения присоединенного волокна.

import cats.effect.*
import scala.concurrent.duration.*

val task =
  for
    _ <- IO.println("Task started")
    _ <- IO.sleep(1.second)
    _ <- IO.println("Task completed")
  yield ()

for
  fiber <- task.start
  _ <- IO.println("Hello from main fiber")
  // ждем 1 секунду до завершения разветвленного волокна
  _ <- fiber.join
yield ()

Рассмотренный пример в Scastie

Другой метод, доступный на Fiber, - cancel. Он позволяет аннулировать волокно вместе с вычислениями, выполняемыми на нем. Метод cancel не вернет управление до тех пор, пока все ресурсы, выделенные в волокне, не будут должным образом завершены, а все базовые операции не будут отменены.

С помощью onCancel(fin: IO[Unit]) можно настроить финализатор, который будет запускаться при отмене задачи.

import cats.effect.*
import scala.concurrent.duration.*

val task =
  for
    _ <- IO.sleep(5.seconds)
    _ <- IO.println("Computations complete")
  yield ()

for
  fiber <- task.onCancel(IO.println("Task canceled!")).start
  _ <- IO.sleep(1.second)
  // волокно завершилось бы через 5 секунд, но оно отменено через 1 секунду
  _ <- fiber.cancel
yield ()

Рассмотренный пример в Scastie

Метод join возвращает значение типа Outcome, имеющее следующие подтипы: Succeeded, Errored и Canceled. Можно выполнить сопоставление с шаблоном для результата, чтобы проверить завершенный статус волокна.

Значение, возвращаемое волокном, будет доступно в качестве поля результата Succeeded. Таким образом, можно передавать значения из разветвленной задачи на сторону вызова.

import cats.effect.*
import scala.concurrent.duration.*

val task = IO.sleep(500.millis) *> IO.pure("Hello world!")

for
  fiber <- task.start
  _ <- fiber.join.flatMap {
    case Outcome.Succeeded(value) =>
      value.flatMap(v => IO.println(s"Computed value: $v"))
    case Outcome.Errored(e) => IO.println(s"Error: $e")
    case Outcome.Canceled() => IO.println("Task was canceled!")
  }
yield ()

Рассмотренный пример в Scastie

Есть другой вариант метода join: joinWith(onCancel: F[A]). Он позволяет указывать резервное вычисление в случае отмены волокна и имеет два специализированных варианта: joinWithNever - сокращение для joinWith(IO.never), и joinWithUnit - сокращение для joinWith(IO.unit).

Тайм-ауты

Можно использовать метод timeout, чтобы указать период, по истечении которого волокно будет отменено.

task
  .onCancel(IO.println("Task has timed out!"))
  .timeout(500.millis)
  .start

Метод timeoutTo позволяет указать резервный вариант для отмененных вычислений.

task
  .onCancel(IO.println("Task has timed out!"))
  .timeoutTo(500.millis, planB)
  .start

Подобно cancel методы тайм-аута всегда будут ждать, пока все основные ресурсы, выделенные в волокне, не будут завершены должным образом. Ожидание можно пропустить с помощью timeoutAndForget. После вызова он немедленно возвращает управление стороне вызова и асинхронно запускает финализаторы. Имейте в виду, что если финализаторы никогда не завершатся, волокно будет протекать.

Структурированный параллелизм

Если волокно не присоединено или не отменено, оно будет проходить параллельно волокну места вызова. Волокно завершится только в том случае, если его вычисления будут завершены или вызовут ошибку. Если вычисления, выполняемые на волокне, никогда не заканчиваются, волокно никогда не завершится.

val task =
  for
    time <- IO.realTimeInstant
    _ <- IO.println(f"Current date and time is $time")
    _ <- IO.sleep(1.second)
  yield ()

for
  _ <- task.foreverM.start // задача запущена как бесконечный цикл
  // и волокно никогда не завершится
  _ <- IO.sleep(5.seconds)
  _ <- IO.println("Bye")
yield ()

Это может привести к утечке ресурсов. Волокна довольно легковесны, но если создать достаточное их количество, это все равно может привести к истощению доступной памяти.

Такие методы, как start и join являются примитивами параллелизма CE. Это удобные инструменты, но их следует использовать с осторожностью. В противном случае можно легко добиться утечки памяти или вызвать тупиковые ситуации.

Чтобы предотвратить утечку, мы должны либо явно отменить избыточные волокна, либо использовать более безопасные методы, такие как background. Метод background связывает жизненный цикл волокна с типом данных Resource. Всякий раз, когда ресурс завершается, он также отменяет базовое волокно.

val backgroundTask: Resource[IO, Unit] = task
  .onCancel(IO.println("Closing the background task"))
  .background

backgroundTask.surround {
  for
    _ <- IO.sleep(5.seconds)
    _ <- IO.println("Bye") // после завершения первоочередной задачи
                           // фоновое волокно будет отменено
  yield ()
}

Статус волокна соответствует синтаксической структуре кода. Такой способ управления жизненным циклом волокон называется структурным параллелизмом.

Еще один инструмент структурированного параллелизма, предлагаемый CE, — это Supervisor. Supervisor — это ресурс, способный порождать волокна, чьи жизненные циклы будет связаны с его жизненным циклом. Всякий раз, когда супервизор завершается, он также обязательно завершает все свои дочерние волокна.

Supervisor поставляется с двумя режимами. Мы можем выбрать режим, передав флаг await при создании ресурса. Если await - false, то супервизор отменит все свои активные потоки, когда завершится. Это значение по умолчанию. Если await - true, завершение супервизора будет блокироваться до тех пор, пока не будут завершены все волокна.

Supervisor[IO](await = false).use { supervisor =>
  for
    _ <- supervisor.supervise(
      IO.println("A").andWait(1.second).foreverM
    )
    _ <- supervisor.supervise(
      IO.println("B").andWait(2.second).foreverM
    )
    _ <- IO.sleep(5.seconds) // волокна будут печатать A и B
                             // в течении 5 секунд, а затем будут завершены
  yield ()
}

Добавив предложение импорта import cats.effect.implicits.*, мы также можем добавить в область видимости метод расширения supervise для IO. Это позволит вызывать supervise аналогично тому, как вызывается start.

import cats.effect.implicits.*

Supervisor[IO](await = true).use { supervisor =>
  IO.println("Hello")
    .andWait(1.second)
    .foreverM
    .timeout(5.seconds)
    .supervise(supervisor)
}

Операторы параллелизма

CE предоставляет множество удобных высокоуровневых абстракций для общих операций, связанных с параллелизмом.

Например, с IO.race можно взять два IO, запустить их параллельно и дождаться более быстрого. Более медленное вычисление будет отменено. Тип результата race - IO[Either[A, B]].

val raced: IO[Either[String, Int]] = IO.race(
  IO("Faster").delayBy(5.seconds),
  IO(999).delayBy(10.seconds)
)

Рассмотренный пример в Scastie

Метод IO.racePair дает больше контроля над тем, как справляться с потерей волокна. Он возвращает как более быструю задачу в виде Outcome, так и экземпляр объекта Fiber, который можно использовать для отмены или присоединения более медленного волокна. Поначалу возвращаемый тип может показаться немного запутанным: Either[(OutcomeIO[A], FiberIO[B]), (FiberIO[A, OutcomeIO[B])]. Если левая задача "побеждает", то мы получаем левое значение Either: кортеж, содержащий результат левого волокна и дескриптор волокна для правого. Если "выигрывает" правый, то получаем правое значение, на этот раз с результатом правого волокна и дескриптором левого.

IO.racePair(IO(10), IO(5).delayBy(2.seconds)).flatMap {
 case Left((resL, fibR))  => 
   fibR.cancel *> IO(s"Left fiber outcome: $resL")
 case Right((fibL, resR)) => 
   fibL.cancel *> IO(s"Right fiber outcome: $resR")
}

Рассмотренный пример в Scastie

Если нам нужно запустить два вычисления параллельно и дождаться их завершения, у нас есть несколько вариантов. Метод IO.both возвращает кортеж, содержащий оба результата: IO[(A, B)]. Если какая-либо из операций отменена или завершилась с ошибкой, вторая также будет отменена.

IO.both(IO(1).delayBy(1.second), IO("Hello"))

IO.bothOutcome не прерывает другое волокно, даже если первое прекращено. Он возвращает кортеж, содержащий результаты обоих волокон.

Библиотека Cats предоставляет множество операторов, позволяющих параллельно выполнять эффекты. Они реализованы как методы расширения, поэтому для их использования нужно сделать соответствующий импорт, который внесет в область видимости правильные имплициты.

import cats.syntax.all.*

Метод parTupled очень похож на IO.both. Его можно вызвать для кортежа из двух IO, и оба будут выполняться параллельно. Тип результата IO[(A, B)]. Семантика этого метода такая же, как и у tupled, с той лишь разницей, что эффект tupled выполняется последовательно.

Если необходимо выполнить произвольное количество эффектов одновременно, то можно использовать параллельные аналоги traverse и sequence: parTraverse и parSequence.

val r: IO[List[Int]] = List(
  IO(1).delayBy(1.second),
  IO(2).delayBy(2.seconds),
  IO(3).delayBy(3.seconds)
).parSequence

Рассмотренный пример в Scastie

Можно ограничить количество одновременно выполняемых задач с помощью вариантов parTraverseN и parSequenceN. Они принимают дополнительный Int параметр, определяющий максимальный параллелизм.

По умолчанию оба parTraverse и parSequence сохраняют порядок задач, что может привести к небольшому снижению производительности. С другой стороны, parTraverseUnordered и parSequenceUnordered не соблюдают порядок, что потенциально может ускорить их выполнение.

И последнее, но не менее важное: мы можем использовать два полезных символических оператора: &> и <&. Они также позволяют выполнять два эффекта параллельно. Неудача в одном из IO отменит и другой. Если всё вычисление отменено, оба действия будут отменены. Оператор &> возвращает значение справа, <& - слева.

// Получим 1 приблизительно через 2 секунды
IO(1).delayBy(1.second) <& IO(2).delayBy(2.seconds)

Об этих операторах можно думать как об аналогах последовательных *>, <*, << и >>.


Ссылки: