Jump to content

Data Platform/Data Lake/Edits/MediaWiki history dumps/Python spark examples

From Wikitech

This page provides examples of using Python Spark (PySpark) to process the MediaWiki history dumps.

Loading the data (unescape strings and split arrays)

import re
from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, IntegerType, ArrayType
from pyspark.sql.functions import count, lit, desc

# Update this value to your path: the location of the MediaWiki history dump
# files, which is passed to the CSV reader below.
mediawiki_history_path = "/PATH/TO/MEDIAWIKI/HISTORY/FILES"

# Schema of the raw dump columns, in file order.
# Note: columns suffixed "_escaped" and "_string" are read as plain strings here;
# string unescaping and array conversion are done later, when the dataframe is
# re-projected with the unescape / to_array UDFs.
mediawiki_history_schema = StructType([
    # --- event_* columns ---
    StructField("wiki_db", StringType(), nullable=False),
    StructField("event_entity", StringType(), nullable=False),
    StructField("event_type", StringType(), nullable=False),
    StructField("event_timestamp", StringType(), nullable=True),
    StructField("event_comment_escaped", StringType(), nullable=True),

    # --- event_user_* columns ---
    StructField("event_user_id", LongType(), nullable=True),
    StructField("event_user_text_historical_escaped", StringType(), nullable=True),
    StructField("event_user_text_escaped", StringType(), nullable=True),
    StructField("event_user_blocks_historical_string", StringType(), nullable=True),
    StructField("event_user_blocks_string", StringType(), nullable=True),
    StructField("event_user_groups_historical_string", StringType(), nullable=True),
    StructField("event_user_groups_string", StringType(), nullable=True),
    StructField("event_user_is_bot_by_historical_string", StringType(), nullable=True),
    StructField("event_user_is_bot_by_string", StringType(), nullable=True),
    StructField("event_user_is_created_by_self", BooleanType(), nullable=True),
    StructField("event_user_is_created_by_system", BooleanType(), nullable=True),
    StructField("event_user_is_created_by_peer", BooleanType(), nullable=True),
    StructField("event_user_is_anonymous", BooleanType(), nullable=True),
    StructField("event_user_registration_timestamp", StringType(), nullable=True),
    StructField("event_user_creation_timestamp", StringType(), nullable=True),
    StructField("event_user_first_edit_timestamp", StringType(), nullable=True),
    StructField("event_user_revision_count", LongType(), nullable=True),
    StructField("event_user_seconds_since_previous_revision", LongType(), nullable=True),

    # --- page_* columns ---
    StructField("page_id", LongType(), nullable=True),
    StructField("page_title_historical_escaped", StringType(), nullable=True),
    StructField("page_title_escaped", StringType(), nullable=True),
    StructField("page_namespace_historical", IntegerType(), nullable=True),
    StructField("page_namespace_is_content_historical", BooleanType(), nullable=True),
    StructField("page_namespace", IntegerType(), nullable=True),
    StructField("page_namespace_is_content", BooleanType(), nullable=True),
    StructField("page_is_redirect", BooleanType(), nullable=True),
    StructField("page_is_deleted", BooleanType(), nullable=True),
    StructField("page_creation_timestamp", StringType(), nullable=True),
    StructField("page_first_edit_timestamp", StringType(), nullable=True),
    StructField("page_revision_count", LongType(), nullable=True),
    StructField("page_seconds_since_previous_revision", LongType(), nullable=True),

    # --- user_* columns ---
    StructField("user_id", LongType(), nullable=True),
    StructField("user_text_historical_escaped", StringType(), nullable=True),
    StructField("user_text_escaped", StringType(), nullable=True),
    StructField("user_blocks_historical_string", StringType(), nullable=True),
    StructField("user_blocks_string", StringType(), nullable=True),
    StructField("user_groups_historical_string", StringType(), nullable=True),
    StructField("user_groups_string", StringType(), nullable=True),
    StructField("user_is_bot_by_historical_string", StringType(), nullable=True),
    StructField("user_is_bot_by_string", StringType(), nullable=True),
    StructField("user_is_created_by_self", BooleanType(), nullable=True),
    StructField("user_is_created_by_system", BooleanType(), nullable=True),
    StructField("user_is_created_by_peer", BooleanType(), nullable=True),
    StructField("user_is_anonymous", BooleanType(), nullable=True),
    StructField("user_registration_timestamp", StringType(), nullable=True),
    StructField("user_creation_timestamp", StringType(), nullable=True),
    StructField("user_first_edit_timestamp", StringType(), nullable=True),

    # --- revision_* columns ---
    StructField("revision_id", LongType(), nullable=True),
    StructField("revision_parent_id", LongType(), nullable=True),
    StructField("revision_minor_edit", BooleanType(), nullable=True),
    StructField("revision_deleted_parts_string", StringType(), nullable=True),
    StructField("revision_deleted_parts_are_suppressed", BooleanType(), nullable=True),
    StructField("revision_text_bytes", LongType(), nullable=True),
    StructField("revision_text_bytes_diff", LongType(), nullable=True),
    StructField("revision_text_sha1", StringType(), nullable=True),
    StructField("revision_content_model", StringType(), nullable=True),
    StructField("revision_content_format", StringType(), nullable=True),
    StructField("revision_is_deleted_by_page_deletion", BooleanType(), nullable=True),
    StructField("revision_deleted_by_page_deletion_timestamp", StringType(), nullable=True),
    StructField("revision_is_identity_reverted", BooleanType(), nullable=True),
    StructField("revision_first_identity_reverting_revision_id", LongType(), nullable=True),
    StructField("revision_seconds_to_identity_revert", LongType(), nullable=True),
    StructField("revision_is_identity_revert", BooleanType(), nullable=True),
    StructField("revision_is_from_before_page_creation", BooleanType(), nullable=True),
    StructField("revision_tags_string", StringType(), nullable=True),
])

