User:Joal/Wikidata Graph
This page describes my experiments and findings while playing with the Wikidata JSON dumps in Spark GraphFrames.
Prepare Data (2017-04-03 JSON dump example)
- Build refinery so that it contains the needed patch (https://gerrit.wikimedia.org/r/#/c/346726/), assuming you have a ~/code/refinery-source folder.
cd ~/code/refinery-source && git checkout -f master && git pull
git fetch https://gerrit.wikimedia.org/r/analytics/refinery/source refs/changes/26/346726/4 && git checkout FETCH_HEAD
mvn -pl refinery-job -am clean package
- Copy the dump to HDFS (from stat1002, which has the proper /mnt/data folder). Be careful to use bz2 and not gz, as gz is not parallelizable at the decompression stage.
hdfs dfs -put /mnt/data/xmldatadumps/public/wikidatawiki/entities/20170403/wikidata-20170403-all.json.bz2 wikidata/json
- Convert from JSON to Parquet
spark-submit --master yarn --conf spark.dynamicAllocation.maxExecutors=32 --executor-memory 16G --driver-memory 4G --executor-cores 2 --class org.wikimedia.analytics.refinery.job.wikidata.graph.WikidataParquet ~/code/refinery-source/refinery-job/target/refinery-job-0.0.45-SNAPSHOT.jar -i wikidata/json -o wikidata/parquet
Interesting info:
- Original JSON dump size: 7.1 GB (bz2)
- Conversion job time: ~25 min
- Converted Parquet size: 8.2 GB (gz)
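As a quick sanity check of the conversion (a small sketch, not part of the original steps, assuming the wikidata/parquet output path used above), the Parquet data can be inspected from a spark-shell:
val wd = sqlContext.read.parquet("wikidata/parquet")
wd.printSchema() // verify the converted schema
wd.count()       // number of entities in the dump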
Playing with GraphFrames - v1
Launch a spark-shell with the needed jar and appropriate executor settings:
spark-shell --master yarn --conf spark.dynamicAllocation.maxExecutors=32 --executor-memory 16G --driver-memory 4G --executor-cores 2 --jars /srv/deployment/analytics/refinery/artifacts/refinery-job.jar
Then create a graph out of claims only (items and properties are vertices, and claims are edges).
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, LongType, StructField, StructType}
import org.apache.spark.sql.Row
import org.graphframes.GraphFrame
def makeGraph(wdRdd: RDD[Row]): GraphFrame = {
val verticesRdd = wdRdd.
map(_.getString(0)). // entity Id - Now named key
sortBy(k => k). // In case of re-zipping, ensure same order
zipWithIndex() // RDD[Key, Id]
val verticesDF = sqlContext.createDataFrame(
verticesRdd.map { case (key, id) => Row.fromTuple((id, key))},
StructType(Seq(StructField("id", LongType, nullable = false), StructField("key", StringType, nullable = false))))
val edgesRdd = wdRdd.
flatMap(e => e.getSeq[Row](5).map(c => (e.getString(0), c.getStruct(1).getString(1), Row(c.getStruct(1).getString(2), c.getStruct(1).getStruct(3))))). // (fromKey, toKey, payload)
distinct.
keyBy(_._1). // RDD[(fromKey, (fromKey, toKey, payload))]
join(verticesRdd.keyBy(_._1)). // RDD[(fromKey, ((fromKey, toKey, payload), (key-fromKey, id-fromKey)))]
map(e => (e._2._1._2, (e._2._2._2, e._2._1._3))). // RDD[(toKey, (id-fromKey, payload))]
join(verticesRdd.keyBy(_._1)). //RDD[(toKey, ((id-fromKey, payload), (key-toKey, id-toKey)))]
map(e => (e._2._1._1, e._2._2._2, e._2._1._2)) // RDD[(id-fromKey, id-toKey, payload)]
val edgesDF = sqlContext.createDataFrame(edgesRdd.map(Row.fromTuple(_)),
StructType(Seq(
StructField("src", LongType, nullable = false),
StructField("dst", LongType, nullable = false),
StructField("data", StructType(Seq(
StructField("dataType", StringType, nullable = false),
StructField("dataValue", StructType(Seq(
StructField("typ", StringType, nullable = false),
StructField("value", StringType, nullable = false)
)), nullable = false)
)), nullable = false)
))
)
GraphFrame(verticesDF, edgesDF)
}
val wd = sqlContext.read.parquet("/user/joal/wmf/data/wmf/mediawiki/wikidata_parquet/20180108")
val g = makeGraph(wd.rdd)
g.cache()
And finally get some results:
// Items + properties
g.vertices.count()
//res2: Long = 25479149
// Items only
g.vertices.filter("key like 'Q%'").count()
// res3: Long = 25475798
// Properties only
g.vertices.filter("key like 'P%'").count()
// res4: Long = 3351
// Claims
g.edges.count()
// res1: Long = 128184373
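As an extra sketch (not part of the original results), the graph also makes degree-style questions cheap, for instance the most-referenced vertices by number of incoming claims:
import org.apache.spark.sql.functions.desc
// Most-referenced items/properties by in-degree, joined back to their keys
g.inDegrees.
  join(g.vertices, "id").
  orderBy(desc("inDegree")).
  select("key", "inDegree").
  show(10)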
Playing with GraphFrames - v2
At the Wikimedia Hackathon 2017 in Vienna we tried to replicate a "big" SPARQL query in Spark.
We didn't succeed there, but we were not far off. This section contains the final code that makes it work.
First, launch a spark-shell:
spark-shell --master yarn --executor-memory 8G --driver-memory 4G --executor-cores 1 --jars /srv/deployment/analytics/refinery/artifacts/refinery-job.jar
Then use the :paste trick (end with CTRL+D) with the following:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{StringType, LongType, StructField, StructType}
import org.graphframes.GraphFrame
import sqlContext.implicits._
/**
* Graph creation functions
* The graph uses mainSnak properties only, and converts the ones
* pointing to other Wikidata items into direct links.
*/
val idPattern = ".*\"id\":\"([QP]\\d+)\".*".r
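// Example (assumed serialized form of a wikibase entity dataValue): a string like
//   {"entity-type":"item","numeric-id":42,"id":"Q42"}
// matches idPattern and captures "Q42" as the target entity id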
def extractClaimData(claim: Row, fromId: Long): Option[(String, (Long, String, String, String, String))] = {
val mainSnak = claim.getStruct(1)
val claimRank = claim.getString(3)
val mainSnakType = mainSnak.getString(0)
val mainSnakProp = mainSnak.getString(1)
val mainSnakDataType = mainSnak.getString(2)
val mainSnakDataV = mainSnak.getStruct(3) // getting the value, not the typ
val mainSnakDataValue = if (mainSnakDataV != null) mainSnakDataV.getString(1) else null
if (mainSnakDataType != null) {
if (mainSnakDataType.startsWith("wikibase-") && (mainSnakDataValue != null)) {
mainSnakDataValue match {
case idPattern(toId) => Some(toId, (fromId, mainSnakProp, mainSnakType, mainSnakDataType, mainSnakDataValue))
case _ => None
}
} else {
Some(mainSnakProp, (fromId, mainSnakProp, mainSnakType, mainSnakDataType, mainSnakDataValue))
}
} else
None
}
def makeGraph(wikidataDF: DataFrame): GraphFrame = {
val verticesRdd: RDD[((String, String, String, Seq[Row]), Long)] = wikidataDF
.rdd
.map(row =>
(
row.getString(0), // wdid
row.getString(1), // typ
row.getMap[String, String](2).getOrElse("en", "__EMPTY__"), // label
row.getSeq[Row](5) // claims
))
.zipWithUniqueId // RDD[((wdid, typ, label, claims), id)]
val verticesDF = sqlContext.createDataFrame(
verticesRdd.map { case ((wdid, typ, label, _), id) => Row.fromTuple((id, wdid, typ, label))},
StructType(Seq(
StructField("id", LongType, nullable = false),
StructField("wdid", StringType, nullable = false),
StructField("typ", StringType, nullable = false),
StructField("label", StringType, nullable = false)
))
)
val edgesRdd: RDD[(Long, Long, String, String, String, String)] = verticesRdd
.flatMap(e => {
val ((_, _, _, claims), fromId) = e
claims.flatMap(claim => extractClaimData(claim, fromId)) // (toWdId, (fromId, Prop, type, dataType, dataValue))
})
.join(verticesRdd.map(e => (e._1._1, e._2))) // (toWdId, ((fromId, Prop, type, dataType, dataValue), (toId)))
.map(e => (e._2._1._1, e._2._2, e._2._1._2, e._2._1._3, e._2._1._4, e._2._1._5)) // RDD[(fromId, toId, prop, type, dataType, dataValue))]
val edgesDF = sqlContext.createDataFrame(edgesRdd.map(Row.fromTuple(_)),
StructType(Seq(
StructField("src", LongType, nullable = false),
StructField("dst", LongType, nullable = false),
StructField("prop", StringType, nullable = false),
StructField("typ", StringType, nullable = false),
StructField("dataType", StringType, nullable = false),
StructField("dataValue", StringType, nullable = true)
))
)
GraphFrame(verticesDF, edgesDF)
}
/**
* Read the already-converted data and build the graph.
* This takes a long time because of graph creation.
* Once the graph is loaded, computation is fast(-ish).
*/
val ts1 = System.currentTimeMillis
val df = sqlContext.read.parquet("/user/joal/wmf/data/wmf/mediawiki/wikidata_parquet/20180108")
val g = makeGraph(df)
g.cache()
val nbEdges = g.edges.count()
val nbVertices = g.vertices.count()
val ts2 = System.currentTimeMillis
println(s"Edges: ${nbEdges} / Vertices: ${nbVertices}")
println(s"Duration (ms): ${ts2-ts1}")
// Edges: 142588525 / Vertices: 25479149
//
// Duration (ms): 251281 -- About 4 minutes ... a bit long
/**
* Compute the SPARQL equivalent of:
* Find all humans having at least one occupation and a manner-of-death.
* From those, compute the total count by occupation, and the count
* by occupation for manner-of-death = homicide only.
* Then join both and compute the rate of homicide over the total number
* of different manners-of-death per occupation.
*/
val ts3 = System.currentTimeMillis
val motifs = g.find("(a)-[e]->(b); (a)-[e2]->(c); (a)-[e3]->(d)")
.filter("e3.prop = 'P1196'") // manner of death
.filter("b.wdid = 'Q5'") // human
.filter("e.prop = 'P31'") // instanceOf
.filter("e2.prop = 'P106'") // occupation
.distinct()
motifs.cache()
val occups = motifs.groupBy("c.label").count().withColumnRenamed("count", "total")
val homicideOccups = motifs.filter("d.wdid = 'Q149086'").groupBy("c.label").count().withColumnRenamed("count", "homicides")
val occupsWithRate = occups.join(homicideOccups, Seq("label"), "leftouter")
.filter("total > 5") // remove occupations with too-small number of manners-of-death
.selectExpr("label", "homicides", "total", "(COALESCE(homicides, 0) / CAST(total AS DOUBLE)) AS rate") // compute rate
.orderBy($"rate".desc, $"total".desc) // Order
occupsWithRate.take(20).foreach(println)
val ts4 = System.currentTimeMillis
println(s"Duration (ms): ${ts4-ts3}")
// ["drug lord",4,6,0.6666666666666666]
// ["police officer",43,83,0.5180722891566265]
// ["civil rights advocate",4,8,0.5]
// ["humanitarian",3,6,0.5]
// ["gangster",12,25,0.48]
// ["drug trafficker",22,47,0.46808510638297873]
// ["outlaw",5,11,0.45454545454545453]
// ["missionary",51,114,0.4473684210526316]
// ["sovereign",15,35,0.42857142857142855]
// ["rapper",24,59,0.4067796610169492]
// ["orator",3,8,0.375]
// ["prosaist",4,11,0.36363636363636365]
// ["prostitute",12,34,0.35294117647058826]
// ["human rights activist",24,69,0.34782608695652173]
// ["monarch",10,29,0.3448275862068966]
// ["prince",3,9,0.3333333333333333]
// ["faculty",3,9,0.3333333333333333]
// ["environmentalist",3,9,0.3333333333333333]
// ["public figure",2,6,0.3333333333333333]
// ["student",11,35,0.3142857142857143]
//
// Duration (ms): 78268 -- 1min20 - better
/**
* Reuse the previous motif with a different manner-of-death: suicide
*/
val ts5 = System.currentTimeMillis
val suicideOccups = motifs.filter("d.wdid = 'Q10737'").groupBy("c.label").count().withColumnRenamed("count", "suicides")
val occupsWithSuicideRate = occups.join(suicideOccups, Seq("label"), "leftouter")
.filter("total > 5") // remove occupations with too-small number of manners-of-death
.selectExpr("label", "suicides", "total", "(COALESCE(suicides, 0) / CAST(total AS DOUBLE)) AS rate") // compute rate
.orderBy($"rate".desc, $"total".desc) // Order
occupsWithSuicideRate.take(20).foreach(println)
val ts6 = System.currentTimeMillis
println(s"Duration (ms): ${ts6-ts5}")
// ["member of parliament",11,13,0.8461538461538461]
// ["spree killer",23,29,0.7931034482758621]
// ["AV idol",6,8,0.75]
// ["internist",5,8,0.625]
// ["samurai",59,101,0.5841584158415841]
// ["bushi",22,42,0.5238095238095238]
// ["pundit",3,6,0.5]
// ["orientalist",4,10,0.4]
// ["psychanalyst",5,13,0.38461538461538464]
// ["warrior",3,8,0.375]
// ["orator",3,8,0.375]
// ["glamor model",4,11,0.36363636363636365]
// ["executioner",4,11,0.36363636363636365]
// ["neurologist",14,40,0.35]
// ["topologist",4,12,0.3333333333333333]
// ["sport shooter",3,9,0.3333333333333333]
// ["vedette",2,6,0.3333333333333333]
// ["cryptographer",2,6,0.3333333333333333]
// ["curler",2,6,0.3333333333333333]
// ["freestyle skier",2,6,0.3333333333333333]
//
// Duration (ms): 4929 -- 5 Seconds - Caching is awesome :)
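Since motifs and occups are cached, the same computation can be reused for any manner-of-death. A minimal sketch of a hypothetical helper (mannerOfDeathRate is not part of the code above; it simply factors the homicide and suicide snippets):
// Hypothetical helper reusing the cached `motifs` and `occups` DataFrames defined above
def mannerOfDeathRate(mannerWdid: String, colName: String): DataFrame = {
  val byOccup = motifs.filter(s"d.wdid = '$mannerWdid'").
    groupBy("c.label").count().withColumnRenamed("count", colName)
  occups.join(byOccup, Seq("label"), "leftouter").
    filter("total > 5"). // remove occupations with too few manners-of-death
    selectExpr("label", colName, "total",
      s"(COALESCE($colName, 0) / CAST(total AS DOUBLE)) AS rate").
    orderBy($"rate".desc, $"total".desc)
}
// e.g. mannerOfDeathRate("Q149086", "homicides").take(20).foreach(println)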