Don't Repeat Yourself

Don't Repeat Yourself (DRY) is a principle of software development aimed at reducing repetition of all kinds. -- wikipedia

IO モナドを使った Web アプリケーションの構築

Scala の記事を書くのは地味に初めてかもしれません.今回は,Scala の cats-effect というライブラリの中にある IO モナドを使って Web アプリケーションを構築してみたいと思ってやってみたので,その話を簡単にメモしておきます.

cats とは?

Scala 界隈だとかなり有名です.Scalaz か cats を選ぶことが多いと思います.Scalaz も大変すばらしいライブラリですが,今回は cats の話を少しします.

cats は関数型プログラミングScala で行うために必要な抽象度の高い関数セットを提供するライブラリです.抽象度の高い関数セットとはどういうことか…私なりの理解で行くと,通常 ScalaList[A]Option[A] などには flatMap という関数がついていますよね.あれはあくまで,個別具体のモナドに対する個別実装がなされています.一方で,flatMap という命令は,実は型にすることで抽象化が可能です.具体的には trait FlatMap[F[_]] とすることで可能です.

F[_] というのは Scala では型構築子と呼ばれ,この中にはたとえば OptionFutureList などの,F[_] となりうる条件を満たす型を入れることができます.つまり,FlatMap[F[_]] は,Option などにミックスインしていくことによって,flatMap ,命令を付け足していくことができる.FlatMap[F[_]] として抽象化されているので,自身で独自のモナドを作ることもできます.そのための道具を提供しているライブラリである,ということです.

また,これらの抽象的なパーツを存分に組み合わせたモナド変換子なども提供されており,非常に強力な関数型プログラミング向けのライブラリです.

typelevel.org

cats-effect の IO モナドとは?

IOFuture は,Future が即時評価なのに対して,IO は遅延評価を行うという点で異なります.遅延評価なのでスタックセーフですね.

もちろんモナドなので,Future と同じようにコンビネータでどんどんつなげていくことができます.詳しい機能は下記の公式ドキュメントがあるのでそちらもご覧ください.

typelevel.org

私が個人的に好きなのは,スレッドの切り替えを明示できる関数 contextShift をもっていることです.これを明示的に書くことができるのはわかりやすくていいですね.

なお,Future から IO への切り替えは IO#fromFuture という関数でできます.なので,Future を使っているライブラリの関数も IO に切り替えて使用することができます.

IO モナドの中身をちょっと見てみる

IO モナドの中身ですが,次のような代数的データ型をもっており,各々のケースクラスに応じて中身の評価が遅延で (Pure の場合は即時で) 走ります.

  private[effect] final case class Pure[+A](a: A)
    extends IO[A]
  private[effect] final case class Delay[+A](thunk: () => A)
    extends IO[A]
  private[effect] final case class RaiseError(e: Throwable)
    extends IO[Nothing]
  private[effect] final case class Suspend[+A](thunk: () => IO[A])
    extends IO[A]
  private[effect] final case class Bind[E, +A](source: IO[E], f: E => IO[A])
    extends IO[A]
  private[effect] final case class Async[+A](
    k: (IOConnection, Either[Throwable, A] => Unit) => Unit)
    extends IO[A]

IORunLoop#loop という関数の中で,各代数的データ型に対する処理が走ります.

IO モナドと Web アプリケーションをつなげるには - http4s

多くのライブラリでは,Future を使用しています.たとえば Akka-HTTP は scala.concurrent.Future を使用しています.

IO には IOApp という,IO ベースでランタイムを起動できるトレイトが用意されていて,それを使用すると綺麗に実装を書ききることができるんですね.

上の IO#fromFuture を使って Akka-HTTP と IO をつなげてみようと苦心したんですが,やり方がよくわからず諦めました.

そのかわりに今回は http4s というライブラリを使用することにします.このライブラリは,IO を使って処理をつなげていくことを前提として設計されており,cats フレンドリーを謳っているだけあって,cats と組み合わせて使うと非常に心地がよかったです.前置きが長くなりましたが実装していきましょう.

http4s | http4s

実装してみる

要件

シンプルに行きましょう.使い方を見てみたいだけなので.

  • とりあえずヘルスチェックを返すエンドポイントをもったアプリケーションを作る

構成

次のような構成を取ります.

  • Bootstrap: いわゆる main 関数.
  • EndpointProvider: エンドポイントの設定をまとめあげるトレイト
  • Endpoint: エンドポイントを表現するトレイト

リポジトリはこちらです.sbt の設定などはリポジトリをご覧ください.

Main 関数を作ってみる

StramApp[F[_]] という起動まわりの処理を一手に担ってくれるトレイトが提供されていますので,これを使って構築していきましょう.

