Apache Spark is an open-source cluster-computing framework originally developed in the AMPLab at the University of California, Berkeley, and later donated to the Apache Software Foundation, where it remains today. In contrast to Hadoop's disk-based analytics paradigm, Spark performs multi-stage in-memory analytics. Spark can run programs up to 100x faster than Hadoop's MapReduce when data fits in memory, or 10x faster on disk. Spark supports applications written in Python, Java, Scala, and R.
Availability and Restrictions
Versions
The following versions of Spark are available on OSC systems:
Version | Pitzer | Ascend | Cardinal | Note |
---|---|---|---|---|
2.4.0 | X* | | | |
2.4.5 | X | | | |
3.5.1 | X | X | | |
* Current default version
You can use module spider spark to view available modules for a given machine. Feel free to contact OSC Help if you need other versions for your work.
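For example, to see how to load a particular version (3.5.1 here), run:
module spider spark/3.5.1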
Access
Spark is available to all OSC users. If you have any questions, please contact OSC Help.
Publisher/Vendor/Repository and License Type
The Apache Software Foundation, Open source
Usage
Run a Spark Application Using a Job Script
Setting up a Spark cluster
Before running any Spark application, you need to initialize a Spark cluster based on the resources allocated. For example, assume you have allocated two CPU nodes either in an interactive session or through a batch job:
Requesting an interactive session
salloc -N 2 --exclusive
A batch job script
#!/bin/bash
#SBATCH --nodes=2
#SBATCH --exclusive
Once your resources are allocated, you can use the slurm-spark-submit script to set up the Spark cluster:
module load spark/3.5.1
slurm-spark-submit
You should see output similar to the following:
/apps/spack/0.21/ascend/linux-rhel9-zen2/spark/gcc/11.4.1/3.5.1-lbffccn/sbin/start-master.sh
SPARK_MASTER_HOST=a0114.ten.osc.edu
SPARK_MASTER_PORT=7077
You should also see the following line repeated twice in the output, once for each worker:
25/05/14 12:04:29 INFO Worker: Successfully registered with master spark://a0114.ten.osc.edu:7077
This setup starts a Spark master on one of the CPU nodes and launches one Spark worker per node, resulting in a total of two workers. In this configuration, each worker is allocated all the available CPUs and memory on its respective node.
If you want multiple workers per node, you can use the -W option with slurm-spark-submit. For example:
slurm-spark-submit -W 2
This command starts two workers per node, resulting in a total of four workers. In this case, each worker is configured to use half of the available CPUs and memory on the node.
Accessing the Spark Web UI
You can monitor the status and resource usage of your Spark cluster through the Spark Web UI. Follow these steps:
- Launch a lightweight desktop: https://ondemand.osc.edu/pun/sys/dashboard/batch_connect/sys/bc_desktop/vdi/session_contexts/new
- Once the desktop session starts, open a web browser and enter the value of SPARK_MASTER_HOST obtained from your job output, followed by port 8080. For example: a0114.ten.osc.edu:8080
- You should now be connected to the Spark Web UI, where you can view the Spark cluster status, running jobs, and resource consumption.
Running a Spark application
Once the Spark cluster is ready, you can run a Spark application using slurm-spark-submit, specifying Spark properties for the Spark session:
slurm-spark-submit --no-init \
    --driver-memory 2G \
    --executor-memory 60G \
    --executor-cores 24 \
    /users/PZS0645/support/share/tests/spark/spark_parallel_example.py
Explanation of options
- --no-init: Do not start a new Spark cluster. Omit this option if you have not set up a Spark cluster as instructed above; in that case, the script will initialize one for you.
- --driver-memory 2G: Allocates 2 GB of memory for the driver process.
- --executor-memory 60G: Allocates 60 GB of memory for each executor process.
- --executor-cores 24: Assigns 24 CPU cores per executor. If each worker node has 96 CPU cores, this configuration allows four executors to run on each worker.
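Putting these pieces together, a complete batch job script might look like the following minimal sketch. The time limit, Spark version, and application path (my_spark_app.py) are placeholders for your own values; because --no-init is omitted, slurm-spark-submit first initializes the Spark cluster and then runs the application.
#!/bin/bash
#SBATCH --nodes=2
#SBATCH --exclusive
#SBATCH --time=01:00:00

# Load the Spark module
module load spark/3.5.1

# Initialize the Spark cluster and submit the application in one step
# (my_spark_app.py is a placeholder for your own PySpark script)
slurm-spark-submit \
    --driver-memory 2G \
    --executor-memory 60G \
    --executor-cores 24 \
    my_spark_app.py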
Creating a Spark session in Python
In your Python application, create a Spark session to communicate with the Spark cluster:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MySparkApp") \
    .getOrCreate()
You can now use this spark session to create DataFrames, run SQL queries, and read/write data. For example:
# Create a DataFrame from a JSON file
df = spark.read.json("data/input.json")

# Run a SQL query
df.createOrReplaceTempView("my_table")
result = spark.sql("SELECT * FROM my_table WHERE value > 100")

# Write the result to a CSV file
result.write.csv("data/output.csv")
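When your application no longer needs Spark, you can also shut the session down explicitly to release its executors; this is standard PySpark and otherwise happens when the script exits:
# Release the resources held by this Spark session
spark.stop()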
Configuring the Spark session in a Python application
While creating a Spark session, you can also specify additional Spark properties. For example:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MySparkApp") \
    .config("spark.driver.memory", "2G") \
    .config("spark.executor.memory", "120G") \
    .config("spark.executor.cores", "24") \
    .getOrCreate()
Note that Spark properties set in the code can override those passed through the slurm-spark-submit script. For instance, in the example above, each executor will be allocated 120 GB of memory (as specified in the code), potentially overriding a different value (e.g., 60 GB) provided through the slurm-spark-submit script.
Run a Spark Application in a Jupyter Notebook
Launching a Jupyter + Spark app on OSC OnDemand
On OSC OnDemand, you can use the Jupyter + Spark app to easily set up a Spark cluster and run a Spark application within a notebook. For detailed instructions on how to launch Jupyter + Spark using the OSC OnDemand web interface, please visit:
https://www.osc.edu/content/launching_jupyter_spark_app
Choosing a kernel
In a Jupyter + Spark instance, you can choose the default PySpark kernel or use a custom kernel created from your Conda environment. To create a custom kernel, please refer to this guide for details.
Please note that there are some issues with both types of kernels. See Known Issues for more details.
Custom Spark properties
When launching a Jupyter + Spark app, a Spark cluster is automatically configured based on the number of nodes and workers you specify. To enable running a Spark application within a notebook, the PYSPARK_SUBMIT_ARGS environment variable is pre-defined. This variable allows communication between your notebook and the Spark cluster. The following Spark properties are set by default:
Spark Property | Default Value |
---|---|
spark.driver.memory | 120 GB if the driver is launched only on the master node; otherwise, 2 GB |
spark.executor.memory | min(60 GB, 120 GB divided by the number of workers per node) |
spark.driver.maxResultSize | 0 (unlimited) |
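To see exactly what has been pre-defined for your session, you can print the variable from a notebook cell; this is a minimal sketch using Python's standard os module:
import os

# Inspect the submit arguments pre-defined by the Jupyter + Spark app
print(os.environ.get("PYSPARK_SUBMIT_ARGS"))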
You can override these default Spark properties when creating a Spark session in your notebook:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MySparkApp") \
    .config("spark.driver.memory", "2G") \
    .config("spark.executor.memory", "120G") \
    .config("spark.executor.cores", "24") \
    .getOrCreate()
This approach allows for greater customization and performance optimization based on your application’s specific requirements. However, before using custom configurations, ensure the cluster has sufficient resources to accommodate them.
Alternatively, you can provide a path to a custom property file when launching the Jupyter + Spark app. This file will override Spark’s default configuration settings. Example of a custom spark-defaults.conf file:
spark.executor.cores 24
spark.executor.memory 120G
spark.driver.memory 2G
Verifying Spark configuration
To view the active Spark configuration (including default or overridden values), you can run the following command in your notebook:
spark.sparkContext.getConf().getAll()
This returns a list of all active Spark configuration settings, which can help with debugging or performance tuning.
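To check a single property instead of the full list, the same SparkConf object also provides get(); for example, to confirm the executor memory setting:
# Look up one property by key
spark.sparkContext.getConf().get("spark.executor.memory")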