
Applying Anomaly Detection to Fast Data in Industrial Processes


Academic year: 2021


UNIVERSITY OF PISA

Department of Computer Science

Master Degree in Business Informatics

Master Thesis

Applying Anomaly Detection to Fast Data in Industrial Processes

Supervisor
Dr. Roberto TRASARTI

External Supervisors
Dr. Benjamin KLÖPPER
Dr. Marcel DIX

Candidate
Pierluca SERRA


Abstract

Big Data technologies and machine learning are about to revolutionise the industrial domain across many applications. Nowadays, industrial control systems are used to manage plant operations, to let operators control the activities of the entire plant, and to react to critical situations. As a direct consequence, a large amount of data is continuously generated, and it represents a significant source of information. The analysis of historical data could therefore be applied in different scenarios to support future decisions and prevent critical cases. This opens new perspectives for the development of analytic techniques, but a major concern is the lack of appropriate platforms in the industrial domain. This project investigates anomaly detection techniques for industrial processes, and it provides a dependable Big Data platform to support plant operators. The study has two main directions: first, the most appropriate architecture and technologies are identified; second, the most reliable algorithmic approaches are considered. The final outcome is a complete, fast, scalable and fault-tolerant platform that offers anomaly detection services on historical and real-time data.


Acknowledgments

I would first like to thank my thesis advisor, Dr. Roberto Trasarti, who guided me in the development of this project. He has always been available with suggestions and guidelines, answering every single question, despite the fact that I was in a different country during these months of work. I would like to thank my supervisor, Dr. Benjamin Klöpper, senior scientist at ABB, who guided and supported me during this project. Our weekly meetings have been the perfect occasions for improving the results of this study and for discussing new ideas and opinions. I would also like to thank Dr. Marcel Dix, scientist at ABB, who believed in me and gave me the opportunity to take part in a conference of the FEE project and in a workshop at ABB. He supported me in many different situations and increased my motivation for the results of this work, putting his trust in my capabilities. I am thankful to my parents, Marisa and Carlo, for giving me the possibility to reach this opportunity in my life, and for having always believed in me. Their affection and their support are what is most precious in my life. I am also thankful to my beloved girlfriend Alessandra, who has always encouraged me in this project and has always been by my side, despite the distance. Without her support and her love, I would not be the man I am now. Moreover, I would like to thank my colleagues Adam Arnaud, Alessandro Marrella, Julia Markina and Holger Johann for having been friends with whom I could discuss many aspects of this project and share my life in these months. Finally, I am thankful to Federico and Sabino, friends who supported me during this study with daily motivation and affection. Thank you all: each of you contributed to my life and, therefore, to the realization of this work.


To Alessandra, and to my parents, Marisa and Carlo.


Contents

1 Introduction

2 Description of the Application
2.1 Anomaly Detection in process plant
2.2 Dataset properties

3 Reference Architecture for Big and Fast Data
3.1 Lambda Architecture
3.2 Kappa Architecture
3.3 Pattern: real-time Stream Processing with In-Memory DB
3.4 Pattern: Big Data Analytics with Tachyon
3.5 Pattern: Connect Big Data Analytics to real-time Stream Processing
3.6 Failure-Recovery Strategies
3.6.1 At-Most-Once Delivery
3.6.2 At-Least-Once Delivery
3.6.3 Exactly-Once Delivery
3.7 Discussions

4 Specify Architecture and Technologies
4.1 Lambda Architecture
4.1.1 Batch layer
4.1.2 Serving layer
4.1.3 Speed layer
4.2 Lambda Architecture details
4.3 Technologies
4.3.1 Data ingestion: Apache Kafka
4.3.2 Distributed storage: Apache Hadoop
4.3.3 Distributed batch processing: Apache Spark
4.3.4 Distributed real-time processing: Apache Spark Streaming
4.4 Discussions

5 Anomaly Detection Algorithms
5.1 Anomaly Detection Paradigms
5.1.1 Modelling Approaches
5.1.2 Considerations about a distributed implementation
5.2 Distance Based Algorithm
5.2.1 Algorithm Structure
5.2.2 Algorithm Streaming Implementation
5.3 Clustering Based Algorithm
5.4 Binning Based Algorithm
5.5 Deviation Based Algorithm
5.5.1 Algorithm Structure
5.5.2 Algorithm Streaming Implementation
5.6 Discussions

6 Algorithm Comparison
6.1 Time Complexity
6.1.1 Distance Based Algorithm
6.1.2 Deviation Based Algorithm
6.1.3 Discussions
6.2 Execution Time
6.3 Semantic Analysis

7 Conclusions

List of Figures

List of Tables

References


Chapter 1

Introduction

Today, industrial control systems are operational systems that use real-time data to raise alarms for operators. In this way, critical situations can be detected, and operators can react immediately to resolve them. Industrial control systems, however, usually base their analysis on fixed alarm thresholds, and they do not learn from historical data. The life-cycle of a plant generates a continuously growing amount of data, which can be stored in different formats and from which information useful to plant operators could be extracted. With the development of machine learning techniques and the rise of Big Data technologies, it is now possible to improve industrial control systems through analytic models and predictive approaches, in order to support plant operators during critical situations. In particular, the information stored in historical data can be used as a source of knowledge from which to learn the behaviour of the plant's elements, and on which to base more dependable real-time industrial control systems.

The project presented in this study is part of the publicly funded project FEE [Pro14]. The goal of the FEE project is to offer early detection and decision support for critical situations in production environments, through the development of assistance systems that support plant operators in critical situations. In particular, this study focuses on anomaly detection in process plants, on both historical and real-time data. Here, anomaly detection is meant as a service that helps plant operators detect critical situations by analysing data coming from the life-cycle of the plant. Finding anomalies in industrial process data not only helps to understand what happened during known critical moments, but also reveals when the production components reached hidden critical points.

The FEE project is intended to be used as a service in industrial plants of different sizes and with various characteristics. Consequently, since industrial plants are usually sources of large volumes of data with high variety and velocity, the system developed and implemented here must allow fast, scalable and fault-tolerant analysis. The contributions of the thesis are:

1. development of a dependable architecture for anomaly detection analysis;

2. choice and configuration of the technologies to be used on top of the architecture;

3. implementation and optimization of a pre-existing algorithm, to provide a solution for historical and real-time analysis;

4. development and implementation of an alternative, faster detection algorithm, to provide a solution for historical and real-time analysis.

Regarding the architecture analysis, the investigation has been directed to Big Data platforms because of the nature of the data analysed. Consequently, distributed and scalable technologies have been used in both the historical and the real-time analysis. The pre-existing algorithm was provided by a FEE project study [AKM+16], and it has been implemented on a distributed platform. The second algorithm, instead, has been designed, developed and implemented as a faster alternative to the first solution. Finally, the two algorithms have been compared with each other, in order to offer a clear view of the strengths and weaknesses of both solutions. As a result, the contribution of this thesis consists of a Big Data architecture, made of different technologies, and two anomaly detection algorithms implemented on the distributed platform.

During the first part of the study, in which Big Data architectures were examined, the need for a specialised architecture for the industrial domain emerged. Part of the study is based on [MW15], and alternative perspectives have been found in [Kre14], [Jar14] and [Sha15]. However, existing reference models regarding the technologies to be used in the architecture did not seem perfectly suited to this specific purpose. Initial guidelines have been found in [MW15], and the choice of the different technologies was refined as the study advanced. In the second part of the project, focused on the development of the algorithms, guidelines have been found in [KKZ10], but the solutions provided have been developed and adapted to the specific application needs.

In conclusion, the project is structured as follows: chapter 2 describes the application domain and the datasets used in the project; chapter 3 contains the investigation of different Big Data architectures; and chapter 4 provides a detailed description of the final solution and presents the technologies used in the system. Once the architecture is defined, the study continues with chapter 5, which describes the algorithms developed. The two final algorithms are then compared in chapter 6. Finally, chapter 7 contains the final discussion and conclusions about the complete anomaly detection platform built in the study.


Chapter 2

Description of the Application

This second chapter gives a detailed description of the application domain of the study. In particular, section 2.1 describes the field in which the system has been developed and implemented, while section 2.2 describes the datasets analysed during the development of the final solution.

2.1 Anomaly Detection in process plant

Applying anomaly detection in process plants requires considering several aspects of how to obtain a reliable detection of critical situations. In particular, it is important not only to understand the nature of the data coming from the industrial processes, but also to investigate the requirements that an anomaly detection system should meet in order to offer a dependable solution for plant operators.

The data coming from a typical process plant, such as a petrochemical one, have different natures and formats, and cover the entire life-cycle of the plant. Data are structured (sensor readings, database tables), semi-structured (alarm and event logs) and unstructured (operation manuals), and they therefore call for different solutions. Since this project covers a specific part of the entire FEE project, however, only structured data have been analysed during the study.

Furthermore, the data consist of a large volume of information. One of the petrochemical plants considered in the project produces around 300 GB of data each year, collected from 60,000 sensors located throughout the plant, with sampling intervals between 1 and 60 seconds. Each sensor tracks a specific element of the plant and provides a continuous flow of information. The data are therefore characterized by high velocity and high variety. The velocity is inherent to the life-cycle of the industrial plant, since it involves a number of different components that work continuously. The high variety, instead, concerns the varied nature of the sensors.

Sensors, for example, can store information regarding temperature, flow or pressure, or can provide a text description of a tracked element inside the plant. As a result, data coming from an industrial plant have a very high variety in terms of the nature of the information and, for numerical measurements, in terms of the distribution of the sensor values. As will be shown in the second part of this chapter, sensor values can be extremely constant, fairly constant or extremely variable; anomaly detection in process plants therefore has to deal with a high variety of sensor distributions.

The idea of building a system that can be used in industrial plants leads to the development of a highly scalable solution with low-latency processes. Real-time anomaly detection on data with high velocity and high variety needs the support of a reliable system in order to offer a valuable service to plant operators. Real-time analysis, however, usually relies on historical data, in the form of intermediate results and aggregated information that help detect anomalies in the real-time flow. Consequently, the focus of the study is on developing a Big Data platform capable of offering scalable, low-latency and fault-tolerant analysis on both historical and real-time data. In particular, the project targets chemical production plants, which implies specific requirements for the system to be developed. First, chemical plants are safety-critical systems [KDS+16]; thus, dependability and understandability of the control system are important requirements to be considered during development. Secondly, plant process control systems are real-time systems [KDS+16] with deterministic deadlines.

These requirements imply the development of a Big Data platform that respects several important properties, in order to offer a system that is reliable from different points of view [MW15]. First, the system has to be robust and fault tolerant: it should avoid creating internal complexities (data inconsistency, data duplication), and it should react correctly to machine failures. Moreover, it has to provide low latency reads and updates, since it is a real-time system and its information should be available and updated as soon as possible with respect to the real-time data flow. Scalability and generalization should also be considered during development, since the system should scale while maintaining its performance, and it should support a wide range of applications. Finally, an extensible system that requires minimal maintenance can be extended with additional functionalities without being developed from scratch each time, and it ensures reliable results without a continuous need for maintenance. Consequently, a Big Data platform for chemical industrial plants should satisfy various properties, in order to provide a robust structure on which to develop an anomaly detection algorithm. The combination of a dependable Big Data platform and a reliable algorithm constitutes a sound solution for applying anomaly detection to data from chemical industrial plants.

                 sensorId_0   sensorId_1   sensorId_2   ...   sensorId_n
date-timeId_0    v_{d0,s0}    v_{d0,s1}    v_{d0,s2}    ...   v_{d0,sn}
date-timeId_1    v_{d1,s0}    v_{d1,s1}    v_{d1,s2}    ...   v_{d1,sn}
...              ...          ...          ...          ...   ...
date-timeId_k    v_{dk,s0}    v_{dk,s1}    v_{dk,s2}    ...   v_{dk,sn}

Table 2.1: Example of the dataset schemas.

In conclusion, applying anomaly detection in process plants involves different requirements to be kept in mind regarding both the architecture and the algorithms. First, the architecture has to be robust and fault-tolerant, since process plants are critical production systems. Secondly, the algorithm should provide sound analysis of historical data and low-latency detection on real-time measurements. These aspects have been considered during this study, and the final result respects all the requirements a system must meet to offer correct anomaly detection support to plant operators.

2.2 Dataset properties

Datasets coming from process plants are usually made of time series, that is, sequences of data points separated by a constant time interval. For each point in time, the values of the sensors located in the process plant are registered and stored. In this way, starting from a given date, it is possible to build the distribution of a given sensor over time.

In this project, seven process plant datasets have been analysed. Each of them contains a column with the set of timestamps, which indexes the whole dataset, and a number of other columns, one for each sensor inside the plant. In the remainder of this study, a timestamp value is identified by the term date-timeId, which explicitly marks the separation between the date (made of year-month-day) and the time (made of hour-minutes-seconds). Moreover, the term sensorId is used as the identifier of a single sensor inside a dataset, since no dataset contains duplicate sensors. An example of the dataset schema is reported in Table 2.1.
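As a minimal sketch of the schema just described, the following Python snippet reads a tiny, invented dataset in which the first column is the date-timeId and every other column is a sensorId. The CSV layout, the timestamp format, the helper names and the sample values are all assumptions made for illustration; the source does not specify how the datasets are stored.

```python
import csv
import io
from datetime import datetime

# Hypothetical sample in the shape of Table 2.1 (all values are invented).
RAW = """date-timeId,sensorId0,sensorId1
2014-04-01 00:00:00,20.1,301
2014-04-01 00:01:00,20.3,299
2014-04-01 00:02:00,20.2,305
"""

# Assumed layout: year-month-day, then hour-minutes-seconds.
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def load_rows(text):
    """Parse the CSV text into a list of dicts keyed by column name."""
    return list(csv.DictReader(io.StringIO(text)))


def sensor_series(rows, sensor_id):
    """Return the (timestamp, value) pairs of one sensor, in row order."""
    return [
        (datetime.strptime(row["date-timeId"], TIMESTAMP_FORMAT), float(row[sensor_id]))
        for row in rows
    ]


rows = load_rows(RAW)
series = sensor_series(rows, "sensorId1")
```

Selecting one column in this way yields the sequence of values of a single sensor over time, which is the form in which the sensor distributions are discussed in the rest of the chapter.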

The datasets have a granularity of one minute; that is, they consist of measurements spaced one minute apart. They have been treated as sequences of data points, without applying specific time-series techniques. As will be presented in chapter 5, an anomaly has not been related to its time of appearance.

Table 2.2 reports some of the properties of each dataset. The time range is expressed in days to be more understandable, and it counts the days between the first date-timeId and the last one. However, only dataset1 has all the rows corresponding to the minutes from its starting date to its ending date, that is, all the minutes in 61 days. The other sets, instead, miss some measurements in different parts. For this reason, missing days reports the number of missing days in each dataset. The number is a rounded-down approximation of the real value corresponding to the missing minutes, but it gives an idea of how the datasets are composed. The column present min., instead, contains the number of rows of each dataset; since the granularity is one minute, it corresponds to the number of minutes (date-timeId) present in the set. Finally, sensors reports the number of sensors (columns) that belong to the dataset.

A first consideration concerns the different size (in terms of rows) of dataset1 compared to all the other datasets. The datasets used during this project are "sample" datasets; this means that the study focused on obtaining the most valuable anomaly detection results, regardless of the sample size. The final solution, since it corresponds to a usable product in the industrial domain, is a system capable of analysing a growing quantity of data without suffering from excessive workloads, because all its components are scalable and distributed.

Inspecting the nature of the data, it has been found that all the datasets contain both numerical and text data. In particular, dataset1 has 110 sensors with only non-numerical values. Data of this type are usually sources of important information: they contain references to I/O Errors, Shut-downs and a number of other issue indicators. However, since the analysis of text data is outside the aim of this project, they have not been considered as information sources and have been interpreted as missing values. Regarding missing data, it has been chosen not to replace them with a numerical value. In anomaly detection it is not always clear whether replacing a missing value could lead to different results. To avoid this risk, missing values have been parsed as NaN values.
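The parsing rule described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the function name `parse_sensor_value` and the sample row are hypothetical, and the actual parsing code used in the project is not shown in the source.

```python
import math


def parse_sensor_value(raw):
    """Return the reading as a float, or NaN when it is empty or textual."""
    try:
        value = float(raw)
    except (TypeError, ValueError):
        # Text entries (for example error messages) and empty cells are
        # treated as missing values rather than replaced by a number.
        return math.nan
    # float() also accepts strings such as "nan" or "inf"; treat them as missing.
    return value if math.isfinite(value) else math.nan


# Hypothetical row mixing numerical readings, a text entry and an empty cell.
row = ["21.5", "I/O Error", "", "300"]
parsed = [parse_sensor_value(cell) for cell in row]
```

Keeping NaN instead of imputing a number means that later steps have to skip or handle NaN explicitly, which matches the choice of not altering the data before the detection step.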

As is visible from the number of sensors contained in the different datasets (Table 2.2), dataset1 has a considerably greater number of sensors than the other sets. The reason is that it contains different typologies of sensors, where the term typology means a type of sensor. Examples of typologies found in the datasets are pressure sensors, temperature sensors and flow sensors. While dataset1 contains different sensor typologies, the same is not true for the other datasets: each of them is related to a specific typology of sensor, and they are thus less varied than the first set. The analysis, however, has been carried out considering all the sensors as belonging to a single typology. Consequently, the pipeline of transformations and analyses performed during the anomaly detection process does not depend on the nature of the sensor. A single exception will be shown in section 5.5, in which two sensor typologies have been treated with a specific approach during a step of the anomaly detection algorithm.

           Time range (days)   Missing days   Present min. (rows)   Sensors (columns)
dataset1   61                  0              87840                 1376
dataset2   404                 31             537059                71
dataset3   360                 56             437759                182
dataset4   404                 50             509446                199
dataset5   404                 31             537059                100
dataset6   404                 31             537059                198
dataset7   404                 31             537059                253

Table 2.2: Properties of the datasets analysed during the project.
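The relation between the columns of Table 2.2 can be checked with a short calculation. The sketch below assumes that one row is expected for every minute of the time range and that the missing minutes are rounded down to whole days, as the text describes; the function name `missing_days` is hypothetical, and only the first four rows of the table are reproduced.

```python
MINUTES_PER_DAY = 24 * 60


def missing_days(time_range_days, present_rows):
    """Rounded-down number of days without measurements."""
    expected_rows = time_range_days * MINUTES_PER_DAY
    missing_minutes = expected_rows - present_rows
    return missing_minutes // MINUTES_PER_DAY


# (time range in days, present rows) taken from Table 2.2.
table_2_2 = {
    "dataset1": (61, 87840),
    "dataset2": (404, 537059),
    "dataset3": (360, 437759),
    "dataset4": (404, 509446),
}

computed = {name: missing_days(days, rows) for name, (days, rows) in table_2_2.items()}
```

For dataset1, 61 days correspond to 61 × 1440 = 87840 minutes, which is exactly the number of rows present, so no day is missing. For dataset2, 404 × 1440 = 581760 expected minutes minus 537059 present rows leaves 44701 missing minutes, which is slightly more than 31 days.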

Figure 2.1 shows three examples of sensor distributions contained in the datasets. In particular, Figure 2.1a reports the distribution of a sensor for which four anomalies were known before the analysis. Its distribution, as visible in its box plot, is largely concentrated between its first and third quartiles, and only some values are identified as outliers in the figure. This sensor is the only example of anomalies already defined before this study; the four anomalies, clearly shown in the distribution of the sensor, refer to foaming events inside the plant. Since this was the only available example of anomalies inside the datasets (in particular, in dataset1), the analysis has been treated as an unsupervised scenario: no additional information about the existence (or absence) of anomalies in the datasets was known.

Figure 2.1b and Figure 2.1c, instead, report two different examples of the distributions found. The first example shows a fairly constant distribution, in which the box plot highlights the presence of some outliers. Most of the values lie in the range [300,400] and identify a sensor with a regular standard pattern. For these types of sensor, anomaly detection is usually quite simple: since the sensors have clear regular distributions, anomalies are generally identifiable as outliers. The second example, instead, has a two-fold distribution, in which the two parts differ widely from each other. In these cases, as no information about the underlying process data was known, the two final algorithms proposed take largely different approaches. As will be shown in chapter 6, one solution assumes that no anomaly exists between the first part and the second part of the distribution, while the second algorithm detects the change in the distribution as an unusual variation.
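To make the notion of box-plot outliers concrete, the sketch below applies the common convention of flagging values that lie more than 1.5 times the interquartile range outside the first and third quartiles. This convention, the function name `boxplot_outliers` and the sample readings are assumptions for illustration; the source does not state which rule its box plots use, and this is not one of the two algorithms developed in the thesis.

```python
import math
import statistics


def boxplot_outliers(values, whisker=1.5):
    """Return the values outside the box-plot fences, ignoring NaN readings."""
    clean = [v for v in values if not math.isnan(v)]
    # quantiles(..., n=4) returns the three quartile cut points Q1, Q2, Q3.
    q1, _, q3 = statistics.quantiles(clean, n=4, method="inclusive")
    iqr = q3 - q1
    lower, upper = q1 - whisker * iqr, q3 + whisker * iqr
    return [v for v in clean if v < lower or v > upper]


# Invented readings of a fairly constant sensor with one extreme value
# and one missing value.
readings = [300, 310, 320, 330, 340, 350, 360, 370, 380, 390, 400, 1000, math.nan]
outliers = boxplot_outliers(readings)
```

A rule of this kind works for sensors with a regular pattern, such as the one in Figure 2.1b, but on a two-fold distribution like the one in Figure 2.1c the quartiles span both regimes, which is one reason why that case needs a more careful definition of anomaly.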

Overall, these examples highlight the wide variety in the distributions of the sensors belonging to the datasets. Some of them have a clear standard pattern, in which, at first approximation, it is simple to identify anomalies and unusual values. Other sensors, instead, have a clearly variable distribution, for which anomaly detection has to be defined more clearly.

[Figure 2.1: Examples of the distributions of three sensors. Panels (a), (b) and (c); axes: values, time (2014 to 2015), count.]

Finally, the datasets analysed have been divided into two main sets, according to the number of rows. In the remainder of this study, the term smaller dataset will identify dataset1, while the term biggest dataset will

Chapter 3

Reference Architecture for Big and Fast Data

Nowadays, various architectures for Big Data systems are available, and different technologies provide easy ways to communicate with one another [PP15]. As a result, an architecture for Big and Fast Data can be derived from the analysis of different constraints and from the consideration of the services the system should support.

This chapter presents four architecture patterns: Lambda Architecture (section 3.1) and Kappa Architecture (section 3.2) provide a complete solution for a Big and Fast Data system; the patterns Real-time Stream Processing with In-Memory DB (section 3.3) and Big Data Analytics with Tachyon (section 3.4), instead, focus on the use of in-memory systems in the real-time analysis and in the historical one, respectively. Moreover, the pattern Connect Big Data Analytics to Real-Time Stream Processing (section 3.5) investigates some basic strategies that support the connection between the results of the historical analysis and the real-time analysis processes. Finally, the chapter ends with the description of different Failure-Recovery Strategies (section 3.6), where three possible failure-recovery solutions are analysed.

3.1 Lambda Architecture

[Figure 3.1: Lambda Architecture diagram. Labels: New data, Batch layer (Master dataset), Serving layer (Batch views), Speed layer (Realtime views), Query.]

The first architecture analysed is the Lambda Architecture, described in [MW15]. Its design makes it possible to ingest, process, and compute analyses on both real-time and historical data. The idea is to build Big Data systems as a series of layers, each with a specific role in the system. The layers on which the Lambda Architecture is built are the following:

batch layer – it is usually a data lake system, such as Apache [...] constantly growing, master dataset, and to compute arbitrary functions on that dataset. The processes in the batch layer run over all the data and compute predefined historical analytics; their results are the batch views, which contain the data analysed through low-latency processes. To better understand it, one can think of the batch layer as a system that runs in a while(true) loop and recomputes the batch views from scratch continuously (or at pre-defined intervals).

serving layer – it stores the results of the batch layer computations in order to make them immediately queryable. Since the batch layer periodically recomputes the batch views, the serving layer is responsible for refreshing the cached results with the updated ones. It is a distributed database that supports batch updates and random reads, but it does not need to support random writes. As a result, the serving layer can be a very simple database, because without the random-writes requirement its complexity decreases significantly.

speed layer – it is a combination of queuing, streaming and operational data stores. It supports the computation of real-time analytics on streaming data. The speed layer compensates for the time during which batch processes run in the batch layer, and it produces real-time views based on the most recent data received. The purpose of the layer is to compute the analytics quickly and at low latency, so that new data can be queried as soon as needed. In this way, when the system is queried, it can show a complete view of the analytics across the most recent data and all historical data.

In the Lambda Architecture (Figure 3.1), all new data are sent to both the batch layer and the speed layer. Thus, data are ingested by two actors, usually in parallel, by way of streaming systems or message queues, such as Storm [Fou16b] or Kafka [Fou16a]. Once data have been ingested, the batch layer (re)computes the batch metrics, and the speed layer computes similar results but only for the most recent data. Because of its configuration, and unlike the serving layer, the speed layer needs databases that support both random writes and random reads. As a result, it is more complex than the serving layer in terms of both implementation and operations.
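The interplay of the three layers can be illustrated with a toy, single-process sketch. It is not how a production system would be built: the class `LambdaSketch`, its method names and the per-sensor counting analytic are hypothetical, and the sketch assumes that a batch run is instantaneous, so the real-time view can simply be reset once the batch view has caught up.

```python
from collections import Counter


class LambdaSketch:
    """Toy model of the batch, serving and speed layers (illustrative only)."""

    def __init__(self):
        self.master_dataset = []        # batch layer: append-only raw records
        self.batch_view = Counter()     # serving layer: result of the last batch run
        self.realtime_view = Counter()  # speed layer: data not yet in the batch view

    def ingest(self, sensor_id, value):
        # New data are sent to both the batch layer and the speed layer.
        self.master_dataset.append((sensor_id, value))
        self.realtime_view[sensor_id] += 1  # incremental update (random write)

    def run_batch(self):
        # Recompute the batch view from scratch over the whole master dataset.
        self.batch_view = Counter(sensor_id for sensor_id, _ in self.master_dataset)
        # The speed layer only has to cover data newer than the last batch run.
        self.realtime_view = Counter()

    def query(self, sensor_id):
        # A query merges the batch view with the real-time view.
        return self.batch_view[sensor_id] + self.realtime_view[sensor_id]


system = LambdaSketch()
system.ingest("s1", 20.1)
system.ingest("s1", 20.4)
system.ingest("s2", 3.0)
before_batch = system.query("s1")  # answered by the real-time view alone
system.run_batch()
system.ingest("s1", 20.9)
after_batch = system.query("s1")   # batch view plus the newest reading
```

The sketch also shows why the two layers have different storage needs: the batch view is replaced wholesale at each run, while the real-time view is updated record by record.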

In conclusion, the Lambda Architecture provides a reliable design for a Big Data system. The clear separation of roles across different components makes it possible to focus on one specific objective per part, and it simplifies the allocation of tasks. However, having different layers, and different components for each layer, makes it difficult to connect all the parts and to send messages through the whole system flow. Making the speed layer as simple as possible could ease the implementation of the system; on the other hand, using compatible components in the speed and batch layers could also be a valuable opportunity to share the same computational code, at least in some parts of the analysis.

3.2 Kappa Architecture

Kappa Architecture, proposed in [Kre14], focuses on simplifying the Lambda Architecture with a single change: removing the batch layer and making all processing happen in a near real-time, streaming mode. Since the batch layer is responsible for (re)computing the analysis on historical data, in the Kappa Architecture re-computation is still possible: the data are in effect streamed through the Kappa flow again. Let us first focus on the structure of the Kappa Architecture.

As shown in Figure 3.2, the Kappa Architecture is composed of the speed layer, the serving layer, and a distributed messaging system. When new data are received by the distributed messaging system, it sends them to the speed layer, where the computations are made. Then, the serving layer waits for the outputs of the analysis, and it caches them in order to provide queryable results. The distributed messaging system retains all the data, so that they can also be reprocessed. In [Kre14] it is highly recommended to use Kafka in this role, but the possibility of using a distributed file system, like HDFS from Hadoop, to support the data storage is not excluded. HDFS plays a different role in the two architectures: in the Lambda Architecture, the distributed file system both stores all the data and computes functions over them, while in the Kappa Architecture HDFS only stores the dataset, and the computations are made in the speed layer only.

[Figure 3.2: Kappa Architecture diagram. Labels: New data, Distributed messaging system, HDFS, Speed layer, Serving layer, Query.]

As a result, the main goal of Kappa Architecture lies not so much in redesigning the Lambda Architecture as in proposing an alternative to its complexity: Lambda Architecture is composed of three different layers, and many different technologies are needed in order to build a correct Big Data system; having two complex distributed systems like the batch and the speed layer, and maintaining code that has to produce the same result over both, is a huge challenge that should be avoided unless strictly necessary. However, removing the batch layer means either finding a different way to recompute analyses on the whole dataset or giving up this functionality. In Kappa Architecture this is solved with the following steps (Figure 3.3):

1. Use a distributed system, like Kafka (or HDFS), in order to retain (or store) the data providing the way to re-process them;

2. When reprocessing is needed, start a second instance of the stream processing job that reads from the beginning of the retained data and directs its output to a new output;

3. When the second job finishes its computations, switch the application to read from the new output;

4. Stop the old job version and delete the old output.
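The four steps above can be illustrated with a minimal in-memory sketch. The list `retained_log` stands in for the retained input (a Kafka topic in [Kre14]); the job functions, the output names and the `query` helper are hypothetical and only serve to show the switch between outputs.

```python
from typing import Callable, Dict, List

# Hypothetical stand-in for the retained input (e.g. a Kafka topic).
retained_log: List[int] = [3, 5, 8, 13]


def run_job(job: Callable[[int], int], log: List[int]) -> List[int]:
    """Replay the retained log from its beginning through one job version."""
    return [job(record) for record in log]


def job_v1(x: int) -> int:
    """Job version n."""
    return x * 2


def job_v2(x: int) -> int:
    """Job version n+1 (the processing code has changed)."""
    return x * 2 + 1


# Step 1: the data are retained, and job version n has produced output n.
outputs: Dict[str, List[int]] = {"output_n": run_job(job_v1, retained_log)}
active = "output_n"  # the output the application currently reads from


def query() -> List[int]:
    """Serve results from whichever output is currently active."""
    return outputs[active]


# Step 2: a second job instance replays the log into a new output.
outputs["output_n_plus_1"] = run_job(job_v2, retained_log)
# Step 3: switch the application to the new output.
previous, active = active, "output_n_plus_1"
# Step 4: stop the old job version and delete its output.
del outputs[previous]
```

In a real deployment both job versions would run concurrently on the stream processor until the new one has caught up; the sketch runs them sequentially for simplicity.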

Reprocessing has to be done when the processing code changes, or when the results need to be recomputed. This method takes data from the same source and uses the same code over the same layer as the standard stream processing job. This important simplification thus makes it possible to greatly reduce the number of components needed to build the Big Data system. However, if a re-computation on the complete dataset is needed, the speed layer is responsible for both roles, which could weigh down the overall performance; in Lambda Architecture, instead, the re-computation is done only by the batch layer. Consequently, Kappa Architecture could be a valid alternative to Lambda Architecture when the re-computation frequency is not too high and the speed layer can therefore run without problems. If re-computation happens frequently, it could be more reasonable to have two different components (batch and speed layer), each with a specific and well-defined role in the system.

Figure 3.3: Kappa Architecture, an example of the reprocessing process. The components denoted by n are the original results before the reprocessing (grey flow), the components denoted by n+1 are the results after the reprocessing (red flow).

As already mentioned, however, if the system is designed to allow analysis of the whole dataset, starting from the first data received, in Kappa Architecture it could be useful to use HDFS to store the complete set of data, rather than setting the retention of the distributed messaging system to a very high threshold. As a result, part of the simplification that Kappa Architecture provides could shrink, and having a specialized layer responsible for batch analyses could be a more reliable solution in this situation. Furthermore, section 5.2 presents the first final algorithm provided by this study. There, the algorithm used for the batch analysis produces its results, over the complete dataset, with the use of a threshold. The threshold is obtained by computing the average and the standard deviation of subsets of data, through a comparison of them with all the other data in the dataset. Two passes over the data are therefore needed: the first to compute the average, the second to find the standard deviation. The algorithm used during the real-time analysis has a quite similar structure, but it works on chunks of data rather than on the complete dataset. Consequently, it still needs to compute the threshold and to pass over the complete dataset. In Kappa Architecture, such an approach forces the streaming algorithm to start a new job for each chunk, since the complete dataset is needed for each of them. Consequently, part of the improvements that Kappa Architecture provides could again be outweighed by additional complexity. In this case, the batch and the streaming algorithms are quite similar, but still not identical. As a result, the use of a single algorithm, as Kappa Architecture suggests, could degrade the performance of the system rather than improve it.
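The two passes mentioned above can be sketched as follows. This is only an illustration of why the complete dataset has to be read twice: the form of the threshold (mean plus `k` standard deviations) and the parameter `k` are assumptions of this sketch, not the algorithm of section 5.2.

```python
import math
from typing import Sequence


def two_pass_threshold(values: Sequence[float], k: float = 3.0) -> float:
    """Return mean + k * std, computed with two passes over the data.

    The threshold form and the default k are illustrative assumptions.
    """
    n = len(values)
    if n == 0:
        raise ValueError("at least one value is required")
    # First pass: the average.
    mean = sum(values) / n
    # Second pass: the (population) standard deviation, which needs the mean.
    variance = sum((v - mean) ** 2 for v in values) / n
    return mean + k * math.sqrt(variance)
```

Because the second pass depends on the result of the first, a streaming job that receives only one chunk at a time cannot produce this value without access to the whole dataset, which is the complication described in the text.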

3.3 Pattern: real-time Stream Processing with In-Memory DB

Although Lambda Architecture seems to be one of the most complete structures for building Big Data systems, and Kappa Architecture offers a reliable alternative, some aspects could still be improved in order to make the structure more complete and less complex. In [Pie15] some weaknesses of Lambda Architecture are highlighted, and VoltDB [Vol16] is suggested as an alternative component of the architecture. The proposal focuses on building Big Data systems that can not only perform real-time analyses but also make real-time decisions on each event in the data flow. In order to do that, some complexities of Lambda Architecture have to be solved: the one-way data flow, which makes it impossible to take per-event decisions on the streaming data or to respond immediately to incoming events; the eventual consistency, which is weaker than the consistency offered by relational databases and some NoSQL products; and the general complexity, derived from the presence of many different components passing messages from one to the next.

Using VoltDB, a clustered, in-memory, relational database supporting the fast ingest of data, real-time analytics, and the fast export of data to systems such as Hadoop and OLAP products, the system architecture (Figure 3.4) works as explained below:

• Data arrive and are ingested. They are immediately exported to the batch layer;

• The batch layer is responsible for detecting historical intelligence and sending it to the speed layer for per-event real-time decision making;

• The new decision-making layer (inside VoltDB) both ingests fast data and uses real-time data, together with historical intelligence, in order to compute a response for each data item.

As a result, in-memory databases such as VoltDB let systems both capture value the moment the event arrives (rather than at some point after the event arrives) and react immediately. Moreover, this approach reduces the complexity of Lambda Architecture: it replaces the streaming analysis parts with a single component, and it provides the traditional database interaction model with SQL capabilities. It should be noted that most of the improvements VoltDB brings to the architecture depend on its in-memory structure, and are therefore subject to the weaknesses of in-memory databases (e.g. the potential loss of data and the limit on the database size). Indeed, unlike Lambda Architecture, in which new data are ingested by both the speed and the batch layer, in this structure new data are ingested only by the speed layer; if it fails or becomes unavailable, reliable failure-recovery strategies have to come to the rescue.

Figure 3.4: Big Data system with VoltDB, source [Jar14].

As a last consideration, the solution adopted in this project had to meet specific requirements coming from the industrial domain. From this perspective, the choice of a technology is usually tied to the use of technologies that are bundled together and provided by one source partner. In this respect, VoltDB is still a very recent addition to some of the main Big Data distributions, and is thus not yet widespread in the application domain. Since the aim of the project has been to develop a system that is both dependable and easy to maintain, VoltDB has been considered a solution not yet sufficiently integrable in the industrial ecosystem.

3.4 Pattern: Big Data Analytics with Tachyon

Having considered the use of an in-memory database in the real-time processing system, it is interesting to analyse the use of a similar component in the batch processing system as well. A reliable example is described in [Sha15], which reports an interesting result obtained by introducing Tachyon, a memory-speed virtual distributed storage system (now known as Alluxio), into Baidu’s big data infrastructure. The infrastructure (Figure 3.5) is built with Tachyon and Spark SQL, and it assures strong performance in querying Baidu’s dataset [Sha15, Figure 4-11]. A simple example of the data flow in the architecture is the following [DZL+13]:

1. Queries are submitted through a user interface; the operation manager then analyses each query and asks the view manager whether the necessary data are already in Tachyon;

2. If the data are in Tachyon, the operation manager takes them directly from it and runs the analysis on them;

3. If the data are not in Tachyon, the operation manager requests them from the data warehouse. At the same time, the view manager requests the same data from the data warehouse and stores them in Tachyon, so that the next time the same query is submitted the necessary data are already in Tachyon.

Figure 3.5: Big Data system with Tachyon and Spark SQL, source [Sha15].
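The query flow described in the steps above is essentially a cache lookup with a fallback to the slower store. A minimal sketch, with plain Python objects standing in for Tachyon, the view manager, the operation manager and the data warehouse (all class and method names here are hypothetical, not the Baidu implementation):

```python
from typing import Dict, List


class DataWarehouse:
    """Hypothetical slow store (HDFS-based in the text)."""

    def __init__(self, tables: Dict[str, List[float]]) -> None:
        self._tables = tables
        self.reads = 0  # counts how often the warehouse is hit

    def load(self, name: str) -> List[float]:
        self.reads += 1
        return list(self._tables[name])


class ViewManager:
    """Tracks which data are cached; the dict stands in for Tachyon."""

    def __init__(self, warehouse: DataWarehouse) -> None:
        self._warehouse = warehouse
        self.cache: Dict[str, List[float]] = {}

    def has(self, name: str) -> bool:
        return name in self.cache

    def populate(self, name: str) -> None:
        self.cache[name] = self._warehouse.load(name)


class OperationManager:
    """Receives queries and decides where to read the data from."""

    def __init__(self, views: ViewManager, warehouse: DataWarehouse) -> None:
        self._views = views
        self._warehouse = warehouse

    def query_sum(self, name: str) -> float:
        if self._views.has(name):                # step 2: cache hit
            rows = self._views.cache[name]
        else:                                    # step 3: cache miss
            rows = self._warehouse.load(name)
            self._views.populate(name)           # cache for the next query
        return sum(rows)


warehouse = DataWarehouse({"clicks": [1.0, 2.0, 3.0]})
manager = OperationManager(ViewManager(warehouse), warehouse)
first = manager.query_sum("clicks")   # miss: two warehouse reads
second = manager.query_sum("clicks")  # hit: served from the cache
```

On the first query both the operation manager and the view manager read from the warehouse, as in step 3; the second query does not touch the warehouse at all.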

As a result, the operation manager, a Spark application integrating Spark SQL, is responsible for taking the queries from the user interface and requesting the data, while the view manager manages cache metadata and handles query requests from the operation manager. Tachyon is used as a cache layer that buffers the frequently used data. Finally, the data warehouse is the data center where data are stored in HDFS-based systems.

Although this example is not focused on real-time analysis, it provides appropriate support for what concerns big data analysis as a whole. Unlike what was seen with VoltDB in the previous section, where in-memory capabilities were added to the speed layer, this example shows how adding in-memory capabilities to the batch layer as well could greatly improve its performance. However, adding a new framework such as Tachyon to the Big Data system is a choice that has to be made considering how the batch layer is used in the real case. In this project, the final use of the results, and the method through which they will be obtained, is not analysed. In any case, showing how in-memory capabilities could also improve the batch layer is a good way to take their use into consideration in different applications.

3.5 Pattern: Connect Big Data Analytics to real-time Stream Processing

The architectures shown so far present a view of some of the Big Data architecture solutions that are nowadays used in different domains. A final discussion about them is reported at the end of this chapter, while the remainder of this section analyses guidelines for connecting the two main components of the final system: a Big Data analysis element and a real-time streaming one. As shown so far, providing a reliable solution for each of the two layers implies many choices and much care, but there are many ways to build both a batch and a speed layer with great performance. An important task that should now be investigated is how the two layers can work together and interoperate to support meaningful applications.

Tying multiple systems together is complex, but it is necessary when large-volume and high-velocity data have to be analysed at the same time in an industrial application. Large-volume data require systems providing reliable batch computations, while high-velocity data require systems providing real-time analysis. Connecting these different systems and databases requires a careful and deliberate design of both the dataflow and the connections between the components. In this regard, two important considerations are highlighted in [BH15]:

• the system responsible for the real-time analysis requires a component capable of holding or getting the state generated by the batch system;

• the state generated by the batch system needs to be updated, or replaced in full, with regularity.

Updating or replacing the state generated by the batch system can be done in different ways: updating the state on a per-record basis is sufficient if the application requires per-record consistency but also accepts eventual consistency across records; updating the set of records as a whole is needed if the application requires strict consistency in the data. This second approach is suggested for applications where the correctness of the results depends on having the whole dataset consistent. In that case, writing a shadow table could be a possible solution: once the shadow table with the batch analysis is written, it can be swapped with the main table. In that way the application will see only data from the previous version, or only data from the new version of the results, and it will never see a mix of data from both versions.
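The shadow-table swap can be sketched with SQLite from the Python standard library. SQLite is used here only because it is self-contained; the table and column names are hypothetical, and a distributed store would need its own swap mechanism.

```python
import sqlite3
from typing import Iterable, Tuple


def publish_batch_view(conn: sqlite3.Connection,
                       rows: Iterable[Tuple[str, float]]) -> None:
    """Write a new batch result to a shadow table, then swap it in."""
    # 1. Build the shadow table; the main table is left untouched meanwhile.
    conn.execute("DROP TABLE IF EXISTS batch_view_shadow")
    conn.execute("CREATE TABLE batch_view_shadow (signal TEXT, score REAL)")
    conn.executemany("INSERT INTO batch_view_shadow VALUES (?, ?)", rows)
    # 2. Swap inside one transaction: readers see either the old version
    #    or the new one, never a mix of the two.
    conn.execute("BEGIN")
    try:
        conn.execute("DROP TABLE IF EXISTS batch_view")
        conn.execute("ALTER TABLE batch_view_shadow RENAME TO batch_view")
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise


# isolation_level=None: transactions are controlled explicitly above.
conn = sqlite3.connect(":memory:", isolation_level=None)
publish_batch_view(conn, [("s1", 1.0), ("s2", 2.0)])    # version n
publish_batch_view(conn, [("s1", 10.0), ("s3", 30.0)])  # version n+1
current = conn.execute(
    "SELECT signal, score FROM batch_view ORDER BY signal"
).fetchall()
```

After the second call the main table contains only rows of version n+1; the row for `s2`, which exists only in version n, is gone.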


As a consequence, the chosen strategy clearly depends on the algorithm used during the analysis: an algorithm that needs per-record consistency, as will be shown in chapter 5, allows the state generated by the batch system to be updated with fewer constraints than an algorithm that needs consistency over the entire generated state. Furthermore, in this project specific attention has been dedicated to the updating phase of the batch system, and the use of a reliable strategy, together with adaptable components, resulted in assured per-record consistency.

3.6 Failure-Recovery Strategies

The last section of this chapter reports a final analysis of failure-recovery strategies. Building a reliable Big and Fast Data system requires connections between different components and, since one or more processing stages could eventually fail and become unavailable, the design of failure-recovery strategies is a fundamental task to be investigated. The following sections explain the three possible types of failure-recovery strategy: at-most-once delivery, at-least-once delivery and exactly-once delivery [BH15].

3.6.1 At-Most-Once Delivery

At-most-once delivery is the simplest of the three alternatives. When an interruption happens and some data are not processed, they are simply dropped from the system. It requires the consideration of two important data qualities:

data importance – it concerns both the historical analytics, which will never analyse the dropped data, and the methods through which the system deals with missing data;

data volume – it is related to the amount of data lost during the recovery.

Consequently, if the system can tolerate losing some data during a component failure, in terms of both their importance and their volume, at-most-once delivery is a simple solution to implement. Moreover, regarding the volume of data lost, estimating the mean time to recovery of each component of the system could be useful in order to measure the amount of data not analysed during a system failure.
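As a rough illustration of such an estimate, the volume lost per failure can be approximated as arrival rate times mean time to recovery. The constant arrival rate and the figures used below are assumptions made only for the example.

```python
def expected_records_lost(arrival_rate_per_s: float, mttr_s: float) -> float:
    """Records dropped during one failure, assuming a constant arrival rate."""
    return arrival_rate_per_s * mttr_s


def lost_fraction(mttr_s: float, failures_per_day: float) -> float:
    """Fraction of a day's data that is never analysed."""
    seconds_per_day = 24 * 60 * 60
    return (mttr_s * failures_per_day) / seconds_per_day


# Hypothetical figures: 1000 records/s, 30 s to recover, 2 failures a day.
lost_per_failure = expected_records_lost(1000.0, 30.0)
daily_fraction = lost_fraction(30.0, 2.0)
```

With these hypothetical figures each failure drops 30 000 records, which is less than 0.1% of the daily volume; whether that is acceptable depends on the importance of the data, as discussed above.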

3.6.2 At-Least-Once Delivery

At-least-once delivery restarts the analysis of some data from a known safe point. This strategy guarantees that each data item will be processed at least once; it is therefore also possible that some items will be processed more than once. Some considerations have to be discussed:

data ordering – once the recovery is done, the order of arrival of the data may differ from the original one;

idempotent operations – since some data could be processed more than once, it is important to have idempotent operations (i.e. operations having the same effect no matter how many times they are applied), in order to produce the same outputs when the same data are processed several times;

deterministic operations – for the same reason, deterministic operations are needed in order not to change, or corrupt, the outcomes;

durable components – in order to assure the possibility of recovering the system from a known checkpoint, durable components, such as upstream components, are required.

If the system meets all these requirements, at-least-once delivery can be a reliable strategy. If it does not meet some of them, at-least-once delivery should be avoided in order not to compromise the functionality of the system.
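The role of idempotent operations during a replay can be shown with a small sketch. The record shape, the checkpoint position and both update functions are hypothetical; the point is only the difference between a keyed overwrite and an accumulation when part of the stream is processed twice.

```python
from typing import Dict, List, Tuple

Record = Tuple[str, float]  # (record id, value): a hypothetical shape


def apply_idempotent(state: Dict[str, float], records: List[Record]) -> None:
    """Keyed overwrite: applying the same record twice changes nothing."""
    for record_id, value in records:
        state[record_id] = value


def apply_accumulating(total: float, records: List[Record]) -> float:
    """Plain accumulation: NOT idempotent, replays are counted twice."""
    return total + sum(value for _, value in records)


stream: List[Record] = [("r1", 1.0), ("r2", 2.0), ("r3", 3.0)]
checkpoint = 1  # last known safe point: only r1 is confirmed as processed

# All three records are processed, then a failure forces a replay
# starting from the checkpoint, so r2 and r3 are seen a second time.
state: Dict[str, float] = {}
apply_idempotent(state, stream)
apply_idempotent(state, stream[checkpoint:])

naive_total = apply_accumulating(0.0, stream)
naive_total = apply_accumulating(naive_total, stream[checkpoint:])
```

The keyed state still sums to 6.0 after the replay, while the accumulating total has grown to 11.0 because `r2` and `r3` were counted twice.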

3.6.3 Exactly-Once Delivery

Exactly-once delivery is the ideal strategy for a system, as each data item is processed exactly once. This strategy requires several considerations, and a reliable method to determine which data had been processed, and which had not, before a system failure. Two different patterns for providing exactly-once delivery are presented in [BH15]. Both, however, are founded on the basic assumption that the system provides a functionality similar to the ON CONFLICT clause used by SQL systems. In particular, SQL systems can insert a row with a conditional insert, which means “if the row exists, do not insert; otherwise, insert it”. The problem with this assumption lies in the case in which rows cannot be uniquely identified; in this situation, the upsert state will be unclear. Thus, two main strategies to uniquely identify rows are presented here:

use unique identifiers – using a unique identifier for each data item improves the correctness of the exactly-once delivery strategy. This can be done in different ways: incrementing a counter that uniquely identifies data; using a system such as ZooKeeper, which assigns blocks of one million ids in chunks; combining timestamps with ids; or combining timestamps and counters in a 64-bit number, as VoltDB provides;

use Kafka offsets – Kafka automatically applies ids to data combining [...] have to be discussed: first, inserting data into Kafka has the same problems as inserting data into other systems, so exactly-once delivery has to be managed when new data are inserted into Kafka; secondly, if the Kafka cluster is switched or restarted, topic offsets could no longer be unique (using the Kafka cluster id could solve this problem).

As will be shown in the remainder of this study, since Kafka has been used as a component of the final architecture, the unique identification of the analysed rows has been based on the use of Kafka offsets.
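A conditional insert keyed on Kafka coordinates can be sketched with SQLite, whose `INSERT OR IGNORE` form applies the IGNORE conflict-resolution rule. The table layout and the choice of (topic, partition, offset) as the key are assumptions of this sketch; Kafka offsets are assigned per partition, which is why the partition is part of the key here.

```python
import sqlite3
from typing import List, Tuple

Record = Tuple[str, int, int, float]  # (topic, partition, offset, value)

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE results ("
    " topic TEXT, partition_id INTEGER, msg_offset INTEGER, value REAL,"
    " PRIMARY KEY (topic, partition_id, msg_offset))"
)


def store_once(record: Record) -> bool:
    """Insert the result unless a row with the same key already exists.

    Returns True if the row was written, False if it was a duplicate.
    """
    cur = conn.execute(
        "INSERT OR IGNORE INTO results VALUES (?, ?, ?, ?)", record
    )
    return cur.rowcount == 1


records: List[Record] = [
    ("signals", 0, 41, 0.5),
    ("signals", 0, 42, 0.7),
    ("signals", 1, 41, 0.9),  # same offset, different partition
]
first_attempt = [store_once(r) for r in records]
replay = [store_once(r) for r in records]  # e.g. after a failure
row_count = conn.execute("SELECT COUNT(*) FROM results").fetchone()[0]
```

The replay leaves the table unchanged: every record of the second attempt is recognised as already stored and skipped.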

3.7 Discussions

As shown in this chapter, there are different options for building a system for Big and Fast Data. While Lambda Architecture is one of the most complete architectures, and it provides a clear division between the historical and the real-time analysis, Kappa Architecture assures the same functionality with a lighter system structure. However, it relies on treating the two algorithms used to process historical and real-time data as identical approaches. Sometimes, and this project is a proof of that, the two algorithms are different: the batch algorithm derives useful information from the complete historical dataset and uses it as support for the real-time analysis; meanwhile, the real-time algorithm can focus on deriving only the requested results, with low-latency processes. For this reason, and in order to have a clearer distinction between the components inside the system, Lambda Architecture has been preferred to Kappa Architecture.

Moreover, the use of in-memory strategies has been investigated, which led to considering the use of in-memory solutions in the system. VoltDB and Tachyon are two proposals for, respectively, an in-memory database and an in-memory storage system, but they have been considered not perfectly suitable for the architecture design. Tachyon fits into the batch component of the system, but the aim of this project does not really require an in-memory solution for that part (HDFS already provides an optimal solution for this role); as for VoltDB, Spark Streaming has been preferred to it. This choice has been made in order to share some parts of the code between the historical and real-time algorithms, and in order to use a single analytic framework for both analyses (Spark offers both a module for the batch analysis, Spark Core, and a module for the real-time analysis, Spark Streaming).

Finally, what has been discussed in the last sections of this chapter is useful for identifying a correct path to follow in designing the system. As will be shown in chapter 4, HBase has been chosen as the component that connects the Big Data analytics with the real-time ones, and the use of Kafka, together with Spark Streaming, assured exactly-once delivery as the failure-recovery strategy.


Chapter 4

Specify Architecture and Technologies

In this project Lambda Architecture has been chosen as the blueprint of the Big Data system. The architecture, shown in detail in section 4.1, provides an optimal solution for both the batch and the real-time parts. In addition to what is specified in the standard definition of the architecture, some components have been added to the system, and some different connections have been made between them. In section 4.2 these parts are analysed, and the final architecture is presented. In section 4.3, each part of the architecture is then described in terms of the technology chosen for it.

4.1 Lambda Architecture

Lambda Architecture (Figure 3.1), as seen in section 3.1, is an approach in which batch analysis and real-time analysis are combined in order to derive opportunities from both. The idea of the architecture is to build Big Data systems as a series of layers, each with a specific role in the system. Lambda Architecture is composed of the following layers:

batch layer – the layer where the master dataset is stored and batch processes are run;

serving layer – the layer responsible for storing the results of the batch layer, the batch views, and making them queryable;

speed layer – the layer where low-latency processes are run on recent data only. Its results are called real-time views.

The idea behind the Lambda Architecture is the possibility of building a system that satisfies the most important properties Big Data systems should have (see 2.1), splitting the different phases of the analytic process over separate actors. All new data are sent to both the batch and the speed layer: while the first runs on the complete dataset (historical and new data), with high-latency processes, the second operates on the most recent data only, and its results are produced with low latency.
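The interplay of the three layers can be illustrated with a toy example that counts occurrences of a tag. Everything here is a simplification made for illustration: the views are plain counters, the merge of batch and real-time views at query time is an assumption of the sketch, and no data are assumed to arrive while the batch run is in progress.

```python
from collections import Counter
from typing import List

master_dataset: List[str] = []       # batch layer: append-only master copy
batch_view: Counter = Counter()      # serving layer: result of the last batch run
realtime_view: Counter = Counter()   # speed layer: covers recent data only


def ingest(tag: str) -> None:
    """New data are sent to both the batch and the speed layer."""
    master_dataset.append(tag)
    realtime_view[tag] += 1


def run_batch() -> None:
    """Recompute the batch view from the complete master dataset."""
    global batch_view
    batch_view = Counter(master_dataset)
    # The batch view now covers everything, so the real-time view is reset.
    realtime_view.clear()


def query(tag: str) -> int:
    """Combine the batch view with the real-time view."""
    return batch_view[tag] + realtime_view[tag]


for tag in ["a", "b", "a"]:
    ingest(tag)
before_batch = query("a")   # answered by the speed layer alone
run_batch()
ingest("a")                 # arrives after the batch run
after_batch = query("a")    # batch view plus the most recent data
```

Before the batch run the answer comes entirely from the real-time view; afterwards it is the sum of the recomputed batch view and the single record that arrived later.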

4.1.1 Batch layer

The batch layer has two main responsibilities in a Big Data system designed with the Lambda Architecture:

• store the master copy of the dataset;

• compute the batch views on that master dataset.

The batch views are the results of the analysis that the batch layer runs over the master dataset. They are loaded by the serving layer, which allows rapid access to them through the use of indexes.

Since in Lambda Architecture data are immutable (i.e. data are never modified) and therefore eternally true [MW15, p. 34–36], each data item is written once and only once in the dataset. This has a strong consequence for the choice of the storage solution: the only operation needed is to add new data to the dataset. As a result, the storage solution must be optimized to store a large and constantly growing set of data; consequently, distributed file systems, such as the Hadoop Distributed File System (HDFS), are the best choice [MW15, p. 56–61] for this role.

As with the storage solution, it is also really important to define an efficient batch layer storage system, which has to be reliable in reading large amounts of data at once, in a non-random-access way.

4.1.2 Serving layer

The serving layer is a distributed database that loads batch views from the batch layer and supports random access to them. When new batch views are available, the serving layer automatically replaces the older views with the new ones, so that more up-to-date results can be queried. An important note regarding the serving layer is that in the Lambda Architecture it does not need to provide random writes, but just random reads and batch updates. As a result, the serving layer can be a very simple distributed database, with fewer requirements than a database that supports random writes. The conditions a serving layer must satisfy are the following:

Batch writeable – when a new batch view becomes available, it must be possible to replace the older version with the updated one;

Fault-tolerant – because the serving layer is a distributed database, it must tolerate machine failures;

Scalable – the serving layer must be capable of managing views of any size;

Random reads – in order to have low latency on queries, the serving layer must provide random reads, with indexes supporting direct access to small portions of the views.

Together, the serving layer and the batch layer solve one of the biggest problems of relational databases: the normalization/de-normalization problem. In relational databases, in order to improve performance in retrieving information, it is quite often necessary to store copies of the same data in different tables, causing redundancy in the schema. Consider information that is frequently requested and is available only through a join between two tables (T1, T2). Joins are expensive, and a common solution is to store some information of T2 in T1 as well. As a result, the next time this information is requested, the relational database will query T1 only, avoiding the join with T2. This is efficient in terms of time and resources spent, but it causes redundancy in the database, with the consequent problems of managing multiple copies of the same data.

Lambda Architecture solves this problem by dividing the normalized and de-normalized data between the batch and the serving layer respectively: the batch layer stores the normalized version of the master dataset because its operations read the master dataset in bulk, so there is no need to design an optimized schema; the serving layer, instead, is responsible for maximizing query performance, and its data should be not only pre-joined, but also aggregated and transformed to improve efficiency.

4.1.3 Speed layer

The speed layer is responsible for filling the analysis gap between the batch views computed by the batch layer and the new data that arrived between the start and the end of the batch layer's computations. While the batch layer provides high-latency updates to the serving layer, the speed layer provides low-latency computations in order to produce real-time views based on the most recent data. Both the speed and the batch layer produce views based on the data received, but the speed layer looks only at the recent data, while the batch layer computes the batch views by analysing all the data at once. Moreover, the speed layer does not look at all the recent data at once: instead of recomputing the views from scratch, as the batch layer does, it updates the real-time views as it receives new data, through an incremental computation.
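The difference between incremental computation and re-computation can be made concrete with a running mean. The view below is updated one value at a time without revisiting earlier data, while the batch-style function reads the whole list again; the class and function names are hypothetical.

```python
from dataclasses import dataclass
from statistics import fmean
from typing import List


@dataclass
class RunningMean:
    """Real-time view updated incrementally, one value at a time."""
    count: int = 0
    mean: float = 0.0

    def update(self, x: float) -> None:
        self.count += 1
        # Shift the mean towards x by the share of the new value.
        self.mean += (x - self.mean) / self.count


def recompute_mean(values: List[float]) -> float:
    """Batch-style re-computation over the complete list."""
    return fmean(values)


data = [2.0, 4.0, 6.0, 8.0]
view = RunningMean()
for value in data:
    view.update(value)  # constant work per incoming value
```

Both approaches arrive at the same mean, but the incremental update does constant work per value and keeps only two numbers of state, at the cost of slightly more intricate logic.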

The incremental computation causes the speed layer to be more complex than the batch layer. However, that is compensated by two advantages the speed layer provides:


• it processes a smaller volume of data than the batch layer, which allows great design flexibility;

• its results (i.e. the real-time views) are transient: once the serving layer is updated with new results, the real-time views can be discarded and replaced by updated views. As a result, even if the speed layer is more complex than the batch layer because of the incremental computation, and thus more prone to error, any errors will be automatically corrected through the batch and the serving layers as soon as the batch processes end.

With these advantages, the speed layer has to support two main services: updating the real-time views by processing the incoming data stream, and storing the views in order to support their interrogation. In addition, the decision about when to expire the real-time views should also be investigated. However, since the method through which the real-time views are interrogated depends on the final application of the real-time analysis, this project does not investigate how to store or update the views. Regarding the speed layer, it focuses instead on how to obtain the results in a reliable way.

4.2 Lambda Architecture details

Figure 4.1 shows how Lambda Architecture has been designed as the solution for this project. Unlike the standard definition, three main additions have been made:

• a persistent publish/subscribe messaging system has been inserted as the system input gateway;

• a distributed batch processing framework to compute batch analysis has been added to the batch layer;

• a connection between the speed layer and the serving layer has been created.

To be clear, a distributed batch processing framework was already provided by the internal map-reduce service of Hadoop, used as the batch layer. However, it has been replaced by a more performant framework, both in terms of the speed of the batch analysis and in terms of compatibility, i.e. the possibility of using similar computational code in the batch and in the speed layer. It is now worth looking into each part of the architecture, to understand why each component has been chosen and which properties it satisfies.

Figure 4.1: Big Data Pilot Architecture.

Persistent publish/subscribe messaging system

The persistent publish/subscribe messaging system, inserted as the input component of the architecture, is a gateway between the external world and the internal system. It serves as an assurance regarding the correctness of the received data: each data item should be correctly received and no data loss should take place. In addition, data can be distributed within the system to provide protection against failures as well as significant opportunities for scaling performance.

Distributed batch processing framework

The distributed batch processing component, which has been added to the batch layer, makes it possible to compute analyses over the whole dataset. It is distributed in the sense that it works in parallel across different nodes, and it assures a fault-tolerant strategy. The reason why this component was added will be clearer in section 4.3, where the technologies chosen for the system are shown; for now, the main reason is that an in-memory analysis has been preferred to the default map-reduce strategy provided by the distributed storage system.

Connection between speed layer and serving layer

Lambda Architecture, as shown in section 3.1, does not describe any possible connection between the speed layer and the serving layer. However, since in this project different and useful results have been derived from historical data, the serving layer has been used not only to store batch views, but also to store intermediate results. In this way, the algorithm running on the speed layer can benefit from this information and analyse the real-time data in the light of what is known about their history.
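The role of this connection can be sketched as follows. The example is only a minimal illustration under stated assumptions: the serving layer is replaced by an in-memory dictionary, the intermediate results are a per-sensor mean and standard deviation, and the check is a simple z-score threshold. The names `SensorStats`, `serving_layer` and `is_anomalous`, the sensor identifiers and the threshold are hypothetical, and the z-score rule is not necessarily the algorithm used in this project.

```python
from dataclasses import dataclass


@dataclass
class SensorStats:
    """Intermediate batch result for one sensor (hypothetical schema)."""
    mean: float
    std: float


# Stand-in for the serving layer: a real deployment would query a data
# store; a plain dict keeps the sketch self-contained.
serving_layer = {
    "temperature_01": SensorStats(mean=70.0, std=2.0),
    "pressure_07": SensorStats(mean=5.0, std=0.5),
}


def is_anomalous(sensor_id, value, threshold=3.0):
    """Speed-layer check: flag a reading far from its historical mean."""
    stats = serving_layer.get(sensor_id)
    if stats is None or stats.std == 0:
        return False  # no usable history for this sensor
    z_score = abs(value - stats.mean) / stats.std
    return z_score > threshold


# Simulated real-time readings arriving on the speed layer.
stream = [("temperature_01", 71.2), ("temperature_01", 80.5), ("pressure_07", 5.2)]
flags = [is_anomalous(sensor, value) for sensor, value in stream]
print(flags)
```

The speed layer itself stays stateless in this sketch: everything it knows about the past is fetched from the serving layer, which is the purpose of the added connection.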

Overall, the three additions have been necessary to complete different parts of the architecture:

• the persistent publish/subscribe messaging system provides an easy interface between the external world and the internal system. As will be shown in section 4.3.1, it can be configured with a number of different settings and components; it can therefore adapt to diverse situations and act as controller of the ingestion path;

• the distributed batch processing framework provides fast analysis in a distributed fashion. As a result, it improves the efficiency of the batch layer by speeding up the batch analytics;

• the connection between the speed layer and the serving layer is used to retrieve information about historical data and to use it in support of real-time analysis. As a consequence, it completes the speed layer by giving it a reliable way to fetch useful intermediate results.

In this way the structure of the architecture has been optimized for, and adapted to, the specific use case of this project. The technologies used inside each of its components are described in the following section.

4.3 Technologies

Before describing the technologies used in the architecture, it is necessary to dwell on the reasons that led to the choice of one technology rather than another. The development of an architecture made for Big and Fast Data is subject to different criteria (section 2.1); first of all, it should be fast, scalable and fault tolerant. Moreover, two basic assumptions have been made in this project:

• the architecture should be easy to program, in order to provide the possibility to share the project (or part of it) with different contributors, and to have a definite structure, rather than a vast number of different components;

• the architecture should be adaptive, in the sense that it should be able to adapt to changes in the different factors that affect its functionalities.


Note that how easy an architecture is to program depends considerably on how widespread the technologies used in it are. From the industrial perspective, this mainly means using technologies that are bundled together and provided by one source partner.

For example, Big Data distributions such as Cloudera [Clo16], MapR [MT16] and HortonWorks [Inc16] allow different Big Data frameworks to coexist within an integrated industry platform. As a result, they play a fundamental role in the dissemination of the technologies, and they are therefore central determinants in the construction of an adaptive architecture that is easy to program.

Furthermore, as far as Lambda Architecture is concerned, the easy to program property should be shared by all the layers of the architecture. The adaptive property, instead, affects the speed layer more than the batch and the serving layers. The speed layer, indeed, may suffer from sudden surges in data rates, variations in the processing load, or unexpected slowdowns in downstream data stores [Tat16]. Batch and serving layers, instead, are affected by less variable circumstances, like the reading and writing speed of the disk, or the allocation of resources in the cluster.

With this in mind, the following sections present the technologies chosen for each part of the architecture. In this project, in addition to the standard separation of roles present in Lambda Architecture, it is essential to underline not only the connections between the layers, but also the components dedicated to the data ingestion path.

4.3.1 Data ingestion: Apache Kafka

Apache Kafka is a publish/subscribe messaging system that stores data durably and in order, and that provides deterministic and fast access to them. Data can also be distributed within the system, which provides protection against failures [NSP16]. Kafka works with messages, where a message is an array of bytes, usually accompanied by a piece of metadata used as message key. Messages are categorized into topics (a topic is to Kafka what a folder is to a file system or a table is to a database). A Kafka topic can be divided into a number of partitions. A partition is the container of the messages, which are kept in the same order in which they arrive. Kafka guarantees the reading order within each partition (from the first message that arrived to the last one added), but it does not guarantee any reading order across the entire topic.
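The relation between messages, keys, topics and partitions can be sketched with a toy in-memory model. This is not Kafka's API: the class `Topic`, its `append` method and the use of a CRC32 hash of the key to select a partition are assumptions made for illustration only (Kafka's own partitioning logic differs in its details).

```python
import zlib


class Topic:
    """Toy in-memory model of a topic; not Kafka's actual API."""

    def __init__(self, name, num_partitions):
        self.name = name
        # Each partition is an append-only list of (key, value) pairs.
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, key, value):
        """Store a message and return (partition index, position)."""
        # Assumption: a stable hash of the key selects the partition, so
        # messages sharing a key always land in the same partition.
        index = zlib.crc32(key) % len(self.partitions)
        partition = self.partitions[index]
        partition.append((key, value))
        return index, len(partition) - 1


topic = Topic("topicName", num_partitions=4)
placements = [
    topic.append(b"sensor-1", f"reading {i}".encode()) for i in range(3)
]
print(placements)
```

All messages with the same key end up in one partition and keep their arrival order there, while nothing is said about the relative order of messages stored in different partitions.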

Two different types of clients are present in Kafka: producers and consumers. Producers publish new messages to a specific topic and, if required, to a specific partition of that topic. When a message is produced, Kafka assigns it an offset: a piece of metadata consisting of an integer value that continually increases. As a result, each message within a given partition has a unique offset. Offsets are used by consumers which, unlike producers, read messages. A consumer can subscribe to one or more topics, and it reads the messages of each partition in the same order in which they arrived. It also keeps track of the offset of the last message consumed; in this way, a consumer can stop and restart without losing its place. Moreover, consumers can work alone or as part of a consumer group. A consumer group is a set of consumers that work together to consume a topic, where each partition of the topic is consumed by only one member of the consumer group, called the owner of that partition.

Figure 4.2: Kafka, representation of a topic with four partitions. The number of each message corresponds to the order in which the messages will be read from the partition to which they belong.
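The behaviour of consumers described above, with one owner per partition and offsets used to resume reading, can be sketched with a toy model. The function `assign_partitions`, the class `Consumer`, the round-robin assignment and the message names are hypothetical and do not correspond to Kafka's client API or to its actual assignment strategies.

```python
def assign_partitions(num_partitions, consumers):
    """Give each partition exactly one owner (round-robin, an assumption)."""
    owners = {}
    for partition in range(num_partitions):
        owners[partition] = consumers[partition % len(consumers)]
    return owners


class Consumer:
    """Toy consumer that remembers the next offset to read per partition."""

    def __init__(self, name):
        self.name = name
        self.next_offset = {}

    def poll(self, partition_id, log, max_messages):
        """Read up to max_messages from the log, starting where we stopped."""
        start = self.next_offset.get(partition_id, 0)
        batch = log[start:start + max_messages]
        self.next_offset[partition_id] = start + len(batch)
        return batch


# A group of three consumers reading a topic with four partitions.
owners = assign_partitions(4, ["consumer-1", "consumer-2", "consumer-3"])

# Messages of partition 0, listed in arrival order (index = offset).
log = ["m0", "m1", "m2", "m3", "m4"]
consumer = Consumer("consumer-1")
first = consumer.poll(0, log, max_messages=2)
second = consumer.poll(0, log, max_messages=2)  # resumes after "m1"
print(owners, first, second)
```

With more partitions than consumers, one consumer owns more than one partition, but no partition is ever shared by two members of the same group.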

Finally, a single Kafka server is called a broker. It receives messages from producers, assigns offsets to them and commits the messages to storage on disk. A broker also serves consumers, responding to fetch requests for partitions with the messages that have been committed to disk. Brokers operate as part of a cluster, in which one broker acts as cluster controller and performs administrative operations, such as assigning partitions to brokers. A partition is owned by a single broker in the cluster, called the leader for that partition. An example of what has been explained so far is shown in Figure 4.2, which illustrates the division of a topic into four different partitions, together with the way in which a consumer group reads the messages from each partition. Messages are read in the same order in which they arrived: the gray ones have already been read by a consumer, the green ones are being consumed at this moment, and the red ones have just been added to the partition.

An important question at this point of the study is why Kafka has been chosen as ingestion controller. Kafka has been chosen because of its properties and its reputation. First, as shown before, it is distributed, scalable and fault tolerant, and it is designed to work with systems that need to
