• Non ci sono risultati.

Large Scale Data Streaming

N/A
N/A
Protected

Academic year: 2021

Condividi "Large Scale Data Streaming"

Copied!
85
0
0

Testo completo

(1)

Contents

Introduction 1

1. Big Data and Data Stream Processing 3

1.1. Big Data . . . 3

1.2. Data Stream Processing . . . 7

1.2.1. Data Stream Applications . . . 7

1.2.2. The Requirements of Real-time Stream Processing . . . 8

2. Storm Architecture 10 2.1. What is Storm? . . . 10

2.2. Storm's main concepts . . . 11

2.2.1. Stream . . . 11

2.2.2. Spout . . . 11

2.2.3. Bolt . . . 12

2.2.4. Storm topology . . . 12

2.2.5. Storm cluster . . . 13

2.3. Parallelism of a Storm topology . . . 14

2.4. Stream Grouping . . . 16

2.5. Communication mechanisms . . . 18

2.6. Guaranteeing Message Processing . . . 20

2.6.1. Reliability API . . . 22

2.7. Storm advanced features . . . 24

2.7.1. Trident . . . 24

2.7.1.1. Trident's main concepts . . . 24

2.7.1.2. Trident's API and State Management . . . 26

3. Enhanced Storm Implementation 27 3.1. Enhanced Storm: goals and motivations . . . 27

3.2. Application scenario for example usage . . . 28

(2)

Contents ii 3.3. Stateless Operators . . . 29 3.3.1. Map . . . 30 3.3.2. Filter. . . 31 3.3.3. MoFilter . . . 33 3.3.4. Union . . . 35

3.3.5. Simple Stateless Topology Example . . . 36

3.4. Stateful Operators . . . 38 3.4.1. State Abstraction . . . 39 3.4.2. Window Abstraction . . . 40 3.4.3. Aggregate . . . 41 3.4.4. Join . . . 46 3.5. Database-specic Operators . . . 49 3.5.1. INSERT . . . 49 3.5.2. UPDATE . . . 49 3.5.3. DELETE . . . 50 3.5.4. SELECT . . . 51

4. Enhanced Storm Scalability Evaluation 52 4.1. Enhanced Storm Monitor . . . 52

4.2. Evaluation Testbed and Data Set . . . 54

4.3. Enhanced Storm Conguration for the Evaluation . . . 55

4.4. Scalability Evaluation Results . . . 56

4.4.1. Stateless Operators: Union . . . 56

4.4.2. Stateful Operators: Aggregate . . . 58

4.4.3. Stateful Operators: Join . . . 59

4.4.4. Scale-out Evaluation . . . 60

5. Conclusions 62 A. Enhanced Storm 1.0 User Guide 63 A.1. Setting up a Storm cluster . . . 63

A.1.1. Set up a Zookeeper cluster . . . 64

A.1.2. Install dependencies on master and worker machines . . . . 64

A.1.3. Download and extract a Storm release to master and worker machines. . . 64

A.1.4. Add Enhanced Storm jar to Storm ./lib folder . . . 64

A.1.5. Fill in mandatory conguration into storm.yaml . . . 64

(3)

Contents iii

A.2. Enhanced Storm API overview . . . 66

A.2.1. Stateless Operators . . . 67

A.2.1.1. Map . . . 67

A.2.1.2. Filter . . . 68

A.2.1.3. MoFilter . . . 68

A.2.1.4. Union . . . 69

A.2.2. Stateful Operators . . . 70

A.2.2.1. Aggregate . . . 70

A.2.2.2. Join . . . 71

A.2.3. Database-specic Operators . . . 72

A.2.3.1. Insert . . . 72

A.2.3.2. Update . . . 73

A.2.3.3. Delete . . . 74

(4)

List of Figures

1.1. Hype Cycle for Emerging Technologies 2013. Source: Gartner. . . . 4

1.2. Social network in numbers: an overview(12/2013). Source: Jeremy Waite(Head of Social Strategy EMEA, Adobe). . . 4

1.3. Global storage size and repartition between analog and digital data, from 1986 to 2007. Source: [HL11]. . . 5

1.4. Digital Universe size from 2009 to 2020: growing by a factor of 44. Source: [GR10]. . . 6

1.5. Data Stream Processing with optional storage. . . 7

2.1. Stream of tuples. . . 11

2.2. Storm spout. . . 12

2.3. Storm bolt.. . . 12

2.4. Storm topology. . . 13

2.5. Storm cluster. . . 14

2.6. Relationships between worker processes, executors and tasks. . . 15

2.7. Storm topology at the task level. . . 16

2.8. Overview of a worker's internal message queues in Storm[Nol]. . . . 19

2.9. Example of tuple tree built by Storm . . . 21

2.10. Trident batch partitioning example. . . 25

3.1. UML diagram for stateless operators. . . 30

3.2. Map operator example usage. . . 31

3.3. Filter operator example usage. . . 33

3.4. MoFilter operator example usage. . . 34

3.5. Union operator example usage . . . 35

3.6. Simple Stateless Topology Example . . . 36

3.7. UML diagram for stateful operators. . . 39

3.8. UML diagram for State abstraction. . . 40

3.9. UML diagram for Window abstraction. . . 41

3.10. AggregateState UML diagram. . . 42

(5)

List of Figures v

3.11. AggregateAlgorithm UML diagram. . . 42

3.12. Aggregate algorithm on a Window based on the number of tuples. . 43

3.13. Aggregate algorithm on a Window based on time. . . 44

3.14. Example of aggregate algorithm on a Window based on the number of tuples.. . . 45

3.15. AggregateState UML diagram. . . 46

3.16. Join algorithm on a Window based on time. . . 47

3.17. Example of join algorithm on a Window based on time.. . . 48

3.18. Enhanced Storm insert operator . . . 49

3.19. Enhanced Storm update operator . . . 50

3.20. Enhanced Storm delete operator . . . 50

3.21. Enhanced Storm select operator . . . 51

4.1. Storm UI screenshot. . . 53

4.2. Enhanced Storm Monitor . . . 54

4.3. Blade's architecture overview. . . 55

4.4. Union operator throughput evaluation . . . 58

4.5. Aggregate operator throughput evaluation . . . 59

4.6. Join operator throughput evaluation . . . 60

4.7. Maximum throughput reached by the operators under evaluation. . 61

A.1. Map constructor javadoc.. . . 67

A.2. Filter constructor javadoc. . . 68

A.3. MoFilter constructor javadoc. . . 68

A.4. Union constructor javadoc.. . . 69

A.5. Aggregate constructor javadoc. . . 70

A.6. Join constructor javadoc.. . . 71

A.7. Insert constructor javadoc. . . 72

A.8. Update constructor javadoc. . . 73

A.9. Delete constructor javadoc.. . . 74

(6)

List of Tables

2.1. Built-in stream groupings[Mare]. . . 17

3.1. Call Description Record (CDR) schema used for example usage of En-hanced Storm operators. . . 28

4.1. Call Description Record (CDR) schema used for the evaluation of En-hanced Storm operators. . . 55

4.2. Enhanced Storm Evaluation: bolt conguration. . . 56

A.1. Call Description Record (CDR) schema used for sample code of Enhanced Storm operators. . . 67

(7)

Introduction

Over the last few years, applications that require real-time processing of an huge

amount of data are pushing the limits of traditional data processing infrastructure[SCZ05]. Many applications in several domains such as telecommunications, large scale

sen-sor networks, nancial, online applications, computer network management, secu-rity and others, require real-time processing of continuos data ows[GJPPM+12]: this kind of computation systems are usually called Data Stream Management Systems (DSMSs) or Stream Processing Engines (SPEs).

Traditional Data Base Management Systems (DBMSs) implements the store-than-process paradigm; it means that:

