Основы параллелизма Cats с Ref и Deferred
Перевод статьи "Cats Concurrency basics with Ref and Deferred", автор Krzysztof Grajek
Параллельный доступ и ссылочная прозрачность
Ref
и Deferred
являются основными строительными блоками структур в ФП, используемых параллельно.
Особенно при использовании с tagless final абстракцией
эти два элемента при построении бизнес-логики могут дать нам и параллельный доступ, и ссылочную прозрачность,
и мы можем использовать их для создания более сложных структур, таких как счетчики и конечные автоматы.
Прежде чем углубиться в Ref
и Deferred
, полезно знать,
что параллелизм в Cats основан на Java AtomicReference
, и мы начнем наше путешествие здесь.
Atomic Reference
AtomicReference
является одним из элементов пакета java.util.concurrent.atomic
,
и в документах Oracle
мы можем прочитать, что java.util.concurrent.atomic
это:
Небольшой набор классов, поддерживающих безблокировочное поточно-ориентированное программирование с одиночными переменными. По сути, классы в этом пакете расширяют понятие
volatile
значений, полей и элементов массива до тех, которые также обеспечивают атомарную операцию условного обновления....
Экземпляры классов AtomicBoolean, AtomicInteger, AtomicLong и AtomicReference каждый обеспечивают доступ и обновления к одной переменной соответствующего типа.
AtomicReference
с нами начиная с Java 1.5, и он используется для
получения большей производительности, чем синхронизация (что обычно, но не всегда).
Когда вам нужно поделиться некоторыми данными между потоками, то вы должны защитить доступ к этой части данных.
Простейшим примером может быть увеличение некоторого int
: i = i + 1
.
Наш пример состоит на самом деле из 3 операций:
сначала мы читаем значение i
, затем прибавляем 1
к этому значению,
а в конце присваиваем i
вновь вычисленное значение.
В многопоточных приложениях может возникнуть ситуация,
когда каждый поток будет выполнять эти 3 шага между шагами других потоков,
и окончательное значение i
невозможно будет предсказать.
Обычно возникают мысли о механизме слова synchronised
или класса блокировки,
но с atomic.*
больше не нужно беспокоиться о явной синхронизации,
и вы можете использовать предоставленные служебные атомарные типы,
где автоматически включается проверка, выполняется ли операция за один шаг.
Возьмем в качестве примера AtomicInteger.incrementAndGet
:
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
С помощью операции compareAndSet
мы либо обновляем наши данные, либо терпим неудачу,
но мы никогда не заставляем поток ждать.
Таким образом, если compareAndSet
в incrementAndGet
не удается,
то мы просто пытаемся повторить всю операцию снова, извлекая текущее значение данных в get()
.
С другой стороны, с synchronized
-подобными механизмами нет ограничений на количество операторов,
которые можно выполнить, пока блокировка получена,
но этот блок никогда не выйдет из строя и может заставить вызывающий поток ждать,
обеспечивая вероятность взаимоблокировки или снижения производительности.
Теперь, разобрав основы, давайте перейдем к нашей первой мегазвезде параллелизма.
Ref
Cats Ref
очень похож на упомянутую выше атомарную ссылку Java,
основные отличия в том, что первое - Ref
используется с tagless final абстракцией F
,
второе - оно всегда содержит значение,
и значение, содержащееся в Ref
— типа A
, - это неизменяемое значение.
abstract class Ref[F[_], A]:
def get: F[A]
def set(a: A): F[Unit]
def modify[B](f: A => (A, B)): F[B]
// ... и т.д.
Ref[F[_], A]
является чисто функциональной изменяемой ссылкой:
- параллельной
- свободно блокируемой
- и всегда содержит значение
Он создается путем присвоения начального значения, и каждая операция оборачивается в F
, например, в cats.effect.IO
.
Если внимательно посмотреть на сопутствующий объект для Cats Ref
,
то увидим, что необходимо предоставить также Sync[F]
:
def of[F[_], A](a: A)(implicit F: Sync[F]): F[Ref[F, A]] = F.delay(unsafe(a))
Приведенный выше метод является лишь примером многих операций, доступных в Ref
,
он используется для построения Ref
с начальным значением.
Sync
дает возможность приостановить любые побочные эффекты
с помощью своего метода delay
для каждой операции на Ref
.
Ref
довольно простая конструкция, мы можем сосредоточиться в основном на
ее методах get
, set
и of
, чтобы понять, как она работает.
get и set подход
Допустим, у нас есть объект (назовем его Shared
), который должен быть обновлен несколькими потоками.
И мы используем для этого методы get
и set
, создав служебный метод, который поможет нам на этом пути:
def modifyShared(trace: Ref[IO, Shared], msg: String): IO[Unit] =
for
sh <- trace.get()
_ <- trace.set(Shared(sh, msg))
yield ()
Наш объект Shared
можно создать,
применив для создания нового экземпляра его предыдущее состояние и новое значение.
Shared
на самом деле может быть всем, чем мы захотим: простым списком, ассоциативным массивом
или чем-то еще, к чему мы хотим получить одновременный безопасный доступ.
Я только что изобрел Shared(prev: Shared, msg: String)
для целей этого урока.
F
в примере выше был заменен на конкретный IO
из Cats effect,
но имейте в виду, что Ref
полиморфен на F
и может использоваться с другими библиотеками.
С нашей монадической IO
мы выполняем flatMap
на каждом шаге
и устанавливаем значение, хранящееся в нашей Ref
, на желаемое —
или... подождите, может быть, мы этого не делаем.
При таком подходе при одновременном вызове modifyShared
мы можем потерять обновления!
Это потому, что у нас может быть ситуация, когда, например,
два потока могут прочитать значение с помощью get
, и каждый из них будет выполнять set
одновременно.
Методы get
и set
не вызываются атомарно вместе.
Атомарный update
Конечно, мы можем улучшить приведенный выше пример и использовать другие доступные методы из Ref
.
Для выполнения get
и set
вместе можно использовать update
.
def update(f: A => A): F[Unit]
Это решит проблему с обновлением значения, но у простого update
есть свои недостатки.
Если мы хотим прочитать значение сразу после его обновления, аналогично тому, как мы использовали get
и set
,
то мы можем в конечном итоге прочитать устаревшие данные, скажем, наш Ref
будет содержать ссылку на простой Int
:
for
_ <- someRef.update(_ + 1)
curr <- someRef.get
_ <- IO { println(s"current value is $curr") }
yield ()
modify
в помощь
Мы можем немного это улучшить, используя modify
, который будет делать то же самое, что и update
,
но вернет обновленное значение для дальнейшего использования.
def modify[B](f: A => (A, B)): F[B] =
@tailrec
def spin: B =
val c = ar.get
val (u, b) = f(c)
if !ar.compareAndSet(c, u) then spin
else b
F.delay(spin)
Как видите, это почти точно такая же реализация,
как в примере AtomicInteger.incrementAndGet
, показанном вначале, только на Scala.
Можно заметить, что Ref
основан на AtomicReference
, чтобы делать свою работу.
Ограничения Ref
Вы, наверное, уже заметили, что в случае, если значение не удастся обновить,
то функция, переданная в update
/modify
,
должна запускаться недетерминировано и, возможно, запускаться несколько раз.
Хорошая новость заключается в том, что это решение в целом оказывается намного быстрее,
чем стандартный механизм блокировки и синхронизации,
и намного безопаснее, поскольку это решение не может блокироваться.
Как только мы узнали, как просто работает Ref
, то можем перейти к другому Cats Concurrent классу: Deferred
.
Deferred
В отличие от Ref
, Deferred
:
- создан пустой
- можно выполнить только один раз
- и однажды установленный, он не может быть изменен или снова стать пустым.
Эти свойства делают Deferred
простым и довольно интересным одновременно.
abstract class Deferred[F[_], A]:
def get: F[A]
def complete(a: A): F[Unit]
Deferred
предназначен для чисто функциональной синхронизации:
когда мы вызываем get
на пустом Deferred
, мы блокируем его до тех пор, пока значение не станет доступно.
Согласно документации самого класса блокировка:
упомянутая блокировка является только семантической, никакие фактические потоки реализацией не блокируются
Тот же вызов get
на непустом Deferred
немедленно вернет сохраненное значение.
Другой метод - complete
- установит значение, если экземпляр пуст,
но при вызове на непустом Deferred
вернет ошибку (failed IO в случае IO).
Здесь важно отметить, что Deferred
требует, чтобы F
был Concurrent
, а это означает, что его можно отменить.
Хороший пример использования Deferred
— это когда одна часть вашего приложения должна ждать другую.
Пример ниже взят из отличного выступления Fabio Labella на Scala Italy 2019 —
Composable Concurrency with Ref + Deferred, доступного на Vimeo.
def consumer(done: Deferred[IO, Unit]) =
for
c <- Consumer.setup
_ <- done.complete(())
msg <- c.read
_ <- IO(println(s"Received $msg"))
yield ()
def producer(done: Deferred[IO, Unit]) =
for
p <- Producer.setup()
_ <- done.get
msg = "Msg A"
_ <- p.write(msg)
_ <- IO(println(s"Sent $msg"))
yield ()
def prog =
for
d <- Deferred[IO, Unit]
_ <- consumer(d).start
_ <- producer(d).start
yield ()
В приведенном выше примере у нас есть производитель и потребитель,
и мы хотим, чтобы производитель дождался завершения настройки потребителя, прежде чем начать писать сообщения,
иначе все, что мы напишем в производителе, будет потеряно.
Чтобы преодолеть эту проблему, мы можем использовать общий экземпляр Deferred
и блокировать его get
до тех пор, пока Deferred
экземпляр done
не будет заполнен значением
на стороне потребителя (в данном случае простое Unit
значение ()
).
Конечно, приведенное выше решение не лишено проблем:
если установка потребителя никогда не завершается, мы застреваем в ожидании,
а производитель не может отправлять какие-либо сообщения.
Чтобы преодолеть это, мы можем использовать тайм-аут на get
,
а также использовать Either[Throwable, Unit]
или какую-либо другую конструкцию вместо простого Unit
внутри объекта Deferred
.
Deferred
довольно прост, но в сочетании с Ref
их можно использовать для
создания более сложных структур данных, таких как, например, семафоры.
Для получения дополнительной информации рекомендуется посетить саму документацию Cats, где вы можете узнать больше о параллелизме в Cats и структурах данных, которые он предоставляет.
Ссылки: