POLITECNICO DI MILANO
Dipartimento di Elettronica, Informazione e
Bioingegneria
AN APPROACH TO SCHEMA
MANAGEMENT FOR DATA
MIGRATIONS ACROSS
HETEROGENEOUS NOSQL
DATABASES
Advisor: Prof. Elisabetta Di Nitto
Co-advisor: Dott. Marco Scavuzzo
Master thesis by:
Fabio Dellea Matr. 817045
Ringraziamenti
I would like to sincerely thank Professor Di Nitto and Dr. Scavuzzo for the patience, professionalism and care with which they guided me along this path.
I would also like to thank Cinzia, my parents and all my friends for having shared this experience with me, each in their own way.
Abstract
NoSQL is a very heterogeneous set of databases: the main features they share are the abandonment of the relational model and the ability to deal with high-throughput read/write operations on very large datasets. Moreover, NoSQL databases manage schemas in many different ways. A schema is the representation of the database structure, i.e. a description of how data are organized in the database. Some NoSQL databases do not store the schema, leaving schema management to the applications. This characteristic may create some issues, for instance, during a data migration - the process of transferring data from a source storage to a destination storage - between heterogeneous NoSQL databases.
This work aims to find a solution to overcome this issue, in order to allow Hegira4Cloud, a tool for performing data migrations across heterogeneous NoSQL databases, to perform a complete, correct and efficient migration using databases that do not store the schema.
The proposed solution consists in using a centralized repository for schemas, called the Schema Registry.
The work can be summarized in three steps.
In the first place, an API library is developed in order to store data into and retrieve data from the Schema Registry.
Thereafter, the developed library is integrated with Hegira4Cloud, in order to allow Hegira4Cloud to perform a correct data migration using a database that does not store the schema, e.g. Apache HBase, as source data-store.
The last step consists in the analysis and validation of the results obtained from the tests made on Hegira4Cloud after the integration with the Schema Registry.
In conclusion, the analysis of the results shows that the initial goal is achieved: after the integration with the Schema Registry, performed through the developed API, Hegira4Cloud is able to perform a correct, complete and efficient data migration using HBase as source database.
Sintesi
The term NoSQL groups an extremely varied set of databases whose main characteristics are the abandonment of the relational model and the ability to guarantee a high throughput of operations on very large data sets. Moreover, NoSQL databases have many different ways of managing the schema, i.e. the representation of the structure of the database itself.
Some NoSQL databases do not store the schema, leaving its management to the applications. This characteristic, however, may create some issues during a data migration - the process of transferring data from a source storage to a destination storage - between NoSQL databases.
This work aims to find a solution to overcome this problem, in order to allow Hegira4Cloud, a tool that performs data migrations across heterogeneous NoSQL databases, to carry out a correct, complete and efficient migration using databases that do not store the schema.
The proposed solution consists in using a centralized repository, called Schema Registry, for storing the schemas.
The work can be divided into three phases.
The first phase consists in the development of a library that makes it possible to interface with the Schema Registry to store and read data.
The second phase consists in integrating the developed library into Hegira4Cloud, so as to allow Hegira4Cloud to perform a correct data migration using as source a database that does not store the schema, Apache HBase. The third phase consists in the analysis and validation of the results obtained in the tests performed on Hegira4Cloud after the integration with the Schema Registry.
The analysis of the results shows that the initial goal can be considered achieved: after the integration with the Schema Registry, carried out by means of the developed library, Hegira4Cloud is able, using HBase as source database, to perform a correct, complete and efficient data migration.
Contents
AN APPROACH TO SCHEMA MANAGEMENT FOR DATA MIGRATIONS ACROSS HETEROGENEOUS NOSQL DATABASES ... I
RINGRAZIAMENTI ... I
ABSTRACT ... V
SINTESI ... VI
CONTENTS ... VII
LIST OF FIGURES ... IX
LIST OF TABLES ... X
CHAPTER 1 INTRODUCTION ... 1
1.1 PROBLEM DESCRIPTION ... 1
1.2 OBJECTIVES ... 2
1.3 THESIS STRUCTURE ... 2
CHAPTER 2 STATE OF THE ART ... 5
2.1 NOSQL DATABASES ... 5
2.2 APACHE HBASE ... 7
2.3 HEGIRA4CLOUD ... 9
2.4 SCHEMA REGISTRY ... 12
CHAPTER 3 DESIGN OF THE SOFTWARE SOLUTION ... 14
3.1 SCHEMA REGISTRY API REQUIREMENTS ... 14
3.2.1 Context viewpoint ... 15
3.2.2 Composition viewpoint ... 17
3.2.3 Information viewpoint ... 17
3.2.4 Logical viewpoint ... 20
3.2.5 Patterns use viewpoint ... 22
3.2.6 Interface viewpoint... 22
3.2.7 Interaction viewpoint ... 25
3.3 INTEGRATION WITH HEGIRA4CLOUD ... 26
3.3.1 Basic integration ... 27
3.3.2 First step optimization ... 28
3.3.3 Second step optimization ... 28
CHAPTER 4 RESULT ANALYSIS ... 30
4.1 INITIAL LIBRARY EVALUATION ... 30
4.2 FIRST OPTIMIZATION STEP ... 35
4.3 SECOND OPTIMIZATION STEP ... 36
4.4 FINAL RESULTS ... 37
4.5 OVERALL ANALYSIS ... 40
CHAPTER 5 CONCLUSION ... 43
BIBLIOGRAPHY AND LINKS ... 45
APPENDIX A DATA LOADING TOOL ... 47
A.1 CODE ... 47
APPENDIX B INSTALLATION AND INTEGRATION ... 50
B.1 LIBRARY INSTALLATION ... 50
B.2 LIBRARY INTEGRATION ... 51
B.2.1 Integration with an application ... 51
List of Figures
Figure 2.1 HBase data model (source: www.netwoven.com) 8
Figure 2.2 Hegira4Cloud architecture (source: Experiences and Challenges in Building a Data Intensive System for Data Migration – Scavuzzo, Di Nitto, Ardagna) 11
Figure 2.3 Schema Registry architecture (source: docs.confluent.io) 12
Figure 3.1 Schema Registry library - UML use case diagram 17
Figure 3.2 Schema Registry library - UML class diagram 21
List of Tables
Table 4.1 YCSB tests with 0.549 MB dataset 33
Table 4.2 Library integration evaluation results without optimization 33
Table 4.3 Library integration evaluation results after first optimization step 35
Table 4.4 Library integration evaluation results after second optimization step 36
Table 4.5 Comparison between results obtained with and without cache on 16.2 MB database 37
Table 4.6 YCSB tests with 64MB workload 38
Table 4.7 Performance evaluation with 64MB dataset, 10 TWTs and variable cache size 38
Table 4.8 Performance evaluation with 64MB dataset, 10000 rows cache and variable TWTs 39
Table 4.9 YCSB tests with 64MB workload on reverse databases 39
Table 4.10 Performance of a migration from Cassandra to HBase on a 64MB
CHAPTER 1
INTRODUCTION
1.1 Problem description
NoSQL databases are the leading technology in the big data world. The huge amount of data continuously produced by all the devices connected to the internet needs to be processed and managed with very high throughput, and NoSQL databases address exactly this need.
In order to fulfill the requirements established by the applications, NoSQL databases have relaxed some constraints posed by the classical RDBMS.
One of the restrictions that no longer holds is the need for data to be strictly compliant with a previously defined schema.
Moreover, NoSQL databases have a more flexible policy about schema management than RDBMSs: in many cases schema management is left to the applications that use the database, while in other cases it is up to the database itself. This peculiarity allows the applications to better respond to changes in the schema and to be more flexible, but of course it has some drawbacks.
For instance, if a schema has to be managed by different applications, consistency issues may arise from conflicting updates made to the schema itself.
Moreover, new applications that want to use the database need to know the schema in order to clearly understand data.
It follows that it is critical to save schemas somewhere and keep them updated, in order to be able to retrieve them when needed.
For instance, defining a data migration as the process of transferring data from a source storage to a destination storage, a centralized repository for schemas will ensure reliable and efficient data migrations: that way, independently of which data-store needs to be migrated, the migration tool will be able to correctly handle the data and transfer them.
1.2 Objectives
The goal of this work is to allow Hegira4Cloud, a tool for performing data migrations across heterogeneous NoSQL databases, to perform a complete, correct and efficient migration using databases that do not store the schema, e.g. Apache HBase, as source data-store.
To achieve this result, it is necessary to find solutions to store schemas and obtain schema information when needed, in order to enhance the migration process of NoSQL databases in terms of extensibility and flexibility.
These solutions will then be integrated with Hegira4Cloud, allowing it to correctly migrate data from databases that leave schema management to applications.
After that process, an evaluation of the results will be carried out, in order to verify the correctness and the efficiency of the solution.
1.3 Thesis structure
Chapter 1 provides a short explanation of the problem that is going to be faced in the work and presents the objectives of the thesis.
Chapter 2 contains an overview of the underlying technologies that will be used during the work: NoSQL databases, in particular Apache HBase, Hegira4Cloud and Confluent Schema Registry.
Chapter 3 provides a description of the design of the software solution found to resolve the problem, focusing on the requirements, the architecture design and the integration with Hegira4Cloud.
Chapter 4 contains the description and analysis of the results obtained during the validation phase, focusing both on the intermediate integration results obtained during the iterative integration optimization process and on the final results.
Chapter 5 contains the conclusions of the thesis: an overall evaluation of the work and a comparison between the premises and the results.
CHAPTER 2
STATE OF THE ART
This chapter is an overview of the context in which the work takes place and of the various technologies that are going to play a key role during the development of the solution.
It starts with a general introduction to NoSQL databases in Section 2.1, followed by a focus on Apache HBase in Section 2.2. The overview ends with the description of Hegira4Cloud, the data migration tool, in Section 2.3 and of the Schema Registry, the schema management system, in Section 2.4.
2.1 NoSQL databases
During the last few years, the data world has been changing drastically. Data volume is increasing exponentially, data are becoming more sparse and heterogeneous, and the way data are used is changing too.
Some years ago these facts led to a different attitude towards classical RDBMSs, because both the relational model, on which they are based, and their centralized architecture have proven unsuitable for dealing with these new needs. At the same time the big revolution of cloud services emerged, changing system architectures, ways of processing data, ways of integrating different applications and, ultimately, attitudes towards Big Data.
Following these considerations, since 2009 the NoSQL movement has emerged, proposing many alternatives to the old RDBMSs.
NoSQL is a very heterogeneous set of data-stores: as suggested by the name, the main feature they share is the lack of a common query language, as SQL was for RDBMSs, and, therefore, of the traditional relational model.
It is easy to understand that this loose definition encompasses a broad family of databases, but the abandonment of the ER model is not the only characteristic they have in common.
In fact, they all fulfill the following requirements:
- high throughput in working with huge amount of data
- ability to deal with heterogeneous data structures avoiding the computationally expensive Object-Relational mapping
- easy horizontal scalability
- high availability of the data
According to Brewer’s CAP Theorem [1], a distributed system cannot provide at the same time consistency, availability and partition tolerance.
For that reason, the NoSQL movement, following its application needs, decided to sacrifice consistency in order to obtain all the requirements stated above, and countered the ACID properties with the BASE [2] approach:
- Basically available
- Soft-state
- Eventual consistency
As said, the NoSQL world is varied, without a single standard and with many products offering different features. Nevertheless, based on the data model used, it is possible to recognize four big families of databases that share many characteristics: key-value databases, column-family based databases, document databases and graph databases.
Key-value databases are based on a very simple data model: they can be considered a big hash table containing a set of key-value pairs. The key is a unique identifier and the value can be structured or unstructured.
This approach allows for efficient lookups but, on the other hand, it can be difficult to build complex data structures on top of a key-value system, and it may be inefficient if one is interested in retrieving only a part of the value.
Column-family based databases follow a model that may, at first sight, look like the relational one: data, in fact, are represented as tuples and tuple values are divided into columns. However, it is only a shallow similarity: these databases, for instance, do not allow join operations, which are the basis of the ER model, and their tuples may have different schemas.
Despite this last assertion, these databases are not completely schema-less: in fact, columns are grouped into column-families, which need to be defined in the schema of each table. Column-families group columns containing cohesive information: for instance, the column-family “personal-data” may be composed of the columns “name”, “surname” and “age”.
Moreover, column-family based databases can physically store data both by rows, using the tuple abstraction, and by columns, using the column-family abstraction. Therefore, column-family based databases can handle both OLAP workloads, more efficient on column-based storage, and OLTP workloads, more efficient on row-based storage.
Document databases are collections that contain semi-structured documents, which allows the creation of hierarchies. Unlike key-value databases, they allow querying not only the keys but also the structured values. This is possible because their data are represented in a suitable format like JSON or XML.
They are designed to be highly efficient in querying complex data structures on large databases.
Finally, graph databases are database systems in which the representation of the data is graph based: this means that they model data and their relations as nodes and edges. They are designed to manage highly connected data and they allow efficient querying of complex paths and relations between data.
2.2 Apache HBase
HBase is a NoSQL database, belonging to the family of column-family based databases.
It is developed as part of the Apache Hadoop project. It is designed to support high data-update rates, to manage big data-sets and to scale out horizontally in distributed clusters. This last feature enables it to support very large tables, for instance with billions of rows.
According to the project's official website [3], the main features of this data-store are:
- Linear and modular scalability
- Possibility to provide strong data consistency on reads and writes on a single row
- Automatic and configurable sharding of tables
- Block cache for real-time queries
Moreover, since it uses master nodes to manage cluster members, it provides an automatic failover support to guarantee high availability.
In HBase, data are stored in tables, which have rows and columns. Moreover, columns can be grouped in multiple column families. A tuple {row, column} is called cell and is the atomic container in HBase. This structure is graphically shown in figure 2.1.
This description may suggest an analogy with classical RDBMSs, but such an analogy does not really hold.
Besides the overlapping terminology, there are many differences between them. The most interesting characteristic of HBase, for our purposes, is that data are stored in cells as uninterpreted bytes. This, combined with the fact that schema metadata are not saved in the database, obliges the applications working with HBase to keep track of the schema themselves in order to correctly interpret the retrieved data.
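As a concrete illustration, the following minimal sketch reads a single cell with the standard HBase Java client; the table, column-family and row key names are example assumptions (modelled on the examples used in this thesis), and the call to Bytes.toInt is precisely the piece of schema knowledge that the application, rather than HBase, has to supply.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class RawBytesExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("personal_data"))) {
            Result result = table.get(new Get(Bytes.toBytes("row1")));
            byte[] raw = result.getValue(Bytes.toBytes("personal-data"), Bytes.toBytes("age"));
            // HBase returns uninterpreted bytes: nothing in the database states whether they
            // encode an int, a long or a string, so the application must know the schema.
            int age = Bytes.toInt(raw);
            System.out.println("age = " + age);
        }
    }
}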
2.3 Hegira4Cloud
Hegira4Cloud (Hegira for short) is a tool that performs data migrations between heterogeneous NoSQL databases. Currently, it supports only column-family based NoSQL databases.
Hegira can perform both offline and online data migrations. A data migration is said to be offline if, while the migration is in progress, no manipulation operations are allowed on the database; online otherwise.
This work focuses only on offline data migrations.
Nowadays Hegira supports four different databases, allowing the user to create and transfer a snapshot of one of these into another: Apache Cassandra, Apache HBase, Google Datastore and Azure Table. A snapshot is a static view of the database at a defined time point, i.e. when the data migration is issued.
The system is designed to work in a fast and reliable way with huge amounts of data, in the order of hundreds of gigabytes.
The above design constraints are not trivial: in fact, we also have to take into account that NoSQL databases do not share a common standard for data storage and management.
To face this complexity and ensure data consistency across all databases, Hegira uses a meta-model to preserve database characteristics and properties [4].
This meta-model takes into account all the features of the supported databases and is able to store the data without losing any information such as data types, secondary indexes and data partitioning information. It can be considered a container of all the possible information needed for a correct and complete migration. It is fundamental to create a homogeneous layer across all the supported databases and, moreover, it allows interested developers to easily add support for new databases in Hegira: to do so, it suffices to create a transformer which performs the translation of data from/to the meta-model, as sketched below.
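For illustration only, a transformer contract of this kind could look like the following sketch; the interface and method names used here are assumptions and not the identifiers of the actual Hegira code base (Section 3.3.1 mentions, for instance, an HBaseTransformer class with a fromMyModel method).

// Illustrative sketch: D is the database-specific representation of an entity,
// M the intermediate meta-model representation used by Hegira.
public interface Transformer<D, M> {

    // Translates an entity from the source database model into the meta-model,
    // preserving value types, secondary indexes and partitioning information.
    M toMetamodel(D sourceEntity);

    // Performs the inverse translation, from the meta-model to the data model
    // of the destination database.
    D fromMetamodel(M metamodelEntity);
}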
Despite all these considerations, sometimes the meta-model is not able to ensure a correct data migration: in fact, databases that do not store the schema cannot correctly fill the meta-model and then the meta-model would not have all the information needed to allow Hegira to perform a correct data migration.
To overcome this issue an external schema manager would be needed. Such Schema Manager should store updated information about the database schema and should feed the meta-model when necessary.
In the following, it is briefly explained how Hegira performs offline data migrations. A Source Reading Component (SRC), which may consist of one or more Source Reading Threads (SRTs), reads from the source database using the specific connector and, by means of a component called transformer, translates the data into the intermediate meta-model.
Then, the SRC component puts the serialized data into a RabbitMQ queue.
The meta-model contains the actual data serialized into an array of bytes, plus some additional data like the value type and information about secondary indexes and data partitioning. Additional details can be found in “Marco Scavuzzo, Damian A. Tamburri, and Elisabetta Di Nitto. Providing big data applications with fault-tolerant data migration across heterogeneous NoSQL databases”.
As stated before, the meta-model, in order to allow Hegira to work properly, needs to be correctly filled with all that information, but this is not always possible. In fact, some databases may not save the schema, so Hegira would not be able to retrieve information about data values or indexes.
Currently, Hegira can only extract from these databases data serialized as raw bytes, explicitly setting the data value in the meta-model to an array of bytes for every data item.
In the meanwhile, a Target Writing Component (TWC), which may consist of one or multiple Target Writing Threads (TWTs), listens on the RabbitMQ queue, waiting for messages from the SRC.
When it finds a message in the queue, it pops it and performs, through the proper transformer component, the inverse translation, de-serializing the array of bytes contained in the meta-model on the basis of the value type and converting the resulting value into the destination database data model. Then it writes the data to the destination database(s).
The system uses REST API services, which allow the user to easily perform two possible tasks: offline migration and online partitioned migration.
This architecture, shown in figure 2.2, has been designed after various attempts: in fact, the first version of the system was based on a single component, which was in charge of the whole reading-serialization-deserialization-writing process.
Figure 2.2 Hegira4Cloud architecture (source: Experiences and Challenges in Building a Data Intensive System for Data Migration – Scavuzzo, Di Nitto, Ardagna)
This kind of architecture was not suitable for handling big amounts of data quickly and, after various tests, it was discovered that the bottleneck was in the writing process: this consideration led to an enhancement of the architecture, decoupling the reading and the writing processes into two different components.
This new architecture also allowed the TWC to exploit the power of parallelization: in fact, it was designed in order to be able to work with multiple threads, retrieving data from the queue.
As stated before, the NoSQL world is composed of a heterogeneous crowd of databases with different policies about schema management. These different features can be challenging and can deeply influence the design of a data migration system. In fact, the system must take them into account and deal with them in a proper way.
For instance, as already said, some NoSQL databases, like Apache HBase, do not store any information about the schema. It is the application that retrieves the data that has to be aware of the context and properly de-serialize them.
During a data migration, this behavior leads to some limitations because the target database would receive data in a serialized form, leaving again the applications that will use it in charge of correctly interpreting the data.
As already said a solution to this problem can consist in supporting the migration tool with a schema management system. This schema management system should always contain correct and updated information about the schema of the database to be migrated and it should be able to supply information about data types and secondary indexes.
The schema management system chosen for the integration with Hegira is the Schema Registry [5], a tool that is part of the Confluent Platform.
2.4 Schema Registry
The Schema Registry is a distributed storage layer for schemas written according to the Apache AVRO specifications. It uses Apache Kafka as its storage mechanism and exposes a convenient API to store and retrieve schemas.
The Schema Registry is designed to be distributed, with a single-master architecture, and uses Apache Zookeeper to enforce this policy.
Figure 2.3 shows the architecture of this system.
The operating principle of the Schema Registry is simple but effective: it allows schemas to be saved under different subjects and then updated and retrieved.
It assigns a globally unique id to each stored schema: this id may then be used to retrieve the related schema.
The Schema Registry only performs a semantic check of the schema against the AVRO specification and does not apply any other logic on the data.
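As a sketch of how this works in practice, the snippet below registers a schema under a subject and retrieves a schema by its global id through the Schema Registry REST API; the host and port (localhost:8081, the Confluent default), the subject name and the id used in the GET call are example assumptions.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class SchemaRegistryRestSketch {

    // Performs a simple HTTP call and returns the response body as a string.
    static String call(String method, String endpoint, String body) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setRequestMethod(method);
        conn.setRequestProperty("Content-Type", "application/vnd.schemaregistry.v1+json");
        if (body != null) {
            conn.setDoOutput(true);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(body.getBytes(StandardCharsets.UTF_8));
            }
        }
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder response = new StringBuilder();
            String line;
            while ((line = in.readLine()) != null) response.append(line);
            return response.toString();
        }
    }

    public static void main(String[] args) throws IOException {
        String registry = "http://localhost:8081";
        // The AVRO schema must be passed as an escaped JSON string under the "schema" key.
        String request = "{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}";
        // Register (or update) the schema under a subject; the response contains its global id.
        System.out.println(call("POST", registry + "/subjects/schema_example/versions", request));
        // Retrieve a schema by its global id (id 1 is just an example).
        System.out.println(call("GET", registry + "/schemas/ids/1", null));
    }
}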
It is easy to understand how powerful this tool can be when coupled, for example, with Hegira4Cloud. In fact, assuming that the Schema Registry is always kept up to date, Hegira would always be able to retrieve all the schema-related information it needs from the Schema Registry.
That way Hegira would be able to fill its meta-model with the real value type for each data and to properly de-serialize the data from the meta-model, saving it into the target database in the correct form and with proper metadata about the value type.
CHAPTER 3
DESIGN OF THE
SOFTWARE SOLUTION
This chapter will describe in depth the design and the functionalities of the software solution developed to achieve the goals of this work: the integration of the Schema Registry with Hegira4Cloud.
The work consists mainly of two parts: the development of a library to interact with the Schema Registry and the integration of this library with Hegira.
The library source code is available at https://bitbucket.org/hegira/schemaregistry-library.
The first part of the chapter focuses on the Java API library developed for the Schema Registry, while the second part focuses on the integration of this library with Hegira and on the optimization of its HBase adapter.
3.1 Schema Registry API requirements
The definition of the requirements was the first step of the library design.
The functional requirements have been identified starting from the needs Hegira had.
In fact, this library has a clear purpose: it has to expose the information saved into the Schema Registry to Hegira, in order to allow Hegira itself to perform complete and correct data migrations using databases that do not store schemas.
The data these databases fail to provide are the value type and information about the schema. This information is needed by the TWC to properly de-serialize the data contained in the meta-model.
Moreover, we decided to store in the Schema Registry the information about the serializer used, so that data can be serialized and de-serialized correctly.
Another requirement for the library is to have an easy way to store the schema into the Schema Registry.
Starting from these considerations, the following functional requirements were defined:
The library has to provide the value type of a given data
The library has to provide information about secondary indexes for a given data
The library has to provide the serializer used to serialize and needed to correctly de-serialize a given data
The library has to provide an interface to store schema into the Schema Registry
3.2 Schema Registry API design
This paragraph describes the design choices made during the development of the Java API library for the Schema Registry.
According to the IEEE Standard 1016 – 2009 [6] the description of the design and architectural style of the developed solution can be divided into several viewpoints, each one focusing on a specific concern. The viewpoints taken into account are: contextual viewpoint, composition viewpoint, information viewpoint, logical viewpoint, patterns use viewpoint, interface viewpoint and interaction viewpoint.
3.2.1 Context viewpoint
The purpose of this viewpoint is to define the services offered by the design subject, to identify its actors and to establish the system boundaries.
The context taken into account for this design view is the migration of a snapshot of a database. This view is bound only to the library and not to its integration in the Hegira system, since that one is not relevant for this section. Integration with Hegira is discussed in Section 3.3.
The services provided by the developed library are defined starting from the functional requirements:
1) Value type retrieving for a given data;
2) Determine whether some data is indexed or not;
3) Retrieving of the serializer typically used to encode data stored in the column;
4) Storage of a schema either from a JSON or from a Java object.
The first is the basic service needed for the purpose of this work. It will give to Hegira the possibility to discover the type of a value, according to the schema previously saved into the Schema Registry.
The second service is essential too, in order to guarantee a correct migration of the database: in fact, not all NoSQL databases support and use secondary indexes. Information about indexed columns should be stored in the Schema Registry and, thus, should be exposed to Hegira, which will use it to create a correct copy of the source database.
The third service allows Hegira to retrieve information about the serializer used to encode the data stored in the data-store.
This service could seem out of scope, since the information about the serializer is not needed by the Hegira meta-model to perform a correct data migration. Actually, this too is a core functionality of the library because, for example, data inserted in HBase are raw bytes. Therefore, before inserting them into the meta-model, it is necessary to know which serializer was used to store them in the database, in order to de-serialize them properly.
While the other services are developed to be used by Hegira, the last service is addressed to the external applications in charge of schema management.
In fact, this service allows a new schema to be saved into the Schema Registry, or an already stored one to be updated, using either a JSON file containing a schema in AVRO syntax or a Java object as input.
Figure 3.1 Schema Registry library - UML use case diagram
3.2.2 Composition viewpoint
This viewpoint describes how the design subject is structured and where the various functionalities are allocated.
The developed library contains three packages: it.polimi.hegira.exceptions, it.polimi.hegira.registry and it.polimi.hegira.utils.
As suggested by the name, the first one is designated to contain the various exception classes defined in the library.
The second one is the core package of the library. It contains a class with the various functions described in the use case diagram above and a class that represents the data model stored in the Schema Registry.
The last package contains the utility classes, like the ones used to interact with the file system.
3.2.3 Information viewpoint
The information viewpoint describes data structure, data content, data management strategies and data access schemes of the developed library.
The first step of the library design was the choice of a suitable data format for the schema. This choice was based on the fact that the Schema Registry adopts the AVRO syntax for saving schemas and checking their correctness.
AVRO is a data serialization protocol, and it is an Apache Foundation supported project.
In the following are reported, as stated in the official AVRO documentation [7], the specifications that a generic schema must follow to be compliant to the protocol:
“[…] A schema is represented in JSON by one of:
- A JSON string, naming a defined type.
- A JSON object, of the form {"type": "typeName" ...attributes...} where typeName is either a primitive or derived type name, as defined below. Attributes not defined in this document are permitted as metadata, but must not affect the format of serialized data.
- A JSON array, representing a union of types.
Primitive Types
- null: no value
- boolean: a binary value
- int: 32-bit signed integer
- long: 64-bit signed integer
- float: single precision (32-bit) IEEE 754 floating-point number
- double: double precision (64-bit) IEEE 754 floating-point number
- bytes: sequence of 8-bit unsigned bytes
- string: unicode character sequence”
To fulfill the requirements of the work domain - the representation of a database - it is necessary to further limit the expressive power of this formalism, otherwise it would be possible to store in the Schema Registry schemas that do not correspond to a real database.
Omitting the different terminologies used by the various databases, a data-store can be thought of as a set of tables. Each table contains many tuples and each tuple is composed of various fields. For instance, a table called “personal_data” may contain two tuples, t1 {“name”: “Mario”, “surname”: ”Bianchi”} and t2 {“name”: “Luigi”, “age”: “22”}. In this case, the fields are “name” and “surname” for tuple t1 and “name” and “age” for tuple t2. It is possible to notice that the field “name” is present in both t1 and t2: the content of the two fields is different, but the kind of information and its value type are the same. Starting from this consideration it is possible to introduce the concept of column: a column groups fields containing the same kind of information and having the same value type across different tuples. In the previous example there are three columns: “name”, “surname” and “age”.
Therefore, we decided that each schema would be associated with a JSON object containing an array of tables. Moreover, each table contains an array of columns. Each column contains information about the data value type, the serializer used to encode the data and, if they exist, the secondary indexes. According to the AVRO specifications and to the Schema Registry documentation, other optional metadata can be stored inside the JSON object.
For instance, a hypothetical database containing two tables, table_1 and table_2, which are identical to the one described in the previous example and whose data were serialized with the defaultSerializer, would have, according to the stated format, the following schema:
{
  "type": "record",
  "doc": "Example of schema",
  "name": "schema_example",
  "fields": [{
    "name": "table_1",
    "type": {
      "type": "array",
      "items": {
        "name": "columns",
        "type": "record",
        "fields": [{
          "name": "name",
          "type": "string",
          "serializer": "defaultSerializer"
        }, {
          "name": "surname",
          "type": "string",
          "serializer": "defaultSerializer"
        }, {
          "name": "age",
          "type": "int",
          "serializer": "defaultSerializer"
        }]
      }
    }
  }, {
    "name": "table_2",
    "type": {
      "type": "array",
      "items": {
        "name": "columns",
        "type": "record",
        "fields": [{
          "name": "name",
          "type": "string",
          "serializer": "defaultSerializer"
        }, {
          "name": "surname",
          "type": "string",
          "serializer": "defaultSerializer"
        }, {
          "name": "age",
          "type": "int",
          "serializer": "defaultSerializer"
        }]
      }
    }
  }]
}
According to the Schema Registry architecture described in paragraph 2.4, the data is stored inside the Schema Registry, which uses Apache Kafka as storage engine. Schema data can be accessed by means of the web services exposed by the Schema Registry through Zookeeper.
The Schema Registry exposes REST APIs that are totally suitable for the needs of the work.
This kind of approach for accessing data is really powerful because it allows the Schema Registry to be detached from the migration system, running, for example, on a different machine without any kind of problem.
As previously described, the data management is currently restricted to the insertion of new schemas and the update of old ones. Both new schemas and updated ones will be checked against the AVRO syntax: if the Schema Registry receives a schema that is not compliant with the requirements, it will reject it.
3.2.4 Logical viewpoint
The purpose of this viewpoint is to elaborate all the data types required to develop the library and their implementation in classes and interfaces with their static relationships.
Starting from the data model described in Section 3.2.3, the library for the interaction with the Schema Registry was devised.
The first step was the translation of such data model into Java objects.
Then it was decided to enclose all the main functions inside a single class. Finally, the core class was separated from the exceptions and the utils.
An exhaustive description of this model can be found in the UML class diagram in figure 3.2.
3.2.5 Patterns use viewpoint
This viewpoint describes the patterns and the architectural style used in the project. The main architectural decision is the use of a singleton pattern to develop the connector with the Schema Registry.
This decision was taken because Hegira asks the library for the needed information for each field of each row of each table of the database.
It was immediately clear that it was not possible to make a call to the web services for each data item present in the database: such an approach would have dramatically increased the execution time of the tool.
In fact, the drawback of using web services to manage the data stored in the Schema Registry is the response time of the API: of course, an HTTP call is slower than a local one.
We therefore decided, under the assumption that the schema of the database does not change during a data migration, to make the connector to the Schema Registry a singleton, which calls the web services when instantiated and saves the retrieved schema in a local variable. That way the component does not make an HTTP call every time it is invoked, but only performs computations on a local variable, which are much faster.
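The following minimal sketch illustrates this singleton approach; the class and method names (RegistryConnector, fetchSchemaFromRegistry) are illustrative assumptions, not the identifiers actually used in the library.

// Illustrative sketch of a singleton connector that fetches the schema once.
public final class RegistryConnector {

    private static RegistryConnector instance;

    // Schema retrieved once from the Schema Registry and kept in memory.
    private final String cachedSchema;

    private RegistryConnector() {
        // Single HTTP call to the Schema Registry web service, performed at instantiation.
        this.cachedSchema = fetchSchemaFromRegistry();
    }

    public static synchronized RegistryConnector getInstance() {
        if (instance == null) {
            instance = new RegistryConnector();
        }
        return instance;
    }

    // All subsequent queries (value types, serializers, indexes) are answered by
    // inspecting the locally cached schema, avoiding further HTTP calls.
    public String getCachedSchema() {
        return cachedSchema;
    }

    private String fetchSchemaFromRegistry() {
        // Placeholder: in the real library this performs the REST call
        // to the Schema Registry (see Section 2.4).
        return "{}";
    }
}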
3.2.6 Interface viewpoint
The interface viewpoint describes how to properly use the services provided by the library, i.e., listing the services interfaces and briefly explaining their mode of operation.
public String getCellsType( String tableKey, String columnKey )
- String tableKey – name of the table stored in the Schema Registry
- String columnKey – name of the column stored in the Schema Registry
- returned String – type of the values contained in the required column
This method returns the value type of the data contained in the column columnKey of the table tableKey.

public String getCellsSerializer( String tableKey, String columnKey )
- String tableKey – name of the table stored in the Schema Registry
- String columnKey – name of the column stored in the Schema Registry
- returned String – name of the serializer used to convert data for the required column
Considering that, as previously explained, some databases store data serialized as arrays of bytes, this method provides the name of the serializer used to serialize the data contained in the column columnKey of the table tableKey.

public boolean isIndexable( String tableKey, String columnKey )
- String tableKey – name of the table stored in the Schema Registry
- String columnKey – name of the column stored in the Schema Registry
- returned boolean – true if the required column is indexed, false otherwise
This method returns true if the data contained in the column columnKey of the table tableKey are part of a secondary index, false otherwise.

public List<TableIndex> getIndexes( String tableKey )
- String tableKey – name of the table stored in the Schema Registry
- returned List<TableIndex> – list containing all the indexes for the required table
This method returns a list of TableIndex objects. The list contains a Java representation of all the secondary indexes for the table tableKey.
The interfaces of the utils exposed to the Hegira system are the following:

public static String mapToJavaType( String schemaRegistryType )
- String schemaRegistryType – name of the value type in the Schema Registry
- returned String – name of the corresponding value type in Java
This method returns the name of the Java type corresponding to schemaRegistryType.

public static Object convertToJavaType( byte[] bytesValue, String schemaRegistryType, String serializer )
- byte[] bytesValue – data to convert, serialized as a raw byte array
- String schemaRegistryType – name of the value type in the Schema Registry
- String serializer – name of the serializer used to convert bytesValue
- returned Object – bytesValue in its de-serialized form
This method de-serializes bytesValue using the serializer given as input and returns the resulting object.
The interfaces of the services exposed to the external applications are the following:

public String setSchemaFromObject( Schema object ) throws InvalidInputException
- Schema object – object containing the schema to be stored
- returned String – response from the Schema Registry web service: an object containing the schema if the operation succeeds, an object containing an error code and a message otherwise
This method allows the application to save the Schema object, the Java representation of a schema, into the Schema Registry. It returns the response obtained from the Schema Registry after the operation. It throws InvalidInputException if the Schema object is not compliant with the specifications stated in paragraph 3.2.3.

public String setSchemaFromJson( String JSONSchema ) throws InvalidInputException
- String JSONSchema – JSON object containing the schema to be stored
- returned String – response from the Schema Registry web service: an object containing the schema if the operation succeeds, an object containing an error code and a message otherwise
This method allows the application to save a JSONSchema, the JSON representation of a schema, into the Schema Registry. It returns the response obtained from the Schema Registry after the operation. It throws InvalidInputException if the JSONSchema string is not compliant with the specifications stated in paragraph 3.2.3.
3.2.7 Interaction viewpoint
The interaction viewpoint describes the dynamic interactions between the components of the system.
The scenario considered for this analysis is the migration of the snapshot of a database towards another database, using the features provided by the Schema Registry.
The UML sequence diagram in figure 3.3 shows how the three components involved in this operation – Hegira4Cloud, the library and the Schema Registry – interact and the messages they exchange.
When an external agent – namely a user – starts the migration, Hegira performs some operations and then, in the SRC, it instantiates the connector to the Schema Registry.
The connector sends an HTTP request to the Schema Registry and receives a JSON object containing the requested schema.
Then, for each cell of the database that has to be migrated, Hegira asks the connector for its value type, the serializer previously used to encode the data, and if the column of the requested cell is indexed or not.
The interaction between Hegira and the library ends here.
Finally, for each cell, all this information is used to correctly fill the Hegira meta-model, and a message containing the meta-model is sent to the RabbitMQ queue, as described in paragraph 2.3. The TWC keeps following its usual flow without being affected by the library.
Figure 3.3 Schema Registry library – UML sequence diagram
3.3 Integration with Hegira4Cloud
This paragraph describes the integration into Hegira of the API library developed for interacting with the Schema Registry. The source database chosen for the integration of the library is HBase.
It covers both the basic integration and the subsequent optimizations made to the Hegira HBase connector in order to achieve better results.
3.3.1 Basic integration
As a first step, I tried to integrate the library while modifying the existing components as little as possible.
A basic HBase connector was already available in Hegira, but it was not able to correctly recognize and de-serialize data; in fact, it could only transfer data as byte arrays, leaving them serialized in the destination database.
This was due to the fact that HBase itself stores all the data as byte arrays, leaving the applications the task to interpret them.
Therefore, at the beginning, the integration consisted in interpreting the data, which in turn consists of the following steps (a minimal sketch of this flow is given after the list):
1. de-serializing the data, exploiting the information about the serializer stored in the Schema Registry;
2. inserting their data types and secondary index information into the Hegira HBase model;
3. serializing them again with the Java DefaultSerializer, in order to obtain data that can be inserted into the meta-model, which only accepts data encoded with this specific serializer;
4. inserting them, together with all the additional information, into the Hegira meta-model.
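The sketch below walks through steps 1 and 3 on a single cell; the helper names and the use of plain Java object serialization as a stand-in for Hegira's DefaultSerializer are assumptions made for illustration, not the actual Hegira or library code.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;

public class InterpretationFlowSketch {

    // Step 1: de-serialize the raw bytes according to the serializer and type
    // retrieved from the Schema Registry (only two illustrative cases shown).
    static Object deserialize(byte[] raw, String type, String serializer) {
        if ("HBaseShellSerializer".equals(serializer) && "string".equals(type)) {
            return new String(raw, StandardCharsets.UTF_8);
        }
        throw new IllegalArgumentException("Unsupported combination: " + type + "/" + serializer);
    }

    // Step 3: serialize the Java object again with a DefaultSerializer-like encoding,
    // so that it can be stored in the meta-model.
    static byte[] defaultSerialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] rawCell = "Mario".getBytes(StandardCharsets.UTF_8);   // as read from HBase
        // Metadata that would come from the Schema Registry library (steps 1 and 2).
        String type = "string";
        String serializer = "HBaseShellSerializer";
        boolean indexed = false;

        Object value = deserialize(rawCell, type, serializer);        // step 1
        byte[] forMetamodel = defaultSerialize(value);                 // step 3
        // Step 4 (not shown): the re-serialized value, together with its type and
        // index information, is inserted into the Hegira meta-model.
        System.out.println(value + " -> " + forMetamodel.length + " bytes, indexed=" + indexed);
    }
}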
In this phase, the most significant changes were made to the HBase class of the it.polimi.hegira.adapters.hbase package and to the HBaseTransformer class of the it.polimi.hegira.transformers package.
In the HBase class, a class variable containing an instance of the Schema Registry library was added. This instance of the library was then used inside the getCells method, which gets data cell by cell from HBase, to retrieve information about the serializer, the secondary indexes and the value type.
This information was stored inside an HBaseCell, together with the serialized value and other data needed to identify the cell in the database.
Then, the fromMyModel method of the HBaseTransformer class was modified in order to de-serialize that data (using the convertToJavaType utility of the developed library), to then serialize it again with the DefaultSerializer and store it into the Hegira meta-model.
Hegira uses the Java default serializer to de-serialize the data contained in the meta-model: this process is necessary to allow the receiving component to correctly interpret the data. If the TWC receives data serialized with a serializer different from the DefaultSerializer, it will not be able to understand them and will not insert them correctly into the destination database.
3.3.2 First step optimization
The basic integration depicted in the previous subsection was highly inefficient as it will be shown in Section 4.1.
In fact, the existing Hegira connector to HBase was designed to retrieve data cell by cell from the database: this approach was very expensive in terms of execution time, because connections to the database are not lightweight operations.
The idea behind this optimization is the reduction of the database connections made to retrieve the data. To do so, the approach already used, for example, by the Hegira Cassandra connector was copied; it consists in retrieving data by rows instead of by cells.
The HBase Java API natively provides a suitable method for this work (getRow), therefore it was not too difficult to achieve the optimization described above.
For this enhancement, only the getCells method of the HBase class was modified.
The method signature was changed, because the columnKey parameter was no longer needed. Then the indication of the column to retrieve was removed, leaving the HBase API free to pick up the whole row.
Finally, the columnKey could be retrieved from the returned cells themselves, allowing the method to return the same information it returned before the modifications.
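For illustration, the following sketch contrasts the two retrieval strategies with the standard HBase client API; the table, column-family and column names are modelled on the test dataset of Section 4.1, while the row key is an assumption.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class RowRetrievalSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("test_table_2"))) {

            // Cell-by-cell: one Get restricted to a single column, i.e. one round trip per cell.
            Get singleCell = new Get(Bytes.toBytes("row1"));
            singleCell.addColumn(Bytes.toBytes("data"), Bytes.toBytes("integer"));
            Result oneCell = table.get(singleCell);
            System.out.println("single-cell result empty? " + oneCell.isEmpty());

            // Row-by-row: a single Get without column restrictions returns every cell
            // of the row, so the column names can be read from the result itself.
            Result wholeRow = table.get(new Get(Bytes.toBytes("row1")));
            for (Cell cell : wholeRow.rawCells()) {
                String column = Bytes.toString(CellUtil.cloneQualifier(cell));
                byte[] value = CellUtil.cloneValue(cell);
                System.out.println(column + " -> " + value.length + " bytes");
            }
        }
    }
}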
3.3.3 Second step optimization
The optimization described above was rather satisfying in terms of throughput, but the number of connections to the database was still high.
Therefore, it was decided to further optimize the Hegira HBase connector, inserting a cache layer in order to reduce the number of connections to the database.
The basic idea was to ask the database for a bulk of rows, store them in memory and then send them one by one to RabbitMQ.
This approach would reduce the number of connections to the database and, therefore, the execution time.
It was decided to parametrize the number of cached rows, in order to allow the users to choose the cache size that best fits their needs.
More in detail, within the HBase class a new subclass called TableCache was created; this subclass is responsible for caching the retrieved data as a list of rows. That way a bulk of rows is available in RAM for fast operations, avoiding a call to the database to retrieve each single data item.
The loadNextChunkOfModelIfNeeded method was also modified in order to allow Hegira to get data from the cache.
That method is also responsible for storing data into the cache. In fact, it calls the getTableCache method when a new table is going to be migrated or when the previously cached data have already been consumed.
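A minimal sketch of this caching idea, based on the standard HBase scanner API, is shown below; the class name mirrors the TableCache concept, but its fields and methods are illustrative assumptions rather than the actual Hegira implementation.

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class TableCacheSketch {
    private final Table table;
    private final int cacheSize;           // configurable number of rows kept in memory
    private final Deque<Result> cache = new ArrayDeque<>();
    private ResultScanner scanner;

    public TableCacheSketch(Table table, int cacheSize) {
        this.table = table;
        this.cacheSize = cacheSize;
    }

    // Returns the next row, refilling the cache with a bulk of rows when it is empty,
    // so that the database is contacted once per chunk instead of once per row.
    public Result nextRow() throws IOException {
        if (cache.isEmpty()) {
            if (scanner == null) {
                Scan scan = new Scan();
                scan.setCaching(cacheSize);   // hint: rows fetched per RPC
                scanner = table.getScanner(scan);
            }
            for (Result row : scanner.next(cacheSize)) {
                cache.add(row);
            }
        }
        return cache.poll();                 // null once the table has been fully read
    }
}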
CHAPTER 4
RESULT ANALYSIS
This chapter contains the results of the validation tests made on the software solution discussed in Chapter 3 as well as an analysis of these results.
Such results are organized in five sections: Section 4.1 shows the basic results obtained from the library validation, Section 4.2 shows the results obtained after the first optimization step, Section 4.3 shows the results obtained after the second optimization step, Section 4.4 shows the final test results and Section 4.5 contains an overall analysis of the results.
4.1 Initial library evaluation
The first step of the evaluation process consisted in testing the Schema Registry library together with Hegira.
The obvious minimum requirement to meet at the beginning was the correctness of the library: i.e., Hegira, integrated with the Schema Registry library, was required to perform a complete and correct migration using HBase as source database.
The generic process followed during the evaluation tests was:
1) Dataset preparation;
2) Schema generation and insertion into the Schema Registry;
3) Evaluation of the optimal database performance;
4) Evaluation of the Hegira performance.
The destination database used during the whole evaluation process was Cassandra [8].
Cassandra was chosen in order to make all the tests run locally, thus reducing the possible noise introduced by the communication delay over the internet.
The local machine used for the tests had 4 GB of RAM and an 8-core CPU at 3.2 GHz. In order to reduce the possibility of dirty results caused by a possible overloading of the machine (either of the memory or of the processor), during the test sessions the only processes running have always been the ones required by the operating system and the ones required by the migration system.
As a starting point, a test database was first stored in HBase. Then, a schema, compliant with the test database data, was saved into the Schema Registry. The schema, written according to the specifications given in paragraph 3.2.3, was the following:
{
  "type": "record",
  "doc": "Test database used for performance evaluation",
  "name": "SCHEMA_EXAMPLE",
  "fields": [{
    "name": "test_table_1",
    "indexes": [],
    "type": {
      "type": "array",
      "items": {
        "name": "entries_table_1",
        "type": "record",
        "fields": [{
          "name": "name",
          "type": "string",
          "serializer": "HBaseShellSerializer"
        }, {
          "name": "surname",
          "type": "string",
          "serializer": "HBaseShellSerializer"
        }]
      }
    }
  }, {
    "name": "test_table_2",
    "indexes": [],
    "type": {
      "type": "array",
      "items": {
        "name": "entries_table_2",
        "type": "record",
        "fields": [{
          "name": "string",
          "type": "string",
          "serializer": "DefaultSerializer"
        }, {
          "name": "double",
          "type": "double",
          "serializer": "DefaultSerializer"
        }, {
          "name": "integer",
          "type": "int",
          "serializer": "DefaultSerializer"
        }, {
          "name": "long",
          "type": "long",
          "serializer": "DefaultSerializer"
        }, {
          "name": "boolean",
          "type": "boolean",
          "serializer": "DefaultSerializer"
        }, {
          "name": "float",
          "type": "float",
          "serializer": "DefaultSerializer"
        }]
      }
    }
  }]
}
The first table, test_table_1, was populated with only 3 rows, with the purpose of testing the ability of the library to recognize different tables under a schema. These data were encoded with the default HBase shell serializer.
The second table, test_table_2, was populated with 999 rows. Each row was composed of 6 columns, grouped in a single column-family called “data”, containing different data types. Data were encoded in this case with the Java DefaultSerializer. With such a schema, the following features offered by the library could be tested: 1) the support for the six basic data values offered by the AVRO specification, 2) the ability to retrieve the schema of different tables and 3) the multi-serializer support.
According to the HBase statistics, the size of the whole test dataset was 0.549 MB and the size of each row was 576 bytes.
Before starting the data migration, the reading performance of the HBase database, with 1 thread running, and the writing performance of the Cassandra database, with 1 thread and 10 threads running, were tested. For HBase the test consisted in reading all the rows of the dataset, in order to simulate the scan of the whole source database made by Hegira, while, for Cassandra, the tests consisted in writing the whole dataset performing insert operations, in order to simulate Hegira's insert operations in the target database.
The dataset used for these tests had exactly the same characteristics of the one created in HBase for the migration test.
To test the HBase and Cassandra performance, YCSB was used. YCSB is a framework developed to evaluate the performance of different “key-value” databases (both on-premises and offered as a service).
After measuring the performance with YCSB, which is essential to determine the overhead introduced by Hegira into the migration process, the actual migration of the dataset was started.
Two different tests were made, both using the “switchover” function of Hegira, which performs an “offline” migration of the snapshot of a source database into a destination one, with 1 and 10 TWTs respectively. The results are shown in table 4.1. The resolution of the measures is 1 second.
Table 4.1 YCSB tests with 0.549 MB dataset

                               YCSB HBase - READ   YCSB Cassandra 1 thread - INSERT   YCSB Cassandra 10 threads - INSERT
Total time [s]                 0.8                 0.9                                0.3
Throughput [MB/s]              0.686               0.61                               1.83
Entity throughput [entity/s]   7500                6667                               20000
Table 4.2 Library integration evaluation results without optimization

                               HEGIRA 1 TWT   HEGIRA 10 TWTs
SRC time [s]                   183            182
Total time [s]                 183            182
Throughput [MB/s]              0.003          0.003
Entity throughput [entity/s]   33             33
The results of the whole evaluation are shown in table 4.2. Because of the limited significance of this test, each test was run only once.
The migration system worked well, as it correctly migrated the data contained in HBase and was able to correctly recognize both the data types and the serializers used.
In order to compare the Hegira and YCSB results, we consider an ideal migration system working like Hegira: there is a source component which reads data from the source database and puts them in a queue. There are no costs for inserting data into and retrieving data from the queue. In the meanwhile, a target component, working in parallel, gets data from the queue and inserts them into the target database.
In this situation, considering that, from an overall perspective, source and target components are working in parallel but, at the granularity of the single data, the read-push-pop-insert operations need to be made serially, the optimum time of the system is max(sourceComponentTime, targetComponentTime).
The results of the YCSB HBase reading tests and of the Cassandra writing tests will be used as sourceComponentTime and targetComponentTime respectively.
It is necessary to take into account that the result obtained with this procedure will be biased toward better performance. In fact, in addition to reading from the source database, the real SRC also performs n serializations (where n is the number of columns of each tuple) to convert data into the meta-model and m serializations to store the data into the queue. At the same time, the real TWC performs m de-serializations to interpret data from the queue and n de-serializations to convert data from the meta-model.
By analyzing the tests performed, we see that, while for the Hegira migration test with 1 TWT the reference measure is the throughput obtained with the YCSB Cassandra test with 1 thread, for the Hegira migration test with 10 TWTs the reference measure is the throughput obtained with the YCSB HBase test: in fact, the slower reading performance of HBase is a bottleneck for the 10-thread writing performance of Cassandra.
It was immediately clear that the system was highly inefficient: a raw comparison between the results obtained in the tests performed with YCSB and the ones obtained running Hegira showed a huge throughput difference.
In fact, the Hegira test with 1 TWT had a throughput around 200 times lower than the ideal one and the Hegira test with 10 TWTs had a throughput around 227 times lower than the ideal one.
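To make the comparison explicit, using the values reported in tables 4.1 and 4.2: the ideal time is max(0.8 s, 0.9 s) = 0.9 s for the 1 TWT case and max(0.8 s, 0.3 s) = 0.8 s for the 10 TWTs case, while the measured Hegira times were 183 s and 182 s respectively, that is 183 / 0.9 ≈ 203 and 182 / 0.8 ≈ 227 times slower.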
A first analysis of the code showed a probable bottleneck in the system: for each row of each table, the HBase component within Hegira was querying the database to know the columns composing the row and then to retrieve the data cell by cell. Therefore, it was decided to try to remove that bottleneck.
4.2 First optimization step
In order to eliminate most of the calls to HBase, the logic of the HBase adapter was changed: instead of retrieving data cell by cell, the code was modified to retrieve them row by row, as described in paragraph 3.3.2.
After this enhancement the system was tested again with the same configurations described in the previous paragraph, obtaining the results shown in table 4.3. The only difference with the previous tests was the number of runs made for each Hegira data migration test. In fact, the results in table 4.3 report the average of 3 different runs. The standard deviation is not reported because in our case these values were not significant: in fact, the resolution of the measure (1 second) was large with respect to the time needed to execute each run.
Table 4.3 Library integration evaluation results after first optimization step
                               HEGIRA 1 TWT   HEGIRA 10 TWTs
SRC time [s]                   8.66           9
Total time [s]                 8.66           9
Throughput [MB/s]              0.061          0.061
Entity throughput [entity/s]   693            667
The speedup, compared to the first results obtained, was outstanding: around 20x. Looking at the results, it is possible to see that the system was not able to exploit multi-threaded computation: in fact, the ideal results obtained with YCSB show that a run with 10 TWTs should be significantly faster than one with a single TWT. Moreover, according to the results obtained using other databases, shown in “Scavuzzo, Di Nitto, Ardagna - Experiences and Challenges in Building a Data Intensive System for Data Migration”, increasing the number of TWTs improves performance.
In fact, although of limited significance since they were obtained on a small database, these results showed a reduction of the throughput with 10 TWTs.
Moreover, the SRC time and the overall time coincide, suggesting that the SRC can be further optimized.
Therefore, the counter-intuitive results shown in table 4.3 might suggest that, again, Hegira has a bottleneck inside the source adapter.
4.3 Second optimization step
In this section we discuss a new optimization of the HBase adapter, which consisted in inserting a cache layer inside the HBase connector, as depicted in section 3.3.3. We identified a further possible bottleneck in the connections to the source database (HBase): in fact, even after the changes described in section 4.2, the connector was still retrieving data row by row; we thought to reduce the number of calls by querying the database for batches of rows, storing them in a cache system so that we could later process them one by one.
Admittedly, this choice would increase the space complexity of the algorithm, but it was expected to reduce the execution time.
If we set the cache size to a value equal to that of the whole database, by using the same configuration of section 4.2, we obtain the results shown in table 4.4.
Table 4.4 Library integration evaluation results after second optimization step
                               HEGIRA 1 TWT   HEGIRA 10 TWTs
SRC time [s]                   6.33           6
Total time [s]                 8              7.66
Throughput [MB/s]              0.068          0.071
Entity throughput [entity/s]   750            783
These results show a small speedup in comparison with the ones shown in table 4.3 but, considering the resolution of the measures, it is not possible to consider them significant: in order to obtain meaningful results, it is necessary to increase the size of the test database.
Therefore, we created a database of 176994 entities, 16.2 MB, to test the throughput of a data migration with and without the new cache layer.
In table 4.5 it is possible to see the comparison between the results obtained without the cache and the ones obtained with the cache whose size is set to a value equal to that of the whole database, while the number of TWTs is fixed to 10.
Table 4.5 Comparison between results obtained with and without cache on 16.2 MB database
                               No cache   With cache
SRC time [s]                   120.33     100
Total time [s]                 120.33     102
Throughput [MB/s]              0.135      0.159
Entity throughput [entity/s]   1471       1735
These results confirm the trend suggested by the tests made on the smaller database: introducing a cache layer in the HBase connector provides a speedup.
4.4 Final results
The results obtained with the second optimization, shown in table 4.5, were rather satisfying and therefore the system was tested in depth with this configuration.
First of all, the number of rows cached in memory by the source HBase adapter was parametrized.
Then, the size of the test database to be migrated was increased, in order to reduce the Hegira “setup cost” in the throughput analysis, thus making results more precise.
A database of 64.8 MB, 707952 entities, was created with the same schema described in section 4.1. The schema was saved into the Schema Registry.
Then, on an identical dataset, the performance of the HBase and Cassandra databases was tested following the methodology described in section 4.1: the only difference with the previous tests was the number of runs made for each YCSB workload test. In fact, the results in table 4.6 report an average of 3 runs.
Different tests were performed on the Hegira system, varying the two available parameters, i.e. the number of concurrent TWTs and the number of rows cached by the SRC.