• Data require to be stored (persistently) and indexed before they could be

processed.

• Data processing is asynchronous in relation to their arrival.

In DSMSs data streams are not stored but are rather processed on-the-y using continuos queries[GJPPM+12]: the query is constantly standing over the stream-ing data and results are continuously output.

One of the most famous and used DSMS is called Storm: Storm does for real-time processing what Hadoop[Had] did for batch processing[Marn].

Storm is a powerful tool and has a simple programming model, but it does not provide a bulit-in implementation of stream-oriented operators: this is a strong limitation because the user is forced to write a case-specic implementation every time.

The goal of the work described in this thesis is to build a distributed real-time computation system on top of Storm, called Enhanced Storm, that provides to the user built-in relation algebra and database-specic operators for streaming computation.

Enhanced Storm maintains Storm fault-tolerance and scalability: in this way we supply to the user a generic, high performing and easy-to-use system.

Enhanced Storm was developed at the Distributed System Laboratory(LSD) of the Universidad Politecnica de Madrid(UPM)[UPM].

(8)

Introduction 2

The thesis is organized as follows: Chapter 1 provides an introduction to big data, real-time stream processing and its requirements.

Chapter 2 presents Storm architecture in detail.

Chapter 3 shows how Enhanced Storm operators work and gives some details about Enhanced Storm implementation.

Chapter 4 presents the results of Enhanced Storm evaluation, focused above all on the scalability of the system.

(9)

1. Big Data and Data Stream Processing

Contents

1.1. Big Data. . . 3

1.2. Data Stream Processing . . . 7

1.2.1. Data Stream Applications . . . 7

1.2.2. The Requirements of Real-time Stream Processing . . . 8

1.1. Big Data

The question is: what do we mean by Big Data?

There is not a formal denition of Big Data: it refers to datasets whose size is beyond the ability of typical database software tools to capture, store, manage, and analyze.

This denition is intentionally subjective and incorporates a moving denition of how big a dataset needs to be in order to be considered big data.

For instance, we don't dene big data in terms of being larger than a certain number of terabytes (thousands of gigabytes), but we assume that, as technology advances over time, the size of datasets that qualify as big data will also increase. Today Big Data range from a few dozen terabytes to multiple petabytes (thousands of terabytes)[MC11].

In Figure1.1 you can have a look at Gartner's Hype Cycle for Emerging Technolo-gies for the year 2013[Gar13].

The point of the Hype Cycle is to give enterprises some ideas about how far various technologies are from the "plateau of productivity" where they can be more easily adopted.

The cycle has ve stages and you can see that Big Data is in the Peak of Inated Expectation and over a span of 10 years will reach a mainstream adoption.

(10)

1.1 Big Data 4

Figure 1.1.: Hype Cycle for Emerging Technologies 2013. Source: Gartner.

The ability to generate, communicate, share and access data has been revolu-tionized by the increasing number of people, devices and sensors that are now connected by digital networks.

In 2013 about 5.9 billion mobile devices were in use and by 2017 there will be more mobile devices in use than people on the planet[CCS13].

The rapid adoption of mobile devices is also driving up the usage of social network-ing: as you can see in Figure1.2at the end of 2013 Facebook reached 1.4 billion of active users. YouTube claims to upload 100 hours of video every minute[You14].

Figure 1.2.: Social network in numbers: an overview(12/2013). Source: Jeremy Waite(Head of Social Strategy EMEA, Adobe).

(11)

1.1 Big Data 5

Increasing applications of the Internet of Things, i.e., sensors and devices em-bedded in the physical world and connected by networks to computing resources, is another trend driving growth in big data: tens of millions of networked sen-sor nodes are now present in the transportation, automotive, utilities, and retail sectors[MC11].

The volume of data is growing at an exponential rate: this is the result of the studies about the size of digital information created and replicated each years. Martin Hilbert and Priscila López published a paper in Science[HL11] that ana-lyzed total global storage capacity from 1986 to 2007.

Their analysis showed that global storage capacity grew at an annual rate of 23% over that period.

Their study also documented the rise of digitization: they estimated that the percentage of data stored in digital form increased from the 25% in 2000 (analog forms such as books, photos, and audio/video tapes making up the bulk of data storage capacity at that time) to a dominant 94% share in 2007 as media such as hard drives, CDs, and digital tapes grew in importance (Figure 1.3).

Figure 1.3.: Global storage size and repartition between analog and digital data, from 1986 to 2007. Source: [HL11].

(12)

1.2 Big Data 6

Starting in 2007, the information-management company EMC sponsored the re-search rm IDC to produce an annual series of reports on the Digital Universe to size the amount of digital information created and replicated each year.

This analysis showed that in 2007, the amount of digital data created in a year exceeded the world's data storage capacity for the rst time[Gan07].

In short, there was no way to actually store all of the digital data being created. They also found that the rate at which data generation is increasing is much faster than the world's data storage capacity is expanding, pointing strongly to the continued widening of the gap between the two.

Their analysis estimated that the total amount of data created and replicated in 2009 was 800 exabytes (enough to ll a stack of DVDs reaching to the moon and back) and surpassed 1.8 zettabytes in 2011[GR11].

They projected that this volume would grow by 44 times to 2020(Figure 1.4), an implied annual growth rate of 40%[GR10].

Our stack of DVDs would now reach halfway to Mars.

Figure 1.4.: Digital Universe size from 2009 to 2020: growing by a factor of 44. Source: [GR10].

(13)

1.2 Data Stream Processing 7

1.2. Data Stream Processing

Data Stream Processing is a novel computing paradigm particularly suited for application scenarios where massive amount of data must be processed with low latency.

Rather than processing stored data like in traditional database systems (DBMSs), Data Stream Management Systems (DSMSs), also called Stream Processing En-gines (SPEs), process data on-the-y[GJPPM+12] (Figure 1.5).

A data stream is a potentially innite sequence of tuples: all the tuples of a stream share the same schema.

A DSMS processes data streams coming from dierent sources to produce streams of new data as an output: while traditional DBMSs are designed to work on persistent data and run queries just once to return a complete answer, DSMSs deal with transient data and execute standing queries, which run continuously and provide updated answers as new data arrives[CM12].

Figure 1.5.: Data Stream Processing with optional storage.

1.2.1. Data Stream Applications

Below you can nd a few example (following [CJ09]) of data stream applications:

• Telecommunication applications: the Call Detail Record (CDR)

or Station Message Detail Recording (SMDR) information and various mes-sages from network elements such as alarm mesmes-sages, performance mesmes-sages

(14)

1.2 Data Stream Processing 8

and others, fall into the category of stream data. The online billing sys-tem requires processing CDRs information in real-time in order to generate billing information on-the-y. Moreover the number of CDRs that must be processed to detect fraud in real-time is in the range of 10,000-50,000 CDR/second. In such applications, most queries for fraud detection include one or more self-joins of the CDR stream using complex predicates, requiring comparison of millions of CDR pairs per second[GJPPM+12].

• Sensor applications: sensor data monitoring is another group of

ap-plications of data stream processing; they are used to monitor various events and conditions using complex ltering and computations over sensor data streams. For instance, the highway trac monitoring and querying, the smart home, environmental and other types of monitoring are examples of sensor-generated stream processing applications.

• Financial applications: the data arising out of stock trading, cash

ows and credit card transactions can be viewed as data streams. The on-line analysis over these streams includes discovering correlations, identifying trends and outliers, forecasting future values and program trading.

• Computer network management applications: the Simple

Net-work Management Protocol (SNMP) data, the routing table information (such as BGP table information) and the network trac information are representative streams in the network eld. All these data arrive rapidly and are usually unbounded in size. Trac engineering and network secu-rity are two representative applications of data stream processing systems in computer network management eld.

