MediaWiki Event Enrichment
mediawiki-event-enrichment is a repository of Apache Flink (pyflink) jobs for streaming enrichment of MediaWiki state change event streams.
These streams address the “comprehensiveness” problem described in T291120 MediaWiki Event Carried State Transfer - Problem Statement.
Why
We need a way to get real time updates of comprehensive MediaWiki state outside of MediaWiki.
- MediaWiki History is a monthly snapshot
- Wikimedia Dumps are (monthlyish) snapshots
- MediaWiki Event Streams (e.g. mediawiki.revision-create) are notification streams, and do not have full state changes (e.g. no content in streams).
We want to design MediaWiki event streams that can be used to fully externalize MediaWiki state, without involving MediaWiki on the consumer side. That is, we want MediaWiki state to be carried by events to any downstream consumer.
See also: Event Notification vs. Event-Carried State Transfer, by Arvind Balachandran.
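As a hypothetical illustration of the difference (these are not real Event Platform schemas, just made-up dictionaries):

# Hypothetical, simplified events for illustration only (not real schemas).

# Event notification: tells you *that* something changed; the consumer must
# call back into MediaWiki to learn *what* the page now contains.
notification_event = {
    "database": "enwiki",
    "page_id": 123,
    "rev_id": 456,
}

# Event-carried state transfer: the new state travels with the event, so
# downstream consumers never need to query MediaWiki.
state_carried_event = {
    "database": "enwiki",
    "page_id": 123,
    "rev_id": 456,
    "content": "== Full wikitext of revision 456 ==",
}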
Enrichment Jobs
mw-page-content-change-enrich
We had hoped that MediaWiki entity based changelog streams would be enough to externalize all MediaWiki state. The MediaWiki revision table itself is really just an event log of changes to pages. However, this is not quite true, as past revisions can be updated: on page deletes, revision records are 'archived', and they can later be merged back into existing pages, updating the revision records' page_id. Modeling this as a page changelog event would be very difficult.
Instead, the page state change event data model will support use cases that only care about externalized current state. That is, we will not try to capture modifications to MediaWiki's past in this stream. These streams will be useful for Wikimedia Enterprise, Dumps, Search updates, cache invalidation, etc., but not for keeping a comprehensive history of all state changes to pages.
The page_content_change enrichment job consumes the mediawiki.page_change stream and emits the mediawiki.page_content_change stream. Both of these streams conform to the mediawiki/page/change event schema. (Design work in T308017.)
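Conceptually, the enrichment step looks something like the sketch below. This is not the actual job code: the real job is built with eventutilities-python and Flink, and the simplified field names (content_slots, content_body) and error handling here are assumptions for illustration.

# Minimal sketch of the enrichment step, with simplified/assumed field names.
# The real job uses eventutilities-python / Flink and the actual
# mediawiki/page/change schema fields.
import copy
import requests

def enrich_page_change(event: dict, api_url: str) -> dict:
    """Fetch revision content for a page_change event and return an
    enriched, page_content_change-style event."""
    rev_id = event["revision"]["rev_id"]
    resp = requests.get(
        api_url,
        params={
            "action": "query",
            "prop": "revisions",
            "revids": rev_id,
            "rvprop": "content",
            "rvslots": "main",
            "format": "json",
            "formatversion": 2,
        },
        timeout=10,
    )
    resp.raise_for_status()
    body = resp.json()
    if "badrevids" in body.get("query", {}):
        # Content is not available for this rev_id; see the failure
        # scenarios described below.
        raise ValueError(f"badrevids returned for rev_id {rev_id}")
    content = body["query"]["pages"][0]["revisions"][0]["slots"]["main"]["content"]
    enriched = copy.deepcopy(event)
    enriched["revision"]["content_slots"] = {
        "main": {"content_body": content}  # assumed field names
    }
    return enriched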
mw-page-content-change-enrich is deployed in wikikube kubernetes using the flink-app helm chart via flink-kubernetes-operator.
mediawiki-event-enrichment is built using Wikimedia Event Utilities java and python libraries.
mediawiki.page_content_change
Semantics
Because the streaming enrichment job that creates mediawiki.page_content_change runs asynchronously, by the time it processes an input mediawiki.page_change event, MediaWiki state may have changed in such a way that the data needed for enrichment is no longer available.
For example, a page may be created and then immediately deleted (by a bot). The mw-page-content-change-enrich streaming job will receive the page create event before the page delete. When it receives the page create event, it will ask the MediaWiki API for content. MediaWiki will return an error, because the page no longer exists. The subsequent page delete event should soon be processed, so the eventual state of the page will be consistent, but a question remains as to what the mw-page-content-change-enrich job should do with the page create event. https://phabricator.wikimedia.org/T309699 should resolve this issue.
For all cases in the table below, an event was consumed from page_change. The failure scenario describes the outcome of an enrichment call to the MW Action API (query action); a sketch of the resulting retry behavior follows the table.
Failure scenario | HTTP error code | Retry | Enriched event produced | Event forwarded to error topic | When can this happen | Behavior |
Connection timeout | 408 | Yes | No | Yes | The request took more than <timeout> to complete | Retry up to threshold, then forward to error topic |
Read timeout | (none) | Yes | No | Yes | The request was canceled by the client because it exceeded <timeout>; a ReadTimeout exception was raised | Retry up to threshold, then forward to error topic |
No content | 204 | No | No | Yes | An exception is raised when trying to parse the response body | Forward to error topic |
Further action needs to be taken in order to complete the request | 3xx | No | No | Yes | | An exception is raised on 3xx statuses; forward to error topic |
Other request errors | 4xx | No | No | Yes | | An exception is raised on 4xx statuses; forward to error topic |
Internal error | 5xx | Yes | No | Yes | The Action API is momentarily unavailable | An exception is raised on 5xx statuses, except for 500, 502, 503 and 504, which are retried before failing |
Missing content: badrevids in response | 200 | Yes | No | Yes | The response body can be parsed and contains a badrevids payload; MW enrichment raises an exception | Retry up to threshold, then forward to error topic |
Missing content: database replica lag | 200 | Yes | No | Yes | Content could not be retrieved because database replica lag exceeded the `maxlag` parameter (https://www.mediawiki.org/wiki/Manual:Maxlag_parameter) | Retry up to threshold, then forward to error topic |
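The retry-then-error-topic behavior summarized above could be approximated by something like the following sketch; the retry threshold, backoff, and helper name are illustrative assumptions, not the job's actual configuration.

# Illustrative sketch of the retry / error-topic decision logic described in
# the table above. MAX_RETRIES, the backoff, and the helper name are assumptions.
import time
from typing import Optional

import requests

RETRYABLE_HTTP_STATUSES = {408, 500, 502, 503, 504}
MAX_RETRIES = 3  # hypothetical retry threshold


def fetch_revision(session: requests.Session, api_url: str, params: dict) -> Optional[dict]:
    """Return the parsed Action API response on success, or None when the
    input event should be forwarded to the error topic instead."""
    for attempt in range(MAX_RETRIES + 1):
        try:
            resp = session.get(api_url, params=params, timeout=5)
        except requests.exceptions.Timeout:
            # Connection / read timeout: retry up to the threshold.
            if attempt < MAX_RETRIES:
                time.sleep(2 ** attempt)
                continue
            return None

        body = resp.json() if resp.status_code == 200 else {}
        retryable = (
            resp.status_code in RETRYABLE_HTTP_STATUSES
            or "badrevids" in body.get("query", {})           # missing content
            or body.get("error", {}).get("code") == "maxlag"  # replica lag
        )
        if retryable:
            if attempt < MAX_RETRIES:
                time.sleep(2 ** attempt)
                continue
            return None
        if resp.status_code != 200:
            # 204, 3xx, other 4xx/5xx: not retried, forward to error topic.
            return None
        return body
    return None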
Missing content
The Action API cannot return a response for a `rev_id` that should exist.
Failure scenario | New event is generated |
Page deletion: the page is deleted before we had a chance to read its content. This can happen within seconds, though more typically minutes to hours. | |
Content suppression: the revision's content is suppressed before we had a chance to read it. This can happen within seconds, though more typically minutes to hours. | No |
MySQL replica lag: we might request a revision from a MariaDB replica that is lagging behind | |
A maintenance SQL script is executed for content errors, resulting in DB mutations | No |
Monitoring
Logs are shipped to logstash in the ECS index. They are most easily viewed using this logstash dashboard.
Flink UI
The Flink JobManager UI can be accessed via an ssh tunnel to the leader JobManager.
Get the Pod IP of the JobManager:
$ ssh deployment.eqiad.wmnet
$ kube_env <k8s-namespace> <k8s-cluster>
# E.g. kube_env mw-page-content-change-enrich eqiad
$ kubectl get pods -o wide | grep -v taskmanager
flink-app-main-897448f4-vnsfh 2/2 Running 0 8h 10.64.75.96 kubestage1004.eqiad.wmnet <none> <none>
# JobManager Pod IP is: 10.64.75.96
$ exit
# Back on your local workstation, open an ssh tunnel:
$ ssh -t -N -L8081:<FLINK_JOB_MANAGER_POD_IP>:8081 deploy1002.eqiad.wmnet
Now navigate to http://localhost:8081 to access the Flink JobManager UI.
Upgrades and Deployment
Development and release instructions for mediawiki-event-enrichment can be found in the README.
Once a new docker image has been built and published, we can bump the image version in the relevant helmfile values.yaml file. For mw-page-content-change-enrich, this is the appropriate values.yaml file.
Once merged, for the most part, the usual kubernetes helmfile deployment process can be followed. But, because this is a streaming Flink application, special care may need to be taken to handle the application restart.
See
MW enrichment runs active/active single compute, and there are no downstream applications to 'depool'.
If mw enrichment jobs are off in the active MW DC for a very long time, we will have issues. If they are off in the active MW DC for a short amount of time, we will just have late events. If they are off in the inactive MW DC, there shouldn't be any real issues (unless somehow page changes are processed by MW in the inactive DC).
Local development and debugging with Kafka
eventutilities-python and thus mediawiki-event-enrichment run tests using Event Platform streams with data in local file sources and sinks. Local development can then be as easy as running the tests, or running the python main entrypoint with the relevant configuration.
However, as of 2023-07, there are no tests or automation for running the Flink job using Kafka sources and sinks. Usually this is fine, as the same SerDes are used for file sources and sinks. However, we have encountered issues where the Kafka source was improperly configured. The easiest way to debug these types of issues is to run Flink locally with Kafka sources and sinks.
Running Kafka
You can run Kafka locally in a myriad of ways, and any will suffice. Since we run k8s at WMF, our deployment-charts repo includes a kafka-dev Helm chart. For our purposes we'll use minikube locally, and our kafka-dev chart to run Kafka.
Install docker, minikube, and helm, if you don't already have them.
Start minikube:
minikube start
Clone the operations/deployment-charts repo and install the kafka-dev helm chart. Since we'll be connecting to Kafka from a client external to the k8s cluster, follow the instructions in the kafka-dev README. You may need to change the value of kafka_advertised_hosts depending on how you are running minikube.
git clone ssh://gerrit.wikimedia.org:29418/operations/deployment-charts
cd ./deployment-charts
helm install kafka-dev ./charts/kafka-dev
You can inspect the status of the pods with kubectl get pods and kubectl logs -f <pod_id>.
Kafka should now be running and accessible from your host machine at <kafka_advertised_host>:30092 (default: 127.0.0.1:30092).
You should also install kcat (formerly kafkacat), so you can inspect, produce, and consume to your local Kafka cluster from the CLI.
Running Flink using local Kafka
Since we'll be running Flink locally, we'll need a few more things:
A conda or virtualenv with apache-flink (pyflink) installed. If you have an active (~python 3.9) python environment, install Flink, and also other mediawiki-event-enrichment (test) requirements:
pip install apache-flink==1.17.1 # change this to the appropriate Flink version
# Install other requirements (like eventutilities-python), including
# those needed to run tests locally.
cd ./mediawiki-event-enrichment
pip install -r ./requirements-test.txt
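As an optional sanity check that PyFlink is installed and working (this script is not part of the repo), you can run a trivial local job:

# sanity_check.py: verify that PyFlink can execute a trivial local job.
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# Print a few elements to stdout; if this runs without errors, the
# apache-flink installation is usable.
env.from_collection([1, 2, 3]).print()
env.execute("pyflink-sanity-check")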
You can now configure stream_manager sources and sinks to use kafka with this value for bootstrap_servers. Here's an example config file:
stream_manager:
  job_name: mw-page-content-change-enrich
  stream_config_uri: https://meta.wikimedia.org/w/api.php
  schema_uris:
    # Load schemas from local file repo: CHANGE THIS PATH!
    - file:///Users/otto/Projects/wm/eventplatform/mediawiki-event-enrichment/event-schemas/primary/jsonschema

  # Configure sources and sinks.
  # NOTE: Make sure to set kafka options in environment specific values files.
  source:
    stream: mediawiki.page_change.v1:1.1.0
    connector: kafka
    options:
      topics: [eqiad.mediawiki.page_change.v1]
      consumer_group: mw_page_content_change_enrich-000
      bootstrap_servers: 127.0.0.1:30092

  sink:
    stream: rc1.mediawiki.page_content_change:1.1.0
    connector: kafka
    options:
      delivery_guarantee: AT_LEAST_ONCE
      bootstrap_servers: 127.0.0.1:30092
      kafka_topic_prefix: eqiad.
For whatever reason, Flink Kafka sources cannot auto create topics, so we'll need to create any source topics manually. The easiest way is to produce a blank line to the topic with kcat:
# Produce a blank line to the topic
echo ' ' | kcat -P -b 127.0.0.1:30092 -t eqiad.mediawiki.page_change.v1
# Verify that the topic exists
kcat -L -b 127.0.0.1:30092 -t eqiad.mediawiki.page_change.v1
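If you'd rather do this from Python than with kcat, the kafka-python admin client should also work. kafka-python is not a project dependency; this is just an alternative sketch:

# Create the source topic with kafka-python instead of kcat.
# Assumes `pip install kafka-python`; not a mediawiki-event-enrichment requirement.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="127.0.0.1:30092")
admin.create_topics([
    NewTopic(name="eqiad.mediawiki.page_change.v1", num_partitions=1, replication_factor=1)
])
print(admin.list_topics())  # verify that the topic exists
admin.close()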
Assuming we saved our above config file in config-local-kafka.yaml, the following command will run page_content_change.py in a local Flink cluster:
# Unfortunately, pip install apache-flink does not put `flink` on your PATH.
# This will put pyflink/bin on your PATH:
PATH=$PATH:$(python -c 'import pyflink, os; print(os.path.join(os.path.dirname(pyflink.__file__), "bin"))')
flink run -t local \
-py ./mediawiki_event_enrichment/page_content_change.py \
--config ./config-local-kafka.yaml
By default, Flink logs will be in the python env's pyflink/log directory. You can tail them like:
tail -f $(python -c 'import pyflink, os; print(os.path.join(os.path.dirname(pyflink.__file__), "log"))')/*
Replaying events through Kafka
Now that your Kafka and your Flink app are running, you'll need to produce actual data to the source topic(s) to get it to do anything. We can use kcat to both produce and consume data in Kafka.
Since our config-local-kafka.yaml is using defaults for the MediaWiki API, we'll be sending our enrichment HTTP requests to the production MW API. We should use real data!
We can get a sample of real data from production kafka clusters. Login to an analytics client (AKA stat box) and run the following command to get the last 10 page_change events.
stat1005 $ kafkacat -C -b kafka-jumbo1007.eqiad.wmnet:9092 -o -10 -c 10 -t eqiad.mediawiki.page_change.v1
Copy and paste these events to a local file called page_change_events.json.
On one terminal, start consuming from your output sink topic, e.g.
kcat -u -C -b 127.0.0.1:30092 -t eqiad.rc1.mediawiki.page_content_change
Then, pipe the page_change_events.json file into the source topic in your local kafka cluster
kcat -u -P -b 127.0.0.1:30092 -t eqiad.mediawiki.page_change.v1 < page_change_events.json
You can run this produce command as many times as you like to have those page_change_events.json enriched by your local pipeline.
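If you prefer Python over kcat for the produce step, a rough equivalent (again assuming the optional kafka-python package, and the same file and topic names as above) is:

# Replay page_change_events.json into the local source topic with kafka-python.
# Assumes `pip install kafka-python`; kcat works just as well.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="127.0.0.1:30092")
with open("page_change_events.json", "rb") as f:
    for line in f:
        line = line.strip()
        if line:  # skip blank lines
            producer.send("eqiad.mediawiki.page_change.v1", value=line)
producer.flush()
producer.close()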