# Read the tab-delimited dump files using the explicit schema above.
# Note: it's important to set .option("quote", "") to prevent Spark from
# automatically using double-quotes to quote text.
mediawiki_history_raw = (
    spark.read
    .option("delimiter", "\t")
    .option("quote", "")
    .schema(mediawiki_history_schema)
    .csv(mediawiki_history_path)
)

# Unescaping and array-splitting UDFs
def unescape(str):
    """Turn backslash escape sequences for newline, CR and tab into real characters.

    The two-character sequences backslash-n, backslash-r and backslash-t are
    replaced, in that order, by the newline, carriage-return and tab
    characters respectively.

    Args:
        str: the escaped text, or None. (The name shadows the builtin; it is
            kept unchanged so existing callers are unaffected.)

    Returns:
        The unescaped text, or None when the input is None.
    """
    if str is None:
        return None

    # Applied sequentially, in the same order as a chained .replace() call.
    replacements = (("\\n", "\n"), ("\\r", "\r"), ("\\t", "\t"))
    unescaped = str
    for escaped_seq, real_char in replacements:
        unescaped = unescaped.replace(escaped_seq, real_char)
    return unescaped
# The comma splitter applies a negative lookbehind for \ to prevent splitting on escaped commas
def toArray(str):
    """Split a comma-separated string into a list of trimmed, unescaped items.

    The input is first passed through unescape(), then split on every comma
    that is not immediately preceded by a backslash. Each resulting item has
    surrounding whitespace stripped and its backslash-escaped commas turned
    back into plain commas.

    Args:
        str: the comma-separated text, or None. (The name shadows the
            builtin; it is kept unchanged so existing callers are unaffected.)

    Returns:
        A list of strings; an empty list when the input is None.
    """
    if str is None:
        return []

    pieces = re.split("(?<!\\\\),", unescape(str))
    return [piece.strip().replace("\\,", ",") for piece in pieces]

# Expose the Python helpers to Spark SQL under the names used in the
# selectExpr() expressions: unescape -> string, to_array -> array of
# non-null strings.
spark.udf.register(name="unescape", f=unescape, returnType=StringType())
spark.udf.register(
    name="to_array",
    f=toArray,
    returnType=ArrayType(StringType(), containsNull=False),
)