• Online applications: online banking generates individual transaction

streams which need to be analyzed in a timely manner to identify potential fraud. Online auction systems such as eBay generate real-time bid streams and these systems need to update the current bid price and make decisions real-time. Large web sites such as Yahoo!, search engines such as Google and social network such as Facebook, generate numerous web-clicks and user-queries that need to be analyzed to enable applications such as personaliza-tion, load balancing, advertising, etc. on-the-y.

1.2.2. The Requirements of Real-time Stream Processing

Real-time stream processing systems have several requirements as outlined in [SCZ05]:

(15)

1.2 Data Stream Processing 9 1. Keep the data moving: process messages in-stream, without any re-quirement to store them to perform any operation or sequence of operations.

2. Query using SQL on Streams (StreamSQL): support a high-level Stream-SQL language with built-in extensible stream oriented primitives and opera-tors: StreamSQL should extend the semantics of standard SQL (that assumes records in a nite stored dataset) by adding to it rich windowing constructs and stream-specic operators (i.e. merge operator).

3. Handle Stream Imperfections (Delayed, Missing and Out-of-Order Data): have built-in mechanism to provide resiliency against stream im-perfections, including missing and out-of-order data, which are commonly present in real-world data streams.

4. Generate Predictable Outcomes: guarantee predictable and repeatable outcomes.

5. Integrate Stored and Streaming Data: have the capability to eciently store, access, and modify state information, and combine it with live stream-ing data. For seamless integration, the system should use a uniform language when dealing with either type of data.

6. Guarantee Data Safety and Availability: ensure that the applications are up and available and the integrity of the data maintained at all times, despite failures.

7. Partition and Scale Applications Automatically: have the capability to distribute processing across multiple processors and machines to achieve incremental scalability. Ideally, the distribution should be automatic and transparent.

8. Process and Respond Instantaneously: have a highly-optimized, minimal-overhead execution engine to deliver real-time response for high-volume ap-plications.

(16)

2. Storm Architecture

Contents

2.1. What is Storm? . . . 10

2.2. Storm's main concepts . . . 11

2.2.1. Stream . . . 11

2.2.2. Spout . . . 11

2.2.3. Bolt . . . 12

2.2.4. Storm topology . . . 12

2.2.5. Storm cluster . . . 13

2.3. Parallelism of a Storm topology. . . 14

2.4. Stream Grouping . . . 16

2.5. Communication mechanisms . . . 18

2.6. Guaranteeing Message Processing . . . 20

2.6.1. Reliability API . . . 22

2.7. Storm advanced features . . . 24

2.7.1. Trident . . . 24

2.1. What is Storm?

Storm is a distributed real-time computation system that is fault-tolerant and guarantees data processing[Marn].

Storm was created at Backtype, a company acquired by Twitter in 2011.

It is a free and open source project licensed under the Eclipse Public License. The EPL is a very permissive license, allowing you to use Storm for either open source or proprietary purposes. Storm does for real-time processing what Hadoop[Had] did for batch processing[Marn].

Storm has a simple API and was designed from the ground up to be usable with any programming language.

In 2013 the Apache Software Foundation has accepted Storm into its incubator program.

(17)

2.2 Storm's main concepts 11

Storm is used by over 50 companies[Marg].

2.2. Storm's main concepts

Storm's core concepts are:

• Stream • Spout • Bolt • Storm topology • Storm cluster 2.2.1. Stream

The stream is the core abstraction in Storm.

A stream is an unbounded, potentially innite sequence of tuples (Figure 2.1). Streams are dened with a schema that names the elds in the stream's tuples. By default, tuples can contain integers, longs, shorts, bytes, strings, doubles, oats, booleans, and byte arrays.

It is possible to dene custom types using serialization. Every stream has a unique ID in Storm.

Figure 2.1.: Stream of tuples.

2.2.2. Spout

A spout is a source of streams.

Generally spouts will read tuples from an external source and emit them (e.g. a Kestrel queue or the Twitter API).

Spouts can either be reliable or unreliable.

A reliable spout is capable of replaying a tuple if it failed to be processed by Storm, whereas an unreliable spout forgets about the tuple as soon as it is emitted. Spouts can emit tuples on more than one stream (Figure 2.2).

(18)

2.2 Storm's main concepts 12

Figure 2.2.: Storm spout.

2.2.3. Bolt

A bolt processes any number of input streams and produces new streams (Fig-ure 2.3).

Bolts can do anything from ltering, functions, aggregations, joins, talking to databases, and more (Storm user has to write his own code).

Bolts can do simple stream transformations. Doing complex stream transforma-tions often requires multiple steps and thus multiple bolts.

For example, transforming a stream of tweets into a stream of trending images requires at least two steps: a bolt to do a rolling count of retweets for each image and one or more bolts to stream out the top X images (you can do this particular stream transformation in a more scalable way with three bolts than with two). Bolts can emit more than one stream.

Figure 2.3.: Storm bolt.

2.2.4. Storm topology

A topology is a network of spouts and bolts.

(19)

2.2 Storm's main concepts 13

Storm topologies can be seen as a graph where each node is a spout or a bolt and edges indicate which bolts are subscribing to which streams (Figure 2.4).

Figure 2.4.: Storm topology.

In the above example Bolt4 is subscribing to Stream A and Stream B, instead Bolt5 is subscribing to Stream C .

A topology run indenitely across a Storm cluster. 2.2.5. Storm cluster

There are two kinds of nodes on a Storm cluster:

• Master node • Worker node

The master node runs a daemon called "Nimbus".

Nimbus is responsible for distributing code around the cluster, assigning tasks to machines and monitoring failures.

Each worker node runs a daemon called the "Supervisor".

The supervisor listens for work assigned to its machine and manages worker pro-cesses as necessary, based on what Nimbus has assigned to it.

Each worker process executes a subset of a topology.

A running topology consists of many worker processes distributed across many machines.

All coordination between the Nimbus and the Supervisors is done through Zookeeper[Zoo]. Zookeeper is a centralized service for maintaining conguration information,

(20)

2.3 Parallelism of a Storm topology 14

Additionally, the Nimbus daemon and Supervisor daemons are fail-fast and state-less: they immediately report any failure and all state is kept in Zookeeper or on local disk. If they die, they will restart like nothing happened.

An example of a Storm cluster with one master node, three Zookeeper nodes and ve worker nodes is shown in Figure 2.5.

Figure 2.5.: Storm cluster.

2.3. Parallelism of a Storm topology

Storm distinguishes among the following three main entities that are used to ac-tually run a topology in a Storm cluster:

• Worker process • Executor • Task

(21)

2.4 Parallelism of a Storm topology 15

Figure 2.6.: Relationships between worker processes, executors and tasks.

A worker process executes a subset of a topology. A worker process belongs to a specic topology and may run one or more executors for one or more components (spouts or bolts) of this topology.

A running topology consists of many such processes running on many machines within a Storm cluster.

An executor is a set of threads that are spawned by a worker process. It may run one or more tasks for the same component (spout or bolt). The number of executors (threads) for a component can change over time.

A task performs the actual data processing. The number of tasks for a component is always the same throughout the lifetime of a topology: this means that the following condition holds true: #executors ≤ #tasks. By default, the number of tasks is set to be the same as the number of executors, i.e. Storm will run one task per executor.

Tasks were introduced in Storm in order to choose and eventually rebalance the load of a topology in a cluster.

(22)

2.4 Stream Grouping 16

2.4. Stream Grouping

A stream grouping tells a topology how to send tuples between two components. Spouts and bolts execute in parallel as many tasks across the cluster. If you look at how a topology is executing at the task level, it looks something like in Figure 2.7.

Figure 2.7.: Storm topology at the task level.

