Spark

Apache Spark is an open-source cluster-computing framework originally developed in the AMPLab at the University of California, Berkeley, and later donated to the Apache Software Foundation, where it remains today. In contrast to Hadoop's disk-based analytics paradigm, Spark uses multi-stage in-memory analytics. Spark can run programs up to 100x faster than Hadoop's MapReduce in memory, or 10x faster on disk. Spark supports applications written in Python, Java, Scala, and R.

Availability and Restrictions

Versions

The following versions of Spark are available on OSC systems: 

Version  Pitzer  Ascend  Cardinal  Note
2.4.0    X*
2.4.5    X
3.5.1            X       X
* Current default version

You can use module spider spark to view available modules for a given machine. Feel free to contact OSC Help if you need other versions for your work.

Access

Spark is available to all OSC users. If you have any questions, please contact OSC Help.

Publisher/Vendor/Repository and License Type

The Apache Software Foundation, Open source

Usage

Run a Spark Application Using a Job Script

Setting up a Spark cluster

Before running any Spark application, you need to initialize a Spark cluster based on the resources allocated. For example, assume you have allocated two CPU nodes either in an interactive session or through a batch job:

Requesting an interactive session:
salloc -N 2 --exclusive
A batch job script:
#!/bin/bash
#SBATCH --nodes=2
#SBATCH --exclusive

Once your resources are allocated, you can use the slurm-spark-submit script to set up the Spark cluster:

module load spark/3.5.1
slurm-spark-submit

You should see output similar to the following:

/apps/spack/0.21/ascend/linux-rhel9-zen2/spark/gcc/11.4.1/3.5.1-lbffccn/sbin/start-master.sh
SPARK_MASTER_HOST=a0114.ten.osc.edu
SPARK_MASTER_PORT=7077

You should also see the following line appear twice in the output, once for each worker:

25/05/14 12:04:29 INFO Worker: Successfully registered with master spark://a0114.ten.osc.edu:7077

This setup starts a Spark master on one of the CPU nodes and launches one Spark worker per node, resulting in a total of two workers. In this configuration, each worker is allocated all the available CPUs and memory on its respective node.

If you want multiple workers per node, you can use the -W option with slurm-spark-submit. For example:

slurm-spark-submit -W 2

This command starts two workers per node, resulting in a total of four workers. In this case, each worker is configured to use half of the available CPUs and memory on the node.

Accessing the Spark Web UI

You can monitor the status and resource usage of your Spark cluster through the Spark Web UI. Follow these steps:

  1. Launch a lightweight desktop:
    https://ondemand.osc.edu/pun/sys/dashboard/batch_connect/sys/bc_desktop/vdi/session_contexts/new
  2. Once the desktop session starts, open a web browser and enter the value of SPARK_MASTER_HOST obtained from your job output, followed by port 8080. For example: a0114.ten.osc.edu:8080
  3. You should now be connected to the Spark Web UI, where you can view the Spark cluster status, running jobs, and resource consumption.

Running a Spark application

Once the Spark cluster is ready, you can run a Spark application using slurm-spark-submit, specifying Spark properties for the Spark session:

slurm-spark-submit --no-init \
  --driver-memory 2G \
  --executor-memory 60G \
  --executor-cores 24 \
  /users/PZS0645/support/share/tests/spark/spark_parallel_example.py
Explanation of options
  • --no-init: Do not start a new Spark cluster. Omit this option if you have not set up a Spark cluster as instructed above — in that case, the script will initialize one for you.
  • --driver-memory 2G: Allocates 2 GB of memory for the driver process.
  • --executor-memory 60G: Allocates 60 GB of memory for each executor process.
  • --executor-cores 24: Assigns 24 CPU cores per executor. If each worker node has 96 CPU cores, this configuration allows four executors to run on each worker.
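
If everything runs through a batch job, the resource request, cluster setup, and application submission can all live in one script. The following is a minimal sketch based on the commands above; the walltime is a placeholder, and you may need additional directives (for example, an account) depending on your allocation:

#!/bin/bash
#SBATCH --nodes=2
#SBATCH --exclusive
#SBATCH --time=01:00:00

module load spark/3.5.1

# Without --no-init, slurm-spark-submit first starts the Spark cluster on the
# allocated nodes and then submits the application to it.
slurm-spark-submit \
  --driver-memory 2G \
  --executor-memory 60G \
  --executor-cores 24 \
  /users/PZS0645/support/share/tests/spark/spark_parallel_example.py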

Creating a Spark session in Python

In your Python application, create a Spark session to communicate with the Spark cluster:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .appName("MySparkApp") \
        .getOrCreate()

You can now use this spark session to create DataFrames, run SQL queries, and read/write data. For example:

# Create a DataFrame from a JSON file
df = spark.read.json("data/input.json")

# Run a SQL query
df.createOrReplaceTempView("my_table")
result = spark.sql("SELECT * FROM my_table WHERE value > 100")

# Write the result to a CSV file
result.write.csv("data/output.csv")
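
When the application is done with the cluster, it is good practice to stop the session so its resources are released:

# Shut down the Spark session and release its executors
spark.stop()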

Configuring the Spark session in a Python application

While creating a Spark session, you can also specify additional Spark properties. For example:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
       .appName("MySparkApp") \
       .config("spark.driver.memory", "2G") \
       .config("spark.executor.memory", "120G") \
       .config("spark.executor.cores", "24") \
       .getOrCreate()

Note that Spark properties set in the code can override those passed through the slurm-spark-submit script. In the example above, each executor will be allocated 120 GB of memory (as specified in the code), potentially overriding a different value (e.g., 60 GB) provided through slurm-spark-submit.
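
If you are unsure which value ended up in effect, one way to check is to read the active setting from the session itself:

# Print the executor memory actually configured for this session
print(spark.sparkContext.getConf().get("spark.executor.memory"))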

Run a Spark Application in a Jupyter Notebook

Launching a Jupyter + Spark app on OSC OnDemand

On OSC OnDemand, you can use the Jupyter + Spark app to easily set up a Spark cluster and run a Spark application within a notebook. For detailed instructions on how to launch Jupyter + Spark using the OSC OnDemand web interface, please visit:

https://www.osc.edu/content/launching_jupyter_spark_app

Choosing a kernel

In a Jupyter + Spark instance, you can choose the default PySpark kernel or use a custom kernel created from your Conda environment. To create a custom kernel, please refer to this guide for details.

Please note that there are some issues with both types of kernels. See Known Issues for more details.

Custom Spark properties

When launching a Jupyter + Spark app, a Spark cluster is automatically configured based on the number of nodes and workers you specify. To enable running a Spark application within a notebook, the PYSPARK_SUBMIT_ARGS environment variable is pre-defined. This variable allows communication between your notebook and the Spark cluster.

The following Spark properties are set by default:

Spark Property              Default Value
spark.driver.memory         120 GB if the driver is launched only on the master node; otherwise, 2 GB
spark.executor.memory       min(60 GB, 120 GB divided by the number of workers per node)
spark.driver.maxResultSize  0 (unlimited)
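
If you want to see exactly which arguments the app pre-defined for your notebook, you can print the environment variable from a cell:

import os
# Show the submit arguments the Jupyter + Spark app set for this notebook
print(os.environ.get("PYSPARK_SUBMIT_ARGS"))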

You can override these default Spark properties when creating a Spark session in your notebook:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
       .appName("MySparkApp") \
       .config("spark.driver.memory", "2G") \
       .config("spark.executor.memory", "120G") \ 
       .config("spark.executor.cores", "24") \ 
       .getOrCreate()

This approach allows for greater customization and performance optimization based on your application’s specific requirements. However, before using custom configurations, ensure the cluster has sufficient resources to accommodate them.

Alternatively, you can provide a path to a custom property file when launching the Jupyter + Spark app. This file will override Spark’s default configuration settings. Example of a custom spark-defaults.conf file:

spark.executor.cores 24 
spark.executor.memory 120G 
spark.driver.memory 2G

Verifying Spark configuration

To view the active Spark configuration (including default or overridden values), you can run the following command in your notebook:

spark.sparkContext.getConf().getAll()

This returns a list of all active Spark configuration settings, which can help with debugging or performance tuning.
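
If you only care about a few of these settings, you can filter the list, for example:

# Print only the executor-related settings in a readable form
for key, value in sorted(spark.sparkContext.getConf().getAll()):
    if key.startswith("spark.executor"):
        print(key, "=", value)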

Known Issues

Known Issues for Spark