Data Platform/Data Lake/Edits/Mediawiki history dumps/Python Pandas examples

This page provides examples of using Python and Pandas to process the MediaWiki history dumps. There are slight differences between Pandas v1 and Pandas v2, so an example is provided for each version.
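
You can check which version you are running from Python itself:

import pandas as pd

print(pd.__version__)  # for example "1.5.3" (Pandas v1) or "2.0.3" (Pandas v2)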

Pandas v1

# ---
# jupyter:
#   jupytext:
#     text_representation:
#       extension: .py
#       format_name: light
#       format_version: '1.5'
#       jupytext_version: 1.16.1
#   kernelspec:
#     display_name: Python [conda-mwhd_pandas1]
#     language: python
#     name: conda-env-conda-mwhd_pandas1-py
# ---

# + [markdown] editable=true slideshow={"slide_type": ""}
# # Processing MediaWiki HistoryDump files with Pandas

# +
import bz2
import csv
import pathlib
import tempfile

import pandas as pd

import matplotlib.pyplot as plt
# -

# %matplotlib inline

# let's use the Catalan Wikipedia as an example and assume the data are in
# the directory:
#  data/mwhd/2023-10/cawiki/
LANG = "ca"
data_dir = pathlib.Path("data/mwhd/2023-10/{lang}wiki".format(lang=LANG))
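# the directory is expected to contain the dump TSV files for the chosen wiki,
# typically named like 2023-10.cawiki.all-time.tsv.bz2 (larger wikis are split
# into one file per year or per month)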

# create a plots directory, if it does not exist
plots_dir = pathlib.Path("plots")
plots_dir.mkdir(parents=True, exist_ok=True)

csv_files = sorted(data_dir.glob("*tsv.bz2"))
csv_files

tmpdir = tempfile.TemporaryDirectory(prefix="mwhd-pandas.")
tmpdir_path = pathlib.Path(tmpdir.name)
print(f"Created temporary directory at {tmpdir_path}")


# ### Helper functions


def decompress_bz2_files(file_paths, tmpdir):
    decompressed_files = []

    # Decompress each bz2 file
    for file_path in file_paths:
        # check if file is bz2 compressed, otherwise skip
        if file_path.suffix != ".bz2":
            print(f"File {file_path} is not bz2 compressed, skipping...")
            decompressed_files.append(file_path)
            continue

        # get the base name without the .bz2 extension for the decompressed file
        decompressed_file_path = pathlib.Path(tmpdir, file_path.stem)

        # Read the bz2 file and write the decompressed data
        with bz2.BZ2File(file_path, "rb") as file, decompressed_file_path.open(
            "wb"
        ) as new_file:
            # Copy the decompressed data to the new file
            for data in iter(lambda: file.read(100 * 1024), b""):
                new_file.write(data)

            decompressed_files.append(decompressed_file_path)

        print(f"  - decompressed {file_path} to {decompressed_file_path}")

    return decompressed_files
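
# Note that this helper is not called anywhere below: `pd.read_csv()` can read
# bz2-compressed files directly. If you do want the decompressed files, a
# typical invocation would be:
#
#   decompressed_files = decompress_bz2_files(csv_files, tmpdir_path)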


# ### Read input

# let's assume that there is a file with the MWHD fields in:
# data/mwhd/mwhd_fields.csv
fields_file = pathlib.Path("data/mwhd/mwhd_fields.csv")
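
# the fields file is expected to be tab-separated, with a header row and four
# columns: field class, field name, data type and comment. A hypothetical
# excerpt:
#
#   class    field              dtype      comment
#   event    event_entity       string     revision, page or user
#   event    event_timestamp    timestamp  when the event happened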

CSV_FIELDS = []
CSV_FIELDS_META = {}
with fields_file.open("r") as infile:
    reader = csv.reader(infile, delimiter="\t")

    # skip header
    next(reader)

    for line in reader:
        fclass = line[0]
        fname = line[1]
        dtype = line[2]
        comment = line[3]

        CSV_FIELDS.append(fname)

        if dtype == "int":
            dtype = "Int64"
        elif dtype == "bigint":
            dtype = "Int64"
        elif dtype == "array<string>":
            dtype = "object"

        if "timestamp" in fname:
            dtype = "object"

        CSV_FIELDS_META[fname] = {"class": fclass, "dtype": dtype, "comment": comment}

# +
maxl = 60

print(f"id\t{'field': <{maxl}}\tdtype")
print("----\t" + "-" * maxl + "\t" + "------")
for id, field in enumerate(CSV_FIELDS, start=1):
    print(f"{id}\t{field: <{maxl}}\t{CSV_FIELDS_META[field]['dtype']}")

# +
timestamp_fields = [
    (id, field) for id, field in enumerate(CSV_FIELDS, start=1) if "timestamp" in field
]

print(f"id\t{'field': <{maxl}}\tdtype")
print("----\t" + "-" * maxl + "\t" + "------")
for id, field in timestamp_fields:
    print(f"{id}\t{field: <{maxl}}\t{CSV_FIELDS_META[field]['dtype']}")

# +
# ## Load CSV with pandas
df_list = []

for file in csv_files:
    tmpdf = pd.read_csv(
        file,
        delimiter="\t",
        encoding="utf-8",
        quotechar='"',
        quoting=csv.QUOTE_NONE,
        doublequote=False,
        header=None,
        names=CSV_FIELDS,
        dtype={field: CSV_FIELDS_META[field]["dtype"] for field in CSV_FIELDS},
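        # note: the date_format parameter (shown commented out below) is only
        # available in pandas 2; with pandas 1 the timestamp columns are read
        # as objects and parsed afterwards with pd.to_datetime()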
        # date_format={field: "%Y-%m-%d %H:%M:%S.%f"
        #              for field in CSV_FIELDS
        #              if "timestamp" in field},
    )
    df_list.append(tmpdf)

df = pd.concat(df_list)
df.head()

del df_list
# -

# parse the timestamp columns, coercing unparsable values to NaT
for _, field in timestamp_fields:
    df[field] = pd.to_datetime(
        df[field], errors="coerce", format="%Y-%m-%d %H:%M:%S.%f"
    )

# convert boolean columns to pandas' nullable boolean dtype, treating
# missing values as False
for field in df.columns.tolist():
    if pd.api.types.is_bool_dtype(df.dtypes[field]):
        df[field] = df[field].astype("boolean")
        df[field] = df[field].fillna(False)


df["event_date"] = df["event_timestamp"].dt.date

basedf = df[
    [
        "event_date",
        "event_entity",
        "event_type",
        "event_user_is_created_by_self",
        "event_user_is_created_by_system",
        "event_user_is_created_by_peer",
        "event_timestamp",
    ]
].copy()

basedf.dtypes

basedf.head()

event_entities = basedf["event_entity"].unique()
event_entities

event_user_df = basedf[basedf["event_entity"] == "user"]

event_user_df.head()

# +
# %%time

# number of new users created (self-, system- or peer-created)
new_users_all = (
    event_user_df[
        (event_user_df["event_type"] == "create")
        & (
            event_user_df["event_user_is_created_by_self"]
            | event_user_df["event_user_is_created_by_system"]
            | event_user_df["event_user_is_created_by_peer"]
        )
    ]
    .groupby("event_date")
    .size()
)

# +
ax = new_users_all.plot()
plt.xticks(rotation=70)

# Save the figure to a file
plot_filename = "{lang}wiki.new_users_all.pandas-v1.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)

plt.show()

# +
# %%time

# number of new users created by themselves (self-created accounts)
new_users_self = (
    event_user_df[
        (event_user_df["event_type"] == "create")
        & (event_user_df["event_user_is_created_by_self"])
    ]
    .groupby("event_date")
    .size()
)

# +
ax = new_users_self.plot()
plt.xticks(rotation=70)

# Save the figure to a file
plot_filename = "{lang}wiki.new_users_self.pandas-v1.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)

plt.show()

# +
# %%time

# number of new pages created
new_pages = (
    basedf[(basedf["event_entity"] == "page") & (basedf["event_type"] == "create")]
    .groupby("event_date")
    .size()
)

# +
ax = new_pages.plot()
plt.xticks(rotation=70)

# Save the figure to a file
plot_filename = "{lang}wiki.new_pages.pandas-v1.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)

plt.show()

# +
# %%time

# total bytes added or removed (sum of the absolute per-revision differences)
total_bytes = df["revision_text_bytes_diff"].abs().groupby(df["event_date"]).sum()

# +
ax = total_bytes.plot()
plt.xticks(rotation=70)

# Save the figure to a file
plot_filename = "{lang}wiki.total_bytes.pandas-v1.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)

plt.show()

# +
# %%time

# number of new revisions created
new_revisions = (
    basedf[(basedf["event_entity"] == "revision") & (basedf["event_type"] == "create")]
    .groupby("event_date")
    .size()
)

# +
ax = new_revisions.plot()
plt.xticks(rotation=70)

# Save the figure to a file
plot_filename = "{lang}wiki.new_revisions.pandas-v1.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)

plt.show()

Requirements

Example conda requirements file.

# This file may be used to create an environment using:
# $ conda create --name <env> --file <this file>
# platform: linux-64
matplotlib=3.7.2=py38h06a4308_0
matplotlib-base=3.7.2=py38h1128e8f_0
matplotlib-inline=0.1.6=py38h06a4308_0
pandas=1.5.3=py38h417a72b_0

Pandas v2

# ---
# jupyter:
#   jupytext:
#     text_representation:
#       extension: .py
#       format_name: light
#       format_version: '1.5'
#       jupytext_version: 1.16.1
#   kernelspec:
#     display_name: Python [conda-mwhd_pandas2]
#     language: python
#     name: conda-env-conda-mwhd_pandas2-py
# ---

# + [markdown] editable=true slideshow={"slide_type": ""}
# # Processing MediaWiki HistoryDump files with Pandas

# +
import bz2
import csv
import pathlib
import tempfile

import pandas as pd

import matplotlib.pyplot as plt
# -

# %matplotlib inline

# let's use the Catalan Wikipedia as an example and assume the data are in
# the directory:
#  data/mwhd/2023-10/cawiki/
LANG = "ca"
data_dir = pathlib.Path("data/mwhd/2023-10/{lang}wiki".format(lang=LANG))
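# the directory is expected to contain the dump TSV files for the chosen wiki,
# typically named like 2023-10.cawiki.all-time.tsv.bz2 (larger wikis are split
# into one file per year or per month)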

# create a plots directory, if it does not exist
plots_dir = pathlib.Path("plots")
plots_dir.mkdir(parents=True, exist_ok=True)

csv_files = sorted(data_dir.glob("*tsv.bz2"))
csv_files

tmpdir = tempfile.TemporaryDirectory(prefix="mwhd-pandas.")
tmpdir_path = pathlib.Path(tmpdir.name)
print(f"Created temporary directory at {tmpdir_path}")


# ### Helper functions


def decompress_bz2_files(file_paths, tmpdir):
    decompressed_files = []

    # Decompress each bz2 file
    for file_path in file_paths:
        # check if file is bz2 compressed, otherwise skip
        if file_path.suffix != ".bz2":
            print(f"File {file_path} is not bz2 compressed, skipping...")
            decompressed_files.append(file_path)
            continue

        # get the base name without the .bz2 extension for the decompressed file
        decompressed_file_path = pathlib.Path(tmpdir, file_path.stem)

        # Read the bz2 file and write the decompressed data
        with bz2.BZ2File(file_path, "rb") as file, decompressed_file_path.open(
            "wb"
        ) as new_file:
            # Copy the decompressed data to the new file
            for data in iter(lambda: file.read(100 * 1024), b""):
                new_file.write(data)

            decompressed_files.append(decompressed_file_path)

        print(f"  - decompressed {file_path} to {decompressed_file_path}")

    return decompressed_files
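
# Note that this helper is not called anywhere below: `pd.read_csv()` can read
# bz2-compressed files directly. If you do want the decompressed files, a
# typical invocation would be:
#
#   decompressed_files = decompress_bz2_files(csv_files, tmpdir_path)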


# ### Read input

# let's assume that there is a file with the MWHD fields in:
# data/mwhd/mwhd_fields.csv
fields_file = pathlib.Path("data/mwhd/mwhd_fields.csv")
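
# the fields file is expected to be tab-separated, with a header row and four
# columns: field class, field name, data type and comment. A hypothetical
# excerpt:
#
#   class    field              dtype      comment
#   event    event_entity       string     revision, page or user
#   event    event_timestamp    timestamp  when the event happened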

CSV_FIELDS = []
CSV_FIELDS_META = {}
with fields_file.open("r") as infile:
    reader = csv.reader(infile, delimiter="\t")

    # skip header
    next(reader)

    for line in reader:
        fclass = line[0]
        fname = line[1]
        dtype = line[2]
        comment = line[3]

        CSV_FIELDS.append(fname)

        if dtype == "int":
            dtype = "Int64"
        elif dtype == "bigint":
            dtype = "Int64"
        elif dtype == "array<string>":
            dtype = "object"

        if "timestamp" in fname:
            dtype = "object"

        CSV_FIELDS_META[fname] = {"class": fclass, "dtype": dtype, "comment": comment}

# +
maxl = 60

print(f"id\t{'field': <{maxl}}\tdtype")
print("----\t" + "-" * maxl + "\t" + "------")
for id, field in enumerate(CSV_FIELDS, start=1):
    print(f"{id}\t{field: <{maxl}}\t{CSV_FIELDS_META[field]['dtype']}")

# +
timestamp_fields = [
    (id, field) for id, field in enumerate(CSV_FIELDS, start=1) if "timestamp" in field
]

print(f"id\t{'field': <{maxl}}\tdtype")
print("----\t" + "-" * maxl + "\t" + "------")
for id, field in timestamp_fields:
    print(f"{id}\t{field: <{maxl}}\t{CSV_FIELDS_META[field]['dtype']}")

# +
# ## Load CSV with pandas
df_list = []

for file in csv_files:
    tmpdf = pd.read_csv(
        file,
        delimiter="\t",
        encoding="utf-8",
        quotechar='"',
        quoting=csv.QUOTE_NONE,
        doublequote=False,
        header=None,
        names=CSV_FIELDS,
        dtype={field: CSV_FIELDS_META[field]["dtype"] for field in CSV_FIELDS},
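        # note: date_format is new in pandas 2 and only takes effect together
        # with parse_dates; since the timestamp columns are typed as objects
        # above, they are parsed below with pd.to_datetime()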
        date_format={
            field: "%Y-%m-%d %H:%M:%S.%f"
            for field in CSV_FIELDS
            if "timestamp" in field
        },
    )
    df_list.append(tmpdf)

df = pd.concat(df_list)
df.head()

del df_list
# -

# parse the timestamp columns, coercing unparsable values to NaT
for _, field in timestamp_fields:
    df[field] = pd.to_datetime(
        df[field], errors="coerce", format="%Y-%m-%d %H:%M:%S.%f"
    )

# convert boolean columns to pandas' nullable boolean dtype, treating
# missing values as False
for field in df.columns.tolist():
    if df.dtypes[field] == "boolean":
        df[field] = df[field].astype("boolean")
        df[field] = df[field].fillna(False)


df["event_date"] = df["event_timestamp"].dt.date

basedf = df[
    [
        "event_date",
        "event_entity",
        "event_type",
        "event_user_is_created_by_self",
        "event_user_is_created_by_system",
        "event_user_is_created_by_peer",
        "event_timestamp",
    ]
].copy()

basedf.dtypes

basedf.head()

event_entities = basedf["event_entity"].unique()
event_entities

event_user_df = basedf[basedf["event_entity"] == "user"]

event_user_df.head()

# +
# %%time

# number of new users created (self-, system- or peer-created)
new_users_all = (
    event_user_df[
        (event_user_df["event_type"] == "create")
        & (
            event_user_df["event_user_is_created_by_self"]
            | event_user_df["event_user_is_created_by_system"]
            | event_user_df["event_user_is_created_by_peer"]
        )
    ]
    .groupby("event_date")
    .size()
)

# +
ax = new_users_all.plot()
plt.xticks(rotation=70)

# Save the figure to a file
plot_filename = "{lang}wiki.new_users_all.pandas-v2.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)

plt.show()

# +
# %%time

# number of new users created by themselves (self-created accounts)
new_users_self = (
    event_user_df[
        (event_user_df["event_type"] == "create")
        & (event_user_df["event_user_is_created_by_self"])
    ]
    .groupby("event_date")
    .size()
)

# +
ax = new_users_self.plot()
plt.xticks(rotation=70)

# Save the figure to a file
plot_filename = "{lang}wiki.new_users_self.pandas-v2.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)

plt.show()

# +
# %%time

# number of new pages created
new_pages = (
    basedf[(basedf["event_entity"] == "page") & (basedf["event_type"] == "create")]
    .groupby("event_date")
    .size()
)

# +
ax = new_pages.plot()
plt.xticks(rotation=70)

# Save the figure to a file
plot_filename = "{lang}wiki.new_pages.pandas-v2.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)

plt.show()

# +
# %%time

# total bytes added or removed (sum of the absolute per-revision differences)
total_bytes = df["revision_text_bytes_diff"].abs().groupby(df["event_date"]).sum()

# +
ax = total_bytes.plot()
plt.xticks(rotation=70)

# Save the figure to a file
plot_filename = "{lang}wiki.total_bytes.pandas-v2.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)

plt.show()

# +
# %%time

# number of new revisions created
new_revisions = (
    basedf[(basedf["event_entity"] == "revision") & (basedf["event_type"] == "create")]
    .groupby("event_date")
    .size()
)

# +
ax = new_revisions.plot()
plt.xticks(rotation=70)

# Save the figure to a file
plot_filename = "{lang}wiki.new_revisions.pandas-v2.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)

plt.show()

Requirements

Example conda requirements file.

# This file may be used to create an environment using:
# $ conda create --name <env> --file <this file>
# platform: linux-64
matplotlib=3.7.2=py38h06a4308_0
matplotlib-base=3.7.2=py38h1128e8f_0
matplotlib-inline=0.1.6=py38h06a4308_0
pandas=2.0.3=py38h1128e8f_0