Параллелизм

Для написания параллельных приложений на Scala, можно использовать нативный поток Java, но Scala Future предлагает более высокий уровень и идиоматический подход.

Введение

Вот описание Scala Future из его Scaladoc:

"Future представляет собой значение, которое может быть или не быть доступным в настоящее время, но будет доступно в какой-то момент, или исключение, если это значение не может быть сделано доступным".

Чтобы продемонстрировать, что это значит, сначала рассмотрим однопоточное программирование. В однопоточном мире результат вызова метода привязывается к переменной следующим образом:

def aShortRunningTask(): Int = 42
val x = aShortRunningTask()

В этом коде значение 42 сразу привязывается к x.

При работе с Future процесс назначения выглядит примерно так:

def aLongRunningTask(): Future[Int] = ???
val x = aLongRunningTask()

Но главное отличие в этом случае заключается в том, что, поскольку aLongRunningTask возвращает неопределенное время, значение x может быть доступно или недоступно в данный момент, но оно будет доступно в какой-то момент — в будущем.

Другой способ взглянуть на это с точки зрения блокировки. В этом однопоточном примере оператор println не печатается до тех пор, пока не завершится выполнение aShortRunningTask:

def aShortRunningTask(): Int =
  Thread.sleep(500)
  42
val x = aShortRunningTask()
println("Here")

И наоборот, если aShortRunningTask создается как Future, оператор println печатается почти сразу, потому что aShortRunningTask порождается в другом потоке — он не блокируется.

В этой главе будет рассказано, как использовать Future, в том числе как запускать несколько Future параллельно и объединять их результаты в выражении for. Также будут показаны примеры методов, которые используются для обработки значения Future после его возврата.

О Future, важно знать, что они задуманы как одноразовая конструкция "Обработайте это относительно медленное вычисление в каком-нибудь другом потоке и верните мне результат, когда закончите". В отличие от этого, акторы Akka предназначены для работы в течение длительного времени и отвечают на множество запросов в течение своей жизни. В то время как субъект может жить вечно, Future в конечном итоге содержит результат вычисления, которое выполнялось только один раз.

Пример в REPL

Future используется для создания временного кармана параллелизма. Например, можно использовать Future, когда нужно вызвать алгоритм, который выполняется неопределенное количество времени — например, вызов удаленного микросервиса, — поэтому его желательно запустить вне основного потока.

Чтобы продемонстрировать, как это работает, начнем с примера Future в REPL. Во-первых, вставим необходимые инструкции импорта:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

Теперь можно создать Future. Для этого примера сначала определим долговременный однопоточный алгоритм:

def longRunningAlgorithm() =
  Thread.sleep(10_000)
  42

Этот причудливый алгоритм возвращает целочисленное значение 42 после десятисекундной задержки. Теперь вызовем этот алгоритм, поместив его в конструктор Future и присвоив результат переменной:

scala> val eventualInt = Future(longRunningAlgorithm())
eventualInt: scala.concurrent.Future[Int] = Future(<not completed>)

Вычисления начинают выполняться после вызова longRunningAlgorithm(). Если сразу проверить значение переменной eventualInt, то можно увидеть, что Future еще не завершен:

scala> eventualInt
val res1: scala.concurrent.Future[Int] = Future(<not completed>)

Но если проверить через десять секунд еще раз, то можно увидеть, что оно выполнено успешно:

scala> eventualInt
val res2: scala.concurrent.Future[Int] = Future(Success(42))

Хотя это относительно простой пример, он демонстрирует основной подход: просто создайте новое Future с помощью своего долговременного алгоритма.

Одна вещь, на которую следует обратить внимание - это то, что ожидаемый результат 42 обернут в Success, который обернут в Future. Это ключевая концепция для понимания: значение Future всегда является экземпляром одного из scala.util.Try: Success или Failure. Поэтому, при работе с результатом Future, используются обычные методы обработки Try.

Использование map с Future

Future имеет метод map, который используется точно так же, как метод map для коллекций. Вот как выглядит результат, при вызове map сразу после создания переменной eventualInt:

scala> val a = eventualInt.map(_ * 2)
a: scala.concurrent.Future[Int] = Future(<not completed>)

Как показано, для Future, созданного с помощью longRunningAlgorithm, первоначальный вывод показывает Future(<not completed>). Но если проверить значение a через десять секунд, то можно увидеть, что оно содержит ожидаемый результат 84:

scala> a
res1: scala.concurrent.Future[Int] = Future(Success(84))

Еще раз, успешный результат обернут внутри Success и Future.

Использование методов обратного вызова с Future

В дополнение к функциям высшего порядка, таким как map, с Future также можно использовать методы обратного вызова. Одним из часто используемых методов обратного вызова является onComplete, принимающий частичную функцию, в которой обрабатываются случаи Success и Failure:

