Stream Processing Plattformen & die Qual der Wahl_ Matthias Niehoff Die Basics 2 Warum Stream Processing?_ 3 Unendliche, kontinuierliche Daten_ •Infinite and continuous data 4 Geschwindigkeit & Real Time_ •And some mind breaking Bulletpoints 1 •And some mind breaking Bulletpoints 2 •And some mind breaking Bulletpoints 3 • Or some great Sub-Bulletpoints 1 • Or some great Sub-Bulletpoints 2 • Or some great Sub-Bulletpoints 3 • And some mind breaking Bulletpoints 4 • And some mind breaking Bulletpoints 5 5 Erst Verarbeiten, dann speichern_ Persistenz Query stream processing Persistenz stream processing stream processing 6 Distributed Stream Processing_ •Unbegrenzter Datenstrom •Kontinuierliche Verarbeitung, Aggregation und Analyse •MapReduce ähnliches Verarbeitungsmodell •In-Memory Verarbeitung •Latenz im Bereich von Millisekunden oder Sekunden •Skalieren durch Verteilen •Häufig modelliert als DAG 7 Eventzeit vs. Verarbeitungszeit_ t in Minuten 1 2 3 4 5 6 7 8 9 Event Verarbeitung • Eventzeit: • Zeitpunkt, an dem das Event aufgetreten ist • Verarbeitungszeit: • Zeitpunkt, an dem das Event vom System beobachtet wurde 8 Eventzeit vs. Verarbeitungszeit_ •Differenz ist nicht nur != 0 •Differenz schwankt stark • Ressourcen bedingt (CPU, Netzwerk,..) • Software bedingt (verteilte Systeme..) • Daten bedingt (Schlüsselverteilung, Varianzen in Daten selbst) • Analyse nach Verarbeitungszeit • einfacher aber ggfs. zu ungenau • Analyse nach Eventzeit • komplexer, dafür genauer 9 State & Window Verarbeitung_ •Nicht triviale Anwendungen benötigen meist einen State • z.b. Aggregationen über einen längeren / unendlichen Zeitraum • (input, state) -> (output, state’) • gespeichert in Memory • interessant im Fehlerfall 10 Windowing & Sliding_ • Window als (zeitlich) begrenzter State • Tumbling Window • Sliding Window • Session Window • Unterschiedliche Trigger • Zeit • Anzahl 11 Tumbling Window_ 12 Sliding Window_ 13 Session Window_ User 1 User 2 Inaktivität Inaktivität Zeit 14 Window Verarbeitung und Zeiten_ •Mit Verarbeitungszeit einfach •Mit Eventzeit schwerer • Vollständigkeit (out of order Events) • Buffering • Strategien bei Eventzeit Windows • Watermarks • Trigger • Akkumulation • Mehr Informationen • https://www.oreilly.com/ideas/the-world-beyond-batch• streaming-101 http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf 15 Die Kandidaten 16 Apache Storm_ •Gestartet 2010 durch BackType/Twitter, Apache seit 2013 •Pionier im Big Data / Stream Bereich •Technologie der Lambda Architektur •Low Level API •Spouts und Bolts beschreiben eine Topologie •Trident: High Level Erweiterung auf Storm Basis • Aggregationen • State & Window Operationen • Join, Merge, Group, -- 17 Apache Spark_ •Open Source (2010) & Apache Projekt (2013) •Einheitliche Batch & Stream Verarbeitung •Breite Akzeptanz •RDD als Basis 18 Apache Samza_ •Entwickelt bei LinkedIn, Open Source 2013 •Verfolgt den Log Ansatz von Kafka •Ausgeführt auf YARN •Geeignet für große States •Erweiterbar über APIs 19 Apache Flink_ •Gestartet 2008 als europäisches Forschungsprojekt •Low Latency Streaming und High Throughput Batch Processing •Flexible States und Windows •Streaming First Ansatz 20 Die Analyse 21 Aspekte von Streaming Anwendungen_ •Runtime •Programming Model •Skalierbarkeit •Latenz •Durchsatz •Resilienz / Delivery Guarantees •Reife •Community 22 Runtime - Native Streaming_ Verarbeitung Empfänger Senke geringe Latenz geringer Durchsatz flexibel Fehlertoleranz komplexer Lastenverteilung komplexer 23 Runtime - Microbatching_ Verarbeitung Senke Empfänger Microbatches hoher Durchsatz höhere Latenz einfacher Fehlertolerant weniger Flexibel (z.B. Windows) State Verarbeitung komplexer 24 Programmiermodell_ Komponentenbasiert •Operatoren und Quellen als Komponenten •Eigene Komponenten •manuelle Topologie Definition Deklarativ •High Level API •Higher Order Functions •Abstrakte Datentypen •Fortgeschrittene Operationen inkludiert •Eingebaute Optimierungen 25 Word Count - Flink_ valenv=StreamExecutionEnvironment.getExecutionEnvironment valtext=env.socketTextStream("localhost",9999) valcounts=text .flatMap(_.toLowerCase.split("\\W+")) .filter(_.nonEmpty) .map(_,1) .groupBy(0) .sum(1) counts.print env.execute("ScalaSocketStreamWordCount") 26 Word Count - Spark_ valsparkConf= newSparkConf().setAppName("StreamingWordCount") valssc=newStreamingContext(sparkConf,Seconds(1)) ssc.checkpoint(".") valmappingFunc=(key:String,value:Option[Int],state: State[Int])=>{ valsum=value.getOrElse(0)+state.getOption.getOrElse(0) valoutput=(key,sum) state.update(sum) output } valwordCountState=StateSpec.function(mappingFunc) 27 Word Count - Spark_ vallines=ssc.socketTextStream(args(0),args(1).toInt) valwords=lines.flatMap(_.split("")) valwordsWithCount=words.map(x=>(x,1)) valstateDstream=wordsWithCount.mapWithState(wordCountState) stateDstream.print() ssc.start() ssc.awaitTermination() 28 Word Count - Storm_ TopologyBuilderbuilder=newTopologyBuilder(); builder.setSpout("spout",newRandomSentenceSpout(),5); builder.setBolt("split",newSplitSentence(),8) .shuffleGrouping(„spout"); builder.setBolt("count",newWordCount(),12) .fieldsGrouping("split",newFields("word")); 29 Word Count - Storm_ Configconf=newConfig(); conf.setMaxTaskParallelism(3); LocalClustercluster=newLocalCluster(); cluster.submitTopology("word-count",conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); 30 Word Count - Storm_ publicstaticclassWordCountextendsBaseBasicBolt{ Map<String,Integer>counts=newHashMap<String,Integer>(); publicvoidexecute(Tupletuple,BasicOutputCollectorcollector) { Stringword=tuple.getString(0); Integercount=counts.get(word); if(count==null) count=0; count++; counts.put(word,count); collector.emit(newValues(word,count)); } publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer) {declarer.declare(newFields("word","count"));} } 31 Word Count - Storm Trident_ Trident TridentTopologytopology=newTridentTopology(); TridentStatewordCounts= topology.newStream("spout1",spout) .each(newFields("sentence"),newSplit(),new Fields("word")) .groupBy(newFields("word")) .persistentAggregate(newMemoryMapState.Factory(),new Count(),newFields("count")) .parallelismHint(6); 32 Word Count - Storm Trident_ Trident publicclassSplitextendsBaseFunction{ publicvoidexecute(TridentTupletuple,TridentCollector collector){ Stringsentence=tuple.getString(0); for(Stringword:sentence.split("")){ collector.emit(newValues(word)); } } } 33 Word Count - Samza_ classWordCountTaskextendsStreamTaskwithInitableTask{ privatevarstore:CountStore=_ definit(config:Config,context:TaskContext){ this.store=context.getStore("wordcount-store") .asInstanceOf[KeyValueStore[String,Integer]] } 34 Word Count - Samza_ overridedefprocess(envelope:IncomingMessageEnvelope, collector:MessageCollector,coordinator:TaskCoordinator){ valwords=envelope.getMessage.asInstanceOf[String].split("") words.foreach{key=> valcount:Integer=Option(store.get(key)).getOrElse(0) store.put(key,count+1) collector.send(newOutgoingMessageEnvelope(new SystemStream("kafka","wordcount"), (key,count))) } } 35 Zustellungsgarantien_ •Maximal mögliche Garantien •Beeinflussen Performance •Nicht in jeder Kombination möglich (abhängig von Quelle) Trident At-least-once Exactly-once* Exactly-once* At-least-once Exactly-once* 36 Latenz & Durchsatz_ •Abhängig von der Runtime •Höhere Latenz --> höherer Durchsatz Flink Custom ~50ms Storm Storm Trident Samza Spark Streaming 500ms 30.000ms 37 Latenz Latenz & Durchsatz_ Throughput https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at 38 Performance_ •Viele Variablen, Unparteiische Tests schwierig •Latenz vs. Durchsatz •Delivery Guarantees •Fehlertoleranz •Tuning •Netzwerk, Daten Lokalität, Serialisierung 39 Skalierbarkeit_ •Skalieren durch Partitionierung • Partitionieren der Daten • Partitionieren des Flows 40 Fault Tolerance_ •Erneute Verarbeitung nicht einfach möglich •Anfang und Ende schwer zu bestimmen •State muss auch gesichert werden •Verschiedene Ansätze • • • • Record Ack Micro Batching Transactional Updates Snapshots 41 Fault Tolerance - Storm_ Ack Ack Ack Ack Ack Ack 42 Fault Tolerance - Spark & Storm Trident_ •Fehlgeschlagene Microbatches werden wiederholt •Batch Acknowledge statt Record Acknowledge • Checkpoints für States 43 Fault Tolerance - Samza_ •Transaktionale Updates auf Transaction Log •Kafka als Transaction Log Kafka Samza partition 0 Kafka partition 1 Checkpoint partition 2 partition 0: offset .. partition 1: offset .. partition 2: offset .. 44 Fault Tolerance - Flink_ •Distributed Checkpoints 45 Event- & Verarbeitungszeit_ •Native Eventzeitverarbeitung nur in Flink • Out-of-order Events • Watermarks • Trigger finalStreamExecutionEnvironmentenv= StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); • Eventzeit als Key in anderen Framework möglich • Keine out-of-order Events 46 Das Ergebnis 47 Überblick_ Trident Runtime Programmiermodell Nativ Microbatching Microbatching Komponentenbasiert Nativ Nativ Deklarativ Komponenten basiert Deklarativ Durchsatz Gering Mittel Hoch Hoch Hoch Latenz Gering Mittel Mittel Gering Gering Garantien At-least-once Exactly-once* Exactly-once* At-least-once Exactly-once* Eventzeit Handling Nein Nein Nein Nein Ja Reife & Community Hoch Hoch Hoch Mittel Mittel 48 Spark wenn..._ •bereits Spark Batch Anwendungen vorhanden sind •viele Umsysteme integriert werden •eine große Community wichtig ist •Scala kein Problem ist •Latenz kein Kriterium ist 49 Storm für ..._ •Sehr niedrige Latenz, niedriges Volumen •At-Least Once Verarbeitung •Zustandslose Verarbeitung •Ggfs. Heron als Alternative 50 Samza wenn ..._ •Kafka ist omnipräsent •Große States •Kein Exactly Once •Kafka Streams als Alternative 51 Und Flink ..._ •Für Eventzeit Verarbeitung •Für pures Streaming •Sehr gute Konzepte •Etwas weniger Umsysteme •Nutzen und Mitarbeit an einem jungen Projekt 52 Ein Satz zu_ •Apache Beam • High Level API für Streaming Runner •Google Cloud Data Flow • Googles Cloud Streaming Framework; Beam Implementierung •Apex • YARN based direct-streaming with checkpointing •Flume • Logfile Streaming insb. in HDFS •Kafka Streams • Streaming integriert in Kafka ab 0.10, einfache Anwendungen • Heron • Storm Nachfolger, API kompatibel, verbesserter Throughput & Latency 53 Questions? Matthias Niehoff, IT-Consultant codecentric AG Zeppelinstraße 2 76185 Karlsruhe, Germany mobil: +49 (0) 172.1702676 [email protected] www.codecentric.de blog.codecentric.de matthiasniehoff 90 Picture Reference_ • Logfile: Linux Screenshots, Flickr • Sensors, IT Network: Wikipedia • Devices: Brad Forst, Flickr • Speed: Rool Paap, Flickr • Graph: Wikipedia • Stateful Processing: data-artisans.com • Window & Sliding Windows, Flink Übersicht, Flink Fault Tolerance: Apache Flink • Storm Topologien: Apche Storm • Spark Übersicht: Apache Spark • Samza Übersicht: Apache Samza • Unendliche Daten: https://i.ytimg.com/vi/9rE3kbGmP4w/maxresdefault.jpg 55
© Copyright 2024 ExpyDoc