• Non ci sono risultati.

CAPITOLO 4 PARALLELIZZAZIONE DELLE APPLICAZIONI

4.2 F ATTORIZZAZIONE NUMERI PRIMI

4.2.2 Parallelizzazione remota

L’implementazione remota è perfettamente identica alla precedente, l’unica diffe- renza risiede nel fatto che questa volta i worker della FARM non sono i thread di una threadpool locale bensì nodi lontani contenuti all’interno di un cluster. Questo significa che i tempi di overhead aumentano proporzionalmente alla velocità della rete in uso, di conseguenza, per mantenere la granularità allo stesso livello del caso precedente, è ne- cessario aumentare le dimensioni dei task in relazione al numero di worker attivi (in que- sto modo andiamo a bilanciare tempi di comunicazione dei task sulla rete e di esecuzione effettiva su ogni singolo worker remoto). Particolarmente interessante sarà la fase di te- sting e raccolta dei risultati condotta nel paragrafo 5.2 Fattorizzazione in numeri primi dove verranno analizzate più casistiche differenti.

4.3 Risoluzione di sistemi lineari

4.3.1 Parallelizzazione locale

Passiamo al secondo applicativo per il quale conviene ricordare dal paragrafo 3.3 Risoluzione di sistemi lineari, come la risoluzione di un sistema Ax=b sia stata sottoposta all’individuazione della matrice inversa A-1 per poi andare a risolvere facilmente il nuovo sistema x=A-1b. Sono state sviluppate quindi tre forme di implementazione sequenziali differenti che corrisponderanno ad altrettante trasposizioni parallele.

La determinazione della soluzione parallela, infatti, ha seguito un procedimento lungo e farraginoso volto alla scoperta del miglior approccio che potesse essere utilizzato al fine di ottenere dei parametri di efficienza quantomeno vicini a quelli ideali. Ci trovia- mo di fronte ad un applicativo più complesso da parallelizzare rispetto al precedente, non più con un implementazione ricorsiva bensì iterativa, e questo ci da la possibilità di esplo- rare anche in questo campo le potenzialità offerte da Muskel 2.

Richiamiamo la logica risolutiva del problema vista nel paragrafo 3.3 Risoluzione di sistemi lineari circa il metodo di Gauss-Jordan. Qui avevamo due cicli: uno ESTERNO re-

sponsabile della scansione della totalità delle righe di A, l’altro INTERNO, eseguito in ogni iterazione del precedente, che applicava una funzione di trasformazione agli elementi del- la sottomatrice B (Figura 6). Ebbene da uno studio di una logica siffatta è risultato subito evidente che la computazione eseguita dal ciclo più ESTERNO non potesse essere paralle- lizzata. Infatti in ognuna delle sue iterazioni (i) andiamo a modificare le componenti della sottomatrice B in funzione del valore attuale della componente A[i,i] e della colonna FACTOR la quale viene anch’essa modificata in tal contesto. Sulla matrice generata da queste modifiche andrà a lavorare la prossima iterazione, in altre parole le trasformazioni che apportiamo alla matrice A

A→A*1→A*2→A*3…→I

generano una sequenza di cambi di stato dove ognuno è dipendente dal prece- dente. Il tipo di ottimizzazione parallela che possiamo attuare riguarda quindi il ciclo IN- TERNO nel quale andiamo a modificare tutte le componenti della sottomatrice B che, lun- go le varie iterazioni, diventa sempre più schiacciata, o meglio il numero di colonne rima- ne sempre lo stesso ma il numero di righe diminuisce di un unità per iterazione. Possiamo dire che anche in questo caso la parallelizzazione è “orizzontale” poiché il ciclo ESTERNO genera una sequenza di nodi che si accumulano come serial fraction e in ciascuno di essi applichiamo un processo di parallelizzazione volto a suddividere per NW il proprio tempo di esecuzione.

Il primo approccio risolutivo prevede di utilizzare una MAP per parallelizzare il ci- clo INTERNO di aggiornamento di B sia nella Gauss elimination phase che nella back pro-

