• Non ci sono risultati.

Dopo aver effettuato le varie prove di integrazione dei componenti in gioco, si è provveduto a creare tre flussi in Infosphere Streams per la realizzazione del POC:

 il primo flusso ha lo scopo di ottenere una storicizzazione dei dati e consente di caricare in PDA le rilevazioni provenienti dal sistema PI senza alcun tipo di trasformazione;

 il secondo si occupa di scrivere su PDA il valore della variabile contenente l’indicazione della fase del ciclo in cui ci si trova attualmente;

 il terzo ha lo scopo di richiamare lo script R e caricare in PDA i dati previsionali.

59

Il primo flusso, visibile in Figura 5.1, utilizza l’operatore custom

JDBCSource per leggere dalla sorgente PI nelle modalità descritte in

5.1.1, il quale poi passa all’operatore ODBCAppend la tupla contenente, per ogni variabile, il valore e l’istante temporale della rilevazione. Quest’ultimo operatore provvederà quindi a scrivere nell’apposita tabella in PDA, specificata nel file .xml di connessione che riceve come parametro, i valori ricevuti in input. La tabella così popolata servirà quindi nel momento in cui si vorranno effettuare analisi sui dati storici tramite Tableau.

Il secondo flusso è molto simile al primo, in questo caso però si usa l’operatore JDBCSource per ritrovare il valore della singola variabile contenente l’informazione sulla fase in corso, che verrà poi scritto su una tabella in PDA come nel caso precedente.

Figura 5.2 Flusso responsabile della memorizzazione su PDA delle previsioni

Il terzo flusso, visibile in Figura 5.2, è quello delegato all’applicazione dello script R per la previsione dell’esito delle variabili. Il primo operatore, SourceReader, si comporta esattamente come il precedente, inviando in output la tupla verso l’input dell’operatore

JavaConcat. Come spiegato in 3.2.2, è necessario passare allo script R

60

dieci rilevazioni consecutive per effettuare la predizione. Per questo è stato necessario implementare un operatore custom, sviluppato in Java, che attende l’arrivo delle dieci tuple e restituisce in output la loro concatenazione in modo che sia interpretabile dallo script R. Per fare questo si è sfruttata la funzionalità window presente in Infosphere Streams. Sono supportate due tipi di window: sliding window e tumbling

window. Nella prima l’arrivo di una nuova tupla causa la rimozione della

più vecchia se la finestra è piena, lasciando le altre ancora disponibili per il processamento. La seconda, che è stata quella effettivamente utilizzata, raggruppa, in base a diversi criteri, le tuple in arrivo ad un operatore prima di passarle tutte insieme come input rimuovendo tutte quelle già presenti. Nel caso in oggetto si è scelto di impostare come criterio quello che prevede di attivare l’operatore in seguito all’arrivo di dieci tuple. Per lo sviluppo dell’operatore, Streams mette a disposizione la classe

AbstractWindowOperator dalla quale è possibile registrare un listener sugli eventi proveniente dalla window. Questi eventi possono essere:

 INSERT: quando una o più tuple sono inserite nella finestra.  EVICT: quando tutte le tuple nella finestra vengono rimosse, in questo caso il listener dovrebbe restituire in output il risultato.

 FINAL: quando viene ricevuta la marca di fine della finestra1.

Per lo sviluppo dell’operatore si è quindi estesa la classe

AbstractWindowOperator ed implementata l’interfaccia

StreamWindowListener, riscrivendo il metodo handleEvent in modo da:

(a) concatenare tutte le tuple in arrivo quando riceve un evento di tipo INSERT, (b) restituire in output la concatenazione all’arrivo dell’evento EVICT o FINAL. A questo punto è bastato registrare il listener nel metodo initialize dell’operatore tramite la funzione registerListener

del metodo getStreamWindow per fare in modo che possa effettivamente ricevere le tuple in input.

61

La stringa formata dalla concatenazione di tutte le tuple della finestra, creata al passo precedente, è passata all’operatore RScript come parametro di input, che si occupa di mapparla nella corrispondente variabile R utilizzata all’interno dello script, il cui comportamento è descritto in 3.2.2. Il risultato della sua esecuzione sono le due variabili:

 prediction, che rappresenta una matrice con trentotto righe

(numero di sensori) e venti colonne (i dieci valori del sensore rilevati più la previsione per le prossime dieci unità di tempo), che viene mappata in Streams nel tipo

list<float64>;

 outcome, che rappresenta un vettore di trentotto elementi con la previsione Bad/Good di ogni variabile, che viene mappata in Streams nel tipo list<rstring>.

Queste due variabili SPL vengono quindi indirizzate, tramite l’operatore Split, in due rami diversi del flusso.

Il ramo superiore, grazie all’operatore Custom, trasforma la variabile outcome in modo da poter essere scritta, con l’operatore

ODBCAppend, come nuovo record nella corrispondente tabella in PDA

contenente le previsioni per ogni sensore. Per far questo, nella logica dell’operatore Custom, viene mappato ogni elemento della lista in un attributo della tupla in uscita. Si ha quindi una tupla con un attributo per ogni sensore, più un attributo aggiuntivo contenente l’istante temporale della previsione, ottenuto da una funzione interna di Streams.

Il ramo inferiore effettua le stesse operazioni sulla variabile

prediction ma, diversamente da prima, crea venti record (uno per ogni

unità di tempo) da inserire nella corrispondente tabella in PDA. Dato che la variabile prediction mappa una matrice in una lista, per poter passare correttamente i venti record all’operatore successivo la logica dell’operatore Custom prevede un ciclo di 20 iterazioni. Ad ogni iterazione viene creata e inviata al successivo operatore una tupla formata

62

dagli elementi della lista presi a passi di 20 in modo da ottenere un valore per ogni sensore.

63

6 R

EPORTISTICA

In questo capitolo verrà illustrato il procedimento seguito per la creazione delle dashboard che l’utente finale utilizzerà per visualizzare: (a) l’output della previsione, (b) l’andamento dei sensori nelle ultime due ore, (c) la visualizzazione dei dati storici.

Documenti correlati