Chi l`ha detto?

GO REACTIVE!

Applicazione anni 2000




PRINCIPI DI
REACTIVE PROGRAMMING


Cluster di migliaia di processi multicore
Tempi di risposta nell’ordine dei millisec
 100% uptime
 Dati nell’ordine dei Petabyte
 Accedute da qualsiasi tipo di dispositivo

Università degli Studi di Padova
Dipartimento di Matematica
Corso di Laurea in Informatica, A.A. 2013 – 2014
[email protected]
GO REACTIVE!
Reactive Application
Orientate agli eventi
Scalabili
"Readily responsive to a stimulus"
 Resilienti
Merriam-Webster
 Responsive

Ingegneria del software mod. B
Riccardo Cardin
react to events
La natura event-driven abilita
alle altre qualità
react to load
La scalabilità non deve dipendere
da risorse condivise
react to failure
Sistemi resilienti permettono
di recuperare errori a tutti
i livelli
react to users
I tempi di risposta non devono
dipendere dal carico di lavoro
• Real-time, engaging, reach,
collaborative
• Nessuna latenza nelle
risposte
• Nessun redesing per
ottenere la scalabilità
• Scalabilità on-demand
• Risk-management

Ingegneria del software mod. B
2
REACTIVE MANIFESTO
Nuovi requisiti richiedono nuove tecnologie

Applicazione moderne (≥ 2010)

INGEGNERIA DEL SOFTWARE

Decine di server
Tempi di risposta nell’ordine dei secondi
Ore di downtime per manutenzione
Dati nell’ordine dei Gigabyte
Accedute da dispositivi desktop
responsive
scalable
resilient
event-driven
3
Riccardo Cardin
• Loosely coupled design
• Communication orientation
• Uso efficiente delle risorse
Ingegneria del software mod. B
• Downtime è perdita
di denaro
• Parte del design
4
Riccardo Cardin
MODELLO SINCRONO

MODELLO SINCRONO
Il mondo sincrono è senza futuro...



Scarso utilizzo delle feature offerte dai processori moderni
Larga diffusione delle architetture multiprocessore
Cicli non ottimizzati
 Scarso parallelismo o gestione difficoltosa
 Tipico dei linguaggi imperativi e ad oggetti


Esempio: recuperare il nome del file più pesante
public String getBiggestFile(final File folder) {
long maxLength = 0L;
String fileName = null;
for (final File fileEntry : folder.listFiles()) {
if (maxLength < fileEntry.length()) { // Not so efficient
fileName = fileEntry.getName();
maxLength = fileEntry.length();
}
}
return fileName ;
}
Frequenti operazioni di I/O

Il mondo sincrono è senza futuro...

// Synchronous world (w/o Future)
val session = socialNetwork.createSessionFor("user", credentials)
// blocked on method, waiting for results (latency)...
session.getFriends()
// There are a lot of operation to do!!! :(
fileEntry.length() // I/O
C, C++, Java, ...
...
5
Tempo totale di attesa
Ingegneria del software mod. B
Riccardo Cardin
MODELLO ASINCRONO

Java
if (maxLength… // computation
Ingegneria del software mod. B
CPU poco utilizzata,
I/O sovrabbondante
6
Riccardo Cardin
MODELLO ASINCRONO
Callbacks

Funzioni da invocare al termine di una elaborazione
Gestione migliore della CPU
 Ridotti tempi di attesa
Callbacks

CPU1



CPU2
Più processi gestiscono più richieste
CPU3
Asynchronous JavaScript and XML (AJAX)
CPU4
var callback = function(){
alert('I was called back asynchronously');
};
Tempo totale di attesa
In attesa della risposta, è
$.ajax({
possibile continuare
type: 'GET',
url: 'http://example.com',
l’elaborazione (rendering
done: callback, // positive case
fail: anotherCallback // negative case
});

Linguaggi o framework che gestiscano l’elaborazione
asincrona e concorrente

Ma...c’è sempre un ma...
UI)

7
jQuery
Ingegneria del software mod. B
I/O è ancora presente, ma
viene distribuito
sull’elaborazione di più CPU
Riccardo Cardin
Node.js, jQuery, ...
Ingegneria del software mod. B
8
Riccardo Cardin
module.exports = function (dir, cb) {
fs.readdir(dir, function (er, files) {
if (er) return cb(er)
var counter = files.length
var errored = false
var stats = []
MODELLO ASINCRONO

Callbacks...HELL!!!
Difficile gestire le
eccezioni con blocchi
try/catch
files.forEach(function (file, index) {
fs.stat(path.join(dir,file), function (er, stat) {
if (errored) return
if (er) {
Innesto callback per gestire
errored = true
return cb(er)
i flussi degli eventi
}
stats[index] = stat
if (--counter == 0) {
var largest = stats
.filter(function (stat) { return stat.isFile() })
.reduce(function (prev, next) { // [6]
if (prev.size > next.size) return prev
return next
})
cb(null, files[stats.indexOf(largest)])
}
}) // [1]
Difficile da verificare
}) // [2]
e manutenere
}) // [3]
9
}
Ingegneria del software mod. B
Riccardo Cardin
FUTURES E PROMISES

node.js
Ingegneria del software mod. B

Tecnica per eseguire molte operazioni in parallelo, in
modo efficiente e non-blocking
Monad

Rappresenta un contesto di esecuzione (framework)

Stile dichiarativo
 Immutabile




Componibili
Implementano meccanismi per la gestione delle eccezioni

// With Futures (asynchronous world)
val session = socialNetwork.createSessionFor("user", credentials)
// Create placeholder for computation
val f: Future[List[Friend]] = future { session.getFriends() }
// Statements here can be executed, while waiting for results :)
scala
Ingegneria del software mod. B
Amplificatore di tipi, computation builder
Può racchiudere un tipo
public class Monad<T> {
public Monad(T t) { /* ... */ }
}
Rappresenta un segnaposto per un risultato futuro

Riccardo Cardin
FUTURES E PROMISES
Futures

10
È componibile con altre monadi
 Composizione (a.k.a. concatenazione) sul tipo racchiuso
public class Monad<T> {
public abstract <V> Monad<V> bind(Function<T, Monad<V>> f);
}

11
Riccardo Cardin

Spesso permette di estrarre il valore del tipo contenuto
Caratteristica dei linguaggi funzionali
Ingegneria del software mod. B
12
Riccardo Cardin
FUTURES E PROMISES

FUTURES E PROMISES
Monad

È simile ad una bolla!

Può racciudere
qualcosa
Future[T] è una monad

È qualcosa di
non concreto


Può
evaporare,
lasciando
libero il
contenuto
13
Può ricevere istruzioni su
cosa fare del contenuto
Riccardo Cardin
FUTURES E PROMISES
Promise[T]

Non più callback hell!!!
 Si riporta il modello asincrono al modello sincrono
 Se un future nella catena fallisce, fallisce l’intera catena

15
scala
Riccardo Cardin
14
scala
Riccardo Cardin
Valore che rappresenta
l’elaborazione
Single-assigment container (monad) / proxy


Ingegneria del software mod. B
Ingegneria del software mod. B

Composizione funzionale di più elaborazioni
// First future execution
val rateQuote = future {
connection.getCurrentValue(USD)
}
// Bind second execution to first
val purchase = rateQuote map { quote =>
if (isProfitable(quote)) connection.buy(amount, quote)
else throw new Exception("not profitable")
}
// Get final result
purchase onSuccess {
case _ => println("Purchased " + amount + " USD")
}
import scala.util.{Success, Failure}
// Asynchronous execution inside future context
val f: Future[List[String]] = future {
session.getRecentPosts
}
posts è il risulato
// Do something else...
f onComplete {
dell’elaborazione
// onSuccess
case Success(posts) => for (post <- posts) println(post)
// onFailure
case Failure(t) => println("An error has occured: " +
t.getMessage)
}
FUTURES E PROMISES
Future[T] è una monad

Successfully completed / Failed with an exception
Eventual execution: nessun vincolo temporale
t è un’eccezione
Ingegneria del software mod. B

Racchiudono l’elaborazione da eseguire in modo
asincrono (callback)
Un Future legge il risultato di un’elaborazione asincrona
Una Promise scrive (completa) un Future
scala
promise[T]
val p =
// Future that is completed by the promise
val f = p.future
val consumer = future {
startDoingSomething()
f onSuccess {
case r => doSomethingWithResult()
}
}
val producer = future {
val r = produceSomething()
p success r // Promise completes the Future
// It should be ‘p failure exception’ also
continueDoingSomethingUnrelated()
}
Ingegneria del software mod. B
r
produceSomething
startDoingSomething
doSomethingWithResult
16
Riccardo Cardin
FUTURES E PROMISES

FUTURES E PROMISES
Javascript promises (Q library)


Promise e future sono fusi nelle medesime strutture
A Future that may be explicitly completed (setting its value
and status), and may include dependent functions and actions
that trigger upon its completion.
promiseMeSomething()
// value is the return value of promiseMeSomething
.then(function (value) {
// Do something as soon as promiseMeSomething finishes
},
// Something went wrong, reason is an exception
then ritona una
function (reason) {
promise, quindi è
});

Promise stile Scala  Q Deferreds
Ingegneria del software mod. B
Javadoc

di
completare una promise.
concatenabile

17
Riccardo Cardin
REACTIVE EXTENSIONS (RX)

di Scala
Ingegneria del software mod. B
Observables (RxJava)
Iterable (pull)
Observable (push)
T next()
onNext(T)
Iterable<T> getData()
Discover error
throws Exception
onError(Exception)
Observable<T> getData
Complete
returns
onCompleted()
T getData()
Asynchronous
Future<T> getData()
Estensione del design pattern Observer
Utilizzo di operatori di composizione (monads!)
public static void hello(String... names) {
Observable.from(names).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("Hello " + s + "!");
}
});
}
// hello("Ben", "George");  Hello Ben! Hello George!
18
Riccardo Cardin
Retrieve data
Synchronous
Ingegneria del software mod. B
thenApply è simile a map
Event
Multiple items

CompletableFuture.get()
// f1 is a promise that will completed with an Integer value
CompletableFuture<Double> f3 =
f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI).
exceptionally(ex -> "We have a problem: " + ex.getMessage());

Observable sequences
Single items

Composizione (future stile Scala)
Effettua l’override del
metodo
REACTIVE EXTENSIONS (RX)
Gestione sequenze di valori in modo asincrono

Creazione (usando le lambda extension)
final CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
//...long running...
return "42";
}, executor);
concatenabile
var deferred = Q.defer();
FS.readFile("foo.txt", "utf-8", function (error, text) {
if (error) { deferred.reject(new Error(error)); }
else { deferred.resolve(text); }
});
Deferred permette
return deferred.promise;
Java 8 promises: CompletableFuture<T>
19
Riccardo Cardin
Aggiunta al
pattern GoF
getVideos().subscribe(new Observer<Video>() {
def void onNext(Video video) { // Invoked on next element
println(“Video: ” + video.videoId)
}
def void onError(Exception e) { // Invoked on error
e.printStackTrace()
}
def void onCompleted() { // Invoked when stream finishes to emit
println(“Completed”)
}
}
Ingegneria del software mod. B
20
Riccardo Cardin
REACTIVE EXTENSIONS (RX)

REACTIVE EXTENSIONS (RX)
Observables (RxJava)


Composizione Observables
Monads  Componibili

Mantengono la proprietà di asincronia originaria
21
Ingegneria del software mod. B
Riccardo Cardin
ACTOR MODEL
22
Ingegneria del software mod. B
ACTOR MODEL
Each actor is a form of reactive object, executing some
computation in response to a message and sending out a
reply when the computation is done

Messaggi (task)

Interfaccia di comunicazione dell’attore

Elaborazione dei messaggi uno per volta
John C. Mitchell



Elaborazioni solo in risposta a stimoli esterni



Reactive

Riccardo Cardin

Dormiente / attivo
Non c’è un concetto esplicito di thread
Invio di un messaggio async a se stesso o altro attore
 Creazione nuovi attori
 Modifica del proprio comportamento
Coda di messaggi (mail box)
Nessuna garanzia sull’ordine di arrivo

Mail system

Composti di tre parti

Tre azioni fondamentali
Variare nel tempo (comportamento)
Ogni attore ha associato un mail address


Nessuno stato interno (...in teoria...)  Immutabili
Ingegneria del software mod. B
tag
23
Riccardo Cardin
target
operazione
Ingegneria del software mod. B
data
email address del
receiver
24
Riccardo Cardin
ACTOR MODEL

ACTOR MODEL
Esempio


Ad ogni cambio di stato viene creato un nuovo attore


State change: no race conditions
Behavioural change
Akka

Toolkit e runtime che implementano attori su JVM
Get_min
Min|1
Insert|2
A1:[1, 2, 4, 7]
A1:[1, 4, 7]
A1:[2, 4, 7]
25
26
(c) http://akka.io
Ingegneria del software mod. B
Riccardo Cardin
AKKA
Ingegneria del software mod. B
AKKA
scala
type Receive = PartialFunction[Any, Unit]
trait Actor {
def receive: Receive // Actor actual behaviour
implicit val self: ActorRef // Reference to itself
def sender: ActorRef // Reference to the sender of last message
implicit val context: ActorContext // Execution context
}
abstract class ActorRef {
// Send primitives
def !(msg: Any)(implicit sender: ActorRef = Actor.noSender): Unit
def tell(msg: Any, sender: ActorRef) = this.!(msg)(sender)
// ...
}
trait ActorContext {
// Change behaviour of an Actor
def become(behavior: Receive, discardOld: Boolean = true): Unit
def unbecome(): Unit
// Create a new Actor
def actorOf(p: Props, name: String): ActorRef
def stop(a: ActorRef): Unit // Stop an Actor
// ...
}
Ingegneria del software mod. B
Riccardo Cardin

Esempio

Implementazione di un contatore con gli attori
class Counter extends Actor {
// State == explicit behaviour
def counter(n: Int): Receive = {
// Receive two types of messages: ‘incr’ and ‘get’
// ‘incr’ change actor’s behaviour
case ”incr” => context.become(counter(n + 1))
// ‘get’ returns current counter value to sender
case ”get” => sender ! n
}
def receive = counter(0) // Default behaviour
}
a!incr
27
Riccardo Cardin
a!incr
a:[0]
Ingegneria del software mod. B
a!get
a:[1]
È possibile
modellare
anche
stati
interni
scala
sender!2
a:[2]
28
Riccardo Cardin
AKKA

Resilienza

Error flow
sup
sub1
AKKA
sub2
sub1.1

Supervisor
Actor lifecycle
Contenimento e riposte automatiche agli errori
An actor
Gli attori in errore vengono terminati o riavviati
La decisione è presa da un attore supervisore
 Gli attori con supervisione formano una struttura ad albero
 Il supervisore crea i suoi subordinati

new Actor
preStart

class Manager extends Actor {
// OneForOneStrategy restarts only actor which died
override val supervisorStrategy = OneForOneStrategy() {
case _: DBException => Restart // reconnect to DB
case _: ActorKilledException => Stop
case _: ServiceDownException => Escalate
}
// ...
context.actorOf(Props[DBActor], ”db”)
// ...
}
Start
Le fasi di riavvio
possono essere
molteplici
fail
Restart
preStart
new Actor
ActorRef rimane
valido tra un
riavvio e l’altro
stop
Stop
29
30
postStop
Ingegneria del software mod. B
Riccardo Cardin
BIBLIOGRAFIA







Riccardo Cardin
BIBLIOGRAFIA
The Reactive Manifesto http://www.reactivemanifesto.org/
Promises in Node.js with Q – An Alternative to Callbacks
http://strongloop.com/strongblog/promises-in-node-js-withq-an-alternative-to-callbacks/
The Reactive Extensions (Rx) http://msdn.microsoft.com/enus/data/gg577609.aspx
Futures and Promises http://docs.scalalang.org/overviews/core/futures.html
Java concurrency (multi-threading)
http://www.vogella.com/tutorials/JavaConcurrency/article.h
tml#futures
Java 8: Definitive guide to CompletableFuture
http://nurkiewicz.blogspot.it/2013/05/java-8-definitiveguide-to.html
Taming asynchronous programming
http://eamodeorubio.github.io/tamingasync/#/
Ingegneria del software mod. B
Ingegneria del software mod. B







31
Riccardo Cardin

Monadic futures in Java 8
http://zeroturnaround.com/rebellabs/monadic-futures-injava8/
Monads http://ericlippert.com/category/monads/
Callback Hell http://callbackhell.com/
CompletableFuture
http://www.slideshare.net/kojilin/completable-future
RxJava Wiki https://github.com/Netflix/RxJava/wiki
Functional Reactive Programming with RxJava
https://speakerdeck.com/benjchristensen/functionalreactive-programming-with-rxjava-javaone-2013
Principle of Reactive Programming
https://class.coursera.org/reactive-001
Actors
http://doc.akka.io/docs/akka/snapshot/scala/actors.html
Ingegneria del software mod. B
32
Riccardo Cardin