Data Platform/Data Lake/Edits/Mediawiki history dumps/Python Dask examples

This page provides an example of using Python and Dask to process the MediaWiki history dumps. The example below is a Jupytext-formatted Python notebook that reads a monthly snapshot for a single wiki (Catalan Wikipedia) into a Dask dataframe and computes a few daily aggregates: new users, new pages, new revisions, and bytes changed.

Dask

# ---
# jupyter:
#   jupytext:
#     text_representation:
#       extension: .py
#       format_name: light
#       format_version: '1.5'
#       jupytext_version: 1.16.1
#   kernelspec:
#     display_name: Python [conda-mwhd_dask]
#     language: python
#     name: conda-env-conda-mwhd_dask-py
# ---

# + [markdown] editable=true slideshow={"slide_type": ""}
# # Processing MediaWiki history dump files with Dask dataframes

# +
import bz2
import csv
import pathlib
import tempfile

import dask.dataframe as dd

import matplotlib.pyplot as plt
# -

# let's use Catalan Wikipedia as an example and let's assume the data are in the directory:
#  data/mwhd/2023-10/cawiki/
LANG = "ca"
data_dir = pathlib.Path("data/mwhd/2023-10/{lang}wiki".format(lang=LANG))
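# Note: a snapshot typically ships one or more files per wiki named like
# <snapshot>.<wiki>.<time-range>.tsv.bz2, where the time range is "all-time"
# for small wikis and split by year (or year and month) for larger ones;
# the exact layout may vary, so adapt the glob below to the files you have.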

# create a plots directory, if it does not exist
plots_dir = pathlib.Path("plots")
plots_dir.mkdir(parents=True, exist_ok=True)

csv_files = sorted(data_dir.glob("*.tsv.bz2"))
csv_files

tmpdir = tempfile.TemporaryDirectory(prefix="mwhd-dask.")
tmpdir_path = pathlib.Path(tmpdir.name)
print(f"Created temporary directory at {tmpdir_path}")


# ### Helper functions


def decompress_bz2_files(file_paths, tmpdir):
    decompressed_files = []

    # Decompress each bz2 file
    for file_path in file_paths:
        # check if file is bz2 compressed, otherwise skip
        if not file_path.suffix.endswith(".bz2"):
            print(f"File {file_path} is not bz2 compressed, skipping...")
            decompressed_files.append(file_path)
            continue

        # get the base name without the .bz2 extension for the decompressed file
        decompressed_file_path = pathlib.Path(tmpdir, file_path.stem)

        # Read the bz2 file and write the decompressed data
        with bz2.BZ2File(file_path, "rb") as file, decompressed_file_path.open(
            "wb"
        ) as new_file:
            # Copy the decompressed data to the new file
            for data in iter(lambda: file.read(100 * 1024), b""):
                new_file.write(data)

            decompressed_files.append(decompressed_file_path)

        print(f"  - decompressed {file_path} to {decompressed_file_path}")

    return decompressed_files


# ### Read input

# %%time
if any(file.suffix.endswith(".bz2") for file in csv_files):
    csv_files = decompress_bz2_files(csv_files, tmpdir_path)

csv_files
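# Note: as an alternative to decompressing the files on disk, Dask can read
# the bz2-compressed TSVs directly. Since bz2 is not splittable, each file
# must then be loaded as a single partition (blocksize=None). A minimal
# sketch, reusing the column list (CSV_FIELDS) built below:
#
# ddf = dd.read_csv(
#     str(data_dir / "*.tsv.bz2"),
#     blocksize=None,          # one partition per compressed file
#     delimiter="\t",
#     quoting=csv.QUOTE_NONE,
#     header=None,
#     names=CSV_FIELDS,
# )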

# let's assume that there is a file with the MWHD fields in:
# data/mwhd/mwhd_fields.csv
fields_file = pathlib.Path("data/mwhd/mwhd_fields.csv")
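# The fields file is assumed to be a tab-separated description of the dump
# schema, one row per column, with a header and four columns: field class,
# field name, data type, and comment. A hypothetical excerpt (the real file
# uses tabs between columns):
#
#   class   name             data type   comment
#   event   event_entity     string      revision, page or user
#   event   event_type       string      create, move, delete, ...
#   event   event_timestamp  timestamp   Timestamp of the event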

CSV_FIELDS = []
CSV_FIELDS_META = {}
with fields_file.open("r") as infile:
    reader = csv.reader(infile, delimiter="\t")

    # skip header
    next(reader)

    for line in reader:
        fclass = line[0]
        fname = line[1]
        dtype = line[2]
        comment = line[3]

        CSV_FIELDS.append(fname)

        if dtype == "int":
            dtype = "Int64"
        elif dtype == "bigint":
            dtype = "Int64"
        elif dtype == "array<string>":
            dtype = "object"

        if "timestamp" in fname:
            dtype = "object"

        CSV_FIELDS_META[fname] = {"class": fclass, "dtype": dtype, "comment": comment}

# +
maxl = 60

print(f"id\t{'field': <{maxl}}\tdtype")
print("----\t" + "-" * maxl + "\t" + "------")
for id, field in enumerate(CSV_FIELDS, start=1):
    print(f"{id}\t{field: <{maxl}}\t{CSV_FIELDS_META[field]['dtype']}")

# +
timestamp_fields = [
    (id, field) for id, field in enumerate(CSV_FIELDS, start=1) if "timestamp" in field
]

print(f"id\t{'field': <{maxl}}\tdtype")
print("----\t" + "-" * maxl + "\t" + "------")
for id, field in timestamp_fields:
    print(f"{id}\t{field: <{maxl}}\t{CSV_FIELDS_META[field]['dtype']}")
# -

ddf = dd.read_csv(
    csv_files,
    include_path_column=True,
    delimiter="\t",
    encoding="utf-8",
    quotechar='"',
    quoting=csv.QUOTE_NONE,
    header=None,
    names=CSV_FIELDS,
    dtype={field: CSV_FIELDS_META[field]["dtype"] for field in CSV_FIELDS},
    date_format={
        field: "%Y-%m-%d %H:%M:%S.%f" for field in CSV_FIELDS if "timestamp" in field
    },
    # date_parser=date_parser
)
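# parse the timestamp columns explicitly; unparsable or empty values become NaT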

for _, field in timestamp_fields:
    ddf[field] = dd.to_datetime(
        ddf[field], errors="coerce", format="%Y-%m-%d %H:%M:%S.%f"
    )
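# boolean flag columns: treat missing values as False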

for field in ddf.columns.tolist():
    if ddf.dtypes[field] == "boolean":
        ddf[field] = ddf[field].fillna(False)


ddf.head(n=10)

ddf["event_date"] = ddf["event_timestamp"].dt.date

baseddf = ddf[
    [
        "event_date",
        "event_entity",
        "event_type",
        "event_user_is_created_by_self",
        "event_user_is_created_by_system",
        "event_user_is_created_by_peer",
        "event_timestamp",
    ]
].copy()

baseddf.dtypes

baseddf.head()

__event_entities = baseddf["event_entity"].unique()
event_entities = __event_entities.compute()
event_entities

event_user_ddf = baseddf[baseddf["event_entity"] == "user"]

event_user_ddf.head()

# number of new users created (by self, by the system, or by a peer)
__new_users_all = (
    event_user_ddf[
        (event_user_ddf["event_type"] == "create")
        & (
            event_user_ddf["event_user_is_created_by_self"]
            | event_user_ddf["event_user_is_created_by_system"]
            | event_user_ddf["event_user_is_created_by_peer"]
        )
    ]
    .groupby("event_date")
    .size()
)

# number of new users created by themselves
__new_users_self = (
    event_user_ddf[
        (event_user_ddf["event_type"] == "create")
        & (event_user_ddf["event_user_is_created_by_self"])
    ]
    .groupby("event_date")
    .size()
)

# %%time
new_users_all = __new_users_all.compute()

# +
ax = new_users_all.plot()
plt.xticks(rotation=70)

# Save the figure to a file
plot_filename = "{lang}wiki.new_users_all.dask.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)

plt.show()

# -

# %%time
new_users_self = __new_users_self.compute()

# +
ax = new_users_self.plot()
plt.xticks(rotation=70)

# Save the figure to a file
plot_filename = "{lang}wiki.new_users_self.dask.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)

plt.show()

# -

# number of new pages created
__new_pages = (
    baseddf[(baseddf["event_entity"] == "page") & (baseddf["event_type"] == "create")]
    .groupby("event_date")
    .size()
)

# %%time
new_pages = __new_pages.compute()

# +
ax = new_pages.plot()
plt.xticks(rotation=70)

# Save the figure to a file
plot_filename = "{lang}wiki.new_pages.dask.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)

plt.show()

# -

# absolute value of total bytes added or removed
__total_bytes = ddf["revision_text_bytes_diff"].abs().groupby(ddf["event_date"]).sum()

# %%time
total_bytes = __total_bytes.compute()

# +
ax = total_bytes.plot()
plt.xticks(rotation=70)

# Save the figure to a file
plot_filename = "{lang}wiki.total_bytes.dask.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)

plt.show()

# -

# number of new revisions created
__new_revisions = (
    baseddf[
        (baseddf["event_entity"] == "revision") & (baseddf["event_type"] == "create")
    ]
    .groupby("event_date")
    .size()
)

# %%time
new_revisions = __new_revisions.compute()

# +
ax = new_revisions.plot()
plt.xticks(rotation=70)

# Save the figure to a file
plot_filename = "{lang}wiki.new_revisions.dask.png".format(lang=LANG)
plt.savefig(plots_dir / plot_filename, bbox_inches="tight", dpi=300)

plt.show()

Requirements

Example conda requirements file.

# This file may be used to create an environment using:
# $ conda create --name <env> --file <this file>
# platform: linux-64
dask=2023.4.1=py38h06a4308_1
dask-core=2023.4.1=py38h06a4308_0
matplotlib=3.7.2=py38h06a4308_0
matplotlib-base=3.7.2=py38h1128e8f_0
matplotlib-inline=0.1.6=py38h06a4308_0
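
If you prefer pip, an indicative requirements.txt equivalent would be (package names and version pins are assumptions, adjust to your environment):

dask[dataframe]==2023.4.1
matplotlib==3.7.2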