Tools for self-adaptiveness in Spark-based applications

Academic year: 2021


Università degli studi di Pisa

Scuola superiore Sant’Anna

Master’s Degree in Computer Science and Networking

Tools for self-adaptiveness in Spark-based applications

Supervisors:

Prof. Nuno Antunes

Prof. Antonio Brogi

Candidate:

Alberto Andò


Contents

1 Introduction
2 Background and related work
2.1 Autonomic computing
2.1.1 The MAPE-K cycle
2.2 The TMA framework
2.3 Spark
2.3.1 HDFS
2.3.2 Cluster mode and submissions
2.3.3 Configuration
2.3.4 Monitoring
3 The components
3.1 Property codes
3.2 The monitor
3.3 The executor
4 Experimental validation
4.1 Analyze and plan
4.2 BULMA
4.3 Experimental setup
4.4 Simulation
4.5 Results
5 Conclusions
A Source code
A.1 monitor.py
A.2 executor.py
A.3 an pl.py

1 Introduction

In the Big Data era, the archetype of heavy, resource-demanding software is gradually overlapping with the concept of data-intensive software. Data-intensive is an adjective used in contrast with compute-intensive, meaning that the processing burden of the software comes predominantly from I/O, due to data size, as opposed to coming from pure computational requirements[1]. There are virtually no bounds on how big data can get. With the massive increase in interconnected hardware and software producing data, the computational burden required by large scale distributed applications is continuously reaching new heights. In this scenario, any solution providing performance optimizations is helpful. One possible source of optimization is resource allocation, and distributed applications often use both static and dynamic lower level policies to assign tasks to nodes. In typical schemes, a central node is responsible for orchestrating other nodes, and allocating resources after receiving information on the state of the nodes. Besides that, there is a higher level approach that can be combined with these techniques to further improve performance: following the sensor-actuator dualism typical of automated control systems, this approach is known in the software area as autonomic computing.

This work aims to design a lightweight, microservice-styled pair of highly configurable software components, a monitor and an executor, that provide a skeleton for building self-adaptiveness features in applications based on Spark, a widely used parallel computing framework. The components are supposed to work as parts of a MAPE-K cycle, an influential architectural pattern for autonomic computing systems, which includes stages of Monitoring, Analyzing, Planning and Executing, as well as a Knowledge source. The monitor and the executor stand at the M and E phases of the cycle. They are also designed to be compatible with the TMA (Trustworthiness Monitoring and Assessment) framework, a microservice-based application currently in development by researchers at the University of Coimbra, Portugal: they can communicate with TMA's monitor and executor RESTful interfaces through TMA-formatted JSON messages. The monitor periodically collects data from Spark applications in execution and sends it through its outbound interface, while the executor responds to messages coming to its inbound interface by applying changes in Spark configuration.


The experimental validation of the work consists of a full MAPE-K cycle, where M and E are the components, AP is implemented in an ad hoc sample component and K is the Spark history server. The resource managed by the system is a shape matching application called BULMA, running as the driver application on a Spark cluster. The results will show how the monitor and the executor are able to effectively interact with Spark-based applications, and how a full cycle with an efficient analyze and plan module can result in performance improvements.

Finally, an appendix is provided, featuring the full source code of the monitor, the executor, and the analyze and plan module. The behaviour of these modules is described in detail in later sections.

2 Background and related work

This chapter introduces the theoretical concepts and the technologies on which the work is based. The first section is dedicated to the autonomic computing standards defined by IBM and the MAPE-K cycle. The second section summarizes the architecture and the purpose of the TMA framework, and the third section focuses on Spark, a computing framework that is used to run distributed applications.

2.1 Autonomic computing

One of the big downsides in the steady increase of computing power due to Moore's law is the related increase in software complexity. Current-day complex systems and networks may deal with a large number of interconnected heterogeneous components, variable workloads, and large amounts of both structured and unstructured information. These systems require not only a huge design and implementation effort, but possibly an even bigger management and maintenance effort. This is the typical scenario where autonomic computing - a concept introduced by IBM in 2001[3] - can be helpful. The term autonomic is borrowed from the autonomic nervous system, the part of the human peripheral nervous system that is responsible for the regulation of bodily functions - such as heart rate and temperature, among others - that happens as an unconscious response to bodily feedback. In other words, the body regulates itself based on its current state, which in turn is based on the previous regulations.

In a similar way, autonomic computing is the set of features of a system that provide self-management capabilities, by performing automatic adaptations to analytically unpredictable changes in a way that is completely transparent to the user, thus reducing the system complexity.

In an autonomic computing environment, system components include self-management functions, which the IBM Autonomic Computing Blueprint divides into four categories, namely:

• Self-configuring: dynamic adaptation to environmental changes, according to a predefined policy. The adaptation usually consists of changes in the system configuration, which may include the deployment or removal of certain components.

• Self-healing: automatic detection of system anomalies or failures and reaction to such events, according to a predefined policy. Self-healing functions largely improve the resilience of systems, as they allow malfunctions to be handled without the need for human intervention.

• Self-optimizing: dynamic optimization of resource usage, based on monitored parameters. This usually includes resource reallocation in response to workload changes, and is thus particularly useful in systems with a heterogeneous and bursty workload.

• Self-protecting: automatic detection and identification of threats, and reaction to such events with the enforcement of predefined security policies.

The IBM Autonomic Computing Blueprint also presents a much more formal approach to define an autonomic computing system from an architectural point of view. The system is described as a set of communicating components that are connected using standard interoperability mechanisms. The communication interface is often implemented through lightweight web services, such as RESTful ones[4][5].

The building blocks of an autonomic computing system can be divided into four categories: autonomic managers, manual managers, touchpoints and knowledge sources.


Fig. 1, The reference architecture for autonomic computing, as presented in the IBM Blueprint.[2]

At the bottom of the picture, there are the managed resources, i.e. the system components. These are not part of the autonomic computing system, as they are fully functional on their own, regardless of the upper layers. Managed resources may be hardware or software, or both. The following layer is made of touchpoints, components that implement interfaces to communicate with the managed resources. Through specific interfaces, touchpoints provide sensor/effector functions for the managed resources, and are therefore essential building blocks of an autonomic computing system. Going further up, touchpoint autonomic managers implement one or more of the four self-management functions and execute them through the touchpoints. Upper-layer orchestrating autonomic managers may provide automatic management for groups of touchpoint autonomic managers. Grouping can be done according to the self-management function categories (within a discipline). Alternatively, a group can be formed based on other criteria, and can include autonomic managers performing different self-management functions (across disciplines). Finally, all autonomic managers are designed and maintained by manual managers, which are the top layer. Knowledge sources, depicted on the right side, cross all layers, as they are shared resources that provide each layer with the right information.

A touchpoint autonomic manager can work with multiple touchpoints at the same time. Indeed, an interesting property that defines a touchpoint autonomic manager is its scope of control, i.e. the type and quantity of managed resources that are under the control of the manager. There are four typical categories for the scope of control:

• Single resource: the basic scope, in which a touchpoint autonomic manager implements a single control loop to interact with a single managed resource of any kind. Examples include a server, an application, or a storage device.

• Homogeneous group: a scope that groups managed resources of the same type, e.g. a server pool. In this case, the touchpoint autonomic manager implements a single control loop to execute for each resource of the group.

• Heterogeneous group: a scope that groups managed resources of different types, and can implement one or more control loops to interact with them.

• Business system: a scope that contains an organized collection of heterogeneous resources belonging to a specific business service. Touchpoint autonomic managers with a business system scope must keep and evaluate an overall state of the business processes and interact with the associated resource groups (both homogeneous and heterogeneous) or individual managed resources according to a policy.

2.1.1 The MAPE-K cycle

The responsibility of a touchpoint autonomic manager is that of implementing the automation of some management functions, externalizing them through the touchpoint interfaces. The core of an autonomic manager is the control loop that it implements (there can be more than one, in the case of a heterogeneous group scope). Self-management comes from the automation of a process involving multiple steps: gathering interesting information from the system; processing such information according to its internal logic to determine if actions are required; defining such actions, and in some cases their execution order; performing such actions. If all these steps are automated, the control loop is said to be intelligent.

The steps described are collectively known as the MAPE-K cycle, the most influential reference control model for autonomic and self-adaptive systems. The acronym stands for Monitor-Analyze-Plan-Execute over a shared Knowledge.

Fig. 2, The MAPE-K cycle functional scheme.[2]

The M (monitor) function collects information from the managed resources through a sensor interface provided by the touchpoints. The information collected by the monitor can be heterogeneous in nature, as it can consist of virtually anything that is relevant for self-management purposes. Typical examples of monitored information include metrics, configuration property settings and, in the case of network structures, topology information. The collected data can change at different paces. In this context, slowly changing data is said to be static, whereas rapidly changing data is dynamic. The monitor function does not process data; however, it can perform some pre-processing, such as data aggregation. Finally, data is sent to the analyze function as a symptom, ready to be inspected during that stage of the cycle. A well-designed monitor function should be able to receive, pre-process and organize the data efficiently. To do so, a good interoperability standard is crucial[6].

The A (analyze) function is responsible for analyzing the available data to determine whether there are changes to make and, subsequently, which ones. Typically, policies specify certain conditions on the data that act as triggers for pre-determined changes. Whenever the analyze function detects that a triggering condition is met, it generates a change request, to be passed to the following stage. In many cases, the analyze function is used to evaluate non-trivial aspects and behaviors of the system, so it can employ advanced mathematical models and forecasting techniques to perform extensive data analysis on the symptoms provided by the monitor function. The analysis may or may not be influenced by stored knowledge data, coming from knowledge sources, or by data coming from past cycles, which requires a stateful manager. At the end of the analysis, the change request is forwarded to the plan function. The change request describes in sufficient detail all the modifications that are deemed necessary or desirable after the analysis.

The P (plan) function chooses a procedure to enact the desired changes in the managed resource, according to the received change request. The plan function can range from a single instruction to a complex workflow. More precisely, the plan function generates a change plan, featuring the desired (possibly ordered) set of changes to be transmitted to the managed resource, and passes it to the execute function.

The E (execute) function is responsible for performing the chosen changes to the system. In fact, once the plan function has generated a change plan and sent it to the execute function, a set of actions is needed to modify the state of one or more managed resources through the touchpoint effector interface. The execute function carries out the procedure specified in the plan function through a series of actions that will have an impact on the system resources. It is possible that part of the execution of the change plan involves the update of the knowledge, described next.

Finally, the K (knowledge) is the data used by the touchpoint autonomic managers during all four stages, that is stored as shared knowledge. This is the reason why it is graphically represented in the center of the cycle and the K is separated from the rest of the acronym with a hyphen. The shared knowledge usually includes data of different types, including policies, past symptoms, metrics, logs, and topology information. There are three ways for a touchpoint autonomic manager to obtain knowledge, namely:

• The knowledge is passed directly to the autonomic manager. This is typical for policies, i.e. sets of behavioral constraints or preferences that affect the decisions taken by the autonomic manager. Policies might be passed from orchestrating managers, which can be operated by manual managers in turn.

• The knowledge is obtained from an external knowledge source. Examples include practical definitions of symptoms or historical knowledge of managed resources. In this case, the knowledge sources could be definition files or log files.

• The knowledge is created by the autonomic manager itself. Specifically, it could be created by the monitor function, based on the information collected through the touchpoint sensor interface. For instance, knowledge may be created based on recent activities by logging the data that the monitor receives from a managed resource. Alternatively, the execute function might create new knowledge by logging the actions that were taken as a result of the analysis and planning. The knowledge created in this way would be entirely contained within the autonomic manager, as represented by the K block in Figure 2. If the knowledge is of any relevance to other touchpoint autonomic managers, then it should be moved or copied into a knowledge source.
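The four functions and the shared knowledge described in this section can be sketched as a single loop iteration. The following is only an illustrative skeleton in Python, not the implementation presented in this work: the symptom field spilled_bytes, the policy threshold max_spilled_bytes and the memory_fraction setting are hypothetical names chosen for the example.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class Knowledge:
    """Shared knowledge: a policy threshold plus logs of symptoms and actions."""
    max_spilled_bytes: int                      # hypothetical policy value
    symptoms: list = field(default_factory=list)
    actions: list = field(default_factory=list)


def monitor(read_sensor: Callable[[], dict], k: Knowledge) -> dict:
    """M: collect data through the sensor interface and log it as a symptom."""
    symptom = read_sensor()
    k.symptoms.append(symptom)
    return symptom


def analyze(symptom: dict, k: Knowledge) -> Optional[dict]:
    """A: emit a change request only when the policy condition is triggered."""
    if symptom["spilled_bytes"] > k.max_spilled_bytes:
        return {"goal": "reduce_spills"}
    return None


def plan(change_request: dict) -> list:
    """P: turn the change request into an ordered change plan."""
    if change_request["goal"] == "reduce_spills":
        return [("set", "memory_fraction", 0.7)]
    return []


def execute(change_plan: list, apply_effector: Callable, k: Knowledge) -> None:
    """E: apply each change through the effector interface and log it."""
    for _, name, value in change_plan:
        apply_effector(name, value)
        k.actions.append((name, value))


def mape_k_iteration(read_sensor, apply_effector, k: Knowledge) -> bool:
    """Run one full cycle; return True if a change request was generated."""
    symptom = monitor(read_sensor, k)
    request = analyze(symptom, k)
    if request is None:
        return False
    execute(plan(request), apply_effector, k)
    return True
```

In this sketch the sensor and the effector are passed in as plain callables, mirroring the way touchpoints decouple the manager from the managed resource. Both the monitor and the execute functions write to the knowledge, which corresponds to the third way of obtaining knowledge listed above.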

2.2 The TMA framework

The ATMOSPHERE project is a two-year worldwide research project on cloud computing involving eleven universities and four tech companies, funded by the European Commission, the Brazilian Ministry of Technology and the RNP academic network. ATMOSPHERE is a long acronym standing for Adaptive, Trustworthy, Manageable, Orchestrated, Secure, Privacy-assuring Hybrid, Ecosystem for Resilient Cloud Computing.

The Trustworthiness Monitoring and Assessment (TMA) framework is one of the products of ATMOSPHERE, and it is currently in development by researchers at the Software and Systems Engineering (SSE) group at the University of Coimbra's Centre for Information Systems (CISUC). TMA revolves around the concept of trustworthiness score, a numeric value defining an abstract property that is used for self-assessment. The trustworthiness score does not have a precise definition, as it can be used to measure several different properties, such as availability, safety and fault resistance, among others. In fact, the whole framework is a microservice-based platform where each microservice implements one stage of the MAPE-K cycle, and a TMA execution is a virtually endless loop of monitoring properties, obtaining the trustworthiness score, planning changes to improve the score (if needed), and applying the changes.

Fig. 3, The TMA architecture, as specified in the documentation.[7]

In more detail, the main components of TMA are:

• The monitor service (TMA-M), which receives measurements of monitored properties through the ReceiveData RESTful interface; the measurements are later enqueued in a fault-tolerant queue, managed by a different microservice. Additional monitoring may be performed at design time.

• The knowledge service (TMA-K), which can store data of many kinds, including measurements and information about the system architecture and resources.

• The analyze service (TMA-A), which gets data from the monitor queue and the TMA-K, computes the trustworthiness score, and if the score satisfies a particular condition, activates the TMA-P microservice.

• The planning service (TMA-P), which produces an adaptation plan to improve the trustworthiness score and makes it available to the TMA-E through the queues.

• The execute service (TMA-E), which executes the adaptation plan received through the SubmitPlan interface using the deployed actuators connected to the outer interface.

2.3 Spark

Spark is an open source distributed computing framework, which became a top-level Apache project in 2014. Despite being a general purpose framework, it is specifically designed for applications working with large amounts of data, as Spark is able to balance the processing burden among a (possibly) large number of machines, organized as a cluster.

Before the release of Spark, Hadoop, a framework also developed by Apache, was one of the most employed solutions for big data processing. Hadoop is essentially an implementation of the widely known MapReduce parallel programming model. The development of Spark started with the aim of overcoming the limitations of the MapReduce pattern[8], which only supports a particular data flow structure: a strict sequence of load, map, reduce and store operations. Spark introduces several innovations, including support for algorithms with iterative access to data, database-like queries, and stream processing, the latter being arguably the most important.

Cluster management in Spark is transparently taken care of by Spark itself by default (this is the strategy used in the experimental setup). However, it can also be delegated to external software, including Apache Mesos, Kubernetes or Hadoop itself (YARN).

Spark-based applications can be written in Java, Scala, Python or R. All four languages have a dedicated API for Spark integration. Once the few lines of code needed for the integration are added, the application is ready to run through Spark with a spark-submit command. The application, sometimes called driver, will run on the Spark cluster, with the processing burden being distributed among the cluster nodes in a way that is completely transparent to the user.

The entire Spark framework revolves around a fundamental data structure called RDD (Resilient Distributed Dataset). It is a generic, immutable, distributed collection divided into logical partitions, which can be computed on different nodes of the cluster. RDDs can contain any custom object defined in either Python, Java, or Scala. RDDs can be created by transforming an existing collection within the driver application, or by referencing a dataset stored externally[9], e.g. on a distributed file system.

2.3.1 HDFS

As frameworks such as Spark and Hadoop frequently work with large files, which need to be accessed as quickly as possible from multiple different machines, a space- and time-efficient technique for distributed and replicated data storage is essential.

Indeed, other than offering a framework to balance the workload among the cluster nodes, Hadoop also natively provides a distributed storage abstraction called HDFS (Hadoop Distributed File System), which is often used in Spark clusters as well. In spite of the name, it is not strictly a file system as it does not comply with POSIX standards[10]; however, it provides file-system-like shell commands and it is a suitable solution for the problem.

Fig. 4, HDFS nodes on a Hadoop cluster.[11]

The main components of HDFS are the NameNode and the DataNodes, that work in a master and slave fashion. The NameNode manages the entire file system and stores the metadata of the files, along with their replication factor, block division, and block addresses. The NameNode can communicate directly with each DataNode, and is responsible for planning and orchestrating the insertion and deletion of files from the system. Unlike the DataNodes, the NameNode is a single point of failure, and should therefore be set up on the cluster node with the highest availability.

DataNodes signal their active state to the NameNode every few seconds with messages called Heartbeats. When a DataNode is thought to be no longer active, its blocks are replicated on different active nodes. This is why the failure of a single DataNode is never a problem in general. The default replication factor for files on the HDFS is 3, which is rather costly in terms of disk space, but at the same time provides a very good fault resistance that is often crucial in big data processing applications.
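The disk-space cost of replication can be made concrete with a small calculation. The sketch below assumes a block size of 128 MB, which is the default in recent Hadoop versions but is not stated in this work, together with the default replication factor of 3 mentioned above; the function name is hypothetical.

```python
import math


def hdfs_footprint(file_size_mb, block_size_mb=128, replication=3):
    """Return (logical blocks, stored block replicas, raw disk usage in MB).

    block_size_mb=128 is an assumed default; replication=3 is the HDFS default.
    The last block of a file only occupies its actual size, so raw disk usage
    is simply the file size multiplied by the replication factor.
    """
    blocks = math.ceil(file_size_mb / block_size_mb)
    replicas = blocks * replication
    raw_mb = file_size_mb * replication
    return blocks, replicas, raw_mb
```

Under these assumptions, a 1000 MB file is split into 8 blocks, stored as 24 block replicas, and occupies 3000 MB of raw disk space across the cluster.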

[...] NameNode, which is used to periodically take snapshots of the information contained in the NameNode, which act as checkpoints in case of a NameNode failure. However, to further ease disaster recovery, a newer addition to HDFS called HDFS Federation was created in order to support multiple NameNodes and multiple namespaces[12].

2.3.2 Cluster mode and submissions

Spark-based applications run as separate processes on different nodes of a cluster, being coordinated by the Spark master node based on the information contained in the SparkContext object defined in the driver application. The SparkContext is responsible for connecting to a cluster manager, which, as already discussed, can be Spark itself or an external one (e.g. YARN) and is used to allocate resources on the cluster nodes.

Applications are launched through the spark-submit command, and the driver application process, running on the master node, transforms user code into a directed acyclic graph (DAG) of RDDs, including static code optimizations. All RDDs on the graph are created on the driver, but are left untouched until the corresponding action is called. The graph is then converted into an actual execution plan, i.e. a partially ordered set of tasks.

At this point, the cluster manager starts allocating resources. Once the manager selects a node to be used for processing, Spark starts an executor on that node, which is a process that is used to run the driver application instructions and store the corresponding data. Each executor receives the part of application code that it is going to run. Finally, the manager sends tasks to the executors to run, while keeping a complete overview of where the tasks are being run. The criteria used to assign tasks to executors include data availability, which is a key aspect when the Spark cluster is using a distributed file system.

Other than tasks, there are two more units of computation used by Spark, which provide interesting monitorable information: stages and jobs. Stages are collections of similar tasks that can be executed in parallel, whereas jobs are ordered sequences of stages.


Fig. 5, Architecture of a Spark cluster.[13]

2.3.3 Configuration

Almost all configuration of Spark-based applications comes from Spark properties, which are application-specific settings that can be set on a SparkConf object. SparkConf objects are defined in every Spark API and work in the same way in all languages supported by Spark. Properties inside the SparkConf can be read and written with standard get/set methods.

There is a downside in setting all properties through the SparkConf: all configuration will be hard-coded into the driver application, despite being not only non-functional information, but also information that is unrelated to the application itself. Therefore, Spark provides two more ways to set configuration properties.

In fact, configuration properties can be specified at runtime, within the spark-submit command, after leaving an empty SparkConf in the driver application. Additionally, properties can be specified in the Spark configuration file conf/spark-defaults.conf. All three methods are equally valid; however, in case of conflicts, Spark will give highest priority to properties hard-coded in the SparkConf, second highest priority to runtime-specified properties, and lowest priority to properties specified in the configuration file (see footnote 5).
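The precedence among the three configuration sources can be modeled in a few lines. The following sketch does not use the Spark API: each source is represented by a plain dictionary, and the sample property values are hypothetical.

```python
from collections import ChainMap


def effective_config(spark_conf, submit_flags, defaults_file):
    """Merge the three sources; earlier arguments win in case of conflicts.

    spark_conf    -- properties hard-coded in the driver's SparkConf
    submit_flags  -- properties passed at runtime to spark-submit
    defaults_file -- properties read from conf/spark-defaults.conf
    """
    # ChainMap looks each key up in the first mapping that contains it.
    return ChainMap(spark_conf, submit_flags, defaults_file)


# Hypothetical values, for illustration only.
config = effective_config(
    spark_conf={"spark.app.name": "demo"},
    submit_flags={"spark.memory.fraction": "0.7"},
    defaults_file={"spark.memory.fraction": "0.6",
                   "spark.eventLog.enabled": "true"},
)
```

Here config["spark.memory.fraction"] evaluates to "0.7", because the runtime value shadows the one in the configuration file, while config["spark.eventLog.enabled"] falls through to the file value "true". This also shows why leaving the SparkConf empty matters: any property hard-coded there could not be overridden by the other two sources.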

Footnote 5: In the experiment, all mutable properties will be specified in the configuration file, so [...]

There are over a hundred configurable properties, and all of them are listed in the Spark configuration documentation. After a spark-submit, all properties with their values are visible in the Spark web UI for the submitted application. The properties that are most interesting to this work are those related to memory management, as they impact performance in a generally hard-to-predict way in all kinds of Spark-based applications[14]. Within this group, the most important include:

• Spark memory fraction (spark.memory.fraction): given H = total heap space - 300 MB, the Spark memory fraction is the fraction (expressed as a decimal number between 0 and 1) of H that is dedicated to execution and storage. A lower value results in higher chances of memory spills and replacement of cached data.

• Spark memory storage fraction (spark.memory.storageFraction): the fraction of the Spark memory fraction that is never evicted from physical memory. A higher fraction leaves a smaller amount of memory available to execution and tasks, which end up being transferred to disk much more often.

• Spark memory off heap enabled (spark.memory.offHeap.enabled): a binary value that allows Spark to use off-heap memory for certain operations.

• Spark memory off heap size (spark.memory.offHeap.size): if the previous property is set as true, this property represents the amount of off-heap memory (in bytes) that Spark can use at most. Regardless of the amount, off-heap memory is only used when heap memory is full.

• Spark memory use legacy mode (spark.memory.useLegacyMode): a binary value that enables the memory management scheme of older Spark versions (Spark 1.5 and earlier), where the heap memory is rigidly divided into fixed-size blocks.
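The relation between the first two properties can be illustrated with a short worked computation. The sketch below follows the definition H = total heap space - 300 MB given above; the default values 0.6 and 0.5 are those of recent Spark versions and are an assumption here, as this work does not state them, and the function and key names are hypothetical.

```python
RESERVED_MB = 300  # memory set aside before the fractions are applied


def memory_regions(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Split a heap of heap_mb megabytes according to the two fractions.

    memory_fraction and storage_fraction default to 0.6 and 0.5, which are
    assumed defaults (not taken from the text).
    """
    usable = heap_mb - RESERVED_MB                   # H in the text
    if usable <= 0:
        raise ValueError("heap must be larger than the reserved 300 MB")
    spark_memory = usable * memory_fraction          # execution + storage
    protected = spark_memory * storage_fraction      # never-evicted storage
    return {
        "spark_memory_mb": round(spark_memory, 1),
        "protected_storage_mb": round(protected, 1),
        "remaining_mb": round(usable - spark_memory, 1),
    }
```

For a 4096 MB heap, H is 3796 MB: with the assumed defaults, about 2277.6 MB are dedicated to execution and storage, of which about 1138.8 MB are protected from eviction, and the remaining 1518.4 MB of H are left outside the Spark memory fraction.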

2.3.4 Monitoring

Spark provides several ways to monitor applications while they are running. A web user interface is always available on port 4040 and displays information about the running application, executors, stages, tasks and jobs, going into various details about memory-related issues and RDD sizes. When a driver application is submitted with the spark.eventLog.enabled property set as true, the final values available on the web UI are stored in a given directory, and are readable through the Spark history server, which provides a similar web UI, available on port 18080.

While web user interfaces are very well suited for human readability, Spark provides a much more interesting feature to allow developers to easily build monitoring applications. In fact, a REST API provides access to nearly all the monitorable properties, with guaranteed backward compatibility of endpoint addresses[16].
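As an illustration of how a monitoring application can use the REST API, the following minimal sketch lists the running jobs of an application. It relies on the /api/v1/applications/[app-id]/jobs endpoint and on the status field of the returned job objects; the helper names are hypothetical, and this is not the monitor implemented in this work.

```python
import json
from urllib.request import urlopen


def api_url(base, *path):
    """Build a REST API address from a base URL and path segments."""
    return "/".join([base.rstrip("/"), "api/v1", *path])


def fetch_json(url):
    """Perform a GET request and decode the JSON body."""
    with urlopen(url) as response:
        return json.load(response)


def running_jobs(base, app_id, fetch=fetch_json):
    """Return the jobs of an application whose status is RUNNING.

    The fetch parameter can be replaced with a stub, so that the filtering
    logic can be exercised without a live Spark cluster.
    """
    jobs = fetch(api_url(base, "applications", app_id, "jobs"))
    return [job for job in jobs if job.get("status") == "RUNNING"]
```

Against a live application, base would be the web UI address (e.g. http://localhost:4040); the same code works against the history server on port 18080, since both expose the same API root.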

3 The components

The core of this work consists of two Python programs, a monitor and an executor, that can be used to integrate the TMA framework with applications running on top of Spark. The two programs are designed in a microservice fashion: they are decoupled and communicate with each other through lightweight RESTful interfaces using JSON data. It is worth mentioning that, although monitor and executor are dual components, there is one fundamental design asymmetry between the two. In fact, while the monitor is periodic, i.e. it fetches data and sends data periodically, the executor is triggered, i.e. it only sends data in the event of data being received.

3.1 Property codes

Before introducing the components, property codes will be described. Property codes are an arbitrary bijection between monitorable Spark properties and 3-digit natural numbers, created specifically for this work. The main reason why property codes are needed is integration with the TMA framework. In fact, TMA has a standard message format for measurements where the measurement description is an integer called resourceID. Therefore, information about which Spark property is contained in the TMA message has to be encoded as an integer.

Rather than using a generic alphanumeric bijection, a much more viable solution is that of defining property codes. Property codes were assigned with the intention of making them as semantically meaningful as possible. Particularly, all codes are 3-digit numbers such that:

• The first digit identifies a Spark computation unit/entity to which measurements are related (1: Job, 2: Stage, 3: Executor)

• The second digit identifies a subset of such measurements (0: Generic/Misc, 1: Stage-related, 2: Task-related, 3: I/O-related, 4: Memory-related)

• Codes differing only for the first digit identify the same or corresponding measurement in different computation units/entities
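The three rules above can be turned into a small decoder. This is an illustrative sketch rather than code from the monitor: the function names are hypothetical, and the third digit is simply returned as an index, since the rules do not assign it a meaning of its own.

```python
ENTITIES = {1: "Job", 2: "Stage", 3: "Executor"}
SUBSETS = {0: "Generic/Misc", 1: "Stage-related", 2: "Task-related",
           3: "I/O-related", 4: "Memory-related"}


def decode_property_code(code):
    """Split a 3-digit property code into (entity, subset, index)."""
    if not 100 <= code <= 999:
        raise ValueError(f"not a 3-digit code: {code}")
    first, second, third = code // 100, (code // 10) % 10, code % 10
    if first not in ENTITIES or second not in SUBSETS:
        raise ValueError(f"unknown entity or subset in code: {code}")
    return ENTITIES[first], SUBSETS[second], third


def same_measurement(code_a, code_b):
    """Codes differing only in the first digit denote corresponding measurements."""
    return code_a % 100 == code_b % 100
```

For example, decode_property_code(240) yields ("Stage", "Memory-related", 0), and same_measurement(121, 221) is true, since both codes refer to the number of active tasks, for a job and for a stage respectively.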

The list of codes with their associated monitorable properties is contained in the property dictionary, a configuration file used by the monitor called propdict. Below is a complete list of the implemented monitorable properties and their codes.

Job-related (1xx)
• 100 Job ID
• 101 Status
• 102 Name
• 103 Submission time
• 110 Stage IDs
• 111 Num active stages
• 112 Num completed stages
• 113 Num skipped stages
• 114 Num failed stages
• 120 Num tasks
• 121 Num active tasks
• 122 Num completed tasks
• 123 Num skipped tasks
• 124 Num failed tasks
• 125 Num killed tasks
• 126 Killed tasks summary

Stage-related (2xx)
• 200 Stage ID
• 201 Status
• 202 Name
• 203 Submission time
• 204 Attempt ID
• 220 Num tasks
• 221 Num active tasks
• 222 Num completed tasks
• 223 Num skipped tasks
• 224 Num failed tasks
• 225 Num killed tasks
• 226 Killed tasks summary
• 230 Input records
• 231 Output records
• 232 Input bytes
• 233 Output bytes
• 234 Shuffle read records
• 235 Shuffle write records
• 236 Shuffle read bytes
• 237 Shuffle write bytes
• 240 Memory bytes spilled
• 241 Disk bytes spilled

Executor-related (3xx)
• 300 Executor ID
• 301 Status
• 303 Add time
• 305 Total duration
• 306 Total cores
• 320 Total tasks
• 321 Active tasks
• 322 Completed tasks
• 324 Failed tasks
• 327 Max tasks
• 332 Total input bytes
• 336 Total shuffle read
• 337 Total shuffle write
• 342 Memory used
• 343 Disk used
• 344 Used on heap storage memory
• 345 Total on heap storage memory
• 346 Used off heap storage memory
• 347 Total off heap storage memory
• 348 Max memory

3.2

The monitor

The monitor is a Python application that periodically fetches data about running applications from the Spark monitoring API and sends it to the TMA monitoring server. The monitor is easily customizable through the configuration files properties and propdict, with the latter containing the list of monitorable properties, as presented in the previous section, and the former containing specifications on the monitoring policy. More precisely, the properties file is a sequence of configuration lines following the pattern property = value, with the exception of comment lines, which should begin with a #. The properties file has three mandatory property lines that must be present for the monitor to work, namely:

• monitoring.server: the address of the TMA monitoring server

• spark.master: the address of the cluster node that is working as Spark master

• probe.id: the probe ID of the messages sent by the monitor, as specified in the TMA framework

Other than those, a set of optional properties is available to configure the monitor according to the chosen policy, consistently with Spark requirements. Unlike the mandatory lines, if these lines are missing, the monitor will still work with default values. The following optional properties control the behaviour of the monitor:

• limit.monitored.applications: a natural number N that acts as a limit on the maximum number of running applications monitored at the same time. If there are more, only the N applications that started first will be monitored. If this property is unspecified, there is no limit

• refresh.time: the waiting time between two API calls, expressed in seconds, with the default value being 10. This value should be no less than the spark.history.fs.update.interval Spark configuration value (also 10 seconds by default), which represents how frequently Spark updates the values that are sent through the monitoring API. Setting a smaller refresh.time would result in useless duplicate messages with identical measured values

• update.coefficient: a natural number N that indicates how frequently the list of running applications is updated compared to the refresh.time. The monitor checks for running applications every N monitoring API calls, that is once every N × refresh.time seconds

Finally, there is another optional set of properties that defines how frequently each class of Spark-monitorable properties (job-related, stage-related and executor-related) has to be monitored. If the value is N, then the monitor will send the related measurements once every N monitoring messages. The default value is never. Therefore, to monitor a certain class of properties, it is necessary to add the corresponding line, that is either read.job.stats, read.stages.stats, or read.executors.stats.
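As an illustration of the file format described above, the following Python sketch parses a sample properties file and decides which classes of properties are due for a given message counter. The addresses and values in the sample are placeholders, and the helper names parse_properties and classes_due are assumptions made for this example, not the monitor's own functions.

```python
SAMPLE_PROPERTIES = """\
# mandatory lines (addresses and probe ID are placeholders)
monitoring.server = http://tma-monitor.example:5000/monitor
spark.master = http://spark-master.example
probe.id = 1
# optional lines
refresh.time = 10
update.coefficient = 6
read.executors.stats = 1
read.job.stats = 3
"""

MANDATORY = ("monitoring.server", "spark.master", "probe.id")
FREQUENCY_KEYS = ("read.job.stats", "read.stages.stats", "read.executors.stats")


def parse_properties(text):
    """Parse 'property = value' lines, skipping comment lines starting with #."""
    properties = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        properties[key.strip()] = value.strip()
    missing = [key for key in MANDATORY if key not in properties]
    if missing:
        raise ValueError("missing mandatory properties: " + ", ".join(missing))
    return properties


def classes_due(properties, message_counter):
    """Return the property classes to read for this message counter."""
    return [key for key in FREQUENCY_KEYS
            if key in properties
            and message_counter % int(properties[key]) == 0]


if __name__ == "__main__":
    props = parse_properties(SAMPLE_PROPERTIES)
    for counter in range(4):
        print(counter, classes_due(props, counter))
```

With this sample, executor statistics are read at every message and job statistics at every third message, while stage statistics are never read because the read.stages.stats line is absent.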

After the propdict and properties files are correctly configured, the monitor is ready to start. When the monitor is launched, it performs a sequence of operations before entering the execution loop. The sequence includes reading propdict and properties, starting a message counter, and using the values from properties to define the desired behaviour.

The execution loop sends a message for each running application, up to a maximum of limit.monitored.applications. Each message is filled with all the relevant data (1 TMA-formatted message for each monitored property) and then sent to the TMA monitoring server. Specifically, the TMA description ID is obtained by concatenating the property code, the application ID given by Spark, and the Spark ID of the computation entity involved, i.e. the job ID, stage ID or executor ID.
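The concatenation that produces the description ID can be sketched as follows. The sketch mirrors the behaviour described in the text; the application ID in the example is made up, and the function names are chosen here for illustration.

```python
import re


def numeric_app_id(spark_app_id):
    """Strip non-digits from a Spark application ID such as 'app-...-0003'."""
    return int(re.sub(r"\D", "", spark_app_id))


def description_id(code, app_id, entity_id):
    """Concatenate property code, application ID and entity ID into an integer."""
    return int(str(code) + str(app_id) + str(entity_id))


if __name__ == "__main__":
    # Hypothetical application ID in the 'app-<timestamp>-<counter>' style.
    app_id = numeric_app_id("app-20190301120000-0003")
    # Property 342 (memory used) of the executor with ID 0.
    print(description_id(342, app_id, 0))
```

Since the three parts are joined without separators, recovering them from a description ID relies on knowing their widths: the property code always takes the first three digits.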

Fig. 6, A flow chart depicting the execution flow of the monitor.

3.3

The executor

The executor is a Python application built with Flask that waits for well-formatted messages and transforms them into changes in the Spark configuration as they arrive. Messages are received on a fixed address and port via HTTP/POST.

The executor is equipped with an external file containing the list of valid Spark configuration properties. The data contained in the file is generally static, but it might need updates as new versions of Spark are released. If a new version of Spark adds or deletes configurable properties, the executor can be adapted by simply appending or removing lines from the file, which, like the equivalent file in the monitor, is called properties to preserve symmetry in naming.

When the executor is launched, first the properties file is loaded, then the Flask app is run. The app receives data from HTTP/POST requests and examines the content. If the content is a valid command, the command is transformed into a key/value pair, which is then compared with the list of allowed properties. If there is a match, i.e. if the key is a valid property, then the executor executes the action.
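The validation step can be sketched in a few lines of Python. The sketch assumes that a command is a string made of a property name followed by its new value, separated by whitespace; this format, the sample set of allowed properties and the function name parse_command are assumptions made for illustration.

```python
# Sample subset of allowed properties; the executor loads the full list from a file.
ALLOWED_PROPERTIES = {"spark.memory.fraction", "spark.executor.memory"}


def parse_command(command, allowed=ALLOWED_PROPERTIES):
    """Return (key, value) if the command names an allowed property, else None."""
    parts = command.split(None, 1)   # split on the first run of whitespace
    if len(parts) != 2:
        return None                  # a key without a value is not a command
    key, value = parts[0], parts[1].strip()
    if key not in allowed:
        return None                  # unknown properties are rejected
    return key, value


if __name__ == "__main__":
    print(parse_command("spark.memory.fraction 0.7"))
    print(parse_command("spark.unknown.property 1"))
```

Rejecting keys that are not in the list keeps malformed or unexpected messages from reaching the Spark configuration file.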

The action consists in changing a Spark configuration parameter through configuration files. If the property is already configured in the file, then the value will be changed to the new value; otherwise the property is appended to the file with the value. Configuring Spark through a configuration file is only one of the three possible ways allowed by Spark. It is possibly the most straightforward way to implement changes in the executor, compared to the other two, considering that:

• Unlike runtime configuration, it is permanent so the last generated value can be used in case the executor is shut down and restarted at a later time

• Unlike in-code configuration, it is universal and does not need to interact with driver applications. Frequent changes in in-code configuration are also hardly a viable solution in case the driver application is maintained by a large group of people under version control
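The update-or-append behaviour on the configuration file can be sketched as below. The sketch assumes the whitespace-separated "key value" layout of spark-defaults.conf; the function name set_spark_property is hypothetical and the demo writes to a temporary file rather than to a real Spark installation.

```python
import os
import tempfile


def set_spark_property(conf_path, key, value):
    """Set key to value in a 'key value' configuration file, appending if absent."""
    lines = []
    if os.path.exists(conf_path):
        with open(conf_path) as conf:
            lines = conf.read().splitlines()
    found = False
    for i, line in enumerate(lines):
        fields = line.split(None, 1)
        # Skip blank lines and comments; match on the property name only.
        if fields and not line.lstrip().startswith("#") and fields[0] == key:
            lines[i] = "{} {}".format(key, value)
            found = True
    if not found:
        lines.append("{} {}".format(key, value))
    with open(conf_path, "w") as conf:
        conf.write("\n".join(lines) + "\n")


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "spark-defaults.conf")
    set_spark_property(path, "spark.memory.fraction", "0.6")  # appended
    set_spark_property(path, "spark.memory.fraction", "0.7")  # replaced in place
    with open(path) as conf:
        print(conf.read())
```

Because the value lives in a file, it survives a restart of the executor, which is the first advantage listed above.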

4

Experimental validation

A use case of the monitor and executor components will now be presented to demonstrate them in action. The monitor and the executor implement the monitor and execute functions of a MAPE-K cycle, respectively. In order to have a working demonstration, two more components are needed: a component implementing the analyze and plan functions, and a managed resource. The former is another Flask app implementing RESTful interfaces with the monitor and the executor, while the latter is a driver application running on Spark. The application is supposed to run repeatedly, with configuration changes happening before the beginning of each execution.

Fig. 8, An architectural overview of the components used in the experiment and their interfaces.

The execution flow of the system is an endless loop including the following steps:

• The monitor obtains real-time data from the driver application running on top of Spark through the Spark monitoring API

• The monitor transforms the data into TMA-formatted messages and sends it to the analyze and plan module via HTTP/POST

• The analyze and plan module receives the data and checks through the Spark history API whether the spark-submit has terminated. If that is the case, the module generates a change plan according to an algorithm and sends it to the executor via HTTP/POST. If that is not the case, nothing is sent

• The executor waits for a TMA-formatted message on its inbound interface. When it comes, the message is translated into an instruction and inserted in a Spark configuration file in order to change the configuration

In parallel, whenever a spark-submit is over, Spark updates its history server through internal mechanisms.

4.1

Analyze and plan

The an_pl.py module is the missing link between the monitor and the executor. The TMA framework in its final form will provide a set of other components that can be used by system administrators to define their own analysis and plan, i.e. how the values of the monitored properties are transformed into commands for the executor. However, in order to make the experiments completely independent from the development of the TMA framework, which is still ongoing at the time of this work, a whole new module was developed to provide a working sample of what a full cycle could look like. It is meant for demonstration purposes only. In particular, the an_pl.py module works correctly only if Spark executes one application at a time, which is the case in the experiment.

Indeed, as Spark has a large number of both monitorable values and configurable properties, understanding the marginal and joint impact of the latter on the former would require an extensive, large-scale statistical analysis that is beyond the scope of this work. The analysis algorithm in use relies on one specific monitored property to plan changes in one specific configurable property. In order to design it, first of all a sample configuration property was chosen, inspired by the results of a paper by Luna Xu et al.[15]. The paper presents a work that, among other things, successfully finds a relation between the spark.memory.fraction property and performance in a range of test cases.

The spark.memory.fraction property indicates the fraction of the available heap space (minus 300MB) used for execution and storage[17]. Spark does not allow the allocation of separate portions of memory for execution only or storage only; however, it does allow separating the two from the rest of the memory used during Spark submits, which consists of user-defined data structures, Spark internal metadata, and a safeguard against out-of-memory errors in the case of very large sparse records[18].
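The definition above translates into a one-line computation, sketched here in Python. The heap size used in the example is a hypothetical value chosen only to show the arithmetic, and the function name is introduced for illustration.

```python
RESERVED_MB = 300  # memory set aside before the fraction is applied


def unified_memory_mb(heap_mb, memory_fraction):
    """Heap space (in MB) shared by execution and storage."""
    if not 0.0 <= memory_fraction <= 1.0:
        raise ValueError("memory_fraction must be in [0, 1]")
    return (heap_mb - RESERVED_MB) * memory_fraction


if __name__ == "__main__":
    # Hypothetical 4096 MB heap with fractions 0.6 and 0.7.
    for fraction in (0.6, 0.7):
        print(fraction, unified_memory_mb(4096, fraction))
```

For a 4096 MB heap, moving the fraction from 0.6 to 0.7 shifts roughly 380 MB from user data structures and metadata to execution and storage.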

Once the configuration property is identified, a set of rules is needed to make sure that the commands derived from the monitored values will change the memory fraction in a way that will eventually improve performance. In its simplest form, the problem consists of deciding whether, after each new set of monitored values, the memory fraction should increase, decrease or remain unchanged.

To do so, a heuristic a posteriori approach was used: BULMA, the Spark-based application used for this experiment, was launched many times with different values of spark.memory.fraction. Then, the monitored values were compared against the detailed overall performance of the execution during which they originated, provided by the Spark monitoring API. After that, the monitored property that happened to best predict the final performance was used in the decision of how to modify the memory fraction. The resulting property was used.memory, identified by code 342.

This completes the A and P phases of the cycle: the analysis consists of comparing the retrieved value of used.memory with a threshold value, and the total execution time with the previous total execution time. The total execution times are taken from the Spark history server, as they cannot be obtained through real-time monitoring by definition. Therefore, the data stored by the Spark history server is a knowledge source, being updated by Spark itself and used by the an_pl.py module.

The analysis algorithm starts with an arbitrary direction d (backward or forward) and step s (a real number between 0 and 1). The values chosen to begin with in the experiment are d = forward and s = 0.1. However, the algorithm is designed so that any pair of values will eventually converge to the same result.
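The comparison between the last and the previous total execution time can be sketched as follows. The field names attempts, completed, endTime and duration (in milliseconds) are assumed to follow the JSON returned by the applications endpoint of the Spark history server; the sample payload is made up for illustration and no HTTP call is performed.

```python
# Made-up payload in the shape assumed for the history server response.
SAMPLE_APPLICATIONS = [
    {"id": "app-20190301121500-0002",
     "attempts": [{"endTime": "2019-03-01T12:32:25.000GMT",
                   "duration": 1045000, "completed": True}]},
    {"id": "app-20190301120000-0001",
     "attempts": [{"endTime": "2019-03-01T12:17:08.000GMT",
                   "duration": 1028000, "completed": True}]},
    {"id": "app-20190301123300-0003",
     "attempts": [{"endTime": "1969-12-31T23:59:59.999GMT",
                   "duration": 0, "completed": False}]},
]


def completed_durations_seconds(applications):
    """Return the durations (in seconds) of completed attempts, oldest first."""
    finished = []
    for app in applications:
        for attempt in app.get("attempts", []):
            if attempt.get("completed"):
                # Timestamps in this format sort chronologically as strings.
                finished.append((attempt["endTime"], attempt["duration"] / 1000.0))
    finished.sort()
    return [duration for _, duration in finished]


if __name__ == "__main__":
    durations = completed_durations_seconds(SAMPLE_APPLICATIONS)
    last, previous = durations[-1], durations[-2]
    print("execution time decreased:", last < previous)
```

Attempts that are still running are skipped, so the comparison only uses executions whose total time is final.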

At each step, the algorithm compares the values of used.memory and total execution time with the previous values. If the two comparisons converge, i.e. the total execution time decreases and the used.memory gets closer to its benchmark value, then it is assumed that the value of spark.memory.fraction was moved in the right direction. If that is the case, the new value will be moved by step s in the same direction d. On the other hand, if the comparisons diverge, then it is assumed that the change in either or both values might be due to non-determinism, and therefore a much lighter action is taken. Specifically, if the execution time decreased but the monitored value moved away from the benchmark, then the configurable value is moved in the same direction d, but only by (2/3)s. Conversely, if the execution time increased but the monitored value moved closer to the benchmark, then the configurable value is moved by (2/3)s in the opposite direction, −d. Lastly, if the configuration change has resulted in a higher execution time and a worse used.memory value, then the memory fraction is reverted to its previous value.

In all cases, the value to be sent to the executor must be in the [0, 1] interval. Therefore, if an action results in a new value outside of the interval, the value is set to 1 if it was bigger than 1, or to 0 if it was smaller than 0. This is unlikely to happen, provided that the algorithm starts with a small enough step. When the analysis is over and the action is decided, the plan function generates a change plan consisting of a single instruction, spark.memory.fraction followed by the new value, where the new value is obtained through the aforementioned iterative algorithm.
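The four cases described above can be sketched as a single Python function. This is a sketch under stated assumptions, not the code of the analyze and plan module: it assumes that the reduced step and the reversed direction are kept for the following iteration, and the benchmark value used in the example is hypothetical.

```python
FORWARD, BACKWARD = 1, -1


def plan_memory_fraction(current, previous, direction, step,
                         last_time, prev_time, last_mem, prev_mem, benchmark):
    """Return (new_value, direction, step) for the next execution."""
    faster = last_time < prev_time
    closer = abs(last_mem - benchmark) < abs(prev_mem - benchmark)
    if faster and closer:
        # Both indicators improved: keep going with a full step.
        new_value = current + direction * step
    elif faster:
        # Faster, but memory moved away: same direction, lighter step.
        step = step * 2.0 / 3.0
        new_value = current + direction * step
    elif closer:
        # Slower, but memory moved closer: lighter step, opposite direction.
        step = step * 2.0 / 3.0
        direction = -direction
        new_value = current + direction * step
    else:
        # Both indicators got worse: revert to the previous value.
        new_value = previous
    # The value sent to the executor must stay in [0, 1].
    new_value = min(1.0, max(0.0, new_value))
    return new_value, direction, step


if __name__ == "__main__":
    # Slower execution with memory closer to a hypothetical 370 MB benchmark.
    print(plan_memory_fraction(0.7, 0.6, FORWARD, 0.1,
                               1045, 1028, 367.2, 365.1, 370.0))
```

In the example call, the execution time increased while used.memory moved closer to the assumed benchmark, so the value goes back from 0.7 by two thirds of the step, to about 0.633.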

Fig. 9, A flow chart depicting the execution flow of the analyze and plan module, with LET = last execution time, PET = penultimate execution time, LUM = last used.memory, PUM = penultimate used.memory, M = benchmark for used.memory

4.2

BULMA

The managed resource is a sample Spark-based application called BULMA (BUs Line MAtching), an EMaaS (Entity Matching as a Service) application developed by EUBra-BigSea. BULMA finds best-match solutions to a map matching problem, i.e. identifying the path on a street graph starting from a set of noisy GPS position observations. The application is fed a set of known route shapes and a CSV file with GPS observations of a single day, with all the data coming from the Brazilian city of Curitiba.

Fig. 10, A sample use of BULMA in one of the most difficult situations to identify: bus lines following the same route, but starting and ending at different points.[19]

In real life applications, BULMA is used with its dual application BUSTE (BUs Stop Ticketing Estimation)[19], which performs time interpolation on GPS data coming from passengers’ bus cards, in order to provide an estimation of the number of users boarding at each bus stop at any time. In the experimental validation of this work, BULMA is only used as a sample Spark-based application to show how Spark-based applications, in general, can be integrated with the components. In fact, the output of executions of BULMA in the experiment is discarded after checking that it was correctly generated.

4.3

Experimental setup

The whole experiment was performed on a dedicated setup, built on a new machine that runs Ubuntu 18 and hosts a network of isolated virtual machines using different resources. The hardware details of the setup are shown below.

Physical infrastructure

1 physical machine with:
• RAM: 32 GB
• Storage: 230 GB (SSD) + 2 TB (HDD)
• CPUs: 8
• OS: Ubuntu 18

Logical infrastructure

1 master VM with:
• RAM: 4 GB
• Storage: 40 GB (SSD)
• CPUs: 2
• OS: Ubuntu 18

3 slave VMs with:
• RAM: 8 GB
• Storage: 40 GB (SSD) + 300 GB (HDD)
• CPUs: 2
• OS: Ubuntu 18

Hadoop and Spark were installed on all four virtual machines, with the master VM acting as both Hadoop NameNode and Spark master. HDFS is set up on the cluster in order to store input and output data of the Spark application, and it is hosted on the HDD partitions of the nodes. Spark is set to work in standalone mode, with the master managing three slaves.

4.4

Simulation

Before starting the simulation, the input data is placed on the HDFS, with a replication factor of 2. The simulation starts with the launch of the three modules monitor.py, an_pl.py and executor.py. Later, a script is launched to repeatedly execute BULMA with the input data on the HDFS, with a 15-second break between consecutive executions so the configuration changes can take place. The monitor is configured to fetch data every 10 seconds. At the end of each execution, the last monitored value of used.memory is used together with the total execution time to determine the configuration change, which is then applied by the executor by writing on the configuration file before the following execution.

Four different simulations were launched, each with five consecutive executions of BULMA and a different starting value of the configuration variable. The initial direction was forward if the starting value was 0.6 or less, and backward otherwise. All simulations started with a step of 0.1 or 0.2.
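A launcher of the kind described above can be sketched in Python as follows. The command passed in the demo is a harmless placeholder; in the experiment it would be the spark-submit invocation of BULMA, whose arguments are not reproduced here. The function name run_repeatedly is an assumption made for this sketch.

```python
import subprocess
import sys
import time


def run_repeatedly(command, runs, pause_seconds=15):
    """Run command `runs` times, pausing between consecutive executions."""
    exit_codes = []
    for i in range(runs):
        # Block until the current execution is over.
        exit_codes.append(subprocess.call(command))
        if i < runs - 1:
            # Leave time for the configuration change to be written.
            time.sleep(pause_seconds)
    return exit_codes


if __name__ == "__main__":
    # Placeholder command that exits immediately; no pause for the demo.
    print(run_repeatedly([sys.executable, "-c", "pass"], runs=2, pause_seconds=0))
```

The pause is only applied between executions, so the executor has a window to rewrite the configuration file before the next submission reads it.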

4.5

Results

Simulation 1    Start    Direction    Step     Exec. time (s)    Used memory (MB)
Execution 1     0.6      forward      0.1      1028              365.1
Execution 2     0.7      forward      0.1      1045              367.2
Execution 3     0.633    backward     0.066    1019              352.6
Execution 4     0.677    forward      0.044    1078              374.0
Execution 5     0.648    backward     0.027    1023              373.8

The first simulation starts from a memory fraction value of 0.6 and a step of 0.1. In the executions, the values of used.memory fluctuate around the benchmark value in a way that looks independent from the total execution time. Therefore, the step rapidly decreases and the obtained value seems to converge.

Simulation 2    Start    Direction    Step     Exec. time (s)    Used memory (MB)
Execution 1     0.4      forward      0.1      1330              277.2
Execution 2     0.5      forward      0.1      1151              311.0
Execution 3     0.6      forward      0.1      999               366.9
Execution 4     0.7      forward      0.1      1060              383.2


The second simulation starts from a memory fraction value of 0.4, and after two executions the system is in the same (start, step) state as at the beginning of the first simulation. However, in the first five executions the execution time and used memory parameters always move in the same direction, hence the step does not decrease. In the longer run, the simulation is likely to converge to a value similar to that of the first simulation.

Simulation 3    Start    Direction    Step     Exec. time (s)    Used memory (MB)
Execution 1     0.8      backward     0.1      1199              388.7
Execution 2     0.7      backward     0.1      1026              344.0
Execution 3     0.6      backward     0.1      1023              366.5
Execution 4     0.533    backward     0.066    1101              348.2
Execution 5     0.6      forward      0.066    1038              370.0

This simulation starts with a higher starting value to be decreased, and gets rapidly closer to 0.6, like the simulations above.

Simulation 4    Start    Direction    Step     Exec. time (s)    Used memory (MB)
Execution 1     0.5      forward      0.2      1123              330.1
Execution 2     0.7      forward      0.2      1044              377.2
Execution 3     0.9      forward      0.2      1331              395.6
Execution 4     0.7      backward     0.2      1038              373.1
Execution 5     0.5      backward     0.2      1057              344.6

This simulation starts with a higher step, and loops around values that are presumably far from the optimal. In the longer run, the value is also likely (but not guaranteed) to converge but only after a longer time. This example shows the importance of choosing a step small enough to begin with.


5

Conclusions

It is clear that applying autonomic computing principles on big data applications in general can lead to significant performance optimizations. The components presented in this work provide a skeleton for self-adaptiveness in Spark-based applications, on top of which a complete autonomic computing system can be built.

Nonetheless, the effectiveness of the optimization depends mainly on the analyze and plan phase. In fact, the stronger the relations between monitorable properties and configurable properties are, the more predictably the changes made by the executor will affect performance. The processing logic depends on the specific application and should be designed specifically for each use case, be it a single application or a group of related applications. If, ad absurdum, there was a way of dynamically changing configuration properties that worked with every application in the same way, then it should be implemented by the Spark cluster manager itself. Therefore, in order to obtain good results, it is necessary to perform a detailed analysis not only on the specific application, but also on the specific setup, as the results can be significantly different.

Indeed, the cluster setup also has a major impact on the effectiveness of changes to configuration properties. The general reason is that a setup with overabundant resources compared to the needs of the driver application will result in unused resources regardless of the configuration. In a largely overpowered setup, it is likely that any configuration will lead to optimal performance, as even mediocre resource management will not result in time being wasted. On the contrary, dynamically changing configuration properties may be crucial when the setup has limited resources, with a utilization factor of nearly 1.

For the reasons stated above, unlike the monitor and the executor, the an_pl.py file cannot be effectively used in different applications and clusters as is. However, since the algorithmic idea in use is quite general, the component could possibly be reused by simply recomputing the benchmark value used in the algorithm. The results also show how important it is to start the algorithm with a suitable step. In fact, in simulation 4, the results suggest that the configuration variable might enter an infinite loop, without ever getting closer to the optimal value. In cases where it is generally harder to predict good values for starting point and step, the algorithm could be tweaked by implementing a mechanism to prevent loops, e.g. decreasing the step after a maximum number of steps where it has stayed the same.
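The loop-prevention idea mentioned above could be sketched as a small helper that shrinks the step once it has stayed unchanged for too long. The class name StepGuard, the threshold and the shrinking factor are all hypothetical choices made for this sketch; none of them comes from the experiment.

```python
class StepGuard:
    """Shrink the step when it has stayed unchanged for too many iterations."""

    def __init__(self, max_unchanged=3, shrink=2.0 / 3.0):
        self.max_unchanged = max_unchanged  # hypothetical threshold
        self.shrink = shrink                # hypothetical shrinking factor
        self.last_step = None
        self.unchanged = 0

    def adjust(self, step):
        """Return the step to use, reduced if it has stalled for too long."""
        if self.last_step is not None and step == self.last_step:
            self.unchanged += 1
        else:
            self.unchanged = 0
        if self.unchanged >= self.max_unchanged:
            step *= self.shrink
            self.unchanged = 0
        self.last_step = step
        return step


if __name__ == "__main__":
    guard = StepGuard(max_unchanged=2)
    step = 0.2
    for iteration in range(5):
        step = guard.adjust(step)
        print(iteration, round(step, 4))
```

A guard of this kind would be called once per iteration, right before the new memory fraction is computed, so that a step which keeps bouncing between the same values is eventually reduced.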

An inherent limit of this work is that the cycle cannot formally happen in real time. This is due to limitations in Spark configuration, which only allows the redefinition of properties at the beginning of a spark-submit. The execution model for which this work is intended is that of a driver application that is executed repeatedly on different data. For example, data could come from an external source that periodically writes data on a new file. This is the same solution used in practice in the demonstration, and it is a mock of true real time.

As Spark already features a streaming mode, which also allows scanning a directory for new files within the same execution[20], it is perhaps not impossible that upcoming versions of Spark will feature a real-time configuration API, symmetrical to the already existing monitoring API. This would allow a completely symmetrical architecture for the proposed framework, featuring another REST API in place of the writing on file.

A

Source code

A.1

monitor.py

1 i m p o r t ast 2 i m p o r t sys 3 i m p o r t j s o n 4 i m p o r t t i m e 5 i m p o r t re 6 f r o m d a t e t i m e i m p o r t d a t e t i m e 7 i m p o r t r e q u e s t s 8 f r o m t m a l i b r a r y . p r o b e s i m p o r t * 9 10 # S p a r k c o n s t a n t s ( d e f a u l t r e f r e s h t i m e = 10) 11 R E F R E S H _ T I M E = 100 12 U P D A T E _ C O E F F I C I E N T = 100 13 14 # P r o p e r t i e s f i l e c o n s t a n t s 15 P R O P E R T I E S _ P A T H = ’ p r o p e r t i e s ’ 16 S E P A R A T O R = ’ = ’ 17 C O M M E N T _ L I N E = ’ # ’ 18 19 # P r o p e r t y d i c t i o n a r y p a t h 20 P R O P _ D I C T _ P A T H = ’ p r o p d i c t ’ 21 22 # M a k e a d i c t i o n a r y out of the p r o p e r t i e s f i l e 23 def r e a d _ p r o p e r t i e s ( p p a t h ) : 24 p r o p e r t i e s = {} 25 w i t h o p e n( p p a t h ) as f : 26 for l i n e in f : 27 if (not ( C O M M E N T _ L I N E in l i n e ) ) and ( S E P A R A T O R in l i n e ) : 28 k , v = l i n e . s p l i t ( S E P A R A T O R ,1) 29 p r o p e r t i e s [ k . s t r i p () ] = v . s t r i p () 30 r e t u r n p r o p e r t i e s 31 32 # M a k e t h r e e l i s t s of p r o p e r t y codes , d i v i d e d by area , out of the p r o p e r t y c o d e s f i l e 33 def r e a d _ s p a r k _ p r o p e r t i e s ( p p a t h ) : 34 p r o p e r t i e s = {} 35 j o b _ s t a t s = [] 36 s t a g e s _ s t a t s = [] 37 e x e c u t o r s _ s t a t s = []

(39)

38 w i t h o p e n( p p a t h ) as f : 39 for l i n e in f : 40 if (not ( C O M M E N T _ L I N E in l i n e ) ) and ( S E P A R A T O R in l i n e ) : 41 k , v = l i n e . s p l i t ( S E P A R A T O R ,1) 42 c l e a n _ k = k . s t r i p () 43 p r o p e r t i e s [ c l e a n _ k ] = v . s t r i p () 44 if c l e a n _ k [0] == ’ 1 ’: 45 j o b _ s t a t s . a p p e n d ( c l e a n _ k ) 46 e l i f c l e a n _ k [0] == ’ 2 ’: 47 s t a g e s _ s t a t s . a p p e n d ( c l e a n _ k ) 48 e l i f c l e a n _ k [0] == ’ 3 ’: 49 e x e c u t o r s _ s t a t s . a p p e n d ( c l e a n _ k ) 50 r e t u r n p r o p e r t i e s , j o b _ s t a t s , s t a g e s _ s t a t s , e x e c u t o r s _ s t a t s 51 52 # D e f i n e the l i s t of S p a r k p r o p e r t i e s as a g l o b a l v a r i a b l e 53 s p a r k _ p r o p e r t i e s , j o b _ s t a t s , s t a g e s _ s t a t s , e x e c u t o r s _ s t a t s = r e a d _ s p a r k _ p r o p e r t i e s ( P R O P _ D I C T _ P A T H ) 54

55 # G i v e n a d a t a s o u r c e and a l i s t of codes , put the d a t a r e l a t e d to s u c h in a m e s s a g e

56 def f i l l _ m e s s a g e ( msg , source , codes , e x e c u t o r =0) : 57 i d _ c o d e = s o u r c e [ s p a r k _ p r o p e r t i e s [str( c o d e s [ 0 ] ) ]] 58 # If the d a t a c o m e s f r o m e x e c u t o r s , c h e c k for a s p e c i a l c a s e in IDs 59 if e x e c u t o r == 1: 60 i d _ c o d e = g e t _ e x e c u t o r _ i d ( i d _ c o d e ) 61 for c o d e in c o d e s : 62 dt = D a t a (t y p e=" m e a s u r e m e n t ", d e s c r i p t i o n I d = g e t _ d e s c r i p t i o n _ i d ( code , m e s s a g e . r e s o u r c e I D , i d _ c o d e ) , o b s e r v a t i o n s = N o n e ) 63 val = s o u r c e [ s p a r k _ p r o p e r t i e s [str( c o d e ) ]] 64 # C h e c k if the p r o p e r t y is a s t a t u s ( c o d e s m a t c h i n g xx1 r e p r e s e n t a s t a t u s ) 65 if ( c o d e % 1 0 0 ) == 1: 66 val = s t a t u s _ c o d e ( v a l u e ) 67 obs = O b s e r v a t i o n ( t i m e = t i m e s t a m p , v a l u e = val ) 68 dt . a d d _ o b s e r v a t i o n ( o b s e r v a t i o n = obs ) 69 msg . a d d _ d a t a ( d a t a = dt )

(40)

70 r e t u r n msg 71 72 # C r e a t e a f u n c t i o n d i c t i o n a r y for d a t a m o n i t o r i n g 73 def m a k e _ d i c t ( p r o p e r t i e s ) : 74 d i c t = {} 75 if ’ r e a d . job . s t a t s ’ in p r o p e r t i e s . k e y s () : 76 d i c t[’ r e a d . job . s t a t s ’] = r e a d _ j o b _ s t a t s 77 if ’ r e a d . s t a g e s . s t a t s ’ in p r o p e r t i e s . k e y s () : 78 d i c t[’ r e a d . s t a g e s . s t a t s ’] = r e a d _ s t a g e s _ s t a t s 79 if ’ r e a d . e x e c u t o r s . s t a t s ’ in p r o p e r t i e s . k e y s () : 80 d i c t[’ r e a d . e x e c u t o r s . s t a t s ’] = r e a d _ e x e c u t o r s _ s t a t s 81 if ’ r e a d . s t r e a m i n g . s t a t s ’ in p r o p e r t i e s . k e y s () : 82 d i c t[’ r e a d . s t r e a m i n g . s t a t s ’] = r e a d _ s t r e a m i n g _ s t a t s 83 r e t u r n d i c t 84 85 # C h e c k w h i c h a p p l i c a t i o n s are r u n n i n g 86 def r e a d _ a p p l i c a t i o n s ( m a s t e r ) : 87 r u n n i n g _ a p p l i c a t i o n s = r e q u e s t s . get ( m a s t e r + ’ : 4 0 4 0 / api / v1 / a p p l i c a t i o n s ? s t a t u s = r u n n i n g ’) . j s o n () 88 a p p l i c a t i o n s = [] 89 for i in r a n g e(len( r u n n i n g _ a p p l i c a t i o n s ) ) : 90 a p p l i c a t i o n s . a p p e n d (int( re . sub (r ’ \ D ’,’ ’, r u n n i n g _ a p p l i c a t i o n s [ i ][’ id ’]) ) ) 91 r e t u r n a p p l i c a t i o n s 92 93 # C o m p u t e the TMA - c o m p l i a n t d e s c r i p t i o n ID by a p p e n d i n g the p r o p e r t y code , the a p p l i c a t i o n ID , and the job / s t a g e / e x e c u t o r ID

94 def g e t _ d e s c r i p t i o n _ i d ( code , app , s p e c _ i d ) : 95 r e t u r n int(str( c o d e ) +str( app ) +str( s p e c _ i d ) ) 96

97 # Get job s t a t s f r o m the S p a r k m o n i t o r i n g API

98 def r e a d _ j o b _ s t a t s ( message , f r e q ) : 99 j o b s = r e q u e s t s . get ( a p p _ i d _ u r l + ’ j o b s ? s t a t u s = r u n n i n g ’) . j s o n () 100 for job in j o b s : 101 m e s s a g e = f i l l _ m e s s a g e ( message , job , j o b _ s t a t s ) 102 r e t u r n m e s s a g e 103

104 # Get s t a g e s s t a t s f r o m the S p a r k m o n i t o r i n g API

(41)

106 s t a g e s = r e q u e s t s . get ( a p p _ i d _ u r l + ’ s t a g e s ? s t a t u s = a c t i v e ’) . j s o n () 107 for s t a g e in s t a g e s : 108 m e s s a g e = f i l l _ m e s s a g e ( message , stage , s t a g e s _ s t a t s ) 109 r e t u r n m e s s a g e 110

111 # Get e x e c u t o r s t a t s f r o m the S p a r k m o n i t o r i n g API

112 def r e a d _ e x e c u t o r s _ s t a t s ( message , a p p _ i d _ u r l ) : 113 exs = r e q u e s t s . get ( a p p _ i d _ u r l + ’ e x e c u t o r s ’) . j s o n () 114 for ex in exs : 115 m e s s a g e = f i l l _ m e s s a g e ( message , ex , e x e c u t o r s _ s t a t s , 1) 116 r e t u r n m e s s a g e 117

118 # In the S p a r k m o n i t o r i n g API , g e t t i n g the e x e c u t o r ID of the d r i v e r w i l l r e t u r n the s t r i n g ’ d r i v e r ’ i n s t e a d of a n u m b e r . T h i s f u n c t i o n t r a n s f o r m s i n t o a number , as n e e d e d by the TMA f o r m a t 119 def g e t _ e x e c u t o r _ i d ( e x i d ) : 120 if e x i d == ’ d r i v e r ’: 121 r e t u r n 0 122 e l s e: 123 r e t u r n e x i d 124

125 # The s t a t u s p r o p e r t y in the m o n i t o r i n g API is g i v e n by a string , and d i f f e r e n t s t r i n g r e p r e s e n t a p o s i t i v e v a l u e for jobs , s t a g e s and e x e c u t o r s . T h i s f u n c t i o n t r a n s f o r m s the s t a t u s i n t o 1 or 0 126 def s t a t u s _ c o d e ( s t a t u s ) : 127 if s t a t u s == ’ R U N N I N G ’: 128 r e t u r n 1 129 e l i f s t a t u s == ’ A C T I V E ’: 130 r e t u r n 1 131 e l i f s t a t u s == ’ T r u e ’: 132 r e t u r n 1 133 e l s e: 134 r e t u r n 0 135 # S e n d the m e s s a g e t h r o u g h H T T P / P O S T 136 def s e n d _ m e s s a g e ( url , m e s s a g e _ f o r m a t e d ) : 137 h e a d e r s = {’ content - t y p e ’: ’ a p p l i c a t i o n / j s o n ’} 138 # r e t u r n the r e s p o n s e f r o m P o s t r e q u e s t 139 r e t u r n r e q u e s t s . p o s t ( url , d a t a = m e s s a g e _ f o r m a t e d ,

(42)

h e a d e r s = h e a d e r s ) 140

141 if _ _ n a m e _ _ == ’ _ _ m a i n _ _ ’: 142 # S t a r t the m e s s a g e c o u n t e r

message_counter = 0

# Read all the relevant configuration from the configuration files
properties = read_properties(PROPERTIES_PATH)
monitor_dict = make_dict(properties)
url = properties['monitoring.server']
spark_master = properties['spark.master']

communication = Communication(url)

# Values read from the properties file are converted to numbers before use
if 'refresh.time' in properties.keys():
    REFRESH_TIME = float(properties['refresh.time'])
if 'update.coefficient' in properties.keys():
    UPDATE_COEFFICIENT = int(properties['update.coefficient'])

running_applications = read_applications(spark_master)

limit = len(running_applications)
if 'limit.monitored.applications' in properties.keys():
    limit = int(properties['limit.monitored.applications'])

running_applications = running_applications[:limit]

# Start the execution loop
while 1:
    for app_id in running_applications:
        message = Message(probeId=int(properties['probe.id']), resourceId=app_id, messageId=message_counter, sentTime=int(time.time()), data=None)
        timestamp = int(time.time())
        app_id_url = properties['spark.master'] + ':4040/api/v1/applications/app-' + str(app_id)[:-4] + '-' + str(app_id)[-4:] + '/'
        for monitored_property in monitor_dict:
            if message_counter % int(properties[monitored_property]) == 0:
                monitor_dict[monitored_property](message, app_id_url)
        formatted_message = json.dumps(message.reprJSON(), cls=ComplexEncoder)
        response = communication.send_message(formatted_message)
    message_counter += 1
    time.sleep(REFRESH_TIME)
    # Once in UPDATE_COEFFICIENT loops, check again for running applications
    if message_counter % UPDATE_COEFFICIENT == 0:
        running_applications = read_applications(spark_master)
        running_applications = running_applications[:limit]
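The URL assembled inside the execution loop splits the numeric application identifier into a leading timestamp and a trailing four-digit sequence number, rebuilding the `app-<timestamp>-<sequence>` form of Spark standalone application IDs. The following is only a sketch of that conversion: `build_app_url` is a hypothetical helper that does not appear in monitor.py, and the host name and identifier are illustrative values.

```python
def build_app_url(spark_master, app_id, port=4040):
    """Rebuild the REST URL of an application from its numeric identifier.

    Hypothetical helper for illustration; monitor.py builds the string inline.
    """
    digits = str(app_id)
    # The last four digits are the sequence number, the rest is the timestamp
    return (spark_master + ':' + str(port) + '/api/v1/applications/app-'
            + digits[:-4] + '-' + digits[-4:] + '/')


if __name__ == '__main__':
    # Illustrative host and identifier, not taken from the experiments
    print(build_app_url('http://spark-master', 201906041530120003))
```

Keeping the conversion in one function would make it easier to test the identifier format separately from the monitoring loop.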

A.2 executor.py

from flask import Flask
from flask import request
import json
import os
from ActuatorPayload import ActuatorPayload

# Define constants
SPARK_CONFIG_PATH = '/usr/local/spark/conf/spark-defaults.conf'
SPARK_PROPERTIES_LIST = 'spark_properties'
PROPERTIES_PATH = 'properties'
SEPARATOR = '='
COMMENT_LINE = '#'

# Define the Flask app
executor = Flask(__name__)

# Make a list of acceptable Spark configuration properties out of the Spark properties file
def load_list():
    prop_list = []
    with open(SPARK_PROPERTIES_LIST) as f:
        for line in f:
            prop_list.append(line.replace("\n", ""))
    return prop_list

properties_list = load_list()

# Make a dictionary out of the properties file
def read_properties():
    properties = {}
    with open(PROPERTIES_PATH) as f:
        for line in f:
            if (not (COMMENT_LINE in line)) and (SEPARATOR in line):
                k, v = line.split(SEPARATOR, 1)
                properties[k.strip()] = v.strip()
    return properties

# React to messages received by HTTP/POST by executing the received action
@executor.route('/act', methods=['POST'])
def process_message():
    data = request.get_data()
    payload = json.loads(data)
    operation = executeaction(payload['action'])
    return str(operation)

# Check if the action is valid and if it is, call the function to write the property described by the action in the file
def executeaction(action):
    key = action.split(" ")[0]
    value = action.split(" ")[1]
    if key in properties_list:
        result = set_spark_property(key, value)
        return result
    else:
        return "unsupported property"

# Write the property in the Spark configuration file
def set_spark_property(property, value):
    # Read the current configuration; the file is closed before being rewritten
    with open(SPARK_CONFIG_PATH, "r") as f:
        lines = f.readlines()
    # Compare the first token of each line, so that a property whose name is a
    # prefix of another one does not match the wrong line
    matching_line = [l for l in lines if l.split() and l.split()[0] == property]
    new_line = property + " " + value + "\n"
    if not matching_line:
        with open(SPARK_CONFIG_PATH, "a") as f:
            f.write(new_line)
    else:
        lines[lines.index(matching_line[0])] = new_line
        with open(SPARK_CONFIG_PATH, "w") as f:
            for line in lines:
                f.write(line)
    return "property " + property + " set to " + value

if __name__ == '__main__':
    # Read the relevant configuration and run
    properties = read_properties()
    if 'spark.config.path' in properties.keys():
        SPARK_CONFIG_PATH = properties['spark.config.path']
    # debug takes a boolean, not the string 'True'
    executor.run(debug=True, host='0.0.0.0', port=5002)
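The `/act` endpoint of the executor expects a JSON body whose `action` field holds a property name and a value separated by a single space. The following is a minimal client sketch using only the standard library. The helper names `build_action` and `send_action` are hypothetical and are not part of the thesis code; the port 5002 in the usage note is the one passed to `executor.run` in the listing.

```python
import json
import urllib.request


def build_action(key, value):
    """Serialize an action in the '<property> <value>' form split by executeaction."""
    return json.dumps({'action': key + ' ' + value}).encode('utf-8')


def send_action(base_url, key, value):
    """POST the action to the executor and return its textual reply.

    Hypothetical helper; base_url is assumed to point at a running executor.
    """
    req = urllib.request.Request(
        base_url + '/act',
        data=build_action(key, value),
        headers={'Content-Type': 'application/json'},
        method='POST',
    )
    with urllib.request.urlopen(req) as resp:
        return resp.read().decode('utf-8')
```

Assuming an executor reachable on the local machine, a call such as `send_action('http://localhost:5002', 'spark.executor.memory', '2g')` would send the action `spark.executor.memory 2g`. Since `executeaction` splits the action on spaces and keeps only the first two tokens, a value that itself contains a space would be truncated.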

A.3 an_pl.py

import requests
import time
from flask import Flask
import json
import os

# Define the Flask app
anpl = Flask(__name__)

# Define general constants
app_id_url = 'http://192.168.122.179:18080/api/v1/applications?status=completed'
message_counter = 0

# Define algorithm benchmark and starting values (step = abs(var - prev_var))
mu_benchmark = 374000000
last_exec_time = 1000000
pen_exec_time = 1000000