なお,MixInEndpointProvider というのは Cake Pattern のボイラープレートです.Cake Pattern というのは,DI の手法のひとつです.DI ライブラリを使うとエラーメッセージがよくわからない上に実行時にしか DI に成功したか失敗したかがわからないことが多いですが,Cake Pattern はコンパイルタイムである程度うまく依存関係を解決できているかどうかわかるという点でいいなと思っています.今回は余計なライブラリを依存関係に含めたくないのもあって,Cake Pattern を使用しています.

import cats.effect.IO
import endpoints.MixInEndpointProvider
import fs2.StreamApp
import org.http4s.server.blaze._

import scala.concurrent.ExecutionContext.Implicits.global

object Bootstrap extends StreamApp[IO] with MixInEndpointProvider {
  override def stream(
      args: List[String],
      requestShutdown: IO[Unit]): fs2.Stream[IO, StreamApp.ExitCode] = {
    BlazeBuilder[IO]
      .bindHttp(8080, "localhost")
      .mountService(endpointProvider.endpoints)
      .serve
  }
}

エンドポイントの提供元を作る

テストをする際に楽にしようかなと思って,型構築子をつけて型クラスを生成するようにしましたが,正直あまり意味がないかもしれません.やってみたかっただけということで.MixInEndpointProvider[F[_]] としておきたかったし,きっとそういう手法もあると思うんですが,implicit の値を別で与える必要が出てきて解決策が見えなかったので妥協しました.

package endpoints

import cats.effect.IO
import endpoints.api.MixInHealthCheckEndpoint
import org.http4s.Method.GET
import org.http4s._
import org.http4s.dsl.impl.Root
import org.http4s.dsl.io.{->, /}
import org.http4s.server.middleware.CORS

trait EndpointProvider[F[_]] {
  def endpoints: HttpService[F]
}

object EndpointProvider {
  implicit val endpointProviderIO: EndpointProvider[IO] = new EndpointProvider[IO] with MixInHealthCheckEndpoint {
    override def endpoints: HttpService[IO] =  CORS(HttpService[IO] {
      case GET -> Root / "api" => healthCheckEndpoint.endpoint
    })
  }
}

trait UsesEndpointProvider {
  val endpointProvider: EndpointProvider[IO]
}

trait MixInEndpointProvider {
  val endpointProvider: EndpointProvider[IO] = implicitly[EndpointProvider[IO]]
}

さて,また別の MixIn がいますね.これが今回のヘルスチェック用エンドポイントの本体です.深掘りしていきましょう.

package endpoints.api

import cats.effect.IO
import org.http4s.{Response, Status}

trait HealthCheckEndpoint[F[_]] extends Endpoint[F]

object HealthCheckEndpoint {
  implicit def healthCheckEndpoint = new HealthCheckEndpoint[IO] {
    override def endpoint: IO[Response[IO]] = IO.pure(Response(Status.Ok))
  }
}

trait UsesHealthCheckEndpoint {
  val healthCheckEndpoint: HealthCheckEndpoint[IO]
}

trait MixInHealthCheckEndpoint {
  val healthCheckEndpoint: HealthCheckEndpoint[IO] =
    implicitly[HealthCheckEndpoint[IO]]
}

ヘルスチェックなので,単に 200 OK を返すだけです.

IO#apply によって IO を生成します.これは先述の Delay という case class を中で呼んでおり,この Delay が実質遅延評価になっています.ちなみに IO#pure を呼び出すと,即時評価の IO が生成されます.

IO#apply は同期処理が走ることに注意が必要です.非同期処理を走らせるには IO#async を呼び出す必要があります.明示的に書かせるという点でよい設計だと思います.

これで一応は動くようになりました.ただ,受け取った JSON をどうするかとか,あるいは JSON にして返したいパターンなどはまだ試していません.

感想

IO で非同期処理をコンビネータにしてつないで書いていくことができるというのは大きいですね.Scala には for-yield があるので,コールバック地獄もそれを使うことによって避けることができます.

IOFuture との変換もしっかり考えられているので,スタックが厳しい箇所に部分的に IO モナドを使うといった使い方もできそうですね.最近仕事でそういう箇所があり,ぜひ使ってみたいなと思いました.

http4s そのものは,とくだんハマりポイントもなく軽量ないいライブラリだと思います.

Cake Pattern の Uses, MixIn といったボイラープレート側を型構築子で持たせて,DI する瞬間に型クラスで中身が切り替わるみたいな実装をしたいのですが,それについてはまた調査をして記事にしようかなと思います.ご存知の方いらっしゃいましたら教えてください🙇