Jump to content

User:Joal/Clickstream historical

From Wikitech

This page describes my experiments with the Clickstream dataset.

First, launch a Spark 2 Scala shell from any of the stat100[46] machines (with reasonable resource limits, since the cluster is shared):

spark2-shell --master yarn --conf spark.dynamicAllocation.maxExecutors=64 --driver-memory 4G --executor-memory 8G --executor-cores 2 --jars /srv/deployment/analytics/refinery/artifacts/refinery-job.jar

Then do some work:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
import org.apache.spark.sql.Row

// Wiki databases to load (only English Wikipedia here).
val wiki = Seq("enwiki")

// Months to load, as "YYYY-MM" strings, one calendar year per line.
val dates = Seq(
  "2017-11", "2017-12",
  "2018-01", "2018-02", "2018-03", "2018-04", "2018-05", "2018-06",
  "2018-07", "2018-08", "2018-09", "2018-10", "2018-11", "2018-12",
  "2019-01", "2019-02", "2019-03", "2019-04"
)

// Every (wiki, month) pair, as a parallel collection so the per-file reads below are set up concurrently.
val wiki_dates = (for {
  w <- wiki
  d <- dates
} yield (w, d)).par

// Columns of the clickstream TSV files, in file order; applied by readDf when parsing.
val rawSchema = StructType(Seq(
  StructField("page_from", StringType, nullable = false),
  StructField("page_to", StringType, nullable = false),
  StructField("link_type", StringType, nullable = false),
  StructField("count", LongType, nullable = false)
))

// Schema of the DataFrames returned by readDf: the raw columns plus "date" and "wiki_db".
// readDf fills both extra columns with string literals (e.g. "2017-11", "enwiki"), so they
// are declared as StringType here; declaring them as LongType would make the empty seed
// DataFrame disagree with the data it gets unioned with.
val augSchema = rawSchema.
  add(StructField("date", StringType, nullable = false)).
  add(StructField("wiki_db", StringType, nullable = false))

/**
 * Reads one archived clickstream file into a DataFrame.
 *
 * The file is parsed as tab-delimited text using `rawSchema`, then tagged with two
 * constant columns so rows from different files stay distinguishable after a union.
 *
 * @param wiki wiki database name used in the file name (e.g. "enwiki")
 * @param dt   month used in both the directory and the file name (e.g. "2017-11")
 * @return the file's rows plus a "date" column holding `dt` and a "wiki_db" column holding `wiki`
 */
def readDf(wiki: String, dt: String): DataFrame = {
  val path = s"/wmf/data/archive/clickstream/${dt}/clickstream-${wiki}-${dt}.tsv.gz"
  val raw = spark.read.option("delimiter", "\t").schema(rawSchema).csv(path)
  raw.withColumn("date", lit(dt)).withColumn("wiki_db", lit(wiki))
}

// Zero-row DataFrame with the augmented schema; serves as the neutral element of the fold below.
val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], augSchema)

// Read every (wiki, month) file, union them all into a single DataFrame,
// spread it over 64 partitions and mark it for caching.
val clickstreams = wiki_dates.
  map { case (w, d) => readDf(w, d) }.
  fold(emptyDf)(_ union _).
  repartition(64).
  cache()

// Row count per wiki and month.
// Takes a long time, as it has to read the full dataset (unzip is not parallel).
val rowsPerWikiMonth = clickstreams.groupBy("wiki_db", "date").count()
rowsPerWikiMonth.sort("wiki_db", "date").show(100, truncate = false)

// Traffic between the 'Global_warming' and 'Climate_change' pages (both directions, plus self-links), by month.
val climateLinks = clickstreams.
  where("page_from IN ('Global_warming', 'Climate_change') AND page_to IN ('Global_warming', 'Climate_change')")
climateLinks.sort("date", "page_from", "page_to").show(1000, truncate = false)

// Same question for the 'Smog' and 'Air_pollution' pages.
val pollutionLinks = clickstreams.
  where("page_from IN ('Smog', 'Air_pollution') AND page_to IN ('Smog', 'Air_pollution')")
pollutionLinks.sort("date", "page_from", "page_to").show(1000, truncate = false)