When a task for Bolt A emits a tuple to Bolt B, which task should it send the tuple to?

A "stream grouping" answers this question by telling Storm how to send tuples between sets of tasks. The dierent kinds of stream grouping are described in the Table 2.1.

(23)

2.5 Stream Grouping 17

Name Description

Shue grouping Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples.

Fields grouping The stream is partitioned by the elds specied in the grouping. For example, if the stream is grouped by the "user-id" eld, tuples with the same "user-id" will always go to the same task,

but tuples with dierent "user-id"'s may go to dierent tasks.

All grouping The stream is replicated across all the bolt's tasks. Use this grouping with care.

Global grouping The entire stream goes to a single one of the bolt's tasks. Specically, it goes to the task

with the lowest id.

None grouping This grouping species that you don't care how the stream is grouped. Currently, none groupings are equivalent to shue groupings. Eventually though, Storm will push down bolts

with none groupings to execute in the same thread as the bolt or spout they subscribe from

(when possible).

Direct grouping This is a special kind of grouping. A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the emitDirect methods. A bolt can get the task

ids of its consumers by either using the provided TopologyContext or by keeping track

of the output of the emit method in OutputCollector (which returns the task ids

that the tuple was sent to).

Local or shue grouping If the target bolt has one or more tasks in the same worker process, tuples will be shued to just those in-process tasks. Otherwise, this acts

like a normal shue grouping. Table 2.1.: Built-in stream groupings[Mare].

(24)

2.5 Communication mechanisms 18

2.5. Communication mechanisms

There are three kind of communication mechanisms in Storm:

1. Intra-worker communication,

2. Inter-worker communication,

3. Inter-topology communication.

Intra-worker communication (or internal messaging) means the messaging that happens within a worker process in Storm, which is communication that is re-stricted to happen within the same Storm machine/node. For this communication Storm relies on various message queues backed by LMAX Disruptor[Dis], which is a high performance inter-thread messaging library.

This communication within the threads of a worker process is dierent from Storm's inter-worker communication, which normally happens across ma-chines and thus over the network. For the latter Storm uses ZeroMQ[Zer] by default (in Storm 0.9 there is experimental support for Netty[Net] as the network messaging backend). That is, ZeroMQ/Netty are used when a task in one worker process wants to send data to a task that runs in a worker process on a dierent machine in the Storm cluster.

There is not a built-in inter-topology communication mechanism in Storm: in order to make two or more dierent topologies in communication we have to develop a custom communication mechanism.

So recapping:

• Intra-worker communication (inter-thread on the same Storm node):

LMAX Disruptor.

• Inter-worker communication (node-to-node across the network): ZeroMQ

or Netty.

• Inter-topology communication: nothing built into Storm, you must take

care of this yourself with e.g. a messaging system such as Kafka/RabbitMQ, a database, etc.

In Figure2.8you can see a Storm worker process internal message queues: queues related to a worker process are colored in red, queues related to the worker's various executor threads are colored in green. For readability reasons it is shown only one worker process and only one executor thread within that worker process.

(25)

2.5 Communication mechanisms 19

Figure 2.8.: Overview of a worker's internal message queues in Storm[Nol].

To manage its incoming and outgoing tuples each worker process has a single receive thread that listens on the worker's TCP port (as congured via supervi-sor.slots.ports). The parameter topology.receiver.buer.size determines the batch size that the receive thread uses to place incoming tuples into the incoming queues of the worker's executor threads. Similarly, each worker has a single send thread that is responsible for reading messages from the worker's transfer queue and sending them over the network to downstream consumers. The size of the transfer queue is congured via topology.transfer.buer.size.

• The topology.receiver.buer.size is the maximum number of tuples that are

batched together at once for appending to an executor's incoming queue by the worker receive thread (which reads the tuples from the network). It sets the size of a simple ArrayList that is used to buer incoming tuples. Setting this parameter too high may cause a lot of problems (heartbeat thread gets

(26)

2.6 Guaranteeing Message Processing 20

starved, throughput plummets[Marc]). The default value is 8 elements, and the value must be a power of 2 (this requirement comes indirectly from LMAX Disruptor).

• Each element of the transfer queue congured with topology.transfer.buer.size

is actually a list of tuples. The various executor send threads will batch out-going tuples o their outout-going queues onto the transfer queue. The default value is 1024 elements.

Each worker process controls one or more executor threads. Each executor thread has its own incoming queue and outgoing queue. As described above, the worker process runs a dedicated worker receive thread that is responsible for moving incoming messages to the appropriate incoming queue of the worker's various ex-ecutor threads. Similarly, each exex-ecutor has its dedicated send thread that moves an executor's outgoing messages from its outgoing queue to the parent worker's transfer queue. The sizes of the executors' incoming and outgoing queues are con-gured via topology.executor.receive.buer.size and topology.executor.send.buer.size, respectively.

Each executor thread has a single thread that handles the user logic for the spout/bolt (i.e. the user code), and a single send thread which moves messages from the executor's outgoing queue to the worker's transfer queue.

• The topology.executor.receive.buer.size is the size of the incoming queue for

an executor. Each element of this queue is a list of tuples. Here, tuples are appended in batch. The default value is 1024 elements, and the value must be a power of 2 (this requirement comes from LMAX Disruptor).

• The topology.executor.send.buer.size is the size of the outgoing queue for an

executor. Each element of this queue will contain a single tuple. The default value is 1024 elements, and the value must be a power of 2 (this requirement comes from LMAX Disruptor).

2.6. Guaranteeing Message Processing

Storm guarantees that each tuple emitted by a spout will be fully processed. In order to understand the tuple fully processed concept, suppose to have a topology that reads sentences o of a Kestrel queue, splits the sentences into its constituent words, and then emits for each word the number of times it has seen that word before.

(27)

2.6 Guaranteeing Message Processing 21

A tuple coming o the spout triggers many tuples being created based on it: a tuple for each word in the sentence and a tuple for the updated count for each word.

On Figure 2.9 you can see the tree of messages.

Figure 2.9.: Example of tuple tree built by Storm

Storm considers a tuple coming o a spout "fully processed" when the tuple tree has been exhausted and every tuple in the tree has been processed.

A tuple is considered failed when its tree of messages fails to be fully processed within a specied timeout.

The following listing represents the interface that spouts implement:

public interface ISpout extends Serializable {

void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

void close();

void activate();

void deactivate();

void nextTuple();

void ack(Object msgId);

void fail(Object msgId); }

(28)

2.6 Guaranteeing Message Processing 22

the spout.

The spout uses the SpoutOutputCollector provided in the open() method to emit a tuple to one of its output streams.

When emitting a tuple, the spout provides a unique message id that will be used to identify the tuple later.

Emitting a message to the SpoutOutputCollector looks like this:

_collector.emit(new Values("field1", "field2", 3) , msgId);

Next, the tuple is sent to the bolts and Storm takes care of tracking the tree of messages that is created.

If Storm detects that a tuple is fully processed, Storm will call the ack() method on the originating spout task with the message id that the spout provided to Storm. Likewise, if the tuple times-out Storm will call the fail() method on the spout. Note that a tuple will be acked or failed by the exact same spout task that created it. So if a spout is executing as many tasks across the cluster, a tuple won't be acked or failed by a dierent task than the one that created it.

In this way Storm provides an at-least-once processing guarantee: it means that Storm guarantees that a tuple will processed at least one time (maybe more than one).

2.6.1. Reliability API

In order to use Storm's reliability capabilities we need to:

1. Notify Storm when we are creating a new link in the tuple tree.

2. Notify Storm when we have nished processing an individual tuple.

By doing both these things, Storm can detect when the tree of tuples is fully processed and can ack or fail the spout tuple appropriately.

