Pure functional HTTP APIs in Scala
В этом разделе приведен пример реализации на Scala 3 чистой имплементации http-сервиса из книги Grassel Jens - Pure functional HTTP APIs in Scala.
Для реализации будем использовать следующие библиотеки:
- http4s
- Doobie (в качестве базы данных)
- Flyway для миграции БД (или запуска скриптов)
- Circe для JSON кодеков
- Iron для использования уточняющих типов (вместо библиотеки refined, как в примере книги)
- PostgreSQL JDBC драйвер
- pureconfig (для загрузки файлов конфигурации)
В примерах используются Scala 3.3.0
и последние версии библиотек на июль 2023.
Спецификация сервиса
Сначала укажем точную область действия и API сервиса. Сервис должен предоставлять конечные точки HTTP API для:
- создания типа данных продукта, определяемого уникальным идентификатором
- добавления переводов имени продукта по коду языка и уникальному идентификатору
- возврата существующих переводов для продукта
- возврата списка всех существующих продуктов с их переводами
Модель данных
Определим модель очень простой, чтобы не переборщить с реализацией.
- Код языка определяется стандартом ISO 639-1 (например, двухбуквенный код).
- Перевод должен содержать код языка и название продукта (непустая строка).
- Продукт должен содержать уникальный идентификатор (UUID версии 4) и список переводов.
База данных
Данные будут храниться в реляционной базе данных (RDBMS). Поэтому нам нужно определить таблицы и отношения в базе данных.
Таблица продуктов
Таблица products
должна содержать только уникальный идентификатор, который также является первичным ключом.
Таблица имен
Таблица names
должна содержать столбец для идентификатора продукта, один для кода языка и один для имени.
Его первичный ключ представляет собой комбинацию идентификатора продукта и кода языка.
Все столбцы должны быть не нулевыми.
Отношение к продуктам реализуется ограничением внешнего ключа к таблице продуктов через идентификатор продукта.
HTTP API
HTTP API должен предоставлять следующие эндпоинты на заданных путях:
Path | HTTP method | Function |
---|---|---|
/products |
POST | Создание продукта |
/products |
GET | Получение всех продуктов и переводов |
/product/{UUID} |
PUT | Добавление перевода |
/product/{UUID} |
GET | Получение всех переводов для заданного продукта |
Данные должны быть закодированы в JSON с использованием следующей спецификации:
JSON для перевода:
{
"lang": "ISO-639-1 Code",
"name": "A non empty string."
}
JSON для продукта:
{
"id": "The-UUID-of-the-product",
"names": [ ... список переводов ... ]
}
Модели
Для начала реализуем простые и понятные модели. Нам нужен класс для хранения переводов или, лучше, одного перевода.
final case class Translation(lang: LanguageCode, name: ProductName)
В качестве типов параметров используются уточняющие типы, ограничивающие множество значений областью, которую считаем валидной. Подробнее об уточняющих типах и зачем они нужны рассказывается в статье. В данном случае используются такие типы:
type LanguageCode = String :| Match["^[a-z]{2}$"]
type ProductName = String :| Not[Blank]
В качестве LanguageCode
используется двухсимвольная строка латинского алфавита в нижнем регистре,
а в качестве ProductName
- непустая строка.
Они необходимы для того, чтобы нельзя было создавать Translation
, например, с name
равным null
или ""
.
Необходимые кодеки для кодирования и раскодирования в Json предоставляются библиотекой интеграции Iron с Circe
Теперь о модели продукта.
В качестве типа идентификатора используем уточняющий тип, соответствующий формату UUID.
А в качестве типа списка переводов - непустое множество NonEmptySet
из библиотеки cats.
type ProductId = String :| Match["^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"]
final case class Product(id: ProductId, names: NonEmptySet[Translation])
Тестирование
Отдельно остановимся на тестировании. В качестве тестового фреймворка можно выбрать любой фреймворк, я выбрал MUnit. Кроме того, используем интеграцию со ScalaCheck для покрытия большего количества вариантов.
Необходимо проверить декодирование Translation
со следующими кейсами:
decode[Translation](s).isLeft
, гдеs
- строка не JSON форматаdecodeAccumulating[Translation](json)
выдает заданный список ошибок (в данном случае"DecodingFailure at .lang: Should match ^[a-z]{2}$"
и"DecodingFailure at .name: !(Should only contain whitespaces)"
), гдеjson
имеет валидный формат, но невалидные значения, например,val json = """{"lang":"rus","name":""}"""
:lang
- невалидный код, например,rus
вместоru
, аname
- пустая строка.-
Представим, что есть две переменные
t: Translation
иval json = s"""{"lang":${t.lang.asJson.noSpaces},"name":${t.name.asJson.noSpaces}}"""
decode[Translation](json)
должен равнятьсяt
t.asJson.noSpaces
должен равнятьсяjson
- как следствие двух предыдущих:
decode[Translation](t.asJson.noSpaces)
должен равнятьсяt
Аналогично проверяется Product
:
decode[Product](s).isLeft
, гдеs
- строка не JSON форматаdecodeAccumulating[Product](json)
выдает заданный список ошибок, гдеjson
имеет валидный формат, но невалидные значения, например,val json = """{"id":"id12","names":[]}"""
:id
- произвольная строка, неподходящая по формату к UUID, аnames
- пустая коллекция.-
Представим, что есть две переменные
p: Product
иval json = s"""{"id":${p.id.asJson.noSpaces},"names":${p.names.asJson.noSpaces}}"""
decode[Product](json)
должен равнятьсяp
p.asJson.noSpaces
должен равнятьсяjson
- как следствие двух предыдущих:
decode[Product](p.asJson.noSpaces)
должен равнятьсяp
Обработка конфигурации
Теперь определимся с конфигурацией.
У нас будет два конфига: ApiConfig
- конфигурация для http API и DatabaseConfig
- для работы с БД.
ApiConfig
должен состоять из host
и port
, где host
- непустая строка, а port
- число от 1
до 65535
.
DatabaseConfig
должен состоять из driver
(непустая строка), url
(валидный url),
user
(непустая строка), pass
(непустая строка).
Вот как будет выглядеть код в Scala 3:
import io.github.iltotore.iron.*
import io.github.iltotore.iron.constraint.all.*
import pureconfig.*
import pureconfig.generic.derivation.default.*
type NonEmptyString = String :| Not[Blank]
type DatabaseUrl = String :|
Match["""(\b(https?|ftp|file)://)?[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"""]
type PortNumber = Int :| Interval.Closed[1, 65535]
given ConfigReader[NonEmptyString] =
ConfigReader.fromString[NonEmptyString](ConvertHelpers.optF(_.refineOption))
given ConfigReader[DatabaseUrl] =
ConfigReader.fromString[DatabaseUrl](ConvertHelpers.optF(_.refineOption))
given ConfigReader[PortNumber] =
ConfigReader.fromString[PortNumber](ConvertHelpers.optF(_.toIntOption.flatMap(_.refineOption)))
final case class ApiConfig(host: NonEmptyString, port: PortNumber) derives ConfigReader
final case class DatabaseConfig(
driver: NonEmptyString,
url: DatabaseUrl,
user: NonEmptyString,
pass: NonEmptyString
) derives ConfigReader
В первую очередь мы определяем три уточняющих типа: NonEmptyString
, DatabaseUrl
, PortNumber
.
Затем определяются конфиги ApiConfig
и DatabaseConfig
.
Конфиги будут читаться с помощью библиотеки pureconfig,
поэтому используем derives ConfigReader
- встроенную поддержку наследования классов типов в Scala 3.
К сожалению, на данный момент (июль 2023) для библиотеки iron
нет модуля взаимодействия с pureconfig,
поэтому необходимо дополнительно определить given ConfigReader[T]
для каждого уточняющего типа.
Теперь конфиги могут быть прочитаны следующим образом:
val apiConfig = ConfigFactory.parseString(s"""api{"host":"api.example.com","port":1234}""")
ConfigSource.fromConfig(apiConfig).at("api").load[ApiConfig]
// Right(ApiConfig(api.example.com,1234))
val databaseConfig = ConfigFactory.parseString(
s"""database {
| "driver": "org.postgresql.Driver",
| "url": "jdbc:postgresql://localhost:5422/test-database",
| "user": "pure",
| "pass": "password"
|}""".stripMargin
)
ConfigSource.fromConfig(databaseConfig).at("database").load[DatabaseConfig]
// Right(DatabaseConfig(org.postgresql.Driver,jdbc:postgresql://localhost:5422/test-database,pure,password))
Рассмотренный пример в Scastie
Тестирование
Для проверки корректности создания ApiConfig
необходимо рассмотреть 4 случая:
- входящая строка в формате Json пустая или невалидная -
"{}"
:ConfigSource.fromConfig(config).at("api").load[ApiConfig].isLeft
, гдеval config = ConfigFactory.parseString("{}")
host
- невалидный (пустая строка),port
- валидный (от1
до65535
): как в примере выше, толькоval config = ConfigFactory.parseString("""api{"host":"","port":1234}""")
host
- валидный (непустая строка),port
- невалидный (меньше1
или больше65535
): как в примере выше, толькоval config = ConfigFactory.parseString("""api{"host":"localhost","port":65536}""")
host
иport
валидные:ConfigSource.fromConfig(config).at("api").load[ApiConfig]
должен равнятьсяconfig: ApiConfig
, гдеval config = ConfigFactory.parseString(s"""api{"host":"${config.host}","port":${config.port}}""")
Аналогично добавляются тесты для DatabaseConfig
.
Уровень базы данных
Для простоты будем придерживаться Flyway для миграции базы данных. Напишем код миграции, используя шаблон интерпретатора (в Scala он стал известен под названием "tagless final").
trait DatabaseMigrator[F[_]]:
def migrate(url: DatabaseUrl, user: NonEmptyString, pass: NonEmptyString): F[Int]
Мы определяем трейт, который описывает функциональность, требуемую интерпретатором, и используем параметр высшего типа, чтобы иметь возможность абстрагироваться от типа. Теперь давайте продолжим с интерпретатором Flyway.
final class FlywayDatabaseMigrator extends DatabaseMigrator[IO]:
override def migrate(url: DatabaseUrl, user: NonEmptyString, pass: NonEmptyString): IO[Int] =
IO {
val flyway: Flyway = Flyway.configure()
.dataSource(url, user, pass)
.load()
flyway.migrate().migrationsExecuted
}
Doobie
Поскольку мы уже начали с использования "tagless final", мы могли бы продолжить и определить базу для нашего репозитория.
trait Repository[F[_]]:
def loadProduct(id: ProductId): F[Seq[(ProductId, LanguageCode, ProductName)]]
def loadProducts(): Stream[F, (ProductId, LanguageCode, ProductName)]
def saveProduct(p: Product): F[Int]
def updateProduct(p: Product): F[Int]
В этом примере используются уточняющие типы, кроме того, возвращаемый тип loadProducts
связан с fs2.Stream
,
потому что мы хотим добиться чисто функциональной потоковой передачи.
Итак, давайте посмотрим, как выглядит репозиторий, использующий doobie.
final class DoobieRepository[F[_]: Sync](tx: Transactor[F]) extends Repository[F]:
import DoobieRepository.given
override def loadProduct(id: ProductId): F[Seq[(ProductId, LanguageCode, ProductName)]] = ???
override def loadProducts(): Stream[F, (ProductId, LanguageCode, ProductName)] = ???
override def saveProduct(p: Product): F[Int] = ???
override def updateProduct(p: Product): F[Int] = ???
end DoobieRepository
object DoobieRepository:
given Read[(ProductId, LanguageCode, ProductName)] =
Read[(String, String, String)].map { case (x, y, z) => (x.refine, y.refine, z.refine) }
given Write[(ProductId, LanguageCode, ProductName)] =
Write[(String, String, String)].contramap(p => (p._1, p._2, p._3))
Метод получения продукта по заданному идентификатору выглядит так:
override def loadProduct(id: ProductId): F[Seq[(ProductId, LanguageCode, ProductName)]] =
sql"""SELECT products.id, names.lang_code, names.name
FROM products
JOIN names ON products.id = names.product_id
WHERE products.id = ${id.toString}"""
.query[(ProductId, LanguageCode, ProductName)]
.to[Seq]
.transact(tx)
Метод loadProduct
просто возвращает все строки для одного продукта из базы данных.
Параметр будет правильно интерполирован Doobie, поэтому здесь нам не нужно беспокоиться о SQL-инъекциях.
Указываем тип запроса, инструктируем Doobie преобразовать его в последовательность и отдаем транзактору.
Обратите внимание, что код в этот момент не запускается! Doobie просто предоставляет свободную структуру (читай бесплатные монады), которую можно интерпретировать позже.
Метод loadProducts
эквивалентен первому, но возвращает данные обо всех продуктах,
и в виде потока с использованием библиотеки fs2, обеспечивающей чистую функциональную потоковую передачу.
override def loadProducts(): Stream[F, (ProductId, LanguageCode, ProductName)] =
sql"""SELECT products.id, names.lang_code, names.name
FROM products
JOIN names ON products.id = names.product_id
ORDER BY products.id"""
.query[(ProductId, LanguageCode, ProductName)]
.stream
.transact(tx)
При сохранении продукта используется монадическая нотация для нашей программы, чтобы иметь короткое замыкание в случае сбоя. Doobie также поместит все команды в транзакцию базы данных. Сама функция попытается создать "главную" запись в таблице товаров и впоследствии сохранить все переводы.
override def saveProduct(p: Product): F[Int] =
val namesSql = "INSERT INTO names (product_id, lang_code, name) VALUES (?, ?, ?)"
val namesValues = p.names.toNonEmptyList.map(t => (p.id, t.lang, t.name))
val program = for
pi <- sql"INSERT INTO products (id) VALUES(${p.id.toString})".update.run
ni <- Update[(ProductId, LanguageCode, ProductName)](namesSql).updateMany(namesValues)
yield pi + ni
program.transact(tx)
Метод updateProduct
также использует монадическую нотацию, как и метод saveProduct
.
Разница в том, что сначала он удаляет все известные переводы, прежде чем сохранить заданные.
override def updateProduct(p: Product): F[Int] =
val namesSql = "INSERT INTO names (product_id, lang_code, name) VALUES (?, ?, ?)"
val namesValues = p.names.toNonEmptyList.map(t => (p.id, t.lang, t.name))
val program = for
dl <- sql"DELETE FROM names WHERE product_id = ${p.id.toString}".update.run
ts <- Update[(ProductId, LanguageCode, ProductName)](namesSql).updateMany(namesValues)
yield dl + ts
program.transact(tx)
http4s routes
Определяем следующую маршрутизацию для проекта:
final class ProductRoutes[F[_]: Concurrent](repo: Repository[F]) extends Http4sDsl[F]:
val routes: HttpRoutes[F] = HttpRoutes.of[F]:
case GET -> Root / "product" / UUIDVar(id) => ???
case req @ PUT -> Root / "product" / UUIDVar(id) => ???
final class ProductsRoutes[F[_]: Concurrent](repo: Repository[F]) extends Http4sDsl[F]:
val routes: HttpRoutes[F] = HttpRoutes.of[F]:
case GET -> Root / "products" => ???
case req @ POST -> Root / "products" => ???
Как видно, DSL ближе к синтаксису Scala и довольно легко читается. Можно было бы привязать роуты к IO, но желательно иметь больше гибкости и определить более абстрактную структуру.
Рассмотрим реализацию роутов для продукта:
final class ProductRoutes[F[_]: Concurrent](repo: Repository[F]) extends Http4sDsl[F]:
given EntityDecoder[F, Product] = jsonOf
val routes: HttpRoutes[F] = HttpRoutes.of[F]:
case GET -> Root / "product" / UUIDVar(id) =>
for
rows <- repo.loadProduct(id.toString.refine)
resp <- Product.fromDatabase(rows).fold(NotFound())(p => Ok(p.asJson))
yield resp
case req @ PUT -> Root / "product" / UUIDVar(id) =>
req
.as[Product]
.flatMap: p =>
for
cnt <- repo.updateProduct(p)
res <- cnt match
case 0 => NotFound()
case _ => NoContent()
yield res
.handleErrorWith { case _: InvalidMessageBodyFailure =>
BadRequest()
}
end ProductRoutes
Сначала нам нужно включить кодеки JSON в область видимости для http4, такие как EntityDecoder
.
В маршруте для обновления продукта (PUT
) мы просто загружаем строки базы данных,
которые передаем через вспомогательную функцию, чтобы создать правильный продукт и вернуть его.
Маршрут обновления (через PUT
) преобразует тело запроса в Product
и передает его функции обновления репозитория.
В случае успешного обновления возвращается ответ NoContent
, не успешного - NotFound
.
В маршруте для получения продукта по идентификатору передаем идентификатор в БД и анализируем результат.
Если продукт не найден, то возвращаем NotFound
, найден - Ok(p.asJson)
- найденный продукт в Json формате.
Тестирование роутов
Для роутов продукта необходимо проверить следующее:
- если продукта не существует, то возвращается статус
Status.NotFound
с пустым телом ответа - если продукт существует, то возвращается статус
Status.Ok
, где в теле передается json-продукта - при обновлении продукта с невалидным телом запроса возвращается статус
Status.BadRequest
с пустым телом ответа - при обновлении продукта с валидным телом запроса,
но несуществующим идентификатором продукта, возвращается статус
Status.NotFound
с пустым телом ответа - при обновлении продукта с валидным телом запроса и существующим идентификатором продукта,
возвращается статус
Status.NoContent
с пустым телом ответа GET
послеPUT
должен возвращать обновленный продукт
Маршрут продуктов:
final class ProductsRoutes[F[_]: Concurrent](repo: Repository[F]) extends Http4sDsl[F]:
given EntityDecoder[F, Product] = jsonOf
val routes: HttpRoutes[F] = HttpRoutes.of[F]:
case GET -> Root / "products" =>
val prefix = Stream.eval("[".pure[F])
val suffix = Stream.eval("]".pure[F])
val ps = repo
.loadProducts()
.groupAdjacentBy(_._1)
.map: (id, rows) =>
Product.fromDatabase(rows.toList)
.collect { case Some(p) => p }
.map(_.asJson.noSpaces)
.intersperse(",")
@SuppressWarnings(Array("org.wartremover.warts.Any"))
val result: Stream[F, String] = prefix ++ ps ++ suffix
Ok(result)
case req @ POST -> Root / "products" =>
req
.as[Product]
.flatMap: p =>
for
cnt <- repo.saveProduct(p)
res <- cnt match
case 0 => InternalServerError()
case _ => NoContent()
yield res
.handleErrorWith { case _: InvalidMessageBodyFailure =>
BadRequest()
}
end ProductsRoutes
Для маршрута продуктов, опять же, нужны контекстные параметры для кодеков JSON,
чтобы иметь возможность сериализовать и десериализовать наши объекты.
Маршрут POST
для создания продукта в основном такой же, как маршрут обновления из предыдущей части.
Мы создаем Product
из тела запроса, передаем его в функцию сохранения репозитория
и возвращаем ответ: NoContent
- если сохранение в репозитории завершилось успешно, InternalServerError
- в противном случае.
Маршрут GET
для возврата всех продуктов вызывает соответствующую функцию репозитория,
возвращающую поток, который мы отображаем с помощью вспомогательной функции.
После этого мы используем collect
для преобразования потока из Option[Product]
в поток Product
,
который мы передаем функции Ok
из http4s
.
Тестирование роутов
Для роутов продуктов необходимо проверить следующее:
- при запросе продуктов, если продуктов не существует, то возвращается статус
Status.Ok
с пустым списком продуктов - при запросе продуктов, если продукты есть, то возвращается статус
Status.Ok
со списком продуктов - при добавлении продукта с невалидным телом запроса возвращается статус
Status.BadRequest
с пустым телом ответа - при добавлении продукта с валидным телом запроса возвращается статус
Status.NotFound
с пустым телом ответа - при добавлении продукта с валидным телом запроса, но он не может быть сохранен,
возвращается статус
Status.InternalServerError
с пустым телом ответа GET
послеPOST
должен возвращать добавленные продукты
Запуск приложения
В нашей основной точке входа мы просто инициализируем все необходимые компоненты и соединяем их вместе.
object Pure extends IOApp:
def run(args: List[String]): IO[ExitCode] =
val migrator: DatabaseMigrator[IO] = new FlywayDatabaseMigrator
val configsIO =
for
cfg <- IO(ConfigFactory.load(getClass.getClassLoader))
apiConfig <- loadConfig[ApiConfig](cfg, "api")
dbConfig <- loadConfig[DatabaseConfig](cfg, "database")
yield (apiConfig, dbConfig)
...
private def loadConfig[A: ConfigReader](cfg: Config, namespace: String): IO[A] =
val result = ConfigSource.fromConfig(cfg).at(namespace).load[A]
IO.fromEither(result.left.map(error => new IllegalArgumentException(error.prettyPrint())))
Вначале вычисляются заданные конфиги. После успешной загрузки конфигурации мы продолжаем миграцию базы данных. Наконец, мы создаем транзактор, необходимый Doobie, и репозиторий базы данных.
...
val program =
for
configs <- configsIO
(apiConfig, dbConfig) = configs
_ <- migrator.migrate(dbConfig.url, dbConfig.user, dbConfig.pass)
host <- IO.fromOption(Host.fromString(apiConfig.host))(
new IllegalArgumentException("Invalid host")
)
port <- IO.fromOption(Port.fromInt(apiConfig.port.toInt))(
new IllegalArgumentException("Invalid port")
)
yield
val tx = Transactor.fromDriverManager[IO](
driver = dbConfig.driver.toString,
url = dbConfig.url.toString,
user = dbConfig.user.toString,
password = dbConfig.pass.toString,
logHandler = None
)
val repo = new DoobieRepository(tx)
val productRoutes = new ProductRoutes(repo)
val productsRoutes = new ProductsRoutes(repo)
val routes = productRoutes.routes <+> productsRoutes.routes
val httpApp = Router("/" -> routes).orNotFound
val server = EmberServerBuilder
.default[IO]
.withHost(host)
.withPort(port)
.withHttpApp(httpApp)
server.build.use(_ => IO(StdIn.readLine())).as(ExitCode.Success)
...
Выше мы создаем маршруты через классы, комбинируем их (с помощью оператора <+>
) и создаем приложение http4s
,
явно используя ввод-вывод, таким образом связывая наши абстрактные маршруты с вводом-выводом.
Служба будет работать до тех пор, пока вы не нажмете Enter.
...
program.attempt.unsafeRunSync() match
case Left(e) =>
IO {
println("*** An error occured! ***")
if e ne null then println(e.getMessage)
ExitCode.Error
}
case Right(r) => r
Мы пытаемся запустить нашу программу и выполнить возможные побочные эффекты с помощью метода unsafeRunSync
из Cats effects.
Но чтобы обеспечить корректный тип возвращаемого значения для IOApp
, нам нужно оценить возвращаемое значение,
которое является либо ошибкой, либо правильным кодом выхода.
В случае ошибки мы выводим ее на консоль (здесь нет логирования) и явно устанавливаем код ошибки в качестве возвращаемого значения.
Обратите внимание, что мы также заключаем наш обработчик ошибок в IO, чтобы отсрочить возможные побочные эффекты.
Ссылки:
- Исходный код разобранных примеров на Scala 3
- Grassel Jens - Pure functional HTTP APIs in Scala: