Основы параллелизма Cats с Ref и Deferred

Перевод статьи "Cats Concurrency basics with Ref and Deferred", автор Krzysztof Grajek

Параллельный доступ и ссылочная прозрачность

Ref и Deferred являются основными строительными блоками структур в ФП, используемых параллельно. Особенно при использовании с tagless final абстракцией эти два элемента при построении бизнес-логики могут дать нам и параллельный доступ, и ссылочную прозрачность, и мы можем использовать их для создания более сложных структур, таких как счетчики и конечные автоматы.

Прежде чем углубиться в Ref и Deferred, полезно знать, что параллелизм в Cats основан на Java AtomicReference, и мы начнем наше путешествие здесь.

Atomic Reference

AtomicReference является одним из элементов пакета java.util.concurrent.atomic, и в документах Oracle мы можем прочитать, что java.util.concurrent.atomic это:

Небольшой набор классов, поддерживающих безблокировочное поточно-ориентированное программирование с одиночными переменными. По сути, классы в этом пакете расширяют понятие volatile значений, полей и элементов массива до тех, которые также обеспечивают атомарную операцию условного обновления.

...

Экземпляры классов AtomicBoolean, AtomicInteger, AtomicLong и AtomicReference каждый обеспечивают доступ и обновления к одной переменной соответствующего типа.

AtomicReference с нами начиная с Java 1.5, и он используется для получения большей производительности, чем синхронизация (что обычно, но не всегда).

Когда вам нужно поделиться некоторыми данными между потоками, то вы должны защитить доступ к этой части данных. Простейшим примером может быть увеличение некоторого int: i = i + 1. Наш пример состоит на самом деле из 3 операций: сначала мы читаем значение i, затем прибавляем 1 к этому значению, а в конце присваиваем i вновь вычисленное значение. В многопоточных приложениях может возникнуть ситуация, когда каждый поток будет выполнять эти 3 шага между шагами других потоков, и окончательное значение i невозможно будет предсказать.

Обычно возникают мысли о механизме слова synchronised или класса блокировки, но с atomic.* больше не нужно беспокоиться о явной синхронизации, и вы можете использовать предоставленные служебные атомарные типы, где автоматически включается проверка, выполняется ли операция за один шаг.

Возьмем в качестве примера AtomicInteger.incrementAndGet:

/**
 * Atomically increments by one the current value.
 *
 * @return the updated value
 */
public final int incrementAndGet() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return next;
    }
}

С помощью операции compareAndSet мы либо обновляем наши данные, либо терпим неудачу, но мы никогда не заставляем поток ждать. Таким образом, если compareAndSet в incrementAndGet не удается, то мы просто пытаемся повторить всю операцию снова, извлекая текущее значение данных в get(). С другой стороны, с synchronized-подобными механизмами нет ограничений на количество операторов, которые можно выполнить, пока блокировка получена, но этот блок никогда не выйдет из строя и может заставить вызывающий поток ждать, обеспечивая вероятность взаимоблокировки или снижения производительности.

Теперь, разобрав основы, давайте перейдем к нашей первой мегазвезде параллелизма.

Ref

Cats Ref очень похож на упомянутую выше атомарную ссылку Java, основные отличия в том, что первое - Ref используется с tagless final абстракцией F, второе - оно всегда содержит значение, и значение, содержащееся в Ref — типа A, - это неизменяемое значение.

abstract class Ref[F[_], A]:
  def get: F[A]
  def set(a: A): F[Unit]
  def modify[B](f: A => (A, B)): F[B]
  // ... и т.д.

Ref[F[_], A] является чисто функциональной изменяемой ссылкой:

Он создается путем присвоения начального значения, и каждая операция оборачивается в F, например, в cats.effect.IO.

Если внимательно посмотреть на сопутствующий объект для Cats Ref, то увидим, что необходимо предоставить также Sync[F]:

def of[F[_], A](a: A)(implicit F: Sync[F]): F[Ref[F, A]] = F.delay(unsafe(a))

Приведенный выше метод является лишь примером многих операций, доступных в Ref, он используется для построения Ref с начальным значением. Sync дает возможность приостановить любые побочные эффекты с помощью своего метода delay для каждой операции на Ref.

Ref довольно простая конструкция, мы можем сосредоточиться в основном на ее методах get, set и of, чтобы понять, как она работает.

get и set подход

Допустим, у нас есть объект (назовем его Shared), который должен быть обновлен несколькими потоками. И мы используем для этого методы get и set, создав служебный метод, который поможет нам на этом пути:

def modifyShared(trace: Ref[IO, Shared], msg: String): IO[Unit] =
  for
    sh <- trace.get()
    _ <- trace.set(Shared(sh, msg))
  yield ()

Наш объект Shared можно создать, применив для создания нового экземпляра его предыдущее состояние и новое значение. Shared на самом деле может быть всем, чем мы захотим: простым списком, ассоциативным массивом или чем-то еще, к чему мы хотим получить одновременный безопасный доступ. Я только что изобрел Shared(prev: Shared, msg: String) для целей этого урока.

