Data Platform/Systems/Iceberg
Apache Iceberg adds a layer between Hadoop SQL engines and file systems like HDFS and Ceph so that the semantics of reading and writing data are more familiar to users of relational databases. Specifically, Iceberg writes are atomic, so in-progress writes do not disrupt concurrent reads, and query planning is more scalable because it is done locally, without depending on external catalogs or walking file system trees. Iceberg also supports 'hidden partitioning', which effectively decouples the physical data layout from queries.
Iceberg is a library, not a service. This means that systems that want to read and write Iceberg tables must include Iceberg's library and some configuration to support it.
Query engines that support Iceberg also depend on the Hive Metastore, both to find tables and for its locking mechanism, which is what makes writes atomic.
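As a rough illustration of the "library plus configuration" requirement, the sketch below collects the Spark settings that the upstream Iceberg documentation describes for using Iceberg through a Hive Metastore-backed session catalog, and renders them as `--conf` arguments. The helper names `iceberg_spark_conf` and `as_submit_args` are hypothetical, and the configuration actually deployed at WMF may differ.

```python
def iceberg_spark_conf():
    """Return Spark settings that enable Iceberg on the session catalog.

    The keys and class names come from the upstream Iceberg Spark docs;
    whether WMF clusters use exactly these values is an assumption.
    """
    return {
        # Adds Iceberg's SQL extensions to Spark.
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        # Wraps Spark's built-in catalog so Hive and Iceberg tables coexist.
        "spark.sql.catalog.spark_catalog":
            "org.apache.iceberg.spark.SparkSessionCatalog",
        # Use the Hive Metastore to find tables.
        "spark.sql.catalog.spark_catalog.type": "hive",
    }


def as_submit_args(conf):
    """Render a config dict as a flat list of spark-submit arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(as_submit_args(iceberg_spark_conf())))
```

An Iceberg runtime jar matching the engine's Spark version must also be on the classpath for these settings to take effect.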
Query Engine Support
Engine | Version | Iceberg Read Support | Iceberg Write Support | Iceberg version | Comments |
---|---|---|---|---|---|
Spark | 3.1.2 | Yes | Yes | 1.2.1 | |
Spark | 3.3.2+ | Yes | Yes | 1.2.1+ | We have limited Spark 3.3+ support for PySpark jobs. These jobs can choose to use Iceberg 1.2.1 and up. |
Presto | 0.273.3 | Yes | No | 0.13.1 | Presto embeds its own Iceberg library. At WMF, Presto is set up as read-only. Version 0.13.1 is forward read compatible with 1.2.1. |
Hive | 2.3.6 | No | No | N/A | Although we will be using the Hive Metastore for cataloging purposes (i.e. to discover tables), we do not intend to support querying in Hive. |
Flink (Search) | 1.16 | Yes | Yes | Not installed | The search team currently runs Flink 1.16. This version is currently supported by recent Iceberg releases. We have not installed the Iceberg jars on their cluster yet. |
Flink (Events) | 1.16, 1.17 | Yes | Yes | Not installed | The events team currently runs Flink 1.16 and 1.17. These versions are currently supported by recent Iceberg releases. We have not installed the Iceberg jars on their cluster yet. |
Accessing metadata
As of May 2024, we run Spark < 3.2, which does not support accessing metadata "sub-tables" using SQL. You can work around this by using Spark's DataFrameReader API.
For example, this will not work:
import wmfdata as wmf
# Raises `AnalysisException: The namespace in session catalog must have
# exactly one name part`
wmf.spark.run("""
SELECT *
FROM wmf_traffic.referrer_daily.files
""")
However, this will work:
import wmfdata as wmf
spark = wmf.spark.create_session()
(
    spark
    .read
    .format("iceberg")
    .load("wmf_traffic.referrer_daily.files")
    .toPandas()
)
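The same workaround applies to other metadata tables. The sketch below wraps it in a small helper; the function names are hypothetical, and the set of metadata table names is a subset of those documented by upstream Iceberg rather than an exhaustive list.

```python
# Metadata tables documented by upstream Iceberg (a subset, not exhaustive).
KNOWN_METADATA_TABLES = {"files", "history", "manifests", "partitions", "snapshots"}


def metadata_table_identifier(table, metadata_table):
    """Build the name to pass to DataFrameReader.load() for a metadata table."""
    if table.count(".") != 1:
        raise ValueError(f"expected 'database.table', got {table!r}")
    if metadata_table not in KNOWN_METADATA_TABLES:
        raise ValueError(f"unknown metadata table: {metadata_table!r}")
    return f"{table}.{metadata_table}"


def read_metadata_table(spark, table, metadata_table):
    """Load a metadata table as a Spark DataFrame (needs a live Spark session)."""
    identifier = metadata_table_identifier(table, metadata_table)
    return spark.read.format("iceberg").load(identifier)


print(metadata_table_identifier("wmf_traffic.referrer_daily", "snapshots"))
# prints: wmf_traffic.referrer_daily.snapshots
```

With a session created as above, `read_metadata_table(spark, "wmf_traffic.referrer_daily", "snapshots").toPandas()` would list the table's snapshots.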
Migration from Hive Tables
The Data Engineering team intends to migrate all the supported datasets currently under the `wmf` Hive database to Iceberg. We intend to keep both versions (the old Hive tables and the new Iceberg tables) working in production for a reasonable time.
What to expect in terms of querying
Iceberg supports 'hidden partitioning', and we will leverage this mechanism so that we can evolve tables while being less disruptive to users. This means that the schema of the tables will change, with most of that change coming from the way we partition the data. Here is an example for the `wmf.referrer_daily` table. In its Hive version, we have the following schema:
CREATE EXTERNAL TABLE IF NOT EXISTS `referrer_daily` (
`country` string COMMENT 'Reader country per IP geolocation',
`lang` string COMMENT 'Wikipedia language -- e.g., en for English',
`browser_family` string COMMENT 'Browser family from user-agent',
`os_family` string COMMENT 'OS family from user-agent',
`search_engine` string COMMENT 'One of ~20 standard search engines (e.g., Google)',
`num_referrals` int COMMENT 'Number of pageviews from the referral source'
)
PARTITIONED BY (
`year` int COMMENT 'Unpadded year of request',
`month` int COMMENT 'Unpadded month of request',
`day` int COMMENT 'Unpadded day of request'
)
STORED AS PARQUET
LOCATION '/wmf/data/wmf/referrer/daily'
;
The respective Iceberg schema is as follows:
CREATE EXTERNAL TABLE IF NOT EXISTS `referrer_daily`(
`country` string COMMENT 'Reader country per IP geolocation',
`lang` string COMMENT 'Wikipedia language -- e.g., en for English',
`browser_family` string COMMENT 'Browser family from user-agent',
`os_family` string COMMENT 'OS family from user-agent',
`search_engine` string COMMENT 'One of ~20 standard search engines (e.g., Google)',
`num_referrals` int COMMENT 'Number of pageviews from the referral source',
`day` date COMMENT 'The date of the request'
)
USING ICEBERG
PARTITIONED BY (months(day))
LOCATION '/wmf/data/wmf_traffic/referrer/daily'
;
Notice the following:
- In the Hive table, partitioning is explicit: `year`, `month`, and `day` become `INT` columns that you must include in your queries. For example: `SELECT * FROM wmf.referrer_daily WHERE year = 2020 AND month = 5 AND day = 1;`. If we ever changed the partitioning strategy, your queries would have to change accordingly.
- In the Iceberg table, the partitioning references the existing `day` column, which is of type `DATE`. Queries therefore look like this: `SELECT * FROM wmf_traffic.referrer_daily WHERE day = '2020-05-01';`. Since the partitioning refers to existing columns, if we ever change the partitioning strategy, your queries will only need to change if you want to pick up the performance gains; otherwise they will continue working. This mechanism also allows Data Engineering to better control the number of files generated by any single table, which can be problematic when left unchecked.
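To make the difference between the two query styles concrete, here is a minimal sketch in plain Python. The first helper converts Hive-style `year`/`month`/`day` values into the filter used against the Iceberg table. The second computes the value the Iceberg specification assigns to a month transform (months elapsed since 1970-01), showing that every day of a month falls in the same partition. Both function names are hypothetical.

```python
from datetime import date


def iceberg_day_predicate(year, month, day):
    """Turn Hive-style year/month/day partition values into a `day` filter."""
    # date() validates the values, e.g. it rejects month=13.
    return f"day = '{date(year, month, day).isoformat()}'"


def month_partition_value(d):
    """Months elapsed since 1970-01, per the Iceberg spec's month transform."""
    return (d.year - 1970) * 12 + (d.month - 1)


print(iceberg_day_predicate(2020, 5, 1))
# prints: day = '2020-05-01'

# Two days in May 2020 map to the same partition value.
print(month_partition_value(date(2020, 5, 1)) == month_partition_value(date(2020, 5, 31)))
# prints: True
```

Users never compute the partition value themselves. Iceberg derives it from the `day` filter, which is why queries keep working if the partitioning strategy changes.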
Changes to database names
As part of the Iceberg migration, we are also taking the opportunity to functionally decompose the 56 tables currently on the `wmf` database into distinct databases that hopefully will help data discovery. Please note that these proposed databases should only contain Iceberg tables, given that Presto has a limitation where it is unable to query both Iceberg and Hive tables in the same session.
After some discussions, we settled on the following.
proposed db | table | comments |
---|---|---|
wmf_contributors | edit_hourly | |
wmf_contributors | editors_daily | |
wmf_contributors | geoeditors_blacklist_country | |
wmf_contributors | geoeditors_edits_monthly | |
wmf_contributors | geoeditors_monthly | |
wmf_contributors | geoeditors_public_monthly | |
wmf_contributors | unique_editors_by_country_monthly | |
wmf_data_ops | data_quality_stats | meta about other tables |
wmf_data_ops | hdfs_usage | HDFS file system usage. |
wmf_experiments | iceberg_wikitext_content | |
wmf_mediawiki | mediawiki_history | |
wmf_mediawiki | mediawiki_history_archive | |
wmf_mediawiki | mediawiki_history_reduced | |
wmf_mediawiki | mediawiki_metrics | |
wmf_mediawiki | mediawiki_page_history | |
wmf_mediawiki | mediawiki_page_history_archive | |
wmf_mediawiki | mediawiki_user_history | |
wmf_mediawiki | mediawiki_user_history_archive | |
wmf_mediawiki | mediawiki_wikitext_current | |
wmf_mediawiki | mediawiki_wikitext_history | |
wmf_readership | disallowed_cassandra_articles | |
wmf_readership | pageview_actor | |
wmf_readership | pageview_allowlist | |
wmf_readership | pageview_dataloss_202112_202201 | |
wmf_readership | pageview_historical | |
wmf_readership | pageview_hourly | |
wmf_readership | pageview_unexpected_values | |
wmf_readership | projectcounts_all_sites | |
wmf_readership | projectcounts_raw | |
wmf_readership | projectview_hourly | |
wmf_readership | session_length_daily | |
wmf_readership | unique_devices_per_domain_daily | |
wmf_readership | unique_devices_per_domain_monthly | |
wmf_readership | unique_devices_per_project_family_daily | |
wmf_readership | unique_devices_per_project_family_monthly | |
wmf_readership | virtualpageview_hourly | |
wmf_traffic | anomaly_detection | |
wmf_traffic | aqs_hourly | |
wmf_traffic | browser_general | |
wmf_traffic | domain_abbrev_map | |
wmf_traffic | interlanguage_navigation | |
wmf_traffic | mediacounts | |
wmf_traffic | mediarequest | |
wmf_traffic | officewiki_webrequest_daily | |
wmf_traffic | referrer_daily | |
wmf_traffic | traffic_anomaly_checked_countries | unsure |
wmf_traffic | webrequest | |
wmf_traffic | webrequest_actor_label_hourly | |
wmf_traffic | webrequest_actor_metrics_hourly | |
wmf_traffic | webrequest_actor_metrics_rollup_hourly | |
wmf_traffic | webrequest_subset_tags | |
wmf_wikidata | wikidata_entity | |
wmf_wikidata | wikidata_item_page_link | |
N/A | mobilewebuiclicktracking_10742159_15423246 | Appears to be deprecated, so will not be migrated. |
N/A | pagecounts_all_sites | Appears to be deprecated, so will not be migrated. |
N/A | tmp_druid_load_virtualpageview_daily_20230427 | This is a vestigial temp table. Such tables should now be created under the `tmp` database. |
N/A | tmp_druid_load_virtualpageview_daily_20230428 | This is a vestigial temp table. Such tables should now be created under the `tmp` database. |
N/A | wdqs_extract | Appears to be deprecated, so will not be migrated. |
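When updating queries, the mapping in the table above can be applied mechanically. The sketch below copies only a handful of entries for illustration. The helper name `iceberg_table_name` is hypothetical, and it assumes the table name itself is unchanged by the migration, as is the case for `referrer_daily`.

```python
# A few entries copied from the table above; extend as needed.
PROPOSED_DATABASE = {
    "edit_hourly": "wmf_contributors",
    "mediawiki_history": "wmf_mediawiki",
    "pageview_hourly": "wmf_readership",
    "referrer_daily": "wmf_traffic",
    "webrequest": "wmf_traffic",
    "wikidata_entity": "wmf_wikidata",
}


def iceberg_table_name(hive_table):
    """Map 'wmf.<table>' to '<proposed db>.<table>'."""
    database, _, table = hive_table.partition(".")
    if database != "wmf" or not table:
        raise ValueError(f"expected 'wmf.<table>', got {hive_table!r}")
    if table not in PROPOSED_DATABASE:
        raise KeyError(f"no proposed database listed for {table!r}")
    return f"{PROPOSED_DATABASE[table]}.{table}"


print(iceberg_table_name("wmf.referrer_daily"))
# prints: wmf_traffic.referrer_daily
```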
It is a de facto standard that data for "production" tables should be saved in the `/wmf/data` directory of HDFS. However, there is no guideline on how that directory should be organized (task T367243).
Data for these tables should be saved under that directory, with the table name optionally broken down into more than one directory (for example, `referrer_daily` uses `referrer/daily` rather than `referrer_daily`).
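As a sketch of this convention, the helper below builds a table location from a database name and one or more directory parts. Because no official guideline exists yet, the layout (a database directory directly under `/wmf/data`) is an assumption inferred from the `referrer_daily` Iceberg `LOCATION` shown earlier, and the function name is hypothetical.

```python
import posixpath

PRODUCTION_ROOT = "/wmf/data"  # de facto root for production tables


def table_location(database, *path_parts):
    """Join the root, the database, and the table's directory parts."""
    if not path_parts:
        raise ValueError("at least one directory part is required")
    for part in path_parts:
        if not part or "/" in part:
            raise ValueError(f"invalid directory part: {part!r}")
    return posixpath.join(PRODUCTION_ROOT, database, *path_parts)


# referrer_daily is split into two directories: referrer/daily.
print(table_location("wmf_traffic", "referrer", "daily"))
# prints: /wmf/data/wmf_traffic/referrer/daily
```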
Table migration check list
1. [DOC] List the dependencies of the migrated table [Datasets, Dashboards]. Switching to the new Iceberg table requires updating the sensors and queries of dependent dataset pipelines, as well as the dashboards, to match the new Iceberg temporal partitioning mechanism.
2. [CODE] Prepare a code review with the new schema for the table. The new table will be extremely similar to the old one, except that 1) it has a timestamp field of timestamp type, and 2) the year/month/day/hour partitioning is replaced by timestamp-based hidden partitioning (see previous section).
3. [CODE] Prepare a code review updating the original table's Airflow job with a new step that fills in the new Iceberg table.
4. [CODE] Prepare a code review with a one-off job to back-fill the old data into the new table. This step is mostly needed to fill in the new timestamp field, which is not available in the original data of most tables.
5. [CODE-NOT_YET] Prepare a code review to add Iceberg maintenance jobs for the new table. This task will be doable after T338065. For now, we should manually run maintenance jobs regularly on existing Iceberg tables.
6. [OPS] Create the new Iceberg table, after step 2 has been reviewed and validated. Add the `Iceberg` tag to its DataHub documentation.
7. [OPS] Start the Airflow job updating the new table, after step 3 has been reviewed and validated.
8. [OPS] Back-fill the old data from the existing table, after step 4 has been reviewed and validated.
9. [CODE] Prepare code reviews to update dependent datasets to use the new table.
10. [OPS] Make the dependent datasets use the new table, after step 9 has been reviewed and validated.
11. [COM] Communicate about the future deprecation of the old table, particularly mentioning impacted dashboards (possibly flagging owners).
12. [CODE] Update the Airflow job to no longer compute the old table.
13. [OPS] Stop the old table updates and drop the table, after step 12 has been validated.
14. [DOC] Update DataHub, tagging the migrated table with the 'iceberg' tag.
15. [OPS] Validate data in the new table after one or two Airflow runs. Data errors might occur in the regular jobs even if the back-fill went well (for instance, data duplication).
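For the final validation item, one simple check for data duplication is to count rows per key that is expected to be unique. The following is a minimal sketch in plain Python, assuming the rows have already been fetched as dictionaries (for example from a small query result). The function name, key columns, and sample rows are hypothetical.

```python
from collections import Counter


def find_duplicate_rows(rows, key_columns):
    """Return {key: count} for every key that appears more than once."""
    counts = Counter(tuple(row[c] for c in key_columns) for row in rows)
    return {key: n for key, n in counts.items() if n > 1}


sample = [
    {"day": "2020-05-01", "country": "FR", "num_referrals": 10},
    {"day": "2020-05-01", "country": "FR", "num_referrals": 10},
    {"day": "2020-05-02", "country": "FR", "num_referrals": 7},
]
print(find_duplicate_rows(sample, ["day", "country"]))
# prints: {('2020-05-01', 'FR'): 2}
```

For large tables, the same idea is better expressed in the query engine itself, grouping by the key columns and keeping groups with more than one row.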
Which tables are migrated?
Hive Table Name | Iceberg Table Name | Dependencies | Created? | Backfilled? | Pipeline on Airflow? | Documented? | Original Table Deprecated | Comments |
---|---|---|---|---|---|---|---|---|
wmf.referrer_daily | wmf_traffic.referrer_daily | Yes | Yes | Yes | Yes | Yes | Yes | https://phabricator.wikimedia.org/T335305 |
wmf.unique_devices_per_domain_daily | wmf_readership.unique_devices_per_domain_daily | Yes | Yes | Yes | Yes | No | | https://phabricator.wikimedia.org/T347689 |
wmf.unique_devices_per_domain_monthly | wmf_readership.unique_devices_per_domain_monthly | (above) | Yes | Yes | Yes | No | ||
wmf.unique_devices_per_project_family_daily | wmf_readership.unique_devices_per_project_family_daily | (above) | Yes | Yes | Yes | No | ||
wmf.unique_devices_per_project_family_monthly | wmf_readership.unique_devices_per_project_family_monthly | (above) | Yes | Yes | Yes | No | ||