Storm's API provides a concise way of doing both of these tasks. The following listing represents the interface that bolts implement:

public interface IBolt extends Serializable {

void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

void execute(Tuple input);

void cleanup(); }

(29)

2.6 Guaranteeing Message Processing 23

Specifying a link in the tuple tree is called anchoring.

Anchoring is done at the same time you emit a new tuple in the execute() method. The bolt uses the OutputCollector provided in the prepare() method to emit a tuple to one of its output streams.

Here is an example took from [Marf]:

public void execute(Tuple tuple) {

String sentence = tuple.getString(0);

for(String word: sentence.split(" ")){

_collector.emit(tuple, new Values(word)); }

_collector.ack(tuple); }

This bolt execute() method splits a tuple containing a sentence into a tuple for each word.

Each word tuple is anchored by specifying the input tuple as the rst argument to emit.

Since the word tuple is anchored, the spout tuple at the root of the tree will be replayed (in case of reliable spout) later on if the word tuple failed to be processed downstream.

An output tuple can be anchored to more than one input tuple. This is useful when doing streaming joins or aggregations. A multi-anchored tuple failing to be processed will cause multiple tuples to be replayed from the spouts. Multi-anchoring is done by specifying a list of tuples rather than just a single tuple. Anchoring is how you specify the tuple tree, the next and nal piece to Storm's reliability API is specifying when you've nished processing an individual tuple in the tuple tree. This is done by using the ack() and fail() methods on the OutputCollector.

If you look back at the SplitSentence example above, you can see that the input tuple is acked after all the word tuples are emitted. Every tuple you process must be acked or failed. Storm uses memory to track each tuple, so if you don't ack/fail every tuple, the task will eventually run out of memory.

A lot of bolts follow a common pattern of reading an input tuple, emitting tuples based on it, and then acking the tuple at the end of the execute method. These bolts fall into the categories of stateless operators, for instance lters and simple functions. Storm has an interface called IBasicBolt that encapsulates this pattern.

(30)

2.7 Storm advanced features 24

The above example can be written as a BasicBolt like follows:

public void execute(Tuple tuple, BasicOutputCollector collector) { String sentence = tuple.getString(0);

for(String word: sentence.split(" ")) { collector.emit(new Values(word)); }

}

This implementation is simpler than the rst implementation and semantically identical.

Tuples emitted to BasicOutputCollector are automatically anchored to the in-put tuple, and the inin-put tuple is acked for you automatically when the execute() method completes.

In contrast, bolts that do aggregations or joins may delay acking a tuple until after it has computed a result based on a bunch of tuples. Stateful operators like aggregations and joins will commonly multi-anchor their output tuples as well, so they will not follow the BasicBolt pattern.

2.7. Storm advanced features

2.7.1. Trident

Trident is a high-level abstraction for doing real-time computing on top of Storm[Marl]. Trident was designed and implemented by the same team that built Storm.

It allows you to seamlessly intermix high throughput, stateful stream process-ing with low latency distributed queryprocess-ing. Trident has consistent, exactly-once semantics[Marl].

2.7.1.1. Trident's main concepts

In Trident the main concept is the batch: Trident processes streams as batches of tuples, unilke Storm that processes one tuple at a time.

The batch size is set by the spout and the documentation[Mark] states: each batch is given a unique transaction id; however we found out that if there is more than one spout in the topology, this sentence is no longer true. We reported this bug[Bug].

(31)

2.7 Storm advanced features 25

Batches are partitioned among the nodes in the cluster: the operations are applied in parallel across each partition as you can see in Figure2.10.

Figure 2.10.: Trident batch partitioning example.

Let's think about the benets and the drawbacks of processing tuples in batch. When we are dealing with a source of state (e.g. a database), we execute one UP-DATE per batch and not one UPUP-DATE per tuple like in Storm. So, for instance, if the batch size is equal to N , with Storm we would do N update operation on our source of state, instead of 1 update operation performed with Trident.

Moreover Trident provides the following semantics which are sucient for achiev-ing exactly-once processachiev-ing semantics[Mark]:

• Tuples are processed as small batches, as exposed above.

• Each batch of tuples is given a unique id (except in the case of more than one

spout in the topology[Bug]) called the "transaction id" (txid). If the batch is replayed, it is given the exact same txid.

• State updates are ordered among batches. That is, the state updates for

batch 3 won't be applied until the state updates for batch 2 have succeeded. Achieve an exactly-once processing semantic means that Trident guarantees that a tuple will processed one and only one time. Of course, the latency of a tuple being processed in batch is higher than the one processed in one-at-a-time model like Storm.

(32)

2.7 Storm advanced features 26 2.7.1.2. Trident's API and State Management

Trident has higher level constructs upon Storm: it provides to the user a batch API[Marj] and some abstractions for reading and writing to stateful sources[Mark].

In this thesis we will not expose the details of Trident's API and state management. Broadly the batch API provides a set of operations that works intra-batch, i.e. for all the tuple that belong to the same batch. This operations are:

• projections, • lters, • functions, • repartitioning operations, • merges, • joins.

This operations make easier the implementation of the stateless operators(e.g. lter or map), but are useless for the stateful operators(e.g. aggregation or join).

In order to perform a cross-batches operation, Trident state is required: the state can be internal to the topology (e.g. kept in-memory and backed by HDFS) or external to the topology (e.g. stored in database like Cassandra). In a simply way Trident users can build, update and query his own custom state as described in [Mark].

(33)

3. Enhanced Storm Implementation

Contents

3.1. Enhanced Storm: goals and motivations. . . 27

3.2. Application scenario for example usage . . . 28

3.3. Stateless Operators . . . 29

3.3.1. Map . . . 30

3.3.2. Filter . . . 31

3.3.3. MoFilter. . . 33

3.3.4. Union . . . 35

3.3.5. Simple Stateless Topology Example . . . 36

3.4. Stateful Operators . . . 38 3.4.1. State Abstraction. . . 39 3.4.2. Window Abstraction . . . 40 3.4.3. Aggregate . . . 41 3.4.4. Join . . . 46 3.5. Database-specic Operators . . . 49 3.5.1. INSERT. . . 49 3.5.2. UPDATE . . . 49 3.5.3. DELETE . . . 50 3.5.4. SELECT . . . 51

This chapter provides some information about Enhanced Storm and the operators im-plemented for this system.

For each operator we provide a summary of its basic functionality, some implementation details and an example usage. For a complete description refer to the user guide in appendixA.

3.1. Enhanced Storm: goals and motivations

Enhanced Storm is a distributed real-time computation system built on top of Storm.

(34)

3.3 Application scenario for example usage 28

It keeps the scalability and the fault-tolerance of Storm; moreover it provides to the user built-in operators for streaming computation that are not present in Storm: they can be classied as stateless, stateful or database-specic.

Stateless operators do not keep state across tuples and perform their computation solely based on each input tuple.

Stateful operators perform operation on sequence of tuples. Because of the innite nature of the data stream, stateful operators perform their computation on sliding windows of tuples dened over a period of time (e.g. tuples received in the last hour) or as a xed number of tuples (e.g. last 100 tuples). The concept of window is not present in Storm.

Database-specic operators perform operation on an Apache Derby database. Enhanced Storm supply to the user a generic, high performing and easy-to-use system.

The reasons that drove us to not choose Trident as lower layer of Enhanced Storm are twofold:

1. Trident is not a stable and mature system: as exposed in the previous chapter there are still a lot of bug to be solved, not least the one found out by us [Bug].

2. We had more freedom working in a lower layer: Trident would hide some implementation details.

3.2. Application scenario for example usage

For the example usage of Enhanced Storm implemented operators we will use the Call Description Record (CDR) schema, as shown in Table 3.1.

Name Type Description

Src String Caller's number

Dst String Called's number

