Data Platform/Data Lake/Content/Mediawiki content history v1

wmf_content.mediawiki_content_history_v1 is a dataset available in the Data Lake that provides the full content of all revisions, past and present, from all Wikimedia wikis.

The schema of this table is similar to that of Mediawiki wikitext history. However, this table's source data comes primarily from event streams from the Event Platform, allowing us to update it on a daily basis. The content under revision_content_slots['main'] is typically unparsed wikitext; Multi-Content Revisions content is also included, which as of this writing is only used by commonswiki.

Consuming this table is different from consuming snapshot-based tables like Mediawiki wikitext history. See the FAQ below for details.

Schema

Note: This is an Iceberg dataset.

col_name data_type comment
page_id bigint The (database) page ID of the page.
page_namespace_id int The id of the namespace this page belongs to.
page_title string The normalized title of the page. If page_namespace_id = 0, then this is the non-namespaced title. If page_namespace_id != 0, then the title is prepended with the localized namespace. Examples for "enwiki": "Main_Page" and "Talk:Main_Page".
page_redirect_target string Title of the redirected-to page, if any. Same rules as page_title.
user_id bigint ID of the user that made the revision; null if anonymous, zero if an old system user, and -1 when deleted or malformed XML was imported.
user_text string Text of the user that made the revision (either a username or an IP address).
user_is_visible boolean Whether the user that made the revision is visible. If this is false, then the user should be redacted when shown publicly. See RevisionRecord->DELETED_USER.
revision_id bigint The (database) revision ID.
revision_parent_id bigint The (database) revision ID of the parent revision.
revision_dt timestamp The (database) time this revision was created. This is rev_timestamp in the MediaWiki database.
revision_is_minor_edit boolean True if the editor marked this revision as a minor edit.
revision_comment string The comment left by the user when this revision was made.
revision_comment_is_visible boolean Whether the comment of the revision is visible. If this is false, then the comment should be redacted when shown publicly. See RevisionRecord->DELETED_COMMENT.
revision_sha1 string Nested SHA1 hash of hashes of all content slots. See https://www.mediawiki.org/wiki/Manual:Revision_table#rev_sha1
revision_size bigint The sum of the content_size of all content slots.
revision_content_slots map<string, struct<content_body: string, content_format: string, content_model: string, content_sha1: string, content_size: bigint>> A MAP containing all the content slots associated with this revision. Typically just the "main" slot, but also "mediainfo" for commonswiki.
revision_content_is_visible boolean Whether revision_content_slots is visible. If this is false, then any content should be redacted when shown publicly. See RevisionRecord->DELETED_TEXT.
wiki_id string The wiki ID, which is usually the same as the MediaWiki database name. E.g. enwiki, metawiki, etc.
row_content_update_dt timestamp Control column. Marks the timestamp of the last content event or backfill that updated this row.
row_visibility_update_dt timestamp Control column. Marks the timestamp of the last visibility event or backfill that updated this row.
row_move_update_dt timestamp Control column. Marks the timestamp of the last move event or backfill that updated this row.
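
The three *_is_visible columns above indicate whether the user, comment, and content of a revision may be shown publicly. If you are producing public-facing output, you can apply the redaction yourself at query time. A minimal sketch (redacting to NULL is just one possible policy, not a prescribed one):

-- Redact suppressed fields before exposing rows publicly
SELECT
  revision_id,
  CASE WHEN user_is_visible THEN user_text ELSE NULL END AS user_text,
  CASE WHEN revision_comment_is_visible THEN revision_comment ELSE NULL END AS revision_comment,
  CASE WHEN revision_content_is_visible THEN revision_content_slots['main'].content_body ELSE NULL END AS content
FROM wmf_content.mediawiki_content_history_v1
WHERE wiki_id = 'simplewiki'
LIMIT 10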

Changes and known problems

Date Phab task Details
2025-01-29 T358375 Declared wmf_content.mediawiki_content_history_v1 production quality, with daily content and reconcile updates.
2024-11-21 T358877 Release candidate of the table available under the name wmf_dumps.wikitext_raw_rc2. The reconcile mechanism is still under development, and the table will undergo schema changes before production.

FAQ

How do I query this table if I am only interested in the main slot?

This table includes the content of all the slots of a revision, as introduced by Multi-Content Revisions. If you are not interested in that and would just like to access the 'main' slot, which typically contains the wikitext of the revision, you can query it as in the following example:

SELECT 
  revision_id,
  revision_content_slots['main'].content_body AS content
FROM wmf_content.mediawiki_content_history_v1
WHERE wiki_id = 'simplewiki'
LIMIT 10
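
Conversely, if you are interested in the non-main slots, such as commonswiki's 'mediainfo' slot, you can inspect the other keys of the revision_content_slots map. A sketch using standard Spark SQL map functions:

-- List the slot names and pull the 'mediainfo' slot where present
SELECT
  revision_id,
  map_keys(revision_content_slots) AS slot_names,
  revision_content_slots['mediainfo'].content_model AS mediainfo_model,
  revision_content_slots['mediainfo'].content_body AS mediainfo_body
FROM wmf_content.mediawiki_content_history_v1
WHERE wiki_id = 'commonswiki'
  AND array_contains(map_keys(revision_content_slots), 'mediainfo')
LIMIT 10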

This table doesn't seem to have Hive partitions, such as the usual 'snapshot' column. How do I consume it?

This table indeed does not use a snapshot partition, unlike other tables such as Mediawiki wikitext history. It uses a table format called Iceberg. Instead of rewriting all the data as we did before, Iceberg allows us to update the content of the table in place, with the main benefit being that the table's content is updated on a daily cadence.
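
Because this is an Iceberg table, you can also check when it was last updated by querying Iceberg's metadata tables from a Spark SQL session. A sketch, assuming the standard Iceberg metadata tables are exposed (column names can vary across Iceberg versions):

-- Each row is a table snapshot created by an ingestion or reconcile run
SELECT made_current_at, snapshot_id, is_current_ancestor
FROM wmf_content.mediawiki_content_history_v1.history
ORDER BY made_current_at DESC
LIMIT 10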

If you are building a data pipeline and need to define an Airflow sensor to wait on this table's updates, then instead of waiting on Hive partitions you should use our datasets.yaml configuration and the appropriate helper functions to construct the sensor.

In your instance's datasets.yaml file:

iceberg_wmf_content_mediawiki_content_history_v1:
  datastore: iceberg
  table_name: wmf_content.mediawiki_content_history_v1
  produced_by:
    airflow:
      instance: analytics
      dag_id: mw_content_merge_events_to_mw_content_history_daily

In your Airflow Python DAG code:

from platform_eng.config.dag_config import dataset

dataset_id = "iceberg_wmf_content_mediawiki_content_history_v1"

wait_for_sensor = dataset(dataset_id).get_sensor_for(dag)

If your sensor needs to cover more than one day's worth of target DAG executions, we offer a helper function you can use:

from platform_eng.config.dag_config import dataset
from wmf_airflow_common.sensors.helper import get_daily_execution_dates_fn

dataset_id = "iceberg_wmf_content_mediawiki_content_history_v1"

wait_for_sensor = dataset(dataset_id).get_sensor_for(
    dag,
    execution_date_fn=get_daily_execution_dates_fn(num_days=7),
)

NOTE: Waiting on this sensor guarantees that we have ingested all the revision events from the last 24 hours and all the detected reconciliation events from the last 24 hours. Since the reconciliation process lags by 24 hours, the most recent 24 hours of data may be missing updates. This is the fastest way to consume this table if your use case can tolerate eventually consistent data.

If your use case requires more precision, you can use the sensor as above, but then always consume the data with a filter on the revision_dt column like so:

SELECT 
  *
FROM wmf_content.mediawiki_content_history_v1
WHERE wiki_id = 'simplewiki'
  AND revision_dt <= TIMESTAMP '{{ data_interval_start | to_dt() }}'
LIMIT 10

Where {{ data_interval_start | to_dt() }} is a Jinja template generated from your Airflow DAG.

What minimal Spark configuration do I need to fully read this table?

To take advantage of Iceberg's performance improvements, this table's chosen file format is Parquet. This format has historically been problematic at WMF when wikitext content is included, as some pages carry very large content payloads. We therefore ran some basic performance benchmarks to make sure that a Spark job with reasonable resources can read the full table. We found that a yarn-large configuration, as defined in the wmfdata helper library, is sufficient:

import wmfdata

spark = wmfdata.spark.create_custom_session(
    master='yarn',
    spark_config={
        "spark.driver.memory": "4g",
        "spark.dynamicAllocation.maxExecutors": 128,
        "spark.executor.memory": "8g",
        "spark.executor.cores": 4,
        "spark.sql.shuffle.partitions": 512,
    }
)

Pipeline

  1. EventBus generates page_change and page_visibility events that are sent to our Jumbo Kafka instance.
  2. Every hour, these events are materialized in HDFS via Gobblin.
  3. Every day, events from step (2) are ingested into mediawiki_content_history_v1 via a Spark job whose core operation is a MERGE INTO statement (see the sketch after this list).
  4. A consistency check is done between the last 24 hours of ingested data and the Analytics Replicas. Any inconsistencies are kept at wmf_content.inconsistent_rows_of_mediawiki_content_history_v1 for 90 days.
  5. A separate Spark job then queries the Analytics Replicas for any revision that was found inconsistent in step (4). Reconcile page_change events are generated to a reconcile Kafka topic.
  6. Reconcile events from the 24-hour window ending 24 hours ago are ingested into the table via a Spark job whose core operation is likewise a MERGE INTO statement.
  7. Every month, a full reconcile over all revisions is triggered to catch any historic inconsistencies.
  8. All of this is coordinated via a set of Airflow jobs.
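
For reference, the MERGE INTO statements mentioned in steps (3) and (6) follow the usual Iceberg upsert shape. The sketch below is illustrative only: the source view name, join keys, and column handling are assumptions, not the production job's exact statement.

-- Illustrative upsert shape; the real job's source, keys, and columns differ
MERGE INTO wmf_content.mediawiki_content_history_v1 AS target
USING new_page_change_events AS source  -- hypothetical staging view of deduplicated events
  ON target.wiki_id = source.wiki_id
 AND target.revision_id = source.revision_id
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *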