Monadic Programmingのススメ! Functional Reactive Programming へのアプローチ 2015年年4⽉月21⽇日 Everforth 浅海智晴 自己紹介 • 1985年年富⼠士通(株)⼊入社 • UNIXワークステーション/サーバーのOS、分散基盤、Web基盤の開発 に従事 • 2001年年9⽉月に独⽴立立 • Java, XML, UMLを中⼼心に活動 • 2005年年4⽉月より2007年年3⽉月まで • 稚内北北星学園⼤大学東京サテライト校教授 • 現在 • (株) 匠BusinessPlace 取締役チーフコンサルタント • (株) Everforth 取締役CTO • 著作 • 上流流⼯工程UMLモデリング(⽇日経BP) • マインドマップではじめるモデリング講座(翔泳社) http://www.takumi-businessplace.co.jp/ http://www.apparel-cloud.com/ ApparelCloud データを一元化することで、多様な情報を、様々なメディア・デバイスを 通じて 消費者に届け、真のCRMを実現する仕組み。 http://www.apparel-cloud.com/ ApparelCloud 複数メディア・コンテンツを⼀一元管理理し、クオリティの統⼀一と運⽤用コストの低減を実現。 コンテンツを⼆二重三重に登録する必要もなく、データ分析などの⼀一元化も可能。 ブランドサイト ブランド アプリ ブログ オンラインストア 集計・分析 コンテンツ クーポン ニュース ブログ ショップ ダッシュボード ApparelCloud システム構成 メディア アプリケーション Scala Java その他 PUSHSender Web play WebAPI EmailSender iOS play play Android BatchEngine EverforthEngine Shell play iOS Analysis Engine Android Finagle WebConsole Web 管理 アプリケーション Primefaces (JSF) Coordination Engine ServiceMix Camel Stream Engine Finagle 開発中 開発中 Scala利用の文脈 要求 Model 要求 Object-Functional Analysis and Design Summay Model Regular Model Mindmap Model Use case Model WireFrame (+ Usage) Domain Model API Usage List モデル Service Description Object-Functional Programming プログラム プログラム カスタマイズ (開発) プログラム (自動生成) カスタマイズ Service Cloud Service Service Platform as-a-Service Apparel Cloud 参考: Everforthのモデル体系 http://modegramming.blogspot.jp/2015/04/ofadeverforth.html アジェンダ 背景 関数型プログラミング Monadic Programming Functional Reactive Programming キーワード • Monad • Monadic Programming • Functional Reactive Programming 背景 新しい現実 ハードウェア • メニーコア、⼤大容量量メモリ、SSD • 並列列プログラミング クラウド・プラットフォーム • • • • • • • クラウド・サービス、スマート・デバイス 故障、遅延 ⼤大規模データ、⼤大規模演算 ⾼高頻度度イベント、ストリーミング ⾮非同期、並列列、分散 NoSQL、インメモリデータベース CQRS、Event Sourcing、Microservice The Reactive Manifest • http://www.reactivemanifesto.org/ • ⽇日本語訳 • http://okapies.hateblo.jp/entry/2014/12/03/025921 • Responsive • 応答性:すぐ応答する • Resilient • 耐故障性:回復復⼒力力に富む、⽴立立ち直りが早い • Elastic • 弾⼒力力性:伸縮⾃自在の • Message Driven • メッセージ駆動 • 関連: Reactive Streams • http://www.reactive-streams.org/ Reactiveの実現 CPU Core Function Message Function Service Function Message Service Function Function Service Service Cluster Message Function Function Service Parallel Service Reactive Distributed Responsive Concurrent Concurrent Resillience Parallel Elasticity Distributed Message Driven Functional Reactive Programming へのアプローチ 並列処理アプリケーション Functional Reactive Programming Monadic Programming Object-Functional Programming Functional Programming Object-Oriented Programming 並列処理アプリケーション Actor Object-Oriented Programming 関数型プログラミング 関数型⾔言語の技術マップ 数理論理学 手続き型言語 Curry-Haward対応 関数型言語 様相論理 オブジェクト 指向言語 述語論理 直感主義命題論理& 自然演繹 単純型付ラムダ計算 命題 型 証明 計算 純粋関数型言語 代数的データ型 型クラス 抽象代数学 直積 直和 圏論 群論 モノイド 不変 副作用なし 参照透過性 置換モデル 永続データ構造 Hask圏 (プログラム圏) モナド Scala モダン⽂文法 (e.g. Ruby) Functional Programming (Haskell – α) OOP (Java + α) DSL (Scalable) Scala Java VM 代数的構造デザインパターン 結合律律 (associative law) • 半群 (semigroup) • モノイド (monoid) • 群 (group) (a + b) + c = a + (b + c) 可換律律 (commutative law) • 可換半群 • 可換モノイド • 可換群(アーベル群) a+b=b+a 分配律律 (distributive law) • 環 (ring) • 体 (field) a * (b + c) = a * b + a * c 圏論論デザインパターン モナド (monad) 圏 (category) • Hask圏 (Scala圏?) • クライスリ圏 (kleisli category) 射 (arrow, morphism) Applicative functor 関⼿手 (functor) 並列処理アプリケーション 基本概念 Functional Reactive Programming Monadic Programming • • • • • • • • • • • • • Object-Functional Programming 再帰 (recursion) Functional Programming ⾼高階関数 (high-order function) 不不変データ (immutable data) Object-Oriented Programming 遅延評価 (lazy evaluation) 参照透過性 (referential transparency) (項)書換えモデル (substitution model) 等式推論論 (equational reasoning) 代数的データ型 (algebraic data type) 直積, 直和 (direct product, direct sum) 永続データ構造 (persistent data structure) エフェクト (effect) 型クラス (type class) モナド (monad) Monadic Programming モナド • 計算機科学におけるモナド(Monads)とは、計算機科学者の Eugenio Moggiによって提案されたモジュール性を持たせた 表⽰示的意味論論の枠組みを⾔言う。プログラムとはクライスリ圏 の射である(a program is an arrow of a Kleisli category)、 という要請から合成規則としてクライスリトリプル(Kleisli triple)というモナドと等価なものが⽤用いられる。(Wikipedia) • In functional programming, a monad is a structure that represents computations defined as sequences of steps: a type with a monad structure defines what it means to chain operations, or nest functions of that type together. This allows the programmer to build pipelines that process data in steps, in which each action is decorated with additional processing rules provided by the monad. (Wikipedia) • ⾮非常に難しい概念念 • 使うのは(慣れれば)それほど難しくない • モダンな関数型プログラミングの中核概念念 再利利⽤用メカニズムの⽐比較 手続き型 アプリ オブジェクト指向 関数型(Monadic) アプリ アプリ Functor Monad サブルーチン フレームワーク 型クラス A→B A→M[B] オブジェクト 関数 モナドの⽤用途 • コンテナ • コレクション(List, Vector) • 成功失敗⽂文脈(Option) • インタープリタ • Freeモナド, Operationalモナド • DI(Dependency Injection) • Readerモナド • 状態機械 • Stateモナド • Processモナド • 問合せ • Slick • Spark RDD(Resilient Distributed Dataset) モナドの種類(⾮非公式分類) • コンテナ型 • 値を格納 • コンビネータを実⾏行行すると即時に評価が⾏行行われる • インタープリタ型 • 関数を格納 • コンビネータを実⾏行行すると関数を合成したモナド(インター プリタ)が返ってくる • 純粋関数型のセマンティクス • runメソッドなどでインタープリタを実⾏行行する • 外部⼊入出⼒力力など⾮非純粋関数型処理理はこちらで⾏行行う • 外部⼊入出⼒力力など副作⽤用を伴う処理理はインタープリタ型モナ ドで対応 • IOモナド コンテナ型モナド • Option • 成功/失敗状態を扱うモナド • List • データ列列を扱うモナド(シーケンシャルアクセス向け) • Vector • データ列列を扱うモナド(ランダムアクセス対応) • Stream • 遅延評価によるデータ列列を扱うモナド • Try • 例例外発⽣生状態を扱うモナド • Future • 並列列実⾏行行を扱うモナド インタープリタ型モナド • Stateモナド • 状態を表現するモナド • IOモナド • 外部⼊入出⼒力力を扱うモナド • Processモナド • 状態機械を表現するモナド • ストリーム処理理を実現 Monadic Programmingの ⾮非公式定義 • ⼀一義的にはモナドを活⽤用したプログラミング • 本セッションでは以下の型クラス(Scalaz)を活⽤用したプ ログラミングまでスコープを広げています • Functor • Applicative Functor • Monad • MonadはApplicative FunctorおよびFunctorでもある • Monadを⽬目的によってApplicative Functorまたは Functorとして使うことも多い Functor, Applicative Functor, Monad ⾮非公式理理解 Functor F.point(x).map(f).map(g).map(h) x f g h Option(1).map(f).map(g).map(h) Applicative Functor x (A.point(x) |@| A.point(y) |@| A.point(z))(i(_, _, _)) f y g i (Option(1) |@| Option(2) |@| Option(3))(i(_, _, _)) z ^^(A.point(x), A.point(y), A.point(z))(i(_, _, _)) h M.point(x).flatMap(f).flatMap(g).flatMap(h) Monad x f g h Option(1).flatMap(f).flatMap(g).flatMap(h) for { a <- M.point(x) b <- f(a) c <- g(b) d <- h(c) } yield d 型クラス&Monadの使い所 クラス 機能要求 予約 販売 顧客管理 ロギング 非機能要求 OOPではAOPやDIによっ て実現してきた機能要求 と⾮非機能要求の分離離を細 粒粒度度、型安全に実現 セキュリティ 型クラス & Monad トランザクション 例外処理 並列処理 分散処理 故障耐性 分析 Scalaz • https://github.com/scalaz/scalaz • キャッチフレーズ • 昔: Scalaz: Type Classes and Pure Functional Data Structures for Scala • 今: An extension to the core Scala library for functional programming. http://typelevel.org • 最新の関数型プログラミングを可能にする機能群を Scala向けに⽤用意 • 型クラス • 純粋関数型データ構造 • Haskellで実績のある機能群をScalaで実現 Monadicプログラミングの効⽤用 Java⾵風 def validate(name: String, age: Int): ValidationNEL[Throwable, (String, Int)] = {! val a = validateName(name) ! val b = validateAge(age) ! if (a.isSuccess && b.isSuccess) { ! val a1 = a.asInstanceOf[Success[NonEmptyList[Throwable], String]].a val b1 = b.asInstanceOf[Success[NonEmptyList[Throwable], Int]].a ! Success((a1, b1)) ! } else if (a.isSuccess) { ! b.asInstanceOf[Failure[NonEmptyList[Throwable], (String, Int)]] ! } else if (b.isSuccess) { ! a.asInstanceOf[Failure[NonEmptyList[Throwable], (String, Int)]] ! } else { ! val a1 = a.asInstanceOf[Failure[NonEmptyList[Throwable], String]].e val b1 = b.asInstanceOf[Failure[NonEmptyList[Throwable], Int]].e ! Failure(a1 |+| b1) ! } ! }! ! ! Scala (関数型プログラミング) def validate(name: String, age: Int): ValidationNEL[Throwable, (String, Int)] = { ! validateName(name) match { ! case Success(a) => validateAge(age) match { ! case Success(b) => Success((a, b)) ! case Failure(e) => Failure(e) ! } ! case Failure(e1) => validateAge(age) match { ! case Success(b) => Failure(e1) ! case Failure(e2) => Failure(e1 |+| e2) ! } ! } ! } ! Scalaz (Monadicプログラミング) def validate(name: String, age: Int): ValidationNEL[Throwable, (String, Int)] = { ! (validateName(name) ⊛ validateAge(age))((_, _)) }! ! 参考: Scala Tips / Validation (10) - applicative http://modegramming.blogspot.jp/2012/04/scala-tips-validation-10-applicative.html プログラミング例例 • 3つの関数を並⾏行行動作させ、それぞれの実⾏行行結果を引数 にした関数を実⾏行行して最終結果を計算 • • • • calcString(x: Int): String calcInt(x: Int): Int calcFloat(x: Int): Float finalCalc(x: String, y: Int, z: Float): String • 並列列処理理はFutureモナドで実現 準備 object Util { import java.net.URL import scala.concurrent.duration._ import scalax.io.JavaConverters._ def go[T](label: String)(body: => T) { val ts = System.currentTimeMillis val r = body val elapse = System.currentTimeMillis - ts println(s"$label ($elapse ms): $r") } def getPageLength(url: String): Int = { new URL(url).asInput.bytes.size } Scala流流のDSLの例例 go (“bigCalc”) { val r = bigCalc(1000) Await.result(r, 1.minute) } def calcString(n: Int): String = { Thread.sleep(n) n.toString } def calcInt(n: Int): Int = { Thread.sleep(n) n } def calcFloat(n: Int): Float = { Thread.sleep(n) n.toFloat } } def finalCalc(s: String, i: Int, f: Float): String = { s"$s-$i-$f" } 基本形 def bigCalc(n: Int): String = { val a = calcString(n) val b = calcInt(n) val c = calcFloat(n) finalCalc(a, b, c) } > go(“bigCalc”)(bigCalc(1000)) bigCalc (3003 ms): 1000-1000-1000.0 Optionモナド def bigCalcO(n: Int): Option[String] = { for { a <- Option(calcString(n)) b <- Option(calcInt(n)) c <- Option(calcFloat(n)) } yield finalCalc(a, b, c) } Monad x f g h for式(for ... yield)でmonadicな パイプラインを構築 > go(“bigCalcO”)(bigCalcO(1000)) bigCalcO (3005 ms): 1000-1000-1000.0 参考: Scala Tips/Option Index http://modegramming.blogspot.jp/2012/02/scala-tips-option-index.html Optionモナド Monadic関数版 def calcStringO(n: Int): Option[String] = { Option(calcString(n)) } def calcIntO(n: Int): Option[Int] = { Option(calcInt(n)) } def calcFloatO(n: Int): Option[Float] = { Option(calcFloat(n)) } def bigCalcO2(n: Int): Monad x f h A->M[B] モナディック関数 for式にモナディック 関数を組み合わせるの がイディオム Option[String] = { for { a <- calcStringO(n) b <- calcIntO(n) c <- calcFloatO(n) } yield finalCalc(a, b, c) } > go(“bigCalcO2”)(Await.result(bigCalcO2(1000), 1.minute)) bigCalcO2 (3005 ms): 1000-1000-1000.0 g Monad Futureモナド x f g h val executorService = Executors.newFixedThreadPool(10) implicit val executionContext = ExecutionContext.fromExecutorService(executorService) def bigCalcP(n: Int): Future[String] = { for { a <- Future(calcString(n)) b <- Future(calcInt(n)) c <- Future(calcFloat(n)) } yield finalCalc(a, b, c) } > go(“bigCalcP”)(Await.result(bigCalcP(1000), 1.minute)) bigCalcP (3017 ms): 1000-1000-1000.0 Monad Futureモナド 改良良版 def bigCalcP2(n: Int): Future[String] = { val fa = Future(calcString(n)) val fb = Future(calcInt(n)) val fc = Future(calcFloat(n)) for { a <- fa b <- fb c <- fc } yield finalCalc(a, b, c) } x f g h モナディック処理理の前 にFutuerを開始 > go(“bigCalcP2”)(Await.result(bigCalcP2(1000), 1.minute)) bigCalcP2 (1005 ms): 1000-1000-1000.0 Applicative Functor Future Applicative x f y g i z h def bigCalcP3(n: Int): Future[String] = { import scalaz._, Scalaz._ (Future(calcString(n)) |@| Future(calcInt(n)) |@| Future(calcFloat(n)))(finalCalc) } Applicativeを使うと綺 def bigCalcP31(n: Int): Future[String] = { 麗麗に記述できる import scalaz._, Scalaz._ ^^(Future(calcString(n)), Future(calcInt(n)), Future(calcFloat(n)))(finalCalc) } >go(“bigCalcP3”)(Await.result(bigCalcP3(1000), 1.minute)) bigCalcP3 (1162 ms): 1000-1000-1000.0 > go(“bigCalcP31”)(Await.result(bigCalcP31(1000), 1.minute)) bigCalcP31 (1004 ms): 1000-1000-1000.0 Future Applicative 改良良版 ロジック(機能要求)からApplicativeとして パラメタ化した⾮非機能要求を分離離 Applicative Functor x f y g i z h def bigCalcA[A[_]](n: Int)(implicit C: Applicative[A]): A[String] = { ^^( 型クラスを利利⽤用し任意 C.point(calcString(n)), のApplicativeを使える C.point(calcInt(n)), ように汎⽤用化 C.point(calcFloat(n)) Applicativeのpointメソッド )(finalCalc) はvirtual constructor } >go(“bigCalcAF”)(Await.result(bigCalcA[Future](1000), 1.minute)) bigCalcAF (1003 ms): 1000-1000-1000.0 > go(“bigCalcAI”)(bigCalcA[Id](1000), 1.minute)) bigCalcAI (3001 ms): 1000-1000-1000.0 Functional Reactive Programming Reactiveの実現(再掲) CPU Core Function Message Function Service Function Message Service Function Function Service Service Cluster Message Function Function Service Parallel Service Reactive Distributed Responsive Concurrent Concurrent Resillience Parallel Elasticity Distributed Message Driven Functional Reactive Programmingで実現したいこと • 関数型プログラミングのメリットを享受 • 参照透過性 • 関数による部品化/部品の組⽴立立て • パイプラインのセマンティクス • イベント駆動処理理 • Callback hellの排除 • ストリーム処理理 • リソース管理理 • フロー制御 • ⼤大規模データ処理理 • リソース管理理 • 省省メモリ Functional Reactive Programming の候補 • RxJava – Scala • https://github.com/ReactiveX/RxJava • Observableモナド • Scalaz Stream • https://github.com/scalaz/scalaz-stream • Processモナド • Akka Stream • https://typesafe.com/blog/typesafe-announces-akkastreams • Akka actor scalaz stream • ScalazベースのStreaming I/Oライブラリ • https://github.com/scalaz/scalaz-stream • ⽤用途 • ⼊入出⼒力力処理理フレームワーク • パイプライン • 部品化、部品合成によるロジック記述 • ⼤大規模データ⼀一括処理理 • ストリーム処理理 • Processモナド • • • • 参照透過性:副作⽤用なし 部品化:パイプラインによる関数の合成 リソースの獲得・解放 フロー制御 • 状態機械 Processモナド Process Monad リソース管理 トランザクション管理 フロー制御 Channel Sink Channel Sink Process Monad Function 既存部品 Function Process Monad アプリケーション・ロジック プログラミング例例 • パイプラインを流流れてくるデータを以下のルールでパ ケット化 • 3つで⼀一つのパケットにする • パケットにシーケンス番号をつける • “start”データの前のパケットにエンドマークをつける • 上記アルゴリズムを実現した部品を⼤大規模データ処理理と ストリーミング処理理の両⽅方に適⽤用する 参考: [scalaz-stream] シーケンス番号とエンドマーク http://modegramming.blogspot.jp/2015/03/scalaz-stream.html 準備 case class Packet(seqno: Int, end: Boolean, content: String) def sink: Sink[Task, Packet] = { io.channel((a: Packet) => Task.delay(println(a))) } 参考: [scalaz-stream] Scala的状態機械/OOP編 http://modegramming.blogspot.jp/2015/03/scalaoop.html 参考: [scalaz-stream] Scala的状態機械/FP編 http://modegramming.blogspot.jp/2015/03/scalafp.html 参考: [scalaz-stream] ストリーミングで状態機械 http://modegramming.blogspot.jp/2015/04/scalaz-stream.html アプリケーション・ロジック def buildTextToPacket[M[_]: Monad](source: Process[M, String]): Process[M, Packet] = { 既存部品とアプリケーションロ source. ジックを組合せてパイプライン を構築 chunk(3). // 3個ずつチャンク化 pipe(zipWithNext). // 次のパケットを同時に取得 pipe(zipWithIndex). // インデックを採番 以下のロジックを実現 - 3つで⼀一つのパケットにする map(toPacket) - パケットにシーケンス番号をつける - “start”データの前のパケットにエン } ドマークをつける def toPacket(x: ((Vector[String], Option[Vector[String]]), Int)): Packet = { val ((current, next), index) = x val content = current.mkString("-") val isend = next.cata(_.headOption === Some("start"), true) Packet(index + 1, isend, content) } ⼤大規模データ処理理 val source = io.linesR("data.txt") val pipeline = buildTextToPacket(source).to(sink) pipeline.run.run 1 2 3 4 5 6 start 8 9 Packet(1,false,1-2-3) Packet(2,true,4-5-6) Packet(3,true,start-8-9) 汎⽤用部品を⼤大規模データ処理理に 適⽤用 Process Monad リソース管理 フロー制御 Channel chunk zipWith Next zipWith Index toPacket Sink EventProcessor object EventProcessor { val q = async.unboundedQueue[String] val eventStream: Process[Task, String] = q.dequeue } val queue = EventProcessor.q val source = Vector("1", "2", "3", "4", "5", "6", "start", "8", "9") source foreach { x => queue.enqueueOne(x).run Thread.sleep(2000) } 参考: [scalaz-stream] ストリーミングで状態機械 http://modegramming.blogspot.jp/2015/04/scalaz-stream.html ストリーム処理理 val source = EventProcessor.eventStream val pipeline = buildTextToPacket(source).to(sink) pipeline.run.run 汎⽤用ロジックをストリーミング 処理理に適⽤用 Packet(1,false,1-2-3) Packet(2,true,4-5-6) Process Monad リソース管理 フロー制御 Channel chunk zipWith Next zipWith Index toPacket Sink まとめ • モナド • マイクロ・フレームワーク • DSL(Domain Specific Language) • Monadic Programming • モナドベースのパイプラインプログラミング • 関数による部品化、部品の組み⽴立立て • Functional Reactive Programming • Futureモナド • Processモナド END
© Copyright 2024 ExpyDoc