Benchmarking Data Stream Processing Frameworks on Multicores

Academic year: 2021

Università di Pisa and Scuola Superiore Sant’Anna

Department of Computer Science

Master’s Degree in Computer Science and Networking

Benchmarking Data Stream Processing

Frameworks on Multicores

Candidate: Alessandra Fais Supervisor: Dr. Gabriele Mencagli Academic Year 2018/2019

Abstract

In recent years, the need for continuous processing and real-time analysis of data streams has increased rapidly. To meet high-throughput and low-latency requirements, a stream application can be implemented with one of the various Data Stream Processing frameworks that offer suitable abstractions for operator parallelization and distribution. This work compares the performance (bandwidth and latency) of traditional Data Stream Processing systems (Apache Storm and Flink) with that of the WindFlow C++17 library, an efficient streaming library developed by the Parallel Programming Models group at the Department of Computer Science of the University of Pisa. Four real-world Data Stream Processing applications have been implemented using Storm, Flink and WindFlow. Experiments conducted on a single multi-core machine show a significant throughput improvement and latency reduction when using the C++ solution with respect to the state-of-the-art frameworks.

"The good thing about science is that it’s true whether or not you believe in it."

Acknowledgements

Only a few days remain until the end of an intense journey, one that has made me grow in many respects, from the educational to the personal. It all began exactly 8 years ago, when I decided to set out toward something entirely new to me, later continuing in Pisa with my choice of Master's degree. It has been an arduous path, marked by difficult choices, but it has brought me here today and has allowed me to meet special people, the kind who make me feel fortunate and glad to have taken this road, betting on myself.

On this occasion I would like to thank first of all Dr. Gabriele Mencagli, for his professional and moral support, his great availability, his readiness to clear up even my smallest doubt, and the trust he always showed me during this thesis work. I would like to thank my parents, who have always encouraged me to keep going and who, even in the hardest moments, never failed to give me their full support. I dedicate this result to them, who invested in my future, never doubting and always believing in it. And thanks to Marco, the older brother I have been able to count on for 27 years, always ready to advise and help me in any situation. It has been a year full of achievements in the family, and I am happy to have finally been able to contribute my own!

A special thank-you goes to Orlando, my number one fan, whose support has been fundamental. He is a point of reference who has been by my side since the first months of this university journey, and who always pushes me to try to improve.

My heartfelt thanks go to Vale and Silvia, lifelong friends, who did everything they could to be present on this special day, and who have always been close to me, even when several hundred kilometres physically separate us.

A particular thank-you goes to Sara, a friend I am grateful to have found in these years. We shared several important stages of this journey, encouraging each other on many occasions and always managing to move forward despite the difficulties. After the Bachelor's degree, it is lovely to be able to conclude this second chapter together as well.

Finally, I would like to thank anyone who has encouraged me, stayed close to me, supported me and put up with me in every sense. To all of you goes a sincere thank-you, for having contributed, each in your own way, to who I am today and to the achievement of this important goal.

Contents

Introduction

1 Introducing Data Stream Processing

1.1 Storm

1.2 Flink

2 WindFlow Library

2.1 FastFlow

3 Benchmarking Streaming Applications

3.1 Fraud Detection

3.1.1 Data-Flow graph description and bottleneck analysis

3.1.2 Implementation

3.2 Spike Detection

3.2.1 Data-Flow graph description and bottleneck analysis

3.2.2 Implementation

3.3 Traffic Monitoring

3.3.1 Data-Flow graph description and bottleneck analysis

3.3.2 Implementation

3.4 Word Count

3.4.1 Data-Flow graph description and bottleneck analysis

3.4.2 Implementation

4 Experimental Results

4.1 Tests

4.1.1 Performance results in terms of bandwidth

4.1.2 Performance results in terms of latency

5 Conclusions

A Overview on the abstractions offered by the analyzed DaSP frameworks

A.1 Comparison in terms of native parallelism dimensions

B Complete code produced in this work

B.1 Description of the GitHub repositories

List of Acronyms

API Application Programming Interface

BB Building Block

CRS Coordinate Reference System

DaSP Data Stream Processing

DTMC Discrete Time Markov Chain

ESRI Environmental Systems Research Institute

FD Fraud Detection

FIFO First-In First-Out

FPGA Field Programmable Gate Array

GB GigaByte

GCC GNU Compiler Collection

GIS Geographic Information System

GPS Global Positioning System

GPU Graphics Processing Unit

IoT Internet of Things

JVM Java Virtual Machine

MB MegaByte

MISO Multiple-Input Single-Output

OGC Open Geospatial Consortium

OSGeo Open Source Geospatial Foundation

PB PetaByte

PBB Parallel Building Block

POJO Plain Old Java Object

RPC Remote Procedure Call

SBB Sequential Building Block

SD Spike Detection

SIMO Single-Input Multiple-Output

SISO Single-Input Single-Output

SPE Stream Processing Engine

SPS Stream Processing System

SPSC Single-Producer Single-Consumer

TB TeraByte

TM Traffic Monitoring

WC Word Count

List of Figures

1.1 The Five Vs of Big Data.

1.2 Process of real-time calculation.

1.3 Process of real-time streaming calculation.

1.4 Basic structure of a batch processing application.

1.5 Example of Data-Flow streaming application.

1.6 Stateless and stateful operators.

1.7 Storm tuple re-transmission mechanism.

1.8 Example of Storm cluster.

1.9 High-level view of a Storm topology.

1.10 Task-level view of a Storm topology.

1.11 Storm stream groupings.

1.12 Different levels of abstraction offered by Flink.

1.13 Parallelized view of the Flink streaming Data-Flow of Figure 1.5.

1.14 Example of count-based windows and time-based windows on a stream.

1.15 Notions of time in Flink.

1.16 Partitioned state.

1.17 Simple Word Count example.

1.18 A-tuple-at-a-time data processing model (i.e., true streaming semantics).

1.19 Micro-batching data processing model (with time-based batch interval).

1.20 Example of backpressure in Flink.

2.1 Taxonomy of windowing models.

2.2 Example of MultiPipe structure.

2.3 Software layers of the FastFlow ecosystem on top of scale-up servers.

2.4 FastFlow representation of the WindFlow basic operators.

2.5 FastFlow representation of the WindFlow window-based operators.

2.6 Nesting of a new Matrioska in the R-Workers set.

3.1 Data-Flow graph of the FraudDetection application.

3.2 Storm’s class hierarchy for the spout.

3.3 Storm’s class hierarchy for the bolt.

3.4 Data-Flow graph of the SpikeDetection application.

3.5 Data-Flow graph of the TrafficMonitoring application.

3.6 Organization of the main file (.shp) of an ESRI shapefile.

3.7 Open GIS Simple Feature Geometry class hierarchy.

3.8 Open GIS Simple Feature object relevant content.

3.9 Data-Flow graph of the WordCount application.

4.1 Storm architecture.

4.2 Relationships between worker processes, executors (threads) and tasks in Storm.

4.4 Flink architecture.

4.5 Job execution process in Flink.

4.6 Example of job scheduling in Flink.

4.7 WindFlow chaining applied to the Data-Flow graph of the four applications constituting the proposed benchmark.

4.8 Logical high-level representation of the structure of a generic streaming application with the purpose of bandwidth evaluation.

4.9 Bandwidth results achieved for the four implementations of the FD application.

4.10 Bandwidth results achieved for the four implementations of the SD application.

4.11 Bandwidth results achieved for the four implementations of the TM application.

4.12 Bandwidth results achieved for the five implementations of the WC application.

4.13 Comparison among absolute maximum bandwidth values measured for the four provided implementations of the FD application.

4.14 Comparison among absolute maximum bandwidth values measured for the four provided implementations of the SD application.

4.15 Comparison among absolute maximum bandwidth values measured for the four provided implementations of the TM application.

4.16 Comparison among absolute maximum bandwidth values measured for the five provided implementations of the WC application.

4.17 Incremental percentage of bandwidth in WindFlow achieved enabling the chaining optimization with respect to the unchained solution.

4.18 Latency distribution for the four implementations of the FD application.

4.19 Latency distribution for the four implementations of the SD application.

4.20 Latency distribution for the four implementations of the TM application.

4.21 Latency distribution for the five implementations of the WC application.

4.22 Comparison among mean latency values measured for the four provided implementations of the FD application.

4.23 Comparison among mean latency values measured for the four provided implementations of the SD application. A logarithmic scale has been used.

4.24 Comparison among mean latency values measured for the four provided implementations of the TM application.

4.25 Comparison among mean latency values measured for the five provided

List of Tables

A.1 DataStream transformations offered by Apache Flink.

A.2 Abstractions provided by Apache Storm.

A.3 Parallel patterns provided by WindFlow.

A.4 Parallelism dimensions directly supported in Apache Storm, Apache Flink

Listings

1.1 Flink streaming Data-Flow reflecting the graph in Figure 1.5 and Figure 1.13.

1.2 Storm implementation of the simple topology in Figure 1.17.

1.3 Flink implementation of the simple topology in Figure 1.17.

2.1 Instantiation of a pattern in WindFlow.

2.2 Complex composition of patterns in WindFlow.

2.3 Creation of a MultiPipe in WindFlow.

2.4 Structure of an input tuple in WindFlow.

3.1 Class structure of a Storm spout.

3.2 Class structure of a Storm bolt.

3.3 Creation of the FraudDetection Topology using Storm.

3.4 Creation of the FraudDetection Data-Flow graph using Flink.

3.5 Creation of the FraudDetection Data-Flow graph using WindFlow.

3.6 Creation of the SpikeDetection Topology using Storm.

3.7 Creation of the SpikeDetection Data-Flow graph using Flink.

3.8 Creation of the SpikeDetection Data-Flow graph using WindFlow.

3.9 Creation of the TrafficMonitoring Topology using Storm.

3.10 Creation of the TrafficMonitoring Data-Flow graph using Flink.

3.11 Creation of the TrafficMonitoring Data-Flow graph using WindFlow.

3.12 Creation of the WordCount Topology using Storm.

3.13 Creation of the WordCount Data-Flow graph using Flink.

3.14 Creation of the WordCount Data-Flow graph using WindFlow.

4.1 Submitting a Storm topology to a LocalCluster.

Introduction

Nowadays, technology has become pervasive in every aspect of human life, and new scenarios are rapidly emerging, especially in the context of Smart Cities and Industry 4.0 (e.g., smart processes, smart mobility, smart logistics) [1]. It is therefore increasingly common to rely on heterogeneous devices that acquire information from their surroundings and continuously produce the collected data as streams. Such data sources are constantly increasing in number and variety, causing an explosion in the amount and type of information generated in the form of streams. In most cases, extracting significant knowledge from these streaming data requires real-time processing. Thus, there is an urgent need for innovative computing models suitable for processing huge amounts of data (in the order of TBs), available as high-speed continuous unbounded sequences of tuples (i.e., records of attributes in the form of key-value pairs) whose significance decays with time, and possibly characterized by irregular and time-varying rates.

Innovative Data Stream Processing (DaSP) frameworks have been designed as tools for the development of efficient streaming programs, capable of dealing with the peculiarities of streams while guaranteeing tight performance constraints in terms of bandwidth (output rate) and latency (response time). The unbounded nature of streams makes storing all the input elements impracticable. Hence, it is necessary to study new stream processing approaches that overcome the more traditional batch processing model, whose application is unfeasible in the described scenarios since it requires the preliminary storage of the whole data set.

State-of-the-art DaSP systems aim at providing suitable abstractions to the programmers of streaming applications, exploiting parallelism as a way to speed up the execution. The model of the computation is typically a Data-Flow graph: a streaming program is generally structured as a composition of core functionalities in a graph, where vertices correspond to computations (called operators or transformations) and arcs model streams. Parallelism can be expressed at two different levels in existing Stream Processing Engines (SPEs) [2]:

• Inter-Query Parallelism, capable of increasing the overall bandwidth by supporting the execution of multiple Data-Flow graphs in parallel.

• Intra-Query Parallelism, which provides parallelism within the single Data-Flow graph, being able to increase bandwidth and reduce latency. This can happen in two different ways: by means of Inter-Operator Parallelism (i.e., exploiting the inherent parallelism between operators that run concurrently) and by taking advantage of Intra-Operator Parallelism (i.e., exploiting parallelism inside a single operator instance).

In order to target distributed clusters of homogeneous machines, modern SPEs easily express Inter-Query Parallelism by distributing the parallel processing of many application (graph) instances over a number of computing nodes in the cluster (i.e., scaling-out). They also express Inter-Operator Parallelism by means of pipeline parallelism, among different operators, and data parallelism, where operators are replicated in several instances. Intra-Operator Parallelism is typically provided by means of replication applied to stateless operators or, in the case of stateful operators, by exploiting the possibility of partitioning the stream (and the state) by key. The concept of key refers to a subset of the fields defining the tuples (records) carried by the stream, which can be used to logically identify sub-streams. Consequently, each replica of such a stateful operator will only access its own partition of the state, relative to a precise subset of the keys. These are the only two forms of Intra-Operator Parallelism natively offered by modern SPEs. In summary, in order to exploit parallelism using the abstractions provided by state-of-the-art SPEs, the programmer has to be able to:

• recognize different computation phases in the application in order to apply pipeline parallelism (whilst paying an increase in latency in case of a high number of operators);

• replicate stateless operators and assign the input tuples to the replicas in a load balanced fashion;

• replicate multi-keyed stateful operators, exploiting the possibility to partition the stream by key and thus having each replica working on a subset of the keys.
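The two replication strategies listed above can be sketched in a few lines. The following Python fragment is only an illustration under stated assumptions: the function names `shuffle_route` and `key_route`, the tuple layout and the use of CRC32 as hash function are hypothetical choices made here, not the routing code of Storm, Flink or WindFlow.

```python
from itertools import cycle
from zlib import crc32


def shuffle_route(tuples, n_replicas):
    """Stateless operator: spread tuples over the replicas in round-robin order."""
    replicas = [[] for _ in range(n_replicas)]
    for target, item in zip(cycle(range(n_replicas)), tuples):
        replicas[target].append(item)
    return replicas


def key_route(tuples, n_replicas, key_of):
    """Keyed stateful operator: every tuple with the same key reaches the same replica."""
    replicas = [[] for _ in range(n_replicas)]
    for item in tuples:
        # crc32 is deterministic across runs (Python's built-in hash() of str is salted)
        target = crc32(str(key_of(item)).encode()) % n_replicas
        replicas[target].append(item)
    return replicas


if __name__ == "__main__":
    stream = [("sensor-a", 1), ("sensor-b", 2), ("sensor-a", 3), ("sensor-c", 4)]
    print(shuffle_route(stream, 2))
    print(key_route(stream, 2, key_of=lambda t: t[0]))
```

Round-robin routing balances the load but gives no guarantee about where a key ends up, which is why it suits stateless operators only. Hash-based routing lets each replica own the state of its keys without synchronization, at the cost of possible load imbalance when the key distribution is skewed.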

However, there are cases in which pure functional replication is not a feasible option (e.g., if key-grouping cannot be exploited and a real shared state has to be managed). Therefore, when parallelism implies the management of complex shared states and the consequent synchronization among the parallel entities, the lack of high-level parallel abstractions leaves the burden of finding appropriate solutions (and implementing them from scratch) to the programmer, notably increasing the effort required for developing efficient parallel streaming applications with a complex structure [2].

This work aims at measuring (i.e., quantifying) the performance benefits (bandwidth and latency) that may come from using a DaSP solution based on C++ rather than state-of-the-art JVM-based SPEs, on single multi-core machines. The WindFlow C++17 library has been chosen as an example of an efficient streaming library exposing suitable abstractions (parallel patterns) based on the building blocks offered by the FastFlow C++ library, which provides a low-latency programming environment for streaming applications. The two representatives picked among the mainstream SPE solutions are Apache Storm and Apache Flink, which both rely on the Java Virtual Machine (JVM) and have been chosen by taking into account their data processing semantics. In fact, both of them adopt the a-tuple-at-a-time processing style, which is consistent with the item-by-item WindFlow processing semantics.

Therefore, the comparison among these three frameworks must take into account that part of the performance gain is related to the difference in run-time and to the removal of overheads that are present in distributed scenarios but not in single-machine executions. In fact, both Storm and Flink are designed to target distributed environments, relying on the JVM processing environment to easily obtain platform independence, but for this reason incurring processing overheads induced by data (de-)serialization, garbage collection, etc. that limit the design of efficient data accesses and consequently the overall performance in throughput and latency [3]. On the other hand, since WindFlow is a C++ library that targets single multi-core machines (i.e., scale-up scenarios), the above-mentioned overheads are not present.

A careful reader might be wondering then why it could be interesting to perform such a comparison, considering the different base performances at which the three frameworks start, directly related to the diverse characteristics of the run-times.

The answer lies in the following facts:

• The real goal of the work:

The aim of the proposed study is not to perform a simple numerical comparison among the effective performances that can be achieved with the three frameworks. Being aware of the differences between the three streaming systems, the final goal is rather to quantify the benefit that could derive from using a C++ solution with characteristics similar to the ones offered by WindFlow, with respect to state-of-the-art DaSP solutions.

• The absence of another C++ baseline to compare against:

The choice of the Storm and Flink frameworks as a basis for comparison has been forced by the absence, in the current streaming scenario, of a C++ DaSP framework offering a high-level API with characteristics similar to the ones offered by the mainstream JVM-based SPEs. Since this is one of the first works of its kind, the chosen methodology appeared to be the only viable route to take.

In view of the above, this work has to be seen as a way of measuring the performance gain that could be reached through the design of an innovative DaSP framework based on C++ and offering proper high-level parallel patterns oriented to stream computation. The analysis carried out throughout the thesis makes it possible to evaluate whether a new SPE with such characteristics could outperform the current state-of-the-art alternatives, though taking into consideration the different nature of the three terms of comparison. This investigation indicates that the integration between the parallel patterns methodology and DaSP scenarios effectively leads to performance benefits (in terms of bandwidth and latency) measured in scale-up scenarios.

In order to actually implement the described comparison, a benchmark of four applications belonging to very different streaming domains has been designed. The nature of the chosen applications is quite dissimilar and this emphasizes the rapid expansion of streaming techniques to the most diverse real-world use cases.

• The first application belongs to the fraud analysis scenario and it is called Fraud Detection. It implements a streaming program capable of analyzing sequences of credit-card transactions. The detection of possible fraudulent activities happens through the identification of malicious sequences of transaction types, recognized among successive stream items.

• The second application is a typical use case of the IoT scenario. It is called Spike Detection and it collects measurements coming from different sensors about the surrounding environment, such as temperature, humidity and ambient light. Given a selected property to control, the application continuously monitors the incoming values and checks for anomalies (i.e., a received measure differs too much from the average value registered for that property).

• The third application is called Traffic Monitoring, and as the name suggests implements a use case of vehicular traffic monitoring in a given city (Beijing city, in this case). Given GPS positions coming from moving objects (i.e., taxi vehicles) in the city area, the streaming program performs a map matching operation, meaning that it associates the received GPS coordinates with precise roads in the city, and evaluates the real-time average speed of the vehicles for each road travelled by at least one monitored taxi.

• The fourth application, called Word Count, performs data analysis, deriving frequency statistics of the words in a given text.
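As an illustration of the kind of logic involved, the anomaly check described for Spike Detection can be sketched as follows. This is a minimal Python sketch under assumptions made here, not the benchmark code: the function name `detect_spikes`, the window of four readings and the relative threshold of 0.5 are hypothetical parameters chosen for the example.

```python
from collections import defaultdict, deque


def detect_spikes(readings, window=4, threshold=0.5):
    """Yield (sensor_id, value, average) when a value deviates from the moving
    average of its sensor by more than `threshold` times that average."""
    # One bounded history per sensor: the operator state is partitioned by key.
    history = defaultdict(lambda: deque(maxlen=window))
    for sensor_id, value in readings:
        past = history[sensor_id]
        past.append(value)
        average = sum(past) / len(past)
        if abs(value - average) > threshold * abs(average):
            yield sensor_id, value, average


if __name__ == "__main__":
    readings = [("s1", 20.0), ("s1", 21.0), ("s1", 20.0), ("s1", 60.0)]
    for spike in detect_spikes(readings):
        print(spike)
```

The per-sensor history is the only state kept in memory, so the operator can be replicated by partitioning the stream on the sensor identifier.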

The whole implementation of the benchmark, along with the related documentation, is available online as open-source code. Appendix B contains all the references to the code repositories and a description of the project structure. The other parts of the thesis are organized in the following way:

• Introduction: the current section, containing a complete overview of the thesis work and its objectives.

• Chapter 1: the chapter is devoted to the description of the current Data Stream Processing scenario. Throughout this chapter, the reader will be given the background concepts required to understand the details of the work contained in the subsequent chapters. An introduction to Apache Storm and Apache Flink and their peculiarities is provided in the second half of the chapter.

• Chapter 2: the chapter contributes to the background part, focusing on the description of the third term of comparison, the WindFlow library, with a detailed description of the parallel patterns offered and their implementation in terms of FastFlow building blocks.

• Chapter 3: the chapter is centered on the description of the four applications composing the designed benchmark. Their structure as Data-Flow graphs is described, along with the details of their implementations using the three stream-oriented frameworks (i.e., Storm, Flink, WindFlow). For each application, possible optimizations and improvements of the currently given implementation are also discussed.

• Chapter 4: the chapter focuses on the discussion of the results gathered from the test phase. In the first part, the execution environments and programming models of the streaming frameworks are described, along with the architecture of the scale-up machine used for the test phase. The second part is completely devoted to the description of the performed tests and to the analysis of the experimental results, in terms of bandwidth and latency performance metrics.

• Conclusions: the final section, where conclusions are drawn from the presented work and from the experimental outcomes.

Appendix A contains a summary on the parallel building blocks offered by the analyzed frameworks, along with the kind of parallelism that they implement.

Chapter 1

Introducing Data Stream Processing

“Begin at the beginning,” the King said gravely, “and go on till you come to the end: then stop.”

— Lewis Carroll, Alice in Wonderland

We live in a technological era strongly characterized by the emerging scenarios of Smart Cities, Internet of Things (IoT) infrastructures and cyber-physical systems, where a variety of heterogeneous devices (e.g., traffic sensors, health sensors, temperature sensors, mobile devices) continuously produce streams of data that need to be processed in near real time. The constant increase in the number of sources (values come in general from any measurable activity) causes an explosion in the amount of data generated as streams that require real-time analysis. Stream Processing is the paradigm oriented to continuously processing, analyzing and monitoring these (possibly infinite) sequences of data called streams [4]. Therefore, technology has been pulled toward the development of a variety of modern applications capable of processing at high velocity huge amounts of information that are not immediately available but are produced continuously as streams.

The Data Stream Processing (DaSP) research area, devoted to the development of this kind of applications and solutions, can be considered as a fundamental part of the Big Data research domain. There are a number of characteristics that define the Big Data scenario, commonly referred to as the Four Vs [5][6] and further extended to Five Vs, as shown in Figure 1.1:

• Volume: refers to the large amount of data involved. In fact, the size of the data sets that need to be analyzed and processed keeps increasing and is frequently in the order of TeraBytes (TBs), PetaBytes (PBs) or even more. For instance, the video surveillance cameras of a medium-sized city in China can produce tens of TBs of data every day. This means that the sheer volume of the data requires processing technologies distinct from traditional storage and processing capabilities.

• Variety: indicates the diversity of data types. In the past, the data types that were generated and processed were simpler and most of the data was structured, but now, with the emergence of social networking, IoT, mobile computing, etc., much semi-structured or unstructured data is produced (e.g., text, sensor data, audio, video, log files) and the types of data become uncountable. The variety in data types frequently requires distinct processing capabilities and specialist algorithms. An example of high variety data sets would be the CCTV audio and video files that are generated at various locations in a city.

Figure 1.1: The Five Vs of Big Data.

• Velocity: refers to the speed at which data is generated. An example of data that is generated with high velocity would be Twitter messages [7] or Facebook posts [8][9] or any kind of data coming from sensors. The data processing and analysis speed needs to be high, in particular in the case of any kind of decision-making computation, and it continues to accelerate due to the real-time nature of data creation. This is the main reason for the shift from traditional batch processing toward stream processing.

• Value: indicates the commercial value and profit that comes from Big Data processing technologies. In fact, analyzing, processing and mining data (i.e., extracting important information and knowledge) can be very useful for enterprises, which can derive models and apply them to improve research, production, operations and sales.

• Veracity: refers to the quality of the data that is being analyzed. If data has many records that are valuable to analyze and that contribute in a meaningful way to the overall results then the veracity is high. On the contrary, if data contains a high percentage of meaningless records the veracity is low and a large portion of the data set is simply noise. An example of a high veracity data set would be data from a medical experiment or trial.

Data that is high volume, high velocity and high variety must be processed with advanced analytical and algorithmic tools to reveal meaningful information. Hence, these five characteristics of the data fully define the knowledge domain that deals with the storage, processing, and analysis of the data sets labeled as Big Data.

From the point of view of the computing models providing data analysis approaches, it is possible to highlight three main alternatives: batch computing, real-time computing and stream computing (as a specialization of the real-time computing case).

• Batch Computing is the traditional way of performing offline computations over large amounts of data collected over a period of time. Huge volumes of data are first stored and then processed altogether in order to get detailed insights. As a consequence, this model can be used only in those situations where results are not required to be real-time: this is the price to be paid for guaranteeing a high precision of the analytic results, produced having all the input data available at once before the beginning of the computation.

• Real-Time Interactive Computing generally needs to process large amounts of data, meeting some of the requirements of non-real-time computing (e.g., accurate results) but also guaranteeing a response time (i.e., time needed to compute results) that is usually in the order of milliseconds. The real-time computing phases are in general real-time data collection, real-time data analysis and processing, and real-time query services, as shown in Figure 1.2.

Figure 1.2: Process of real-time calculation.

• Stream Computing is a specialization of the real-time interactive computing case where the data source is real-time and continuous and the responses need to be produced in a real-time fashion. Examples of such real-time scenarios are trading systems, fraud analysis, monitoring or social networks analysis. Any kind of real-time computation is performed while the stream data is changing, capturing the information that may be useful to the users and sending the result out, as shown in Figure 1.3.

Figure 1.3: Process of real-time streaming calculation.
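The difference between the batch and the stream computing models described above can be made concrete with a small example. The following Python sketch is only illustrative and its function names (`batch_mean`, `streaming_mean`) are hypothetical: it computes the same statistic once over stored data and once incrementally, emitting an updated result for every incoming item.

```python
def batch_mean(stored_values):
    """Batch style: the whole data set is stored first, one result at the end."""
    return sum(stored_values) / len(stored_values)


def streaming_mean(stream):
    """Stream style: emit an updated result per item, keeping constant-size state."""
    count, total = 0, 0.0
    for value in stream:
        count += 1
        total += value
        yield total / count


if __name__ == "__main__":
    values = [2.0, 4.0, 6.0, 8.0]
    print(batch_mean(values))
    for partial in streaming_mean(iter(values)):
        print(partial)
```

The streaming version never needs the full input in memory, so it also works when the input is unbounded, which is exactly the case in which the batch version cannot be applied.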

Traditional tools for Big Data are designed to work with massive data sets characterized by a large though finite size, where data are immediately available and permanent (e.g., distributed file-systems). This is the scope of Data-Intensive Parallel Applications, which can be implemented exploiting batch processing oriented tools, such as MapReduce [10], Spark [11] and Hadoop [12]. These mainstream frameworks are Big Data batch processing systems that ease the job of writing distributed applications with an underlying structure like the one shown in Figure 1.4. The application programmer does not have to worry about fault tolerance, reliability, synchronization or availability [13], since the framework takes care of these aspects.


Figure 1.4: Basic structure of a batch processing application.

The need for a computation model such as stream processing, completely opposed to the more traditional batch processing, derives mainly from the intrinsic nature of streams. Since they are inherently unbounded, storing all the input tuples and then performing offline computations is not feasible. Moreover, the high velocity at which new items become available in a stream comes along with potentially irregular and time-varying rates and with the time-decaying significance of each tuple in real applications. Traditional batch processing systems do not have to deal with these aspects, which increase the complexity of managing stream data. Consequently, DaSP applications must satisfy strong performance requirements on bandwidth and latency and must also provide innovative parallel processing models that keep in memory only a reasonable portion of the stream while processing it. Hence the need for new tools that allow reasoning in terms of time (stream computations are time-aware) and not only in terms of data to be computed in a time-agnostic way. The development of new computing paradigms that slice the stream, in order to maintain a limited amount of data in memory and possibly apply batch processing transformations on each slice, gains importance.

In this respect, one fundamental concept adopted to cope with the infeasibility of storing the complete input data in main memory is a buffering technique called windowing. It provides finite stream portions containing the most recent items satisfying certain properties: as the stream advances the window slides and the processing is continuously repeated over each new window. This kind of continuous computation (query) over streams can be programmed by using the abstractions provided by the currently available Stream Processing Systems (SPSs).
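The windowing idea can be illustrated with a minimal sketch, assuming a count-based sliding window that keeps the last `size` items and produces a sum every `slide` arrivals (with `slide` not larger than `size`). The class `SlidingWindowSketch` and its methods are hypothetical names introduced only for this illustration and do not belong to any of the frameworks discussed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical count-based sliding window (not taken from any framework).
class SlidingWindowSketch {
    private final int size;   // number of items kept in the window
    private final int slide;  // arrivals between two consecutive results
    private final Deque<Integer> buffer = new ArrayDeque<>();
    private long sum = 0;            // aggregate maintained incrementally
    private int sinceLastResult = 0;

    SlidingWindowSketch(int size, int slide) {
        if (size <= 0 || slide <= 0 || slide > size) {
            throw new IllegalArgumentException("require 0 < slide <= size");
        }
        this.size = size;
        this.slide = slide;
    }

    // Adds one item; returns the window sum when the window fires, null otherwise.
    Long onItem(int value) {
        buffer.addLast(value);
        sum += value;
        if (buffer.size() > size) {
            sum -= buffer.removeFirst(); // evict the oldest item: memory stays bounded
        }
        sinceLastResult++;
        if (buffer.size() == size && sinceLastResult >= slide) {
            sinceLastResult = 0;
            return sum;
        }
        return null;
    }

    // Feeds a finite prefix of a stream and collects the emitted results.
    static List<Long> run(int[] stream, int size, int slide) {
        SlidingWindowSketch window = new SlidingWindowSketch(size, slide);
        List<Long> results = new ArrayList<>();
        for (int value : stream) {
            Long result = window.onItem(value);
            if (result != null) {
                results.add(result);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // Windows [1,2,3], [2,3,4], [3,4,5]: prints [6, 9, 12]
        System.out.println(run(new int[] {1, 2, 3, 4, 5}, 3, 1));
    }
}
```

Only `size` items are ever stored, regardless of the length of the stream, which is the property that makes windowed computations feasible on unbounded inputs.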

SPSs are based on the Data-Flow programming paradigm. Data-Flow programs describe processing tasks and their precedence ordering. They are commonly represented as directed graphs, where nodes correspond to computations (and they are called operators) and edges define data dependencies. Operators are the basic functional units of a Data-Flow application. They consume data from one or more input streams, perform a computation on them, and emit results toward one or more output streams for further processing.

Operators without input ports are called data sources and operators without output ports are called data sinks. A Data-Flow graph must have at least one data source and one data sink [14]. An example of Data-Flow graph is shown in Figure 1.5.

The relationship between data received and produced for each operator is defined by the operator selectivity and can be of different kinds, such as:

• Exactly-Once Semantics: means that each incoming record affects the final results exactly once or, in other words, for each input tuple there will always be one corresponding output tuple. Referring to SPSs’ delivery guarantees, which depend on the mechanisms adopted to provide fault-tolerance, the exactly-once semantics ensures that, even in case of a machine or software failure, there is no duplicate data and no data goes unprocessed; the computational semantics and its effect on the state maintained by the system are as if all the items were processed exactly one time.


Figure 1.5: Example of Data-Flow streaming application.

• At-Least-Once Semantics: in the context of SPSs’ delivery guarantees, means that every tuple will be fully processed but, when some failure occurs and records are replicated and emitted again, there is a possibility that an operator in the graph will process the same data item more than once during the whole computation. Hence, a message could be re-transmitted and re-computed until an acknowledgement is received: it will be received for sure, but duplicates might exist and might have to be taken into account in the application semantics.

• At-Most-Once Semantics: means that each incoming record is processed by each operator in the graph but a corresponding output result is not always emitted. The operator can emit exactly one tuple for each received input item or none. Therefore, there are no guarantees that a message that has been sent will be received and processed. This is the simplest delivery guarantee and the one with the lowest overhead.

Therefore, the input/output selectivity defines in general the average number of input/output items received or emitted by each operator firing in the graph. Moreover, the mechanisms adopted in a specific SPS to provide fault-tolerance are reflected in different delivery guarantees and operators’ selectivity and affect the way an application is written in a specific framework. Fault-tolerance is one of the most important requirements for SPSs in order to cope with single or multiple node failures (i.e., preserving the correct functioning of the entire system). It can concern the failure of centralized components within the cluster (i.e., nodes) or the failure of parallel applications run on multiple nodes. In the first case a solution is to replicate the data and re-assign the processing to another node in the cluster, while in the second case the system should be able to re-start the application on the same or on different nodes. Moreover, besides the guarantee of a correct operation of the system, fault-tolerance also implies the ability to complete message passing in a correct way by giving a clear semantics to the user.
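The difference between the three delivery guarantees can be made concrete with a small deterministic simulation. The sketch below is only an illustration under explicit assumptions: the class `DeliverySketch`, the parameter `failEvery` (every `failEvery`-th message suffers one failure) and the use of duplicate elimination to obtain an exactly-once effect are all hypothetical choices, and they do not describe how Storm or Flink implement their guarantees.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical simulation: counts how many times each message id is processed.
class DeliverySketch {

    // At-most-once: each message is sent once and never retried,
    // so a message lost in transit is never processed.
    static int[] atMostOnce(int messages, int failEvery) {
        int[] processed = new int[messages];
        for (int id = 0; id < messages; id++) {
            boolean lostInTransit = (id % failEvery == 0);
            if (!lostInTransit) {
                processed[id]++;
            }
        }
        return processed;
    }

    // At-least-once: the sender retransmits until an acknowledgement arrives.
    // Assumption: the message always reaches the receiver, but the first
    // acknowledgement of every failEvery-th message is lost.
    static int[] atLeastOnce(int messages, int failEvery) {
        int[] processed = new int[messages];
        for (int id = 0; id < messages; id++) {
            boolean acked = false;
            for (int attempt = 0; !acked; attempt++) {
                processed[id]++; // the receiver processes every copy it gets
                boolean ackLost = (attempt == 0 && id % failEvery == 0);
                acked = !ackLost;
            }
        }
        return processed;
    }

    // Exactly-once effect, obtained here by discarding duplicates by id
    // on top of the at-least-once retransmission scheme.
    static int[] exactlyOnceEffect(int messages, int failEvery) {
        int[] processed = new int[messages];
        Set<Integer> seen = new HashSet<>();
        for (int id = 0; id < messages; id++) {
            boolean acked = false;
            for (int attempt = 0; !acked; attempt++) {
                if (seen.add(id)) { // true only the first time the id is seen
                    processed[id]++;
                }
                boolean ackLost = (attempt == 0 && id % failEvery == 0);
                acked = !ackLost;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(atMostOnce(6, 3)));        // [0, 1, 1, 0, 1, 1]
        System.out.println(Arrays.toString(atLeastOnce(6, 3)));       // [2, 1, 1, 2, 1, 1]
        System.out.println(Arrays.toString(exactlyOnceEffect(6, 3))); // [1, 1, 1, 1, 1, 1]
    }
}
```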

Finally, operators are characterized also by the kind of state that they maintain during the computation, as shown in Figure 1.6: a stateless operator doesn’t maintain any history about past items received (e.g., a filter operator that drops out all the items not satisfying a certain property is stateless), a stateful operator maintains a state containing information on the data processed so far (the state can be a succinct data structure – like a sketch – or a collection of the last items received and stored in a buffer – like a window).


Figure 1.6: Stateless and stateful operators.
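As a minimal sketch of the distinction between stateless and stateful operators, the following code contrasts a filter, whose decision depends only on the current item, with a per-key counter, which must remember information about the items processed so far. The names `OperatorStateSketch`, `POSITIVE_FILTER` and `KeyedCounter` are hypothetical and introduced only for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

class OperatorStateSketch {

    // Stateless operator: the outcome depends on the current item only.
    static final Predicate<Integer> POSITIVE_FILTER = x -> x > 0;

    // Stateful operator: keeps, for each key, the number of items seen so far.
    static class KeyedCounter {
        private final Map<String, Long> counts = new HashMap<>();

        // Updates the state and returns the new count for the given key.
        long onItem(String key) {
            return counts.merge(key, 1L, Long::sum);
        }
    }

    public static void main(String[] args) {
        System.out.println(POSITIVE_FILTER.test(3));   // true
        System.out.println(POSITIVE_FILTER.test(-1));  // false

        KeyedCounter counter = new KeyedCounter();
        counter.onItem("a");
        counter.onItem("b");
        System.out.println(counter.onItem("a"));       // 2
    }
}
```

The filter can be replicated freely because its replicas share nothing, whereas replicating the counter requires deciding how its state is partitioned or shared among the replicas.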

State-of-the-art Stream Processing Engines (SPEs) handle high-velocity streams by distributing the processing over a large number of computing nodes in a cluster (i.e., scaling-out the processing) and they generally rely on the Java Virtual Machine (JVM) as the underlying processing environment for platform independence [3]. Among them, Apache Flink [15], Spark Streaming [16] and Storm [17] are the most widely used. The next subsections will go into detail for two of these SPEs, Apache Storm and Flink, which are the ones taken into account in this work.

1.1 Storm

Apache Storm is a real-time data processing framework with three main acting scopes: stream processing (i.e., it can be used for processing new data in real-time and updating databases), continuous computation (i.e., it can be used for executing continuous queries on data streams and streaming the results into clients) and distributed RPC (i.e., it can be used for parallelizing an intense query implemented as a distributed function that waits for invocation messages and, when it receives one, starts the execution and sends back the results) [5][18]. Since the focus of this work is stream computation, the stream processing capabilities of Storm are the ones analyzed in the following. This distributed real-time computation system provides an abstraction layer for writing and executing parallel real-time stream applications in distributed shared-nothing architectures [19]. The approach of Storm is similar to the way Hadoop [12] provides a set of general primitives for doing batch processing [20]. In fact, Storm was first developed to fill the lack of real-time processing in the Hadoop framework. The main idea driving the evolution of Storm and the other SPEs is providing suitable abstractions that spare the programmer of a DaSP application from manually building a network of queues and workers to do real-time processing. In addition, other properties such as fault-tolerance, no data loss and scalability are guaranteed [18].

Indeed, Storm is declared to be highly scalable (i.e., it is able to horizontally scale, using thousands of workers per cluster), fault-tolerant (i.e., it automatically reassigns the tasks of failed nodes and, at the task level, if a task fails then messages are automatically reassigned to quickly restart processing), reliable (i.e., it offers processing guarantees by supporting at-least-once processing semantics and, if any failure happens during the processing of data, spouts are responsible for resending the data for processing, as in Figure 1.7) and language agnostic (i.e., the processing logic can be written in any programming language) [21].


Figure 1.7: Storm tuple re-transmission mechanism.

A key concept in Storm is that of topology. A Storm topology is a top-level abstraction that represents the parallel streaming application to be executed as a network of computational entities. It is essentially a Data-Flow graph of the computation where each node contains processing logic and links between nodes describe the flowing of data in the network. A topology runs forever (or until it is killed, of course), and consumes data from the configured sources passing it down the processing pipeline. Nodes in a Storm topology constitute then a directed acyclic graph of operators that allow stream transformations. They can be of two types only:

• Spouts: that are the sources of the streams;

• Bolts: that can consume multiple input streams, execute some processing on the received items and emit new streams in output. In general, they are transformation stages and sinks.

Spouts and bolts execute in parallel: for each node it is possible to specify the amount of parallelism required and the framework will spawn that number of threads across the cluster to do the execution. Once the topology has been fully defined by its nodes and links, it can be submitted to a Storm cluster (like the one in Figure 1.8) for execution. Further details on the parallelism of a Storm topology and how to actually run a topology in a Storm cluster will be provided in Chapter 4.


Figure 1.8: Example of Storm cluster.

A stream in Storm is a core abstraction representing an unbounded sequence of tuples (i.e., records of attributes in the form of key-value pairs). Bolts can subscribe to streams in order to receive the corresponding tuples, while spouts and bolts emitting tuples to a stream send them to every bolt that subscribed to that stream [22]. In the example in Figure 1.9, three bolts subscribed to the stream emitted by the top-left spout node: this means that all three bolts will receive tuples from that spout.


Figure 1.9: High-level view of a Storm topology.

Storm tuples represent the data model adopted by the framework. Each tuple is a named list of values, and each field can be an object of any serializable type [23]. Storm provides stream groupings as a way to define how to send tuples between parallel entities (i.e., spouts, bolts) in the current topology. That is, a stream grouping defines the routing of the tuples between the parallel replicas (i.e., tasks) of each operator. At the task level, a topology can look like the one in the example of Figure 1.10.

Figure 1.10: Task-level view of a Storm topology.

Since each bolt executes as multiple tasks in the topology, a stream is partitioned and partitions are distributed among the bolt’s tasks. Thus, each task of a particular bolt will only get a subset of the tuples from the subscribed streams, and the stream grouping controls this partitioning of the tuples [24]. There are different kinds of stream groupings, as shown in Figure 1.11, that can be set for each operator when the topology is defined. Among them:

• Shuffle Grouping: it is the simplest and most commonly used one. Tuples are distributed uniformly at random across the receiver bolt’s tasks, thus the processing load is balanced among the tasks [24], as shown in Figure 1.11(a).

• Fields Grouping: it partitions the stream by a subset of its fields (i.e., one or more fields of each tuple are used as a key to identify sub-streams), guaranteeing that equal values for that subset of fields go to the same replica of the bolt operator (i.e., the key is hashed modulo the number of tasks in the receiver node) [22], as illustrated in Figure 1.11(b).


• All Grouping: it replicates the stream across all the bolt’s tasks, thus a copy of each tuple is sent to every instance of the receiving bolt [24][25], as shown in Figure 1.11(c).

• Global Grouping: it forwards all the tuples toward a single target instance, specifically the task with the lowest id [24][25], as represented in Figure 1.11(d).

• Custom Grouping: it sets the destination task by using a user-defined strategy, configurable by implementing the CustomStreamGrouping interface.

(a) Shuffle Grouping. (b) Fields Grouping.

(c) All Grouping. (d) Global Grouping.

Figure 1.11: Storm stream groupings.
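The routing decisions behind the groupings listed above can be sketched as plain functions that map a tuple to the index of one or more destination tasks. This is a simplified illustration, not Storm’s internal implementation: the class `GroupingSketch` and its method names are hypothetical, tasks are assumed to be numbered from 0, and the fields grouping is reduced to a hash of a single key modulo the number of tasks, as described in the text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical routing functions; tasks are identified by indexes 0..numTasks-1.
class GroupingSketch {

    // Shuffle grouping: a destination chosen uniformly at random.
    static int shuffleGrouping(Random random, int numTasks) {
        return random.nextInt(numTasks);
    }

    // Fields grouping: equal keys always reach the same task
    // (hash of the key modulo the number of tasks).
    static int fieldsGrouping(Object key, int numTasks) {
        return Math.floorMod(key.hashCode(), numTasks);
    }

    // All grouping: a copy of the tuple goes to every task.
    static List<Integer> allGrouping(int numTasks) {
        List<Integer> targets = new ArrayList<>();
        for (int task = 0; task < numTasks; task++) {
            targets.add(task);
        }
        return targets;
    }

    // Global grouping: every tuple goes to the task with the lowest id.
    static int globalGrouping(int numTasks) {
        return 0;
    }

    public static void main(String[] args) {
        int numTasks = 4;
        Random random = new Random(42);
        for (String word : new String[] {"storm", "flink", "storm"}) {
            System.out.println(word
                + " shuffle=" + shuffleGrouping(random, numTasks)
                + " fields=" + fieldsGrouping(word, numTasks)
                + " all=" + allGrouping(numTasks)
                + " global=" + globalGrouping(numTasks));
        }
    }
}
```

`Math.floorMod` is used instead of the `%` operator because `hashCode()` may be negative, and a negative remainder would not be a valid task index.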

The core implementation of Storm follows the true streaming model, where tuples are processed one-at-a-time, and supports at-least-once processing semantics. A higher-level API called Trident, built on top of Storm, provides exactly-once processing, transactional data-store persistence and a set of common stream analytic operations [26], in addition to the core Storm properties.

A variety of simple bolt abstraction types are available, like filters, aggregations, joins and storage/retrieval from persistent stores. The programmer of a DaSP application can also define custom functions in order to implement the logic of the operators in the topology. This last approach is fundamental in case of complex operations and shared states to be managed between the parallel tasks of an operator. In fact, the abstractions provided by the framework are too low-level and they are not sufficient to cover the more widespread cases. Hence, implementing a complicated functionality or maintaining a consistent shared state is possible, but it increases the complexity of the application development process and all the burden falls on the programmer.


1.2 Flink

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Its run-time supports both batch and data stream processing and two core APIs (DataSet and DataStream, respectively), based on fluent interfaces, are exposed.

A fluent API is primarily designed to be readable and to flow. The price to be paid is more effort, both in thinking and in the API construction itself. In fact, a simple API of constructor, setter, and addition methods is much easier to write [27]. A fluent interface can be seen as an internal Domain Specific Language (DSL) built atop a base language, whose syntax it borrows and stylizes, that is capable of relaying or maintaining the instruction context for a series of method calls. A well-designed DSL can be easier to program with respect to a traditional library and this improves programmers’ productivity and communication with domain experts [28]. Flink’s fluent API exploits method chaining, where return values from methods are used to relay instruction context, which in this case is the object instance making the first method invocation [29].
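Method chaining can be shown with a minimal sketch in which every configuration method returns the receiving object, so that the instruction context is relayed from one call to the next. The class `FluentJobSketch` and its methods are hypothetical and unrelated to Flink’s actual API; they only illustrate the pattern.

```java
// Hypothetical fluent configuration object (not part of Flink).
class FluentJobSketch {
    private String source = "";
    private String sink = "";
    private int parallelism = 1;

    // Each method returns `this`, relaying the context to the next call.
    FluentJobSketch source(String name) {
        this.source = name;
        return this;
    }

    FluentJobSketch parallelism(int degree) {
        this.parallelism = degree;
        return this;
    }

    FluentJobSketch sink(String name) {
        this.sink = name;
        return this;
    }

    String describe() {
        return source + " -> [x" + parallelism + "] -> " + sink;
    }

    public static void main(String[] args) {
        // The whole configuration reads as a single flowing expression.
        String description = new FluentJobSketch()
            .source("kafka")
            .parallelism(4)
            .sink("file")
            .describe();
        System.out.println(description); // kafka -> [x4] -> file
    }
}
```

With plain setters the same configuration would require one statement per property, each repeating the name of the object being configured.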

Flink provides multiple higher-level APIs offering dedicated libraries for common use cases and application domains. The different levels of abstraction offered by Flink to develop streaming/batch applications are represented in Figure 1.12. The lowest level of abstraction offers stateful streaming, allowing users to process events from one or more streams and use consistent fault-tolerant states. DataStream and DataSet APIs are fluent interfaces offering the common building blocks for data processing (e.g., user-specified transformations, joins, aggregations, windows, states) and data types, represented as classes in the respective programming languages (i.e., Java, Scala). The Table API implements an extended relational model with tables with a schema attached and comparable operations, such as select, project, join, group-by, aggregate, etc. The highest level of abstraction offered represents programs as SQL queries that can be executed over tables defined in the Table API underlying layer [30].

Figure 1.12: Different levels of abstraction offered by Flink.

This work keeps the focus on the DataStream abstraction, which supplies primitives for many common stream processing operations, such as windowing, a-tuple-at-a-time transformations, and enriching events by querying an external data store. The DataStream core API is available for Java and Scala and is based on functions, such as map, reduce and aggregate. Functions can be defined by extending interfaces or as Java or Scala lambda functions [31]. Moreover, compatibility layers for Hadoop MapReduce and Storm are provided. Thanks to Flink’s Hadoop compatibility package it is possible to improve the performance of many data analysis applications implemented as Hadoop MapReduce jobs and run in clusters around the world. In fact, Flink provides interfaces for Mapper and Reducer functions, as well as InputFormat and OutputFormat, in both Java and Scala [32]. As for Flink’s Storm compatibility API, it makes it possible to improve on Storm in several dimensions, including strong consistency guarantees (i.e., providing exactly-once semantics), a higher-level DataStream API, support for windowing, as well as better performance in terms of higher throughput and lower latency. Thanks to this compatibility layer it is possible to run almost unmodified Storm topologies using Flink, thus benefiting from superior performance, by embedding Storm code (i.e., spouts and bolts logic) as operators inside Flink DataStream programs [33]. This is exactly the approach adopted in this work, where the Flink versions of the applications provided as benchmark have been implemented exploiting Flink’s Storm compatibility layer.

The basic building blocks of Flink programs are streams and transformations. Conceptually, a stream is a potentially never-ending flow of data records, as stated before. Transformations are operations that, given one or more streams as input, produce one or more output streams as a result. Flink programs in execution are mapped to streaming Data-Flows, consisting of streams and transformation operators, each one starting with one or more sources and ending in one or more sinks. Often, the correspondence between the transformations in the program and the operators in the Data-Flow graph is one-to-one [30], as shown in Listing 1.1, which provides a possible implementation for the streaming Data-Flow graph in Figure 1.5 and Figure 1.13 (parallelized view).

// source
DataStream<String> lines = env.addSource(
    new FlinkKafkaConsumer<>(...));

// first transformation
DataStream<Event> events = lines.map(
    (line) -> parse(line));

// second transformation
DataStream<Statistics> stats = events
    .keyBy("id")
    .timeWindow(Time.seconds(10))
    .apply(new MyWindowAggregationFunction());

// sink
stats.addSink(new BucketingSink(basePath));

Listing 1.1: Flink streaming Data-Flow reflecting the graph in Figure 1.5 and Figure 1.13.

DataStream programs in Flink implement transformations on data streams (e.g., filtering, updating state, defining windows, aggregating). Streams are initially created from various sources (e.g., message queues, socket streams, files) and results are returned via sinks, that are generally used to write the data to files or to standard output (for example the command line terminal). In the example in Listing 1.1, the source is a FlinkKafkaConsumer that takes as arguments one or more topics to read from, a de-serialization schema telling Flink how to interpret/decode the messages and further configuration properties [34], while the sink is a BucketingSink that emits its input elements to file-system files within buckets and takes as argument the base directory that contains one sub-directory for every bucket. Bucket directories contain several part files, one for each parallel sub-task of the sink, each one containing the actual output data [35].

The execution of a Flink program can happen in a local JVM, or on clusters of many machines. Indeed, programs in Flink are inherently parallel and distributed. During execution, a stream has one or more stream partitions, and each operator has one or more operator sub-tasks, which are independent of one another and execute in different threads (and possibly on different machines or containers). The parallelism of a particular operator corresponds to the number of its sub-tasks and the parallelism of a stream always coincides with that of its producing operator [30]. The way in which streams transport data between two parallel operators identifies different patterns, such as:

• One-To-One (or forwarding) pattern: it preserves the partitioning and ordering of the elements, meaning that sub-task i of the receiver operator will see the same elements in the same order as they were produced by sub-task i of the source operator. An example of this pattern can be found in Figure 1.13 between the Source operator and the Map one.

• Redistributing pattern: it changes the partitioning of the stream since each sub-task sends data to different target sub-tasks, depending on the selected transformation. Examples in this class of patterns are:

– keyBy: modifies the partitions by hashing (i.e., assigning all records with the same key to the same partition, and to the same receiver’s sub-task);

– rebalance: distributes the data in a round-robin fashion, creating equal load per partition (i.e., a balanced work-load across the receiver’s sub-tasks in the case of infinite input streams);

– shuffle: partitions elements randomly according to a uniform distribution, balancing the load among the receiver’s sub-tasks;

– broadcast: broadcasts elements to every sub-task of the receiver operator.

In general, in a redistributing exchange the ordering among the elements is only preserved within each pair of sending and receiving sub-tasks. Considering again the example in Figure 1.13, in the communication between sub-task 1 of the Map operator and sub-task 2 of the keyBy one, the ordering within each key is preserved, but the parallelism does introduce non-determinism regarding the order in which the aggregated results for different keys arrive at the sink [30].


Flink provides support for windowing to ease the implementation of aggregating operations on streams (e.g., count, sum). These kinds of computation are scoped by windows since it would be otherwise impossible to aggregate on all the items in an unbounded sequence. Flexible windowing is supplied (i.e., tumbling windows, with no overlap; sliding windows, with overlap; and session windows, punctuated by a gap of inactivity). Moreover, windows can be count-driven or time-driven [30]. An example is shown in Figure 1.14.

Figure 1.14: Example of count-based windows and time-based windows on a stream.
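For time-driven windows, the assignment of an item to its window (or windows) can be computed directly from the item’s timestamp. The following is a minimal sketch under stated assumptions: timestamps, `size` and `slide` are expressed in the same arbitrary time unit, windows are aligned to time 0, and the class `WindowAssignSketch` with its methods is a hypothetical helper rather than Flink’s window assigner API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: a window is identified by its start time and
// covers the half-open interval [start, start + size).
class WindowAssignSketch {

    // Tumbling windows do not overlap: each timestamp belongs to exactly one.
    static long tumblingStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Sliding windows overlap: a timestamp belongs to every window whose
    // start lies in (timestamp - size, timestamp] and is a multiple of slide.
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(tumblingStart(12, 10));     // 10, i.e. window [10, 20)
        System.out.println(slidingStarts(12, 10, 5));  // [10, 5], i.e. [10, 20) and [5, 15)
    }
}
```

When `slide` equals `size` the sliding assignment degenerates into the tumbling one, since exactly one window start falls in the admissible interval.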

Different time semantics are available and can be used in order to define windows. The three notions of time supported by Flink in streaming programs are:

• Processing time: refers to the system time of the machine that is executing the operator that performs a time-based operation.

• Event time: is the time that each individual event occurred on its producing device and it is typically embedded within the records with a timestamp. In event time, the progress of time depends on the data, not on any wall clocks. For this reason, event time programs specify how to generate watermarks, which constitute the mechanism that signals progress in event time.

• Ingestion time: is the time at which events enter the Flink Data-Flow at the source operator. Each record gets the source’s current time as a timestamp and time-based operations refer to that timestamp.

Figure 1.15 illustrates the three notions of time presented above [36].

Figure 1.15: Notions of time in Flink.
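The role of watermarks in event time can be illustrated with a minimal sketch. It assumes one simple and hypothetical generation strategy, in which the watermark trails the largest timestamp seen so far by a fixed `maxDelay`, and it treats an event as late when its timestamp is older than the current watermark. The class `WatermarkSketch` is introduced only for illustration and is not Flink’s watermark API.

```java
// Hypothetical watermark generator: event-time progress is derived from
// the data (the timestamps), not from any wall clock.
class WatermarkSketch {
    private final long maxDelay;     // tolerated out-of-orderness
    private long maxTimestamp = 0;   // largest event timestamp seen so far
    private boolean seenAny = false;

    WatermarkSketch(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    // Current progress of event time; Long.MIN_VALUE before any event.
    long currentWatermark() {
        return seenAny ? maxTimestamp - maxDelay : Long.MIN_VALUE;
    }

    // Observes one event and reports whether it arrived late, i.e. with a
    // timestamp older than the watermark in force before the observation.
    boolean observe(long eventTimestamp) {
        boolean late = seenAny && eventTimestamp < currentWatermark();
        if (!seenAny || eventTimestamp > maxTimestamp) {
            maxTimestamp = eventTimestamp;
            seenAny = true;
        }
        return late;
    }

    public static void main(String[] args) {
        WatermarkSketch watermarks = new WatermarkSketch(5);
        long[] timestamps = {10, 7, 20, 12};
        for (long ts : timestamps) {
            boolean late = watermarks.observe(ts);
            System.out.println("event " + ts + " late=" + late
                + " watermark=" + watermarks.currentWatermark());
        }
    }
}
```

In this run the event with timestamp 7 is out of order but still within the tolerated delay, while the event with timestamp 12 arrives after the watermark has advanced to 15 and is therefore reported as late.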

Finally, Flink implements fault tolerance mechanisms (combining stream replay and checkpointing) to recover programs when some failures (e.g., machine hardware failures, network failures, transient program failures) occur and to continue their execution [37]. Consistent snapshots are taken, related to a specific position in the input streams and the corresponding state for each of the operators.


Figure 1.16: Partitioned state.

A state is intended to be an embedded key-value store, partitioned and distributed among the parallel tasks of a stateful operator, as shown in Figure 1.16 [30]. Snapshots represent consistent checkpoints to which the system can fall back in case of failure, preserving the application’s semantics as in a failure-free execution [38]. The exactly-once processing semantics is therefore always guaranteed by restoring the state of the operators and replaying the events from the point of the checkpoint. The checkpoint interval is decided as a trade-off between the overhead of fault tolerance during execution and the recovery time (the number of events that need to be replayed) [30].
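The combination of checkpointing and stream replay can be sketched for a single stateful operator that counts words. The code is a simplified, sequential illustration under explicit assumptions: a snapshot is a pair made of an input position and a copy of the state, the input can be re-read from any position, and the failure is simulated once at a chosen offset. The class `CheckpointSketch` and its parameters `interval` and `failAt` are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CheckpointSketch {

    // A consistent snapshot: a position in the input stream together with
    // the operator state that results from processing everything before it.
    static final class Snapshot {
        final int offset;
        final Map<String, Integer> state;

        Snapshot(int offset, Map<String, Integer> state) {
            this.offset = offset;
            this.state = new HashMap<>(state); // defensive copy
        }
    }

    // Counts the words of `input`, taking a snapshot every `interval` items.
    // A single failure is simulated just before processing item `failAt`
    // (use a negative value for a failure-free run).
    static Map<String, Integer> countWithRecovery(List<String> input, int interval, int failAt) {
        Snapshot last = new Snapshot(0, new HashMap<>());
        Map<String, Integer> state = new HashMap<>();
        int offset = 0;
        boolean failed = false;
        while (offset < input.size()) {
            if (!failed && offset == failAt) {
                failed = true;
                state = new HashMap<>(last.state); // restore the operator state
                offset = last.offset;              // replay from the checkpoint
                continue;
            }
            state.merge(input.get(offset), 1, Integer::sum);
            offset++;
            if (offset % interval == 0) {
                last = new Snapshot(offset, state);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "a", "c", "a");
        // Both runs yield the same counts: a=3, b=1, c=1.
        System.out.println(countWithRecovery(input, 2, -1));
        System.out.println(countWithRecovery(input, 2, 3));
    }
}
```

A larger `interval` means fewer snapshots during normal execution but more items to replay after a failure, which is the trade-off mentioned in the text.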

It is possible to make some final considerations about the main characteristics of the two SPEs presented above, discussing both differences and similarities.

The first thing to notice is the level of abstraction in processing streams of data and the consequent development style that characterizes the APIs of the two SPSs. On one hand, Storm is a bit more low-level and offers a compositional interface. The programmer has to deal with spouts and bolts directly: each one implements one of Storm’s abstract classes and the implementation of some specific methods is required. The operators must then be connected together to perform transformations and aggregations on individual messages in a reactive way, as shown in Listing 1.2, which illustrates how to implement the simple topology of Figure 1.17 [39]. On the other hand, Flink’s fluent interface is more readable and high-level, providing a more functional-like way to process events. The implementation of the simple topology of Figure 1.17 exploiting Flink’s fluent API looks like the one in Listing 1.3 [40].


TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8)
    .shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12)
    .fieldsGrouping("split", new Fields("word"));

Listing 1.2: Storm implementation of the simple topology in Figure 1.17.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.readTextFile("/path/to/file");
DataSet<Tuple2<String, Integer>> counts =
    // split up the lines in pairs (2-tuples) containing: (word,1)
    text.flatMap(new Tokenizer())
    // group by the tuple field "0" and sum up tuple field "1"
    .groupBy(0)
    .sum(1);
counts.writeAsCsv(outputPath, "\n", " ");

Listing 1.3: Flink implementation of the simple topology in Figure 1.17.

Continuing the comparison, it is to be noted that Flink is suitable for both stream and batch processing. In fact, its run-time natively supports both domains thanks to pipelined data transfers between parallel tasks: records are collected in buffers and immediately shipped from producing tasks to receiving tasks. In the case of batch jobs, it is sufficient to use blocking data transfers [41]. Storm is a data stream processor without batch capabilities. Flink’s pipelined engine is internally implemented in a way similar to Storm. In fact, the interfaces of Flink’s parallel tasks resemble Storm’s bolts. Both frameworks therefore try to reach low latency stream processing by pipelined data transfers. However, as discussed before, Flink offers a more high-level API compared to Storm. Indeed, Flink’s DataStream API provides functions such as Map, GroupBy, Window, and Join, while many of these operations must be manually implemented when using Storm, having to manage also one or more readers and collectors by hand in order to correctly provide the complete bolt functionality [41].

As far as processing semantics is concerned, Storm guarantees at-least-once processing while Flink provides exactly-once. Storm implements its processing guarantee using record-level acknowledgments, while Flink’s approach is based on the use of markers and checkpoints and is more lightweight than Storm’s solution. Flink’s method is essentially a variant of the Chandy-Lamport algorithm [42]. Data sources periodically inject markers into the data stream and whenever an operator receives such a marker, it checkpoints its internal state. The marker must be received by all data sinks in order to commit both the records processed since the marker arrival and the marker itself. If a failure occurs, all source operators are reset to their state when they saw the last committed marker and processing is continued [41]. As mentioned above, Storm is also able to offer an exactly-once semantics by means of a high-level API called Trident. However, Trident is based on mini-batches and hence has a different processing semantics with respect to Flink.

Both Storm and Flink SPSs adopt a-tuple-at-a-time data processing semantics, meaning that the transformation functions of the operators in the Data-Flow graph are processed as soon as a new tuple is available. This is also called true-streaming semantics, since each new piece of data is processed when it arrives, as shown in Figure 1.18. The other possible approach is the micro-batching semantics, where processing is applied whenever a new small batch of input items is complete. Micro-batches are small and/or processed at small intervals, hence the duration is generally a fixed amount of processing time, as shown in Figure 1.19. Spark Streaming is an example of a system that supports micro-batch processing. The data processing model adopted has implications both for the architecture of data processing systems and for the applications using them. In order to implement the true streaming semantics, an event-driven architecture is required, in which the internal workflow of the system is designed to continuously monitor for new data and dispatch processing as soon as that data is received. On the other hand, the internal workflow in a batch processing system only checks for new data periodically, and only processes that data when the next batch window occurs. At the application level, batch processing is generally appropriate for use cases where having the most up-to-date data is not critical and where tolerance for slower response time is higher, while stream processing is necessary for use cases that require live interaction and real-time responsiveness [43].

Figure 1.18: A-tuple-at-a-time data processing model (i.e., true streaming semantics).

Figure 1.19: Micro-batching data processing model (with time-based batch interval).

Another point worth considering is Flink’s adjustable latency, which refers to the way Flink sends records from one task to the other. As said before, thanks to pipelined data transfers, records are forwarded as soon as they are produced. For efficiency, these records are collected in a buffer which is sent over the network once it is full or a certain time threshold is met. The latency of records is controlled by this threshold, which specifies the maximum amount of time that a record will stay in a buffer without being sent to the next task. However, no guarantees are provided about the time it takes for a record to travel from entering the Data-Flow graph to leaving it. In fact, this depends on other factors too, like the processing time within tasks and the number of network transfers [41].

A strictly related aspect is that of backpressure. The term refers to the situation where a system is receiving data at a higher rate than it can process, during a temporary load spike. This can happen, for example, when a garbage collection stall causes incoming data to build up or when a data source exhibits a spike in how fast it sends data. These situations can lead to exhaustion of resources, or even data loss, if not dealt with correctly [44]. The example described in Figure 1.20 helps to understand the problem and the way Flink deals with it. It represents a data streaming pipeline with a source, a streaming job, and a sink, that is processing data, indicated by the black bars (each one representing 1 million records). At steady state, the system processes data at a speed of 5 million elements per second, as shown in Figure 1.20(a). Suddenly, the source has a spike and produces data at twice the rate for the duration of one second, causing 5 million more items to build up in the second node’s input queue, as shown in Figure 1.20(b). One solution to this problem would be to drop these elements, but this is not acceptable since it would break the exactly-once processing guarantee. Hence, Flink solves the problem by persistently buffering the additional data and adjusting the source speed to that of the slowest part of the pipeline, reaching again a steady state, as shown in Figure 1.20(c) [44]. The backpressure mechanism adopted by Flink relies on each operator using blocking queues with bounded capacity: a slower receiver will slow down the sender as soon as the buffering space of the queue is exhausted.

In Storm versions before 1.0, the only way to throttle the input to a topology was to enable ACKing and set the topology.max.spout.pending property, which fixes a threshold for the maximum number of tuples that can be pending acknowledgment in the topology at a given time. From Apache Storm 1.0, an automatic backpressure mechanism is provided, based on configurable high/low watermarks expressed as a percentage of a task’s buffer size. If the high watermark is reached, then the topology’s spouts are slowed down. If the low watermark is reached, then the throttling is stopped [45].
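The high/low watermark throttling described for Storm 1.0 amounts to a hysteresis rule on the buffer occupancy. The following C++ sketch illustrates that rule only: the class name `WatermarkThrottle`, its constructor parameters and the percentages used below are hypothetical, and they are not Storm’s actual classes or configuration keys.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical hysteresis controller: throttling starts when the buffer
// occupancy reaches the high watermark and stops only once the occupancy
// has dropped to the low watermark.
class WatermarkThrottle {
public:
    // Watermarks are given as integer percentages of the buffer capacity.
    WatermarkThrottle(std::size_t buffer_capacity,
                      unsigned high_percent, unsigned low_percent)
        : high_(buffer_capacity * high_percent / 100),
          low_(buffer_capacity * low_percent / 100) {
        assert(low_percent < high_percent && high_percent <= 100);
    }

    // Feeds the current buffer occupancy and returns true while the
    // sources (spouts, in Storm terms) should be slowed down.
    bool update(std::size_t occupancy) {
        if (occupancy >= high_) {
            throttled_ = true;
        } else if (occupancy <= low_) {
            throttled_ = false;
        }
        // Between the two watermarks the previous decision is kept.
        return throttled_;
    }

private:
    std::size_t high_;
    std::size_t low_;
    bool throttled_ = false;
};
```

Keeping two distinct thresholds prevents the sources from being switched on and off at every small oscillation of the buffer occupancy around a single limit.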

Figure 1.20: Example of backpressure in Flink, panels (a), (b) and (c).
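The behaviour of a blocking queue with bounded capacity, on which the backpressure mechanism discussed above is based, can be sketched in C++ as follows. This is an illustrative sketch and not Flink’s implementation: the class `BoundedQueue` and its methods are hypothetical names, and a standard mutex with two condition variables is assumed as the synchronization scheme.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical bounded blocking queue connecting a sender and a receiver.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Blocks the sender while the queue is full: a slow receiver
    // therefore slows down the sender (backpressure).
    void push(T item) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push(std::move(item));
        not_empty_.notify_one();
    }

    // Non-blocking variant: returns false where push() would have blocked.
    bool try_push(T item) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.size() >= capacity_) {
            return false;
        }
        items_.push(std::move(item));
        not_empty_.notify_one();
        return true;
    }

    // Blocks the receiver while the queue is empty; freeing a slot
    // wakes up a sender waiting in push().
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        T item = std::move(items_.front());
        items_.pop();
        not_full_.notify_one();
        return item;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return items_.size();
    }

private:
    const std::size_t capacity_;
    mutable std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::queue<T> items_;
};
```

When such queues are chained along a pipeline, a full queue in front of the slowest operator propagates upstream until the source itself is blocked in `push()`, so no element has to be dropped.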

Flink also improves on Storm with regard to user-defined state and streaming windows [41]. In fact, Flink makes it possible to maintain custom state in the operators, which can also participate in checkpointing for fault tolerance. Hence, the exactly-once guarantee is provided for custom user-defined state too. Only from Storm 1.0 has similar support for stateful bolts with automatic checkpointing been introduced, by means of the StatefulBolt API [45]. Flink extensively supports streaming windows and window aggregation, as already stated. In Storm releases before 1.0, developers had to build their own windowing logic, since there were no high-level abstractions that a streaming application programmer could use to define a window in a standard way in a topology. Starting from Storm version 1.0, a native windowing API has been included, which makes it possible to specify windows with respect to two parameters: the window length (i.e., the duration of the window) and the sliding interval (Storm supports sliding and tumbling windows based on time duration and/or event count) [45]. Even though Storm 1.0 provides new abstractions for backpressure, partitioned states and windowing, the support for implementing shared states among the operators’ tasks and complex window-based streaming computations is still very limited.
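The role of the two windowing parameters (window length and sliding interval) can be illustrated with a small C++ sketch of count-based windows. It is a simplified example and not the Storm or Flink windowing API: the function name `windowed_sums` is hypothetical, the aggregation is fixed to a sum, and only complete windows are emitted.

```cpp
#include <cassert>
#include <cstddef>
#include <numeric>
#include <vector>

// Sums every count-based window of `length` items, advancing the window
// start by `slide` items each time. slide == length gives tumbling
// windows, slide < length gives overlapping (sliding) windows.
inline std::vector<long> windowed_sums(const std::vector<int>& stream,
                                       std::size_t length,
                                       std::size_t slide) {
    assert(length > 0 && slide > 0);
    std::vector<long> sums;
    for (std::size_t start = 0; start + length <= stream.size(); start += slide) {
        const auto first = stream.begin() + static_cast<std::ptrdiff_t>(start);
        const auto last = first + static_cast<std::ptrdiff_t>(length);
        sums.push_back(std::accumulate(first, last, 0L));
    }
    return sums;
}
```

For the stream 1, 2, 3, 4, 5, 6, a window of length 3 with sliding interval 1 produces the sums 6, 9, 12, 15, while the tumbling configuration (length 3, sliding interval 3) produces only 6 and 15.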


The next chapters will be devoted to further exploring the main weaknesses of these two mainstream SPEs. The analysis will focus on scale-up scenarios, instead of scale-out ones (i.e., the target of the introduced state-of-the-art SPEs), and an efficient C++ library for streaming computations (targeting scale-up servers) will be proposed.


Chapter 2

WindFlow Library

“However far the stream flows, it never forgets its source.”

— Nigerian Proverb

Chapter 1 contains an overview of two widely used frameworks that are the state-of-the-art in the stream processing scenario and have been chosen as a basis for comparison in this work (i.e., Apache Storm and Apache Flink). This chapter will focus on the third key component of the presented study, namely the WindFlow [46] library.

WindFlow is a C++17 Data Stream Processing (DaSP) parallel library for multicores and GPUs, developed by the Parallel Programming Models group at the Department of Computer Science of the University of Pisa¹. The idea behind the project is to provide a tool to perform stream computation, which is based on the building blocks offered by the FastFlow C++ library (see next section), and to measure the benefits of this approach. Several parallel patterns (operators) are defined in the WindFlow streaming library, and more complex patterns can be built by combining the basic ones. WindFlow parallel patterns are directly provided as abstractions, available to the programmer, that can be instantiated, composed and even nested in order to implement the parallel structure of the application. Moreover, the patterns are reusable, since they can be conveniently parameterized by the programmer to generate specific parallel programs, and they are accessible by means of an API designed as a fluent interface. The WindFlow API has a compositional approach, focusing on the execution entities and the connections among them (Storm style) rather than expressing how to transform data at each step of the computation (Flink style). However, thanks to the fluent interface, operators can be declared by focusing on their functional semantics, and even the data distribution semantics between operators is abstracted, being implicitly defined in the pattern.
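To give an idea of what an API designed as a fluent interface looks like, the following C++ sketch shows a builder whose setters return the builder itself, so that the configuration of an operator reads as a single chained expression. All the names used here (`MapOperator`, `MapOperatorBuilder`, `with_name`, `with_parallelism`, `build`) are hypothetical and chosen for illustration only. They should not be taken as the actual WindFlow API.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

// Hypothetical description of a parallel map operator (not a WindFlow type).
struct MapOperator {
    std::string name;
    std::size_t parallelism;
    std::function<int(int)> logic;  // the functional semantics of the operator
};

// Hypothetical fluent builder: each setter returns *this, so calls chain.
class MapOperatorBuilder {
public:
    explicit MapOperatorBuilder(std::function<int(int)> logic)
        : logic_(std::move(logic)) {}

    MapOperatorBuilder& with_name(std::string name) {
        name_ = std::move(name);
        return *this;
    }

    MapOperatorBuilder& with_parallelism(std::size_t degree) {
        assert(degree > 0);
        parallelism_ = degree;
        return *this;
    }

    // Produces the configured operator; unset options keep their defaults.
    MapOperator build() const {
        return MapOperator{name_, parallelism_, logic_};
    }

private:
    std::function<int(int)> logic_;
    std::string name_ = "map";
    std::size_t parallelism_ = 1;
};
```

A declaration such as `MapOperatorBuilder([](int x) { return x * 2; }).with_name("doubler").with_parallelism(4).build()` states only the business logic and a few optional parameters, leaving the way data is distributed among the replicas to the pattern itself.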

WindFlow Parallel Patterns (Operators)

WindFlow provides several patterns that can be used to create a parallel streaming application by defining the corresponding Data-Flow graph².

Two fundamental operators are the ones needed to create the source and sink nodes of the Data-Flow graph. The Source operator generates the stream as a sequence of items characterized by the same data type. The generation can happen in an item-by-item fashion or by means of a single loop generation function, using a dedicated Shipper

¹ This is the official web site of the PPMs research group: http://calvados.di.unipi.it/paragroup

² In order to properly describe the specifications of the WindFlow library, the teaching material of the PhD Course "Data Stream Processing (from the Parallelism perspective)" of the University of Pisa, held by Dr. Gabriele Mencagli during the academic year 2018/2019, has been used.
