Installing the VAST Connector for Spark


Installing Spark

Download Spark 3.5.1 from here. Follow the instructions there to set up a Spark Standalone cluster.
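
As an illustration, a typical installation on a Linux node might look like the following (the download URL and install path are examples; /opt/spark matches the SPARK_HOME used later in this guide):

    # Download Spark 3.5.1 pre-built for Hadoop 3 and unpack it to /opt/spark.
    # Adjust the mirror/URL and target directory to your environment.
    curl -O https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
    tar -xzf spark-3.5.1-bin-hadoop3.tgz
    sudo mv spark-3.5.1-bin-hadoop3 /opt/spark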

Note

These components are required to install Spark (on all Spark nodes):

  • Java 8, 11, or 17

  • Scala 2.13

  • Python 3.7+
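
A quick way to confirm these prerequisites on each node is to check the installed versions from a shell (assuming the tools are already on PATH):

    # Verify the required runtimes on every Spark node.
    java -version       # expect 8, 11, or 17
    scala -version      # expect 2.13.x
    python3 --version   # expect 3.7 or later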

Downloading the VAST Connector for Spark

Download the VAST Connector for Spark from here.

Configuring the Connector

  1. Copy the VAST Connector (downloaded in the previous section) to /usr/local on all Spark nodes (masters and workers) and unzip it there. This folder holds the VAST components for Spark; it is not the folder from which Spark itself runs.
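
    For example, on each node (the archive name below is a placeholder for the file downloaded in the previous section; after unpacking, the connector jars should end up where SPARK_DAEMON_CLASSPATH in step 4 points, /usr/local/spark3 in this guide):

    # "vast-spark3-connector.zip" is an illustrative name; substitute the actual archive.
    sudo unzip vast-spark3-connector.zip -d /usr/local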

  2. Define these paths on all Spark nodes:

    Variable            Value
    JAVA_HOME           according to the Java installation
    SPARK_HOME          /opt/spark
    PYSPARK_PYTHON      /usr/bin/python3
    SPARK_CONF_DIR      /opt/spark/conf

    Add JAVA_HOME and SPARK_HOME to PATH.
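
    One way to make these settings persistent on every node is a profile script; a minimal sketch for a bash environment (the Java path is illustrative, and the bin subdirectories are what actually go on PATH):

    # /etc/profile.d/spark.sh -- illustrative; adjust JAVA_HOME to the local Java install.
    export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
    export SPARK_HOME=/opt/spark
    export PYSPARK_PYTHON=/usr/bin/python3
    export SPARK_CONF_DIR=/opt/spark/conf
    export PATH="$JAVA_HOME/bin:$SPARK_HOME/bin:$PATH"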

  3. Create a config file spark-defaults.conf in /opt/spark/conf on all Spark nodes with the following:

    [NDB]
    spark.ndb.endpoint=ENDPOINT
    spark.ndb.data_endpoints=VAST_ADDRS
    spark.ndb.access_key_id=ACCESS_KEY
    spark.ndb.secret_access_key=SECRET_KEY
    spark.ndb.num_of_splits=256
    spark.ndb.num_of_sub_splits=10
    spark.ndb.rowgroups_per_subsplit=1
    spark.ndb.query_data_rows_per_split=4000000
    spark.ndb.parallel_import=true
    spark.ndb.dynamic_filter_compaction_threshold=100
    spark.sql.catalog.ndb=spark.sql.catalog.ndb.VastCatalog
    spark.sql.extensions=ndb.NDBSparkSessionExtension
    
    spark.sql.execution.arrow.pyspark.enabled=true
    spark.executor.userClassPathFirst=true
    spark.port.maxRetries=30
    spark.network.timeout=3600s
    spark.executor.heartbeatInterval=3000s
    spark.task.reaper.pollingInterval=3000s
    spark.task.maxFailures=1
    spark.shuffle.push.finalize.timeout=3000s
    spark.shuffle.io.connectionTimeout=3000s
    spark.rpc.io.connectionTimeout=3000s
    spark.rpc.askTimeout=3000s
    

    Where ENDPOINT is an IP address of the VAST Cluster (one of the IPs in the Virtual IP pool), VAST_ADDRS is the comma-separated list of all IPs in the Virtual IP pool, and ACCESS_KEY and SECRET_KEY are the S3 access key pair of the database owner user created here.
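
    After the daemons are restarted in step 6, one way to confirm that this file is being picked up is to launch a Spark shell with --verbose, which prints the resolved spark.* properties at startup (the master URL is illustrative):

    # spark-defaults.conf is read automatically from SPARK_CONF_DIR.
    $SPARK_HOME/bin/spark-shell --master spark://<master-host>:7077 --verbose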

  4. Create a file spark-env.sh in /opt/spark/conf with the following:

    SPARK_MASTER_WEBUI_PORT=1234
    SPARK_DAEMON_CLASSPATH="/usr/local/spark3/*"
    SPARK_DAEMON_JAVA_OPTS="-DSPARK_LOGS_DIR=$SPARK_LOGS_DIR -DSPARK_ROLE=$SPARK_ROLE -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$SPARK_LOGS_DIR"
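
    Note that this file, and the log4j2.properties created in the next step, rely on SPARK_LOGS_DIR and SPARK_ROLE. These are not standard Spark variables, so they must be defined before the SPARK_DAEMON_JAVA_OPTS line expands them, for example at the top of spark-env.sh (values are illustrative):

    # Illustrative values -- place above SPARK_DAEMON_JAVA_OPTS; SPARK_ROLE is
    # typically "master" on the master node and "worker" on worker nodes.
    SPARK_LOGS_DIR=/var/log/spark
    SPARK_ROLE=master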
  5. Create a file log4j2.properties in /opt/spark/conf with the following:

    name = SparkLogConfig
    appenders = rf
    #appenders = console, rf
    
    # Send all logging to the rolling file appender defined below
    rootLogger.level = debug
    rootLogger.appenderRefs = file
    rootLogger.appenderRef.file.ref = rf
    
    # In the pattern layout configuration below, we specify an explicit `%ex` conversion
    # pattern for logging Throwables. If this was omitted, then (by default) Log4J would
    # implicitly add an `%xEx` conversion pattern which logs stacktraces with additional
    # class packaging information. That extra information can sometimes add a substantial
    # performance overhead, so we disable it in our default logging config.
    # For more information, see SPARK-39361.
    
    appender.rf.type = RollingRandomAccessFile
    appender.rf.name = rf
    appender.rf.fileName = ${sys:SPARK_LOGS_DIR}/spark_${sys:SPARK_ROLE}.log
    appender.rf.filePattern = ${sys:SPARK_LOGS_DIR}/spark_${sys:SPARK_ROLE}.log.%d{yyyy-MM-dd}.%i
    appender.rf.layout.type = PatternLayout
    appender.rf.layout.pattern = %d{yy-MM-dd HH:mm:ss,SSS} %p %c: %m%n%ex
    appender.rf.policies.type = Policies
    appender.rf.policies.size.type = SizeBasedTriggeringPolicy
    appender.rf.policies.size.size = 1GB
    appender.rf.policies.time.type = TimeBasedTriggeringPolicy
    appender.rf.strategy.type = DefaultRolloverStrategy
    appender.rf.strategy.max = 30
    
    # NDB connector loggers to keep silent
    logger.ndbpredicate.name = com.vastdata.spark.predicate
    logger.ndbpredicate.level = info
    logger.ndbvastscan.name = com.vastdata.spark.VastScan
    logger.ndbvastscan.level = info
    logger.ndbsparkstats.name = com.vastdata.spark.statistics
    logger.ndbsparkstats.level = info
    logger.ndbpredserializer.name = com.vastdata.spark.SparkPredicateSerializer
    logger.ndbpredserializer.level = info
    logger.ndbstrategy.name = ndb.NDBStrategy
    logger.ndbstrategy.level = info
    
    # NDB third party loggers to keep silent
    logger.aws.name = com.amazonaws
    logger.aws.level = warn
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = warn
    logger.adaptive.name = org.apache.spark.sql.execution.adaptive
    logger.adaptive.level = warn
    logger.apachehttp.name = org.apache.http
    logger.apachehttp.level = warn
    logger.arrow.name = org.apache.arrow
    logger.arrow.level = warn
    logger.codegen1.name = org.apache.spark.sql.catalyst.expressions.codegen
    logger.codegen1.level = warn
    logger.codegen2.name = org.apache.spark.sql.execution.WholeStageCodegenExec
    logger.codegen2.level = warn
    
    
    # Set the default spark-shell/spark-sql log level to WARN. When running the
    # spark-shell/spark-sql, the log level for these classes is used to overwrite
    # the root logger's log level, so that the user can have different defaults
    # for the shell and regular Spark apps.
    logger.repl.name = org.apache.spark.repl.Main
    logger.repl.level = warn
    
    logger.thriftserver.name = org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
    logger.thriftserver.level = warn
    
    # Settings to quiet third party logs that are too verbose
    logger.jetty1.name = org.sparkproject.jetty
    logger.jetty1.level = warn
    logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
    logger.jetty2.level = error
    logger.netty.name = io.netty
    logger.netty.level = warn
    logger.networkutil.name = org.apache.spark.network.util
    logger.networkutil.level = warn
    logger.ctxcleaner.name = org.apache.spark.ContextCleaner
    logger.ctxcleaner.level = warn
    logger.replexprTyper.name = org.apache.spark.repl.SparkIMain$exprTyper
    logger.replexprTyper.level = info
    logger.replSparkILoopInterpreter.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
    logger.replSparkILoopInterpreter.level = info
    logger.parquet1.name = org.apache.parquet
    logger.parquet1.level = error
    logger.parquet2.name = parquet
    logger.parquet2.level = error
    logger.parquet3.name = org.apache.spark.sql.execution.datasources.parquet
    logger.parquet3.level = error
    
    # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
    logger.RetryingHMSHandler.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
    logger.RetryingHMSHandler.level = fatal
    logger.FunctionRegistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
    logger.FunctionRegistry.level = error
    
    # For deploying Spark ThriftServer
    # SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805
    appender.console.filter.1.type = RegexFilter
    appender.console.filter.1.regex = .*Thrift error occurred during processing of message.*
    appender.console.filter.1.onMatch = deny
    appender.console.filter.1.onMismatch = neutral
  6. Restart the Spark services on all nodes (master and workers) so the new configuration takes effect.
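
    For a Standalone cluster, a minimal sketch of restarting the daemons uses the scripts shipped under $SPARK_HOME/sbin (run on the master; stop-all.sh and start-all.sh reach the workers listed in conf/workers over SSH):

    # Restart the master and all workers so the new configuration is loaded.
    $SPARK_HOME/sbin/stop-all.sh
    $SPARK_HOME/sbin/start-all.sh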