# Build the final dataframe from the raw one: "*_escaped" columns go through
# the unescape UDF, "*_string" columns through the to_array UDF, and both are
# renamed without their suffix. All other columns are passed through as-is.
mediawiki_history = mediawiki_history_raw.selectExpr(
    # --- event_* columns ---
    "wiki_db",
    "event_entity",
    "event_type",
    "event_timestamp",
    "unescape(event_comment_escaped) AS event_comment",

    # --- event_user_* columns ---
    "event_user_id",
    "unescape(event_user_text_historical_escaped) AS event_user_text_historical",
    "unescape(event_user_text_escaped) AS event_user_text",
    "to_array(event_user_blocks_historical_string) AS event_user_blocks_historical",
    "to_array(event_user_blocks_string) AS event_user_blocks",
    "to_array(event_user_groups_historical_string) AS event_user_groups_historical",
    "to_array(event_user_groups_string) AS event_user_groups",
    "to_array(event_user_is_bot_by_historical_string) AS event_user_is_bot_by_historical",
    "to_array(event_user_is_bot_by_string) AS event_user_is_bot_by",
    "event_user_is_created_by_self",
    "event_user_is_created_by_system",
    "event_user_is_created_by_peer",
    "event_user_is_anonymous",
    "event_user_registration_timestamp",
    "event_user_creation_timestamp",
    "event_user_first_edit_timestamp",
    "event_user_revision_count",
    "event_user_seconds_since_previous_revision",

    # --- page_* columns ---
    "page_id",
    "unescape(page_title_historical_escaped) AS page_title_historical",
    "unescape(page_title_escaped) AS page_title",
    "page_namespace_historical",
    "page_namespace_is_content_historical",
    "page_namespace",
    "page_namespace_is_content",
    "page_is_redirect",
    "page_is_deleted",
    "page_creation_timestamp",
    "page_first_edit_timestamp",
    "page_revision_count",
    "page_seconds_since_previous_revision",

    # --- user_* columns ---
    "user_id",
    "unescape(user_text_historical_escaped) AS user_text_historical",
    "unescape(user_text_escaped) AS user_text",
    "to_array(user_blocks_historical_string) AS user_blocks_historical",
    "to_array(user_blocks_string) AS user_blocks",
    "to_array(user_groups_historical_string) AS user_groups_historical",
    "to_array(user_groups_string) AS user_groups",
    "to_array(user_is_bot_by_historical_string) AS user_is_bot_by_historical",
    "to_array(user_is_bot_by_string) AS user_is_bot_by",
    "user_is_created_by_self",
    "user_is_created_by_system",
    "user_is_created_by_peer",
    "user_is_anonymous",
    "user_registration_timestamp",
    "user_creation_timestamp",
    "user_first_edit_timestamp",

    # --- revision_* columns ---
    "revision_id",
    "revision_parent_id",
    "revision_minor_edit",
    "to_array(revision_deleted_parts_string) AS revision_deleted_parts",
    "revision_deleted_parts_are_suppressed",
    "revision_text_bytes",
    "revision_text_bytes_diff",
    "revision_text_sha1",
    "revision_content_model",
    "revision_content_format",
    "revision_is_deleted_by_page_deletion",
    "revision_deleted_by_page_deletion_timestamp",
    "revision_is_identity_reverted",
    "revision_first_identity_reverting_revision_id",
    "revision_seconds_to_identity_revert",
    "revision_is_identity_revert",
    "revision_is_from_before_page_creation",
    "to_array(revision_tags_string) AS revision_tags",
)

# Have fun with data :)
# Top 10 projects in number of revisions for month 2019-12:
# keep revision-creation events, derive the month from the leading characters
# of event_timestamp (it comes out as 'YYYY-MM', see the sample output below),
# then count rows per wiki and print the ten largest without truncation.
(
    mediawiki_history
    .where("event_entity = 'revision' and event_type = 'create'")
    .selectExpr("wiki_db", "SUBSTR(event_timestamp, 0, 7) as month")
    .where("month = '2019-12'")
    .groupBy("wiki_db", "month")
    .agg(count(lit(1)).alias("revision_count"))
    .sort(desc("revision_count"))
    .show(10, False)
)
  
# +--------------+-------+--------------+                                         
# |wiki_db       |month  |revision_count|
# +--------------+-------+--------------+
# |wikidatawiki  |2019-12|21511762      |
# |commonswiki   |2019-12|6046688       |
# |enwiki        |2019-12|4756250       |
# |arwiki        |2019-12|1599840       |
# |frwiki        |2019-12|903838        |
# |dewiki        |2019-12|795638        |
# |eswiki        |2019-12|710516        |
# |viwiki        |2019-12|679525        |
# |ruwiki        |2019-12|652051        |
# |itwiki        |2019-12|563592        |
# +--------------+-------+--------------+