Параллелизм
Параллелизм с помощью чистых функций
Рассмотрим операцию суммирования чисел:
def sum(ints: Seq[Int]): Int =
ints.foldLeft(0)((a, b) => a + b)
Здесь используется последовательное сворачивание, когда мы проходим по всей коллекции элемент за элементом, каждый раз складывая текущий элемент к итоговому результату.
Вместо последовательного сворачивания можно было бы использовать алгоритм "разделяй и властвуй":
def sum(ints: IndexedSeq[Int]): Int =
if ints.length <= 1 then
ints.headOption.getOrElse(0)
else
val (l, r) = ints.splitAt(ints.length / 2)
sum(l) + sum(r)
sum(IndexedSeq(1, 2, 3, 4, 5))
// 15
Последовательность делится пополам с помощью функции splitAt
, рекурсивно суммируются обе половины,
а затем результаты объединяются.
И в отличие от реализации на основе foldLeft
,
реализацию "разделяй и властвуй" можно распараллелить — две половины можно суммировать параллельно.
Тип данных для параллельных вычислений
Для параллельных вычислений с помощью чистых функций можно определить следующий API:
def unit[A](a: A): Par[A]
def fork[A](a: => Par[A]): Par[A]
def lazyUnit[A](a: => A): Par[A] = fork(unit(a))
extension [A](pa: Par[A])
def run: A
def map2[B, C](pb: Par[B])(f: (A, B) => C): Par[C]
unit
"поднимает" константу до параллельных вычислений.fork
помечает для параллельного вычисления. Вычисление не произойдет, пока оно не будет принудительно запущено с помощьюrun
.lazyUnit
заключает свой невычисленный аргумент вPar
и помечает его для параллельного вычисления.run
извлекает значение изPar
, фактически выполняя вычисление.map2
объединяет результаты двух параллельных вычислений с помощью бинарной функции.
Тогда сумму можно реализовать так:
def sum(ints: IndexedSeq[Int]): Par[Int] =
if ints.size <= 1 then
Par.unit(ints.headOption.getOrElse(0))
else
val (l, r) = ints.splitAt(ints.size / 2)
Par.map2(sum(l), sum(r))(_ + _)
Законы
Для параллельных вычислений должен выполняться следующий закон:
unit(x).map(f) == unit(f(x))
Ссылки: