Data Platform/Systems/Spark

Spark is a powerful engine for processing data on the Analytics Cluster. You can drive it using SQL, Python, R, Java, or Scala.

As of November 2023, we are running Spark 3.1.2 (docs: spark.apache.org/docs/3.1.2/) in all of our production pipelines.

However, in order to facilitate users' development environments and ease the upgrade path, we can now run multiple concurrent versions of the Spark shuffler service for YARN. See task T344910 for more information. The available Spark shuffler service versions for YARN are:

  • 3.1.2
  • 3.3.2
  • 3.4.1

Command-line interfaces

There are a number of Spark command-line programs available on the analytics clients:

  • spark3-submit
  • spark3-shell
  • spark3R
  • spark3-sql
  • pyspark3
  • spark3-thriftserver

Note that other Spark documentation will use the standard names for these programs, without the 3 (e.g. spark-submit). We have added the 3 to prevent confusion with the programs from Spark 1 and 2.

spark3-sql allows you to query Hive tables directly via the Spark SQL engine, in a pure SQL REPL, without having to write code in a programming language.

In the rest of this doc, the spark3 shell commands will be used, as spark3 is the preferred installation of Spark. Note that our spark3 configuration defaults pyspark3 to using python3 (and ipython3 for the driver).

Configuration

Application Master

Spark can be run either in local mode, where all execution happens on the machine the notebook is started from, or in YARN mode, which is fully distributed.

Local mode is especially convenient for initial notebook development. It is typically more responsive than YARN mode and allows prototyping code against small samples of the data before scaling up to full-month or multi-month datasets. Dependencies are also easy to handle in local mode: nothing special has to be done, since packages installed with !pip install will be available in all executor processes. In local mode the number of Java threads is specified in brackets, such as local[12]. It is recommended to provide more memory than the defaults when running in local mode; otherwise a dozen Java executor threads might fight over a couple of GB of memory.

Note: Using Spark in local mode implies some knowledge of the resources of the local machine it runs on (number of CPU cores and amount of memory). Please don't use the full capacity of local machines! Other users might be working on them as well. :)

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master('local[12]')                  # 12 local worker threads
    .config('spark.driver.memory', '8g')  # more than the default, so the threads don't starve
    .getOrCreate()
)

Local mode has its limits. When more compute is necessary, set the master to yarn and specify some limits on the amount of compute to use. Spark executes one task per core, so 64 executors with 4 cores each give 256 parallel tasks, with 64 * 8G = 512G of total memory to work on the notebook (this represents ~15% of the whole cluster). Finally, some more memory is needed for the Spark driver: 2g instead of the default 1g. The driver is responsible for collecting computed data back to the notebook (if needed) as well as managing all the tasks of the application, so the extra memory helps prevent it from crashing too quickly.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master('yarn')
    .config('spark.dynamicAllocation.maxExecutors', 64)
    .config('spark.executor.memory', '8g')   # 8G per executor, as described above
    .config('spark.executor.cores', 4)
    .config('spark.driver.memory', '2g')
    .getOrCreate()
)

Other working examples

An example notebook has been put together demonstrating loading the ELMo package from tensorflow hub and running it across many executors in the yarn cluster.

Hive table Spark writer configuration

For Spark jobs triggered through Airflow, we use version=2 of the file committer algorithm. This configuration significantly enhances the robustness of the output writing process by mitigating concurrency issues that may occur when multiple jobs write to the same Hive table simultaneously (e.g. Airflow back-filling or parallel execution).

In the event of a job failure, output files may be partially written to the partition folder. However, this is not a concern in the Airflow context, as jobs are typically configured to automatically retry until successful.

--conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2

Other Spark jobs on the cluster use version=1 of the algorithm to write their output files. For those manually triggered jobs, the output is written to the Hive partition directory only if the job succeeds. This avoids confusing users.

--conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1
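
The same setting can also be applied when building a Spark session from Python instead of on the command line. A minimal sketch (committer version 2 shown; swap in 1 for manually triggered jobs):

from pyspark.sql import SparkSession

# Sketch: set the file output committer version on the session builder
# rather than via --conf.
spark = (
    SparkSession.builder
    .config('spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version', 2)
    .getOrCreate()
)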

How do I ...

Use PySpark to run SQL on Hive tables

The wmfdata-python quickstart notebook has examples of running SQL commands against the Hive tables in the Data Lake using the PySpark SQL interface, and how to create a Spark session to use the PySpark API directly.

Also see the upstream documentation at https://spark.apache.org/docs/latest/quick-start.html.
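
As a minimal sketch of the PySpark SQL interface (run inside pyspark3 or a notebook Spark session named spark; the table and partition values are only illustrative):

# Query a Data Lake table with Spark SQL; adjust the table and partition
# filters to your use case.
df = spark.sql("""
    SELECT uri_host, COUNT(*) AS requests
    FROM wmf.webrequest
    WHERE year = 2023 AND month = 11 AND day = 1 AND hour = 0
    GROUP BY uri_host
    ORDER BY requests DESC
    LIMIT 10
""")
df.show()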

Start a spark shell in yarn

Note: The settings presented here are for a medium-size job on the cluster (~15% of the whole cluster)

  • Scala
spark3-shell --master yarn --executor-memory 8G --executor-cores 4 --driver-memory 2G --conf spark.dynamicAllocation.maxExecutors=64
  • Python
pyspark3 --master yarn --executor-memory 8G --executor-cores 4 --driver-memory 2G --conf spark.dynamicAllocation.maxExecutors=64
  • R
spark3R --master yarn --executor-memory 8G --executor-cores 4 --driver-memory 2G --conf spark.dynamicAllocation.maxExecutors=64
  • SQL
spark3-sql --master yarn --executor-memory 8G --executor-cores 4 --driver-memory 2G --conf spark.dynamicAllocation.maxExecutors=64

Set the python version pyspark should use

We currently deploy pyspark as part of conda-analytics, so if you wish to use a different version of Python with pyspark, you can create a new conda environment with a custom Python version.
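
A hedged sketch of what that could look like (the environment name and Python version are only examples; PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are the standard Spark variables for pointing pyspark at a specific interpreter):

# Example only: create and activate an environment with the Python you want.
conda create -n my-pyspark-env python=3.10
conda activate my-pyspark-env
# Point pyspark at that interpreter.
export PYSPARK_PYTHON=$(which python)
export PYSPARK_DRIVER_PYTHON=$(which python)
pyspark3 --master local[4]

Note that in yarn mode the executors also need access to the same Python environment (for example by shipping a packed environment), which this sketch does not cover.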

See spark logs on my local machine when using spark submit

  • If you are running Spark on local, spark3-submit should write logs to your console by default.
  • How to get logs written to a file?
    • Spark uses log4j for logging, and the log4j config is usually at /etc/spark2/conf/log4j.properties
    • This uses a ConsoleAppender by default, and if you wanted to write to files, an example log4j properties file would be:
# Set everything to be logged to the file
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=/tmp/spark.log
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

This should write logs to /tmp/spark.log on any stats client.

  • On the analytics cluster, running a spark job through spark submit writes logs to the console too, on both yarn and local modes
  • To write to file, create a log4j.properties file, similar to the one above that uses the FileAppender
  • Use the --files argument on spark-submit and upload your custom log4j.properties file:
spark3-shell --master yarn --executor-memory 2G --executor-cores 1 --driver-memory 4G --files /path/to/your/log4j.properties
  • While running a spark job through Airflow
    • The log4j file path now needs to be a location accessible by all drivers/executors running in different machines
    • Putting the file in a temporary directory on HDFS and using an hdfs:// URL should do the trick (see the sketch after this list)
    • Note that the logs will be written on the machine where the driver/executors are running - so you'd need access to go look at them
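
A hedged sketch of that approach (the paths and job script are illustrative):

# Upload the custom log4j config somewhere readable from the whole cluster.
hdfs dfs -put ./log4j.properties hdfs://analytics-hadoop/tmp/my-log4j.properties
# Reference it with an hdfs:// URL so drivers/executors on any machine can fetch it.
spark3-submit --master yarn --files hdfs://analytics-hadoop/tmp/my-log4j.properties my_job.py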

Monitor Spark shell job Resources

If you run some more complicated spark in the shell and you want to see how Yarn is managing resources, have a look at https://yarn.wikimedia.org/cluster/scheduler.

Don't hesitate to poke people on #wikimedia-analytics for help!

Use Hive UDF with Spark SQL

Here is an example in R. On stat1007, start a Spark shell with the path to the refinery-hive jar:

spark3R --master yarn --executor-memory 2G --executor-cores 1 --driver-memory 4G --jars /srv/deployment/analytics/refinery/artifacts/refinery-hive.jar

Then in the R session:

sql("CREATE TEMPORARY FUNCTION is_spider as 'org.wikimedia.analytics.refinery.hive.IsSpiderUDF'")
sql("Your query")
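
The same pattern works from PySpark; a minimal sketch, assuming the shell was started with the same jar and a session named spark:

# Started e.g. with:
#   pyspark3 --master yarn --jars /srv/deployment/analytics/refinery/artifacts/refinery-hive.jar
spark.sql("CREATE TEMPORARY FUNCTION is_spider as 'org.wikimedia.analytics.refinery.hive.IsSpiderUDF'")
spark.sql("Your query")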

pyspark and external packages

To use external packages like graphframes:

pyspark3 --packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 --conf "spark.driver.extraJavaOptions=-Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080"

Passing the proxy options above avoids the dependency resolution getting stuck at:

resolving dependencies :: org.apache.spark#spark-submit-parent;1.0

confs: [default]
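
Once the shell has started and the package has resolved, a minimal GraphFrames sketch (the tiny vertex and edge DataFrames are made up for illustration):

from graphframes import GraphFrame

# Toy graph: vertices need an 'id' column, edges need 'src' and 'dst' columns.
vertices = spark.createDataFrame([("a", "Alice"), ("b", "Bob"), ("c", "Carol")], ["id", "name"])
edges = spark.createDataFrame([("a", "b"), ("b", "c"), ("c", "a")], ["src", "dst"])

g = GraphFrame(vertices, edges)
g.inDegrees.show()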

SparkR in production (stat100* machines) examples

SparkR: Basic example

From stat100*, and with the latest {SparkR} installed:

Note: This example starts a medium-size application (~15% of the cluster resources)

library(SparkR)

# - set environmental variables
Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell")

# - start SparkR api session
sparkR.session(master = "yarn",
   appName = "SparkR",
   sparkHome = "/usr/lib/spark2/",
   enableHiveSupport = TRUE,
   sparkConfig = list(spark.driver.memory = "2g",
                      spark.driver.cores = "4",
                      spark.executor.memory = "8g",
                      spark.dynamicAllocation.maxExecutors = "64")
)

# - a somewhat trivial example w. linear regression on iris 

# - iris becomes a SparkDataFrame
df <- createDataFrame(iris)

# - GLM w. family = "gaussian"
model <- spark.glm(data = df, Sepal_Length ~ Sepal_Width + Petal_Length + Petal_Width, family = "gaussian")

# - summary
summary(model)

# - end SparkR session
sparkR.stop()

SparkR: Large(er) file from HDFS

Also from stat100*, and with the latest {SparkR} installed:

Note: This example starts a large application (~30% of the cluster)

### --- flights dataset Multinomial Logistic Regression
### --- SparkDataFrame from HDFS
### --- NOTE: in this example, 'flights.csv' is found in /home/goransm/testData on stat1007

setwd('/home/goransm/testData')
library(SparkR)
Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell")

### --- Start SparkR session w. Hive Support enabled
sparkR.session(master = "yarn",
               appName = "SparkR",
               sparkHome = "/usr/lib/spark2/",
               enableHiveSupport = TRUE,
               sparkConfig = list(spark.driver.memory = "4g",
                                  spark.dynamicAllocation.maxExecutors = "128",
                                  spark.executor.cores = "4",
                                  spark.executor.memory = "8g"
                                  )
              )

# - copy flights.csv to HDFS
system('hdfs dfs -put /home/goransm/testData/flights.csv hdfs://analytics-hadoop/user/goransm/flights.csv', 
       wait = T)

# - load flights
df <- read.df("flights.csv",
              "csv",
               header = "true",
               inferSchema = "true",
               na.strings = "NA")

# - structure
str(df)

# - dimensionality
dim(df)

# - clean up df from NA values
df <- filter(df, isNotNull(df$AIRLINE) & isNotNull(df$ARRIVAL_DELAY) & isNotNull(df$AIR_TIME) & isNotNull(df$TAXI_IN) & 
                 isNotNull(df$TAXI_OUT) & isNotNull(df$DISTANCE) & isNotNull(df$ELAPSED_TIME))

# - dimensionality
dim(df)

# - Generalized Linear Model w. family = "multinomial"
model <- spark.logit(data = df, 
                     formula = AIRLINE ~ ARRIVAL_DELAY + AIR_TIME + TAXI_IN + TAXI_OUT + DISTANCE + ELAPSED_TIME,
                     family = "multinomial")

# - Regression Coefficients
res <- summary(model)
res$coefficients

# - delete flights.csv from HDFS
system('hdfs dfs -rm hdfs://analytics-hadoop/user/goransm/flights.csv', wait = T)

# - close SparkR session
sparkR.stop()

Spark Resource Settings

Spark jobs are highly configurable and no setting is optimal for all jobs. However, this section provides some good guidelines and starting points.

Regular jobs

A good starting point for regular jobs is the following combination of settings. These settings allow the job to use roughly as much as 15% of cluster resources.

"spark.driver.memory": "2g",
"spark.dynamicAllocation.maxExecutors": 64,
"spark.executor.memory": "8g",
"spark.executor.cores": 4,
"spark.sql.shuffle.partitions": 256

Large jobs

A good starting point for large jobs is the following combination of settings. These settings allow the job to use roughly as much as 30% of cluster resources.

"spark.driver.memory": "4g",
"spark.dynamicAllocation.maxExecutors": 128
"spark.executor.memory": "8g",
"spark.executor.cores": 4,
"spark.sql.shuffle.partitions": 512

Extra large jobs

Many Spark default settings are not optimal for large scale jobs (roughly, those that handle a terabyte or more of data across stages or that have tens of thousands of stages). This article from the Facebook technical team gives hints at how to better tune Spark in those cases. In this section we try to explain how the tuning helps.

Scaling the driver

  • First, make sure your job uses dynamic allocation. It's enabled by default on the analytics-cluster, but can be turned off. This will ensure a better use of resources across the cluster. If your job fails because of errors at shuffle (due to the external shuffle service), the tuning below should help.
  • Allow for more consecutive attempts per stage (default is 4, 10 is suggested): spark.stage.maxConsecutiveAttempts = 10. This tweak helps deal better with fetch failures. They usually happen when an executor is no longer available (dead because of OOM or cluster resource preemption, for instance). In such a case, other executors fail to fetch its data, leading to failed stages. Bumping the number of possible consecutive attempts allows for more error-recovery space.
  • Increase the RPC server threads to prevent out-of-memory errors: spark.rpc.io.serverThreads = 64 (no clear information is available as to why this helps; it can be assumed that since spark.rpc.connect.threads = 64, it is better to have the same number of server threads answering, but we have not found proper documentation).

Executors

  • Manually set spark.executor.memoryOverhead when using big executors or a lot of string values (interned strings are stored in the memory overhead buffer). By default Spark allocates 0.1 * total-executor-memory for this buffer, which can be too small.
  • Increase the shuffle file buffer size to reduce the number of disk seeks and system calls made: spark.shuffle.file.buffer = 1 MB and spark.unsafe.sorter.spill.reader.buffer.size = 1 MB
  • Optimize spill-file merging by facilitating the merging of newly computed streams into existing files (useful when the job spills a lot): spark.file.transferTo = false, spark.shuffle.file.buffer = 1 MB and spark.shuffle.unsafe.file.output.buffer = 5 MB
  • Reduce spilled data size by increasing the compression block size: spark.io.compression.lz4.blockSize = 512KB
  • If needed, enable off-heap memory if GC pauses become problematic (not needed for analytics jobs so far): spark.memory.offHeap.enabled = true and spark.memory.offHeap.size = 3g (don't forget that the off-heap memory is part of the YARN container, so your container size is executor-memory + memory.offHeap.size)

External shuffle service

  • Speed up file retrieval by bumping the cache size available for the file index: spark.shuffle.service.index.cache.size = 2048
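
Putting the driver and executor tuning options from this section together, a hedged sketch of how they might be set when building a session (the values are the ones suggested above; the memoryOverhead value is only an example, and the external shuffle service index cache setting applies to the shuffle service itself rather than to an individual job, so it is not included here):

from pyspark.sql import SparkSession

# Sketch: extra-large job tuning, combining the options listed above.
spark = (
    SparkSession.builder
    .master('yarn')
    .config('spark.stage.maxConsecutiveAttempts', 10)
    .config('spark.rpc.io.serverThreads', 64)
    .config('spark.executor.memoryOverhead', '4g')                 # example value, tune per job
    .config('spark.shuffle.file.buffer', '1m')
    .config('spark.unsafe.sorter.spill.reader.buffer.size', '1m')
    .config('spark.file.transferTo', 'false')
    .config('spark.shuffle.unsafe.file.output.buffer', '5m')
    .config('spark.io.compression.lz4.blockSize', '512k')
    .getOrCreate()
)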