Data Platform/Systems/Iceberg

Apache Iceberg adds a layer between Hadoop SQL engines and filesystems like HDFS and Ceph so that the semantics of reading and writing data are more familiar to users of relational databases. Writes are atomic, so an in-progress write never disturbs concurrent reads, and queries are more scalable because query planning is computed locally from table metadata rather than by consulting external catalogs or walking file system trees. Iceberg also supports 'hidden partitioning', effectively decoupling the physical data layout from queries.

Iceberg is a library, not a service. This means that systems that want to read and write Iceberg tables must include Iceberg's library and some configuration to support it.

Query engines that support Iceberg also depend on the Hive Metastore, both to find tables and for its locking mechanism, which makes writes atomic.
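
For example, a Spark session needs the Iceberg runtime jar on its classpath plus a few catalog settings. The following is a minimal sketch of that configuration, based on the settings Iceberg documents upstream; the exact jars and defaults baked into WMF's Spark deployment may differ:

from pyspark.sql import SparkSession

# A minimal sketch (not the exact WMF setup): enable Iceberg's SQL extensions
# and back the default Spark catalog with the Hive Metastore.
spark = (
    SparkSession.builder
    .appName("iceberg-example")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.iceberg.spark.SparkSessionCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hive")
    .getOrCreate()
)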

Query Engine Support

Engine         | Version    | Iceberg Read Support | Iceberg Write Support | Iceberg Version | Comments
Spark          | 3.1.2      | Yes                  | Yes                   | 1.2.1           |
Spark          | 3.3.2+     | Yes                  | Yes                   | 1.2.1+          | We have limited Spark 3.3+ support for PySpark jobs. These jobs can choose to use Iceberg 1.2.1 and up.
Presto         | 0.273.3    | Yes                  | No                    | 0.13.1          | Presto embeds its own Iceberg library. At WMF, Presto is set up for read-only access. Version 0.13.1 is forward read compatible with 1.2.1.
Hive           | 2.3.6      | No                   | No                    | N/A             | Although we will be using the Hive Metastore for cataloging purposes (i.e. to discover tables), we do not intend to support querying in Hive.
Flink (Search) | 1.16       | Yes                  | Yes                   | Not installed   | The Search team currently runs Flink 1.16, which is supported by recent Iceberg releases. We have not installed the Iceberg jars on their cluster yet.
Flink (Events) | 1.16, 1.17 | Yes                  | Yes                   | Not installed   | The Events team currently runs Flink 1.16 and 1.17, which are supported by recent Iceberg releases. We have not installed the Iceberg jars on their cluster yet.

Accessing metadata

As of May 2024, we run Spark < 3.2, which does not support accessing metadata "sub-tables" using SQL. You can work around this by using Spark's DataFrameReader API.

For example, this will not work:

import wmfdata as wmf

# Raises `AnalysisException: The namespace in session catalog must have 
# exactly one name part`
wmf.spark.run("""
    SELECT *
    FROM wmf_traffic.referrer_daily.files
""")

However, this will work:

import wmfdata as wmf

spark = wmf.spark.create_session()

(
    spark
    .read
    .format("iceberg")
    .load("wmf_traffic.referrer_daily.files")
    .toPandas()
)
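
The same DataFrameReader pattern works for Iceberg's other metadata sub-tables, such as snapshots, history, and partitions. For example, reusing the session above:

spark.read.format("iceberg").load("wmf_traffic.referrer_daily.snapshots").toPandas()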

Migration from Hive Tables

The Data Engineering team intends to migrate all the supported datasets currently under the wmf Hive database to Iceberg. We intend to keep both versions (old Hive tables and new Iceberg tables) working in production for a reasonable time.

What to expect in terms of querying

Iceberg supports 'hidden partitioning', and we will leverage this mechanism so that we can evolve tables while being less disruptive to users. This means that the schema of the tables will change, with most of that change coming from the way we partition the data. Here is an example for the wmf.referrer_daily table. In its Hive version, we have the following schema:

CREATE EXTERNAL TABLE IF NOT EXISTS `referrer_daily` (
    `country`             string  COMMENT 'Reader country per IP geolocation',
    `lang`                string  COMMENT 'Wikipedia language -- e.g., en for English',
    `browser_family`      string  COMMENT 'Browser family from user-agent',
    `os_family`           string  COMMENT 'OS family from user-agent',
    `search_engine`       string  COMMENT 'One of ~20 standard search engines (e.g., Google)',
    `num_referrals`       int     COMMENT 'Number of pageviews from the referral source'
)
PARTITIONED BY (
    `year`                int     COMMENT 'Unpadded year of request',
    `month`               int     COMMENT 'Unpadded month of request',
    `day`                 int     COMMENT 'Unpadded day of request'
)
STORED AS PARQUET
LOCATION '/wmf/data/wmf/referrer/daily'
;

The respective Iceberg schema is as follows:

CREATE EXTERNAL TABLE IF NOT EXISTS `referrer_daily`(
    `country`             string  COMMENT 'Reader country per IP geolocation',
    `lang`                string  COMMENT 'Wikipedia language -- e.g., en for English',
    `browser_family`      string  COMMENT 'Browser family from user-agent',
    `os_family`           string  COMMENT 'OS family from user-agent',
    `search_engine`       string  COMMENT 'One of ~20 standard search engines (e.g., Google)',
    `num_referrals`       int     COMMENT 'Number of pageviews from the referral source',
    `day`                 date    COMMENT 'The date of the request'
)
USING ICEBERG
PARTITIONED BY (months(day))
LOCATION '/wmf/data/wmf_traffic/referrer/daily'
;

Notice the following:

  • In the Hive table, partitioning is explicit: year, month, and day become INT columns that you must include in your queries, for example SELECT * FROM wmf.referrer_daily WHERE year = 2020 AND month = 5 AND day = 1;. If we ever changed the partitioning strategy, your queries would have to change accordingly.
  • In the Iceberg table, the partitioning references the existing day column, which is of type DATE, so queries look like SELECT * FROM wmf_traffic.referrer_daily WHERE day = '2020-05-01';. Because the partitioning refers to existing columns, if we ever changed the partitioning strategy your queries would only need to change if you wanted to pick up the performance gains; otherwise they would keep working as-is (see the sketch after this list). This mechanism also lets Data Engineering better control the number of files generated by any single table, which, when left unchecked, can be problematic.
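
To make the second point concrete, here is a hypothetical sketch of such a partitioning change, using Iceberg's Spark SQL extensions: switching referrer_daily from monthly to daily partitions would not require any change to the WHERE day = '...' queries above.

spark.sql("""
    -- Hypothetical: repartition the table by day instead of by month.
    -- Queries filtering on `day` keep working unchanged.
    ALTER TABLE wmf_traffic.referrer_daily
    REPLACE PARTITION FIELD months(day) WITH days(day)
""")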

Changes to database names

As part of the Iceberg migration, we are also taking the opportunity to functionally decompose the 56 tables currently in the wmf database into distinct databases, which we hope will help data discovery. Please note that these proposed databases should contain only Iceberg tables, because Presto cannot query both Iceberg and Hive tables in the same session.

After some discussions, we settled on the following.

proposed db      | table                                         | comments
wmf_contributors | edit_hourly                                   |
wmf_contributors | editors_daily                                 |
wmf_contributors | geoeditors_blacklist_country                  |
wmf_contributors | geoeditors_edits_monthly                      |
wmf_contributors | geoeditors_monthly                            |
wmf_contributors | geoeditors_public_monthly                     |
wmf_contributors | unique_editors_by_country_monthly             |
wmf_data_ops     | data_quality_stats                            | meta information about other tables
wmf_data_ops     | hdfs_usage                                    | HDFS file system usage
wmf_experiments  | iceberg_wikitext_content                      |
wmf_mediawiki    | mediawiki_history                             |
wmf_mediawiki    | mediawiki_history_archive                     |
wmf_mediawiki    | mediawiki_history_reduced                     |
wmf_mediawiki    | mediawiki_metrics                             |
wmf_mediawiki    | mediawiki_page_history                        |
wmf_mediawiki    | mediawiki_page_history_archive                |
wmf_mediawiki    | mediawiki_user_history                        |
wmf_mediawiki    | mediawiki_user_history_archive                |
wmf_mediawiki    | mediawiki_wikitext_current                    |
wmf_mediawiki    | mediawiki_wikitext_history                    |
wmf_readership   | disallowed_cassandra_articles                 |
wmf_readership   | pageview_actor                                |
wmf_readership   | pageview_allowlist                            |
wmf_readership   | pageview_dataloss_202112_202201               |
wmf_readership   | pageview_historical                           |
wmf_readership   | pageview_hourly                               |
wmf_readership   | pageview_unexpected_values                    |
wmf_readership   | projectcounts_all_sites                       |
wmf_readership   | projectcounts_raw                             |
wmf_readership   | projectview_hourly                            |
wmf_readership   | session_length_daily                          |
wmf_readership   | unique_devices_per_domain_daily               |
wmf_readership   | unique_devices_per_domain_monthly             |
wmf_readership   | unique_devices_per_project_family_daily       |
wmf_readership   | unique_devices_per_project_family_monthly     |
wmf_readership   | virtualpageview_hourly                        |
wmf_traffic      | anomaly_detection                             |
wmf_traffic      | aqs_hourly                                    |
wmf_traffic      | browser_general                               |
wmf_traffic      | domain_abbrev_map                             |
wmf_traffic      | interlanguage_navigation                      |
wmf_traffic      | mediacounts                                   |
wmf_traffic      | mediarequest                                  |
wmf_traffic      | officewiki_webrequest_daily                   |
wmf_traffic      | referrer_daily                                |
wmf_traffic      | traffic_anomaly_checked_countries             | unsure
wmf_traffic      | webrequest                                    |
wmf_traffic      | webrequest_actor_label_hourly                 |
wmf_traffic      | webrequest_actor_metrics_hourly               |
wmf_traffic      | webrequest_actor_metrics_rollup_hourly        |
wmf_traffic      | webrequest_subset_tags                        |
wmf_wikidata     | wikidata_entity                               |
wmf_wikidata     | wikidata_item_page_link                       |
(none)           | mobilewebuiclicktracking_10742159_15423246    | appears to be deprecated, so will not be migrated
(none)           | pagecounts_all_sites                          | appears to be deprecated, so will not be migrated
(none)           | tmp_druid_load_virtualpageview_daily_20230427 | vestigial temp table; temp tables should now be created under the `tmp` database
(none)           | tmp_druid_load_virtualpageview_daily_20230428 | vestigial temp table; temp tables should now be created under the `tmp` database
(none)           | wdqs_extract                                  | appears to be deprecated, so will not be migrated

It is a de-facto standard that data for "production" tables should be saved in the /wmf/data directory of HDFS. However, there is no guideline on how that directory should be organized (task T367243).

Data for these tables should be saved under /wmf/data, with the table name optionally broken down into more than one directory (for example, referrer_daily uses referrer/daily rather than referrer_daily).
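
If you need to check where a given table's data lives, you can inspect its DDL from Spark, for example:

spark.sql("SHOW CREATE TABLE wmf_traffic.referrer_daily").show(truncate=False)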

Table migration check list

  1. [DOC] List the dependencies of the table being migrated [Datasets, Dashboards]. Switching to the new Iceberg table requires updating the dependent dataset pipelines' sensors and queries, as well as the dashboards, to match the new Iceberg temporal partitioning mechanism.
  2. [CODE] Prepare a code review with the new schema for the table. The new table will be extremely similar to the old one, except for 1) a timestamp field with a timestamp type and 2) timestamp-based hidden partitioning instead of year/month/day/hour partition columns (see the previous section).
  3. [CODE] Prepare a code review updating the original table's Airflow job with a new step that fills in the new Iceberg table.
  4. [CODE] Prepare a code review with a one-off job to back-fill the old data into the new table. This step is mostly needed to fill in the new timestamp field, which is not available in the original data of most tables (see the back-fill sketch after this list).
  5. [CODE-NOT_YET] Prepare a code review adding Iceberg maintenance jobs for the new table. This will be doable after T338065; for now, we should manually run maintenance jobs regularly on existing Iceberg tables (see the maintenance sketch after this list).
  6. [OPS] Create the new Iceberg table after step 2 has been reviewed and validated, and add the Iceberg tag to its DataHub documentation.
  7. [OPS] Start the Airflow job updating the new table, after step 3 has been reviewed and validated.
  8. [OPS] Back-fill the old data from the existing table, after step 4 has been reviewed and validated.
  9. [CODE] Prepare code reviews updating the dependent datasets to use the new table.
  10. [OPS] Make the dependent datasets use the new table, after step 9 has been reviewed and validated.
  11. [COM] Communicate about the future deprecation of the old table, particularly mentioning impacted dashboards (and possibly flagging their owners).
  12. [CODE] Update the Airflow job to no longer compute the old table.
  13. [OPS] Stop the old table updates and drop the table, after step 12 has been validated.
  14. [DOC] Update DataHub, tagging the migrated table with the 'iceberg' tag.
  15. [OPS] Validate the data in the new table after one or two Airflow runs; data errors (for instance, duplication) can occur in the regular jobs even if the back-fill went well.
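
As an illustration of the back-fill in step 4, here is a hypothetical one-off job for the referrer_daily example from earlier, deriving the new day column from the old integer partition columns (MAKE_DATE is available in Spark 3.0+):

spark.sql("""
    INSERT INTO wmf_traffic.referrer_daily
    SELECT
        country, lang, browser_family, os_family, search_engine, num_referrals,
        MAKE_DATE(year, month, day) AS day  -- derive the new DATE column
    FROM wmf.referrer_daily
""")

For the manual maintenance runs in step 5, here is a minimal sketch using Iceberg's built-in Spark procedures (the retention timestamp is illustrative, not a WMF policy):

spark.sql("""
    CALL spark_catalog.system.expire_snapshots(
        table => 'wmf_traffic.referrer_daily',
        older_than => TIMESTAMP '2024-01-01 00:00:00'
    )
""")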

Which tables are migrated?

Hive Table Name                               | Iceberg Table Name                                      | Dependencies | Created? | Backfilled? | Pipeline on Airflow? | Documented? | Original Table Deprecated? | Comments
wmf.referrer_daily                            | wmf_traffic.referrer_daily                              | Yes          | Yes      | Yes         | Yes                  | Yes         | Yes                        | https://phabricator.wikimedia.org/T335305
wmf.unique_devices_per_domain_daily           | wmf_readership.unique_devices_per_domain_daily          | Yes          | Yes      | Yes         | Yes                  | No          |                            | https://phabricator.wikimedia.org/T347689
wmf.unique_devices_per_domain_monthly         | wmf_readership.unique_devices_per_domain_monthly        | (above)      | Yes      | Yes         | Yes                  | No          |                            |
wmf.unique_devices_per_project_family_daily   | wmf_readership.unique_devices_per_project_family_daily  | (above)      | Yes      | Yes         | Yes                  | No          |                            |
wmf.unique_devices_per_project_family_monthly | wmf_readership.unique_devices_per_project_family_monthly | (above)     | Yes      | Yes         | Yes                  | No          |                            |