Configuration Settings for Spark Applications
The Spark configuration file, spark-defaults.conf, includes this section with details specific to the VAST Cluster:
Vast Authentication
# Please fill in your user access and secret keys.
spark.ndb.access_key_id=$VAST_ACCESS_KEY
spark.ndb.secret_access_key=$VAST_SECRET_KEY
where VAST_ACCESS_KEY and VAST_SECRET_KEY are the access and secret keys for a VMS user with permissions to create and access VAST Databases (see Pre-requisites).
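Because the placeholders must be replaced with the real keys, one option is to generate this section from environment variables instead of typing the keys into the file by hand. The sketch below assumes the keys are exported as VAST_ACCESS_KEY and VAST_SECRET_KEY; the function name render_vast_auth_section is hypothetical and is not part of any VAST or Spark tool.

```python
import os


def render_vast_auth_section(env=None):
    """Return the VAST authentication lines for spark-defaults.conf.

    Hypothetical helper: assumes the keys are exported as the environment
    variables VAST_ACCESS_KEY and VAST_SECRET_KEY.
    """
    env = os.environ if env is None else env
    names = ("VAST_ACCESS_KEY", "VAST_SECRET_KEY")
    # Fail early instead of writing empty keys into the configuration file.
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return (
        "# Vast Authentication\n"
        f"spark.ndb.access_key_id={env['VAST_ACCESS_KEY']}\n"
        f"spark.ndb.secret_access_key={env['VAST_SECRET_KEY']}\n"
    )
```

In a shell where both variables are exported, print(render_vast_auth_section(), end="") prints the section so it can be appended to spark-defaults.conf. Treat the resulting file as a secret, since it contains the keys in plain text.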
Accessing the Spark UI
The Spark Cluster services use these ports.
| Spark Service | VIP(s) | Protocol | Port |
|---|---|---|---|
| Master | Master VIP | HTTP | 9292 |
| | | HTTPS | 9492 |
| | | RPC | 2424 |
| | | REST API | 6066 |
| Worker | Worker VIPs | HTTP | 9293 |
| | | HTTPS | 9493 |
| History | Master VIP | HTTP | 18080 |
| | | HTTPS | 18480 |
| Connect | Master VIP | HTTP | 4040 |
| | | HTTPS | 4440 |
| | | gRPC API | 15002 |
| Thrift | Master VIP | HTTP | 4041 |
| | | HTTPS | 4441 |
| | | Thrift API | 10000 |
| | | Thrift API HTTP | 10001 |
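To avoid mistyping ports when building URLs, the port assignments can be kept in one lookup table. This is a minimal sketch: the helper names ui_url and master_url are hypothetical, and it treats 18480 as the History HTTPS port, following the HTTP/HTTPS pairing of the other services.

```python
# Port assignments for the Spark Cluster services, keyed by (service, protocol).
SPARK_PORTS = {
    ("Master", "HTTP"): 9292,
    ("Master", "HTTPS"): 9492,
    ("Master", "RPC"): 2424,
    ("Master", "REST API"): 6066,
    ("Worker", "HTTP"): 9293,
    ("Worker", "HTTPS"): 9493,
    ("History", "HTTP"): 18080,
    ("History", "HTTPS"): 18480,  # assumed HTTPS, by analogy with the other pairs
    ("Connect", "HTTP"): 4040,
    ("Connect", "HTTPS"): 4440,
    ("Connect", "gRPC API"): 15002,
    ("Thrift", "HTTP"): 4041,
    ("Thrift", "HTTPS"): 4441,
    ("Thrift", "Thrift API"): 10000,
    ("Thrift", "Thrift API HTTP"): 10001,
}


def ui_url(service, vip, secure=False):
    """Build the web UI URL for a service reachable at the given VIP."""
    protocol = "HTTPS" if secure else "HTTP"
    port = SPARK_PORTS[(service, protocol)]
    return f"{protocol.lower()}://{vip}:{port}"


def master_url(master_vip):
    """Build the spark:// URL that clients pass to --master (RPC port)."""
    return f"spark://{master_vip}:{SPARK_PORTS[('Master', 'RPC')]}"
```

For example, master_url("<master-vip>") yields the spark://<master-vip>:2424 value used with --master in the client commands.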
Setting Up Spark Clients to Access the CNode Spark Cluster
Follow these steps to configure a client host to connect to the Spark Cluster running on the VAST Cluster CNode.
Using the Standalone Spark Driver
Download the Spark driver (3.4.1):
wget -q -4 https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3-scala2.13.tgz
tar xvf spark-3.4.1-bin-hadoop3-scala2.13.tgz
mv spark-3.4.1-bin-hadoop3-scala2.13 /opt/spark
Note
The version should match the version running on the VAST Cluster (Spark 3.4.1 with Scala 2.13).
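One way to catch a version mismatch before installing is to check the Apache distribution archive name. This sketch assumes archive names follow the spark-<version>-bin-hadoop<N>[-scala<version>] pattern used above; an archive without a -scala suffix is reported with no Scala version and treated as a mismatch. The helper names are hypothetical.

```python
import re

# Versions running on the VAST Cluster, per the note above.
CLUSTER_SPARK_VERSION = "3.4.1"
CLUSTER_SCALA_VERSION = "2.13"

# Matches names such as spark-3.4.1-bin-hadoop3-scala2.13.tgz.
_DIST_RE = re.compile(r"spark-(\d+\.\d+\.\d+)-bin-hadoop\d+(?:-scala(\d+\.\d+))?")


def dist_versions(archive_name):
    """Return (spark_version, scala_version or None) parsed from an archive name."""
    match = _DIST_RE.search(archive_name)
    if match is None:
        raise ValueError(f"not a Spark distribution name: {archive_name}")
    return match.group(1), match.group(2)


def matches_cluster(archive_name):
    """True only if both the Spark and Scala versions match the cluster."""
    spark, scala = dist_versions(archive_name)
    return spark == CLUSTER_SPARK_VERSION and scala == CLUSTER_SCALA_VERSION
```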
Install the VAST connector for the Spark driver, placing the connector jars in the Spark location used by the application:
curl -fsSL -o spark-vast-plugin.zip "https://github.com/vast-data/vast-db-connectors/releases/download/spark3-vast-3.4.1-f93839bfa38a/spark3-vast-3.4.1-f93839bfa38a.zip"
unzip spark-vast-plugin.zip
mv spark3-vast* /opt/spark/vast
Run these commands for the S3A Hive interface to include jars in the Spark vanilla jars library:
wget -q https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.1026/aws-java-sdk-bundle-1.11.1026.jar
wget -q https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.2/hadoop-aws-3.3.2.jar
mkdir -p /tmp/hive_jars
mv hadoop-aws-3.3.2.jar /tmp/hive_jars
mv aws-java-sdk-bundle-1.11.1026.jar /tmp/hive_jars
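A quick check that the downloads landed where the later commands expect them can save a failed session start. This is a sketch: the jar names come from the commands above, and the helper names missing_jars and has_connector_jars are hypothetical.

```python
from pathlib import Path

# Jar names taken from the S3A Hive download commands above.
HIVE_JARS = ("hadoop-aws-3.3.2.jar", "aws-java-sdk-bundle-1.11.1026.jar")


def missing_jars(directory="/tmp/hive_jars", expected=HIVE_JARS):
    """Return the expected jar names that are not present in directory."""
    present = {p.name for p in Path(directory).glob("*.jar")}
    return [name for name in expected if name not in present]


def has_connector_jars(directory="/opt/spark/vast"):
    """Return True if the connector directory contains at least one jar."""
    return any(Path(directory).glob("*.jar"))
```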
Using the Docker Spark Driver
You can avoid setting up a Spark environment on your host by deploying a pre-built Spark-VAST image that is packaged with the VastDB connector and ready to use.
Expose the driver port and host network and deploy the driver image:
Run this docker command:
docker run -d --name spark-client \
--net=host \
-p 4040:4040 \
-v $PWD/spark-scripts:/tmp/spark-scripts \
vastdataorg/spark-vast /bin/sleep infinity
Note
This is an example. Using a Docker volume mount (spark-scripts), you can expose your scripts and applications to the Docker driver so that you can submit them to the Spark cluster.
Configuring Spark Executors
Optimizing the allocation of executors on Spark can significantly improve workload performance.
There are two primary approaches to setting up executors for a Spark job:
Static Allocation
Dynamic Allocation
Static Allocation
Static allocation involves setting specific numbers of executors, cores, and memory ahead of time. To calculate these values effectively, compare the total resources on each worker (CPU cores and memory) to the resources requested per executor. For example, if each worker has 16 cores and 64 GB RAM, and you allocate 5 cores and 20 GB RAM per executor, you can run three executors per worker without over-allocating. This approach ensures each executor has sufficient resources without straining the system. These parameters remain fixed for the lifetime of the application.
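The arithmetic in the example above can be written as a small helper: the number of executors per worker is limited by whichever resource runs out first. This sketch ignores per-executor memory overhead and any resources reserved for the operating system, so treat the result as an upper bound; the function names are hypothetical.

```python
def executors_per_worker(worker_cores, worker_mem_gb, executor_cores, executor_mem_gb):
    """Executors that fit on one worker without over-allocating cores or memory."""
    if executor_cores <= 0 or executor_mem_gb <= 0:
        raise ValueError("executor cores and memory must be positive")
    by_cores = worker_cores // executor_cores
    by_memory = worker_mem_gb // executor_mem_gb
    # The scarcer resource limits how many executors fit.
    return min(by_cores, by_memory)


def cluster_executors(num_workers, worker_cores, worker_mem_gb, executor_cores, executor_mem_gb):
    """Total executors across identical workers."""
    return num_workers * executors_per_worker(
        worker_cores, worker_mem_gb, executor_cores, executor_mem_gb
    )


print(executors_per_worker(16, 64, 5, 20))  # 3: 16 // 5 = 3 and 64 // 20 = 3
```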
Key Configuration Parameters
spark.executor.cores. The number of cores per executor.
spark.executor.memory. The amount of memory allocated to each executor.
spark.executor.instances. The number of executors.
Example:
--conf spark.executor.cores=5 \
--conf spark.executor.memory=16g \
--conf spark.executor.instances=3 \
Note
Setting spark.executor.instances is optional because you can indirectly control the number of executors with spark.executor.cores and spark.executor.memory. These settings define the size of each executor, and the number of executors that fit into your cluster depends on the total resources available on each worker node.
Advantages & Disadvantages of Static Allocation
Resource allocation is predictable, which can be easier to manage in environments with fixed capacities.
Simple to configure as it doesn't require additional overhead to manage scaling.
May lead to underutilization or overutilization of resources depending on the workload, as the number of executors does not change in response to the job's demands.
Not ideal for varying workloads where the processing needs can change dramatically over time.
Dynamic Allocation
Dynamic allocation lets Spark add or remove executors based on the workload: Spark can request more executors when processing demand is high and release executors when demand decreases.
Key Configuration Parameters
spark.dynamicAllocation.enabled. Enables dynamic allocation.
spark.dynamicAllocation.minExecutors. Minimum number of executors Spark will maintain.
spark.dynamicAllocation.maxExecutors. Maximum number of executors Spark can allocate.
spark.dynamicAllocation.initialExecutors. Initial number of executors Spark should start with.
spark.dynamicAllocation.executorIdleTimeout. Duration after which an idle executor is removed.
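The parameters above can be assembled into --conf arguments with a basic sanity check. This is a sketch: the helper name is hypothetical, and the requirement that initialExecutors lie between the minimum and maximum is this sketch's own check (Spark does reject a minimum larger than the maximum). Defaulting initialExecutors to minExecutors and the idle timeout to 60s follows Spark's documented defaults.

```python
def dynamic_allocation_args(min_executors, max_executors, initial_executors=None,
                            idle_timeout="60s"):
    """Return spark-submit --conf arguments that enable dynamic allocation."""
    if min_executors < 0 or max_executors <= 0:
        raise ValueError("minExecutors must be >= 0 and maxExecutors must be > 0")
    if min_executors > max_executors:
        raise ValueError("minExecutors must not exceed maxExecutors")
    if initial_executors is None:
        initial_executors = min_executors
    if not min_executors <= initial_executors <= max_executors:
        raise ValueError("initialExecutors should lie between min and max")
    settings = {
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": min_executors,
        "spark.dynamicAllocation.maxExecutors": max_executors,
        "spark.dynamicAllocation.initialExecutors": initial_executors,
        "spark.dynamicAllocation.executorIdleTimeout": idle_timeout,
    }
    args = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args
```

Calling dynamic_allocation_args(3, 6, 4, "120s") produces the same five dynamic allocation flags as the PySpark submission example for dynamic allocation.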
Advantages & Disadvantages of Dynamic Allocation
Improved Resource Utilization: Adjusts the number of executors based on the workload, potentially leading to better utilization of cluster resources.
Complexity: Requires a more sophisticated setup, including proper configuration of the Spark cluster manager and possibly fine-tuning the parameters for optimal performance.
Potential for Latency: Scaling decisions aren't instantaneous, which can introduce delays in executor provisioning, impacting job start times or scaling reactions to workload changes.
Choosing Between Static and Dynamic Allocation
Use Static Allocation when you have a predictable workload, or when operating in a static cluster environment.
Use Dynamic Allocation for jobs with varying workloads where resource utilization efficiency is critical. It's also beneficial when cluster workloads are unpredictable and diverse, making it hard to manually determine the optimal number of executors.
Starting Spark Sessions on a Client
Submitting a Spark Session using PySpark or Spark-Submit
Using Dynamic Allocation
Example of submitting a PySpark script to the Spark master with dynamic executor allocation:
/opt/spark/bin/pyspark --master spark://<master-ip>:2424 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=3 \
--conf spark.dynamicAllocation.maxExecutors=6 \
--conf spark.dynamicAllocation.initialExecutors=4 \
--conf spark.dynamicAllocation.executorIdleTimeout=120s \
--conf spark.driver.maxResultSize=4g \
--driver-class-path $(echo /opt/spark/vast/*.jar | tr ' ' ':') \
--conf spark.executor.extraClassPath=$(echo /opt/spark/vast/*.jar | tr ' ' ':') \
--jars $(echo /opt/spark/vast/*.jar | tr ' ' ',') \
--conf spark.executor.userClassPathFirst=true \
--conf spark.driver.userClassPathFirst=true \
--conf spark.driver.host=10.71.16.91 \
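--driver-memory 32g < pyspark_app.py
spark-submit can also be used in place of pyspark in the example above. With spark-submit, pass the script path (pyspark_app.py) as the last argument instead of redirecting it to standard input.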
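The $(echo /opt/spark/vast/*.jar | tr ' ' ':') expressions expand to the connector jars joined with colons (for class paths) or commas (for --jars). The sketch below reproduces that expansion in Python, which can help when launching sessions from a script. One difference is deliberate: if no jars match, the shell passes the literal pattern through, while this sketch raises an error. The helper names are hypothetical.

```python
import glob
import os


def connector_paths(jar_dir="/opt/spark/vast"):
    """Return (colon-joined classpath, comma-joined jar list) for the connector jars."""
    # Shell globs are sorted; sorted() gives the same order for typical jar names.
    jars = sorted(glob.glob(os.path.join(jar_dir, "*.jar")))
    if not jars:
        raise FileNotFoundError(f"no connector jars found in {jar_dir}")
    return ":".join(jars), ",".join(jars)


def classpath_flags(jar_dir="/opt/spark/vast"):
    """Return the three classpath-related arguments used in the commands above."""
    classpath, jar_list = connector_paths(jar_dir)
    return [
        "--driver-class-path", classpath,
        "--conf", f"spark.executor.extraClassPath={classpath}",
        "--jars", jar_list,
    ]
```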
Using Static Allocation
Example of submitting a PySpark script to the Spark master with static executor allocation:
/opt/spark/bin/pyspark --master spark://<master-ip>:2424 \
--conf spark.executor.memory=10g \
--conf spark.executor.instances=4 \
--conf spark.executor.cores=3 \
--conf spark.driver.maxResultSize=4g \
--driver-class-path $(echo /opt/spark/vast/*.jar | tr ' ' ':') \
--conf spark.executor.extraClassPath=$(echo /opt/spark/vast/*.jar | tr ' ' ':') \
--jars $(echo /opt/spark/vast/*.jar | tr ' ' ',') \
--conf spark.executor.userClassPathFirst=true \
--conf spark.driver.userClassPathFirst=true \
--conf spark.driver.host=10.71.16.91 \
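--driver-memory 32g < pyspark_app.py
spark-submit can also be used in place of pyspark in the example above. With spark-submit, pass the script path (pyspark_app.py) as the last argument instead of redirecting it to standard input.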
Running a Spark-SQL Session to Query the VAST Database
Run the following command to start spark-sql:
/opt/spark/bin/spark-sql --master spark://<master-vip>:2424 \
--driver-class-path $(echo /opt/spark/vast/*.jar | tr ' ' ':') \
--conf spark.executor.extraClassPath=$(echo /opt/spark/vast/*.jar | tr ' ' ':') \
--jars $(echo /opt/spark/vast/*.jar | tr ' ' ',') \
--conf spark.executor.userClassPathFirst=true \
--conf spark.driver.userClassPathFirst=true \
--conf spark.driver.maxResultSize=4g \
--conf spark.driver.memory=16g \
--conf spark.executor.cores=2 \
--conf spark.executor.memory=4g \
--conf spark.ndb.endpoint=http://172.19.197.1 \
--conf spark.ndb.data_endpoints=http://127.0.0.1 \
--conf spark.ndb.access_key_id=Q00Q0.. \
--conf spark.ndb.secret_access_key=IpKLQnvx.. \
--conf spark.ndb.num_of_splits=64 \
--conf spark.ndb.num_of_sub_splits=8 \
--conf spark.ndb.rowgroups_per_subsplit=1 \
--conf spark.ndb.query_data_rows_per_split=4000000 \
--conf spark.ndb.retry_max_count=3 \
--conf spark.ndb.retry_sleep_duration=1 \
--conf spark.ndb.parallel_import=true \
--conf spark.ndb.dynamic_filter_compaction_threshold=100 \
--conf spark.ndb.dynamic_filtering_wait_timeout=2 \
--conf spark.sql.catalog.ndb=spark.sql.catalog.ndb.VastCatalog \
--conf spark.sql.extensions=ndb.NDBSparkSessionExtension \
--conf spark.python.authenticate.socketTimeout=1m \
--conf spark.driver.host=10.71.16.91 \
--conf spark.sql.catalogImplementation=in-memory
where <master-vip> is the VIP of the Master node.
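Many of these --conf flags repeat across commands. Spark reads default properties from spark-defaults.conf, and flags passed on the command line take precedence over values in that file, so stable settings can be moved there and only per-session values kept on the command line. The sketch below collects the --conf pairs from an already-expanded argument list and renders them as properties lines; the helper names are hypothetical, and any credentials it copies should be protected like the rest of the file.

```python
def conf_pairs(argv):
    """Collect the key=value pairs that follow each --conf flag in argv."""
    props = {}
    tokens = iter(argv)
    for token in tokens:
        if token == "--conf":
            pair = next(tokens, None)
            if pair is None or "=" not in pair:
                raise ValueError(f"malformed --conf value: {pair!r}")
            # Split at the first '=' only, so values such as URLs stay intact.
            key, _, value = pair.partition("=")
            props[key] = value
    return props


def to_properties(props):
    """Render properties as key=value lines for spark-defaults.conf."""
    return "".join(f"{key}={value}\n" for key, value in props.items())
```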
Run queries from spark-sql:
spark-sql> select * from `ndb`.`vastdb1`.`schema1`.`customer` limit 10;
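When queries are built programmatically, the four-part name in the example can be assembled with consistent quoting. In the example, `ndb` is the catalog registered through spark.sql.catalog.ndb; this sketch treats the remaining parts (vastdb1, schema1, customer) as database, schema, and table, which is an assumed labeling. The helper name ndb_table is hypothetical; doubling a backtick inside a quoted identifier is how Spark SQL escapes it.

```python
def ndb_table(database, schema, table, catalog="ndb"):
    """Build a backtick-quoted name like `ndb`.`vastdb1`.`schema1`.`customer`."""
    def quote(part):
        # Spark SQL escapes a backtick inside a quoted identifier by doubling it.
        return "`" + part.replace("`", "``") + "`"
    return ".".join(quote(part) for part in (catalog, database, schema, table))


query = f"select * from {ndb_table('vastdb1', 'schema1', 'customer')} limit 10"
print(query)  # select * from `ndb`.`vastdb1`.`schema1`.`customer` limit 10
```

In a PySpark session, the resulting string can be passed to spark.sql(query).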
You can use Hive to access S3 data in the VAST Database.
Run the following command, which includes --conf spark.sql.catalogImplementation=hive for Hive:
/opt/spark/bin/spark-sql --master spark://172.19.197.21:2424 \
--driver-class-path $(echo /opt/spark/vast/*.jar | tr ' ' ':') \
--conf spark.executor.extraClassPath=$(echo /opt/spark/vast/*.jar | tr ' ' ':') \
--jars $(echo /opt/spark/vast/*.jar | tr ' ' ',') \
--conf spark.executor.userClassPathFirst=true \
--conf spark.driver.userClassPathFirst=true \
--conf spark.driver.maxResultSize=4g \
--conf spark.driver.memory=16g \
--conf spark.executor.cores=2 \
--conf spark.executor.memory=4g \
--conf spark.ndb.endpoint=http://172.19.197.1 \
--conf spark.ndb.data_endpoints=http://127.0.0.1 \
--conf spark.ndb.access_key_id=Q00Q0.. \
--conf spark.ndb.secret_access_key=IpKLQnvx.. \
--conf spark.ndb.num_of_splits=64 \
--conf spark.ndb.num_of_sub_splits=8 \
--conf spark.ndb.rowgroups_per_subsplit=1 \
--conf spark.ndb.query_data_rows_per_split=4000000 \
--conf spark.ndb.retry_max_count=3 \
--conf spark.ndb.retry_sleep_duration=1 \
--conf spark.ndb.parallel_import=true \
--conf spark.ndb.dynamic_filter_compaction_threshold=100 \
--conf spark.ndb.dynamic_filtering_wait_timeout=2 \
--conf spark.sql.catalog.ndb=spark.sql.catalog.ndb.VastCatalog \
--conf spark.sql.extensions=ndb.NDBSparkSessionExtension \
--conf spark.python.authenticate.socketTimeout=1m \
--conf spark.driver.host=10.71.16.91 \
--conf spark.hadoop.fs.s3a.access.key=Q00Q0.. \
--conf spark.hadoop.fs.s3a.secret.key=IpKLQnvx.. \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.endpoint=http://172.19.197.1 \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory \
--conf spark.hadoop.fs.s3a.ssl.enabled=false \
--conf spark.hadoop.hive.metastore.uris=thrift://v197lg6:9083 \
--conf spark.sql.catalogImplementation=hive
Accessing the VAST Database using the Spark Driver Web UI
You can use the Spark Driver Web UI to monitor the progress of Spark sessions submitted with pyspark or spark-submit.
Browse to the client IP on port 4040 (the default port for the Spark Web UI).
Spark Client Features with VAST Database
The Spark clients support all nested data types (struct, array, map).
For example:
spark.sql('CREATE TABLE mytable (s STRUCT<a: INTEGER, b: STRING>)')
spark.sql('select s.b from mytable order by s.a')
This example uses a nested data type: s STRUCT<a: INTEGER, b: STRING>.
Removing the Spark Application
On the Data Engine page, select the Applications tab.
Right-click on the application, and click Remove.
Monitor progress on the Activities page of the VAST Web UI: an event named delete_managed_application appears when the application removal has completed. When this process is complete, the application is removed from the list of applications on the Applications page.