• Non ci sono risultati.

Questo progetto è stato implementato e testato inizialmente in locale, su una macchina virtuale con la distribuzione Cloudera. Tuttavia, per poter effettuare un confronto più accurato e veritiero, si è potuto utilizzare un cluster formato da 8 istanze AWS:

• 1 del tipo m3.2xlarge, con le seguenti caratteristiche: – 8 virtual CPU

– 2 x 80 GB di SSD per lo storage

• 7 del tipo d2.2xlarge, con le seguenti caratteristiche: – 8 virtual CPU

– 60 GiB di memoria

– 6 x 2000 GB per lo storage

Su queste macchine è stato installato un cluster, con la distribuzione Cloudera alla versione 5.4.10, utilizzando il nodo più performante (m3.2xlarge) come nodo master, in quanto utilizza uno storage su SSD, e 4 dell’altra tipologia come nodi worker. inoltre, sono stati installati tutti i servizi necessari al corretto funzionamento dei flussi e Java8, poichè gli script di spark implementati utilizzano alcune delle nuove feature di java come le lambda expression.

Sugli altri tre nodi rimanenti, gli edge node, sono stati installati i cluster dei due database NoSQL utilizzati. Per quanto riguarda cassandra, è stata utilizzata la distribuzione di Datastax alla versione 21 che include Apache Cassandra 2.1.18. Questo è stato opportunamente configurato affinchè i 3 nodi su cui è installato facciano parte dello stesso cluster, e che questo possa essere interrogato dagli altri nodi del cluster. In parallelo, su queste 3 istanze è stato installato un cluster con MongoDB con la versione 3.2, in modalità Replica Set; ciò significa che c’è un nodo master e 2 nodi slave in cui i dati vengono replicati.

Implementazione del Sistema

In questo capitolo verranno descritti tutti i flussi di ingestion dei dati implementati. Prima di passare alla descrizione puntuale dei singoli flussi, ci sono una serie di sezioni in cui vengono descritte le sorgenti dei dati, il data model utilizzato e la logica di ingestion dei dati.

4.1

Input Data: Producer Kafka and Hdfs

Il primo step per poter iniziare ad implementare la logica dei flussi batch e streaming per l’inserimento dei dati su Cassandra e MongoDB è stato quello di ottenere un sample di dati e andare a popolare le due fonti di input: hdfs e kafka.

La prima fonte ad essere stata popolata è HDFS. Previa autorizzazione,sono stati estratti alcuni dei file contenenti lo storico dei dati dal cluster di produzione e copiati copiati in opportune cartelle hdfs all’interno dell’ambiente di test.

Successivamente è stato possibile popolare le code kafka utilizzando i file appena copiati; per andare ad inserire i dati dentro queste ultime, è stato implementato uno script Java con Spark.

Lo script legge come input un file di configurazione in cui sono esplicitate tutti i parametri di configurazione di spark, Kafka, Zookeper, i nomi dei topic da creare e i path dei file da caricare. Come primo step, lo script crea l’oggetto JavaSparkContext con la configurazione di spark presente nel file di configurazione. Successivamente va a creare, se non già presenti, i topic relativi a misure, cause e stati.

Z k C l i e n t z k C l i e n t = new Z k C l i e n t ( zkConnect , 900000 , 900000 , Z K S t r i n g S e r i a l i z e r $ .MODULE$) ;

i f ( ! AdminUtils . t o p i c E x i s t s ( z k C l i e n t , TOPIC) )

AdminUtils . c r e a t e T o p i c ( z k C l i e n t , TOPIC , 1 , 1 , new P r o p e r t i e s ( ) ) ;

Una volta creati i topic, per ogni tipologia di dato va ad effettuare il caricamento dei mes- saggi contenuti nei rispettivi file. Per caricare i messaggi, recupera dal file di configurazione, il path della cartella in cui sono contenuti i file da caricare. Una volta recuperata questa informazione, recupera la lista dei file all’interno di quella cartella e per ogni file controlla se questo è già stato caricato nella coda o no. Per fare questo controllo, va a controllare se il nome di quest’ultimo è già stato inserito all’interno di un altro file, in cui sono salvati i nomi dei file già caricati, uno per ogni riga. Se il file non è stato inserito si procede altrimenti si passa al file successivo.

Come già detto nel paragrafo relativo all’architettura del sistema, ogni riga del file da caricare corrisponde ad un messaggio da inviare sulla coda, nel formato JSON definito dalle specifiche. Per cui, il file viene letto nel seguente modo:

JavaRDD<S t r i n g > fileRDD= j s c . t e x t F i l e (< f i l e p a t h >)

in modo da creare un RDD di Stringhe che corrispondono alle righe del messaggio. Così facendo, è possibile caricare in parallelo ogni messaggio sulla coda e velocizzare le operazioni di inserimento. Questo è fatto attraverso una operazione di map sul fileRDD in cui:

KafkaProducer kp=new KafkaProducer <>(producerProps ) ; kp . send ( new ProducerRecord <>(TOPIC ," egp ", s t r ) ) ; kp . c l o s e ( ) ;

viene creato un Producer per Kafka, con le proprietà definite dal file di configurazione, e inviata la stringa della riga sul topic associato.

Una volta finiti di inviare tutti i messaggi di un file, ne viene salvato il nome sul file i cui sono tracciati i file già caricati, e si procede col file successivo.

Questa operazione complessivamente non richiede troppo tempo, poichè dopo una serie di prove sul cluster, per caricare un centinaio di file con una media di 500-600 Mb l’uno, ci impiega solo circa 40-50 minuti, per cui in media 0,4-0,5 minuti a file.

Una volta che tutte le sorgenti dati sono state riempite, si è proceduto con l’implementazione e il testing della prima parte del flusso: quella relativa all’ingestion dei dati in Cassandra e MongoDB, prima in modalità batch poi quella streaming.

4.2

Data Model

Prima di implementare i flussi, è necessario definire i datamodel da utilizzare. Dato che il progetto serve per migliorare un progetto già esistente, si è utilizzato il datamodel esistente, applicando qualche modifica alle tabelle secondarie, quelle che sicuramente non erano utilizzate da altri progetti realizzati da terzi.

Per quanto riguarda Cassandra, il data model è costituito da un Keyspace con fattore di replica 2 distribuito su 3 nodi. Le tabelle principali sono quelle associate ai flussi di dati da gestire:

• misure: tabella che maniene i le informaizoni relative alle misure ed è partizionata per device_id e ora;

• stati: tabella che contiene le informazioni sugli stati dei device ed è partizionata per device_id;

• cause: tabella che contiene le informazioni sulle cause di inefficienze dei device ed è partizionata per device_id;

In aggiunta, ci sono delle tabelle secondarie, (partition_flag_measures, partition_flag_status e partition_flag_causes) che tengono traccia delle partizioni aggiornate in fasi di inserimento dei dati nel database. Questo per evitare di effettuare il dump della tabella intera su Hive ogni volta; così facendo si può effettuare il dump della singola partizione, solo per quelle che sono state aggiornate dall’ultima esecuzione. Per “Aggiornamento delle partizioni” si intende il fatto di aver inserito nuovi dati dentro le tabelle principali.

Queste tabelle hanno solamente 3 campi: la chiave di partizione della tabella principale, un campo read_time e uno update_time. Ogni volta che viene aggiornata la partizione viene aggiornato il campo update_time con il timestamp corrente, mentre il campo read_time viene aggiornato una volta che la partizione viene scritta su hive; per identificare quale delle partizioni sono da scrivere /aggiornare su hive, si scelgono quelle per cui update_time > read_time.

Infine, a queste tabelle se ne aggiungono altre 3: hour_deviceid: tiene traccia della coppia ora e device_id device_id: tiene traccia della lista dei device associandoli all’impianto di appartenenza

• file_flag: tiene traccia dei file, salvati su hdfs, i cui dati sono stati già inseriti nel database

• offset_kafka: tiene traccia dell’ultimo messaggio letto, processato e inserito per ogni topic della coda e consumatore.

Questo data model è stato utilizzato sia per Cassandra, sia per MongoDB. In quest’ultimo, sono stati creati un database replicato su 3 nodi, e le collezioni corrispondenti alle tabelle elencate sopra; ad record delle tabelle su Cassandra corrisponde ad un documento nelle collezioni di MongoDB. Questa struttura non è l’ideale per un database documentale, poichè ci saranno tanto Document piccoli, che teoricamente sarebbe meglio raggruppare in Docu- ment più grandi. Tuttavia non è stato possibile farlo sia per mantenere i due data model molto simili, sia perchè la grandezza e la frequenza temporale dei vari record è troppo variabile; ad esempio, i sensori possono inviare n misurazioni al secondo, come n misurazioni in un’ora. Infine, il data model su MongoDB è stato arricchito di con una serie di indici per ogni collezione, che servono, come vedremo meglio nei prossimi capitoli, per migliorare in maniera significativa le performance delle operazioni di lettura, update e delete. Gli indici inseriti per le varie collezioni sono:

• misure

– un indice contenente tutti i campi della primary key della tabella delle misure su cassandra; questo serve per le operazioni di update in upsert in fase di inserimento – un indice su device_id e hour; questo serve per velocizzare la ricerca dei docu-

menti associati ad una singola partizione • stati

– un indice contenente tutti i campi della primary key della tabella delle misure su cassandra; questo serve per le operazioni di update in upsert in fase di inserimento – un indice su device_id sap_code e status_id; questo serve per velocizzare la

ricerca dei documenti da cancellare in fase di caricamento dei dati

– un indice su device_id per velocizzare le operazioni di ricerca dei dati relativi ad una singola partizione

• cause

– un indice contenente tutti i campi della primary key della tabella delle misure su cassandra; questo serve per le operazioni di update in upsert in fase di inserimento – un indice su device_id sap_code e cause_id; questo serve per velocizzare la

ricerca dei documenti da cancellare in fase di caricamento dei dati

– un indice su device_id per velocizzare le operazioni di ricerca dei dati relativi ad una singola partizione

Analogamente sono stati messi anche alcuni indici sulle collezioni che tengon traccia delle liste delle partizioni da scrivere e/o aggiornare su hive.

Per quanto riguarda il database e le tabelle scritte su Hive, sono state provate varie soluzioni che verranno descritte meglio nelle prossime sezioni. Le uniche tabelle di cui è necessario salvare su Hive, sono le 3 principali, ovvero misure, cause e stati. La struttura di tali tabelle è molto simile a quella utilizzata nelle su Cassandra; ciò che cambia in maniera significativa, solo per la tabella delle misure, è il partizionamento che su hive è solamente per ora.

Documenti correlati