pagation phase, in entrambi i casi la logica è la medesima. Ogni MAP prende in ingresso

un insieme di righe, partizionate fra gli NW worker disponibili attraverso l’operatore GROUPBYSORTEDTOLIST in maniera sfalsata per garantirne il miglior bilanciamento, e le modifica in accordo alla procedura algoritmica specificata. Per ogni ciclo di iterazione del for ESTERNO i task che così vengono generati sono moderatamente bilanciati poiché di fatto ogni worker prende quasi lo stesso numero di righe da computare e le operazioni da eseguire su ognuna sono banali: una somma e una moltiplicazione. Il problema di questo metodo nasce dal fatto che più il main loop procede più il numero di righe da computare diminuisce ottenendo task la cui esecuzione, al pari del numero di worker, diventa sem-

pre più rapida, fino a quando i costi di overhead non superano quelli necessari a compu- tare la parte seriale. Questo problema tende ovviamente ad accentuarsi all’aumentare del valore NW ed è tanto più grave quanto più piccolo è N. Per chiarare sia T l’iterazione con- siderata, abbiamo il seguente scenario:

T=0 -> righe da ripartire in NW worker [1,2,3,4…N] T=1 -> righe da ripartire in NW worker [2,3,4,5…N]

T=N-1 -> righe da ripartire in NW worker [N]

Figura 16 - Risoluzione sistemi lineari, schema logico pattern utilizzati (caso scansione per righe)

1. public static MyVector parallelSolvingSystem(Matrix a, MyVector b, double epslon){//by row 2. if(a.rows()==0||b.size()!=a.rows()){

3. throw new IllegalArgumentException("Matrix A dimension 0 or

4. b vetor does not match with A");

5. }

6. if(a.columns()!=a.rows()){

7. throw new IllegalArgumentException("Matrix A is not square matrix"); 8. }

10. int[] pos=new int[b.size()];

11. for(int i=0;i<b.size();i++) pos[i]=i;

12. Matrix res=Matrix.getIdentityMatrix(a.rows()); 13.

14. for(int mainRow=0;mainRow<a.rows();mainRow++){ 15. //se a[i][i]==0 scambio la i-

esima riga con la prima riga successiva tale che a[j][i]!=0 16. if(a.get(pos[mainRow], mainRow)==0){

17. for(int i=mainRow+1;i<a.rows();i++){ 18. if(a.get(i, mainRow)!=0){ 19. int tmp=pos[i]; 20. pos[i]=pos[mainRow]; 21. pos[mainRow]=tmp;

22. System.out.println("row swaped"); 23. break;

24. } 25. } 26. }

27. double scale=a.get(pos[mainRow], mainRow); 28. for(int col=0; col<a.columns();col++){

29. a.set(pos[mainRow], col, a.get(pos[mainRow], col)/scale); 30. res.set(pos[mainRow], col, res.get(pos[mainRow], col)/scale); 31. }

32. //parallel

33. final int _mainRow=mainRow; 34. if(mainRow<a.rows()-1){ 35. MuskelProcessor.range(_mainRow+1, a.rows()-mainRow-1) 36. .withContext(context) 37. .groupBySortedToList(in->in%no_thread) 38. .flatMap(in->MuskelProcessor.fromIterable(in.toBlocking().first()) 39. .map(i->{

40. double factor=a.get(i, _mainRow); 41. for(int col=0;col<a.columns();col++){ 42. a.set(pos[i], col, a.get(pos[i], col)-

43. (factor*a.get(pos[_mainRow], col))); 44. res.set(pos[i], col, res.get(pos[i], col)-

45. (factor*res.get(pos[_mainRow], col))); 46. } 47. return true; 48. }) 49. ,executor) 50. .toBlocking() 51. .first(); 52. } 53. }

54. for(int mainCol=a.rows()-1;mainCol>=1;mainCol--){ 55. final int _mainCol=mainCol;

56. MuskelProcessor.fromStream(IntStream.range(0, _mainCol).map(i -> _mainCol - i -

1))

57. .withContext(context)

58. .groupBySortedToList(in->in%no_thread)

59. .flatMap(in->MuskelProcessor.fromIterable(in.toBlocking().first()) 60. .map(i->{

61. double factor=a.get(i, _mainCol); 62. for(int col=0;col<a.columns();col++){ 63. a.set(pos[i], col, a.get(pos[i], col)-

64. (factor*a.get(pos[_mainCol], col))); 65. res.set(pos[i], col, res.get(pos[i], col)-

66. (factor*res.get(pos[_mainCol], col))); 67. } 68. return true; 69. }) 70. ,executor) 71. .toBlocking() 72. .first(); 73. }

74. if(a.rows()<101) System.out.println(res); 75. return parallelMultiplier1(res, b); 76. }

