Managed Applications on Clusters

Overview

You can run certain third-party applications on CNodes in your VAST Cluster. Specifically, you can run applications such as the Spark query engine on designated CNodes, and share compute resources on these CNodes between VAST storage and the applications.

The applications use the VAST connector to access the VAST Database.

Viewing Running Applications

The Applications tab on the Data Engine page of the VAST Web UI shows the applications that are running on application-designated CNodes.

Creating Applications

You can create a Spark application on CNodes. The Spark application consists of a Spark cluster with a Master node, Worker nodes, and a Spark History server, configured in the settings below.

You can create only a single application on a cluster.

  1. In the VAST Web UI, navigate to the Data Engine page and select the Applications tab.

  2. Click Create Application.

  3. In the General section, enter details for the application. In this version, only Spark can be selected as the application.

    App Name

    Enter a name for the application (example: Spark) as it will appear in the list.

    Application

    Spark (only option in this version).

    Image Tag

    Select the application image from the list:

    • Spark 3.4 - an unmodified Spark engine, with the VAST connector.

    • Spark 3.5.1 - a version of Spark that includes bundled Spark extensions and the option to run Spark Thrift and Connect as part of the Spark cluster.

  4. In the Resource Selection section, select the CNodes on which the application will run from the All Possible CNodes box (on the right), and move them to the Selected CNodes box (on the left). The selected CNodes are the application CNodes. You need at least two CNodes; if Spark 3.5.1 is selected, at least three CNodes are required.

    Note

    It is not recommended to select all CNodes for Applications as this could leave insufficient resources for other activities on the cluster.

  5. In the Resources Limitation section, optionally set limits on the CNode resources the application can use. In the Use up to field, set the maximum percentage of CNode resources the application can use (between 20% and 60%, in increments of 5%). The default limit is sufficient for most cases.

  6. In the Network section, enter details for the Spark Master and Worker nodes. Each node requires a virtual IP (VIP) address. These addresses should be in the same subnet as the cluster's virtual IP pools (configured in the Virtual IP tab of the Network Access page), but must not belong to any existing pool.

    For the Master node, enter a virtual IP address.

    Virtual IP

    The Virtual IP address of the Spark Master node. This should be in the cluster subnet, but not in any existing virtual IP pool on the cluster.

    Note

    When the application is created, a virtual IP pool is created for it automatically.

    For the Worker nodes, enter a list of virtual IP addresses.

    Virtual IP

    A list of virtual IP addresses for the Worker nodes. These should be in the cluster subnet, but not in any existing virtual IP pool on the cluster.

    Optionally, set advanced network details for the Master and Worker nodes:

    Netmask

    The subnet mask of the virtual IP assigned to the Master and Worker nodes.

    Gateway IP

    The IP address of the gateway of the virtual IP assigned to the  Master and Worker nodes.

    VLAN

    If you want to tag the virtual IP pool with a specific VLAN on the data network, enter the VLAN number (0-4096). See Tagging Virtual IP Pools with VLANs.

  7. Optionally, in the Configuration & Security section, add configuration files for the Spark cluster (mainly relevant when using Spark Thrift and Connect) and upload SSL certificates for TLS communication with the Spark Thrift and Connect servers.

    This section is relevant only if Spark 3.5.1 was selected above as the Image Tag.

    Configuration File

    Description

    spark-defaults.conf

    The main Spark configuration file. It enables:

    • Spark Thrift and Connect servers to operate with the VAST Database

    • LDAP and LDAPS provider

    • TLS

    core-site.xml

    Configures LDAP and LDAPS providers

    hive-site.xml

    Configuration for Hive

    hdfs-site.xml

    Provides default behaviors for an HDFS client.

    To add a certificate for Spark Thrift and Connect servers, upload a Certificate and Key in the Certificate and keys section.

    You can also download these files, as well as template files with examples of how to enable and configure these features.

  8. Click Create. The application images are loaded on the selected CNodes and then started. This can take some time.

    Monitor progress on the Activities page of the VAST Web UI: events appear indicating that the application creation has completed (event name: create_managed_application). When this process is complete, the application appears in the list of applications in the Applications page.

Initializing and Starting the Spark Application Cluster on the CNode

After the Spark application is created and deployed on the CNode, it must be initialized and started.

  1. On the Data Engine page, select the Applications tab.

  2. Select the application in the list. The status of the application is INIT after it is created.

  3. Right-click on the application, and click Start. The status of the application changes to RUN as it starts. To monitor the status of the application, right-click on it and click View CNode State.

  4. If an error occurs when starting the application (the status is not RUN), right-click on the application and click Retry. This will attempt to start the application again.

Stopping Spark Applications

  • To stop a running application, right-click on the application and click Stop.

Restarting Spark Applications

  • To restart an application that was stopped, right-click on it and click Restart.

Updating Application Configurations for Spark Thrift and Connect

You can change the configuration for Spark Thrift and Connect (if you are running this option for the application).

  1. Right-click on the application, and click Edit Configuration & Security. The configuration and certificate files for the application are shown, if any were added when the application was created.

  2. Click the download icon next to a file to download it.

  3. Make changes to the downloaded file, as necessary.

  4. Click the delete (trash) icon to remove the current configuration files from the application.

  5. Click Add to upload the modified files.

  6. Right-click on the application, and click Restart. The application is stopped and restarted, using the modified configuration files.

Accessing the Spark UI

The Spark cluster uses the following ports on the virtual IPs:

Spark Service   Protocol          Port

Master          HTTP              9292
                HTTPS             9492
                RPC               2424
                REST API          6066

Worker          HTTP              9293
                HTTPS             9493

History         HTTP              18080
                HTTPS             18480

Connect         HTTP              4040
                HTTPS             4440
                gRPC API          15002

Thrift          HTTP              4041
                HTTPS             4441
                Thrift API        10000
                Thrift API HTTP   10001
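
For example, the Thrift and Connect endpoints listed above can be reached with standard clients bundled with the Spark download described later in this section (a hedged sketch; <master-vip> and <username> are placeholders, and authentication depends on your spark-defaults.conf settings):

# Connect to the Spark Thrift server over its Thrift API port (10000) using Beeline
/opt/spark/bin/beeline -u "jdbc:hive2://<master-vip>:10000" -n <username>

# Open a PySpark shell against the Spark Connect endpoint (gRPC, port 15002)
/opt/spark/bin/pyspark --remote "sc://<master-vip>:15002"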

Setting Up Spark Clients to Access the CNode Spark Cluster

Follow these steps to configure a client host to connect to the Spark cluster running on the VAST Cluster CNode.

Using the Standalone Spark Driver

  1. Download the Spark driver (3.4.1):

    wget -q -4 https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3-scala2.13.tgz
    tar xvf spark-3.4.1-bin-hadoop3-scala2.13.tgz
    mv spark-3.4.1-bin-hadoop3-scala2.13 /opt/spark

    Note

    The version should match the version running on the VAST Cluster (Spark 3.4.1 with Scala 2.13).

  2. Install the VAST connector (place the connector jars in the Spark installation directory):

    curl -fsSL -o spark-vast-plugin.zip "https://github.com/vast-data/vast-db-connectors/releases/download/spark3-vast-3.4.1-f93839bfa38a/spark3-vast-3.4.1-f93839bfa38a.zip"
    unzip spark-vast-plugin.zip
    mv spark3-vast* /opt/spark/vast
  3. For the S3A Hive interface, run these commands to download the required jars:

    wget -q https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.1026/aws-java-sdk-bundle-1.11.1026.jar
    wget -q https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.2/hadoop-aws-3.3.2.jar
    mkdir -p /tmp/hive_jars
    mv hadoop-aws-3.3.2.jar /tmp/hive_jars
    mv aws-java-sdk-bundle-1.11.1026.jar /tmp/hive_jars
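
    One way to make these jars available to a session (a hedged sketch; adjust the paths to your installation, and see the full Hive example later in this section) is to add them to the driver and executor classpaths alongside the VAST connector jars:

    /opt/spark/bin/spark-sql --master spark://<master-vip>:2424 \
      --driver-class-path $(echo /opt/spark/vast/*.jar /tmp/hive_jars/*.jar | tr ' ' ':') \
      --conf spark.executor.extraClassPath=$(echo /opt/spark/vast/*.jar /tmp/hive_jars/*.jar | tr ' ' ':') \
      --jars $(echo /opt/spark/vast/*.jar /tmp/hive_jars/*.jar | tr ' ' ',')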

Using the Docker Spark Driver

You can avoid setting up a Spark environment on your host by deploying a pre-built Spark-VAST image, which comes bundled with the VAST DB connector and ready to use.

Deploy the driver image, exposing the driver port and the host network:

  • Run this docker command:

    docker run -d --name spark-client \
    --net=host \
    -p 4040:4040 \
    -v $PWD/spark-scripts:/tmp/spark-scripts \
    vastdataorg/spark-vast /bin/sleep infinity

    Note

    This is an example. Using Docker volume mounts (spark-scripts), you can expose your scripts and applications to the Docker driver so that they can be submitted to the Spark cluster, as shown below.
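
    For example, a mounted script could then be submitted from inside the container (assuming the image places Spark under /opt/spark; the script name and Master VIP are placeholders):

    docker exec -it spark-client \
      /opt/spark/bin/spark-submit --master spark://<master-vip>:2424 \
      /tmp/spark-scripts/my_app.py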

Configuring Spark Executors

Optimizing the allocation of Spark executors can significantly improve workload performance.

There are two primary approaches to setting up executors for a Spark job:

  • Static Allocation

  • Dynamic Allocation

Static Allocation

Static allocation involves setting specific numbers of executors, cores, and memory ahead of time. To calculate these values effectively, compare the total resources on each worker (CPU cores and memory) to the resources requested per executor. For example, if each worker has 16 cores and 64 GB RAM, and you allocate 5 cores and 20 GB RAM per executor, you can run three executors per worker without over-allocating. This approach ensures each executor has sufficient resources without straining the system. These parameters remain fixed for the lifetime of the application.

Key Configuration Parameters

  • spark.executor.cores. The number of cores per executor.

  • spark.executor.memory. The amount of memory allocated to each executor.

  • spark.executor.instances. The number of executors.

Example:

--conf spark.executor.cores=5 \
--conf spark.executor.memory=16g \
--conf spark.executor.instances=3 \

Note

Setting spark.executor.instances is optional because you can indirectly control the number of executors with spark.executor.cores and spark.executor.memory. By configuring these, you define the size of each executor. The number of executors that fit into your cluster depends on the total resources available on each worker node.

Advantages & Disadvantages of Static Allocation

Advantages
  • Resource allocation is predictable, which can be easier to manage in environments with fixed capacities.

  • Simple to configure as it doesn't require additional overhead to manage scaling.

Disadvantages
  • May lead to underutilization or overutilization of resources depending on the workload, as the number of executors does not change in response to the job's demands.

  • Not ideal for varying workloads where the processing needs can change dramatically over time.

Dynamic Allocation

Dynamic allocation enables Spark to add or remove executors dynamically based on the workload. This means Spark can request more executors when there is a high demand for processing and can release executors when the demand decreases.

Key Configuration Parameters

  • spark.dynamicAllocation.enabled. Enables dynamic allocation.

  • spark.dynamicAllocation.minExecutors. Minimum number of executors Spark will maintain.

  • spark.dynamicAllocation.maxExecutors. Maximum number of executors Spark can allocate.

  • spark.dynamicAllocation.initialExecutors. Initial number of executors Spark should start with.

  • spark.dynamicAllocation.executorIdleTimeout. Duration after which an idle executor is removed.
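
Example (illustrative values; a full submission command using these settings appears later in this section):

--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=3 \
--conf spark.dynamicAllocation.maxExecutors=6 \
--conf spark.dynamicAllocation.initialExecutors=4 \
--conf spark.dynamicAllocation.executorIdleTimeout=120s \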

Advantages & Disadvantages of Dynamic Allocation

Advantages
  • Improved Resource Utilization: Adjusts the number of executors based on the workload, potentially leading to better utilization of cluster resources.

Disadvantages
  • Complexity: Requires a more sophisticated setup, including proper configuration of the Spark cluster manager and possibly fine-tuning the parameters for optimal performance.

  • Potential for Latency: Scaling decisions aren't instantaneous, which can introduce delays in executor provisioning, impacting job start times or scaling reactions to workload changes.

Choosing Between Static and Dynamic Allocation

Use Static Allocation when you have a predictable workload, or when operating in a static cluster environment.

Use Dynamic Allocation for jobs with varying workloads, where resource utilization efficiency is critical. It's also beneficial when cluster workloads are unpredictable and diverse, making it hard to manually determine the optimal number of executors.

Starting Spark Sessions on a Client

Submitting a Spark Session using PySpark or Spark-Submit

Using Dynamic Allocation

Example of submitting a PySpark script to the Spark Master with dynamic executor allocation:

/opt/spark/bin/pyspark --master spark://<master-ip>:2424 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=3 \
--conf spark.dynamicAllocation.maxExecutors=6 \
--conf spark.dynamicAllocation.initialExecutors=4 \
--conf spark.dynamicAllocation.executorIdleTimeout=120s \
--conf spark.driver.maxResultSize=4g \
--driver-class-path $(echo /opt/spark/vast/*.jar | tr ' ' ':') \
--conf spark.executor.extraClassPath=$(echo /opt/spark/vast/*.jar | tr ' ' ':') \
--jars $(echo /opt/spark/vast/*.jar | tr ' ' ',') \
--conf spark.executor.userClassPathFirst=true \
--conf spark.driver.userClassPathFirst=true \
--conf spark.driver.host=10.71.16.91 \
--driver-memory 32g < pyspark_app.py

Spark-submit can also be used in place of pyspark in the example above.

Using Static Allocation

Example of submitting a PySpark script to the Spark Master with static executor allocation:

/opt/spark/bin/pyspark --master spark://<master-ip>:2424 \
--conf spark.executor.memory=10g \
--conf spark.executor.instances=4 \
--conf spark.executor.cores=3 \
--conf spark.driver.maxResultSize=4g \
--driver-class-path $(echo /opt/spark/vast/*.jar | tr ' ' ':') \
--conf spark.executor.extraClassPath=$(echo /opt/spark/vast/*.jar | tr ' ' ':') \
--jars $(echo /opt/spark/vast/*.jar | tr ' ' ',') \
--conf spark.executor.userClassPathFirst=true \
--conf spark.driver.userClassPathFirst=true \
--conf spark.driver.host=10.71.16.91 \
--driver-memory 32g < pyspark_app.py

Spark-submit can also be used in place of pyspark in the example above.

Running a Spark-SQL Session to Query the VAST Database

  1. Run the following command to start spark-sql:

    /opt/spark/bin/spark-sql --master spark://<master-vip>:2424 \
      --driver-class-path $(echo /opt/spark/vast/*.jar | tr ' ' ':') \
      --conf spark.executor.extraClassPath=$(echo /opt/spark/vast/*.jar | tr ' ' ':') \
      --jars $(echo /opt/spark/vast/*.jar | tr ' ' ',') \
      --conf spark.executor.userClassPathFirst=true \
      --conf spark.driver.userClassPathFirst=true \
      --conf spark.driver.maxResultSize=4g \
      --conf spark.driver.memory=16g \
      --conf spark.executor.cores=2 \
      --conf spark.executor.memory=4g \
      --conf spark.ndb.endpoint=http://172.19.197.1 \
      --conf spark.ndb.data_endpoints=http://127.0.0.1 \
      --conf spark.ndb.access_key_id=Q00Q0.. \
      --conf spark.ndb.secret_access_key=IpKLQnvx.. \
      --conf spark.ndb.num_of_splits=64 \
      --conf spark.ndb.num_of_sub_splits=8 \
      --conf spark.ndb.rowgroups_per_subsplit=1 \
      --conf spark.ndb.query_data_rows_per_split=4000000 \
      --conf spark.ndb.retry_max_count=3 \
      --conf spark.ndb.retry_sleep_duration=1 \
      --conf spark.ndb.parallel_import=true \
      --conf spark.ndb.dynamic_filter_compaction_threshold=100 \
      --conf spark.ndb.dynamic_filtering_wait_timeout=2 \
      --conf spark.sql.catalog.ndb=spark.sql.catalog.ndb.VastCatalog \
      --conf spark.sql.extensions=ndb.NDBSparkSessionExtension \
      --conf spark.python.authenticate.socketTimeout=1m \
      --conf spark.driver.host=10.71.16.91 \
      --conf spark.sql.catalogImplementation=in-memory

    where <master-vip> is the virtual IP of the Master node.

  2. Run queries from spark-sql:

    spark-sql> select * from `ndb`.`vastdb1`.`schema1`.`customer` limit 10;

Using spark-sql with Hive to Access the VAST Database with S3 Data

You can use Hive to access S3 data in the VAST Database.

  • Run the following command (which includes --conf spark.sql.catalogImplementation=hive, for Hive):

    /opt/spark/bin/spark-sql --master spark://172.19.197.21:2424 \
      --driver-class-path $(echo /opt/spark/vast/*.jar | tr ' ' ':') \
      --conf spark.executor.extraClassPath=$(echo /opt/spark/vast/*.jar | tr ' ' ':') \
      --jars $(echo /opt/spark/vast/*.jar | tr ' ' ',') \
      --conf spark.executor.userClassPathFirst=true \
      --conf spark.driver.userClassPathFirst=true \
      --conf spark.driver.maxResultSize=4g \
      --conf spark.driver.memory=16g \
      --conf spark.executor.cores=2 \
      --conf spark.executor.memory=4g \
      --conf spark.ndb.endpoint=http://172.19.197.1 \
      --conf spark.ndb.data_endpoints=http://127.0.0.1 \
      --conf spark.ndb.access_key_id=Q00Q0.. \
      --conf spark.ndb.secret_access_key=IpKLQnvx.. \
      --conf spark.ndb.num_of_splits=64 \
      --conf spark.ndb.num_of_sub_splits=8 \
      --conf spark.ndb.rowgroups_per_subsplit=1 \
      --conf spark.ndb.query_data_rows_per_split=4000000 \
      --conf spark.ndb.retry_max_count=3 \
      --conf spark.ndb.retry_sleep_duration=1 \
      --conf spark.ndb.parallel_import=true \
      --conf spark.ndb.dynamic_filter_compaction_threshold=100 \
      --conf spark.ndb.dynamic_filtering_wait_timeout=2 \
      --conf spark.sql.catalog.ndb=spark.sql.catalog.ndb.VastCatalog \
      --conf spark.sql.extensions=ndb.NDBSparkSessionExtension \
      --conf spark.python.authenticate.socketTimeout=1m \
      --conf spark.driver.host=10.71.16.91 \
      --conf spark.hadoop.fs.s3a.access.key=Q00Q0.. \
      --conf spark.hadoop.fs.s3a.secret.key=IpKLQnvx.. \
      --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
      --conf spark.hadoop.fs.s3a.endpoint=http://172.19.197.1 \
      --conf spark.hadoop.fs.s3a.path.style.access=true \
      --conf spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory \
      --conf spark.hadoop.fs.s3a.ssl.enabled=false \
      --conf spark.hadoop.hive.metastore.uris=thrift://v197lg6:9083 \
      --conf spark.sql.catalogImplementation=hive
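
    From this Hive-enabled session, S3 data on the VAST cluster can then be exposed as an external table (a sketch using a hypothetical bucket and path; substitute your own S3 location):

    spark-sql> CREATE EXTERNAL TABLE sales (id INT, amount DOUBLE)
             > STORED AS PARQUET
             > LOCATION 's3a://my-bucket/sales/';
    spark-sql> SELECT count(*) FROM sales;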

Accessing the VAST Database using the Spark Driver Web UI

You can monitor the progress of Spark sessions submitted with pyspark or spark-submit using the Spark Driver Web UI.

Browse to the client IP on port 4040 (the default port for the Spark Web UI).
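
For example, with the client driver host IP used in the submission examples above (the IP address is illustrative), you can verify that the UI is reachable:

# Quick reachability check for the Spark Driver Web UI (expects an HTTP 200 response)
curl -s -o /dev/null -w "%{http_code}\n" http://10.71.16.91:4040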

Spark Client Features with VAST Database

The Spark clients support all nested data types (struct, array, map).

For example:

spark.sql('CREATE TABLE mytable (s STRUCT<a: INTEGER, b: STRING>)')
spark.sql('select s.b from mytable order by s.a')

The table definition includes a nested data type (s STRUCT<a: INTEGER, b: STRING>).
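
For instance, struct values can be written and read back with Spark SQL's named_struct function (a generic Spark SQL sketch, not specific to the VAST connector):

spark.sql("INSERT INTO mytable VALUES (named_struct('a', 1, 'b', 'hello'))")
spark.sql("SELECT s.a, s.b FROM mytable").show()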

Removing the Spark Application

  1. On the Data Engine page, select the Applications tab.

  2. Right-click on the application, and click Remove.

    Monitor progress on the Activities page of the VAST Web UI: events appear indicating that the application removal has completed (event name: delete_managed_application). When this process is complete, the application is removed from the list of applications in the Applications page.

HA Operability Issues

At least two CNodes must be selected to host applications, to allow for continued operation in the event one CNode fails.

If the CNode running the application fails, the application will be started on another CNode designated for applications.