Data Platform/Systems/Hive/Querying using UDFs
You can write user-defined functions (or UDFs) which encapsulate complex query logic for easy reuse within Hive.
We maintain a standard set of UDFs in analytics/refinery/source/refinery-hive/src/main/java/org/wikimedia/analytics/refinery/hive.
The latest compiled versions of these standard UDFs can be found on the stat machines at:
/srv/deployment/analytics/refinery/artifacts/refinery-hive-shaded.jar
and on HDFS at:
hdfs:///wmf/refinery/current/artifacts/refinery-hive-shaded.jar
Using an existing UDF
Here's an example of how to use our user agent UDF to parse user agents from request logs (note that this won't work on EventLogging tables, where the user agent field has already been parsed from a string into a struct).
First, you need to add the function to your session:
hive> ADD JAR /srv/deployment/analytics/refinery/artifacts/refinery-hive-shaded.jar;
hive> CREATE TEMPORARY FUNCTION ua AS 'org.wikimedia.analytics.refinery.hive.GetUAPropertiesUDF';
OK
Time taken: 0.046 seconds
Note: if you are using Hue, you will need to load the jar from HDFS instead:
ADD JAR hdfs:///wmf/refinery/current/artifacts/refinery-hive-shaded.jar;
Once the function has been added, you can use it in a SELECT query:
hive> select ua(user_agent) from webrequest where year=2014 and month=9 and day=1 and hour=12;
Output will look like:
{"browser_major":"3","os_family":"Android","os_major":"4","device_family":"Lenovo A3500-H","browser_family":"UC Browser","os_minor":"4"} 9
{"browser_major":"30","os_family":"Android","os_major":"4","device_family":"LG-D631/D63110b","browser_family":"Chrome Mobile","os_minor":"4"} 260
{"browser_major":"30","os_family":"Android","os_major":"4","device_family":"SM-N9008","browser_family":"Chrome Mobile","os_minor":"4"} 18
{"browser_major":"31","os_family":"Android","os_major":"4","device_family":"SM-N9005","browser_family":"Chrome Mobile","os_minor":"4"} 708
Group by device_family:
select a.device_family, count(*) as cnt from (
select ua(user_agent)['device_family'] as device_family
from webrequest
where webrequest_source='mobile' and year=2014 and month=10 and day=30 and hour=0
) a
group by a.device_family order by cnt desc limit 10;
iPhone              7773691
Other               2940052
iPad                2911523
Spider              770622
iPod                299841
Samsung GT-I9300    173951
Samsung GT-I9505    170575
Samsung SCH-I545    161654
Samsung SM-G900V    150833
HTC One             132639
Using Reflect UDF
The reflect UDF is a generic UDF that allows the user to use a Java class method or function without a specific UDF wrapper.[1] For example, we can use it to access the decode static method of the PercentDecoder class in refinery core:
ADD JAR /srv/deployment/analytics/refinery/artifacts/refinery-hive-shaded.jar;
SELECT reflect("org.wikimedia.analytics.refinery.core.PercentDecoder", "decode", "hello%20world%21") AS decoded_string;
Writing a UDF
Study existing UDFs and their unit tests. For example:
- Client IP (implementation, tests)
- Search Properties (implementation, tests)
- GetSearchPropertiesUDF uses the SearchQuery (implementation, tests) singleton, which in turn uses an external enum, SearchQueryFeatureRegex, both in refinery core. The unit tests use a CSV file containing many strings to evaluate along with the values they're supposed to evaluate to.
Use Get* naming for UDFs that compute a value (e.g. GetUAPropertiesUDF) and Is* naming for UDFs that return true/false (e.g. IsPageviewUDF).
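For orientation, here is a minimal sketch of what a boolean-valued UDF following the Is* convention might look like. The class IsNonEmptyUDF and its logic are hypothetical, not actual refinery code; a real UDF would delegate its logic to a class in refinery-core:

package org.wikimedia.analytics.refinery.hive;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Hypothetical example, not an actual refinery class.
public class IsNonEmptyUDF extends GenericUDF {

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // Called once before any rows are processed: validate the
        // arguments and declare the return type.
        if (arguments.length != 1) {
            throw new UDFArgumentLengthException("IsNonEmptyUDF takes exactly one argument");
        }
        return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // Called once per row. A real refinery UDF would delegate
        // to a class in refinery-core here.
        Object value = arguments[0].get();
        return value != null && !value.toString().isEmpty();
    }

    @Override
    public String getDisplayString(String[] children) {
        return "IsNonEmptyUDF(" + children[0] + ")";
    }
}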
It's very important that you include unit tests with any additions you make to refinery-core and/or refinery-hive.
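As an illustration, a JUnit test for the hypothetical sketch above might look like the following, mirroring the style of the existing refinery-hive tests:

package org.wikimedia.analytics.refinery.hive;

import static org.junit.Assert.assertEquals;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.junit.Test;

// Test for the hypothetical IsNonEmptyUDF sketched above.
public class TestIsNonEmptyUDF {

    @Test
    public void testEvaluate() throws Exception {
        IsNonEmptyUDF udf = new IsNonEmptyUDF();
        // Initialize the UDF with a single string argument.
        udf.initialize(new ObjectInspector[] {
            PrimitiveObjectInspectorFactory.javaStringObjectInspector
        });

        DeferredObject[] nonEmpty = { new DeferredJavaObject("hello") };
        assertEquals(Boolean.TRUE, udf.evaluate(nonEmpty));

        DeferredObject[] empty = { new DeferredJavaObject("") };
        assertEquals(Boolean.FALSE, udf.evaluate(empty));
    }
}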
IntelliJ IDEA is the recommended IDE for writing UDFs. Licenses are available to Foundation staff; see Office wiki.
Testing a UDF you just wrote
On Stat*
If the UDF is not yet merged, you will need to build a jar that contains it yourself. You can check out the code on stat1007 and build the jar there.
Build the jar using mvn (Maven):
mvn package
Once you have compiled your UDF, you will need to register it with Hive before using it.
hive> ADD JAR /some/path/refinery-hive-0.0.1.jar;
Added /some/path/refinery-hive-0.0.1.jar to class path
Added resource: /some/path/refinery-hive-0.0.1.jar
hive> CREATE TEMPORARY FUNCTION blah AS 'org.wikimedia.analytics.refinery.hive.BlahUDF';
OK
Time taken: 0.046 seconds
Then execute a SELECT using the function:
hive> select blah(some_column) from webrequest where year=2014 and month=9 and day=1 and hour=12 limit 10;
On Hue
If you prefer to use the online GUI "Hue" (see access info), you must copy the jar to HDFS:
$ hdfs dfs -put /path/to/udf.jar /user/<your LDAP username>/
Then you can register it in your Hive query in Hue's query editor via:
ADD JAR hdfs:///user/<your LDAP username>/udf.jar;
Refer to the usage instructions in the previous section. Note that you can also use that same jar from the Hive CLI, not just Hue.
Testing changes to an existing UDF
You will need to build the jar just as you would when creating a new UDF, but when testing it you need to override the path from which Hive loads some jars by default. Otherwise, Hive will use the existing code rather than your new code.
Leaving hive.aux.jars.path empty will do the trick.
hive --hiveconf hive.aux.jars.path= -f test-udf.hql
Debugging
You can get more debugging information in your CLI by launching Hive with a special configuration option:
hive --hiveconf hive.root.logger=INFO,console
Sampling Data
In these examples we'll get a user agent report for one month of data.
With predefined buckets
At creation, the webrequest table defines buckets, meaning its data is clustered into files based on the bucketing fields, namely hostname and sequence. This bucketing allows data to be sampled efficiently at read time (less data to read) when the same bucketing parameters are used:
SELECT
user_agent,
COUNT(1) AS c
FROM webrequest TABLESAMPLE(BUCKET 1 OUT OF 1024 ON hostname, sequence)
WHERE year=2018 AND month=3
AND webrequest_source='text'
GROUP BY user_agent
ORDER BY c DESC
LIMIT 20;
Note that the original bucketing is done over 64 partitions, so the bucketing in your query MUST use a multiple of 64.
In our example, the number of mappers (tasks reading data) is 1454. For the same query over a month of webrequest without sampling, the number of mappers goes up to 92722.
Without predefined buckets
Hadoop holds about a month of data, which means there is a LOT of data. You do not need to access it all in order to get a sufficiently precise user agent report. We get about 10,000 requests per second for mobile, so sampling 1 in 1000 gives you about 18 million records, which should be sufficient for a monthly report.
A Hive query like the following does the sampling and the grouping using the UDF:
ADD JAR /home/nuria/refinery-hive-0.0.1.jar;
CREATE TEMPORARY FUNCTION ua as 'org.wikimedia.analytics.refinery.hive.UAParserUDF';
use wmf_raw;
SELECT a.useragent, COUNT(1)
FROM
(select ua(user_agent) as useragent
from webrequest TABLESAMPLE(BUCKET 1 OUT OF 1000 ON rand())
where year=2014 and webrequest_source="mobile") a
GROUP BY a.useragent;
To execute (timing the output):
time hive -f select.sql > output.txt
Output will look like:
{"browser_major":"3","os_family":"Android","os_major":"4","device_family":"Lenovo A3500-H","browser_family":"UC Browser","os_minor":"4"} 9
{"browser_major":"30","os_family":"Android","os_major":"4","device_family":"LG-D631/D63110b","browser_family":"Chrome Mobile","os_minor":"4"} 260
{"browser_major":"30","os_family":"Android","os_major":"4","device_family":"SM-N9008","browser_family":"Chrome Mobile","os_minor":"4"} 18
{"browser_major":"31","os_family":"Android","os_major":"4","device_family":"SM-N9005","browser_family":"Chrome Mobile","os_minor":"4"} 708
Note that this sampling method still involves reading the whole webrequest dataset.
Using Hive UDFs with Spark
About the multi-threaded environment
Spark splits the work into tasks that are distributed to executors. Each executor runs N tasks in N parallel threads, matching its N cores.
Each task creates its own instance of the UDF and initializes it before applying it to each row.
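This means per-instance state is effectively thread-confined. As a minimal sketch (a hypothetical class, not actual refinery code), it is safe to keep even a non-thread-safe object like SimpleDateFormat as an instance field, because each Spark task thread owns its own UDF instance:

package org.wikimedia.analytics.refinery.hive;

import java.text.ParseException;
import java.text.SimpleDateFormat;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Hypothetical example, not an actual refinery class.
public class GetEpochUDF extends GenericUDF {

    // SimpleDateFormat is not thread-safe, but an instance field is fine
    // here: each Spark task thread gets its own UDF instance. A static
    // field shared across task threads would need synchronization.
    private SimpleDateFormat format;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length != 1) {
            throw new UDFArgumentLengthException("GetEpochUDF takes exactly one argument");
        }
        // Runs once per UDF instance, i.e. once per Spark task.
        format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
        return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Object value = arguments[0].get();
        if (value == null) {
            return null;
        }
        try {
            return format.parse(value.toString()).getTime();
        } catch (ParseException e) {
            return null;
        }
    }

    @Override
    public String getDisplayString(String[] children) {
        return "GetEpochUDF(" + children[0] + ")";
    }
}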
About Serialization
Spark sends instructions (including UDFs) to executors using Kryo serialization. Kryo does not require the class to implement Serializable. However, some objects, like a Caffeine cache, crash at deserialization with Kryo, so you may need to mark some instance variables as transient.
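A minimal sketch of that pattern, assuming a hypothetical UDF (not an actual refinery class) that keeps a Caffeine cache: the field is marked transient so Kryo skips it, and the cache is rebuilt lazily after deserialization on the executor:

package org.wikimedia.analytics.refinery.hive;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Hypothetical example, not an actual refinery class.
public class GetNormalizedHostUDF extends GenericUDF {

    // transient: Kryo will not try to serialize the cache with the UDF;
    // each executor rebuilds it lazily on first use instead.
    private transient Cache<String, String> cache;

    private Cache<String, String> cache() {
        if (cache == null) {
            cache = Caffeine.newBuilder().maximumSize(10_000).build();
        }
        return cache;
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length != 1) {
            throw new UDFArgumentLengthException("GetNormalizedHostUDF takes exactly one argument");
        }
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Object value = arguments[0].get();
        if (value == null) {
            return null;
        }
        // Cache the (possibly expensive) normalization per distinct host.
        return cache().get(value.toString(), host -> host.trim().toLowerCase());
    }

    @Override
    public String getDisplayString(String[] children) {
        return "GetNormalizedHostUDF(" + children[0] + ")";
    }
}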
See also
- Explanation and code examples in the WMF Discovery team's analytics documentation
- Writing Hive UDFs – a tutorial