Data Platform/Data Lake/Edits/MediaWiki history dumps/Scala spark examples
Appearance
This page provides examples of using Scala Spark to process the MediaWiki history dumps.
Loading the data (unescape strings and split arrays)
// Update this value to your path.
// It is the location handed to the CSV reader when the raw dataframe is loaded.
val mediawiki_history_path = "/PATH/TO/MEDIAWIKI/HISTORY/FILES"
import org.apache.spark.sql.types._
// Schema of the raw dump columns, in file order.
// Escaped strings and comma-separated lists are read as plain strings here
// (columns suffixed `_escaped` / `_string`); they are decoded in a later step.
// Only the first three columns are declared non-nullable; `add(name, type)`
// creates a nullable field.
val mediawiki_history_schema = new StructType().
  // Event-level columns
  add("wiki_db", StringType, nullable = false).
  add("event_entity", StringType, nullable = false).
  add("event_type", StringType, nullable = false).
  add("event_timestamp", StringType).
  add("event_comment_escaped", StringType).
  // User performing the event
  add("event_user_id", LongType).
  add("event_user_text_historical_escaped", StringType).
  add("event_user_text_escaped", StringType).
  add("event_user_blocks_historical_string", StringType).
  add("event_user_blocks_string", StringType).
  add("event_user_groups_historical_string", StringType).
  add("event_user_groups_string", StringType).
  add("event_user_is_bot_by_historical_string", StringType).
  add("event_user_is_bot_by_string", StringType).
  add("event_user_is_created_by_self", BooleanType).
  add("event_user_is_created_by_system", BooleanType).
  add("event_user_is_created_by_peer", BooleanType).
  add("event_user_is_anonymous", BooleanType).
  add("event_user_registration_timestamp", StringType).
  add("event_user_creation_timestamp", StringType).
  add("event_user_first_edit_timestamp", StringType).
  add("event_user_revision_count", LongType).
  add("event_user_seconds_since_previous_revision", LongType).
  // Page columns
  add("page_id", LongType).
  add("page_title_historical_escaped", StringType).
  add("page_title_escaped", StringType).
  add("page_namespace_historical", IntegerType).
  add("page_namespace_is_content_historical", BooleanType).
  add("page_namespace", IntegerType).
  add("page_namespace_is_content", BooleanType).
  add("page_is_redirect", BooleanType).
  add("page_is_deleted", BooleanType).
  add("page_creation_timestamp", StringType).
  add("page_first_edit_timestamp", StringType).
  add("page_revision_count", LongType).
  add("page_seconds_since_previous_revision", LongType).
  // User columns
  add("user_id", LongType).
  add("user_text_historical_escaped", StringType).
  add("user_text_escaped", StringType).
  add("user_blocks_historical_string", StringType).
  add("user_blocks_string", StringType).
  add("user_groups_historical_string", StringType).
  add("user_groups_string", StringType).
  add("user_is_bot_by_historical_string", StringType).
  add("user_is_bot_by_string", StringType).
  add("user_is_created_by_self", BooleanType).
  add("user_is_created_by_system", BooleanType).
  add("user_is_created_by_peer", BooleanType).
  add("user_is_anonymous", BooleanType).
  add("user_registration_timestamp", StringType).
  add("user_creation_timestamp", StringType).
  add("user_first_edit_timestamp", StringType).
  // Revision columns
  add("revision_id", LongType).
  add("revision_parent_id", LongType).
  add("revision_minor_edit", BooleanType).
  add("revision_deleted_parts_string", StringType).
  add("revision_deleted_parts_are_suppressed", BooleanType).
  add("revision_text_bytes", LongType).
  add("revision_text_bytes_diff", LongType).
  add("revision_text_sha1", StringType).
  add("revision_content_model", StringType).
  add("revision_content_format", StringType).
  add("revision_is_deleted_by_page_deletion", BooleanType).
  add("revision_deleted_by_page_deletion_timestamp", StringType).
  add("revision_is_identity_reverted", BooleanType).
  add("revision_first_identity_reverting_revision_id", LongType).
  add("revision_seconds_to_identity_revert", LongType).
  add("revision_is_identity_revert", BooleanType).
  add("revision_is_from_before_page_creation", BooleanType).
  add("revision_tags_string", StringType)
// Read the tab-delimited files with the explicit schema.
// Note: the "quote" option must be set to the empty string, otherwise Spark
// automatically uses double-quotes as a quoting character for text.
val mediawiki_history_raw = spark.read.
  schema(mediawiki_history_schema).
  options(Map("delimiter" -> "\t", "quote" -> "")).
  csv(mediawiki_history_path)
// Unescaping and array-splitting UDFs

// Replaces the two-character sequences \n, \r and \t with the actual newline,
// carriage-return and tab characters. A null input is returned as null.
val unescape = (str: String) => {
  if (str == null) null
  else str.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t")
}

// The comma splitter applies a negative lookbehind for \ so that escaped commas
// (written as \,) are not treated as separators.
val commaSplitter = "(?<!\\\\),".r