Start time Start time of call

End time End time of call

District Integer Area-Id where caller is located Lat Double Latitude coordinate of the caller Lon Double Longitude coordinate of the caller

Ts time Emission timestamp of the CDR

Table 3.1.: Call Description Record (CDR) schema used for example usage of Enhanced Storm operators.

(35)

3.3 Stateless Operators 29

3.3. Stateless Operators

A stateless operator processes one tuple at a time and might produce a tuple as output. If so, this output is produced based solely on the information contained by the input tuple. Therefore, it provides basic processing functionality such as ltering and transformation.

In Figure 3.1 you can see an UML diagram showing inheritance and interfaces of stateless operators: as said before in 2.6.1, the interface implemented by this operators, IStatelessOperator, extends the IBasicBolt interface oered by Storm API.

(36)

3.3 Stateless Operators 30

Figure 3.1.: UML diagram for stateless operators.

3.3.1. Map

Basic Functionality

Map is a generalized projection operator dened as: M ap(S) = {A01 = f1(tin), ..., A

0

(37)

3.3 Stateless Operators 31

It takes one input stream S and, for each input tuple tin, produces an output tuple

with attributes A0 1, ..., A 0 m where, A 0 i = fi(tin), for 1 ≤ i ≤ m.

{fi}1≤i≤m is a set of user dened functions that allow both mathematical and

logical expressions. Implementation Details

This bolt uses an EvalExpression object provided by Enhanced Storm API. EvalExpression uses the MVEL library[MVE] and has two main method: the rst one compiles all the expressions dened by the user and it is called by the Map bolt object constructor; the second one executes all the precompiled expressions on the actual tuple and it is called by the Map bolt execute() method.

Example Usage

Figure 3.2 shows an example of a Map operator dene as follow:

M ap(S) = {Call Duration = Startin− Endin, District = Distructin}

It extracts the District and the Call Duration(dened as above) from each incom-ing tuple.

Figure 3.2.: Map operator example usage.

3.3.2. Filter

Basic Functionality

Filter operator is used for content-based tuple routing and to discard tuples. It is dened as:

(38)

3.3 Stateless Operators 32

F ilter(S) = {(P1, O1)}

It takes one input stream S and has one output stream O1in which the Filter

operator emits input tuples such that P1(tin) = T RU E, where P1 is a user dened

predicate.

Implementation Details

This bolt uses an EvalExpression object provided by Enhanced Storm API. EvalExpression uses the MVEL library[MVE] and has two main method: the rst one compiles the predicate dened by the user and it is called by the Filter bolt object constructor; the second one executes the precompiled predicate on the actual tuple and it is called by the Filter bolt execute() method. If the predicate specied by the user is true for the input tuple, it is emitted in the output stream. Example Usage

Figure 3.3 shows an example of a Filter operator dene as follow:

F ilter(S) = {(”District > 50 and T s < 1000”, O1)}

It emits all the tuples that satisfy the above predicate and discards all the other tuples.

(39)

3.3 Stateless Operators 33

Figure 3.3.: Filter operator example usage.

3.3.3. MoFilter Basic Functionality

MoFilter operator is used for content-based tuple routing and to discard tuples. It is dened as:

M oF ilter(S) = {(P1, O1), ..., (Pm, Om)}

It takes one input stream S and has m output stream O1...Om. The MoFilter

operator emits input tuples on the output stream Oi if Pi(tin) = T RU E, with

1 ≤ i ≤ m. P1...Pm is a set of user dened predicates.

Implementation Details

This bolt uses an EvalExpression object provided by Enhanced Storm API. EvalExpression uses the MVEL library[MVE] and has two main method: the rst one compiles all the predicate dened by the user and it is called by the MoFilter

(40)

3.3 Stateless Operators 34

bolt object constructor; the second one executes the precompiled predicates on the actual tuple and it is called by the MoFilter bolt execute() method. If the ith predicate specied by the user is true for the input tuple, it is emitted in the ith output stream for 1 ≤ i ≤ m (the tuple could be emitted in zero, one or more output stream).

Example Usage

Figure 3.4 shows an example of a MoFilter operator dene as follow:

F ilter(S) = {(”Src = 3285815711”, O1), (”End > 21 : 00 : 00 and End < 22 : 00 : 00”, O2)}

N.B. the input stream is only one: I.

(41)

3.3 Stateless Operators 35

3.3.4. Union

Basic Functionality

Union operator has multiple input streams and a single output one. It is dened as

F ilter(S1...Sm)

where S1...Smare the input stream. Input tuples are emitted to the output Stream

in FIFO order.

Implementation Details

Union bolt simply emits the input tuples to the output stream in FIFO order in the execute() method.

Example Usage

Figure 3.5 shows an example of an Union operator.

(42)

3.3 Stateless Operators 36

3.3.5. Simple Stateless Topology Example

In this section we show a simple example code of a topology composed by stateless operators.

Suppose that we have two source of information with the CDR schema (3.2) and we want to extract some information about the calls located in a certain District (in the example the 145) for example the call duration and the timestamp only under some condition (in the example call duration equals or above one minute and timestamp under 1000).

The Storm Enhanced topology that implements this query is shown in Figure 3.6 and following you can nd the example code.

Figure 3.6.: Simple Stateless Topology Example

public static void main(String[] args) {

//topology builder

TopologyBuilder builder = new TopologyBuilder();

//configuration parameters

Config conf = new Config(); ...

//topology

StormTopology topology = builder.createTopology();

(43)

3.3 Stateless Operators 37

LinkedHashMap<String, String> fieldTypeMap = new LinkedHashMap<String, String>();

fieldTypeMap.put("src", "String"); fieldTypeMap.put("dst", "String"); fieldTypeMap.put("start", "Long"); fieldTypeMap.put("end", "Long");

fieldTypeMap.put("district", "Integer"); fieldTypeMap.put("lat", "Double"); fieldTypeMap.put("lon", "Double"); fieldTypeMap.put("timestamp", "Long"); Schema schema = new Schema(fieldTypeMap);

//CDR spouts

FraudLinearSpout spout1 = new FraudLinearSpout("streamCDR1", schema, 0); builder.setSpout("spout1", spout1, numExecutorSpout1);

FraudLinearSpout spout2 = new FraudLinearSpout("streamCDR2", schema, 0); builder.setSpout("spout2", spout2, numExecutorSpout2);

//union bolt

UnionBolt unionBolt = new UnionBolt("unionStream", spout.getOutputFields());

builder.setBolt("union", unionBolt, numExecutorUnion) .shuffleGrouping("spout1", "streamCDR1") .shuffleGrouping("spout2", "streamCDR2");

//filter bolt

FilterBolt filterBolt = new FilterBolt("filterStream",

spout.getOutputFields(),

"District == 145"); builder.setBolt("filter", filterBolt, numExecutorFilter)

.shuffleGrouping("union", "unionStream");

//map bolt

HashMap<String, String> outputExpressionMap = new HashMap<String, String>();

outputExpressionMap.put("District", "District");

outputExpressionMap.put("CallDuration", "End - Start"); outputExpressionMap.put("Ts", "Ts");

MapBolt mapBolt = new MapBolt("mapStream", outputExpressionMap); builder.setBot("map", mapBolt, numExecutorMap);

.shuffleGrouping("filter", "filterStream");

(44)

3.4 Stateful Operators 38

HashMap<String, String> outputConditionMap = new HashMap<String, String>();

outputConditionMap.put("stream1", "Ts < 1000");

outputConditionMap.put("stream2", "CallDuration >= 60");

MoFilterBolt moFilterBolt = new MoFilterBolt(outputConditionMap, mapBolt.getOutputFields());

builder.setBot("mux", moFilterBolt, numExecutorMoFilter); .shuffleGrouping("map", "mapStream");

//submit the topology if(LOCAL){

//local mode

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("local_stateless_topology", conf, topology);

}

else {

//cluster mode

StormSubmitter.submitTopology("cluster_stateless_topology, conf, topology);

} }

As you can see in the code above, the topology could be submitted both in local mode and in cluster mode. The local mode is used especially for debug.

For details on the usage of the operators refer to appendixA.

3.4. Stateful Operators

Stateful operators keep state across input tuples through windowing. That is, an operator keeps an arbitrary long window of input tuples and computation is performed across all events in its window: each output tuple is a function of all the events stored in the window.

Enhanced Storm adds to Storm API the State and the Window abstractions (there is not such concepts in Storm) as well as built-in stateful operators like aggregate and join.

In Figure 3.7 you can see an UML diagram showing inheritance and interfaces of stateful operators: as said before in 2.6.1, the interface implemented by this operators, IStatefulOperator, extends the IRichBolt interface oered by Storm

(45)

3.4 Stateful Operators 39

API, beacause for stateful operators we can't use the automatic anchoring and acking oered by the IBasicBolt interface.

Figure 3.7.: UML diagram for stateful operators.

3.4.1. State Abstraction

State abstraction is used by stateful operators in order to keep state among tuples.

In Figure 3.8 is shown an UML diagram for the State abstraction provided by Enhanced Storm.

There are two concrete classes extending the abstract class State: the SimpleState class that is a list of Storm tuples and the GroupByState class that is a map between a String identifying the group and a SimpleState object representing the state related to this group.

(46)

3.4 Stateful Operators 40

Figure 3.8.: UML diagram for State abstraction.

3.4.2. Window Abstraction

Stateful operators perform their computation on sliding windows of tuples dened over a period of time or as a xed number of tuples.

The window abstraction is essential in a streaming computation system, because a stream is potentially an innite sequence of tuples: in Storm there is not a formal denition of a window.

In Enhanced Storm, as you can see in Figure 3.9, a Window class is dened by the window type(number of tuples or period of time), the window size and the advance, that specify how to discard the tuples in the window.

Each of the two types can have two dierent state object: a SimpleState object or a GroupByState object.

(47)

3.4 Stateful Operators 41

Figure 3.9.: UML diagram for Window abstraction.

3.4.3. Aggregate Basic Functionality

Aggregate operator computes aggregate functions (e.g., sum, min, count, etc.) on a window of tuples. It is dened as:

Aggregate(S) = {A01 = f1(W ), ..., A

0

m = fm(W ) [Group − By = (A1, ..., An)]}

Windows are either based on time (e.g., tuples in the last 10 minutes) or on the number of tuples (e.g., the last 100 tuples) as described in 3.4.2. In both cases, when the window is full, a set of aggregate functions {fi}1≤i≤m are computed over

the tuples currently in the window and results are emitted as an output tuples. Given the tuples currently stored in the window W , the output tuple has attributes A01, ..., A0m.

Once an output tuple has been produced, the window is updated purging stale tuples, according to advance parameter of the window. Further, tuples can be separated (e.g., stored in independent windows as described in 3.4.2) by their

(48)

3.4 Stateful Operators 42

contents using the Group-By parameter. Implementation Details

Aggregate operator is a stateful operator: in Figure 3.10 you can see an UML diagram showing the inheritance of the AggregateState class, that represents the state of an aggregate operator.

There are four concrete implementation of the AggregateState class: each of them is based on a dierent conguration of the window (on tuples number or on time / simple or group-by).

Figure 3.10.: AggregateState UML diagram.

In the execute() method the aggregate operator executes an algorithm based on the conguration of the window specied by the user: in Figure 3.11 you can see the AggregateAlgorithm UML diagram.

(49)

3.4 Stateful Operators 43

In Figure 3.12 there is a ow chart representing the algorithm executed by the Aggregate operator in the execute() method, when the user specify a simple window based on the number of tuples; the inputTuple is immediately insert in the window, than there is a check: if the window is not full the operator doesn't do anything, otherwise (the window is full) the operator gets all the tuples in the window and discards the earliest advance (parameter of the window) tuples in the window; then the operator applies the aggregate functions on the tuples got from the window and emits the new values evaluated.

In the next paragraph (3.4.3) you can nd an example usage of the aggregate algorithm on a Window based on the number of tuples.

Figure 3.12.: Aggregate algorithm on a Window based on the number of tuples.

In Figure 3.13 there is a ow chart representing the algorithm executed by the Aggregate operator in the execute() method, when the user specify a simple win-dow based on time; the algorithm is a bit more complex than the one used for the window based on the number of tuples: the Aggregate bolt keeps three variables, currentSecondInWindow, that represent the timestamp (with the granularity of the second) of the last tuple inserted in the window, nextTimestamp that represents the threshold after which the window is considered full, i.e. when the operator has to evaluate the aggregation functions on the tuples in the window and emits the new values, and upperBoundTimestamp that indicates from which timestamp the

(50)

3.4 Stateful Operators 44

operator starts discarding tuples and is upadated based on the advance parame-ter. Firstly, the operator insert all the input tuples with the same timestamp in the window; if the current input tuple timestamp is not equals to currentSecond-InWindow (it means that the current input tuple timestamp is greater than the currentSecondInWindow, because we suppose the tuple arrival ordered in time), the operator enters in a cycle that continue till the current input tuple timestamp is greater than the nextTimestamp: in the cycle, if the window is not empty the operator gets all the tuples in the window with a timestamp less or equal the next-Timestamp and discards all the tuples in the window with a timestamp less than the upperBoundTimestamp; then the operator applies the aggregate functions on the tuples got from the window, updates the nextTimestamp by adding advance, and emits the new values evaluated. If the window is empty than the operator has to update the nextTimestamp without emits anything: the jump of the timestamp could be big in order to avoid a great number of cycle only to compute the sum between nextTimestamp and the advance.

At the end of the cycle the inputTuple is inserted in the window and the cur-rentSecondInWindow is updated.

Figure 3.13.: Aggregate algorithm on a Window based on time.

Example Usage

In Figure 3.14 you nd an example of how the aggregate algorithm on a Window based on the number of tuples works. In the example the window size is 3 and the advance is 2.

(51)

3.4 Stateful Operators 45

Figure 3.14.: Example of aggregate algorithm on a Window based on the number of tuples.

(52)

3.4 Stateful Operators 46

3.4.4. Join

Basic Functionality

The join operator allows correlating tuples from multiple streams. It is dened as: J oin(Sl, Sr) = {(P, O

0 )}

Join uses windows to keep state across input tuples. Windows are based on time (e.g., tuples in the last 10 minutes) for the join operator. In particular, two separate windows are kept for the two input streams: Sl denoted the left input

stream and Sr denoted the right input stream. The user-dened predicate P is

evaluated over pair of tuples, each belonging to one window. Tuples pairs that satisfy the predicate are propagated over the output stream O0

that has attributes A01, ..., A0m specied from the user and deriving from m projections over both Sl

and Sr.

Implementation Details

Join operator is a stateful operator: in Figure 3.15 you can see an UML diagram showing the inheritance of the JoinTimeState class, that represents the state of a join operator.

There are two concrete implementation of the JoinTimeState class: the former, JoinSimpleTimeState has two simple time windows attributes, the latter, Join-GroupByTimeState has two group-by time windows attributes.

(53)

3.4 Stateful Operators 47

In the execute() method the join operator executes an algorithm on a Window based on time: in Figure3.16you can see a ow chart representing that algorithm. Firstly the operator identies if the input tuple comes from the left input stream or the right input stream; suppose the case of a tuple coming from the left(right) input stream: the operator inserts the input tuple in the left(right) window and get all the tuples in the right(left) window with a timestamp less or equal than the timestamp of the input tuple but greater than or equal to the dierence between that timestamp and the window size; moreover the operator removes all the tuples with a timestamp less than that dierence from the right(left) window. After that the operator does the join, evaluating the user-dened predicate between the input tuple and the tuples got from the right(left) window; if the result of the join is not empty the operator emits that result.

In the next paragraph (3.4.4) you can nd an example usage of the join algorithm on a Window based on time.

Figure 3.16.: Join algorithm on a Window based on time.

Example Usage

In Figure 3.17you nd an example of how the join algorithm on a Window based on time works. In the example the window size is 3 seconds.

(54)

3.5 Stateful Operators 48

(55)

3.5 Database-specic Operators 49

3.5. Database-specic Operators

A database operator handles incoming tuples and enables correlate these tuples with information persisted in a traditional database. They oer basic functionality such as persistent tuple storage and retrieval. The cost of this new capability is the latency of the database that is known to be greater than typical in memory processing operations.

The database operators are implemented using the Apache Derby DB[Der], an open source relational database implemented entirely in Java; they access a Derby DB using the JDBC (Java DataBase Connectivity)[JDB], a Java-based data access technology.

3.5.1. INSERT Basic Functionality

The insert operator stores input tuples in a table of a relational database. It uses standard SQL expressions to insert a new row in the table.

Implementation Details

Figure 3.18.: Enhanced Storm insert operator

3.5.2. UPDATE Basic Functionality

The update operator is used to update tuples, using an SQL expression parame-terized with the elds of an input tuple.

(56)

3.5 Database-specic Operators 50

Implementation Details

Figure 3.19.: Enhanced Storm update operator

3.5.3. DELETE Basic Functionality

The delete operator is used to delete tuples from a table of a relational DB, ac-cording to an SQL expression that is parameterized with the elds of an input tuple.

Implementation Details

(57)

3.5 Database-specic Operators 51

3.5.4. SELECT Basic Functionality

The select operator parameterizes a Select SQL expression with the elds of an input tuple. The expression is evaluated against a table and the resulting tuples are propagated over the output stream as output tuples.

Implementation Details

(58)

4. Enhanced Storm Scalability

Evaluation

Contents

4.1. Enhanced Storm Monitor . . . 52

4.2. Evaluation Testbed and Data Set . . . 54

4.3. Enhanced Storm Conguration for the Evaluation . . . 55

4.4. Scalability Evaluation Results . . . 56

4.4.1. Stateless Operators: Union . . . 56

4.4.2. Stateful Operators: Aggregate . . . 58

4.4.3. Stateful Operators: Join . . . 59

4.4.4. Scale-out Evaluation . . . 60

In this chapter we analyze the results of Enhanced Storm evaluation: for each Enhanced Storm operator we performed a functional validation and after that we focused our evaluation on its scalability; in particular the experiments evalu-ated the scalability, in terms of throughput, of the union operator as example of stateless operator, and of the aggregate and join operators, more challenging to distribute, as example of stateful operators.

Before showing the nal results we will describe the Monitor used to get the data for the evaluation: Storm provides a built-in monitoring tool (Storm UI[Marh]), but it does not provides a way to store the statistics in order to analyze the performance of the system; it simply visualizes a set of metrics at a certain time. It was not suited for our evaluation, so we developed the Enhanced Storm Monitor as described in the following section.

After that we will present the evaluation testbed, the data set, the Enhanced Storm conguration used during the experiments and the nal results of the evaluation.

4.1. Enhanced Storm Monitor

In order to retrieve Enhanced Storm cluster statistics in real-time and store that data in a user-specied location (probably in his local machine), we develop a

(59)

4.1 Enhanced Storm Monitor 53

Monitor that opens a connection with the master node in the cluster (that runs the Nimbus daemon, as explained in2.2.5) on a specic port (6627), and ask for the metrics aggregated by the Nimbus; the metrics are made available by the Nimbus in xed time intervals and the minimum period at which the metrics refresh is set at 3 seconds and can not be changed.

The Storm Enhanced Monitor gets, every 3 seconds by default, all the metrics provided by Storm statistics API: this API were generated for Java language by Thrift[Thr] compiler. This metrics, relative to the components of a Storm topology, are the same that Storm UI[Marh] (see Figure 4.1) tool provides:

• Emitted tuples: #tuples emitted by the component,

• Trasferred tuples: #tuples actually transferred from one component

to another,

• Executed tuples (only for bolts): #tuples processed by the component, • Acked/Failed tuples: #tuples acked or failed by the ackers for

guar-anteeing message processing (see 2.6) ,

• Capacity (only for the bolts): % of the time in the last 10 minutes the bolt

spent executing tuples,

• Execute latency (only for the bolts): time spent in the execute() method

for a tuple,

• Process latency (only for the bolts): time until tuple is acked.

(60)

4.2 Evaluation Testbed and Data Set 54

Enhanced Storm Monitor gives also information about the cluster, the topologies running on it and the Nimbus conguration. It works like shown in Figure 4.2.

Figure 4.2.: Enhanced Storm Monitor

4.2. Evaluation Testbed and Data Set

All experiments were run on a shared-nothing cluster of 14 nodes (blades) with a total of 56 cores. All blades are Supermicro SYS-5015M-MF+ equipped with a quad-core Intel Xeon X32202@2.40GHz, 8GB of RAM and 1Gbit Ethernet and a directly attached 0.5TB hard disk. During the experiments, roughly half of the available nodes was devoted to load injection in order to reach input rates that would saturate the remaining nodes. In Figure 4.3 you can see the blade's architecture.

(61)

4.3 Enhanced Storm Conguration for the Evaluation 55

Figure 4.3.: Blade's architecture overview.

Our evaluation focused on mobile telephony application scenario where activities like customer billing and fraud detection, require processing massive amount of calls per seconds. Each call generates a tuple referred to as Call Description Record (CDR), that contains information about the parties involved in the call. The schema of the CDRs used for the evaluation is shown in Table 4.1.

Name Type Description

Src String Caller's number

Dst String Called's number

Start time Start time of call

End time End time of call

District Integer Area-Id where caller is located Lat Double Latitude coordinate of the caller Lon Double Longitude coordinate of the caller

Ts time Emission timestamp of the CDR

Table 4.1.: Call Description Record (CDR) schema used for the evaluation of Enhanced Storm operators.

4.3. Enhanced Storm Conguration for the Evaluation

For Enhanced Storm evaluation we follow what Nathan Marz (Storm creator) suggests about the tuning of Storm in [Mara], setting the following parameter conguration (see2.5 for details on the parameters specied below):

conf.setMaxSpoutPending(20000); //acking flow control

conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8); conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);

conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);

Riferimenti

Documenti correlati

Solution proposed by Roberto Tauraso, Dipartimento di Matematica, Universit`a di Roma “Tor Vergata”, via della Ricerca Scientifica, 00133 Roma,

Heteropolymer, Semiflexible Chain, Disorder, Persistence Length, Large Scale Limit, Tensor Analysis, Non-Commutative Fourier

La strada che abbiamo scelto per provare che il “Paradigma dell’Evolvibilità” può essere usato con successo è di mostrare come, due prodotti molto differenti,

Figure 3 shows more details on the precision of our solution by showing the number of global icebergs that should be detected (referred to as generated) and the number of items

The quantification of quantumness is necessary to assess how much a physical system departs from a classical behaviour and thus gauge the quantum enhancement in opera- tional tasks

Carbazoles, prevalent as structural motifs in various synthetic materials and naturally occurring alkaloids, as is known, have many applications such as

However, at present only low temperature superconductors (LTS), operating at boiling helium temperature, are used to design and produce magnets, because the

Cartesian reference system belonging to the triangle