val - Java Forum Stuttgart

Stream Processing Plattformen
& die Qual der Wahl_
Matthias Niehoff
Die Basics
2
Warum Stream Processing?_
3
Unendliche, kontinuierliche Daten_
•Infinite and continuous data
4
Geschwindigkeit & Real Time_
•And some mind breaking Bulletpoints 1
•And some mind breaking Bulletpoints 2
•And some mind breaking Bulletpoints 3
• Or some great Sub-Bulletpoints 1
• Or some great Sub-Bulletpoints 2
• Or some great Sub-Bulletpoints 3
• And some mind breaking Bulletpoints 4
• And some mind breaking Bulletpoints 5
5
Erst Verarbeiten, dann speichern_
Persistenz
Query
stream processing
Persistenz
stream processing
stream processing
6
Distributed Stream Processing_
•Unbegrenzter Datenstrom
•Kontinuierliche Verarbeitung, Aggregation und Analyse
•MapReduce ähnliches Verarbeitungsmodell
•In-Memory Verarbeitung
•Latenz im Bereich von Millisekunden oder Sekunden
•Skalieren durch Verteilen
•Häufig modelliert als DAG
7
Eventzeit vs. Verarbeitungszeit_
t in Minuten
1
2
3
4
5
6
7
8
9
Event
Verarbeitung
• Eventzeit:
• Zeitpunkt, an dem das Event aufgetreten ist
• Verarbeitungszeit:
• Zeitpunkt, an dem das Event vom System beobachtet wurde
8
Eventzeit vs. Verarbeitungszeit_
•Differenz ist nicht nur != 0
•Differenz schwankt stark
• Ressourcen bedingt (CPU, Netzwerk,..)
• Software bedingt (verteilte Systeme..)
• Daten bedingt (Schlüsselverteilung, Varianzen in Daten selbst)
• Analyse nach Verarbeitungszeit
• einfacher aber ggfs. zu ungenau
• Analyse nach Eventzeit
• komplexer, dafür genauer
9
State & Window Verarbeitung_
•Nicht triviale Anwendungen benötigen meist einen State
• z.b. Aggregationen über einen längeren / unendlichen Zeitraum
• (input, state) -> (output, state’)
• gespeichert in Memory
• interessant im Fehlerfall
10
Windowing & Sliding_
• Window als (zeitlich) begrenzter State
• Tumbling Window
• Sliding Window
• Session Window
• Unterschiedliche Trigger
• Zeit
• Anzahl
11
Tumbling Window_
12
Sliding Window_
13
Session Window_
User 1
User 2
Inaktivität
Inaktivität
Zeit
14
Window Verarbeitung und Zeiten_
•Mit Verarbeitungszeit einfach
•Mit Eventzeit schwerer
• Vollständigkeit (out of order Events)
• Buffering
• Strategien bei Eventzeit Windows
• Watermarks
• Trigger
• Akkumulation
• Mehr Informationen
• https://www.oreilly.com/ideas/the-world-beyond-batch•
streaming-101
http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf
15
Die Kandidaten
16
Apache Storm_
•Gestartet 2010 durch BackType/Twitter, Apache seit 2013
•Pionier im Big Data / Stream Bereich
•Technologie der Lambda Architektur
•Low Level API
•Spouts und Bolts beschreiben eine Topologie
•Trident: High Level Erweiterung auf Storm Basis
• Aggregationen
• State & Window Operationen
• Join, Merge, Group, --
17
Apache Spark_
•Open Source (2010) & Apache Projekt (2013)
•Einheitliche Batch & Stream Verarbeitung
•Breite Akzeptanz
•RDD als Basis
18
Apache Samza_
•Entwickelt bei LinkedIn, Open Source 2013
•Verfolgt den Log Ansatz von Kafka
•Ausgeführt auf YARN
•Geeignet für große States
•Erweiterbar über APIs
19
Apache Flink_
•Gestartet 2008 als europäisches Forschungsprojekt
•Low Latency Streaming und High Throughput Batch Processing
•Flexible States und Windows
•Streaming First Ansatz
20
Die Analyse
21
Aspekte von Streaming Anwendungen_
•Runtime
•Programming Model
•Skalierbarkeit
•Latenz
•Durchsatz
•Resilienz / Delivery Guarantees
•Reife
•Community
22
Runtime - Native Streaming_
Verarbeitung
Empfänger
Senke
geringe Latenz
geringer Durchsatz
flexibel
Fehlertoleranz komplexer
Lastenverteilung komplexer
23
Runtime - Microbatching_
Verarbeitung
Senke
Empfänger
Microbatches
hoher Durchsatz
höhere Latenz
einfacher Fehlertolerant
weniger Flexibel (z.B. Windows)
State Verarbeitung komplexer
24
Programmiermodell_
Komponentenbasiert
•Operatoren und Quellen als
Komponenten
•Eigene Komponenten
•manuelle Topologie Definition
Deklarativ
•High Level API
•Higher Order Functions
•Abstrakte Datentypen
•Fortgeschrittene Operationen
inkludiert
•Eingebaute Optimierungen
25
Word Count - Flink_
valenv=StreamExecutionEnvironment.getExecutionEnvironment
valtext=env.socketTextStream("localhost",9999)
valcounts=text
.flatMap(_.toLowerCase.split("\\W+"))
.filter(_.nonEmpty)
.map(_,1)
.groupBy(0)
.sum(1)
counts.print
env.execute("ScalaSocketStreamWordCount")
26
Word Count - Spark_
valsparkConf=
newSparkConf().setAppName("StreamingWordCount")
valssc=newStreamingContext(sparkConf,Seconds(1))
ssc.checkpoint(".")
valmappingFunc=(key:String,value:Option[Int],state:
State[Int])=>{
valsum=value.getOrElse(0)+state.getOption.getOrElse(0)
valoutput=(key,sum)
state.update(sum)
output
}
valwordCountState=StateSpec.function(mappingFunc)
27
Word Count - Spark_
vallines=ssc.socketTextStream(args(0),args(1).toInt)
valwords=lines.flatMap(_.split(""))
valwordsWithCount=words.map(x=>(x,1))
valstateDstream=wordsWithCount.mapWithState(wordCountState)
stateDstream.print()
ssc.start()
ssc.awaitTermination()
28
Word Count - Storm_
TopologyBuilderbuilder=newTopologyBuilder();
builder.setSpout("spout",newRandomSentenceSpout(),5);
builder.setBolt("split",newSplitSentence(),8)
.shuffleGrouping(„spout");
builder.setBolt("count",newWordCount(),12)
.fieldsGrouping("split",newFields("word"));
29
Word Count - Storm_
Configconf=newConfig();
conf.setMaxTaskParallelism(3);
LocalClustercluster=newLocalCluster();
cluster.submitTopology("word-count",conf,
builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
30
Word Count - Storm_
publicstaticclassWordCountextendsBaseBasicBolt{
Map<String,Integer>counts=newHashMap<String,Integer>();
publicvoidexecute(Tupletuple,BasicOutputCollectorcollector)
{
Stringword=tuple.getString(0);
Integercount=counts.get(word);
if(count==null)
count=0;
count++;
counts.put(word,count);
collector.emit(newValues(word,count));
}
publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer)
{declarer.declare(newFields("word","count"));}
}
31
Word Count - Storm Trident_
Trident
TridentTopologytopology=newTridentTopology();
TridentStatewordCounts=
topology.newStream("spout1",spout)
.each(newFields("sentence"),newSplit(),new
Fields("word"))
.groupBy(newFields("word"))
.persistentAggregate(newMemoryMapState.Factory(),new
Count(),newFields("count"))
.parallelismHint(6);
32
Word Count - Storm Trident_
Trident
publicclassSplitextendsBaseFunction{
publicvoidexecute(TridentTupletuple,TridentCollector
collector){
Stringsentence=tuple.getString(0);
for(Stringword:sentence.split("")){
collector.emit(newValues(word));
}
}
}
33
Word Count - Samza_
classWordCountTaskextendsStreamTaskwithInitableTask{
privatevarstore:CountStore=_
definit(config:Config,context:TaskContext){
this.store=context.getStore("wordcount-store")
.asInstanceOf[KeyValueStore[String,Integer]]
}
34
Word Count - Samza_
overridedefprocess(envelope:IncomingMessageEnvelope,
collector:MessageCollector,coordinator:TaskCoordinator){
valwords=envelope.getMessage.asInstanceOf[String].split("")
words.foreach{key=>
valcount:Integer=Option(store.get(key)).getOrElse(0)
store.put(key,count+1)
collector.send(newOutgoingMessageEnvelope(new
SystemStream("kafka","wordcount"),
(key,count)))
}
}
35
Zustellungsgarantien_
•Maximal mögliche Garantien
•Beeinflussen Performance
•Nicht in jeder Kombination möglich (abhängig von Quelle)
Trident
At-least-once
Exactly-once*
Exactly-once*
At-least-once
Exactly-once*
36
Latenz & Durchsatz_
•Abhängig von der Runtime
•Höhere Latenz --> höherer Durchsatz
Flink
Custom
~50ms
Storm
Storm Trident
Samza
Spark Streaming
500ms
30.000ms
37
Latenz
Latenz & Durchsatz_
Throughput
https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
38
Performance_
•Viele Variablen, Unparteiische Tests schwierig
•Latenz vs. Durchsatz
•Delivery Guarantees
•Fehlertoleranz
•Tuning
•Netzwerk, Daten Lokalität, Serialisierung
39
Skalierbarkeit_
•Skalieren durch Partitionierung
• Partitionieren der Daten
• Partitionieren des Flows
40
Fault Tolerance_
•Erneute Verarbeitung nicht einfach möglich
•Anfang und Ende schwer zu bestimmen
•State muss auch gesichert werden
•Verschiedene Ansätze
•
•
•
•
Record Ack
Micro Batching
Transactional Updates
Snapshots
41
Fault Tolerance - Storm_
Ack
Ack
Ack
Ack
Ack
Ack
42
Fault Tolerance - Spark & Storm Trident_
•Fehlgeschlagene Microbatches werden wiederholt
•Batch Acknowledge statt Record Acknowledge
• Checkpoints für States
43
Fault Tolerance - Samza_
•Transaktionale Updates auf Transaction Log
•Kafka als Transaction Log
Kafka
Samza
partition 0
Kafka
partition 1
Checkpoint
partition 2
partition 0: offset ..
partition 1: offset ..
partition 2: offset ..
44
Fault Tolerance - Flink_
•Distributed Checkpoints
45
Event- & Verarbeitungszeit_
•Native Eventzeitverarbeitung nur in Flink
• Out-of-order Events
• Watermarks
• Trigger
finalStreamExecutionEnvironmentenv=
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
• Eventzeit als Key in anderen Framework möglich
• Keine out-of-order Events
46
Das Ergebnis
47
Überblick_
Trident
Runtime
Programmiermodell
Nativ
Microbatching Microbatching
Komponentenbasiert
Nativ
Nativ
Deklarativ
Komponenten
basiert
Deklarativ
Durchsatz
Gering
Mittel
Hoch
Hoch
Hoch
Latenz
Gering
Mittel
Mittel
Gering
Gering
Garantien
At-least-once Exactly-once* Exactly-once* At-least-once Exactly-once*
Eventzeit
Handling
Nein
Nein
Nein
Nein
Ja
Reife &
Community
Hoch
Hoch
Hoch
Mittel
Mittel
48
Spark wenn..._
•bereits Spark Batch Anwendungen vorhanden sind
•viele Umsysteme integriert werden
•eine große Community wichtig ist
•Scala kein Problem ist
•Latenz kein Kriterium ist
49
Storm für ..._
•Sehr niedrige Latenz, niedriges Volumen
•At-Least Once Verarbeitung
•Zustandslose Verarbeitung
•Ggfs. Heron als Alternative
50
Samza wenn ..._
•Kafka ist omnipräsent
•Große States
•Kein Exactly Once
•Kafka Streams als Alternative
51
Und Flink ..._
•Für Eventzeit Verarbeitung
•Für pures Streaming
•Sehr gute Konzepte
•Etwas weniger Umsysteme
•Nutzen und Mitarbeit an einem jungen Projekt
52
Ein Satz zu_
•Apache Beam
• High Level API für Streaming Runner
•Google Cloud Data Flow
• Googles Cloud Streaming Framework; Beam Implementierung
•Apex
• YARN based direct-streaming with checkpointing
•Flume
• Logfile Streaming insb. in HDFS
•Kafka Streams
• Streaming integriert in Kafka ab 0.10, einfache Anwendungen
• Heron
• Storm Nachfolger, API kompatibel, verbesserter Throughput &
Latency
53
Questions?
Matthias Niehoff,
IT-Consultant
codecentric AG
Zeppelinstraße 2
76185 Karlsruhe, Germany
mobil: +49 (0) 172.1702676
[email protected]
www.codecentric.de
blog.codecentric.de
matthiasniehoff
90
Picture Reference_
• Logfile: Linux Screenshots, Flickr
• Sensors, IT Network: Wikipedia
• Devices: Brad Forst, Flickr
• Speed: Rool Paap, Flickr
• Graph: Wikipedia
• Stateful Processing: data-artisans.com
• Window & Sliding Windows, Flink Übersicht, Flink Fault Tolerance: Apache Flink
• Storm Topologien: Apche Storm
• Spark Übersicht: Apache Spark
• Samza Übersicht: Apache Samza
• Unendliche Daten: https://i.ytimg.com/vi/9rE3kbGmP4w/maxresdefault.jpg
55