Wikidata Query Service/Streaming Updater
The WDQS Streaming Updater is an Apache Flink application whose purpose is to create a stream of diffs of RDF triples, meant to be fed into Blazegraph. It uses the available mediawiki change streams to calculate the diffs and push them to a Kafka topic.
Design
The application reads some of the topics populated by mw:Extension:EventBus and builds a diff of the RDF content as produced by mw:Wikibase/EntityData, by comparing the last seen revision of an entity with the new revision seen in the mediawiki.revision-create topic. It is meant to integrate as a stream processor within the Modern Event Platform.
It relies on flink to provide:
- event-time semantics to re-order the events coming out of multiple kafka topics
- state management consistent with the output of the stream
- scalability
The flink application (code name streaming-updater-producer) is responsible for producing its data to a kafka topic; a client (named streaming-updater-consumer) running on the same machines as the triple store (known as the wdqs hosts) is responsible for reading this topic and applying the updates.
Dependencies
The dependencies of the flink application are:
- The mediawiki application servers for mw:Wikibase/EntityData
- Kafka (main) for consuming Mediawiki changes and for producing its output
- Swift (thanos) for object storage, though the aim is to move to the future MOSS
- K8S services cluster to run flink as a session cluster
- schema.wikimedia.org for validating the events it emits against their Event_Platform/Schemas
- meta.wikimedia.org for fetching the stream configurations
- Zookeeper for cluster coordination (flink-zk hosts, see this task for more context)
Deployment strategy
The flink application is active/active and runs in both eqiad and codfw on the Kubernetes clusters hosting services. The WDQS machines in eqiad consume the output of the flink application running in eqiad, and similarly for codfw. In other words, if the flink application stops in eqiad, all wdqs machines in eqiad will stop being updated.
The benefits of this approach are:
- simple to put in place in our setup: no need to have a fail-over strategy
- Symmetry of the k8s deployed services
Drawbacks:
- No guarantee that the output of both flink pipelines will be the same
- Double compute
See this presentation for a quick overview of the two strategies evaluated.
Operations
Kubernetes setup (flink-app chart)
Kubernetes runs the flink-k8s-operator, which is responsible for monitoring FlinkDeployment resources. The flink-k8s-operator manages a flink job deployed via the flink-app chart with the rdf-streaming-updater values.
Deploying the wikidata job to staging (on deployment.eqiad.wmnet):
$ cd /srv/deployment-charts/helmfile.d/services/rdf-streaming-updater/
$ helmfile -e staging -i apply --selector name=wikidata
Looking at the jobmanager and then the taskmanager logs in staging:
$ kube_env rdf-streaming-updater staging
$ kubectl logs -l component=jobmanager,release=wikidata -c flink-main-container -f
$ kubectl logs -l component=taskmanager,release=wikidata -c flink-main-container -f
The flink jobmanager UI and REST endpoint are not exposed but can be accessed using port forwarding:
$ kubectl port-forward $(kubectl get pod -l component=jobmanager,release=wikidata -o name) 8081:8081
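Once the port-forward is active, the standard Flink REST API can be queried on localhost, for example to list the jobs and their status (a minimal sketch, assuming jq is available on the deployment server):
$ curl -s http://localhost:8081/jobs | jq .
$ curl -s http://localhost:8081/jobs/overview | jq '.jobs[] | {name, state}'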
Logs
Flink logs are collected in logstash and can be viewed with the logstash dashboard (select the rdf-streaming-updater namespace and the release commons or wikidata).
If for some reason the logs are not available in logstash, they can still be inspected from the deployment server, e.g. to inspect the jobmanager and taskmanager logs in staging for the wikidata job:
$ kube_env rdf-streaming-updater staging
$ kubectl logs -l component=jobmanager,release=wikidata -c flink-main-container -f
$ kubectl logs -l component=taskmanager,release=wikidata -c flink-main-container -f
Add the --previous option to look at the logs of the previous run of the container if you want to debug why it crashed.
Managing the streaming-updater-producer
| job name | WDQS Streaming Updater | WCQS Streaming Updater |
|---|---|---|
| consumer group | wdqs_streaming_updater | wcqs_streaming_updater |
| kafka topic | <dc>.rdf-streaming-updater.mutation | <dc>.mediainfo-streaming-updater.mutation |
| staging consumer group | wdqs_streaming_updater_test | |
| | staging (eqiad) | eqiad | codfw |
|---|---|---|---|
| swift container | rdf-streaming-updater-staging | rdf-streaming-updater-eqiad | rdf-streaming-updater-codfw |
| kafka cluster | kafka-main@eqiad | kafka-main@eqiad | kafka-main@codfw |
Destroy all pods and prevent them from being recreated
Make sure you're in the correct directory (see below!)
$ cd /srv/deployment-charts/helmfile.d/services/rdf-streaming-updater/
$ kube_env rdf-streaming-updater codfw
$ kubectl get pods
$ helmfile -e codfw destroy
Destroy all configmaps
Be sure to use the selector, otherwise you will also destroy istio-related configmaps
(you need root on deploy for this)
$ sudo -i
# kube_env admin ${ENV}
# kubectl delete cm -l type=flink-native-kubernetes -n rdf-streaming-updater
Clean up object storage
Flink and Zookeeper use thanos-swift for object storage. Over time, stale checkpoints and savepoints left over from old job IDs can accumulate there. When that happens, you can use this script to clean up. After cleanup, you can check your work via this dashboard panel: you should see the amount of used space drop within 5-10 minutes of running the script.
How to tell if the object storage data can be safely deleted
- From k8s: Each flink release (wikidata and commons) has a job ID (UUID) which can be found with the following one-liner:
for n in commons wikidata; do JID=$(kubectl get flinkdeployments.flink.apache.org -l release=${n} -o json | jq '.items[0].status.jobStatus.jobId' -r); printf "%s %s %s\n" ${n} jobid: ${JID}; done
expected output:
commons jobid: 58f59f0548b63d516324c324e1a17a38
wikidata jobid: 5bb76cd4d0165a21cc098c2dc6080557
- While the job is running, Flink creates checkpoints in object storage under its current job ID. When the job restarts, flink generates a new job ID. As a rule of thumb, objects that can be safely deleted:
- Do NOT contain an object called '_metadata' (this is how Flink keeps track of the active job ID) in their directory structure
- Do NOT contain Zookeeper HA metadata
- Are older than a few weeks
- From thanos-swift: we can scan the bucket and group all objects by job ID. If the newest object in a group is older than, say, a few weeks, we can safely delete all objects in that group.
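A sketch of such a scan, assuming an S3-compatible client is available (the AWS CLI is used here) and that it is configured with the rdf-streaming-updater credentials and the thanos-swift endpoint; adjust the bucket, endpoint and credentials to your setup:
# List every object, group by the 32-character hex Flink job ID found in the
# object path, and print the newest object timestamp per job ID.
ENDPOINT=https://thanos-swift.discovery.wmnet   # endpoint/scheme may need adjusting
BUCKET=rdf-streaming-updater-eqiad              # or rdf-streaming-updater-codfw / -staging
aws s3 ls "s3://${BUCKET}/" --recursive --endpoint-url "$ENDPOINT" \
  | awk '{
      jid = "";
      n = split($4, p, "/");
      for (i = 1; i <= n; i++)
        if (length(p[i]) == 32 && p[i] ~ /^[0-9a-f]+$/) { jid = p[i]; break }
      if (jid != "") { ts = $1 " " $2; if (ts > newest[jid]) newest[jid] = ts }
    }
    END { for (j in newest) print newest[j], j }' \
  | sort
Job IDs whose newest object is older than a few weeks and that do not match a currently running job ID (from the k8s one-liner above) are candidates for deletion.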
Rotating S3/Swift Password*
If the swift/s3 password needs to be changed, you'll need to file a ticket with Data Persistence. They will help you follow the Swift key rotation procedure.
The values to change are in /srv/private/hieradata/role/common/deployment_server/kubernetes.yaml. Change them ONLY within the rdf-streaming-updater context, don't touch other apps:
s3.secret-key and swift_api_key
*S3 and Swift mean the same thing in this context.
Kubernetes setup (Flink Application Mode/Flink Operator)
We are working on a new deployment model that uses Flink Application Mode, managed by the Flink Kubernetes Operator. Zookeeper provides the HA services. We deploy the flink-app chart with the rdf-streaming-updater values. The WCQS and WDQS streaming updaters each have their own release, both managed under the same service (rdf-streaming-updater). To manage the releases separately, use the "wikidata" selector for WDQS and the "commons" selector for WCQS.
Deploying the WDQS chart to production (using active deploy host):
user@deploy2002 $ cd /srv/deployment-charts/helmfile.d/services/rdf-streaming-updater/
$ helmfile -e eqiad --selector name=wikidata -i apply
Looking at job manager logs for the WDQS release:
$ kube_env rdf-streaming-updater staging
$ kubectl logs <TAB>                                        # tab-complete to find the pod name
$ kubectl logs flink-app-wikidata-55c55949f9-984n6 <TAB>    # tab-complete to pick the container
$ kubectl logs flink-app-wikidata-55c55949f9-984n6 flink-main-container
Checking status of Custom Resource "Flinkdeployment"
$ kube_env rdf-streaming-updater eqiad
$ kubectl get flinkdeployment
A healthy deployment should return:
NAME JOB STATUS LIFECYCLE STATE
flink-app-commons RUNNING STABLE
flink-app-wikidata RUNNING STABLE
Restore from a Checkpoint or Savepoint
Point the initialSavepointPath value under app: job in the helmfile values chart to the latest available checkpoint or savepoint (note that checkpoints expire quickly: they only remain usable for as long as Kafka retention, which is 1 week). You can find the latest checkpoint or savepoint by checking the logs on the deploy server. Example:
{"@timestamp":"2023-09-22T14:58:36.393Z","log.level": "INFO","message":"Completed checkpoint 19444 for job ad699cfb0eb2c53365df1d982b806b70 (1454458549 bytes, checkpointDuration=8303 ms, finalizationTime=1270 ms).", "ecs.version": "1.2.0","process.thread.name":"jobmanager-io-thread-1","log.logger":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator"}
If you have a savepoint (as opposed to a checkpoint), you can also find it by checking the Flinkdeployment resource:
kubectl get flinkdeployment -o json | jq -r '.items[].status.jobStatus.savepointInfo.lastSavepoint.location'
s3://rdf-streaming-updater-staging/k8s_op_test_dse/wikidata/savepoints/savepoint-434830-abc459b3799c
Savepoints/checkpoints are saved in S3. Find the correct S3 path using the values.yaml file (file is shortened for clarity):
app:
job:
config_files:
rdf-streaming-updater-config.properties:
checkpoint_dir: s3://rdf-streaming-updater-staging/k8s_op_test_dse/wikidata_test/checkpoints
Once you have that value, update the chart with the correct path:
app:
job:
initialSavepointPath: s3://rdf-streaming-updater-staging/k8s_op_test_dse/wikidata_test/checkpoints/ad699cfb0eb2c53365df1d982b806b70/chk-19444
config_files:
rdf-streaming-updater-config.properties:
checkpoint_dir: s3://rdf-streaming-updater-staging/k8s_op_test_dse/wikidata_test/checkpoints
Commit the chart and deploy it. To verify successful operation, check the logs for Flink applying the checkpoint/savepoint, creating a new checkpoint, and confirming successful completion of the new checkpoint:
{"@timestamp":"2023-09-22T15:31:32.325Z","log.level": "INFO","message":"Restoring job 434830c8020a6fe160224465a127517b from Savepoint 19444 @ 0 for 434830c8020a6fe160224465a127517b located at s3://rdf-streaming-updater-staging/k8s_op_test_dse/wikidata_test/checkpoints/ad699cfb0eb2c53365df1d982b806b70/chk-19444.", "ecs.version": "1.2.0","process.thread.name":"jobmanager-io-thread-1","log.logger":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator"}
{"@timestamp":"2023-09-22T15:31:49.890Z","log.level": "INFO","message":"Triggering checkpoint 19445 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1695396709877 for job 434830c8020a6fe160224465a127517b.", "ecs.version": "1.2.0","process.thread.name":"Checkpoint Timer","log.logger":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator"}
{"@timestamp":"2023-09-22T15:32:18.443Z","log.level": "INFO","message":"Completed checkpoint 19445 for job 434830c8020a6fe160224465a127517b (1428993309 bytes, checkpointDuration=27394 ms, finalizationTime=1172 ms).", "ecs.version": "1.2.0","process.thread.name":"jobmanager-io-thread-1","log.logger":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator"}
Trigger a Savepoint
Follow the same process as Restore from a Checkpoint or Savepoint documented above, but also increment the savepointTriggerNonce value in the job: section of the deployment chart. This will trigger a savepoint at the next deploy. Example PR
Savepoint logs
After you deploy the chart with a different savepointTriggerNonce value (as described above), you should see log lines similar to those below in the output of kubectl logs -f:
{"@timestamp":"2024-02-16T16:17:40.330Z","log.level": "INFO","message":"Triggering savepoint for job 58f59f0548b63d516324c324e1a17a38.", "ecs.version": "1.2.0","pro
cess.thread.name":"flink-akka.actor.default-dispatcher-13","log.logger":"org.apache.flink.runtime.jobmaster.JobMaster"}
{"@timestamp":"2024-02-16T16:17:40.341Z","log.level": "INFO","message":"Triggering checkpoint 273026 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=NATIVE}) @ 1708100260334 for job 58f59f0548b63d516324c324e1a17a38.",
"ecs.version": "1.2.0","process.thread.name":"Checkpoint Timer","log.logger":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator"}
{"@timestamp":"2024-02-16T16:17:50.669Z","log.level": "INFO","message":"Disposing
savepoint s3://rdf-streaming-updater-staging/commons/savepoints/savepoint-58f59f-780a19f269f1.", "ecs.version": "1.2.0","process.thread.name":"jobmanager-io-thread-
1","log.logger":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher"}
{"@timestamp":"2024-02-16T16:17:50.670Z","log.level": "INFO","message":"Attempting to load configured state backend for savepoint disposal", "ecs.version": "1.2.0","process.thread.name":"jobmanager-io-thread-1","log.logger":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher"}
{"@timestamp":"2024-02-16T16:17:50.672Z","log.level": "INFO","message":"Attempting to load configured checkpoint storage for savepoint disposal", "ecs.version": "1.2.0","process.thread.name":"jobmanager-io-thread-1","log.logger":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher"}
Note that the savepoint path will include the currently-running job ID.
Redeploying Flink-Operator
Note: you need root permissions on deployment servers to do this.
- Login to the active deployment server.
- Become root (sudo -i)
- Select the correct kubernetes environment for your operation (for example, kube_env admin dse-k8s-eqiad for the dse-k8s-eqiad environment).
- Navigate to the admin_ng folder of the deployment-charts repo:
/srv/deployment-charts/helmfile.d/admin_ng.
- Destroy the current helmfile deployment: helmfile -e dse-k8s-eqiad -l name=flink-operator -i destroy. This is usually necessary because a failure can leave the operator in a partially-deployed state.
- Apply the helmfile deployment: helmfile -e dse-k8s-eqiad -l name=flink-operator -i apply.
Monitoring
The flink job activity can be monitored using the flink-app and the wdqs-streaming-updater Grafana dashboards.
Important metrics:
- flink job uptime in the flink-app dashboard (flink_jobmanager_job_uptime) indicates how long the job has been running
- a constantly low uptime (below 10 minutes) might indicate that the job keeps restarting; lag may start to rise.
- Triples Divergences on the wdqs-streaming-updater dashboard gives an indication of the divergences detected when applying the diffs; a sudden surge might indicate the following problems:
- on a single machine: the blazegraph journal was corrupted or copied from another source, or there is a serious bug in the streaming-updater-consumer.
- on all the machines in one or both DCs: likely a problem in the streaming-updater-producer.
- Consumer Poll vs Store time on the wdqs-streaming-updater dashboard gives an indication of how saturated the streaming-updater-consumer writes are: poll time is how much time is spent polling/waiting on kafka, store time is how much is spent writing to blazegraph.
Runbooks
The job is not starting
RdfStreamingUpdaterFlinkJobUnstable
If the job uptime remains under 5 minutes, the job is probably restarting constantly.
The cause of the failed restarts must be identified by inspecting the #Logs; it might be that some of the #Dependencies are having issues.
Containers constantly being killed may lead to this problem as well (use kubectl get pod -o yaml to inspect containerStatuses).
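For example, a quick way to see restart counts and the last termination reason per container (a sketch filtering on the wikidata release; adjust labels as needed):
$ kubectl get pods -l release=wikidata -o json | jq -r '.items[].status.containerStatuses[] | "\(.name) restarts=\(.restartCount) lastTermination=\(.lastState.terminated.reason // "n/a")"'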
The job is not running
WdqsStreamingUpdaterFlinkJobNotRunning or WcqsStreamingUpdaterFlinkJobNotRunning
The job is not running; there are several possible reasons for this:
- someone is doing a maintenance operation and the alert was not down-timed
- flink is not running or crashing
- the job had crashed without being restarted
For the last two points, try to identify the cause of the crash by looking at the #Logs; it could be that the k8s cluster does not have enough resources to instantiate the required pods. Once the cause is known, the flink session cluster must be brought up if it was not running. The job should recover by itself after the flink session cluster starts; if that is not the case, you might need to recover from a checkpoint (see #Restore from a Checkpoint or Savepoint).
The job processing latency is high
RdfStreamingUpdaterFlinkProcessingLatencyIsHigh
The job processing time is higher than usual; this might be due to increased latencies in one or several of the job's dependencies:
- the thanos swift cluster (checkpoint times)
- the kafka cluster
- mediawiki application servers
The cause should be identified and the impact on the pipeline monitored:
- backlog for the consumer group should not grow
- checkpoint times should not increase
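To check the backlog of the flink job's consumer group directly, the stock Kafka tooling can be used from a host that has the Kafka CLI installed (a sketch; the consumer group and broker come from the tables and examples on this page, and the exact tool name or wrapper may differ on WMF hosts):
$ kafka-consumer-groups.sh --bootstrap-server kafka-main1001.eqiad.wmnet:9092 --describe --group wdqs_streaming_updater
The per-partition LAG column should stay low and, more importantly, should not keep growing.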
The consumers are backlogged
RdfStreamingUpdaterHighConsumerUpdateLag
The consumers pulling data out of the mutation kafka topic are backlogged; this might be due to one of the following:
Potential root cause: single host alerting
The machine was just restored from a crash that lasted a long time: there's nothing to do other than wait for the host to work through the backlog.
Potential root cause: multiple hosts alerting
- Not enough compute power. This can happen when too many hosts are out of load balancer rotation for too long. Pool all healthy hosts to relieve the pressure on the existing ones. Wait for MaxLag to kick in and slow down bot edits. If this doesn't help, there could be an abusive query/DDoS; the Traffic/Infrastructure Foundations SRE teams can help you isolate and block the bad query.
- The Streaming Updater stopped. If the lag is high in only one datacenter, this is the likely cause.
First run (bootstrap) OR recovering from savepoint
Backfilling after a bootstrap causes high RPS to `mw-api-int`. Coordinate this process with ServiceOps to avoid strain on the infra. Turning down the `wikibase_repo_thread_pool_size` values in the rdf-streaming-updater helm chart will also reduce strain.
The flink application must be given an initial state; this initial state can be constructed from the RDF dumps using a flink job. We do this when bootstrapping, or when the flink application has been down for too long (~1 week) and recovering from an existing savepoint is no longer possible.
From stat1004.eqiad.wmnet
- Get a Kerberos ticket (kinit)
- Find the latest dump by checking HDFS:
hdfs dfs -ls 'hdfs://analytics-hadoop/wmf/data/discovery/wdqs/entity_revision_map'
Find the result with the newest timestamp.
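To pick the newest entry automatically, something like the following can be used (a sketch, assuming the dated directory names sort chronologically):
$ hdfs dfs -ls 'hdfs://analytics-hadoop/wmf/data/discovery/wdqs/entity_revision_map' | awk 'NR > 1 {print $NF}' | sort | tail -n 1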
- Install flink (same version as the one running in k8s) under your home directory.
Configure kerberos for analytics-search (in conf/flink-conf.yaml):
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /etc/security/keytabs/analytics-search/analytics-search.keytab
security.kerberos.login.principal: analytics-search/stat1004.eqiad.wmnet@WIKIMEDIA
s3.access-key: wdqs:savepoints
s3.secret-key: secret
s3.endpoint: thanos-swift.discovery.wmnet
s3.path.style.access: true
Start flink:
sudo -u analytics-search kerberos-run-command analytics-search sh -c 'HADOOP_CLASSPATH="`hadoop classpath`" ./bin/yarn-session.sh -tm 8g -jm 2600m -s 4 -nm "WDQS Streaming Updater"'
Start the bootstrap job
FLINK_JOB=/home/dcausse/streaming-updater-producer.jar
DUMP_DATE=(dump date you found by looking in HDFS)
REV_FILE=hdfs://analytics-hadoop/wmf/data/discovery/wdqs/entity_revision_map/$DUMP_DATE/rev_map.csv
# Use rdf-streaming-updater-eqiad or rdf-streaming-updater-codfw to create the savepoint for the eqiad or codfw flink job
SAVEPOINT_DIR=swift://rdf-streaming-updater-staging.thanos-swift/wikidata/savepoints/init_${DUMP_DATE}
sudo -u analytics-search kerberos-run-command analytics-search sh -c "export HADOOP_CLASSPATH=`hadoop classpath`; ./bin/flink run -c org.wikidata.query.rdf.updater.UpdaterBootstrapJob $FLINK_JOB --job_name bootstrap --revisions_file $REV_FILE --savepoint_dir $SAVEPOINT_DIR --parallelism 12"
Position the kafka offsets for the flink consumers.
First obtain the timestamp of the oldest start date of the dump script using hive. Note that the ${DUMP_DATE} in the SQL statement below is a placeholder; you'll need to supply the actual dump date you found by looking in HDFS.
select object
from discovery.wikibase_rdf
where `date` = '${DUMP_DATE}' and wiki='wikidata' and
subject = '<http://wikiba.se/ontology#Dump>' and
predicate = '<http://schema.org/dateModified>'
order by object asc
limit 1;
Position the offsets according to that date (can be done from stat1004 as well):
# This is the most dangerous command of this procedure as it may break
# an existing flink job by messing up their kafka consumer offsets.
# Be sure to create and activate a conda env with kafka-python
# Start obtained from the hql query above
START_DATE=(timestamp obtained from the hive query above)
# Use kafka-main1001.eqiad.wmnet:9092 for eqiad and staging
KAFKA_BROKER=<changeme>kafka-main2001.codfw.wmnet:9092
# Must match the options consumer_group of the flink_job
# note: set_initial_offsets.py is available at https://gerrit.wikimedia.org/r/plugins/gitiles/wikidata/query/rdf/+/refs/heads/master/streaming-updater-producer/scripts/set_initial_offsets.py
CONSUMER_GROUP=<changeme>wdqs_streaming_updater
for c in eqiad codfw; do
for t in mediawiki.revision-create mediawiki.page-delete mediawiki.page-undelete mediawiki.page-suppress; do
python set_initial_offsets.py -t $c.$t -c $CONSUMER_GROUP -b $KAFKA_BROKER -s $START_DATE;
done;
done
Then start the flink job on k8s using the savepoint.
Running from YARN
In the past it happened that the job was unable to perform properly on wikikube (phab:T314835) and we had to run it via YARN to unblock it. Running the job from YARN is a risky process that involves many manual steps and should be done carefully; because the job requirements might have changed since this note was written, be sure you know what you are doing before starting such a process. The following steps are intentionally light on details so that you research and double-check them:
- From a stat machine, prepare a flink environment similar to the one used in wikikube (same flink versions, same plugins and libs)
- Download the job jar and place it somewhere on the same stat machine. Beware that with the `flink-session-cluster` deployment model there is no easy way to determine which jar version to use (the wdqs/deploy repo gets updated for many different reasons, so the version of the jar present in that repo is not necessarily the one actually running). If using the flink-k8s-operator, it should be the jar used to build the image.
- Adapt the flink configuration to have a proper kerberos keytab for analytics-search
- Construct the job properties file: take the parameters constructed by flink-job.py if still using the flink-session-cluster deployment model, or the parameters present in the services values file of the corresponding environment, and build a properties file holding all of them. This is the most error-prone step; triple-check everything: the job must consume from the same kafka cluster and produce to the same kafka cluster and topic.
- Identify the checkpoint to resume from (see #Restore from a Checkpoint or Savepoint)
- Run the job using the properties file you created and from the checkpoint you identified.
- Note that the swift object storage is replicated between DC so you can use swift@eqiad even if you are trying to restore the job that was running in codfw.
Important note: beware of restoring a checkpoint that is so old that the stored offsets might no longer exist on the topics it depends on; we've seen in some cases that flink was unable to restore checkpoints older than 7 days without the option ignore_failures_after_transaction_timeout: true (gerrit:887300).