
(1)

CLOUD-BASED DATA STREAM PROCESSING

Thomas Heinze, Leonardo Aniello, Leonardo Querzoni, Zbigniew Jerzak

DEBS 2014

Mumbai 26/5/2014

(2)

TUTORIAL GOALS

Data stream processing (DSP) was in the past considered a solution for very specific problems:

§  Financial trading

§  Logistics tracking

§  Factory monitoring

Today the potential of DSPs is starting to be exploited in more general settings.

§  DSP is to online processing what MapReduce is to batch processing

Will DSPs possibly be offered as services by cloud-based providers?

(3)

TUTORIAL GOALS

Here we present an overview of current research trends in data streaming systems:

§  How they address the requirements imposed by recent use cases

§  How they are moving toward cloud platforms

Two focus areas:

§  Scalability

§  Fault Tolerance

(4)

THE AUTHORS

Thomas Heinze
PhD student @ SAP. PhD topic: Elastic Data Stream Processing

Leonardo Aniello
Research fellow @ Sapienza Univ. of Rome

Leonardo Querzoni
Assistant professor @ Sapienza Univ. of Rome

Zbigniew Jerzak
Senior researcher @ SAP. Co-organizer of the DEBS Grand Challenge

(5)

OUTLINE

§  Historical overview of data stream processing

§  Use cases for cloud-based data stream processing

§  How DSPs work: Storm as example

§  Scalability methods

§  Fault tolerance integrated in modern DSPs

§  Future research directions and conclusions

(6)

CLOUD-BASED DATA STREAM PROCESSING

Historical Overview

(7)

DATA STREAM PROCESSING ENGINE

§  It is a piece of software that

§  continuously calculates results for long-standing queries

§  over potentially infinite data streams

§  using operators

§ algebraic (filters, join, aggregation)

§ user defined

§  that can be stateless or stateful

[Example pipeline: Source → Aggr → Sink]
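To make the definition concrete, here is a minimal conceptual sketch, not tied to any specific engine, of a long-standing query that filters an unbounded stream with a stateless operator and maintains a stateful aggregate; Event, source, sink and THRESHOLD are hypothetical placeholders:

// Conceptual sketch of a long-standing query over an infinite stream.
// Event, source, sink and THRESHOLD are hypothetical placeholders.
Map<String, Long> countsByKey = new HashMap<>();             // state of the aggregation operator
while (true) {
    Event e = source.next();                                 // blocking read from a potentially infinite stream
    if (e.value() > THRESHOLD) {                             // stateless algebraic operator: filter
        long c = countsByKey.merge(e.key(), 1L, Long::sum);  // stateful operator: aggregation
        sink.emit(e.key(), c);                               // results are emitted continuously
    }
}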

(8)

REQUIREMENTS [1]

Rule 1 - Keep the data moving

Rule 2 - Query using SQL on streams (StreamSQL)

Rule 3 - Handle stream imperfections (delayed, missing and out-of-order data)

Rule 4 - Generate predictable outcomes

Rule 5 - Integrate stored and streaming data

Rule 6 - Guarantee data safety and availability

Rule 7 - Partition and scale applications automatically

Rule 8 - Process and respond instantaneously

[1] M. Stonebraker, U. Cetintemel and S. Zdonik. "The 8 Requirements of real-time Stream Processing", ACM SIGMOD Record, 2005.

(9)

THREE GENERATIONS

§  First Generation

§  Extensions to existing database engines or simplistic engines

§  Dedicated to specific use cases

§  Second Generation

§  Enhanced methods regarding language expressiveness, load balancing, fault tolerance

§  Third Generation

§  Dedicated towards trend of cloud computing; designed towards massive parallelization

(10)

GENEALOGY

[Genealogy diagram of data stream processing systems; the systems shown include:]

Aurora (2002), Medusa (2004), Borealis (2005), TelegraphCQ (2003), NiagaraCQ (2001), STREAM (2003), StreamMapReduce (2011), StreamMine3G (2013), Yahoo! S4 (2010), Apache S4 (2012), Twitter Storm (2011), Trident (2013), StreamCloud (2010), Elastic StreamCloud (2012), SPADE (2008), Elastic Operator (2009), InfoSphere Streams (2010), FLUX (2002), GigaScope (2002), Spark Streaming (2010), D-Streams (2013), Cayuga (2007), SASE (2008), ZStream (2009), CAPE (2004), D-CAPE (2005), SGuard (2008), PIPES (2004), HybMig (2006), StreamInsight (2010), TimeStream (2013), CEDR (2005), SEEP (2013), Esper (2006), Truviso (2006), StreamBase (2003), Coral8 (2005), Aleri (2006), Sybase CEP (2010), Oracle CQL (2006), Oracle CEP (2009), DejaVu (2009), System S (2005), LAAR (2014), StreamIt (2002), Flextream (2007), Elastic System S (2013), Cisco Prime (2012), RTM Analyzer (2008), webMethods Business Events (2011), TIBCO StreamBase (2013), TIBCO BusinessEvents (2008), NILE (2004), NILE-PDT (2005), RLD (2013), MillWheel (2013), Johka (2007)

(11)

FIRST GENERATION - TELEGRAPHCQ [2]

§  Data stream processing engine built on top of Postgres DB

[2] S. Chandrasekaran et al. "TelegraphCQ: Continuous Dataflow Processing", SIGMOD, 2003.

(12)

SECOND GENERATION - BOREALIS [3]

§  Joint research project of Brandeis University, Brown University and MIT

§  Duration: ~3 years

§  Allowed the experimentation of several techniques

[3] D. J. Abadi et al. "The Design of the Borealis Stream Processing Engine", CIDR 2005.

(13)

BOREALIS MAIN NOVELTIES

§  Load-aware Distribution

§  Fine-grained High-availability

§  Load Shedding mechanisms

(14)

THIRD GENERATION

§  Novel use cases are driving toward

§  Unprecedented levels of parallelism

§  Efficient fault tolerance

§  Dynamic scalability

§  Etc.

(15)

CLOUD-BASED DATA STREAM PROCESSING

Use Cases for Cloud-Based Data Stream Processing

(16)

SCENARIOS FOR THIRD GENERATION DSPS

§  Tracking of query trend evolution in Google

§  Analysis of popular queries submitted to Twitter

§  User profiling at Yahoo! based on submitted queries

§  Bus routing monitoring and management in Dublin

§  Sentiment analysis on multiple tweet streams

§  Fraud monitoring in cellular telephony

(17)

QUERY MONITORING AT GOOGLE

§  Analyze queries submitted to the Google search engine to create a historical query model

§  Runs in Zeitgeist, built on top of MillWheel

§  Incoming searches are organized in 1-second buckets

§  Buckets are compared to historical data

§  Useful to promptly detect anomalies (spikes/dips)


Akidau, Balikov, Bekiroğlu, Chernyak, Haberman, Lax, McVeety, Mills, Nordstrom, Whittle. MillWheel: Fault-Tolerant Stream Processing at Internet Scale. VLDB, 2013.


(18)

POPULAR QUERY ANALYSIS AT TWITTER

§  When a relevant event happens, an increase occurs in the number of queries submitted to Twitter

§  These queries have to be correlated in real-time with tweets (2.1 billion tweets per day)

§  Runs on Storm

§  These spikes are likely to fade away in a limited amount of time

§  Useful to improve the accuracy of popular queries

Improving Twitter Search with Real-time Human Computation, 2013

http://blog.echen.me/2013/01/08/improving-twitter-search-with-real-time-human-computation

(19)

POPULAR QUERY ANALYSIS AT TWITTER

§  Problem

§  Sudden peak of queries about a new (never seen before) event

§  How to properly assign the right semantics (categorization) to the queries?

§  Example: how can we understand that #bindersfullofwomen refers to politics?

§  Solution

§  Employ human evaluation (Amazon's Mechanical Turk service) to produce categorizations for queries unseen so far

§  Incorporate such categorizations into backend models


(20)

POPULAR QUERY ANALYSIS AT TWITTER

[Architecture diagram: the search engine writes its query log to a Kafka queue; in Storm, a spout reads (query, ts) tuples and a bolt extracts popular queries, which are handed to human evaluation; the resulting categorizations of queries feed back into engine tuning.]

(21)

USER PROFILING AT YAHOO!

§  Queries submitted by users are evaluated (thousands of queries per second from millions of users)

§  Runs on S4 [1]

§  Useful to generate highly personalized advertising


[1] L. Neumeyer, B. Robbins, A. Nair, and A. Kesari. S4: Distributed Stream Computing Platform. ICDMW, 2010.


(22)

BUS ROUTING MANAGEMENT IN DUBLIN

§  Tracking of bus locations (1000 buses) to improve public transportation for 1.2 million citizens

§  Runs on System S

§  Position tracking by GPS signals to provide real-time traffic information monitoring

§  Useful to predict arrival times and suggest better routes

Dublin City Council - Traffic Flow Improved by Big Data Analytics Used to Predict Bus Arrival and Transit Times. IBM, 2013.

http://www-03.ibm.com/software/businesscasestudies/en/us/corp?docid=RNAE-9C9PN5

(23)

SENTIMENT ANALYSIS ON TWEET STREAMS

§  Tweet streams flowing at high rates (10K tweets/s)

§  Sentiment analysis in real-time

§  Limited computation time (latency up to 2 seconds)

§  Runs on TimeStream [1]

§  Useful to continuously estimate the mood about specific topics


[1] Z. Qian, Y. He, C. Su, Z. Wu, H. Zhu, T. Zhang, L. Zhou, Y. Yu, Z. Zhang. TimeStream: Reliable Stream Computation in the Cloud. EuroSys, 2013.


(24)

FRAUD MONITORING FOR MOBILE CALLS

§  Fraud detection by real-time processing of Call Detail Records (10k-50k CDR/s)

§  Requires self-joins over large time windows (queries on millions of CDRs)

§  Runs on StreamCloud [1]

§  Useful to reactively spot dishonest behaviors

[1] V. Gulisano, R. Jimenez-Peris, M. Patino-Martinez, C. Soriente, and P. Valduriez. StreamCloud: An Elastic and Scalable Data Streaming System. IEEE TPDS, 2012.

(25)

REQUIREMENTS

§  Real-time and continuous complex analysis of heavy data streams

§  More than 10k events/s

§  Limited computation latency

§  Up to a few seconds

§  Correlation of new and historical data

§  Computations have a state to be kept

§  Input load varies considerably over time

§  Computations have to adapt dynamically (Scalability)

§  Hardware and Software failures can occur

§  Computations have to transparently tolerate faults with limited performance degradation (Fault Tolerance)


(26)

CLOUD-BASED DATA STREAM PROCESSING

How DSPs work: Storm as example

(27)

STORM

§  Storm is an open source distributed realtime computation system

§  Provides abstractions for implementing event-based computations over a cluster of physical nodes

§  Manages high throughput data streams

§  Performs parallel computations on them

§  It can be effectively used to design complex event-driven applications on intense streams of data

(28)

STORM

§  Originally developed by Nathan Marz and team at BackType, then acquired by Twitter, now an Apache Incubator project

§  Currently used by Twitter, Groupon, The Weather Channel, Taobao, etc.

§  Design goals:

§  Guaranteed data processing

§  Horizontal scalability

§  Fault Tolerance

(29)

STORM

An application is represented by a topology:

[Topology diagram: several spouts feeding a directed acyclic graph of bolts]

(30)

OPERATOR EXPRESSIVENESS

§  Storm is designed for custom operator definition

§  Bolts can be designed as POJOs adhering to a specific interface

§  Implementations in other languages are feasible

§  Trident [1] offers a higher-level programming interface

[1] N. Marz. Trident tutorial. https://github.com/nathanmarz/storm/wiki/Trident-tutorial, 2013.

(31)

OPERATOR EXPRESSIVENESS

§  Operators are connected through groupings (see the sketch after this list):

§  Shuffle grouping

§  Fields grouping

§  All grouping

§  Global grouping

§  None grouping

§  Direct grouping

§  Local or shuffle grouping
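As an illustration, the following minimal sketch shows how some of these groupings are declared when wiring a topology; the component names (SentenceSpout, SplitBolt, CountBolt, MetricsBolt) are hypothetical:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new SentenceSpout(), 2);
builder.setBolt("split", new SplitBolt(), 4)
       .shuffleGrouping("sentences");                  // tuples are distributed randomly across tasks
builder.setBolt("count", new CountBolt(), 4)
       .fieldsGrouping("split", new Fields("word"));   // tuples with the same "word" always reach the same task
builder.setBolt("metrics", new MetricsBolt(), 1)
       .globalGrouping("count");                       // all tuples are routed to a single task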

(32)

PARALLELIZATION

§  Storm asks the developer to provide "parallelism hints" in the topology

[Topology diagram: the same spout/bolt graph annotated with parallelism hints, e.g. x3, x8 and x5 instances for different components]

(33)

STORM INTERNALS

§  A Storm cluster is constituted by a Nimbus node and n worker nodes

[Diagram: the master node runs the Nimbus process with its Scheduler; each worker node runs a Supervisor and offers slots where worker processes host executors.]

(34)

TOPOLOGY EXECUTION

§  A topology is run by submitting it to Nimbus

§  Nimbus allocates the execution of components (spouts and bolts) to the worker nodes using a scheduler

§  Each component has multiple instances (parallelism)

§  Each instance is mapped to an executor

§  A worker is instantiated whenever the hosting node must run executors for the submitted topology

§  Each worker node locally manages incoming/outgoing streams and local computation

§  The local supervisor takes care that everything runs as prescribed

§  Nimbus monitors worker nodes during the execution to manage potential failures and the current resource usage

(35)

TOPOLOGY EXECUTION

§  Storm's default scheduler (EvenScheduler) applies a simple round-robin strategy

[Diagram: the executors of components A-D (A.1-A.4, B.1-B.2, C.1, D.1-D.4) are assigned round-robin to the configured workers W1..Wm, which are in turn placed on the available worker nodes WN1..WNn.]

(36)

A PRACTICAL EXAMPLE

§  Word count: the HelloWorld for DSPs

§  Input: stream of text (e.g. from documents)

§  Output: number of appearance for each word

[HelloStorm topology: TXT docs → LineReaderSpout (text lines) → WordSplitterBolt (words) → WordCounterBolt]

(37)

A PRACTICAL EXAMPLE

§  LineReaderSpout: reads docs and creates tuples

public class LineReaderSpout implements IRichSpout {

  private FileReader fileReader;
  private SpoutOutputCollector collector;
  private TopologyContext context;
  private boolean completed = false;

  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this.context = context;
    try {
      this.fileReader = new FileReader(conf.get("inputFile").toString());
    } catch (FileNotFoundException e) {
      throw new RuntimeException("Cannot open input file", e);
    }
    this.collector = collector;
  }

  public void nextTuple() {
    if (completed) { Utils.sleep(1000); return; }     // nothing left to emit: back off
    String str;
    BufferedReader reader = new BufferedReader(fileReader);
    try {
      while ((str = reader.readLine()) != null) {
        collector.emit(new Values(str), str);         // the line itself is used as the message id
      }
    } catch (IOException e) {
      throw new RuntimeException("Error reading tuple", e);
    }
    completed = true;
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("line"));
  }

  public void close() {
    try { fileReader.close(); } catch (IOException e) { /* ignore */ }
  }

  // remaining IRichSpout methods (ack, fail, activate, deactivate, ...) omitted for brevity
}

(38)

A PRACTICAL EXAMPLE

§  WordSplitterBolt: splits lines into words

public class WordSplitterBolt implements IRichBolt {

  private OutputCollector collector;

  public void prepare(Map stormConf, TopologyContext context, OutputCollector c) {
    this.collector = c;
  }

  public void execute(Tuple input) {
    String sentence = input.getString(0);
    for (String word : sentence.split(" ")) {
      word = word.trim();
      if (!word.isEmpty()) {
        collector.emit(new Values(word.toLowerCase()));   // one tuple per word
      }
    }
    collector.ack(input);                                 // acknowledge the input line
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }

  // cleanup() and other IRichBolt methods omitted for brevity
}

(39)

A PRACTICAL EXAMPLE

§  WordCounterBolt: counts word occurrences

public class WordCounterBolt implements IRichBolt {

  private Map<String, Integer> counters;
  private OutputCollector collector;

  public void prepare(Map stormConf, TopologyContext context, OutputCollector c) {
    this.counters = new HashMap<String, Integer>();
    this.collector = c;
  }

  public void execute(Tuple input) {
    String str = input.getString(0);
    if (!counters.containsKey(str)) {
      counters.put(str, 1);                        // first occurrence of the word
    } else {
      counters.put(str, counters.get(str) + 1);    // increment the running count
    }
    collector.ack(input);
  }

  public void cleanup() {
    // print the final counts when the topology is shut down
    for (Map.Entry<String, Integer> entry : counters.entrySet()) {
      System.out.println(entry.getKey() + " : " + entry.getValue());
    }
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // terminal bolt: no output fields
  }
}


(40)

A PRACTICAL EXAMPLE

§  HelloStorm: contains the topology definition

public class HelloStorm {

  public static void main(String[] args) throws Exception {
    Config config = new Config();
    config.put("inputFile", args[0]);                  // path of the text file to process
    config.setDebug(true);
    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);  // at most one pending tuple per spout task

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("line-reader-spout", new LineReaderSpout());
    builder.setBolt("word-splitter", new WordSplitterBolt()).shuffleGrouping("line-reader-spout");
    builder.setBolt("word-counter", new WordCounterBolt()).shuffleGrouping("word-splitter");

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("HelloStorm", config, builder.createTopology());
    Thread.sleep(10000);                               // let the topology run for 10 seconds
    cluster.shutdown();
  }
}
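Note that this example runs the topology for ten seconds inside an in-process LocalCluster, which is convenient for local testing; on a real cluster the same topology would instead be submitted with StormSubmitter.submitTopology (as in the load model example later in this tutorial).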

(41)

CLOUD-BASED DATA STREAM PROCESSING

Scalability

(42)

PARTITIONING SCHEMES

§  Data Parallelism:

§  How to parallelize the execution of an operator?

§  How to detect the optimal level of parallelization?

§  Operator Distribution:

§  How to distribute the load across available hosts?

§  How to achieve a load balance between these machines?

(43)

REQUIREMENTS FOR DATA PARALLELISM

§  Transparent to the user

§  correct results in correct order (identical to sequential execution)

[Diagram: a Round-Robin split feeding two parallel Filter replicas merged by a Union, and a Hash(groupID) split feeding two parallel Aggr replicas merged by a Merge & Sort.]

(44)

DATA PARALLELISM

§  First presented by FLUX [1] and Borealis [2]

§  Explicitly done using partitioning operators

§  User needs to decide:

§ partitioning scheme

§ merging scheme

§ level of parallelism

[1] M. Shah, et al. "Flux: An adaptive partitioning operator for continuous query systems."

In ICDE, 2003.

[2] D. Abadi, et al. "The Design of the Borealis Stream Processing Engine." In CIDR, 2005.

(45)

PARALLELISM FOR CLOUD-BASED DSP

§  Massive parallelization (>100 partitions)

§  Support custom operators

§  Adapt the parallelization level to the workload without user interaction


(46)

DATA PARALLELISM IN STORM

§  The user defines the number of parallel tasks

§  Storm supports different partitioning schemes (aka groupings):

§  Shuffle grouping, Fields grouping, All grouping, Global grouping, None grouping, Direct grouping, Local or shuffle grouping, Custom

(47)

MAPREDUCE FOR STREAMING [3,4]

§  Extend MapReduce model for streaming data:

§  Break strict phases

§  Introduce Stateful Reducer

[Diagram: input flowing through three parallel Map operators into two Reduce operators that produce the output]

[3] T. Condie, et al. "MapReduce Online." In NSDI,2010.

[4] A. Brito, et al. "Scalable and low-latency data processing with stream mapreduce."

CloudCom, 2011.

(48)

STATEFUL REDUCER [4]

reduceInit(k1) {
  // custom user class
  S = new State(); S.sum = 0; S.count = 0;
  // object S is now associated with key k1
  return S;
}

reduce(Key k1, <List new, List expired, List window, UserState S>) {
  for v in expired do:
    // remove contribution of expired events
    S.sum -= v; S.count--;
  for v in new do:
    // add contribution of new events
    S.sum += v; S.count++;
  send(k1, S.sum / S.count);
}

[4] A. Brito, et al. "Scalable and low-latency data processing with StreamMapReduce." CloudCom, 2011.

(49)

AUTO PARALLELIZATION [5,6]

§  Goal:

§  Detect parallelizable regions in operator graphs that contain user-defined operators

§  Runtime support for enforcing safety conditions

§  Safety conditions:

§  For an operator: no state or partitioned state, selectivity < 1, at most one predecessor/successor

§  For a parallel region: compatible keys, forwarded keys, region-local fusion dependencies


[5] S Schneider, et al. "Auto-parallelizing stateful distributed streaming applications." In PACT, 2012.

[6] Wu et al. “Parallelizing Stateful Operators in a Distributed Stream Processing System: How, Should You and How Much?” In DEBS, 2012.

(50)

AUTO PARALLELIZATION

§  Compiler-based approach:

§  Characterize each operator based on a set of criteria (state type, selectivity, forwarding)

§  Merge different operators into parallel regions (based on a left-first approach)

§  The best parallelization strategy is applied automatically

§  The level of parallelism is decided on job admission

(51)

ELASTICITY [7]

§  Goal: react to unpredicted load peaks & reduce the amount of idling resources


[Diagram: workload over time compared with statically and elastically provisioned resources; static provisioning alternates between overprovisioning and underprovisioning, while elastic provisioning follows the load.]

[7] M. Armbrust, et al. "A view of cloud computing." Communications of the ACM, 2010.

(52)

DIFFERENT TYPES OF ELASTICITY

§  Vertical Elasticity (Scale up)

§  Adapt the number of threads per operator based on the workload.

§  Horizontal Elasticity (Scale out)

§  Adapt the number of hosts based on the workload.

(53)

ELASTIC OPERATOR EXECUTION [8]

§  Dynamically adapt the number of threads of a stateless operator based on the workload

§  Limitations: works only at the thread level and for stateless operators


[8] S. Schneider, et al. "Elastic scaling of data parallel operators in stream processing." In IPDPS, 2009.

[Diagram: an operator backed by a thread pool whose size is adapted at runtime]

(54)

ELASTIC AUTO-PARALLELIZATION [9]

§  Combines the ideas of elasticity and auto-parallelization

§  Adapts the parallelization level using a controller-based approach

§  Dynamic adaptation of the parallelization level based on an operator migration protocol

[9] B. Gedik, et al. “Elastic Scaling for Data Stream Processing." In TPDS, 2013.

(55)

ELASTIC AUTO-PARALLELIZATION

§  Dynamic adaptation of parallelization: lend & borrow approach

[Diagram: a Splitter feeding parallel Op2 instances merged by a Merger, separated from Op1 by horizontal and vertical barriers; the adaptation proceeds in two phases: 1. Lend phase, 2. Borrow phase.]

(56)

OPERATOR DISTRIBUTION

§  Well-studied problem since first DSP prototypes

§  Typically referred to as "Operator Placement"

§  Examples: Borealis, SODA, Pietzuch et al.

§  Design decisions [10]

§  Optimization goal: What is optimized ?

§  Execution mode: Centralized or Decentralized?

§  Algorithm runtime: Offline, online or both?

[10] G.T. Lakshmanan, et al. "Placement strategies for internet-scale data stream systems."

In IEEE Internet Computing, 2008.

(57)

OPTIMIZATION GOAL

§  Different objectives:

§  Balance CPU load (handle load variations)

§  Minimize Network latency (minimize latency for sensor networks)

§  Latency optimization (predictable quality of service)


(58)

EXECUTION MODE

§  Centralized Execution:

§  All decisions are made by a centralized management component

§  Major disadvantage: manager becomes scalability bottleneck

§  Decentralized Execution:

§  Different hosts try to agree on an operator placement

§  Two examples: operator routing and commutative execution

(59)

DECENTRALIZED EXECUTION

§  Based on pairwise agreement [11]


[Diagram: filter and aggregate operators fed by two sources, spread over Hosts 1-4; pairs of hosts negotiate load migrations through pairwise contracts.]

[11] M. Balazinska, et al. "Contract-Based Load Management in Federated Distributed Systems." NSDI. Vol. 4. 2004.

(60)

DECENTRALIZED EXECUTION

§  Based on decentralized routing [12]

[Diagram: streams from two sources (Src1, Src2) routed across Hosts 1-4 through intermediate filter and join operators.]

[12] Y. Ahmad, et al. "Network-aware query processing for stream-based applications." In VLDB, 2004.

(61)

ALGORITHM RUNTIME

§  Offline

§  Based on estimation of input rates, selectivities, etc.

§  Online

§  Mostly simplistic initial estimation (e.g. round-robin or random placement)

§  Continuous measurements and adaptation during runtime


(62)

MOVEMENT PROTOCOLS

§  How to ensure loss-free movement of operators?

§  No input event shall be lost

§  State needs to be restored on the new host

§  Movement strategies:

§  Pause & Resume vs. Parallel Track

§  Large overlap with research on adaptive query processing [13]

[13] Y. Zhu, et al. "Dynamic plan migration for continuous queries over data streams." In SIGMOD, 2004.

(63)

PAUSE & RESUME

§  Approach:

1.  Stop execution

2.  Move the state to the new host

3.  Restart execution on the new host

§  Properties:

§  Simple & generic

§  A latency peak can be observed due to pausing

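The following sketch illustrates the pause & resume protocol in Java-like pseudocode; Operator, Host and router are hypothetical interfaces, not part of any specific engine:

// Hypothetical sketch of pause & resume operator migration.
void migrate(Operator op, Host source, Host target) {
    source.pause(op);                        // 1. stop execution; upstream operators buffer incoming events
    byte[] state = source.extractState(op);  // 2. serialize the operator state and move it to the new host
    target.deploy(op, state);                //    restore the state on the target host
    router.redirect(op, target);             //    re-route the operator's input streams to the target
    target.resume(op);                       // 3. restart execution and replay the buffered events
}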

(64)

PARALLEL TRACK

§  Approach:

1.  Start a new instance

2.  Move or create up-to-date state

3.  Stop the old instance as soon as the instances are in sync

§  Properties:

§  No latency peak

§  Requires duplicate detection and detection of the "sync" status

§  Events processed twice

(65)

OPERATOR PLACEMENT WITHIN CLOUD-BASED DSP

§  Different setup (highly location-wise distributed vs. single cluster)

§  Larger scale (100 ... 1000 hosts)

§  New objectives: elasticity, monetary cost, energy efficiency

Mostly centralized, adaptive solutions optimizing CPU utilization or monetary cost, with Pause & Resume operator movement.


(66)

RECENT OPERATOR PLACEMENT APPROACHES

§  SQPR [14]

§  Query planner for data centers with heterogeneous resources

§  MACE [15]

§  Presents an approach for precise latency estimation

[14] V. Kalyvianaki et al. "SQPR: Stream query planning with reuse". In ICDE, 2011.

[15] B. Chandramouli et al. "Accurate latency estimation in a distributed event processing system." In ICDE, 2011.

(67)

STORM LOAD MODEL

§  Worker: a physical JVM that executes a subset of all the tasks of the topology

§  Task: a parallel instance of an operator

§  Executor: a thread of a worker that executes one or more tasks of the same operator


[16] Storm. http://storm.incubator.apache.org/documentation/Understanding-the-parallelism-of-a-Storm-topology.html

(68)

EXAMPLE FOR STORM LOAD MODEL

Config conf = new Config();
conf.setNumWorkers(3);   // use three worker processes

TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("src", new Source(), 3);
topologyBuilder.setBolt("aggr", new Aggregation(), 6).shuffleGrouping("src");
topologyBuilder.setBolt("sink", new Sink(), 6).shuffleGrouping("aggr");

StormSubmitter.submitTopology("mytopology", conf, topologyBuilder.createTopology());

http://storm.incubator.apache.org/documentation/Understanding-the-parallelism-of-a-Storm-topology.html

(69)

STORM: DISTRIBUTION MODEL [16]

[Diagram: Source (spout, parallelism hint = 3), Aggr (bolt, parallelism hint = 6) and Sink (bolt, parallelism hint = 3); Workers 1-3 each run one Source task, two Aggr tasks and one Sink task.]
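With the parallelism hints shown in the diagram, the topology comprises 3 + 6 + 3 = 12 executors; distributed round-robin over the 3 configured workers, each worker hosts 4 executors (one Source task, two Aggr tasks and one Sink task).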

[16] Storm. http://storm.incubator.apache.org/documentation/Understanding-the-parallelism-of-a-Storm-topology.html

(70)

ADAPTIVE PLACEMENT IN STORM

§  Manual reconfiguration (pause & resume of the complete topology)

§  Enhanced load scheduler for Twitter Storm [17]

§  Load balancing adapted to the Storm architecture
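For instance, the rebalance command below changes the topology's parallelism at runtime: -n 4 sets the number of worker processes to 4, and -e src=8 sets the number of executors for the src component to 8.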

$ storm rebalance mytopo -n 4 -e src=8

[17] L. Aniello, et al. "Adaptive online scheduling in storm.“ In DEBS, 2013.

(71)

DIFFERENT LEVELS OF ELASTICITY


Level of elasticity | Action taken | System

Virtual Machine | Virtual machines are moved transparently to the user | [18]

Engine | A new engine instance is started and new queries are deployed on it | [19]

Query | A new query instance is started and the input data is split | [20]

Operator | A new operator instance is created and the input data is split | [21]

[18] T. Knauth, et al. "Scaling non-elastic applications using virtual machines." In Cloud, 2011.

[19] W. Kleiminger, et al."Balancing load in stream processing with the cloud." In ICDEW, 2011.

[20] Matteo Migliavacca, et al. "SEEP: scalable and elastic event processing.“ In Middleware, 2010.

[21] R. Fernandez, et al. "Integrating scale out and fault tolerance in stream processing using operator state management", SIGMOD 2013.

(72)

ELASTIC DSP SYSTEM ARCHITECTURE

[Architecture diagram: an Elasticity Manager and a Resource Manager controlling multiple Data Stream Processing Engine instances, each running on its own Virtual Machine.]

(73)

STREAMCLOUD [22]

§  Realizes different levels of elasticity on a highly scalable DSP

§  Studies several major design decisions:

§  Optimal Level of Elasticity

§  Migration strategy

§  Load balancing based on user-defined thresholds

[22] V. Gulisano, et al. "StreamCloud: An elastic and scalable data streaming system", TPDS, 2012.

(74)

SEEP [21]

§  Presents state-of-the-art mechanisms for state management

§  Combines fault tolerance & scale-out using the same mechanism

§  Highly scalable and fast recovery

[21] R. Fernandez, et al. "Integrating scale out and fault tolerance in stream processing using operator state management", SIGMOD 2013.

(75)

MILLWHEEL [23]

§  Mechanisms similar to SEEP or StreamCloud

§  Supports massive scale-out

§  Centralized manager

§  Optimization of CPU load

[23] T. Akidau, et al. "MillWheel: Fault-Tolerant Stream Processing at Internet Scale", VLDB 2013.

(76)

CLOUD-BASED DATA STREAM PROCESSING

Fault tolerance

(77)

FAULT TOLERANCE IN DSPS

§  Small scale stream processing

§  Faults are an exception

§  Optimize for the lucky case

§  Catch faults at execution time and start recovery procedures (possibly expensive)

§  Large scale DSPs (e.g. cloud based)

§  Faults are likely to affect every execution

§  Consider them in the design process

(78)

FAULT TOLERANCE IN DSPS

§  Two main fault causes

§  Message losses

§  Computational element failures

§   Event tracking

§  Makes sure each injected event is correctly processed with well-defined semantics

§   State management

§  Makes sure that the failure of a computational element will not affect the system's correct execution

(79)

EVENT TRACKING

§  When a fault occurs, events may need to be replayed

§  Typical approach:

§  Request acks from downstream operators

§  Detect losses by setting timeouts on acks

§  Replay lost events from upstream operators

§  Tracking and managing information on processed events may be needed to guarantee specific processing semantics

(80)

STORM – ET

§  Event tracking in Storm is guaranteed by two different techniques:

§  Acker processes → at-least-once message processing

§  Transactional topologies → exactly-once message processing

(81)

STORM – ET

§  A tuple injected in the system can cause the production of multiple tuples in the topology

§  This production partially maps the underlying application topology

§  Storm keeps track of the processing DAG stemming from each input tuple

(82)

STORM – ET

§  Example: word-count topology

[Diagram: the sentence "stay hungry, stay foolish" enters the Spout; the Splitter bolt emits the words "stay", "hungry", "stay", "foolish"; the Counter bolt emits ["stay", 1], ["hungry", 1], ["foolish", 1] and then ["stay", 2]. Together these tuples form the DAG stemming from the input sentence.]

(83)

STORM – ET

§  Acker tasks are responsible for monitoring the flow of tuples in the DAG

§  Each bolt "acks" the correct processing of a tuple

§  The processing of a tuple can be committed when it has fully traversed the DAG

§  In this case the acker notifies the original spout

§  The spout must implement an ack(Object msgId) method

§  It can be used to garbage collect tuple state

(84)

STORM – ET

§  If a tuple does not reach the end of the DAG

§  The acker times out and invokes fail(Object msgId) on the spout

§  The spout method implementation is in charge of replaying failed tuples

§  Notice: the original data source must be able to reliably replay events (e.g. a reliable MQ)

(85)

STORM – ET

§  Developer perspective:

§  Correctly connect bolts with tuple sources (anchoring)

§  Anchoring is how you specify the tuple tree

public void execute(Tuple tuple) {
  String sentence = tuple.getString(0);
  for (String word : sentence.split(" ")) {
    _collector.emit(tuple, new Values(word));   // anchor: the emitted tuple is linked to the input tuple
  }
  _collector.ack(tuple);
}

Source: https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing

(86)

STORM – ET

§  Developer perspective:

§  Explicitly ack processed tuples

§  Or extend BaseBasicBolt

public void execute(Tuple tuple) {
  String sentence = tuple.getString(0);
  for (String word : sentence.split(" ")) {
    _collector.emit(tuple, new Values(word));
  }
  _collector.ack(tuple);   // explicit ack of the fully processed input tuple
}

Source: https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing

(87)

STORM – ET

§  Implement ack() and fail() on the spout(s)

public void nextTuple() {
  if (!toSend.isEmpty()) {
    for (Map.Entry<Integer, String> transactionEntry : toSend.entrySet()) {
      Integer transactionId = transactionEntry.getKey();
      String transactionMessage = transactionEntry.getValue();
      collector.emit(new Values(transactionMessage), transactionId);
    }
    toSend.clear();
  }
}

public void ack(Object msgId) {          // called when the whole DAG completed processing
  messages.remove(msgId);
}

public void fail(Object msgId) {         // called on timeout/failure: re-schedule the tuple
  Integer transactionId = (Integer) msgId;
  Integer failures = transactionFailureCount.get(transactionId) + 1;
  if (failures >= MAX_FAILS) {
    throw new RuntimeException("Too many failures on Tx [" + transactionId + "]");
  }
  transactionFailureCount.put(transactionId, failures);
  toSend.put(transactionId, messages.get(transactionId));
}
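This fail() implementation simply re-schedules the failed tuple for re-emission (up to MAX_FAILS attempts), which yields at-least-once semantics: a tuple may be processed more than once, but it is never silently dropped.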

Source: J. Leibiusky, G. Eisbruch and D. Simonassi. Getting Started with Storm. O'Reilly, 2012.

(88)

STORM – ET

§  How does the acker task work?

§  It is a standard bolt

§  The acker tracks, for each tuple emitted by a spout, the corresponding DAG

§  It acks the spout whenever the DAG is complete

§  You can instantiate parallel ackers to improve performance

§  Tuples are randomly assigned to ackers to improve load balancing (uses mod hashing)

(89)

STORM – ET

§  How can ackers keep track of DAGs?

§  Each tuple is identified by a unique 64-bit ID

§  For each spout tuple, the acker stores in a map

§ the ID of the emitter task

§ an ack val

§  An ack val is a bit vector that encodes

§  IDs of tuples stemmed from the initial one

§  IDs of acked tuples

(90)

STORM – ET

§  Ack val management

§  When a tuple is emitted by a spout

§ Initialize the vector and encode in it the tuple ID

§  When a tuple is acked

§ XOR its ID in the ack val

§  When an anchored tuple is emitted

§ XOR its ID in the ack val

§  When the ack val is empty, all tuples in the DAG have been acked (with high probability)
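A minimal, self-contained sketch of this XOR bookkeeping (with hypothetical IDs, not Storm's actual implementation) is shown below; because each ID is XOR-ed in once on emit and once on ack, the ack val returns to zero exactly when every tuple of the DAG has been acked:

import java.util.Random;

public class AckValDemo {
    public static void main(String[] args) {
        Random random = new Random();
        long ackVal = 0L;

        long t1 = random.nextLong();   // ID of the first anchored tuple
        long t2 = random.nextLong();   // ID of the second anchored tuple

        ackVal ^= t1;                  // t1 emitted
        ackVal ^= t2;                  // t2 emitted
        ackVal ^= t1;                  // t1 acked
        ackVal ^= t2;                  // t2 acked

        // ackVal == 0 -> the whole DAG has been acked (with high probability)
        System.out.println("DAG complete: " + (ackVal == 0L));
    }
}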

(91)

STORM – ET

§  If exactly-once semantics is required, use a transactional topology

§  Transaction = processing + committing

§  Processing is heavily parallelized

§  Committing is strictly sequential

§  Storm takes care of

§  State management (through Zookeeper)

§  Transaction coordination

§  Fault detection

§  Provides a batch processing API

§  Note: requires a source able to replay data batches

Source: https://github.com/nathanmarz/storm/wiki/Transactional-topologies

(92)

EVENT TRACKING

§  In other systems the event tracking functionality is strongly coupled with state management

§  Events cannot be garbage collected when they are acknowledged

§  Need to wait for the checkpointing of a state updated with such events

§  TimeStream does not store all the events: it re-computes those to be replayed by tracking their dependencies on input events, similarly to Storm

(93)

EVENT TRACKING

§  SEEP stores non-checkpointed events on the upstream operators

§  MillWheel persists all intermediate results to an underlying distributed file system

§  It also provides exactly-once semantics

§  D-Streams stores the data to be processed in immutable partitioned datasets

§  These are implemented as resilient distributed datasets (RDDs)

(94)

STATE MANAGEMENT

§  Stateful operators require their state to be persisted in case of failures

§  Two classic approaches

§  Active replication

§  Passive replication

(95)

STATE MANAGEMENT

[Diagram - Active replication: the previous stage sends the same data to multiple operator replicas, each holding its own copy of the state, and the next stage consumes one of the outputs; state is implicitly synchronized by the ordered evaluation of the same data.]

[Diagram - Passive replication: a single active operator processes the data while dormant replicas stand by; the state is periodically persisted on stable storage (checkpoints) and recovered on demand.]
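As an illustration of the passive approach, the sketch below periodically snapshots an operator's state to stable storage; serialize, stableStorage, operatorState and operatorId are hypothetical placeholders, and real engines additionally coordinate checkpoints with input acknowledgement:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of passive replication via periodic checkpointing.
ScheduledExecutorService checkpointer = Executors.newSingleThreadScheduledExecutor();
checkpointer.scheduleAtFixedRate(() -> {
    byte[] snapshot = serialize(operatorState);   // assumed user-provided serializer
    stableStorage.put(operatorId, snapshot);      // assumed client of a stable key-value store
}, 0, 10, TimeUnit.SECONDS);                      // checkpoint interval: 10 seconds

// On failure, a dormant replica loads the latest snapshot and resumes processing:
// operatorState = deserialize(stableStorage.get(operatorId));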

(96)

STORM – SM

§  What happens when tasks fail?

§  If a worker dies its supervisor restarts it

§ If it fails on startup Nimbus will reassign it on a different machine

§  If a machine fails its assigned tasks will timeout and Nimbus will reassign them

§  If Nimbus/Supervisors die they are simply restarted

§ Behave like fail-fast processes

§ They’re stateless in practice

§ Their state is safely maintained in a Zookeeper cluster

(97)

STORM – SM

§  There is no explicit state management for operators in Storm

§  Trident builds automatic SM on top of it

§  Batches of tuples have a unique Tx id

§  If a batch is retried it will have the exact same Tx id

§  State updates are ordered among batches

§ A new Tx is not committed if an old one is still pending

§  Transactional state transparently guarantees exactly-once tuple processing
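As a sketch of how this looks from the developer's side, the fragment below follows the public Trident tutorial: a stream is split into words and the counts are kept in a Trident-managed state (here an in-memory MemoryMapState; spout is an existing batch spout and Split is a user-defined function assumed to emit one word per tuple):

TridentTopology topology = new TridentTopology();
TridentState wordCounts = topology
    .newStream("spout1", spout)                        // 'spout' is an existing batch spout
    .each(new Fields("sentence"), new Split(), new Fields("word"))
    .groupBy(new Fields("word"))
    .persistentAggregate(new MemoryMapState.Factory(), // state updates are applied per batch,
                         new Count(),                  // keyed by the batch Tx id
                         new Fields("count"));

Because each state update is stored together with the batch Tx id, a replayed batch can be detected and its update applied only once, which is what yields the exactly-once guarantee.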

(98)

STORM – SM

§  Transactional state is possible only if supported by the data source

§  As an alternative:

§  Opaque transactional state

§ Each tuple is guaranteed to be processed in exactly one transaction

§ But the batch of tuples for a given Tx id can change in case of failures

§  Non transactional state

(99)

STORM – SM

§  Few combinations guarantee exactly-once semantics:

Spout \ State        | Non transactional | Transactional | Opaque transactional

Non transactional    | No                | No            | No

Transactional        | No                | Yes           | Yes

Opaque transactional | No                | No            | Yes

(100)

APACHE S4 – SM

§  Lightweight approach

§  Assumes that lossy failovers are acceptable.

§  PEs hosted on failed PNs are automatically moved to a standby server

§  Running PEs periodically perform uncoordinated and asynchronous checkpointing of their internal state

§  Distinct PE instances can checkpoint their state autonomously without synchronization

§ No global consistency

§  Executed asynchronously by first serializing the operator state and then saving it to stable storage through a pluggable adapter

§  Can be overridden by a user implementation

(101)

MILLWHEEL – SM

§  Guarantees strongly consistent processing

§  Every single state change incurred by a computation is checkpointed on persistent storage

§  Can be executed

§ before emitting results downstream

§ operator implementations are automatically rendered idempotent with respect to the execution

§ after emitting results downstream

§ it’s up to the developer to implement idempotent operators if needed

§  Produced results are checkpointed with state (strong productions)

(102)

TIMESTREAM – SM

§  Takes a similar approach

§  State information is checkpointed to stable storage

§  For each operator the state includes

§  state dependency: the list of input events that made the operator reach a specific state

§  output dependency: the list of input events that made the operator produce a specific output starting from a specific state

§  This allows correct recovery from a fault without having to store all the intermediate events produced by the operators and their states

§  It is possible to periodically checkpoint the full operator state in order to avoid re-emitting the whole history of input events to recompute it

(103)

SEEP – SM

§  Takes a different route, allowing state to be stored on upstream operators

§  This allows SEEP to treat operator recovery as a special case of the standard operator scale-out procedure

§  State in SEEP is characterized by three elements

§ internal state

§ output buffers

§ routing state

§  These are treated differently to reduce the impact of state management on system performance
