#apache-spark #datafusion #datafusion-comet

Quick Introduction

According to the documentation, Apache DataFusion Comet is a high-performance accelerator for Apache Spark, built on top of the Apache DataFusion query engine.

Developed initially at Apple, it was submitted in 2024 to become an Apache Software Foundation (ASF) project under the Apache Arrow umbrella.

Comet is a Spark plugin whose native execution code is written in Rust on top of the Apache DataFusion query engine, which is itself written in Rust. The primary motivation for Comet's development is to delegate parts of Spark’s physical plan, such as operators, to their DataFusion counterparts. Although still in development, Comet supports various expressions and operators, including Filter, Project, Aggregation, Join, and Exchange. Special focus is also given to efficiently reading and writing Parquet files using a native implementation.

Comet aims for feature parity: a Spark job should behave the same whether Comet is enabled or not. When a job requests an unsupported or unimplemented feature, directly or indirectly, Comet automatically and transparently falls back to the Spark implementation without interrupting the job. This is depicted in the image below.

Image from https://datafusion.apache.org/comet/user-guide/overview.html#comet-overview.
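To make the fallback idea concrete, here is a toy model in plain Python. It is only a sketch of the concept and not Comet's API or its real planning logic: the supported set is just the operator list mentioned above, and `assign_engines` and `SomeUnsupportedOp` are made-up names for illustration.

```python
# Toy model only: this is NOT Comet's API or its real planning logic.
# The supported set mirrors the operators listed in this post;
# "SomeUnsupportedOp" is a made-up placeholder operator.
COMET_SUPPORTED = {"Filter", "Project", "Aggregation", "Join", "Exchange"}


def assign_engines(plan):
    """Return (operator, engine) pairs, falling back to Spark when needed."""
    return [(op, "Comet" if op in COMET_SUPPORTED else "Spark") for op in plan]


plan = ["Filter", "SomeUnsupportedOp", "Project", "Aggregation"]
for op, engine in assign_engines(plan):
    print(f"{op:<18} -> {engine}")
```

The point of the sketch is that the decision is made per operator: one unsupported step is handed back to Spark while the rest of the plan can still run natively.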

Regarding speed comparisons, there are already a few benchmarks reporting 30% to 60% faster execution.

Getting Started

We will closely follow the official instructions for Scala, adapted for PySpark. First, we fetch the jar that matches our Spark/Scala/JVM version. Then we tell Spark about the jar, add it to the driver and executor classpaths, and enable the plugin as follows. More Comet-specific settings can be found at https://datafusion.apache.org/comet/user-guide/tuning.html.

from pyspark.sql import SparkSession

# Prepare Spark session
COMET_JAR = "jars/comet-spark-spark3.3_2.12-0.3.0.jar"

spark = (
    SparkSession
    .builder
    .config("spark.jars", COMET_JAR)
    .config("spark.driver.extraClassPath", COMET_JAR)
    .config("spark.executor.extraClassPath", COMET_JAR)
    .config("spark.plugins", "org.apache.spark.CometPlugin")
    .config("spark.shuffle.manager", "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
    .config("spark.comet.explainFallback.enabled", "true")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "16g")
    .getOrCreate()
)
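The jar file name used above encodes the Spark, Scala, and Comet versions. As a minimal sketch, assuming that the naming pattern of this single file name generalizes to other versions (an assumption, not something the Comet documentation is quoted on here), a small hypothetical helper `comet_jar_name` can build the name from its parts:

```python
def comet_jar_name(spark_version: str, scala_version: str, comet_version: str) -> str:
    """Build a Comet jar file name following the pattern of the jar used in this post.

    Hypothetical helper: the pattern is inferred from one file name only.
    """
    # Only the major.minor part of the Spark version appears in the file name.
    spark_short = ".".join(spark_version.split(".")[:2])
    return f"comet-spark-spark{spark_short}_{scala_version}-{comet_version}.jar"


jar = comet_jar_name("3.3.2", "2.12", "0.3.0")
print(jar)  # comet-spark-spark3.3_2.12-0.3.0.jar
```

In a PySpark environment, the installed Spark version is available as pyspark.__version__, which can be passed as the first argument; the Scala version has to match the Spark build in use.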

Next, let’s load some data, stored in Parquet format.

df = spark.read.parquet(DATALAKE_RAW)

And create a temporary view just for convenience.

df.createOrReplaceTempView("taxi_trips")

Because spark.comet.explainFallback.enabled=true is set in the Spark session, the following warning message will be logged, notifying us that Comet does not support creating views. No worries, though: it falls back to the Spark implementation and continues normally.