User:Joal/Clickstream historical
Appearance
This page describes my experiments with the Clickstream dataset.
First, launch a Spark 2 Scala shell from any of the stat100[46] machines (with resource limits set, to be nice to other users of the shared cluster):
spark2-shell --master yarn --conf spark.dynamicAllocation.maxExecutors=64 --driver-memory 4G --executor-memory 8G --executor-cores 2 --jars /srv/deployment/analytics/refinery/artifacts/refinery-job.jar
Then do some work:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
import org.apache.spark.sql.Row
// Wikis whose clickstream files will be loaded.
val wiki = Seq("enwiki")

// Monthly snapshots to load, formatted "YYYY-MM": 2017-11 through 2019-04 inclusive.
val dates = for {
  (year, months) <- Seq(2017 -> (11 to 12), 2018 -> (1 to 12), 2019 -> (1 to 4))
  month <- months
} yield f"$year%d-$month%02d"
// Every (wiki, date) combination, converted to a parallel collection so that
// operations mapped over it are executed in parallel.
val wiki_dates = (for {
  w <- wiki
  d <- dates
} yield (w, d)).par
// Schema applied when reading a clickstream TSV file: three string columns
// followed by a long count, all declared non-nullable.
val rawSchema = StructType(Seq(
  StructField("page_from", StringType, nullable = false),
  StructField("page_to", StringType, nullable = false),
  StructField("link_type", StringType, nullable = false),
  StructField("count", LongType, nullable = false)
))
// Raw schema extended with the two columns that readDf appends to each file.
// Both columns are filled from String values via lit(...), so they are declared
// StringType (they were previously LongType, which did not match the loaded
// data and relied on implicit type widening when unioning with the empty seed).
val augSchema = rawSchema.
  add(StructField("date", StringType, false)).
  add(StructField("wiki_db", StringType, false))
/**
 * Reads one archived clickstream file as a DataFrame.
 *
 * The file is parsed as tab-delimited CSV using `rawSchema`, then two literal
 * columns are appended: `date` (the value of `dt`) and `wiki_db` (the value of `wiki`).
 *
 * @param wiki wiki name used in the file name
 * @param dt   date string used in both the directory name and the file name
 * @return the file's rows with the extra `date` and `wiki_db` columns
 */
def readDf(wiki: String, dt: String): DataFrame = {
  val path = s"/wmf/data/archive/clickstream/${dt}/clickstream-${wiki}-${dt}.tsv.gz"
  val tsvReader = spark.read.option("delimiter", "\t").schema(rawSchema)
  val rawDf = tsvReader.csv(path)
  rawDf.
    withColumn("date", lit(dt)).
    withColumn("wiki_db", lit(wiki))
}
// Zero-row DataFrame carrying augSchema; used as the starting value of the fold below.
val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], augSchema)

// One DataFrame per (wiki, date) pair.
val perFileDfs = wiki_dates.map { case (w, d) => readDf(w, d) }

// Union everything into a single DataFrame, spread it over 64 partitions and cache it.
val clickstreams = perFileDfs.
  fold(emptyDf)(_ union _).
  repartition(64).
  cache()
// Row count per wiki and date.
// Takes a long time as it has to read the full dataset (unzip is not parallel).
val rowsPerWikiDate = clickstreams.
  groupBy("wiki_db", "date").
  count().
  sort("wiki_db", "date")
rowsPerWikiDate.show(100, false)

// Shows up to 1000 untruncated rows of clickstreams matching the SQL `filter`,
// sorted by date, page_from and page_to.
def showLinks(filter: String): Unit = {
  clickstreams.
    where(filter).
    sort("date", "page_from", "page_to").
    show(1000, false)
}

// Rows where both ends of the link are in {Global_warming, Climate_change}.
showLinks("page_from IN ('Global_warming', 'Climate_change') AND page_to IN ('Global_warming', 'Climate_change')")

// Rows where both ends of the link are in {Smog, Air_pollution}.
showLinks("page_from IN ('Smog', 'Air_pollution') AND page_to IN ('Smog', 'Air_pollution')")