Verify the Spark configuration using either pyspark or spark-sql, both of which are included in the Spark deployment.
Verifying Using Pyspark
Run pyspark with the following command:
/opt/spark/bin/pyspark \
  --driver-class-path $(echo /usr/local/spark3/*.jar | tr ' ' ':') \
  --jars $(echo /usr/local/spark3/*.jar | tr ' ' ',') \
  --master "spark://SPARK_MASTER_IP:7077" \
  --name spark_app_name \
  --driver-memory <driver_memory>g \
  --conf "spark.driver.maxResultSize"=<driver_memory> \
  --conf "spark.driver.userClassPathFirst"="true" \
  --conf "spark.executor.extraJavaOptions"="-DSPARK_LOGS_DIR=/opt/spark/logs -DSPARK_ROLE=app_executor -Xloggc:/opt/spark/logs/gc_app_executor.log -XX:+PrintGCDateStamps -XX:+PrintGCDetails -Xms<memory_usage_per_worker>g -XX:MetaspaceSize=100m" \
  --conf "spark.executor.heartbeatInterval"="10s" \
  --conf "spark.executor.memory"=memory_usage_per_worker \
  --conf "spark.executor.userClassPathFirst"="true" \
  --conf "spark.jars"=spark_jar_files \
  --conf "spark.memory.offHeap.enabled"="true" \
  --conf "spark.memory.offHeap.size"=53687091200 \
  --conf "spark.ndb.access_key_id"=access_key \
  --conf "spark.ndb.data_endpoints"=endpoints \
  --conf "spark.ndb.dynamic_filter_compaction_threshold"=100 \
  --conf "spark.ndb.dynamic_filter_max_values_threshold"=1000 \
  --conf "spark.ndb.endpoint"=endpoint \
  --conf "spark.ndb.num_of_splits"=num_of_splits \
  --conf "spark.ndb.num_of_sub_splits"=num_of_sub_splits \
  --conf "spark.ndb.parallel_import"="true" \
  --conf "spark.ndb.query_data_rows_per_split"=4000000 \
  --conf "spark.ndb.rowgroups_per_subsplit"=1 \
  --conf "spark.ndb.secret_access_key"=secret_key \
  --conf "spark.ndb.use_column_histogram"="true" \
  --conf "spark.network.timeout"="3600s" \
  --conf "spark.port.maxRetries"=30 \
  --conf "spark.rpc.askTimeout"="60s" \
  --conf "spark.rpc.io.connectionTimeout"="60s" \
  --conf "spark.rpc.numRetries"=15 \
  --conf "spark.rpc.retry.wait"="10s" \
  --conf "spark.shuffle.push.finalize.timeout"="1000s" \
  --conf "spark.sql.adaptive.enabled"="true" \
  --conf "spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold"="64MB" \
  --conf "spark.sql.autoBroadcastJoinThreshold"=524288000 \
  --conf "spark.sql.catalog.ndb"="spark.sql.catalog.ndb.VastCatalog" \
  --conf "spark.sql.catalogImplementation"="in-memory" \
  --conf "spark.sql.cbo.enabled"="true" \
  --conf "spark.sql.cbo.joinReorder.dp.star.filter"="true" \
  --conf "spark.sql.cbo.joinReorder.enabled"="true" \
  --conf "spark.sql.cbo.planStats.enabled"="true" \
  --conf "spark.sql.cbo.starSchemaDetection"="true" \
  --conf "spark.sql.charAsVarchar"="false" \
  --conf "spark.sql.execution.arrow.pyspark.enabled"="true" \
  --conf "spark.sql.extensions"="ndb.NDBSparkSessionExtension" \
  --conf "spark.sql.join.preferSortMergeJoin"="false" \
  --conf "spark.sql.optimizer.runtime.bloomFilter.enabled"="true" \
  --conf "spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold"="2GB" \
  --conf "spark.sql.optimizer.dynamicPartitionPruning.enabled"="true" \
  --conf "spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio"=0.5 \
  --conf "spark.sql.optimizer.dynamicPartitionPruning.useStats"="true" \
  --conf "spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled"="true" \
  --conf "spark.sql.parquet.enableVectorizedReader"="false" \
  --conf "spark.sql.readSideCharPadding"="false" \
  --conf "spark.sql.shuffle.partitions"=256 \
  --conf "spark.sql.statistics.histogram.enabled"="true" \
  --conf "spark.shuffle.service.removeShuffle"="true" \
  --conf "spark.sql.cbo.joinReorder.dp.threshold"=50 \
  --conf "spark.sql.exchange.reuse"="true" \
  --conf "spark.sql.execution.reuseSubquery"="true"
where SPARK_MASTER_IP is the IP address of the Spark Master node. Replace the remaining placeholder values (memory sizes, NDB endpoint and data endpoints, access key, secret key, and split settings) with values appropriate for your environment.
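If you embed pyspark in a Python script rather than launching the interactive shell, the same connector settings can be supplied programmatically through SparkSession.builder. The following is a minimal sketch, not the full configuration: it assumes the connector jars under /usr/local/spark3 are already on the driver and executor classpaths, and it shows only a subset of the options from the command above, with the same placeholder values.
# Minimal sketch: building a SparkSession with the core NDB connector settings.
# Placeholder values (SPARK_MASTER_IP, endpoint, endpoints, access_key,
# secret_key) must be replaced with real values for your environment.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("spark_app_name")
    .master("spark://SPARK_MASTER_IP:7077")
    .config("spark.sql.extensions", "ndb.NDBSparkSessionExtension")
    .config("spark.sql.catalog.ndb", "spark.sql.catalog.ndb.VastCatalog")
    .config("spark.sql.catalogImplementation", "in-memory")
    .config("spark.ndb.endpoint", "endpoint")              # placeholder
    .config("spark.ndb.data_endpoints", "endpoints")       # placeholder
    .config("spark.ndb.access_key_id", "access_key")       # placeholder
    .config("spark.ndb.secret_access_key", "secret_key")   # placeholder
    .getOrCreate()
)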
In pyspark, follow these steps to verify the connector setup:
Show schemas.
spark.sql("show schemas in ndb").show(20,False)This result should look like this:
+----------------------------------+
|namespace                         |
+----------------------------------+
|`tabular-tsmgjldunu`              |
|`tabular-volcanic-reclusive-skink`|
+----------------------------------+
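To check the listing programmatically rather than by eye, you can collect the namespace column into a Python list. A small sketch, using the example bucket name shown above; substitute a schema that exists in your environment:
# Collect the namespaces returned by the ndb catalog and look for an expected one.
schemas = [row["namespace"] for row in spark.sql("show schemas in ndb").collect()]
print(schemas)
print(any("tabular-volcanic-reclusive-skink" in s for s in schemas))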
Create a table.
spark.sql("create table ndb.`tabular-volcanic-reclusive-skink`.s1.t1(a int, b string)") spark.sql("describe ndb.`tabular-volcanic-reclusive-skink`.s1.t2").show()This should be the result:
+---------------+---------+-------+
|       col_name|data_type|comment|
+---------------+---------+-------+
|              a|      int|       |
|              b|   string|       |
|               |         |       |
| # Partitioning|         |       |
|Not partitioned|         |       |
+---------------+---------+-------+
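The same check can be done programmatically by collecting the describe output and comparing the column types against the table definition. A small sketch based on the table created above:
# Build a {column: type} map from the describe output and compare it with the
# expected definition of the table created in the previous step.
rows = spark.sql("describe ndb.`tabular-volcanic-reclusive-skink`.s1.t2").collect()
cols = {r["col_name"]: r["data_type"] for r in rows if r["col_name"] in ("a", "b")}
print(cols == {"a": "int", "b": "string"})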
Insert values into the table.
spark.sql("insert into ndb.`tabular-volcanic-reclusive-skink`.s1.t2 values (1, 'aa')") spark.sql("select * from ndb.`tabular-volcanic-reclusive-skink`.s1.t2").show()This should be the result:
+---+---+
|  a|  b|
+---+---+
|  1| aa|
+---+---+
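To confirm the round trip programmatically, collect the rows and compare them with the values you inserted. A small sketch:
# Verify that the row written through the connector is read back unchanged.
rows = spark.sql("select * from ndb.`tabular-volcanic-reclusive-skink`.s1.t2").collect()
print([(r["a"], r["b"]) for r in rows])   # expected: [(1, 'aa')]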
Verifying Using Spark-sql
Run spark-sql with the following command:
/opt/spark/bin/spark-sql \
  --driver-class-path $(echo /usr/local/spark3/*.jar | tr ' ' ':') \
  --jars $(echo /usr/local/spark3/*.jar | tr ' ' ',') \
  --master "spark://SPARK_MASTER_IP:7077" \
  --driver-java-options "-DSPARK_ROLE=driver" \
  --name spark_app_name
where SPARK_MASTER_IP is the IP address of the Spark Master node.
In spark-sql, follow these steps to verify the connector setup:
Show schemas.
spark-sql> show schemas in ndb;
The result should look like this:
+----------------------------------+
|namespace                         |
+----------------------------------+
|`tabular-tsmgjldunu`              |
|`tabular-volcanic-reclusive-skink`|
+----------------------------------+
Create a table.
spark-sql> create table ndb.`tabular-volcanic-reclusive-skink`.s1.t2(a int, b string);
spark-sql> describe ndb.`tabular-volcanic-reclusive-skink`.s1.t2;
This should be the result:
+---------------+---------+-------+
|       col_name|data_type|comment|
+---------------+---------+-------+
|              a|      int|       |
|              b|   string|       |
|               |         |       |
| # Partitioning|         |       |
|Not partitioned|         |       |
+---------------+---------+-------+
Insert values into the table.
spark-sql> insert into ndb.`tabular-volcanic-reclusive-skink`.s1.t2 values (1, 'aa');
spark-sql> select * from ndb.`tabular-volcanic-reclusive-skink`.s1.t2;
This should be the result:
+---+---+
|  a|  b|
+---+---+
|  1| aa|
+---+---+
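If you prefer to run these checks non-interactively, the same statements can be collected into a short pyspark script and run with spark-submit, passing the same --jars, --driver-class-path, and --conf options as the pyspark command above. The following is a sketch under those assumptions; the example schema and table names must be replaced with ones valid in your environment.
# verify_ndb.py - runs the verification statements from this page end to end.
# Assumes the connector configuration is supplied on the spark-submit command
# line, and that s1.t2 does not already exist (drop it first if you have run
# the interactive steps above).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ndb_verify").getOrCreate()

spark.sql("show schemas in ndb").show(20, False)
spark.sql("create table ndb.`tabular-volcanic-reclusive-skink`.s1.t2(a int, b string)")
spark.sql("describe ndb.`tabular-volcanic-reclusive-skink`.s1.t2").show()
spark.sql("insert into ndb.`tabular-volcanic-reclusive-skink`.s1.t2 values (1, 'aa')")
spark.sql("select * from ndb.`tabular-volcanic-reclusive-skink`.s1.t2").show()

spark.stop()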