Codice 33 - Risoluzione sistemi lineari, implementazione parallela (caso scansione per righe)

Possiamo notare come ogni worker di fatto esegue un operazione che non ha al- cun valore di ritorno ossia si limita a modificare le matrici A e I in maniera concorrente rispetto ad altri worker, mediante operazioni di lettura/scrittura. Possiamo fare tutto ciò in modalità thread-safe per due motivi fondamentali: innanzitutto ogni riga viene modifi- cata da uno ed un solo worker alla volta, ciò purtroppo non toglie che si acceda concor- rentemente alla stessa entità Matrix (double [][]), ebbene da qui il secondo motivo legato ad una proprietà di Java che ci rassicura a riguardo:

“One implementation consideration for Java virtual machines is that every field

and array element is considered distinct; updates to one field or element must not interact with reads or updates of any other field or element. In particular, two threads that update adjacent elements of a byte array separately must not interfere or interact and do not need synchronization to ensure sequential consistency.”

Un’altra cosa che possiamo notare è la non funzionalità del metodo, dato che la MAP considerata va a usare delle variabili esterne “a” e “res” considerate globali. Questo è possibile a patto da rendere tali variabili final o comunque non modificandole nell’ambiente esterno.

Passiamo al secondo approccio risolutivo nel quale si cerca di mitigare lo sbilan- ciamento dei task generati fra i vari stadi iterativi del loop ESTERNO. Per ovviare a questo problema si è deciso di lasciare inalterata la modalità di parallelizzazione operata, ossia la MAP, (una scelta differente non avrebbe senso ad esempio una FARM generebbe troppi task riducendo ulteriormente la granularità). Tuttavia per ogni ciclo di iterazione di ambo le fasi, piuttosto che ripartire la matrice per righe lo facciamo per colonne, operazione che non altera la computazione a patto da prestare particolare attenzione alla FACTOR (essa deve essere salvata prima della computazione interna oppure, poiché procediamo per colonne, possiamo lasciare l’aggiornamento di quest’ultima alla fine in modo tale che l’aggiornamento di ogni altra colonna posso vertere sulla medesima istanza di FACTOR). Ebbene in questo caso il numero di colonne che ripartiamo fra gli NW worker rimane co-

stante in ogni iterazione del main loop generando un insieme di task che formalmente sono perfettamente bilanciati, anche se di fatto comunque il tempo di esecuzione dei task diminuisce all’aumentare delle iterazioni dello stesso poiché avremo sempre meno righe da computare.

T=0 -> colonne da ripartire in NW worker [1,2,3,4…N] T=1 -> colonne da ripartire in NW worker [1,2,3,4,5…N]

T=N-1 -> colonne da ripartire in NW worker [1,2,3,4,5…N]

Un ulteriore miglioria si ottiene sostituendo il GROUPBYSORTEDTOLIST, il quale si occupava di trasformare lo stream di valori [1,n] relative alle colonne da computare, in NW set di valori interi, uno per ogni worker, con un PUBLISHER. Esso possiede la medesi- ma funzione ma è più intelligente. Ciò ha presentato diversi vantaggi come una riduzione dei costi di overhead dovuti alla gestione dell’operatore e alla possibilità di fornire un tet- to minimo al numero di colonne che ogni worker deve computare per assicurare sempre il raggiungimento dello steady state, anche a fronte di un gran numero di worker e un N piccolo. Non mostriamo lo schema logico essendo identico a quello in Figura 16 eccetto che per la presenza del Publisher.