eventualInt.onComplete {
  case Success(value) => println(s"Got the callback, value = $value")
  case Failure(e) => e.printStackTrace
}

Если вставить этот код в REPL, то в конечном итоге придет результат:

Got the callback, value = 42

Другие методы Future

Класс Future имеет некоторые методы, которые можно найти в классах коллекций Scala, в том числе:

Методы обратного вызова:

Другие методы трансформации:

См. страницу "Futures and Promises" для обсуждения дополнительных методов, доступных для Future.

Запуск нескольких Future и объединение результатов

Чтобы запустить несколько вычислений параллельно и соединить их результаты после завершения всех Future, можно использовать выражение for.

Правильный подход такой:

  1. Запустить вычисления, которые возвращают Future результаты
  2. Объединить их результаты в выражении for
  3. Извлечь объединенный результат, используя onComplete или аналогичный метод

Пример

Рассмотрим следующий пример. Ключевой момент - сначала запускаются вычисления, возвращающие Future, а затем они объединяются в выражении for:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

val startTime = System.currentTimeMillis()
def delta() = System.currentTimeMillis() - startTime
def sleep(millis: Long) = Thread.sleep(millis)

@main def multipleFutures1 =

  println(s"creating the futures:   ${delta()}")

  // (1) запуск вычислений, возвращающих Future
  val f1 = Future { sleep(800); 1 }   // в конце концов возвращается 1
  val f2 = Future { sleep(200); 2 }   // в конце концов возвращается 2
  val f3 = Future { sleep(400); 3 }   // в конце концов возвращается 3

  // (2) объединение нескольких Future в выражении `for`
  val result =
    for
      r1 <- f1
      r2 <- f2
      r3 <- f3
    yield
      println(s"in the 'yield': ${delta()}")
      (r1 + r2 + r3)

  // (3) обработка результата
  result.onComplete {
    case Success(x) =>
      println(s"in the Success case: ${delta()}")
      println(s"result = $x")
    case Failure(e) =>
      e.printStackTrace
  }

  println(s"before the 'sleep(3000)': ${delta()}")

  // важно для небольшой параллельной демонстрации: не глушить jvm
  sleep(3000)

После запуска этого приложения, вывод выглядит следующим образом:

creating the futures:   1
before the 'sleep(3000)': 2
in the 'yield': 806
in the Success case: 806
result = 6

Как показывает вывод, Future создаются очень быстро, и всего за две миллисекунды достигается оператор печати непосредственно перед операцией sleep(3000) в конце метода. Весь этот код выполняется в основном потоке JVM. Затем, через 806 мс, три Future завершаются, и выполняется код в блоке yield. Затем код немедленно переходит к успешному завершению в методе onComplete.

Вывод 806 мс является ключом к тому, чтобы убедиться, что три вычисления выполняются параллельно. Если бы они выполнялись последовательно, общее время составило бы около 1400 мс — сумма времени ожидания трех вычислений. Но поскольку они выполняются параллельно, общее время чуть больше, чем у самого продолжительного вычисления f1, которое составляет 800 мс.

Обратите внимание, что если бы вычисления выполнялись в выражении for, они выполнялись бы последовательно, а не параллельно:

// последовательное выполнение (не параллельно!)
for
  r1 <- Future { sleep(800); 1 }
  r2 <- Future { sleep(200); 2 }
  r3 <- Future { sleep(400); 3 }
yield
  r1 + r2 + r3

Итак, если необходимо, чтобы вычисления выполнялись параллельно, не забудьте запустить их вне выражения for.

Метод, возвращающий Future

Было показано, как передавать однопоточный алгоритм в конструктор Future. Ту же технику можно использовать для создания метода, который возвращает Future:

// моделируем медленно работающий метод
def slowlyDouble(x: Int, delay: Long): Future[Int] = Future {
  sleep(delay)
  x * 2
}

Как и в предыдущих примерах, достаточно просто присвоить результат вызова метода новой переменной. Тогда, если сразу проверить результат, то можно увидеть, что он не завершен, но по истечении времени задержки в Future результат будет выдан:

scala> val f = slowlyDouble(2, 5_000L)
val f: concurrent.Future[Int] = Future(<not completed>)

scala> f
val res0: concurrent.Future[Int] = Future(<not completed>)

scala> f
val res1: concurrent.Future[Int] = Future(Success(4))

Ключевые моменты о Future

Подводя итог, несколько ключевых моментов о Future:

Кроме того, как было видно по операторам import, Scala Future зависит от ExecutionContext.

Дополнительные сведения о Future см. в статье Futures and Promises, в которой обсуждаются futures, promises и execution contexts. В ней также обсуждается, как выражение for транслируется в операцию flatMap.


Ссылки: