L’ultima serie di flussi implementati sono quelli che portano i dati dai database NoSQL utilizzati su Hive. Questo perchè, come accennato nel capitolo precedente, c’è la necessità di salvare i dati anche su HDFS e su tabelle Hive dato che gli strumenti di visualizzazione utilizzati dai Data Scientist richiedono Hive o HDFS come fonte di input.
L’algoritmo implementato è fondamentalmente simile per tutte i flussi e l’idea è quella di effettuare il dump delle tabelle/collezioni principali dei database su Hive in modalità batch; questa attività verrà poi schedulata in maniera opportuna, cioè in base alla frequenza di , per mantenere aggiornate le tabelle Hive. Anche questi script sono stati implementati con Spark e Java8.
Un primo approccio, prevedeva di effettuare semplicemente il dump completo delle tabelle delle misure, cause e stati. Tuttavia, era poco sostenibile poichè la tabella delle misure, ad esempio, contiene quasi 2 TB di dati, ed effettuare il dump ogni volta di 2 TB di dati richiederebbe troppo tempo e un consumo eccessivo di risorse. Per cui è stato messo da parte questo approccio, ed utilizzata un’altra soluzione. Un’altra soluzione, abbastanza semplice ed ovvia era: effettuare un primo dump delle tabelle su hive, marcare tutti i dati arrivati dopo questo dump come "nuovi", schedulare dei job che facessero delle semplici operazioni di insert di questi dati e marcarli come "inseriti". Tuttavia questa soluzione non è accettabile per due motivi:
• Era necessario modificare il data model delle tabelle principali, aggiungendo un campo per segnare se il record era "nuovo" o "inserito"
• I dati contengono degli aggiornamenti e/o cancellazioni di dati vecchi. L’operazione di UPDATE e DELETE sui dati già esportati sarebbe stata troppo onerosa da gestire per ogni singolo record.
Quindi, è stata adottata una soluzione che sfrutta il partizionamento delle tabelle. Questa è stata pensata ed implementata per i flussi che esportano i dati da Cassandra ad Hive, poi adattata anche per quelli che li esportano da MongoDB. L’idea di base, non è più quella di effettuare il dump della totalità della tabella, ma solamente quello delle partizioni in cui sono
stati inseriti nuovi dati. Per questo motivo, nel data model ci sono delle tabelle/collezioni secondarie che tengono traccia:
• della lista delle partizioni della relativa tabella principale • dell’orario di update della partizione
• dell’orario dell’ultimo dump di quella partizione.
L’orario di update viene aggiornato ogni qual volta arriva almeno un dato da inserire in quella specifica partizione, mentre il l’orario dell’ultimo dump quando quella partizione viene esportata sulla tabella hive.
Le tabelle su hive sono, a loro volta, partizionate secondo lo stesso campo utilizzato nelle tabelle secondarie; quelle degli stati e cause sono partizionate per device_id, mentre quella delle misure per ora. Il differente partizionamento è determinato dalla quantità di partizione e dal numero di record per ognuna di esse: poche partizioni implicano un numero elevato di record per ognuna di esse, tante partizioni implicano potenzialmente pochi record per ognuna di esse. Inoltre avere poche partizioni grandi, implicherebbe il fatto di dover effettuare il dump di un numero eccessivo di record, al contrario avere troppe partizioni piccole implicherebbe la creazione di un numero eccessivo di partizioni da gestire; per avere un partizionamento abbastanza equilibrato e semplice da gestire, si è scelto il partizionamento orario per la tabella delle misure e quello per device id per la quelle relative a stati e cause.
L’algoritmo implementato, per ogni flusso, interroga inizialmente il database nosql sulla tabella secondaria delle partizioni. Da questa recupera la lista delle partizioni il cui valore nel campo "read_time" è nullo oppure inferiore di un certo valore x rispetto al campo "update_time"; adesso bisogna distinguere l’algoritmo per quanto riguarda i flussi delle misure con gli altri due.
Per quanto riguarda cause e stati, poichè la chiave di partizionamento sulla tabella secondaria è la stessa della tabella principale (device_id), per ogni elemento della lista delle partizioni viene effettuata un’ulteriore interrogazione del database sulla tabella principale. Questa recupera l’insieme dei record associati a quella partizione, ovvero tutti i record associati ad uno specifico device_id. A questo punto viene aperta una connessione con Hive, creata l’opportuna tabella se non esiste ed inseriti i record. Terminato l’inserimento, viene chiusa la connessione e sia aggiorna il valore "read_time" sulla tabella/collezione secondaria con il l’istante di tempo corrente e si procede con la partizione successiva.
Invece, per quanto riguarda il flusso delle misure, poichè la tabella su Cassandra è par- tizionata per device_id e hour e sulla tabella secondaria viene salvato solo il campo hour è necessario effettuare un’interrogazione intermedia per recuperare le coppie hour e device_id; questa è fatta interrogano un’altra tabella che tiene salvati questa tipologia di dati.
Quindi, una volta ottenuta la lista delle partizioni da aggiornare (hour), per ognuna di esse la lista dei device associati a quell’ora. A questo punto, poichè il partizionamento su hive è orario, iterativamente viene interrogata la tabella principale selezionando i dati di una specifica coppia (device_id, hour); tutti questi vengono uniti in un unico RDD che verrà utilizzato per l’inserimento dei dati, poichè contiene da tutti i record di tutti i device di una specifica ora. Una volta costruito il quest’ultimo RDD, viene aperta la connessione ad hive, creata l’opportuna tabella se non esiste, inseriti i dati ed aggiornato il campo "read_time" sulla tabella secondaria delle partizioni. Dopodichè si chiude la connessione ad hive e si passa alla partizione successiva.
Data Ingestion to Hive
Per quanto riguarda l’inserimento dei dati su Hive sono state adottate una serie di differenti strategie e differenti tipologie e formati di tabelle. L’obiettivo è sempre stato quello di trovare la soluzione più performante per poter garantire il minor delay possibile nell’aggiornamento delle tabelle su Hive.
Innanzitutto, per la connessione ad hive è stato utilizzato il driver JDBC specifico; questo permette di gestire facilmente la connessione al db, utilizzando semplicemente una stringa di connessione come ad esempio:
S t r i n g j d b c U r l=" j d b c : h i v e 2 : // "+hive_host+" : "+hive_port+"/"+hive_db ;
dove hive_host rappresenta ’indirizzo IP della macchina del cluster in cui è insntallato il servizio Hive, hive_port la porta su cui è in ascolto tale servizio e hive_db nome del database a cui ci si vuole connettere. Il codice seguente mostra come è stata creata la connessione, considerando le API di del driver utilizzato.
C l a s s . forName (" org . apache . h i v e . j d b c . H i v e D r i v e r ") ;
Connection h iv e_co n nec tion = DriverManager . ge tConn ection ( j d b c U r l , hive_username , hive_password ) ;
. . .
La prima operazione effettuata è la creazione delle tabelle misure, stati e cause. Per fare ciò è stata aperta una connessione ad Hive, su un database opportuno, ed invitato attraverso il jdbc il comando per creare le tabelle.
C l a s s . forName (" org . apache . h i v e . j d b c . H i v e D r i v e r ") ;
Connection h iv e_co n nec tion = DriverManager . getC onnection ( j d b c U r l , hive_username , hive_password ) ;
. . .
hi v e_c o nne c tio n . prepareStatement ("CREATE TABLE IF NOT EXISTS
hive_db . h i v e _ t a b l e ( . . . ) PARTITIONED BY ( partition_name ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\ t ' LINES TERMINATED BY
'\ n ' ") . e x e c u t e ( ) ;
hi v e_c o nne c tio n . prepareStatement ("CREATE TABLE IF NOT EXISTS hive_db . h i v e _ t a b l e ( . . . ) PARTITIONED BY ( partition_name ) s t o r e d as parquet ") . e x e c u t e ( ) ;
h i v e \ _connection . prepareStatement ("CREATE EXTERNAL TABLE IF NOT EXISTS "+hive_db+" . "+h i v e _ t a b l e+" ( . . . ) PARTITIONED BY (
partition_name ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\ t ' LINES TERMINATED BY '\ n ' LOCATION ' hdfs_path '
") . e x e c u t e ( ) ;
hiv e_ co nn ec tion . prepareStatement ("CREATE EXTERNAL TABLE IF NOT EXISTS hive_db . hive_table_prq ( . . . ) PARTITIONED BY (
partition_name ) s t o r e d as parquet LOCATION ' hdfs_path ' ") . e x e c u t e ( ) ;
. . .
h i v e \ _connection . c l o s e ( ) ;
Come si può vedere dal codice qua sopra, sono state create tabelle diverse sia come tipologia (interne o esterne) sia come formato di dati (testuale o parquet). Infatti, sono state adottate strategie diverse per poter confrontare le prestazioni ottenute con la scrittura sulle
diverse tipologie di tabelle. nei prossimi due paragrafi verranno descritte e confrontate le strategie adottate a seconda della tipologia e formato di tabella prescelto.
Internal vs External
I primi approcci riguardano la scelta nell’utilizzare le tabelle interne oppure quelle esterne. Una prima strategia, che si è rivelata subito poco efficiente, è stata quella di effettuare direttamente l’operazione di INSERT sulle tabelle interne. Poichè, come è già stato detto, i dati vanno inseriti come se fossero delle operazioni di Update con l’opzione di Upsert ed la versione di Hive non supporta in maniera efficace le operazioni di Update, era necessario cancellare tutti i record della partizione da aggiornare; questa è stata fatta utilizzando il seguente statement:
ALTER TABLE hive_db . h i v e _ t a b l e DROP IF EXISTS PARTITION ( partition_name=' p ar t it i on Na m e ')
Dopo aver pulito i dati della partizione si è proceduto con l’inserimento. Il primo tentativo effettuato, andava ad inserire un record alla volta all’interno della partizione.
INSERT INTO TABLE hive_db . hive_tablePARTITION ( partition_name=' p ar t it io n Na m e ') VALUES ( . . . ) ;
Tuttavia, questa soluzione si è rivelata inefficiente a cause del numero eccessivo di op- erazioni da eseguire. Per cui, si è provato ad utilizzare il seguente statement, per eseguire una sola operazione di INSERT per ogni partizione; a differenza del precedente vengono concatenati i record in un unico statement.
INSERT INTO TABLE hive_db . hive_tablePARTITION
( partition_name=' p ar t it io n Na m e ') VALUES ( record_1 ) , ( record_2 ) , . . . , ( record_n ) ;
Anche questa soluzione presentava alcuni problemi, non tanto legati alle performance di scrittura ma alla dimensione dello statement; infatti, il rischio era quello andare in errore di OutOfMemeory o inviare un Statement troppo grande per essere processato da Hive, tramite il JDBC. Per questi motivi, sono state accantonate le operazioni di inserimento attraverso lo statement di INSERT.
In alternativa al comando di INSERT c’è il comando di LOAD che permette di caricare il contenuto di un file o di una cartella all’interno di una tabella o di una partizione specifica.
LOAD DATA INPATH ' f i l e _ p a t h ' INTO TABLE hive_db . h i v e _ t a b l e PARTITION ( partition_name=' p a r t i t i o n N a m e H i v e ') ;
dove file_path è il percorso della cartella o del file testuale salvato su HDFS. Per cui, per sfruttare questo comando è stato necessario implementare un metodo che, a partire da un RDD<String> scriva uno o più csv su hdfs. In breve, il metodo ha come parametri di input un RDD contenente le righe da scrivere nel csv e i parametri necessari per poter scrivere su HDFS e il path in cui scrivere i file; successivamente viene creata, se non esiste già, una cartella con il nome della partizione che si sta caricando e scritto un file csv all’interno di quest’ultima per ogni partizione del RDD in input. Una volta scritti tutti i file, viene fatto eseguire il comando di LOAD indicando come file_path il percorso della cartella che contiene i file appena scritti; una volta che sono stati caricati i dati, questi file vengono automaticamente cancellati.
Quindi, questa è la soluzione utilizzata per l’ingestion dei dati in Hive considerando le tabelle interne.
Oltre a questa, è stata implementata un’ulteriore soluzione che considera le tabelle esterne. Questo perchè, in pratica, le tabelle esterne sono tabelle che puntano direttamente ad una cartella memorizzata in HDFS e l’idea era quella di scrivere i file direttamente sulla cartella selezionando, evitando quindi l’operazione di LOAD. Una particolarità delle tabelle esterne, sta nella gestione delle cancellazioni; infatti, se si esegue un comando di DROP sulla tabella vengono cancellati i dati da questa, ma non nei file in cui sono contenuti. Per questo motivo, oltre all’operazione,
ALTER TABLE hive_db . h i v e _ t a b l e DROP IF EXISTS PARTITION ( partition_name=' p ar t it io n Na m e ')
è stato necessario cancellare tutti i file presenti nella cartella della specifica partizione. Una volta cancellati i file della partizione, sono stati scritti i nuovi csv utilizzando lo stesso metodo usato nella soluzione con le internal table, modificando opportunamente il path. Completata la scrittura dei file, è stato necessario aggiornare i metadati della tabella, in modo da inserire i dati appena caricati nella tabella; questo è stato fatto aggiungendo nuovamente la partizione appena cancellata, specificandone il percorso.
ALTER TABLE hive_db . h i v e _ t a b l e ADD PARTITION
Finita l’esecuzione di tale statement, si può procedere con la partizione successiva. Textual vs Parquet
Come detto in precedenza, le tabelle su hive sono state create, sia per quelle esterne che per quelle interne, in due formati differenti: testuale e parquet.
La quantità di dati da salvare su HDFS è dell’ordine di grandezza dei TB ed è in costante aumento; per cui i due formati differenti servono per valutare la differenza di performance nell’utilizzare un formato di dati non compresso (testuale) e uno compresso (parquet).
Le prime tabelle ad essere gestite sono quelle testuali, e il modo in cui sono stati scritti i file csv su hdfs è già stato spiegato nel paragrafo precedente. In pratica, ogni elemento dell’rdd di partenza viene convertito in una stringa che corrisponde ad una riga del csv; successivamente, per ogni partizione del rdd, viene creato un csv in una apposita cartella su hdfs; ciò vale sia per le tabelle interne sia per quelle esterne.
Tuttavia, la scrittura dei file in formato parquet è stata un po’ più complessa, soprattutto per quanto riguarda le tabelle interne. Per queste, è stato necessario modificare l’algoritmo di inserimento poiché si sono riscontrati numerosi problemi nell’effettuare lo statement di LOAD da file di tipo parquet. Gli step dell’algoritmo modificato sono:
• scrittura dei file contenenti i record della partizione in formato testuale
• effettuata l’operazione di LOAD del file della partizione su una tabella testuale tempo- ranea
• viene cancellata la partizione dalla tabella parquet
• viene creata la partizione e caricati i dati utilizzando il seguente statement
INSERT INTO hive_db . hive_table_prq PARTITION
( partition_name=' p ar t it i o n Na m e ') s e l e c t <v a l u e s > from hive_db . hive_table_prq_tmp where
partition_name=' p ar t i t i on Na m e '
• vengono cancellati i dati sulla tabella temporanea
Lo statement utilizzato, permette di caricare i dati da una tabella in formato testuale, in una in formato parquet gestendo in automatico la conversione del formato dati.
Invece, per quanto riguarda le tabelle esterne in formato parquet, per semplicità è stata implementata una soluzione con l’utilizzo dei DataFrame. Questa struttura dati, viene costruita a partire da un RDD, e gestisce gli RDD come se fossero delle tabelle distribuite; per cui, gli rdd degli oggetti Tag,Cause e Stati sono convertiti in rispettivi dataframe. Questi possono essere scritti in formato parquet utilizzando il seguente metodo che ha come unico parametro il path di riferimento:
DataFrame df=s q l . createDataFrame ( cassandraRow ,T. c l a s s) df . s a v e A s P a r q u e t F i l e ( path ) ;
Questa soluzione coi dataframe, ha permesso di mantenere lo stesso algoritmo utilizzato nelle tabelle esterne testuali, modificando solamente la scrittura dei file in formato parquet come appena descritto.
Perfomance: Internal vs External and Textual vs Parquet
Le performance per valutare quale delle due tipologie di tabelle fosse più performante è stato effettuato sul flusso degli stati e delle misure. Il grafico 4.21 mostra gli andamenti per le varie combinazioni:
Fig. 4.21 Differenza di prestazioni in termini di velocità di processamento dei dati Come si può vedere, le performance della scrittura su tabelle esterne è migliore rispetto a quella ottenuta dalle tabelle interne, sia per il formato testuale, sia per per quello parquet. Un ulteriore test, per confermare il fatto che l’utilizzo delle tabelle esterne è più performante, è stato fatto sul flusso delle misure; i risultati sono mostrati nel grafico 4.23.
Fig. 4.22 Differenza di prestazioni in termini di velocità di processamento dei dati Anche in questo caso le performance migliori si ottengono con l’utilizzo delle tabelle esterne.
Di queste ultime, è stato valutato anche la differenza delle prestazioni nell’utilizzare i due formati; il grafico sottostante mostra i risultati ottenuti.
Fig. 4.23 Differenza di prestazioni coi due formati in termini di velocità di processamento dei dati
A differenza di quanto osservato per il flusso degli stati, qua la differenza di prestazioni, in termini di operazioni al secondo è minima; la scrittura dei file in formato parquet è leggermente più veloce. Per cui, data questa parità di prestazioni e considerando il fatto che il formato compresso riduce in maniera significativa lo spazio occupato su HDFS, è preferibile quest’ultimo rispetto a quello testuale non compresso.
• Quindi, per effettuare l’esportazione delle tabelle da Apache Cassandra e MongoDB ad Hive è preferibile utilizzare le tabelle esterne in formato parquet.
Apache Cassandra vs MongoDB
Nel capitolo precedente sono state descritte nel dettaglio le prestazioni dei flussi implemen- tati. In questo sezione , verranno confrontate le soluzioni che utilizzano i due db considerando esclusivamente le soluzioni più performanti di ognuno di essi.
Partendo dall’analisi che i due database rappresentano due soluzioni molto differenti sia per quanto riguarda l’architettura sia per il data model, l’obiettivo è cercare di capire quale dei due sia meglio per questo progetto.
5.1
Writing Performance
Analizzando prima di tutto le operazioni di inserimento di dati, sia batch che streaming, i due grafici 5.1 evidenziano la differenza di prestazione ottenuta.
Da questi si evince come Apache Cassandra sia molto più performante di MongoDB sia per l’ingestion dei dati nella modalità batch sia in quella straming. Nella prima la velocità raggiunta da Apache Cassandra si aggira attorno alle 90000 - 10000 operazioni al secondo, mentre MongoDB solamente 50000 - 60000 operazioni al secondo. Nella seconda con Apache Cassandra si riescono a superare le 60000 scritture al secondo, mentre con MongoDB non si raggiungono nemmeno le 30000.
Tuttavia, si può notare come le performance ottentute con MongoDB siamo più stabili di quelle raggiunte con Apache Cassandra queste ultime dipendono, come descritto nel capitolo precedente, da una serie di parametri e sopratutto dalla grandezza del batch scritto. Più è grande il batch più le prestazioni sono efficienti, ma se questo è piccolo le prestazioni sono più lente; invece la scrittura su mongoDB non dipende dalla quantità di dati inviati in blocco per cui le prestazioni risultano più stabili. Invece per quanto riguarda lo streaming, si vede che le due prestazioni hanno lo stesso andamento, ma Apache Cassandra riesce a reggere meglio il carico delle operazioni di inserimento all’aumentare dei dati da processare sulla coda Kafka.
Quindi, Apache Cassandra è la soluzione migliore per gestire la scrittura di grandi quantità di dati rispetto a MongoDB sia per la modalità batch sia quella real-time.