F в примере выше был заменен на конкретный IO из Cats effect, но имейте в виду, что Ref полиморфен на F и может использоваться с другими библиотеками.

С нашей монадической IO мы выполняем flatMap на каждом шаге и устанавливаем значение, хранящееся в нашей Ref, на желаемое — или... подождите, может быть, мы этого не делаем.

При таком подходе при одновременном вызове modifyShared мы можем потерять обновления! Это потому, что у нас может быть ситуация, когда, например, два потока могут прочитать значение с помощью get, и каждый из них будет выполнять set одновременно. Методы get и set не вызываются атомарно вместе.

Атомарный update

Конечно, мы можем улучшить приведенный выше пример и использовать другие доступные методы из Ref. Для выполнения get и set вместе можно использовать update.

def update(f: A => A): F[Unit]

Это решит проблему с обновлением значения, но у простого update есть свои недостатки. Если мы хотим прочитать значение сразу после его обновления, аналогично тому, как мы использовали get и set, то мы можем в конечном итоге прочитать устаревшие данные, скажем, наш Ref будет содержать ссылку на простой Int:

for
  _ <- someRef.update(_ + 1)
  curr <- someRef.get
  _ <- IO { println(s"current value is $curr") }
yield ()

modify в помощь

Мы можем немного это улучшить, используя modify, который будет делать то же самое, что и update, но вернет обновленное значение для дальнейшего использования.

def modify[B](f: A => (A, B)): F[B] =
  @tailrec
  def spin: B =
    val c = ar.get
    val (u, b) = f(c)
    if !ar.compareAndSet(c, u) then spin
    else b

  F.delay(spin)

Как видите, это почти точно такая же реализация, как в примере AtomicInteger.incrementAndGet, показанном вначале, только на Scala. Можно заметить, что Ref основан на AtomicReference, чтобы делать свою работу.

Ограничения Ref

Вы, наверное, уже заметили, что в случае, если значение не удастся обновить, то функция, переданная в update/modify, должна запускаться недетерминировано и, возможно, запускаться несколько раз. Хорошая новость заключается в том, что это решение в целом оказывается намного быстрее, чем стандартный механизм блокировки и синхронизации, и намного безопаснее, поскольку это решение не может блокироваться.

Как только мы узнали, как просто работает Ref, то можем перейти к другому Cats Concurrent классу: Deferred.

Deferred

В отличие от Ref, Deferred:

Эти свойства делают Deferred простым и довольно интересным одновременно.

abstract class Deferred[F[_], A]:
  def get: F[A]
  def complete(a: A): F[Unit]

Deferred предназначен для чисто функциональной синхронизации: когда мы вызываем get на пустом Deferred, мы блокируем его до тех пор, пока значение не станет доступно. Согласно документации самого класса блокировка:

упомянутая блокировка является только семантической, никакие фактические потоки реализацией не блокируются

Тот же вызов get на непустом Deferred немедленно вернет сохраненное значение.

Другой метод - complete - установит значение, если экземпляр пуст, но при вызове на непустом Deferred вернет ошибку (failed IO в случае IO).

Здесь важно отметить, что Deferred требует, чтобы F был Concurrent, а это означает, что его можно отменить.

Хороший пример использования Deferred — это когда одна часть вашего приложения должна ждать другую. Пример ниже взят из отличного выступления Fabio Labella на Scala Italy 2019 — Composable Concurrency with Ref + Deferred, доступного на Vimeo.

def consumer(done: Deferred[IO, Unit]) =
  for
    c <- Consumer.setup
    _ <- done.complete(())
    msg <- c.read
    _ <- IO(println(s"Received $msg"))
  yield ()

def producer(done: Deferred[IO, Unit]) =
  for
    p <- Producer.setup()
    _ <- done.get
    msg = "Msg A"
    _ <- p.write(msg)
    _ <- IO(println(s"Sent $msg"))
  yield ()

def prog =
  for
    d <- Deferred[IO, Unit]
    _ <- consumer(d).start
    _ <- producer(d).start
  yield ()

В приведенном выше примере у нас есть производитель и потребитель, и мы хотим, чтобы производитель дождался завершения настройки потребителя, прежде чем начать писать сообщения, иначе все, что мы напишем в производителе, будет потеряно. Чтобы преодолеть эту проблему, мы можем использовать общий экземпляр Deferred и блокировать его get до тех пор, пока Deferred экземпляр done не будет заполнен значением на стороне потребителя (в данном случае простое Unit значение ()).

Конечно, приведенное выше решение не лишено проблем: если установка потребителя никогда не завершается, мы застреваем в ожидании, а производитель не может отправлять какие-либо сообщения. Чтобы преодолеть это, мы можем использовать тайм-аут на get, а также использовать Either[Throwable, Unit] или какую-либо другую конструкцию вместо простого Unit внутри объекта Deferred.

Deferred довольно прост, но в сочетании с Ref их можно использовать для создания более сложных структур данных, таких как, например, семафоры.

Для получения дополнительной информации рекомендуется посетить саму документацию Cats, где вы можете узнать больше о параллелизме в Cats и структурах данных, которые он предоставляет.


Ссылки: