Monadic Programmingのススメ - QCon Tokyo 2015 Conference

Monadic
Programmingのススメ!
Functional Reactive Programming
へのアプローチ
2015年年4⽉月21⽇日
Everforth
浅海智晴
自己紹介
•  1985年年富⼠士通(株)⼊入社
•  UNIXワークステーション/サーバーのOS、分散基盤、Web基盤の開発
に従事
•  2001年年9⽉月に独⽴立立
•  Java, XML, UMLを中⼼心に活動
•  2005年年4⽉月より2007年年3⽉月まで
•  稚内北北星学園⼤大学東京サテライト校教授
•  現在
•  (株) 匠BusinessPlace 取締役チーフコンサルタント
•  (株) Everforth 取締役CTO
•  著作
•  上流流⼯工程UMLモデリング(⽇日経BP)
•  マインドマップではじめるモデリング講座(翔泳社)
http://www.takumi-businessplace.co.jp/
http://www.apparel-cloud.com/
ApparelCloud
データを一元化することで、多様な情報を、様々なメディア・デバイスを
通じて
消費者に届け、真のCRMを実現する仕組み。
http://www.apparel-cloud.com/
ApparelCloud
複数メディア・コンテンツを⼀一元管理理し、クオリティの統⼀一と運⽤用コストの低減を実現。
コンテンツを⼆二重三重に登録する必要もなく、データ分析などの⼀一元化も可能。
ブランドサイト
ブランド アプリ
ブログ
オンラインストア
集計・分析
コンテンツ
クーポン
ニュース
ブログ
ショップ
ダッシュボード
ApparelCloud
システム構成
メディア
アプリケーション
Scala
Java
その他
PUSHSender
Web
play
WebAPI
EmailSender
iOS
play
play
Android
BatchEngine
EverforthEngine
Shell
play
iOS
Analysis
Engine
Android
Finagle
WebConsole
Web
管理
アプリケーション
Primefaces
(JSF)
Coordination
Engine
ServiceMix
Camel
Stream
Engine
Finagle
開発中
開発中
Scala利用の文脈
要求
Model
要求
Object-Functional
Analysis and Design
Summay Model
Regular Model
Mindmap Model
Use case Model
WireFrame
(+ Usage)
Domain Model
API Usage List
モデル
Service Description
Object-Functional
Programming
プログラム
プログラム
カスタマイズ
(開発)
プログラム
(自動生成)
カスタマイズ
Service
Cloud Service
Service Platform as-a-Service
Apparel Cloud
参考: Everforthのモデル体系
http://modegramming.blogspot.jp/2015/04/ofadeverforth.html
アジェンダ
背景
関数型プログラミング
Monadic Programming
Functional Reactive
Programming
キーワード
• Monad
• Monadic Programming
• Functional Reactive Programming
背景
新しい現実
ハードウェア
•  メニーコア、⼤大容量量メモリ、SSD
•  並列列プログラミング
クラウド・プラットフォーム
• 
• 
• 
• 
• 
• 
• 
クラウド・サービス、スマート・デバイス
故障、遅延
⼤大規模データ、⼤大規模演算
⾼高頻度度イベント、ストリーミング
⾮非同期、並列列、分散
NoSQL、インメモリデータベース
CQRS、Event Sourcing、Microservice
The Reactive Manifest
•  http://www.reactivemanifesto.org/
•  ⽇日本語訳
•  http://okapies.hateblo.jp/entry/2014/12/03/025921
•  Responsive
•  応答性:すぐ応答する
•  Resilient
•  耐故障性:回復復⼒力力に富む、⽴立立ち直りが早い
•  Elastic
•  弾⼒力力性:伸縮⾃自在の
•  Message Driven
•  メッセージ駆動
•  関連: Reactive Streams
•  http://www.reactive-streams.org/
Reactiveの実現
CPU Core
Function
Message
Function
Service
Function
Message
Service
Function
Function
Service
Service Cluster
Message
Function
Function
Service
Parallel
Service
Reactive
Distributed
Responsive
Concurrent
Concurrent
Resillience
Parallel
Elasticity
Distributed
Message Driven
Functional Reactive Programming
へのアプローチ
並列処理アプリケーション
Functional Reactive Programming
Monadic Programming
Object-Functional Programming
Functional Programming
Object-Oriented Programming
並列処理アプリケーション
Actor
Object-Oriented Programming
関数型プログラミング
関数型⾔言語の技術マップ
数理論理学
手続き型言語
Curry-Haward対応
関数型言語
様相論理
オブジェクト
指向言語
述語論理
直感主義命題論理&
自然演繹
単純型付ラムダ計算
命題
型
証明
計算
純粋関数型言語
代数的データ型
型クラス
抽象代数学
直積
直和
圏論
群論
モノイド
不変
副作用なし
参照透過性
置換モデル
永続データ構造
Hask圏
(プログラム圏)
モナド
Scala
モダン⽂文法
(e.g. Ruby)
Functional
Programming
(Haskell – α)
OOP
(Java + α)
DSL
(Scalable)
Scala
Java VM
代数的構造デザインパターン
結合律律 (associative law)
•  半群 (semigroup)
•  モノイド (monoid)
•  群 (group)
(a + b) + c = a + (b + c)
可換律律 (commutative law)
•  可換半群
•  可換モノイド
•  可換群(アーベル群)
a+b=b+a
分配律律 (distributive law)
•  環 (ring)
•  体 (field)
a * (b + c) = a * b + a * c
圏論論デザインパターン
モナド (monad)
圏 (category)
• Hask圏 (Scala圏?)
• クライスリ圏 (kleisli category)
射 (arrow,
morphism)
Applicative
functor
関⼿手 (functor)
並列処理アプリケーション
基本概念
Functional Reactive Programming
Monadic Programming
• 
• 
• 
• 
• 
• 
• 
• 
• 
• 
• 
• 
• 
Object-Functional Programming
再帰 (recursion)
Functional Programming
⾼高階関数 (high-order function)
不不変データ (immutable data)
Object-Oriented Programming
遅延評価 (lazy evaluation)
参照透過性 (referential transparency)
(項)書換えモデル (substitution model)
等式推論論 (equational reasoning)
代数的データ型 (algebraic data type)
直積, 直和 (direct product, direct sum)
永続データ構造 (persistent data structure)
エフェクト (effect)
型クラス (type class)
モナド (monad)
Monadic Programming
モナド
•  計算機科学におけるモナド(Monads)とは、計算機科学者の
Eugenio Moggiによって提案されたモジュール性を持たせた
表⽰示的意味論論の枠組みを⾔言う。プログラムとはクライスリ圏
の射である(a program is an arrow of a Kleisli category)、
という要請から合成規則としてクライスリトリプル(Kleisli
triple)というモナドと等価なものが⽤用いられる。(Wikipedia)
•  In functional programming, a monad is a structure that
represents computations defined as sequences of steps: a
type with a monad structure defines what it means to
chain operations, or nest functions of that type together.
This allows the programmer to build pipelines that
process data in steps, in which each action is decorated
with additional processing rules provided by the monad.
(Wikipedia)
•  ⾮非常に難しい概念念
•  使うのは(慣れれば)それほど難しくない
•  モダンな関数型プログラミングの中核概念念
再利利⽤用メカニズムの⽐比較
手続き型
アプリ
オブジェクト指向
関数型(Monadic)
アプリ
アプリ
Functor
Monad
サブルーチン
フレームワーク
型クラス
A→B
A→M[B]
オブジェクト
関数
モナドの⽤用途
•  コンテナ
•  コレクション(List, Vector)
•  成功失敗⽂文脈(Option)
•  インタープリタ
•  Freeモナド, Operationalモナド
•  DI(Dependency Injection)
•  Readerモナド
•  状態機械
•  Stateモナド
•  Processモナド
•  問合せ
•  Slick
•  Spark RDD(Resilient Distributed Dataset)
モナドの種類(⾮非公式分類)
•  コンテナ型
•  値を格納
•  コンビネータを実⾏行行すると即時に評価が⾏行行われる
•  インタープリタ型
•  関数を格納
•  コンビネータを実⾏行行すると関数を合成したモナド(インター
プリタ)が返ってくる
•  純粋関数型のセマンティクス
•  runメソッドなどでインタープリタを実⾏行行する
•  外部⼊入出⼒力力など⾮非純粋関数型処理理はこちらで⾏行行う
•  外部⼊入出⼒力力など副作⽤用を伴う処理理はインタープリタ型モナ
ドで対応
•  IOモナド
コンテナ型モナド
•  Option
•  成功/失敗状態を扱うモナド
•  List
•  データ列列を扱うモナド(シーケンシャルアクセス向け)
•  Vector
•  データ列列を扱うモナド(ランダムアクセス対応)
•  Stream
•  遅延評価によるデータ列列を扱うモナド
•  Try
•  例例外発⽣生状態を扱うモナド
•  Future
•  並列列実⾏行行を扱うモナド
インタープリタ型モナド
•  Stateモナド
•  状態を表現するモナド
•  IOモナド
•  外部⼊入出⼒力力を扱うモナド
•  Processモナド
•  状態機械を表現するモナド
•  ストリーム処理理を実現
Monadic Programmingの
⾮非公式定義
•  ⼀一義的にはモナドを活⽤用したプログラミング
•  本セッションでは以下の型クラス(Scalaz)を活⽤用したプ
ログラミングまでスコープを広げています
•  Functor
•  Applicative Functor
•  Monad
•  MonadはApplicative FunctorおよびFunctorでもある
•  Monadを⽬目的によってApplicative Functorまたは
Functorとして使うことも多い
Functor, Applicative Functor, Monad
⾮非公式理理解
Functor
F.point(x).map(f).map(g).map(h)
x
f
g
h
Option(1).map(f).map(g).map(h)
Applicative Functor
x
(A.point(x) |@| A.point(y) |@| A.point(z))(i(_, _, _))
f
y
g
i
(Option(1) |@| Option(2) |@| Option(3))(i(_, _, _))
z
^^(A.point(x), A.point(y), A.point(z))(i(_, _, _))
h
M.point(x).flatMap(f).flatMap(g).flatMap(h)
Monad
x
f
g
h
Option(1).flatMap(f).flatMap(g).flatMap(h)
for {
a <- M.point(x)
b <- f(a)
c <- g(b)
d <- h(c)
} yield d
型クラス&Monadの使い所
クラス
機能要求
予約
販売
顧客管理
ロギング
非機能要求
OOPではAOPやDIによっ
て実現してきた機能要求
と⾮非機能要求の分離離を細
粒粒度度、型安全に実現
セキュリティ
型クラス
&
Monad
トランザクション
例外処理
並列処理
分散処理
故障耐性
分析
Scalaz
•  https://github.com/scalaz/scalaz
•  キャッチフレーズ
•  昔: Scalaz: Type Classes and Pure Functional Data
Structures for Scala
•  今: An extension to the core Scala library for functional
programming. http://typelevel.org
•  最新の関数型プログラミングを可能にする機能群を
Scala向けに⽤用意
•  型クラス
•  純粋関数型データ構造
•  Haskellで実績のある機能群をScalaで実現
Monadicプログラミングの効⽤用
Java⾵風
def validate(name: String, age: Int): ValidationNEL[Throwable, (String,
Int)] = {!
val a = validateName(name) !
val b = validateAge(age) !
if (a.isSuccess && b.isSuccess) { !
val a1 = a.asInstanceOf[Success[NonEmptyList[Throwable], String]].a
val b1 = b.asInstanceOf[Success[NonEmptyList[Throwable], Int]].a !
Success((a1, b1)) !
} else if (a.isSuccess) { !
b.asInstanceOf[Failure[NonEmptyList[Throwable], (String, Int)]] !
} else if (b.isSuccess) { !
a.asInstanceOf[Failure[NonEmptyList[Throwable], (String, Int)]] !
} else { !
val a1 = a.asInstanceOf[Failure[NonEmptyList[Throwable], String]].e
val b1 = b.asInstanceOf[Failure[NonEmptyList[Throwable], Int]].e !
Failure(a1 |+| b1) !
} !
}!
!
!
Scala (関数型プログラミング)
def validate(name: String, age: Int):
ValidationNEL[Throwable, (String, Int)] = { !
validateName(name) match { !
case Success(a) => validateAge(age) match { !
case Success(b) => Success((a, b)) !
case Failure(e) => Failure(e) !
} !
case Failure(e1) => validateAge(age) match { !
case Success(b) => Failure(e1) !
case Failure(e2) => Failure(e1 |+| e2) !
} !
} !
} !
Scalaz (Monadicプログラミング)
def validate(name: String, age: Int):
ValidationNEL[Throwable, (String, Int)] = { !
(validateName(name) ⊛ validateAge(age))((_, _))
}!
!
参考: Scala Tips / Validation (10) - applicative
http://modegramming.blogspot.jp/2012/04/scala-tips-validation-10-applicative.html
プログラミング例例
•  3つの関数を並⾏行行動作させ、それぞれの実⾏行行結果を引数
にした関数を実⾏行行して最終結果を計算
• 
• 
• 
• 
calcString(x: Int): String
calcInt(x: Int): Int
calcFloat(x: Int): Float
finalCalc(x: String, y: Int, z: Float): String
•  並列列処理理はFutureモナドで実現
準備
object Util {
import java.net.URL
import scala.concurrent.duration._
import scalax.io.JavaConverters._
def go[T](label: String)(body: => T) {
val ts = System.currentTimeMillis
val r = body
val elapse = System.currentTimeMillis - ts
println(s"$label ($elapse ms): $r")
}
def getPageLength(url: String): Int = {
new URL(url).asInput.bytes.size
}
Scala流流のDSLの例例
go (“bigCalc”) {
val r = bigCalc(1000)
Await.result(r, 1.minute)
}
def calcString(n: Int): String = {
Thread.sleep(n)
n.toString
}
def calcInt(n: Int): Int = {
Thread.sleep(n)
n
}
def calcFloat(n: Int): Float = {
Thread.sleep(n)
n.toFloat
}
}
def finalCalc(s: String, i: Int, f: Float): String = {
s"$s-$i-$f"
}
基本形
def bigCalc(n: Int): String = {
val a = calcString(n)
val b = calcInt(n)
val c = calcFloat(n)
finalCalc(a, b, c)
}
> go(“bigCalc”)(bigCalc(1000))
bigCalc (3003 ms): 1000-1000-1000.0
Optionモナド
def bigCalcO(n: Int): Option[String] = {
for {
a <- Option(calcString(n))
b <- Option(calcInt(n))
c <- Option(calcFloat(n))
} yield finalCalc(a, b, c)
}
Monad
x
f
g
h
for式(for ... yield)でmonadicな
パイプラインを構築
> go(“bigCalcO”)(bigCalcO(1000))
bigCalcO (3005 ms): 1000-1000-1000.0
参考: Scala Tips/Option Index
http://modegramming.blogspot.jp/2012/02/scala-tips-option-index.html
Optionモナド
Monadic関数版
def calcStringO(n: Int): Option[String] = {
Option(calcString(n))
}
def calcIntO(n: Int): Option[Int] = {
Option(calcInt(n))
}
def calcFloatO(n: Int): Option[Float] = {
Option(calcFloat(n))
}
def bigCalcO2(n: Int):
Monad
x
f
h
A->M[B]
モナディック関数
for式にモナディック
関数を組み合わせるの
がイディオム
Option[String] = {
for {
a <- calcStringO(n)
b <- calcIntO(n)
c <- calcFloatO(n)
} yield finalCalc(a, b, c)
}
> go(“bigCalcO2”)(Await.result(bigCalcO2(1000), 1.minute))
bigCalcO2 (3005 ms): 1000-1000-1000.0
g
Monad
Futureモナド
x
f
g
h
val executorService = Executors.newFixedThreadPool(10)
implicit val executionContext = ExecutionContext.fromExecutorService(executorService)
def bigCalcP(n: Int): Future[String] = {
for {
a <- Future(calcString(n))
b <- Future(calcInt(n))
c <- Future(calcFloat(n))
} yield finalCalc(a, b, c)
}
> go(“bigCalcP”)(Await.result(bigCalcP(1000), 1.minute))
bigCalcP (3017 ms): 1000-1000-1000.0
Monad
Futureモナド
改良良版
def bigCalcP2(n: Int): Future[String] = {
val fa = Future(calcString(n))
val fb = Future(calcInt(n))
val fc = Future(calcFloat(n))
for {
a <- fa
b <- fb
c <- fc
} yield finalCalc(a, b, c)
}
x
f
g
h
モナディック処理理の前
にFutuerを開始
> go(“bigCalcP2”)(Await.result(bigCalcP2(1000), 1.minute))
bigCalcP2 (1005 ms): 1000-1000-1000.0
Applicative Functor
Future Applicative
x
f
y
g
i
z
h
def bigCalcP3(n: Int): Future[String] = {
import scalaz._, Scalaz._
(Future(calcString(n)) |@| Future(calcInt(n)) |@| Future(calcFloat(n)))(finalCalc)
}
Applicativeを使うと綺
def bigCalcP31(n: Int): Future[String] = {
麗麗に記述できる
import scalaz._, Scalaz._
^^(Future(calcString(n)), Future(calcInt(n)), Future(calcFloat(n)))(finalCalc)
}
>go(“bigCalcP3”)(Await.result(bigCalcP3(1000), 1.minute))
bigCalcP3 (1162 ms): 1000-1000-1000.0
> go(“bigCalcP31”)(Await.result(bigCalcP31(1000), 1.minute))
bigCalcP31 (1004 ms): 1000-1000-1000.0
Future Applicative
改良良版
ロジック(機能要求)からApplicativeとして
パラメタ化した⾮非機能要求を分離離
Applicative Functor
x
f
y
g
i
z
h
def bigCalcA[A[_]](n: Int)(implicit C: Applicative[A]): A[String] = {
^^(
型クラスを利利⽤用し任意
C.point(calcString(n)),
のApplicativeを使える
C.point(calcInt(n)),
ように汎⽤用化
C.point(calcFloat(n))
Applicativeのpointメソッド
)(finalCalc)
はvirtual constructor
}
>go(“bigCalcAF”)(Await.result(bigCalcA[Future](1000), 1.minute))
bigCalcAF (1003 ms): 1000-1000-1000.0
> go(“bigCalcAI”)(bigCalcA[Id](1000), 1.minute))
bigCalcAI (3001 ms): 1000-1000-1000.0
Functional Reactive Programming
Reactiveの実現(再掲)
CPU Core
Function
Message
Function
Service
Function
Message
Service
Function
Function
Service
Service Cluster
Message
Function
Function
Service
Parallel
Service
Reactive
Distributed
Responsive
Concurrent
Concurrent
Resillience
Parallel
Elasticity
Distributed
Message Driven
Functional Reactive
Programmingで実現したいこと
•  関数型プログラミングのメリットを享受
•  参照透過性
•  関数による部品化/部品の組⽴立立て
•  パイプラインのセマンティクス
•  イベント駆動処理理
•  Callback hellの排除
•  ストリーム処理理
•  リソース管理理
•  フロー制御
•  ⼤大規模データ処理理
•  リソース管理理
•  省省メモリ
Functional Reactive Programming
の候補
•  RxJava – Scala
•  https://github.com/ReactiveX/RxJava
•  Observableモナド
•  Scalaz Stream
•  https://github.com/scalaz/scalaz-stream
•  Processモナド
•  Akka Stream
•  https://typesafe.com/blog/typesafe-announces-akkastreams
•  Akka actor
scalaz stream
•  ScalazベースのStreaming I/Oライブラリ
•  https://github.com/scalaz/scalaz-stream
•  ⽤用途
•  ⼊入出⼒力力処理理フレームワーク
•  パイプライン
•  部品化、部品合成によるロジック記述
•  ⼤大規模データ⼀一括処理理
•  ストリーム処理理
•  Processモナド
• 
• 
• 
• 
参照透過性:副作⽤用なし
部品化:パイプラインによる関数の合成
リソースの獲得・解放
フロー制御
•  状態機械
Processモナド
Process Monad
リソース管理
トランザクション管理
フロー制御
Channel
Sink
Channel
Sink
Process
Monad
Function
既存部品
Function
Process
Monad
アプリケーション・ロジック
プログラミング例例
•  パイプラインを流流れてくるデータを以下のルールでパ
ケット化
•  3つで⼀一つのパケットにする
•  パケットにシーケンス番号をつける
•  “start”データの前のパケットにエンドマークをつける
•  上記アルゴリズムを実現した部品を⼤大規模データ処理理と
ストリーミング処理理の両⽅方に適⽤用する
参考: [scalaz-stream] シーケンス番号とエンドマーク
http://modegramming.blogspot.jp/2015/03/scalaz-stream.html
準備
case class Packet(seqno: Int, end: Boolean, content: String)
def sink: Sink[Task, Packet] = {
io.channel((a: Packet) => Task.delay(println(a)))
}
参考: [scalaz-stream] Scala的状態機械/OOP編
http://modegramming.blogspot.jp/2015/03/scalaoop.html
参考: [scalaz-stream] Scala的状態機械/FP編
http://modegramming.blogspot.jp/2015/03/scalafp.html
参考: [scalaz-stream] ストリーミングで状態機械
http://modegramming.blogspot.jp/2015/04/scalaz-stream.html
アプリケーション・ロジック
def buildTextToPacket[M[_]: Monad](source: Process[M, String]):
Process[M, Packet] = {
既存部品とアプリケーションロ
source.
ジックを組合せてパイプライン
を構築
chunk(3). // 3個ずつチャンク化
pipe(zipWithNext). // 次のパケットを同時に取得
pipe(zipWithIndex). // インデックを採番
以下のロジックを実現
- 3つで⼀一つのパケットにする
map(toPacket)
- パケットにシーケンス番号をつける
- “start”データの前のパケットにエン
}
ドマークをつける
def toPacket(x: ((Vector[String], Option[Vector[String]]), Int)):
Packet = {
val ((current, next), index) = x
val content = current.mkString("-")
val isend = next.cata(_.headOption === Some("start"), true)
Packet(index + 1, isend, content)
}
⼤大規模データ処理理
val source = io.linesR("data.txt")
val pipeline = buildTextToPacket(source).to(sink)
pipeline.run.run
1
2
3
4
5
6
start
8
9
Packet(1,false,1-2-3)
Packet(2,true,4-5-6)
Packet(3,true,start-8-9)
汎⽤用部品を⼤大規模データ処理理に
適⽤用
Process Monad
リソース管理
フロー制御
Channel
chunk
zipWith
Next
zipWith
Index
toPacket
Sink
EventProcessor
object EventProcessor {
val q = async.unboundedQueue[String]
val eventStream: Process[Task, String] = q.dequeue
}
val queue = EventProcessor.q
val source = Vector("1", "2", "3", "4", "5", "6", "start", "8", "9")
source foreach { x =>
queue.enqueueOne(x).run
Thread.sleep(2000)
}
参考: [scalaz-stream] ストリーミングで状態機械
http://modegramming.blogspot.jp/2015/04/scalaz-stream.html
ストリーム処理理
val source = EventProcessor.eventStream
val pipeline = buildTextToPacket(source).to(sink)
pipeline.run.run
汎⽤用ロジックをストリーミング
処理理に適⽤用
Packet(1,false,1-2-3)
Packet(2,true,4-5-6)
Process Monad
リソース管理
フロー制御
Channel
chunk
zipWith
Next
zipWith
Index
toPacket
Sink
まとめ
•  モナド
•  マイクロ・フレームワーク
•  DSL(Domain Specific Language)
•  Monadic Programming
•  モナドベースのパイプラインプログラミング
•  関数による部品化、部品の組み⽴立立て
•  Functional Reactive Programming
•  Futureモナド
•  Processモナド
END