1. pub-

lic static MyVector parallelSolvingSystem1(Matrix a, MyVector b, double epslon){//by column

2. if(a.rows()==0||b.size()!=a.rows()){

3. throw new IllegalArgumentException("Matrix A dimension 0 or

4. b vetor does not match with A"); 5. }

6. if(a.columns()!=a.rows()){

7. throw new IllegalArgumentException("Matrix A is not square matrix"); 8. }

9. Matrix.epslon=epslon; 10. int[] pos=new int[b.size()];

11. for(int i=0;i<b.size();i++) pos[i]=i;

12. Matrix res=Matrix.getIdentityMatrix(a.rows());

13. SerializablePublisher<Integer[]> publisher= new SerializablePublisher<Integer[]>() {-- };

14. for(int mainRow=0;mainRow<a.rows();mainRow++){ 15. //se a[i][i]==0 scambio la i-

esima riga con la prima riga successiva tale che a[j][i]!=0 16. if(a.get(pos[mainRow], mainRow)==0){

17. for(int i=mainRow+1;i<a.rows();i++){ 18. if(a.get(i, mainRow)!=0){ 19. int tmp=pos[i]; 20. pos[i]=pos[mainRow]; 21. pos[mainRow]=tmp;

22. System.out.println("row swaped"); 23. break;

25. } 26. }

27. double scale=a.get(pos[mainRow], mainRow); 28. for(int col=0; col<a.columns();col++){

29. a.set(pos[mainRow], col, a.get(pos[mainRow], col)/scale); 30. res.set(pos[mainRow], col, res.get(pos[mainRow], col)/scale); 31. }

32. //parallel

33. final int _mainRow=mainRow; 34. if(mainRow<a.rows()-1){

35. double[] factor=new double[a.rows()-mainRow-1];

36. for(int i=0;i<factor.length;i++) factor[i]=a.get(pos[i+mainRow+1], mainRow); 37. MuskelProcessor.fromPublisher(publisher)

38. .withContext(context) 39. .map(list->{

40. for(Integer col:list){ 41. if(col!=null){

42. for(int i=_mainRow+1;i<a.rows();i++){ 43. a.set(pos[i], col, a.get(pos[i], col)-

44. (factor[i-_mainRow-1]*a.get(pos[_mainRow], col))); 45. res.set(pos[i], col, res.get(pos[i], col)-

46. (factor[i-_mainRow-1]*res.get(pos[_mainRow], col))); 47. } 48. } 49. } 50. return true; 51. },executor) 52. .toBlocking() 53. .first();

54. //computo la medesima operazione per la colonna che ho saltato prima 55. }

56. }

57. for(int mainCol=a.rows()-1;mainCol>=1;mainCol--){ 58. final int _mainCol=mainCol;

59. double[] factor=new double[mainCol];

60. for(int i=0;i<factor.length;i++) factor[i]=a.get(pos[mainCol-1-i], mainCol); 61. MuskelProcessor.fromPublisher(publisher)

62. .withContext(context) 63. .map(list->{

64. for(Integer col:list){ 65. if(col!=null){

66. for(int i=_mainCol-1;i>=0;i--){

67. a.set(pos[i], col, a.get(pos[i], col)-

68. (factor[_mainCol-1-i]*a.get(pos[_mainCol], col))); 69. res.set(pos[i], col, res.get(pos[i], col)-

70. (factor[_mainCol-1-i]*res.get(pos[_mainCol], col))); 71. } 72. } 73. } 74. return true; 75. },executor) 76. .toBlocking() 77. .first(); 78. } 79. if(a.rows()<101) System.out.println(res); 80. return parallelMultiplier(res, b, publisher); 81. }

Codice 34 - Risoluzione sistemi lineari, implementazione parallela (caso scansione per colonne)

Sebbene questo metodo sia migliore del precedente possiede un tempo di esecu- zione paradossalmente maggiore. La spiegazione di questo comportamento è molto sem-

plice e riconducibile al modo con cui Java gestisce le matrici: esse infatti sono memorizza- te nel seguente modo:

Figura 17 - Gestione array in java

È ovvio quindi che scorrere la matrice per righe (ogni riga è un array quindi basta accedere al corrispondente riferimento in memoria per avere la riga intera) impiega mol- to meno tempo che scorrerla per colonne poiché per ognuna di esse devo accedere ad n array 1-dimensionali in memoria ed andare a prelevare la colonna selezionata.

Ancora una volta la soluzione finale diventa un prodotto ibrido di quelle preceden- ti: viene effettuata una “teorica” traslazione della matrice A semplicemente invertendo ogni occorrenza che legge/scrive un valore dalle matrici A|I nel seguente modo: da A[i,j] passo a A[j,i] in modo da mantenere l’equivalenza nella soluzione. Con questo semplice trucco vengono mantenuti i vantaggi del primo e del secondo approccio, infatti è come se parallelizzassimo per righe, riducendo i tempi di esecuzione, ma conservando i task costanti come nel secondo metodo.

1. public static MyVector parallelSolvingSystem2(Matrix a, MyVector b, double epslon){ 2. //by column translated

3. if(a.rows()==0||b.size()!=a.rows()){

4. throw new IllegalArgumentException("Matrix A dimension 0 or

5. b vetor does not match with A"); 6. }

7. if(a.columns()!=a.rows()){

8. throw new IllegalArgumentException("Matrix A is not square matrix"); 9. }

10. Matrix.epslon=epslon; 11. int[] pos=new int[b.size()];

12. for(int i=0;i<b.size();i++) pos[i]=i;

13. Matrix res=Matrix.getIdentityMatrix(a.rows());

14. SerializablePublisher<Integer[]> publisher= new SerializablePublisher<Integer[]>() {-- };

15.

16. for(int mainRow=0;mainRow<a.rows();mainRow++){ 17. //se a[i][i]==0 scambio la i-

esima riga con la prima riga successiva tale che a[j][i]!=0 18. if(a.get(mainRow,pos[mainRow] )==0){

20. if(a.get(mainRow, i )!=0){ 21. int tmp=pos[i]; 22. pos[i]=pos[mainRow]; 23. pos[mainRow]=tmp;

24. System.out.println("row swaped"); 25. break;

26. } 27. } 28. }

29. final double scale=a.get(mainRow,pos[mainRow]); 30. final int _mainRow=mainRow;

31. if(mainRow<a.rows()-1){

32. MuskelProcessor.fromPublisher(publisher) 33. .withContext(context)

34. .map(list->{

35. for(Integer col:list){ 36. if(col!=null){ 37.

//qualche col può essere null quando superiamo N (numero di colonne di A)

38. a.set(col,pos[_mainRow] , a.get(col,pos[_mainRow] )/scale);

39. res.set(col,pos[_mainRow] ,res.get(col,pos[_mainRow] )/scal e);

40. if(col!=_mainRow){

41. for(int i=_mainRow+1;i<a.rows();i++){ 42. a.set(col,pos[i], a.get(col,pos[i])- 43.

(a.get(_mainRow,pos[i] )*a.get(col,pos[_mainRow] )));

44. res.set( col,pos[i], res.get( col,pos[i])- 45. (a.get(_mainRow,pos[i] )*res.get(col,pos[_mainRow] ))); 46. } 47. } 48. } 49. } 50. return true; 51. },executor) 52. .toBlocking() 53. .first();

54. for(int i=mainRow+1;i<a.rows();i++){//questo for modifica la colonna- >riga contiene i vari factor, operazione che va fatta alla fine poichè tutte le colonne- >righe devono diendere dalla medesima instanza di factor

55. double tmp=a.get(mainRow,pos[i]);

56. a.set(mainRow,pos[i], a.get(mainRow,pos[i])- 57. (tmp *a.get(mainRow,pos[mainRow] ))); 58. res.set( mainRow,pos[i], res.get( mainRow,pos[i])- 59. (tmp*res.get(mainRow,pos[mainRow] ))); 60. }

61. }

62. else{//eseguito solo una volta ossia quando mainRow=N-1 63. for(int col=0; col<a.columns();col++){

64. a.set(col,pos[mainRow] , a.get(col,pos[mainRow] )/scale); 65. res.set(col,pos[mainRow] , res.get(col,pos[mainRow] )/scale); 66. }

67. } 68. }

69. for(int mainCol=a.rows()-1;mainCol>=1;mainCol--){ 70. final int _mainCol=mainCol;

71. MuskelProcessor.fromPublisher(publisher) 72. .withContext(context)

73. .map(list->{

74. for(Integer col:list){ 75. if(col!=null){

76. if(col!=_mainCol){

77. for(int i=_mainCol-1;i>=0;i--){

79. (a.get(_mainCol,pos[i] )*a.get(col,pos[_mainCol] ))); 80. res.set(col,pos[i], res.get(col,pos[i] )- 81. (a.get(_mainCol,pos[i] )*res.get(col,pos[_mainCol] ))); 82. } 83. } 84. } 85. } 86. return true; 87. },executor) 88. .toBlocking() 89. .first();

90. for(int i=_mainCol-1;i>=0;i--){ 91. double tmp=a.get(mainCol,pos[i]); 92. a.set(mainCol,pos[i], a.get(mainCol,pos[i] )- 93. (tmp*a.get(mainCol,pos[mainCol] ))); 94. res.set(mainCol,pos[i], res.get(mainCol,pos[i] )- 95. (tmp*res.get(mainCol,pos[mainCol] ))); 96. } 97. } 98. if(a.rows()<101) System.out.println(Matrix.getTrasposition(res)); 99. return parallelMultiplier(Matrix.getTrasposition(res), b, publisher); 100. }

Codice 35 - Risoluzione sistemi lineari, implementazione parallela (caso scansione per colonne trasposte)

Nella soluzione finale si è cercato altresì di ridurre il più possibile la serial fraction inserendo varie migliorie tra cui evitare la memorizzazione della colonna FACTOR: i suoi valori sono prelevati dinamicamente ed esclusivamente dai vari worker in base alla co- lonna (riga) considerata attualmente e il suo aggiornamento viene operato solo alla fine dell’iterazione attuale del main loop.

Per quanto riguarda la risoluzione del sistema lineare x=A-1b finale la metodologia di parallelizzazione è banale e riprende quella usata nell’ultimo metodo: si dividono infatti le righe della matrice A-1 in NW partizioni ognuna conferita ad un worker differente che provvederà a realizzare il prodotto scalare con b. Ognuno di tali prodotti scalari produrrà in maniera totalmente indipendente un valore del vettore soluzione x. Lo skeleton utiliz- zato è sempre una MAP come possiamo notare dallo schema sottostante.

Nel paragrafo 5.3 Risoluzione di sistemi lineari analizzeremo i risultati sperimen- tali delle tre versioni algoritmiche cercando di dimostrare nei dati quando appena detto.

4.3.2 Parallelizzazione remota

La parallelizzazione remota non viene fornita per motivi che verranno spiegati nel paragrafo 5.3.2 Sperimentazione remota.

Figura 18 - Risoluzione di sistemi lineari, schema logico pattern utilizzati (soluzione sistema x=A-1b) 1. public static MyVector parallelMultiplier1(Matrix a, final MyVector b){

2. if(a.rows()!=a.columns()) throw new IllegalArgumentException("The matrix 3. must be square matrix");

4. if(b.size()!=a.rows()) throw new IllegalArgumentException("B vector 5. dimension wrong");

6. MyVector res=new MyVector(a.rows());

7. MuskelProcessor.range(0, a.columns())

8. .withContext(context)

9. .groupBySortedToList(in->in%no_thread)

10. .map(list->{

11. for(Integer row:list.toBlocking().first()){

12. res.set(row, Matrix.mulVector(a.getRow(row).element, b.element)); 13. } 14. return true; 15. },executor) 16. .toBlocking() 17. .first(); 18. return res; 19. }

4.4 K-nearest neighbor

4.4.1 Parallelizzazione locale

L’algoritmo Knn si presta naturalmente alla parallelizzazione per ovviare alla sua scarsa scalabilità a fronte di dataset sempre più grandi poiché quasi la totalità delle ope- razioni incluse nella procedura algoritmica vista nel paragrafo 3.4 possono essere svolte in maniera indipendente. Andiamo per ordine: anzitutto disponiamo di un training set più o meno vasto contenente un set di valori distribuiti in uno spazio multidimensionale ed eti- chettati con una classe fra quelle disponibili, esso viene salvato su un file che verrà acce- duto a runtime dalla procedura algoritmica. In tal contesto due sono gli approcci di paral- lelizzazione che possiamo seguire:

1. Possiamo effettuare il parsing del documento in anticipo, durante la fase di training, creando una collezione Java, ad esempio un ArrayList in RAM che conterrà in maniera fruibile, tutti i valori dello spazio vettoriale. A que- sto punto basta partizionare il dataset, così caricato, negli NW worker di- sponibili oppure valutarli come uno stream attraverso gli strumenti di Mu- skel 2 per realizzare la parallelizzazione. Sebbene questo approccio fornisce un più rapido accesso ai dati durante la classificazione di un nuovo punto, da parte di ciascun worker, risulta maggiormente prono a errori di overflow quando le dimensioni del training set sono eccessive.

2. Per questo motivo viene scelto il secondo approccio nel quale il compito di prelevare un set di punti dal training set (salvato su file), effettuare il par- sing e valutarli in relazione al punto da classificare viene fatto in maniera autonoma da ciascun worker. La validità di una simile metodologia è resa possibile dal fatto che il file in oggetto viene acceduto concorrentemente in sola lettura quindi, per le proprietà di Java, questa operazione può essere eseguita in parallelo conservando un ambiente thread-safe. Questo meto- do permette per giunta di ripartire anche la serial fraction relativa al cari- camento di ogni singola riga del documento in RAM (cosa che prima voleva

essere fatta in pre-computazione) fra NW worker differenti il ché introduce un notevole vantaggio nonostante il caricamento non avviene più in se- quenziale ma sulla base dell’indice di riga considerato (quindi più lento). Il discorso diventa un po’ diverso nella computazione remota ma l’affronteremo nel paragrafo successivo.

In linea a quanto detto possiamo generare due versioni parallele dell’applicativo considerato egualmente valide:

Figura 19- Knn, schema logico pattern utilizzati (caso MAP)

1. Nella prima usiamo ancora una volta un data parallel pattern ossia una MAP. Quello che viene fatto è suddividere egualmente gli indici delle righe del training set fra il numero di worker disponibili utilizzando il valido ope-

ratore fornito da Muskel 2: GROUPBYTOSORTEDTOLIST, quindi a ciascuno di essi verrà affidato il compito di caricare le righe della propria partizione, effettuare il parsing nella forma Pair<double[],String>, calcolare la di- stanza euclidea rispetto all’input e incapsulare queste informazioni in una nuova coppia (Pair<String,Double>) che consenta di manipolarle suc- cessivamente. Ogni worker genererà una lista di coppie siffatte che verran- no raccolte e valutate dal nodo master, quindi in sequenziale, alla ricerca dei k valori più vicini in base alla misura di distanza spaziale selezionata. A questo punto è molto facile trovare la classe più frequente ergo il risultato della classificazione.

Analizziamo lo schema logico nella Figura 19 di una tale implementazione nonché il rispettivo codice:

1. public String parallelLocalClassify(double[] x,

2. BiFunc- tion<double[],double[],Double> distance){

3. if(x.length!=dimInput){

4. throw new IllegalArgumentException("Dimension x mismatch!");

5. }

6. List<Knn.Pair<String, Double>> res=MuskelProcessor.range(0,recordSize)

7. .withContext(context)

8. .groupBySortedToList(input->Integer.toString(input%no_Thread))

9. .flatMap(input->

10. MuskelProcessor.fromIterable(input.toBlocking().first())

11. .map(innerInput -> {

12. String line = getLine(innerInput);

13. Pair<double[], String> record=getRecord(line) ;

14. return new Knn.Pair<String, Double>

15. (record.getValue(), 16. dis- tance.apply(record.getKey(),x)); 17. }) 18. 19. , executor)

20. .toSortedList()//l'ordinamento avviene rispetto alla distanza avendo modificato il compareTo di Pair

21. .flatMap(list->MuskelProcessor.fromIterable(list) 22. .take(k) 23. .groupBySortedToList(input->input.getKey()) 24. .map(t -> t.toBlocking().first()) 25. .reduce(Lists.<Knn.Pair<String, Double>>newArrayList(), 26. (u,v)-> v.size()>u.size()? v: u) 27. ) 28. .toBlocking() 29. .single();

30. return "The class is: "+res.get(0).getKey();

31. }

2. Altro approccio consiste nell’utilizzare una FARM piuttosto che una MAP all’apice del processo di parallelizzazione: avere quindi uno stream di interi che vengono dati in pasto ai vari worker dai quali otteniamo uno stream di coppie <A,B> nel formato descritto. Questo approccio è totalmente specu- lare al precedente e ottiene allo stesso modo ottimi risultati vicini a quelli ideali nel caso locale con la piccola differenza di non poter applicare alcuni miglioramenti: come ripartire, fra i vari worker, l’ordinamento delle coppie risultanti dalla FARM/MAP e il prelevamento dei primi K riducendo così ul- teriormente la serial fraction. Inoltre questo approccio non può assoluta- mente essere applicato al caso remoto poiché aumenterebbe in maniera esponenziale i pacchetti che devono essere scambiati fra master/slaver ot- tenendo un significativo calo delle prestazioni.

Figura 20- Knn, schema logico pattern utilizzati (caso FARM) 1. public String parallelLocalClassify1(double[] x,

2. BiFunc- tion<double[],double[],Double> distance){

3. if(x.length!=dimInput){

4. throw new IllegalArgumentException("Dimension x mismatch!");

5. }

6. List<Knn.Pair<String, Double>> res=MuskelProcessor.range(0,recordSize)

7. .withContext(context)

8. .map(innerInput -> {

9. String line = getLine(innerInput);

10. Pair<double[], String> record=getRecord(line);

11. return new Knn.Pair<String, Double>

12. (record.getValue(),

13. distance.apply(record.getKey(),x));

14. }, executor)

15. .toSortedList()//l'ordinamento avviene rispetto alla distanza avendo modificato il compareTo di Pair

16. .flatMap(list->MuskelProcessor.fromIterable(list) 17. .take(k) 18. .groupBySortedToList(input->input.getKey()) 19. .map(t -> t.toBlocking().first()) 20. .reduce(Lists.<Knn.Pair<String, Double>>newArrayList(), 21. (u,v)-> v.size()>u.size()? v: u) 22. ) 23. .toBlocking() 24. .single();

25. return "The class is: "+res.get(0).getKey();

26. }

Codice 38 - Knn, implementazione parallela (caso FARM)

4.4.2 Parallelizzazione remota

I metodi proposti scalano alla perfezione nel contesto locale come avremo modo di constatare nel paragrafo 5.4.1 Sperimentazione locale, mentre per la computazione remota un ulteriore miglioramento è reso necessario alla prima versione parallela (ab- biamo già detto che l’utilizzo di una FARM sarebbe inefficiente).

In questo caso abbiamo un nodo master il quale implementa la main MAP ed ese- gue tutte le operazioni contenute nel proprio EMITTER e COLLECTOR in sequenziale. Ai vari worker viene spedita la serializzazione dei vari task contenenti un set del range di in- teri [0, training set size]. Come possiamo notare, dal caso locale, le operazioni che il col- lector della MAP deve eseguire possono essere anche molto onerose quando il training set è grande poiché di fatto andiamo ad ordinare un gran numero di coppie, derivanti dai vari worker, grazie all’operatore toSortedList(). Siccome questa operazione viene fatta

Documenti correlati