Distributed Data Stream Processing Sistemi Distribuiti e Cloud Computing A.A. 2014/15 Matteo Nardelli Matteo Nardelli Big Data Source: http://www.intel.it/content/www/it/it/communications/internet-minute-infographic.html Matteo Nardelli 2 Big Data Ogni anno prodotti circa 1200 EXABYTE (10^18, 2^60) di dati Source: http://www.domo.com/learn/infographic-the-physical-size-of-big-data Matteo Nardelli 3 Big Data IBM [1] ogni giorno vengono creati circa 2,5 trilioni (1018) di byte di dati ed il 90% dei dati è stato creato solo negli ultimi due anni. Big Data: indica collettivamente insiemi di dati caratterizzati da (4V): • Volumi: elevati (terabyte, petabyte); • Variabilità di rappresentazione: dati strutturati (database), non strutturati (testi, video, immagini, audio); • Velocità: dati in movimento, velocità di generazione e di analisi: utilità dell’informazione estraibile degrada rapidamente con il passare del tempo; • Veracity (veridicità): l’integrità e l’affidabilità dei dati: potervi fare affidamento per le operazioni di decision making. Matteo Nardelli 4 Interesse • Produzione energetica: analisi consumi e produzione rinnovabili per allocazione più efficiente; • Finanziario: evoluzione/previsione in tempo reale di quote azionarie; • Medicina: telemedicina, studiare e prevedere la diffusione epidemica (Google Flu); • Sicurezza: uso improprio di reti/sist. pagamento; behavioural pattern recognition; • Servizi urbani: dispositivi per info traffico; ottimizzazione trasporti in risposta ad eventi; e.g., statistiche per taxi: tratte più frequenti, aree più redditizie (DEBS 2015[2]). Matteo Nardelli 5 Interesse Sean Patrick Murphy [3] I believe that “big” actually means important. Scientists have long known that data could create new knowledge but now the rest of the world, including government and management in particular, has realized that data can create value. Elaborare flussi continui di dati, raccolti da sorgenti (e.g., sensori, web) geograficamente distribuite che li producono con data rate non predicibili, per ottenere risposte in modo tempestivo Matteo Nardelli 6 Soluzioni • Soluzioni derivanti da applicazioni/esigenze diverse – Finanziario, rilevamento frodi, attacchi a reti, … • Risultato: termini e soluzioni sovrapponibili (torre di babele) Una non-soluzione sono i DBMS o Impossibile memorizzazione tutti i dati o Processarli solo su richiesta DSMS (data stream management system): evoluzione dei DBMS. Prevede: • processamento continuo delle query (continuous query - CQ) • query con sintassi SQL-like • su flussi di dati transienti, non memorizzati. Matteo Nardelli 7 Soluzioni DSP (data stream processing): • • è il modello che deriva dalla generalizzazione del DSMS (dati non persistenti, continuous query) «processamento di stream provenienti da sorgenti differenti, per produrre nuovi stream in uscita» (Cugola, Margara[4]) CEP (complex event processing): si sviluppa in parallelo al DSMS, come evoluzione del publish-subscribe; • processamento di notifiche di eventi (n.b.: non dati generici) • provenienti da sorgenti diverse • per identificare pattern di eventi (o eventi complessi) di interesse • (in publish-subscribe ogni evento è separato dagli altri) MapReduce: paradigma per la computazione affidabile, scalabile e distribuita • • dati memorizzazione su file system distribuito (i.e., GFS, HDFS) paradigma «map» e «reduce» per lavorare su sottoinsiemi di dati Matteo Nardelli 8 Data stream processing Matteo Nardelli 9 Differenze con CEP Tipo di dato: • • CEP: notifiche di eventi DSP: (teoricamente) qualsiasi Tempo associato al dato ed ordinamento dei dati: • CEP: molto importanti/essenziali • DSP: non necessariamente considerati Tipologia di linguaggi per definire applicazioni: • CEP: pattern-based language per specificare le firing condition e le azioni da intraprendere («if this then that»). • DSP: (opzionale) regole di trasformazione (filtraggio, join, aggregazioni) per processare gli stream in ingresso e produrre stream in uscita. Matteo Nardelli 10 Differenze con Hadoop (MapReduce) Diverse estensioni consentono di usare Hadoop: • Per interrogare il dataset con un approccio SQL-like (Apache Hive) • Per interrogare il dataset con linguaggio procedurale (Apache Pig) • Produrre approssimazioni successive dei risultati e possibilità di fare CQ (Hadoop Online[5]) Differenze sostanziali: • Persistenza: Hadoop necessita della memorizzazione dei dati • Batching: Anche negli approcci per il CQ (Hadoop Online) si lavora considerando piccoli batch successivi da analizzare; questo introduce un ritardo proporzionale alla dimensione del batch. Matteo Nardelli 11 Panoramica Matteo Nardelli 12 Panoramica: definizione delle applicazioni Linguaggi formali: maggiore rigorosità, ma alta espressività • • Dichiarativi: Imperativi: specificare il risultato (SQL-like); e.g., TESLA, IBM SPL; specificare la composizione degli operatori primitivi; e.g., SQuAl (Stream Query Algebra) di Aurora/Borealis; Descrizione topologica: maggiore flessibilità definizione esplicita dei componenti (built-in o definiti dall’utente) e delle interconnessioni per mezzo di un grafo diretto aciclico, chiamato «topologia» • nodi = operatori dell'applicazione; archi = stream scambiato tra gli operatori Matteo Nardelli 13 Panoramica: infrastruttura Dove sono le risorse computazionali? Cluster dedicato • • nodi omogenei, «vicini» ed in numero staticamente definito scelta tradizionale Cloud • • • l’allocazione dinamica migliore assorbimento delle fluttuazioni nell'arrivo dei dati nodi geograficamente distribuiti nuovo interesse per il DSP, ma anche nuove problematiche – i.e., scala, attenzione per la latenza tra i nodi, SLA Soluzioni ibride • insieme statico di nodi, estendibili con risorse on-demand nel cloud • trade-off tra prestazioni, bilanciamento del carico e costi Matteo Nardelli 14 Panoramica: scheduling Scheduler: componente dei sistemi di DSP che assegna gli operatori delle applicazioni da eseguire alle risorse computazionali a disposizione Componente critico, influenza fortemente le performance del sistema e delle applicazioni eseguite. Diverse soluzioni: • Algoritmo centralizzato vs algoritmo distribuito – Conoscenza intera rete, problemi di scalabilità • Metriche da ottimizzare – Latenza, utilizzo della rete, importanza degli operatori, risorse • Capacità adattativa • Capacità di ottimizzare il grafo applicativo – generalmente con definizione applicazioni con linguaggi formali – e.g., merging, splitting e riordinamento degli operatori, load shedding Matteo Nardelli 15 Applicazioni: caratteristiche principali Le applicazioni hanno le seguenti caratteristiche (elenco non esaustivo): – sorgenti sono emittenti di un flusso continuo (stream) dei dati; – lo stream non viene memorizzato; – lo stream viene riversato su un insieme di operatori (evt. distribuiti); – gli operatori sono progettati per lavorare in parallelo; – gli operatori interagiscono solo per mezzo degli stream; – gli operatori eseguono delle funzioni ben precise, il cui risultato dipende (generalmente) dal solo flusso in ingresso; – gli operatori possono, a loro volta, generare un nuovo stream. Matteo Nardelli 16 Tecniche avanzate Per processare grossi flussi di dati, devo aumentare il grado di parallelismo. Si distinguono: • Pipeline: istruzione complessa suddivisa in una sequenza di passi • Task parallelism: stesso dato utilizzato da diversi operatori (successori nel grafo) in parallelo • Data parallelism: diversi dati appartenenti allo stesso flusso sono processati da diverse repliche dello stesso operatore Tecniche di ottimizzazione • Sharding • Load Shedding Matteo Nardelli 17 Tecniche avanzate: sharding Stato degli operatori • stateful se mantengono uno stato relativo allo stream; • stateless altrimenti. Rilassamento dello stato per aumentare il parallelismo di un operatore: • partitioned stateful: considero stati indipendenti per sottoinsiemi non sovrapponibili di flussi informativi (e.g., prezzo medio degli acquisti per utente) Sharding: partizionamento orizzontale dei dati; dati con caratteristiche simili sono indirizzate sempre alla stessa replica • Come determinare la replica di competenza? Soluzioni hash-based calcolate su sottoinsieme di attributi dei dati Matteo Nardelli 18 Tecniche avanzate: load shedding Load shedding: sacrifica l’accuratezza dei risultati se il sistema è sovraccarico Decisioni fondamentali: Come scartare: random, probabilistico, priority-based, tecniche avanzate • Quando scartare il traffico: comportamento proattivo o reattivo • Dove scartare il traffico: ridurre il carico vicino alla sorgente fa sprecare meno lavoro, ma penalizza un numero maggiore di applicazioni • Quanto scartare: dipende dalla politica di shedding adottata (e.g., fino a soddisfacimento soglia, percentuale, numero di classi) • Matteo Nardelli 19 Framework per il DSP Amazon Kinesis Apache Storm Matteo Nardelli 20 Amazon Kinesis Elaborazione real-time di streaming data su larga scala; definisce: • Stream: sequenza di record – Record: {sequence, partition-key, blob}; blob max size 50Kb • Shard: numero di “nodi” su cui suddividere lo stream; questi sono determinati in base al datarate in ingresso ed in uscita desiderati Come funziona? • Producers: (sorgenti esterne) generano i record, li immettono con HTTP PUT Matteo Nardelli 21 Amazon Kinesis • Consumers: le applicazioni (generalmente su EC2) che processano ogni record dello stream - è possibile avere diverse applicazioni che consumano in modo indipendente e concorrente - l’output può essere: un altro Kinesis, EC2, DynamoDB, S3, altro Vantaggi • Kinesis gestisce automaticamente l’infrastruttura, lo storage e la configurazione necessaria per il recupero e l’elaborazione dei dati – Infrastruttura: load balancing, coordinamento tra i servizi distribuiti, fault tolerance – Storage: dati memorizzati (e replicati) in diverse Availability Zone della stessa regione per 24 ore, periodo in cui sono disponibili Limitazioni • • Una sola regione: US East 10 shard per regione Matteo Nardelli 22 Apache Storm Framework distribuito, scalabile, fault-tolerant per il DSP Applicazione (o topologia): componenti • spout: sorgente delle tuple • bolt: componente che elabora le tuple; • può generarne di nuove stream: sequenza non limitata di tuple (tupla: insieme di coppie chiave/valore) spout bolt Poiché un bolt può essere replicato è possibile indicare: • fieldGrouping: i campi per il partizionamento dello stream • shuffleGrouping: non siamo interessati allo stato Matteo Nardelli 23 Apache Storm: architettura ZooKeeper (shared memory) scambio configurazione e sincronizzazione Nimbus (nodo master) • • Scheduling (distribuzione per l’esecuzione) delle applicazioni (topologie) Monitoring applicazioni: riassegnamento in caso di fallimento Worker Node • Supervisor avvia e termina i worker process in base alle indicazioni di Nimbus • Worker Process esegue (parte del) codice della topologia Nimbus ZooKeeper Matteo Nardelli WN Supervisor WN Supervisor WN Supervisor WP WP WP 24 Apache Storm: estensione • Storm esegue all’interno di una infrastruttura locale • Vogliamo aumentarne il grado di distribuzione dell’infrastruttura – latenza di rete diventa un fattore non trascurabile – abilitare l’utilizzo di risorse presenti nel Cloud (ma non solo), sfruttandone le politiche di provisioning – avvicinare la computazione agli utenti finali o alle sorgenti Caratteristiche dell’estensione • Scheduling distribuito: fornire ad ogni nodo la capacità di gestire le applicazioni che esegue • Scheduling location-aware: eseguire l’applicazione in modo attento alle distanze di rete tra i suoi elementi • Capacità di adattamento: rispondere ai cambiamenti rete/applicazione Matteo Nardelli 25 Apache Storm: estensione Initial Scheduler (centralizzato) assegna le applicazioni in modo location-aware Continuous Scheduler (distribuito) riposiziona i componenti dell’applicazione se migliorano l’utilizzo di rete (banda x latenza) Network Space Manager (distribuito) fornisce ai nodi la capacità di stimare in modo efficiente le distanze di rete, senza interrogazione diretta (alg. Vivaldi) Worker Monitor (distribuito) registra il traffico scambiato tra i componenti dell’applicazione Matteo Nardelli 26 Apache Storm: un caso d’uso WordCounter (esempio in storm-starter[6]): • Avendo sorgenti che emettono continuamente frasi, vogliamo contare le occorrenze di ogni parola A cosa serve? In modo simile vengono individuati i trend su twitter[7] Matteo Nardelli 27 Apache Storm: un caso d’uso Topologia Classe Java standard: main TopologyBuilder tb = new TopologyBuilder(); tb.setSpout("spout", new RandomSentenceSpout(), 5); tb.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); tb.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); StormSubmitter.submitTopology("word-count", new Config(), tb.createTopology()); Partizionamento Stream Parallelismo componenti Matteo Nardelli API Storm 28 Apache Storm: un caso d’uso RandomSentenceSpout (extends BaseRichSpout ) public void nextTuple() { Utils.sleep(100); collector.emit(new Values(getRandomSentence())); } public void declareOutputFields(OutputFieldsDeclarer d) { d.declare(new Fields("sentence")); } API Storm Dichiarazione stream WordCount (extends BaseBasicBolt) public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getStringByField("word"); Integer count = updateWordCountHashMap(word); collector.emit(new Values(word, count)); } public void declareOutputFields(OutputFieldsDeclarer d) { d.declare(new Fields("word", "count")); } Matteo Nardelli Nuova tupla in uscita 29 Riferimenti • • • • • • • [1] IBM. What is big data? 2014. url: http://www-01.ibm.com/software/data/bigdata/what-is-bigdata.html. [2] DEBS 2015: Grand Challenge. url: http://www.debs2015.org/call-grand-challenge.html [3] Sean Murphy, A new definition for Big Data. url: http://www.datacommunitydc.org/blog/2013/04/a-new-definition-for-big-data/ [4] A. Margara and G. Cugola. 2011. Processing flows of information: from data stream to complex event processing. In Proc. of the 5th ACM DEBS '11. ACM. [5] T.Condie, N.Conway, P.Alvaro et al. 2010. MapReduce online. In Proc. of the 7th USENIX conference on NSDI'10. USENIX Association, Berkeley, CA, USA. [6] Nathan Marz - Storm Starter. url: https://github.com/nathanmarz/storm-starter [7] M.G. Noll. Real-time Treding Topics With a Distributed Rolling Count Algorithm in Storm: http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm • • • • • Amazon Kinesis. url: http://aws.amazon.com/kinesis/ Apache Hive. url: https://hive.apache.org Apache Pig. url: http://pig.apache.org Apache Storm. url: https://storm.apache.org N. Tatbul, U. Çetintemel, S. Zdonik et al. 2003. Load shedding in a data stream manager. In Proc of the 29th international conference VLDB '03. VLDB Endowment 309-320. Matteo Nardelli 30
© Copyright 2025 ExpyDoc