neo4j-spark-connector: load del grafo

Pubblicato il Pubblicato in Blog

Come utilizzare il neo4j-spark-connector per elaborare i dati di un grafo in modo distribuito.

Da agosto 2016 è dispobibile la versione 2.0 del connettore sperimentale che permette ad Apache Spark di interagire con il database a grafo Neo4j (neo4j-spark-connector). Partendo dall’esempio proposto dal suo creatore, Michael Hunger, vedremo come caricare ed elaborare i dati presenti nel grafo in un cluster Spark. Il grafo contiene delle persone che hanno dei figli; tramite Spark calcoleremo l’età media dei padri e dei figli.

I componenti

Per il nostro esempio utilizzeremo 4 macchine Ubuntu 15 Server, così configurate

  • graphmachine: contiene Neo4j server e il grafo
  • skmaster: lo Spark master
  • skworker1: uno Spark worker/executor
  • skworker2: uno Spark worker/executor

Spark 2.0 è configurato come cluster standalone, senza appoggiarsi ad Hadoop. Utilizzeremo la spark-shell per l’esempio e i risultati verranno visualizzati e non salvati.

La graphmachine è una semplice installazione Neo4j 3.0 in cui sono stati abilitati gli accessi HTTP e BOLT anche per IP esterni.

Il grafo

Il grafo è molto semplice, composto da un solo tipo di nodo, “Person” e un solo tipo di relazione “:PARENT_OF“. Ogni nodo contiene la property “age” che indica l’età della persona.

Creiamo un centinaio di migliaia di nodi:

FOREACH (x in range(1,100000) | CREATE (:Person {name:"name_"+x, age: x%100}));

A questi nodi aggiungiamo altrettante relazioni casuali

UNWIND range(1,100000) as x
MATCH (n),(m) WHERE id(n) = x AND id(m)=toInt(rand()*100000)
CREATE (n)-[:PARENT_OF]->(m);

Il grafo è pronto per venire analizzato.

Compilare il neo4j-spark-connector

I sorgenti del connettore sono scaricabili da GitHub.

Una volta scaricati, vanno compilati con JDK 7 (minimo) e Maven 3. Basta eseguire

mvn clean package

Essendo scritto prevalenemente in Scala, la fase di download delle dipendenze è molto corposa. Anche la fase di test è molto pesante e si può evitarla se si è scaricata una versione stabile

mvn clean package -Dmaven.test.skip=true

A questo punto, se tutto va liscio, nella cartella target ci sarà il file neo4j-spark-connector-full-2.0.0-M2.jar. Nella directory ne vengono creati altri, ma noi useremo questo perchè è completo del driver neo4j così da non doverlo portare come dipendenza separata. Il connettore infatti sfrutta il protocollo BOLT per comunicare con il database.

Caricare il grafo in Spark

Per eseguire l’elaborazione verrà usata la shell di Spark, che risponde a comandi Scala. La shell deve essere collegata al cluster e portare con sè il connettore. All’avvio della shell si impostano anche le direttive d’accesso a Neo4j.

$SPARK_HOME/bin/spark-shell 
--master spark://skmaster:7077 
--jars neo4j-spark-connector-full-2.0.0-M2.jar 
--conf spark.neo4j.bolt.url=bolt://neo4j@graphmachine 
--conf spark.neo4j.bolt.password=<neo4j-password>

Se il caricamento o la connessione per qualche motivo non funziona lo si saprà solo dopo i primi comandi espliciti che utilizzano il connettore. Prima di proseguire verifichiamo che la shell sia operativa su 3 executors, (supponendo che sia avviata sulla macchina skmaster) collegandoci a http://skmaster:4040/

Spark shell all'avvio
Spark shell all’avvio

Iniziamo importando il package neo4j

import org.neo4j.spark._

Questo fa sì che tutti i prossimi comandi siano più snelli. Importiamo ora il grafo attraverso una query cypher. In questo articolo viene presa in esame una delle strutture dati più semplici tra quelle messe a disposizione (DataFrames, GraphX, ecc..): Neo4jTupleRDD. Si tratta di un RDD formato da tuple composte dal return della query cypher. Ogni elemento ha due componenti, il primo (_1) che riporta il nome scritto nella query e il secondo (_2) che porta il valore. Ecco il caricamento:

var graph = Neo4jTupleRDD(sc,"MATCH (father)-[r:PARENT_OF]->(child) return father.age,child.age",Seq.empty)

Useremo una semplice count per verificare che il caricamento sia corretto e per mettere in moto l’esecuzione di un task

graph.count

Il risultato è “res0: Long = 99999” in quanto le relazioni sono una in meno rispetto ai nodi. L’RDD è carico e pronto per venire usato. Si può notare come l’action count abbia prodotto un task sul worker 1.

Spark shell 1 task
Il task eseguito

Per vedere la struttura del record basta dare il comando

graph.first

che sarà una cosa simile a “res1: Seq[(String, AnyRef)] = ArrayBuffer((father.age,68), (child.age,2))“, ossia una sequenza lunga 2 di coppie String-AnyRef. Da qui partiamo per creare la mappatura.

Logica elaborativa

L’elaborazione si presenta in più fasi. Nella pagina di monitor della shell è possibile notare come le action si traducano in processi distribuiti sugli executors.

L’obiettivo è ottenere l’età media di padri e figli, quindi le chiavi finali saranno apputo “father.age” e “child.age“. La fase di mapping creerà queste chiavi con associato il valore “age” del nodo estratto dalla query. Mentre la funzione reduce di occuperà di sommare le età. Una volta ridotte le chiave basterà dividere per il count iniziale (99.999).

var map = graph.flatMap(p => { Seq( 
    ( p(0)._1, (""+p(0)._2).toInt ) , 
    ( p(1)._1, (""+p(1)._2 ).toInt) ) 
} )

In questo modo viene riutilizzato il nome del return (_1) e convertito in intero il valore (_2). Se eseguiamo il count sulla mappatura

map.count

otteniamo il doppio (199998) degli elementi che ci sono nell’RDD delle tuple. A questo punto applichiamo la logica di riduzione sommando le età per chiave.

var red = map.reduceByKey( (a,b) => a + b )

ora il count di red darà 2 e le si può vedere con l’action take

red.take(2)
//res3: Array[(String, Int)] = Array((child.age,4938414), (father.age,4949999))

Per completare l’opera, creiamo le medie semplicemente dividento le somme per il count iniziale

var res = red.map( m => ( m._1 , m._2 / 99999 ) )
res.take(2)
//res5: Array[(String, Int)] = Array((child.age,49), (father.age,49))

Ovviamente essendo dati random i risultati possono essere diversi.

Conclusioni

Come abbiamo visto è possibile caricare dati relazionati in “tabelle” (o strutture più sofisticate non trattate in questo articolo). La natura distribuita e in-memory di Spark permette di applicare algoritmi di map reducing, filtering e analisi in modo veloce e potente. Il neo4j-spark-connector non si limita a caricare il grafo su Spark ma anche di salvare dei risultati in Neo4j, così da poter eseguire graph analysis su big data presenti su Hadoop.

Lascia una risposta

L'indirizzo email non verrà pubblicato. I campi obbligatori sono contrassegnati *