// Splits a comma-separated string into a trimmed array of values, after unescaping it.
// Escaped commas (\,) inside a value are turned back into plain commas.
// A null or empty input yields an empty array (splitting "" would otherwise
// produce a one-element array containing the empty string).
val toArray = (str: String) => {
  if (str == null || str.isEmpty) Array.empty[String]
  else commaSplitter.split(unescape(str)).map(_.trim.replace("\\,", ","))
}
// Make both helpers callable by name from SQL expressions on this session.
val sqlFunctionRegistry = spark.udf
sqlFunctionRegistry.register("unescape", unescape)
sqlFunctionRegistry.register("to_array", toArray)
// Build the final dataframe: same columns and order as the raw one, except that
// `<name>_escaped` columns are passed through the `unescape` UDF and
// `<name>_string` columns through the `to_array` UDF, both renamed to `<name>`.
val mediawiki_history = {
  // SQL expression decoding the `<name>_escaped` column into `<name>`
  def unescaped(name: String): String = s"unescape(${name}_escaped) AS $name"
  // SQL expression splitting the `<name>_string` column into the array `<name>`
  def asArray(name: String): String = s"to_array(${name}_string) AS $name"

  val projection = Seq(
    "wiki_db",
    "event_entity",
    "event_type",
    "event_timestamp",
    unescaped("event_comment"),
    "event_user_id",
    unescaped("event_user_text_historical"),
    unescaped("event_user_text"),
    asArray("event_user_blocks_historical"),
    asArray("event_user_blocks"),
    asArray("event_user_groups_historical"),
    asArray("event_user_groups"),
    asArray("event_user_is_bot_by_historical"),
    asArray("event_user_is_bot_by"),
    "event_user_is_created_by_self",
    "event_user_is_created_by_system",
    "event_user_is_created_by_peer",
    "event_user_is_anonymous",
    "event_user_registration_timestamp",
    "event_user_creation_timestamp",
    "event_user_first_edit_timestamp",
    "event_user_revision_count",
    "event_user_seconds_since_previous_revision",
    "page_id",
    unescaped("page_title_historical"),
    unescaped("page_title"),
    "page_namespace_historical",
    "page_namespace_is_content_historical",
    "page_namespace",
    "page_namespace_is_content",
    "page_is_redirect",
    "page_is_deleted",
    "page_creation_timestamp",
    "page_first_edit_timestamp",
    "page_revision_count",
    "page_seconds_since_previous_revision",
    "user_id",
    unescaped("user_text_historical"),
    unescaped("user_text"),
    asArray("user_blocks_historical"),
    asArray("user_blocks"),
    asArray("user_groups_historical"),
    asArray("user_groups"),
    asArray("user_is_bot_by_historical"),
    asArray("user_is_bot_by"),
    "user_is_created_by_self",
    "user_is_created_by_system",
    "user_is_created_by_peer",
    "user_is_anonymous",
    "user_registration_timestamp",
    "user_creation_timestamp",
    "user_first_edit_timestamp",
    "revision_id",
    "revision_parent_id",
    "revision_minor_edit",
    asArray("revision_deleted_parts"),
    "revision_deleted_parts_are_suppressed",
    "revision_text_bytes",
    "revision_text_bytes_diff",
    "revision_text_sha1",
    "revision_content_model",
    "revision_content_format",
    "revision_is_deleted_by_page_deletion",
    "revision_deleted_by_page_deletion_timestamp",
    "revision_is_identity_reverted",
    "revision_first_identity_reverting_revision_id",
    "revision_seconds_to_identity_revert",
    "revision_is_identity_revert",
    "revision_is_from_before_page_creation",
    asArray("revision_tags")
  )

  mediawiki_history_raw.selectExpr(projection: _*)
}
// Have fun with data :)
// Top 10 projects in number of revisions for month 2019-12
// Keeps only revision-creation events, takes the first 7 characters of
// event_timestamp as the month, then counts rows per wiki for that month.
// SQL SUBSTR positions are 1-based, so the prefix starts at position 1.
// Trailing dots keep the chain pasteable line by line into spark-shell.
mediawiki_history.
  where("event_entity = 'revision' and event_type = 'create'").
  selectExpr("wiki_db", "SUBSTR(event_timestamp, 1, 7) as month").
  where("month = '2019-12'").
  groupBy("wiki_db", "month").
  agg(count(lit(1L)).as("revision_count")).
  sort(desc("revision_count")).
  show(10, false)
/*
+--------------+-------+--------------+
|wiki_db |month |revision_count|
+--------------+-------+--------------+
|wikidatawiki |2019-12|21511762 |
|commonswiki |2019-12|6046688 |
|enwiki |2019-12|4756250 |
|arwiki |2019-12|1599840 |
|frwiki |2019-12|903838 |
|dewiki |2019-12|795638 |
|eswiki |2019-12|710516 |
|viwiki |2019-12|679525 |
|ruwiki |2019-12|652051 |
|itwiki |2019-12|563592 |
+--------------+-------+--------------+
only showing top 10 rows
*/