Distributed Data Stream Processing

Distributed Data Stream Processing
Sistemi Distribuiti e Cloud Computing
A.A. 2014/15
Matteo Nardelli
Matteo Nardelli
Big Data
Source: http://www.intel.it/content/www/it/it/communications/internet-minute-infographic.html
Matteo Nardelli
2
Big Data
Ogni anno prodotti circa 1200 EXABYTE (10^18, 2^60) di dati
Source: http://www.domo.com/learn/infographic-the-physical-size-of-big-data
Matteo Nardelli
3
Big Data
IBM [1]
ogni giorno vengono creati circa 2,5 trilioni (1018) di byte di dati ed
il 90% dei dati è stato creato solo negli ultimi due anni.
Big Data: indica collettivamente insiemi di dati caratterizzati da (4V):
• Volumi: elevati (terabyte, petabyte);
• Variabilità di rappresentazione: dati strutturati (database), non strutturati
(testi, video, immagini, audio);
• Velocità: dati in movimento, velocità di generazione e di analisi:
utilità dell’informazione estraibile degrada rapidamente con il passare del tempo;
• Veracity (veridicità): l’integrità e l’affidabilità dei dati:
potervi fare affidamento per le operazioni di decision making.
Matteo Nardelli
4
Interesse
• Produzione energetica:
analisi consumi e produzione rinnovabili per allocazione più efficiente;
• Finanziario:
evoluzione/previsione in tempo reale di quote azionarie;
• Medicina:
telemedicina, studiare e prevedere la diffusione epidemica (Google Flu);
• Sicurezza:
uso improprio di reti/sist. pagamento; behavioural pattern recognition;
• Servizi urbani:
dispositivi per info traffico; ottimizzazione trasporti in risposta ad eventi;
e.g., statistiche per taxi: tratte più frequenti, aree più redditizie (DEBS 2015[2]).
Matteo Nardelli
5
Interesse
Sean Patrick Murphy [3]
I believe that “big” actually means important. Scientists have long
known that data could create new knowledge but now the rest of
the world, including government and management in particular,
has realized that data can create value.
Elaborare flussi continui di dati, raccolti da sorgenti (e.g., sensori,
web) geograficamente distribuite che li producono con data rate
non predicibili, per ottenere risposte in modo tempestivo
Matteo Nardelli
6
Soluzioni
• Soluzioni derivanti da applicazioni/esigenze diverse
– Finanziario, rilevamento frodi, attacchi a reti, …
• Risultato: termini e soluzioni sovrapponibili (torre di babele)
Una non-soluzione sono i DBMS
o Impossibile memorizzazione tutti i dati
o Processarli solo su richiesta
DSMS (data stream management system): evoluzione dei DBMS.
Prevede:
• processamento continuo delle query (continuous query - CQ)
• query con sintassi SQL-like
• su flussi di dati transienti, non memorizzati.
Matteo Nardelli
7
Soluzioni
DSP (data stream processing):
•
•
è il modello che deriva dalla generalizzazione del DSMS (dati non persistenti,
continuous query)
«processamento di stream provenienti da sorgenti differenti, per produrre
nuovi stream in uscita» (Cugola, Margara[4])
CEP (complex event processing): si sviluppa in parallelo al DSMS, come
evoluzione del publish-subscribe;
• processamento di notifiche di eventi (n.b.: non dati generici)
• provenienti da sorgenti diverse
• per identificare pattern di eventi (o eventi complessi) di interesse
• (in publish-subscribe ogni evento è separato dagli altri)
MapReduce: paradigma per la computazione affidabile, scalabile e distribuita
•
•
dati memorizzazione su file system distribuito (i.e., GFS, HDFS)
paradigma «map» e «reduce» per lavorare su sottoinsiemi di dati
Matteo Nardelli
8
Data stream processing
Matteo Nardelli
9
Differenze con CEP
Tipo di dato:
•
•
CEP: notifiche di eventi
DSP: (teoricamente) qualsiasi
Tempo associato al dato ed ordinamento dei dati:
• CEP: molto importanti/essenziali
• DSP: non necessariamente considerati
Tipologia di linguaggi per definire applicazioni:
•
CEP: pattern-based language per specificare le firing condition e le azioni
da intraprendere («if this then that»).
•
DSP: (opzionale) regole di trasformazione (filtraggio, join, aggregazioni)
per processare gli stream in ingresso e produrre stream in uscita.
Matteo Nardelli
10
Differenze con Hadoop (MapReduce)
Diverse estensioni consentono di usare Hadoop:
• Per interrogare il dataset con un approccio SQL-like (Apache Hive)
• Per interrogare il dataset con linguaggio procedurale (Apache Pig)
• Produrre approssimazioni successive dei risultati e possibilità di fare CQ
(Hadoop Online[5])
Differenze sostanziali:
• Persistenza:
Hadoop necessita della memorizzazione dei dati
• Batching:
Anche negli approcci per il CQ (Hadoop Online) si lavora
considerando piccoli batch successivi da analizzare; questo
introduce un ritardo proporzionale alla dimensione del batch.
Matteo Nardelli
11
Panoramica
Matteo Nardelli
12
Panoramica: definizione delle applicazioni
Linguaggi formali: maggiore rigorosità, ma alta espressività
•
•
Dichiarativi:
Imperativi:
specificare il risultato (SQL-like); e.g., TESLA, IBM SPL;
specificare la composizione degli operatori primitivi;
e.g., SQuAl (Stream Query Algebra) di Aurora/Borealis;
Descrizione topologica: maggiore flessibilità
definizione esplicita dei componenti (built-in o definiti dall’utente) e delle
interconnessioni per mezzo di un grafo diretto aciclico, chiamato «topologia»
•
nodi = operatori dell'applicazione; archi = stream scambiato tra gli operatori
Matteo Nardelli
13
Panoramica: infrastruttura
Dove sono le risorse computazionali?
Cluster dedicato
•
•
nodi omogenei, «vicini» ed in numero staticamente definito
scelta tradizionale
Cloud
•
•
•
l’allocazione dinamica  migliore assorbimento delle fluttuazioni
nell'arrivo dei dati
nodi geograficamente distribuiti
nuovo interesse per il DSP, ma anche nuove problematiche
– i.e., scala, attenzione per la latenza tra i nodi, SLA
Soluzioni ibride
• insieme statico di nodi, estendibili con risorse on-demand nel cloud
• trade-off tra prestazioni, bilanciamento del carico e costi
Matteo Nardelli
14
Panoramica: scheduling
Scheduler: componente dei sistemi di DSP che assegna gli operatori delle
applicazioni da eseguire alle risorse computazionali a disposizione
Componente critico, influenza fortemente le performance del sistema e delle
applicazioni eseguite. Diverse soluzioni:
• Algoritmo centralizzato vs algoritmo distribuito
– Conoscenza intera rete, problemi di scalabilità
•
Metriche da ottimizzare
– Latenza, utilizzo della rete, importanza degli operatori, risorse
•
Capacità adattativa
•
Capacità di ottimizzare il grafo applicativo
– generalmente con definizione applicazioni con linguaggi formali
– e.g., merging, splitting e riordinamento degli operatori, load shedding
Matteo Nardelli
15
Applicazioni: caratteristiche principali
Le applicazioni hanno le seguenti caratteristiche (elenco non esaustivo):
– sorgenti sono emittenti di un flusso continuo (stream) dei dati;
– lo stream non viene memorizzato;
– lo stream viene riversato su un insieme di operatori (evt. distribuiti);
– gli operatori sono progettati per lavorare in parallelo;
– gli operatori interagiscono solo per mezzo degli stream;
– gli operatori eseguono delle funzioni ben precise, il cui risultato dipende
(generalmente) dal solo flusso in ingresso;
– gli operatori possono, a loro volta, generare un nuovo stream.
Matteo Nardelli
16
Tecniche avanzate
Per processare grossi flussi di dati, devo aumentare il grado di
parallelismo. Si distinguono:
• Pipeline: istruzione complessa suddivisa in una sequenza di passi
• Task parallelism: stesso dato utilizzato da diversi operatori (successori
nel grafo) in parallelo
• Data parallelism: diversi dati appartenenti allo stesso flusso sono
processati da diverse repliche dello stesso operatore
Tecniche di ottimizzazione
• Sharding
• Load Shedding
Matteo Nardelli
17
Tecniche avanzate: sharding
Stato degli operatori
• stateful se mantengono uno stato relativo allo stream;
• stateless altrimenti.
Rilassamento dello stato per aumentare il parallelismo di un operatore:
• partitioned stateful: considero stati indipendenti per sottoinsiemi non
sovrapponibili di flussi informativi (e.g., prezzo medio degli acquisti per utente)
Sharding: partizionamento orizzontale dei dati; dati con caratteristiche simili
sono indirizzate sempre alla stessa replica
•
Come determinare la replica di competenza?
Soluzioni hash-based calcolate su sottoinsieme di attributi dei dati
Matteo Nardelli
18
Tecniche avanzate: load shedding
Load shedding: sacrifica l’accuratezza dei risultati se il sistema è sovraccarico
Decisioni fondamentali:
Come scartare:
random, probabilistico, priority-based, tecniche avanzate
• Quando scartare il traffico:
comportamento proattivo o reattivo
• Dove scartare il traffico:
ridurre il carico vicino alla sorgente fa sprecare meno lavoro,
ma penalizza un numero maggiore di applicazioni
• Quanto scartare:
dipende dalla politica di shedding adottata
(e.g., fino a soddisfacimento soglia, percentuale, numero di classi)
•
Matteo Nardelli
19
Framework per il DSP
Amazon Kinesis
Apache Storm
Matteo Nardelli
20
Amazon Kinesis
Elaborazione real-time di streaming data su larga scala; definisce:
• Stream: sequenza di record
– Record: {sequence, partition-key, blob}; blob max size 50Kb
•
Shard: numero di “nodi” su cui suddividere lo stream; questi sono
determinati in base al datarate in ingresso ed in uscita desiderati
Come funziona?
•
Producers: (sorgenti esterne) generano i record, li immettono con HTTP PUT
Matteo Nardelli
21
Amazon Kinesis
•
Consumers: le applicazioni (generalmente su EC2) che processano ogni
record dello stream
- è possibile avere diverse applicazioni che consumano in modo
indipendente e concorrente
- l’output può essere: un altro Kinesis, EC2, DynamoDB, S3, altro
Vantaggi
• Kinesis gestisce automaticamente l’infrastruttura, lo storage e la
configurazione necessaria per il recupero e l’elaborazione dei dati
– Infrastruttura: load balancing, coordinamento tra i servizi distribuiti,
fault tolerance
– Storage: dati memorizzati (e replicati) in diverse Availability Zone della
stessa regione per 24 ore, periodo in cui sono disponibili
Limitazioni
•
•
Una sola regione: US East
10 shard per regione
Matteo Nardelli
22
Apache Storm
Framework distribuito, scalabile, fault-tolerant per il DSP
Applicazione (o topologia): componenti
• spout: sorgente delle tuple
• bolt: componente che elabora le tuple;
•
può generarne di nuove
stream: sequenza non limitata di tuple
(tupla: insieme di coppie chiave/valore)
spout
bolt
Poiché un bolt può essere replicato è possibile indicare:
• fieldGrouping: i campi per il partizionamento dello stream
• shuffleGrouping: non siamo interessati allo stato
Matteo Nardelli
23
Apache Storm: architettura
ZooKeeper (shared memory) scambio configurazione e sincronizzazione
Nimbus (nodo master)
•
•
Scheduling (distribuzione per l’esecuzione) delle applicazioni (topologie)
Monitoring applicazioni: riassegnamento in caso di fallimento
Worker Node
• Supervisor avvia e termina i worker process in base alle indicazioni di Nimbus
• Worker Process esegue (parte del) codice della topologia
Nimbus
ZooKeeper
Matteo Nardelli
WN
Supervisor
WN
Supervisor
WN
Supervisor
WP
WP
WP
24
Apache Storm: estensione
• Storm esegue all’interno di una infrastruttura locale
• Vogliamo aumentarne il grado di distribuzione
dell’infrastruttura
– latenza di rete diventa un fattore non trascurabile
– abilitare l’utilizzo di risorse presenti nel Cloud (ma non solo),
sfruttandone le politiche di provisioning
– avvicinare la computazione agli utenti finali o alle sorgenti
Caratteristiche dell’estensione
• Scheduling distribuito: fornire ad ogni nodo la capacità di gestire le
applicazioni che esegue
• Scheduling location-aware: eseguire l’applicazione in modo attento alle
distanze di rete tra i suoi elementi
• Capacità di adattamento: rispondere ai cambiamenti rete/applicazione
Matteo Nardelli
25
Apache Storm: estensione
Initial Scheduler (centralizzato)
assegna le applicazioni in modo
location-aware
Continuous Scheduler (distribuito)
riposiziona i componenti
dell’applicazione se migliorano
l’utilizzo di rete (banda x latenza)
Network Space Manager (distribuito)
fornisce ai nodi la capacità di stimare in
modo efficiente le distanze di rete,
senza interrogazione diretta (alg. Vivaldi)
Worker Monitor (distribuito)
registra il traffico scambiato tra i
componenti dell’applicazione
Matteo Nardelli
26
Apache Storm: un caso d’uso
WordCounter (esempio in storm-starter[6]):
• Avendo sorgenti che emettono continuamente frasi, vogliamo
contare le occorrenze di ogni parola
A cosa serve?
In modo simile vengono
individuati i trend su twitter[7]
Matteo Nardelli
27
Apache Storm: un caso d’uso
Topologia
Classe Java standard: main
TopologyBuilder tb = new TopologyBuilder();
tb.setSpout("spout", new RandomSentenceSpout(), 5);
tb.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
tb.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
StormSubmitter.submitTopology("word-count", new Config(), tb.createTopology());
Partizionamento Stream
Parallelismo componenti
Matteo Nardelli
API Storm
28
Apache Storm: un caso d’uso
RandomSentenceSpout (extends BaseRichSpout )
public void nextTuple() {
Utils.sleep(100);
collector.emit(new Values(getRandomSentence()));
}
public void declareOutputFields(OutputFieldsDeclarer d) {
d.declare(new Fields("sentence"));
}
API Storm
Dichiarazione stream
WordCount (extends BaseBasicBolt)
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getStringByField("word");
Integer count = updateWordCountHashMap(word);
collector.emit(new Values(word, count));
}
public void declareOutputFields(OutputFieldsDeclarer d) {
d.declare(new Fields("word", "count"));
}
Matteo Nardelli
Nuova tupla in uscita
29
Riferimenti
•
•
•
•
•
•
•
[1] IBM. What is big data? 2014. url: http://www-01.ibm.com/software/data/bigdata/what-is-bigdata.html.
[2] DEBS 2015: Grand Challenge. url: http://www.debs2015.org/call-grand-challenge.html
[3] Sean Murphy, A new definition for Big Data. url:
http://www.datacommunitydc.org/blog/2013/04/a-new-definition-for-big-data/
[4] A. Margara and G. Cugola. 2011. Processing flows of information: from data stream to complex
event processing. In Proc. of the 5th ACM DEBS '11. ACM.
[5] T.Condie, N.Conway, P.Alvaro et al. 2010. MapReduce online. In Proc. of the 7th USENIX
conference on NSDI'10. USENIX Association, Berkeley, CA, USA.
[6] Nathan Marz - Storm Starter. url: https://github.com/nathanmarz/storm-starter
[7] M.G. Noll. Real-time Treding Topics With a Distributed Rolling Count Algorithm in Storm:
http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm
•
•
•
•
•
Amazon Kinesis. url: http://aws.amazon.com/kinesis/
Apache Hive. url: https://hive.apache.org
Apache Pig. url: http://pig.apache.org
Apache Storm. url: https://storm.apache.org
N. Tatbul, U. Çetintemel, S. Zdonik et al. 2003. Load shedding in a data stream manager. In Proc of
the 29th international conference VLDB '03. VLDB Endowment 309-320.
Matteo Nardelli
30