Параллелизм
Для написания параллельных приложений на Scala, можно использовать нативный поток Java, но Scala Future предлагает более высокий уровень и идиоматический подход.
Введение
Вот описание Scala Future
из его Scaladoc:
"Future
представляет собой значение, которое может быть или не быть доступным в настоящее время,
но будет доступно в какой-то момент, или исключение, если это значение не может быть сделано доступным".
Чтобы продемонстрировать, что это значит, сначала рассмотрим однопоточное программирование. В однопоточном мире результат вызова метода привязывается к переменной следующим образом:
def aShortRunningTask(): Int = 42
val x = aShortRunningTask()
В этом коде значение 42
сразу привязывается к x
.
При работе с Future
процесс назначения выглядит примерно так:
def aLongRunningTask(): Future[Int] = ???
val x = aLongRunningTask()
Но главное отличие в этом случае заключается в том, что, поскольку aLongRunningTask
возвращает неопределенное время,
значение x
может быть доступно или недоступно в данный момент, но оно будет доступно в какой-то момент — в будущем.
Другой способ взглянуть на это с точки зрения блокировки.
В этом однопоточном примере оператор println
не печатается до тех пор,
пока не завершится выполнение aShortRunningTask
:
def aShortRunningTask(): Int =
Thread.sleep(500)
42
val x = aShortRunningTask()
println("Here")
И наоборот, если aShortRunningTask
создается как Future
, оператор println
печатается почти сразу,
потому что aShortRunningTask
порождается в другом потоке — он не блокируется.
В этой главе будет рассказано, как использовать Future
,
в том числе как запускать несколько Future
параллельно и объединять их результаты в выражении for
.
Также будут показаны примеры методов, которые используются для обработки значения Future
после его возврата.
ОFuture
, важно знать, что они задуманы как одноразовая конструкция "Обработайте это относительно медленное вычисление в каком-нибудь другом потоке и верните мне результат, когда закончите". В отличие от этого, акторы Akka предназначены для работы в течение длительного времени и отвечают на множество запросов в течение своей жизни. В то время как субъект может жить вечно,Future
в конечном итоге содержит результат вычисления, которое выполнялось только один раз.
Пример в REPL
Future
используется для создания временного кармана параллелизма.
Например, можно использовать Future
, когда нужно вызвать алгоритм,
который выполняется неопределенное количество времени — например, вызов удаленного микросервиса, —
поэтому его желательно запустить вне основного потока.
Чтобы продемонстрировать, как это работает, начнем с примера Future
в REPL.
Во-первых, вставим необходимые инструкции импорта:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
Теперь можно создать Future
. Для этого примера сначала определим долговременный однопоточный алгоритм:
def longRunningAlgorithm() =
Thread.sleep(10_000)
42
Этот причудливый алгоритм возвращает целочисленное значение 42
после десятисекундной задержки.
Теперь вызовем этот алгоритм, поместив его в конструктор Future
и присвоив результат переменной:
scala> val eventualInt = Future(longRunningAlgorithm())
eventualInt: scala.concurrent.Future[Int] = Future(<not completed>)
Вычисления начинают выполняться после вызова longRunningAlgorithm()
.
Если сразу проверить значение переменной eventualInt
, то можно увидеть, что Future
еще не завершен:
scala> eventualInt
val res1: scala.concurrent.Future[Int] = Future(<not completed>)
Но если проверить через десять секунд еще раз, то можно увидеть, что оно выполнено успешно:
scala> eventualInt
val res2: scala.concurrent.Future[Int] = Future(Success(42))
Хотя это относительно простой пример, он демонстрирует основной подход:
просто создайте новое Future
с помощью своего долговременного алгоритма.
Одна вещь, на которую следует обратить внимание - это то, что ожидаемый результат 42
обернут в Success
,
который обернут в Future
.
Это ключевая концепция для понимания: значение Future
всегда является
экземпляром одного из scala.util.Try
: Success
или Failure
.
Поэтому, при работе с результатом Future
, используются обычные методы обработки Try
.
Использование map с Future
Future
имеет метод map
, который используется точно так же, как метод map
для коллекций.
Вот как выглядит результат, при вызове map
сразу после создания переменной eventualInt
:
scala> val a = eventualInt.map(_ * 2)
a: scala.concurrent.Future[Int] = Future(<not completed>)
Как показано, для Future
, созданного с помощью longRunningAlgorithm
,
первоначальный вывод показывает Future(<not completed>)
.
Но если проверить значение a
через десять секунд, то можно увидеть, что оно содержит ожидаемый результат 84
:
scala> a
res1: scala.concurrent.Future[Int] = Future(Success(84))
Еще раз, успешный результат обернут внутри Success
и Future
.
Использование методов обратного вызова с Future
В дополнение к функциям высшего порядка, таким как map
, с Future
также можно использовать методы обратного вызова.
Одним из часто используемых методов обратного вызова является onComplete
, принимающий частичную функцию,
в которой обрабатываются случаи Success
и Failure
:
eventualInt.onComplete {
case Success(value) => println(s"Got the callback, value = $value")
case Failure(e) => e.printStackTrace
}
Если вставить этот код в REPL, то в конечном итоге придет результат:
Got the callback, value = 42
Другие методы Future
Класс Future
имеет некоторые методы, которые можно найти в классах коллекций Scala, в том числе:
filter
flatMap
map
Методы обратного вызова:
onComplete
andThen
foreach
Другие методы трансформации:
fallbackTo
recover
recoverWith
См. страницу "Futures and Promises"
для обсуждения дополнительных методов, доступных для Future
.
Запуск нескольких Future и объединение результатов
Чтобы запустить несколько вычислений параллельно и соединить их результаты после завершения всех Future
,
можно использовать выражение for
.
Правильный подход такой:
- Запустить вычисления, которые возвращают
Future
результаты - Объединить их результаты в выражении
for
- Извлечь объединенный результат, используя
onComplete
или аналогичный метод
Пример
Рассмотрим следующий пример.
Ключевой момент - сначала запускаются вычисления, возвращающие Future
, а затем они объединяются в выражении for
:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
val startTime = System.currentTimeMillis()
def delta() = System.currentTimeMillis() - startTime
def sleep(millis: Long) = Thread.sleep(millis)
@main def multipleFutures1 =
println(s"creating the futures: ${delta()}")
// (1) запуск вычислений, возвращающих Future
val f1 = Future { sleep(800); 1 } // в конце концов возвращается 1
val f2 = Future { sleep(200); 2 } // в конце концов возвращается 2
val f3 = Future { sleep(400); 3 } // в конце концов возвращается 3
// (2) объединение нескольких Future в выражении `for`
val result =
for
r1 <- f1
r2 <- f2
r3 <- f3
yield
println(s"in the 'yield': ${delta()}")
(r1 + r2 + r3)
// (3) обработка результата
result.onComplete {
case Success(x) =>
println(s"in the Success case: ${delta()}")
println(s"result = $x")
case Failure(e) =>
e.printStackTrace
}
println(s"before the 'sleep(3000)': ${delta()}")
// важно для небольшой параллельной демонстрации: не глушить jvm
sleep(3000)
После запуска этого приложения, вывод выглядит следующим образом:
creating the futures: 1
before the 'sleep(3000)': 2
in the 'yield': 806
in the Success case: 806
result = 6
Как показывает вывод, Future
создаются очень быстро,
и всего за две миллисекунды достигается оператор печати непосредственно перед операцией sleep(3000)
в конце метода.
Весь этот код выполняется в основном потоке JVM.
Затем, через 806 мс, три Future
завершаются, и выполняется код в блоке yield
.
Затем код немедленно переходит к успешному завершению в методе onComplete
.
Вывод 806 мс является ключом к тому, чтобы убедиться, что три вычисления выполняются параллельно.
Если бы они выполнялись последовательно, общее время составило бы около 1400 мс —
сумма времени ожидания трех вычислений.
Но поскольку они выполняются параллельно, общее время чуть больше,
чем у самого продолжительного вычисления f1
, которое составляет 800 мс.
Обратите внимание, что если бы вычисления выполнялись в выражении for
,
они выполнялись бы последовательно, а не параллельно:
// последовательное выполнение (не параллельно!)
for
r1 <- Future { sleep(800); 1 }
r2 <- Future { sleep(200); 2 }
r3 <- Future { sleep(400); 3 }
yield
r1 + r2 + r3
Итак, если необходимо, чтобы вычисления выполнялись параллельно, не забудьте запустить их вне выражения for
.
Метод, возвращающий Future
Было показано, как передавать однопоточный алгоритм в конструктор Future
.
Ту же технику можно использовать для создания метода, который возвращает Future
:
// моделируем медленно работающий метод
def slowlyDouble(x: Int, delay: Long): Future[Int] = Future {
sleep(delay)
x * 2
}
Как и в предыдущих примерах, достаточно просто присвоить результат вызова метода новой переменной.
Тогда, если сразу проверить результат, то можно увидеть, что он не завершен,
но по истечении времени задержки в Future
результат будет выдан:
scala> val f = slowlyDouble(2, 5_000L)
val f: concurrent.Future[Int] = Future(<not completed>)
scala> f
val res0: concurrent.Future[Int] = Future(<not completed>)
scala> f
val res1: concurrent.Future[Int] = Future(Success(4))
Ключевые моменты о Future
Подводя итог, несколько ключевых моментов о Future
:
Future
создается для запуска задач вне основного потокаFuture
предназначены для одноразовых, потенциально длительных параллельных задач, которые в конечном итоге возвращают значение; они создают временный карман параллелизмаFuture
начинает работать в момент построения- преимущество
Future
над потоками заключается в том, что они работают с выражениямиfor
и имеют множество методов обратного вызова, упрощающих процесс работы с параллельными потоками - при работе с
Future
не нужно беспокоиться о низкоуровневых деталях управления потоками - результат
Future
обрабатывается с помощью методов обратного вызова, таких какonComplete
иandThen
, или методов преобразования, таких какfilter
,map
и т.д. - значение внутри
Future
всегда является экземпляром одного из типовTry
:Success
илиFailure
- при использовании нескольких
Future
для получения одного результата, они объединяются в выраженииfor
Кроме того, как было видно по операторам import, Scala Future
зависит от ExecutionContext
.
Дополнительные сведения о Future
см. в статье Futures and Promises,
в которой обсуждаются futures, promises и execution contexts.
В ней также обсуждается, как выражение for
транслируется в операцию flatMap
.
Ссылки: