
Spark connector

ClickHouse Supported

This connector leverages ClickHouse-specific optimizations, such as advanced partitioning and predicate pushdown, to improve query performance and data handling. The connector is based on ClickHouse's official JDBC connector, and manages its own catalog.

Before Spark 3.0, Spark lacked a built-in catalog concept, so users typically relied on external catalog systems such as Hive Metastore or AWS Glue. With these external solutions, users had to register their data source tables manually before accessing them in Spark. However, since Spark 3.0 introduced the catalog concept, Spark can now automatically discover tables by registering catalog plugins.

Spark's default catalog is spark_catalog, and tables are identified by {catalog name}.{database}.{table}. With the new catalog feature, it is now possible to add and work with multiple catalogs in a single Spark application.

Choosing Between Catalog API and TableProvider API

The ClickHouse Spark connector supports two access patterns: the Catalog API and the TableProvider API (format-based access). Understanding the differences helps you choose the right approach for your use case.

Catalog API vs TableProvider API

| Feature | Catalog API | TableProvider API |
| --- | --- | --- |
| Configuration | Centralized via Spark configuration | Per-operation via options |
| Table Discovery | Automatic via catalog | Manual table specification |
| DDL Operations | Full support (CREATE, DROP, ALTER) | Limited (automatic table creation only) |
| Spark SQL Integration | Native (clickhouse.database.table) | Requires format specification |
| Use Case | Long-term, stable connections with centralized config | Ad-hoc, dynamic, or temporary access |

Requirements

  • Java 8 or 17 (Java 17+ required for Spark 4.0)
  • Scala 2.12 or 2.13 (Spark 4.0 only supports Scala 2.13)
  • Apache Spark 3.3, 3.4, 3.5, or 4.0

Compatibility matrix

| Version | Compatible Spark Versions | ClickHouse JDBC version |
| --- | --- | --- |
| main | Spark 3.3, 3.4, 3.5, 4.0 | 0.9.4 |
| 0.10.0 | Spark 3.3, 3.4, 3.5, 4.0 | 0.9.5 |
| 0.9.0 | Spark 3.3, 3.4, 3.5, 4.0 | 0.9.4 |
| 0.8.1 | Spark 3.3, 3.4, 3.5 | 0.6.3 |
| 0.7.3 | Spark 3.3, 3.4 | 0.4.6 |
| 0.6.0 | Spark 3.3 | 0.3.2-patch11 |
| 0.5.0 | Spark 3.2, 3.3 | 0.3.2-patch11 |
| 0.4.0 | Spark 3.2, 3.3 | No dependency |
| 0.3.0 | Spark 3.2, 3.3 | No dependency |
| 0.2.1 | Spark 3.2 | No dependency |
| 0.1.2 | Spark 3.2 | No dependency |
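
As a minimal sketch, the released rows of the compatibility matrix can be encoded as a lookup so that a build script can check a connector/Spark combination before deploying. The names `COMPATIBILITY`, `is_compatible`, and `jdbc_version_for` are hypothetical helpers, not part of the connector; the `main` row is left out because it is not a released version.

```python
# Released connector versions copied from the compatibility matrix.
# Each entry: connector version -> (supported Spark minor versions, JDBC version or None).
COMPATIBILITY = {
    "0.10.0": ({"3.3", "3.4", "3.5", "4.0"}, "0.9.5"),
    "0.9.0": ({"3.3", "3.4", "3.5", "4.0"}, "0.9.4"),
    "0.8.1": ({"3.3", "3.4", "3.5"}, "0.6.3"),
    "0.7.3": ({"3.3", "3.4"}, "0.4.6"),
    "0.6.0": ({"3.3"}, "0.3.2-patch11"),
    "0.5.0": ({"3.2", "3.3"}, "0.3.2-patch11"),
    "0.4.0": ({"3.2", "3.3"}, None),
    "0.3.0": ({"3.2", "3.3"}, None),
    "0.2.1": ({"3.2"}, None),
    "0.1.2": ({"3.2"}, None),
}


def spark_minor(spark_version: str) -> str:
    """Reduce a full Spark version such as '3.5.1' to its minor version '3.5'."""
    major, minor = spark_version.split(".")[:2]
    return f"{major}.{minor}"


def is_compatible(connector_version: str, spark_version: str) -> bool:
    """Return True if the matrix lists this Spark version for the connector version."""
    entry = COMPATIBILITY.get(connector_version)
    return entry is not None and spark_minor(spark_version) in entry[0]


def jdbc_version_for(connector_version: str):
    """Return the ClickHouse JDBC version from the matrix, or None if there is no dependency."""
    return COMPATIBILITY[connector_version][1]
```

For example, `is_compatible("0.8.1", "3.5.1")` is true, while the same connector version with Spark 4.0 is not listed in the matrix.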

Installation & setup

There are multiple ways to install the ClickHouse Spark connector, to suit different project setups. You can add the connector as a dependency directly in your project's build file (such as pom.xml for Maven or build.sbt for SBT). Alternatively, you can put the required JAR files in your $SPARK_HOME/jars/ folder, or pass them directly as a Spark option using the --jars flag in the spark-submit command. Any of these approaches makes the ClickHouse connector available in your Spark environment.

Import as a Dependency

<dependency>
  <groupId>com.clickhouse.spark</groupId>
  <artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
  <version>{{ stable_version }}</version>
</dependency>
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <classifier>all</classifier>
  <version>{{ clickhouse_jdbc_version }}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Add the following repository if you want to use a SNAPSHOT version.

<repositories>
  <repository>
    <id>sonatype-oss-snapshots</id>
    <name>Sonatype OSS Snapshots Repository</name>
    <url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
  </repository>
</repositories>

Download the library

The name pattern of the binary JAR is:

clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar

You can find all available released JAR files in the Maven Central Repository and all daily build SNAPSHOT JAR files in the Sonatype OSS Snapshots Repository.
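
The JAR name pattern can be expanded mechanically, which is handy when scripting downloads or building a --jars argument. The sketch below is only an illustration: `runtime_artifact_id` and `runtime_jar_name` are hypothetical helper names, and the version values in the usage line are placeholders you should replace with a combination from the compatibility matrix.

```python
def runtime_artifact_id(spark_binary_version: str, scala_binary_version: str) -> str:
    """Build the Maven artifactId of the connector runtime."""
    return f"clickhouse-spark-runtime-{spark_binary_version}_{scala_binary_version}"


def runtime_jar_name(spark_binary_version: str, scala_binary_version: str, version: str) -> str:
    """Build the binary JAR file name following the documented pattern."""
    artifact = runtime_artifact_id(spark_binary_version, scala_binary_version)
    return f"{artifact}-{version}.jar"


# Placeholder values chosen for illustration only.
jar = runtime_jar_name("3.5", "2.12", "0.9.0")
```

With these placeholder values, `jar` is `clickhouse-spark-runtime-3.5_2.12-0.9.0.jar`.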

Note

It's essential to include the clickhouse-jdbc JAR with the "all" classifier, as the connector relies on clickhouse-http and clickhouse-client, both of which are bundled in clickhouse-jdbc:all. Alternatively, you can add the clickhouse-client and clickhouse-http JARs individually if you prefer not to use the full JDBC package.

In any case, ensure that the package versions are compatible according to the Compatibility Matrix.

Register the catalog (required)

In order to access your ClickHouse tables, you must configure a new Spark catalog with the following configs:

| Property | Value | Default Value | Required |
| --- | --- | --- | --- |
| spark.sql.catalog.<catalog_name> | com.clickhouse.spark.ClickHouseCatalog | N/A | Yes |
| spark.sql.catalog.<catalog_name>.host | <clickhouse_host> | localhost | No |
| spark.sql.catalog.<catalog_name>.protocol | http | http | No |
| spark.sql.catalog.<catalog_name>.http_port | <clickhouse_port> | 8123 | No |
| spark.sql.catalog.<catalog_name>.user | <clickhouse_username> | default | No |
| spark.sql.catalog.<catalog_name>.password | <clickhouse_password> | (empty string) | No |
| spark.sql.catalog.<catalog_name>.database | <database> | default | No |
| spark.<catalog_name>.write.format | json | arrow | No |

These settings can be set in one of the following ways:

  • Edit/Create spark-defaults.conf.
  • Pass the configuration to your spark-submit command (or to your spark-shell/spark-sql CLI commands).
  • Add the configuration when initiating your context.

Note

When working with a ClickHouse cluster, you need to set a unique catalog name for each instance. For example:

spark.sql.catalog.clickhouse1                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host           10.0.0.1
spark.sql.catalog.clickhouse1.protocol       https
spark.sql.catalog.clickhouse1.http_port      8443
spark.sql.catalog.clickhouse1.user           default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database       default
spark.sql.catalog.clickhouse1.option.ssl     true

spark.sql.catalog.clickhouse2                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host           10.0.0.2
spark.sql.catalog.clickhouse2.protocol       https
spark.sql.catalog.clickhouse2.http_port      8443
spark.sql.catalog.clickhouse2.user           default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database       default
spark.sql.catalog.clickhouse2.option.ssl     true

With this configuration, you can access the clickhouse1 table <ck_db>.<ck_table> from Spark SQL as clickhouse1.<ck_db>.<ck_table>, and the clickhouse2 table <ck_db>.<ck_table> as clickhouse2.<ck_db>.<ck_table>.
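
When several catalogs are registered, generating the property keys from one helper avoids typos in the long `spark.sql.catalog.<catalog_name>.*` names. The following is a minimal sketch: `catalog_conf` and `render_spark_defaults` are hypothetical helpers, and the defaults mirror the catalog property table above.

```python
CATALOG_CLASS = "com.clickhouse.spark.ClickHouseCatalog"


def catalog_conf(catalog_name, host="localhost", protocol="http", http_port=8123,
                 user="default", password="", database="default", options=None):
    """Return the Spark properties that register one ClickHouse catalog."""
    prefix = f"spark.sql.catalog.{catalog_name}"
    conf = {
        prefix: CATALOG_CLASS,
        f"{prefix}.host": host,
        f"{prefix}.protocol": protocol,
        f"{prefix}.http_port": str(http_port),
        f"{prefix}.user": user,
        f"{prefix}.password": password,
        f"{prefix}.database": database,
    }
    # Extra options use the .option.<key> suffix, e.g. option.ssl.
    for key, value in (options or {}).items():
        conf[f"{prefix}.option.{key}"] = str(value)
    return conf


def render_spark_defaults(conf):
    """Render properties as 'key value' lines in the spark-defaults.conf style."""
    return "\n".join(f"{key} {value}".rstrip() for key, value in conf.items())


# One uniquely named catalog per ClickHouse instance.
conf = {
    **catalog_conf("clickhouse1", host="10.0.0.1", protocol="https",
                   http_port=8443, options={"ssl": "true"}),
    **catalog_conf("clickhouse2", host="10.0.0.2", protocol="https",
                   http_port=8443, options={"ssl": "true"}),
}
```

In PySpark, each pair could then be applied with `SparkSession.builder.config(key, value)` before calling `getOrCreate()`, or the rendered text could be appended to spark-defaults.conf.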

Using the TableProvider API (Format-based Access)

In addition to the catalog-based approach, the ClickHouse Spark connector supports a format-based access pattern via the TableProvider API.

Format-based Read Example

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read from ClickHouse using format API
df = spark.read \
    .format("clickhouse") \
    .option("host", "your-clickhouse-host") \
    .option("protocol", "https") \
    .option("http_port", "8443") \
    .option("database", "default") \
    .option("table", "your_table") \
    .option("user", "default") \
    .option("password", "your_password") \
    .option("ssl", "true") \
    .load()

df.show()

Format-based Write Example

# Write to ClickHouse using format API
df.write \
    .format("clickhouse") \
    .option("host", "your-clickhouse-host") \
    .option("protocol", "https") \
    .option("http_port", "8443") \
    .option("database", "default") \
    .option("table", "your_table") \
    .option("user", "default") \
    .option("password", "your_password") \
    .option("ssl", "true") \
    .mode("append") \
    .save()

TableProvider Features

The TableProvider API provides several powerful features:

Automatic Table Creation

When writing to a non-existent table, the connector automatically creates the table with an appropriate schema. The connector provides intelligent defaults:

  • Engine: Defaults to MergeTree() if not specified. You can specify a different engine using the engine option (e.g., ReplacingMergeTree(), SummingMergeTree(), etc.)
  • ORDER BY: Required. You must explicitly specify the order_by option when creating a new table. The connector validates that all specified columns exist in the schema.
  • Nullable Key Support: Automatically adds settings.allow_nullable_key=1 if ORDER BY contains nullable columns.

# Table will be created automatically with explicit ORDER BY (required)
df.write \
    .format("clickhouse") \
    .option("host", "your-host") \
    .option("database", "default") \
    .option("table", "new_table") \
    .option("order_by", "id") \
    .mode("append") \
    .save()

# Specify table creation options with custom engine
df.write \
    .format("clickhouse") \
    .option("host", "your-host") \
    .option("database", "default") \
    .option("table", "new_table") \
    .option("order_by", "id, timestamp") \
    .option("engine", "ReplacingMergeTree()") \
    .option("settings.allow_nullable_key", "1") \
    .mode("append") \
    .save()

Note

ORDER BY Required: The order_by option is required when creating a new table via the TableProvider API. You must explicitly specify which columns to use for the ORDER BY clause. The connector validates that all specified columns exist in the schema and will throw an error if any columns are missing.

Engine Selection: The default engine is MergeTree(), but you can specify any ClickHouse table engine using the engine option (e.g., ReplacingMergeTree(), SummingMergeTree(), AggregatingMergeTree(), etc.).

TableProvider Connection Options

When using the format-based API, the following connection options are available:

Connection Options

| Option | Description | Default Value | Required |
| --- | --- | --- | --- |
| host | ClickHouse server hostname | localhost | Yes |
| protocol | Connection protocol (http or https) | http | No |
| http_port | HTTP/HTTPS port | 8123 | No |
| database | Database name | default | Yes |
| table | Table name | N/A | Yes |
| user | Username for authentication | default | No |
| password | Password for authentication | (empty string) | No |
| ssl | Enable SSL connection | false | No |
| ssl_mode | SSL mode (NONE, STRICT, etc.) | STRICT | No |
| timezone | Timezone for date/time operations | server | No |

Table Creation Options

These options are used when the table doesn't exist and needs to be created:

| Option | Description | Default Value | Required |
| --- | --- | --- | --- |
| order_by | Columns to use for ORDER BY clause. Comma-separated for multiple columns | N/A | Yes* |
| engine | ClickHouse table engine (e.g., MergeTree(), ReplacingMergeTree(), SummingMergeTree(), etc.) | MergeTree() | No |
| settings.allow_nullable_key | Enable nullable keys in ORDER BY (for ClickHouse Cloud) | Auto-detected** | No |
| settings.<key> | Any ClickHouse table setting | N/A | No |
| cluster | Cluster name for Distributed tables | N/A | No |
| clickhouse.column.<name>.variant_types | Comma-separated list of ClickHouse types for Variant columns (e.g., String, Int64, Bool, JSON). Type names are case-sensitive. Spaces after commas are optional. | N/A | No |

* The order_by option is required when creating a new table. All specified columns must exist in the schema.
** Automatically set to 1 if ORDER BY contains nullable columns and not explicitly provided.

Tip

Best Practice: For ClickHouse Cloud, explicitly set settings.allow_nullable_key=1 if your ORDER BY columns might be nullable, as ClickHouse Cloud requires this setting.
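
The table creation rules described in this section (required order_by, column validation, MergeTree() default, and automatic allow_nullable_key) can be sketched as plain Python. This is an illustration of the documented behavior, not the connector's actual code: `build_create_options` is a hypothetical helper, and the schema is modeled as a simple mapping from column name to nullability.

```python
def build_create_options(schema, order_by, engine="MergeTree()", settings=None):
    """Build TableProvider table creation options.

    schema: dict mapping column name -> True if the column is nullable.
    order_by: comma-separated column names (required).
    """
    columns = [c.strip() for c in (order_by or "").split(",") if c.strip()]
    if not columns:
        raise ValueError("order_by is required when creating a new table")

    # All ORDER BY columns must exist in the schema.
    missing = [c for c in columns if c not in schema]
    if missing:
        raise ValueError(f"order_by columns not found in schema: {missing}")

    merged = dict(settings or {})
    # Add allow_nullable_key=1 only if a key column is nullable
    # and the caller did not set the value explicitly.
    if any(schema[c] for c in columns) and "allow_nullable_key" not in merged:
        merged["allow_nullable_key"] = "1"

    options = {"order_by": ", ".join(columns), "engine": engine}
    for key, value in merged.items():
        options[f"settings.{key}"] = str(value)
    return options


# Example schema: "id" is non-nullable, "ts" and "name" are nullable.
schema = {"id": False, "ts": True, "name": True}
opts = build_create_options(schema, "id, ts", engine="ReplacingMergeTree()")
```

In PySpark, the resulting dictionary could be passed along with the connection options through `df.write.format("clickhouse").options(**opts)`.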

Writing Modes

The Spark connector (both TableProvider API and Catalog API) supports the following Spark write modes:

  • append: Add data to existing table
  • overwrite: Replace all data in the table (truncates table)

Note

Partition Overwrite Not Supported: The connector doesn't currently support partition-level overwrite operations (e.g., overwrite mode with partitionBy). This feature is in progress. See GitHub issue #34 for tracking this feature.

# Overwrite mode (truncates table first)
df.write \
    .format("clickhouse") \
    .option("host", "your-host") \
    .option("database", "default") \
    .option("table", "my_table") \
    .mode("overwrite") \
    .save()

Configuring ClickHouse Options

Both the Catalog API and TableProvider API support configuring ClickHouse-specific options (not connector options). These are passed through to ClickHouse when creating tables or executing queries.

ClickHouse options allow you to configure ClickHouse-specific settings like allow_nullable_key, index_granularity, and other table-level or query-level settings. These are different from connector options (like host, database, table) which control how the connector connects to ClickHouse.

Using TableProvider API

With the TableProvider API, use the settings.<key> option format:

df.write \
    .format("clickhouse") \
    .option("host", "your-host") \
    .option("database", "default") \
    .option("table", "my_table") \
    .option("order_by", "id") \
    .option("settings.allow_nullable_key", "1") \
    .option("settings.index_granularity", "8192") \
    .mode("append") \
    .save()

Using Catalog API

With the Catalog API, use the spark.sql.catalog.<catalog_name>.option.<key> format in your Spark configuration:

spark.sql.catalog.clickhouse.option.allow_nullable_key 1
spark.sql.catalog.clickhouse.option.index_granularity 8192

Or set them when creating tables via Spark SQL:

CREATE TABLE clickhouse.default.my_table (
  id INT,
  name STRING
) USING ClickHouse
TBLPROPERTIES (
  engine = 'MergeTree()',
  order_by = 'id',
  'settings.allow_nullable_key' = '1',
  'settings.index_granularity' = '8192'
)
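
The same ClickHouse settings are spelled differently in each place: `settings.<key>` for TableProvider options and TBLPROPERTIES, and `spark.sql.catalog.<catalog_name>.option.<key>` for catalog configuration. As a rough sketch, one dictionary of settings can be translated into all three forms; the function names below are hypothetical helpers.

```python
def tableprovider_options(settings):
    """Settings as TableProvider options: settings.<key>."""
    return {f"settings.{key}": str(value) for key, value in settings.items()}


def catalog_options(catalog_name, settings):
    """Settings as catalog configuration: spark.sql.catalog.<catalog_name>.option.<key>."""
    prefix = f"spark.sql.catalog.{catalog_name}.option"
    return {f"{prefix}.{key}": str(value) for key, value in settings.items()}


def tblproperties_entries(settings):
    """Settings as quoted TBLPROPERTIES entries for a Spark SQL CREATE TABLE."""
    return ",\n".join(
        f"  'settings.{key}' = '{value}'" for key, value in settings.items()
    )


clickhouse_settings = {"allow_nullable_key": 1, "index_granularity": 8192}
```

Keeping the settings in one place and deriving the three spellings reduces the chance that the Catalog API and TableProvider API paths drift apart.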

ClickHouse Cloud settings

When connecting to ClickHouse Cloud, make sure to enable SSL and set the appropriate SSL mode. For example:

spark.sql.catalog.clickhouse.option.ssl        true
spark.sql.catalog.clickhouse.option.ssl_mode   NONE

Read data

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadExample {
    public static void main(String[] args) {
        // Create a Spark session with a catalog named "clickhouse"
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        // Query the table as <catalog>.<database>.<table>
        Dataset<Row> df = spark.sql("select * from clickhouse.default.example_table");

        df.show();

        spark.stop();
    }
}

Write data

Note

Partition Overwrite Not Supported: The Catalog API doesn't currently support partition-level overwrite operations (e.g., overwrite mode with partitionBy). This feature is in progress. See GitHub issue #34 for tracking this feature.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class WriteExample {
    public static void main(String[] args) throws AnalysisException {
        // Create a Spark session with a catalog named "clickhouse"
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        // Define the schema for the DataFrame
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false)
        });

        List<Row> data = Arrays.asList(
                RowFactory.create(1, "Alice"),
                RowFactory.create(2, "Bob")
        );

        // Create a DataFrame
        Dataset<Row> df = spark.createDataFrame(data, schema);

        // Append the rows to an existing ClickHouse table
        df.writeTo("clickhouse.default.example_table").append();

        spark.stop();
    }
}

DDL operations

You can perform DDL operations on your ClickHouse instance using Spark SQL, with all changes immediately persisted in ClickHouse. Spark SQL allows you to write queries exactly as you would in ClickHouse, so you can directly execute commands such as CREATE TABLE, TRUNCATE, and more without modification. For instance:

Note

When using Spark SQL, only one statement can be executed at a time.

USE clickhouse; 

CREATE TABLE test_db.tbl_sql (
  create_time TIMESTAMP NOT NULL,
  m           INT       NOT NULL COMMENT 'part key',
  id          BIGINT    NOT NULL COMMENT 'sort key',
  value       STRING
) USING ClickHouse
PARTITIONED BY (m)
TBLPROPERTIES (
  engine = 'MergeTree()',
  order_by = 'id',
  settings.index_granularity = 8192
);

The example above demonstrates Spark SQL queries, which you can run within your application using any API: Java, Scala, PySpark, or shell.

Working with VariantType

Note

VariantType support is available in Spark 4.0+ and requires ClickHouse 25.3+ with experimental JSON/Variant types enabled.

The connector supports Spark's VariantType for working with semi-structured data. VariantType maps to ClickHouse's JSON and Variant types, allowing you to store and query flexible schema data efficiently.

Note

This section focuses specifically on VariantType mapping and usage. For a complete overview of all supported data types, see the Supported data types section.

ClickHouse Type Mapping

| ClickHouse Type | Spark Type | Description |
| --- | --- | --- |
| JSON | VariantType | Stores JSON objects only (must start with {) |
| Variant(T1, T2, ...) | VariantType | Stores multiple types including primitives, arrays, and JSON |

Reading VariantType Data

When reading from ClickHouse, JSON and Variant columns are automatically mapped to Spark's VariantType:

// Read JSON column as VariantType
val df = spark.sql("SELECT id, data FROM clickhouse.default.json_table")

// Access variant data
df.show()

// Convert variant to JSON string for inspection
import org.apache.spark.sql.functions._
df.select(
  col("id"),
  to_json(col("data")).as("data_json")
).show()

Writing VariantType Data

You can write VariantType data to ClickHouse using either JSON or Variant column types:

import org.apache.spark.sql.functions._

// Create DataFrame with JSON data
val jsonData = Seq(
  (1, """{"name": "Alice", "age": 30}"""),
  (2, """{"name": "Bob", "age": 25}"""),
  (3, """{"name": "Charlie", "city": "NYC"}""")
).toDF("id", "json_string")

// Parse JSON strings to VariantType
val variantDF = jsonData.select(
  col("id"),
  parse_json(col("json_string")).as("data")
)

// Write to ClickHouse with JSON type (JSON objects only)
variantDF.writeTo("clickhouse.default.user_data").create()

// Or specify Variant with multiple types
spark.sql("""
  CREATE TABLE clickhouse.default.mixed_data (
    id INT,
    data VARIANT
  ) USING clickhouse
  TBLPROPERTIES (
    'clickhouse.column.data.variant_types' = 'String, Int64, Bool, JSON',
    'engine' = 'MergeTree()',
    'order_by' = 'id'
  )
""")

Creating VariantType Tables with Spark SQL

You can create VariantType tables using Spark SQL DDL:

-- Create table with JSON type (default)
CREATE TABLE clickhouse.default.json_table (
  id INT,
  data VARIANT
) USING clickhouse
TBLPROPERTIES (
  'engine' = 'MergeTree()',
  'order_by' = 'id'
)

-- Create table with Variant type supporting multiple types
CREATE TABLE clickhouse.default.flexible_data (
  id INT,
  data VARIANT
) USING clickhouse
TBLPROPERTIES (
  'clickhouse.column.data.variant_types' = 'String, Int64, Float64, Bool, Array(String), JSON',
  'engine' = 'MergeTree()',
  'order_by' = 'id'
)

Configuring Variant Types

When creating tables with VariantType columns, you can specify which ClickHouse types to use:

JSON Type (Default)

If no variant_types property is specified, the column defaults to ClickHouse's JSON type, which only accepts JSON objects:

CREATE TABLE clickhouse.default.json_table (
  id INT,
  data VARIANT
) USING clickhouse
TBLPROPERTIES (
  'engine' = 'MergeTree()',
  'order_by' = 'id'
)

This generates the following ClickHouse statement:

CREATE TABLE json_table (id Int32, data JSON) ENGINE = MergeTree() ORDER BY id

Variant Type with Multiple Types

To support primitives, arrays, and JSON objects, specify the types in the variant_types property:

CREATE TABLE clickhouse.default.flexible_data (
  id INT,
  data VARIANT
) USING clickhouse
TBLPROPERTIES (
  'clickhouse.column.data.variant_types' = 'String, Int64, Float64, Bool, Array(String), JSON',
  'engine' = 'MergeTree()',
  'order_by' = 'id'
)

This generates the following ClickHouse statement:

CREATE TABLE flexible_data (
  id Int32, 
  data Variant(String, Int64, Float64, Bool, Array(String), JSON)
) ENGINE = MergeTree() ORDER BY id

Supported Variant Types

The following ClickHouse types can be used in Variant():

  • Primitives: String, Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64, Bool
  • Arrays: Array(T) where T is any supported type, including nested arrays
  • JSON: JSON for storing JSON objects

Read Format Configuration

By default, JSON and Variant columns are read as VariantType. You can override this behavior to read them as strings:

// Read JSON/Variant as strings instead of VariantType
spark.conf.set("spark.clickhouse.read.jsonAs", "string")

val df = spark.sql("SELECT id, data FROM clickhouse.default.json_table")
// data column will be StringType containing JSON strings

Write Format Support

VariantType write support varies by format:

| Format | Support | Notes |
| --- | --- | --- |
| JSON | ✅ Full | Supports both JSON and Variant types. Recommended for VariantType data |
| Arrow | ⚠️ Partial | Supports writing to ClickHouse JSON type. Doesn't support ClickHouse Variant type. Full support is pending resolution of https://github.com/ClickHouse/ClickHouse/issues/92752 |

Configure the write format:

spark.conf.set("spark.clickhouse.write.format", "json")  // Recommended for Variant types

Tip

If you need to write to a ClickHouse Variant type, use JSON format. Arrow format only supports writing to JSON type.

Best Practices

  1. Use JSON type for JSON-only data: If you only store JSON objects, use the default JSON type (no variant_types property)
  2. Specify types explicitly: When using Variant(), explicitly list all types you plan to store
  3. Enable experimental features: Ensure ClickHouse has allow_experimental_json_type = 1 enabled
  4. Use JSON format for writes: JSON format is recommended for VariantType data for better compatibility
  5. Consider query patterns: JSON/Variant types support ClickHouse's JSON path queries for efficient filtering
  6. Column hints for performance: When using JSON fields in ClickHouse, adding column hints improves query performance. Currently, adding column hints via Spark isn't supported. See GitHub issue #497 for tracking this feature.

Example: Complete Workflow

import org.apache.spark.sql.functions._

// Enable experimental JSON type in ClickHouse
spark.sql("SET allow_experimental_json_type = 1")

// Create table with Variant column
spark.sql("""
  CREATE TABLE clickhouse.default.events (
    event_id BIGINT,
    event_time TIMESTAMP,
    event_data VARIANT
  ) USING clickhouse
  TBLPROPERTIES (
    'clickhouse.column.event_data.variant_types' = 'String, Int64, Bool, JSON',
    'engine' = 'MergeTree()',
    'order_by' = 'event_time'
  )
""")

// Prepare data with mixed types
val events = Seq(
  (1L, "2024-01-01 10:00:00", """{"action": "login", "user_id": 123}"""),
  (2L, "2024-01-01 10:05:00", """{"action": "purchase", "amount": 99.99}"""),
  (3L, "2024-01-01 10:10:00", """{"action": "logout", "duration": 600}""")
).toDF("event_id", "event_time", "json_data")

// Convert to VariantType and write
val variantEvents = events.select(
  col("event_id"),
  to_timestamp(col("event_time")).as("event_time"),
  parse_json(col("json_data")).as("event_data")
)

variantEvents.writeTo("clickhouse.default.events").append()

// Read and query
val result = spark.sql("""
  SELECT event_id, event_time, event_data
  FROM clickhouse.default.events
  WHERE event_time >= '2024-01-01'
  ORDER BY event_time
""")

result.show(false)

Configurations

The following are the adjustable configurations available in the connector.

Note

Using Configurations: These are Spark-level configuration options that apply to both Catalog API and TableProvider API. They can be set in two ways:

  1. Global Spark configuration (applies to all operations):

    spark.conf.set("spark.clickhouse.write.batchSize", "20000")
    spark.conf.set("spark.clickhouse.write.compression.codec", "lz4")
    
  2. Per-operation override (TableProvider API only - can override global settings):

    df.write \
        .format("clickhouse") \
        .option("host", "your-host") \
        .option("database", "default") \
        .option("table", "my_table") \
        .option("spark.clickhouse.write.batchSize", "20000") \
        .option("spark.clickhouse.write.compression.codec", "lz4") \
        .mode("append") \
        .save()
    

Alternatively, set them in spark-defaults.conf or when creating the Spark session.
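
The precedence described in this note (per-operation options override global Spark configuration, which overrides the built-in defaults) can be modeled with a layered lookup. This is only a sketch of the precedence order, not how the connector resolves values internally; `effective_conf` is a hypothetical helper and `DEFAULTS` lists just two keys with their documented default values.

```python
from collections import ChainMap

# Documented defaults for two of the connector's configuration keys.
DEFAULTS = {
    "spark.clickhouse.write.batchSize": "10000",
    "spark.clickhouse.write.compression.codec": "lz4",
}


def effective_conf(global_conf, per_operation=None):
    """Layer the configuration sources; earlier maps win over later ones."""
    return ChainMap(per_operation or {}, global_conf, DEFAULTS)


global_conf = {"spark.clickhouse.write.batchSize": "20000"}
per_operation = {"spark.clickhouse.write.batchSize": "50000"}

catalog_write = effective_conf(global_conf)
tableprovider_write = effective_conf(global_conf, per_operation)
```

Here the TableProvider write sees the per-operation batch size, while a Catalog API write only sees the global value; both fall back to the default codec.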


| Key | Default | Description | Since |
| --- | --- | --- | --- |
| spark.clickhouse.ignoreUnsupportedTransform | true | ClickHouse supports using complex expressions as sharding keys or partition values, e.g. cityHash64(col_1, col_2), which Spark can't currently support. If true, ignore the unsupported expressions and log a warning; otherwise fail fast with an exception. Warning: When spark.clickhouse.write.distributed.convertLocal=true, ignoring unsupported sharding keys may corrupt the data. The connector validates this and throws an error by default. To allow it, explicitly set spark.clickhouse.write.distributed.convertLocal.allowUnsupportedSharding=true. | 0.4.0 |
| spark.clickhouse.read.compression.codec | lz4 | The codec used to decompress data for reading. Supported codecs: none, lz4. | 0.5.0 |
| spark.clickhouse.read.distributed.convertLocal | true | When reading a Distributed table, read the local table instead of the Distributed table itself. If true, spark.clickhouse.read.distributed.useClusterNodes is ignored. | 0.1.0 |
| spark.clickhouse.read.fixedStringAs | binary | Read ClickHouse FixedString type as the specified Spark data type. Supported types: binary, string | 0.8.0 |
| spark.clickhouse.read.format | json | Serialization format for reading. Supported formats: json, binary | 0.6.0 |
| spark.clickhouse.read.runtimeFilter.enabled | false | Enable runtime filter for reading. | 0.8.0 |
| spark.clickhouse.read.splitByPartitionId | true | If true, construct the input partition filter from the virtual column _partition_id instead of the partition value. There are known issues with assembling SQL predicates by partition value. This feature requires ClickHouse Server v21.6+ | 0.4.0 |
| spark.clickhouse.useNullableQuerySchema | false | If true, mark all the fields of the query schema as nullable when executing CREATE/REPLACE TABLE ... AS SELECT ... on creating the table. Note: this configuration requires SPARK-43390 (available in Spark 3.5); without this patch, it always acts as true. | 0.8.0 |
| spark.clickhouse.write.batchSize | 10000 | The number of records per batch when writing to ClickHouse. | 0.1.0 |
| spark.clickhouse.write.compression.codec | lz4 | The codec used to compress data for writing. Supported codecs: none, lz4. | 0.3.0 |
| spark.clickhouse.write.distributed.convertLocal | false | When writing to a Distributed table, write to the local table instead of the Distributed table itself. If true, spark.clickhouse.write.distributed.useClusterNodes is ignored. This bypasses ClickHouse's native routing, requiring Spark to evaluate the sharding key. When using unsupported sharding expressions, set spark.clickhouse.ignoreUnsupportedTransform to false to prevent silent data distribution errors. | 0.1.0 |
| spark.clickhouse.write.distributed.convertLocal.allowUnsupportedSharding | false | Allow writing to Distributed tables with convertLocal=true and ignoreUnsupportedTransform=true when the sharding key is unsupported. This is dangerous and may cause data corruption due to incorrect sharding. When set to true, you must ensure that your data is properly sorted/sharded before writing, as Spark can't evaluate the unsupported sharding expression. Only set to true if you understand the risks and have verified your data distribution. By default, this combination throws an error to prevent silent data corruption. | 0.10.0 |
| spark.clickhouse.write.distributed.useClusterNodes | true | Write to all nodes of the cluster when writing to a Distributed table. | 0.1.0 |
| spark.clickhouse.write.format | arrow | Serialization format for writing. Supported formats: json, arrow | 0.4.0 |
| spark.clickhouse.write.localSortByKey | true | If true, do a local sort by sort keys before writing. | 0.3.0 |
| spark.clickhouse.write.localSortByPartition | value of spark.clickhouse.write.repartitionByPartition | If true, do a local sort by partition before writing. If not set, it equals spark.clickhouse.write.repartitionByPartition. | 0.3.0 |
| spark.clickhouse.write.maxRetry | 3 | The maximum number of times to retry a single batch write that failed with a retryable error code. | 0.1.0 |
| spark.clickhouse.write.repartitionByPartition | true | Whether to repartition data by ClickHouse partition keys to match the distribution of the ClickHouse table before writing. | 0.3.0 |
| spark.clickhouse.write.repartitionNum | 0 | When data must be repartitioned to match the distribution of the ClickHouse table before writing, use this setting to specify the number of partitions. A value less than 1 means no requirement. | 0.1.0 |
| spark.clickhouse.write.repartitionStrictly | false | If true, Spark will strictly distribute incoming records across partitions to satisfy the required distribution before passing the records to the data source table on write. Otherwise, Spark may apply certain optimizations to speed up the query but break the distribution requirement. Note: this configuration requires SPARK-37523 (available in Spark 3.4); without this patch, it always acts as true. | 0.3.0 |
| spark.clickhouse.write.retryInterval | 10s | The interval in seconds between write retries. | 0.1.0 |
| spark.clickhouse.write.retryableErrorCodes | 241 | The retryable error codes returned by the ClickHouse server when a write fails. | 0.1.0 |
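
To make the interplay of spark.clickhouse.write.batchSize, maxRetry, retryInterval, and retryableErrorCodes concrete, the following is a simplified model of a batched write with retries. It is not the connector's implementation: `WriteError`, `batches`, and `write_with_retry` are hypothetical names, and it assumes that maxRetry counts retries after the first attempt.

```python
import time


class WriteError(Exception):
    """Hypothetical error carrying a ClickHouse server error code."""

    def __init__(self, code, message=""):
        super().__init__(f"code {code}: {message}")
        self.code = code


def batches(records, batch_size=10000):
    """Split records into consecutive batches of at most batch_size items."""
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]


def write_with_retry(send, batch, max_retry=3, retry_interval=10.0,
                     retryable_codes=(241,), sleep=time.sleep):
    """Send one batch, retrying only on retryable codes. Returns retries used."""
    retries = 0
    while True:
        try:
            send(batch)
            return retries
        except WriteError as err:
            # Give up on non-retryable codes or when the retry budget is spent.
            if err.code not in retryable_codes or retries >= max_retry:
                raise
            retries += 1
            sleep(retry_interval)


# Demo: a sender that fails twice with a retryable code, then succeeds.
calls = {"n": 0}


def flaky_send(batch):
    calls["n"] += 1
    if calls["n"] <= 2:
        raise WriteError(241, "transient failure")


waits = []  # record the waits instead of actually sleeping
retries_used = write_with_retry(flaky_send, [1, 2, 3], sleep=waits.append)
```

Under these assumptions, a batch that keeps failing with a retryable code is attempted four times in total (one attempt plus three retries), while any other error code fails the write immediately.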

Supported data types

This section outlines the mapping of data types between Spark and ClickHouse. The tables below provide quick references for converting data types when reading from ClickHouse into Spark and when inserting data from Spark into ClickHouse.

Reading data from ClickHouse into Spark

| ClickHouse Data Type | Spark Data Type | Supported | Is Primitive | Notes |
| --- | --- | --- | --- | --- |
| Nothing | NullType | Yes | Yes | |
| Bool | BooleanType | Yes | Yes | |
| UInt8, Int16 | ShortType | Yes | Yes | |
| Int8 | ByteType | Yes | Yes | |
| UInt16, Int32 | IntegerType | Yes | Yes | |
| UInt32, Int64, UInt64 | LongType | Yes | Yes | |
| Int128, UInt128, Int256, UInt256 | DecimalType(38, 0) | Yes | Yes | |
| Float32 | FloatType | Yes | Yes | |
| Float64 | DoubleType | Yes | Yes | |
| String, UUID, Enum8, Enum16, IPv4, IPv6 | StringType | Yes | Yes | |
| FixedString | BinaryType, StringType | Yes | Yes | Controlled by configuration READ_FIXED_STRING_AS |
| Decimal | DecimalType | Yes | Yes | Precision and scale up to Decimal128 |
| Decimal32 | DecimalType(9, scale) | Yes | Yes | |
| Decimal64 | DecimalType(18, scale) | Yes | Yes | |
| Decimal128 | DecimalType(38, scale) | Yes | Yes | |
| Date, Date32 | DateType | Yes | Yes | |
| DateTime, DateTime32, DateTime64 | TimestampType | Yes | Yes | |
| Array | ArrayType | Yes | No | Array element type is also converted |
| Map | MapType | Yes | No | Keys are limited to StringType |
| IntervalYear | YearMonthIntervalType(Year) | Yes | Yes | |
| IntervalMonth | YearMonthIntervalType(Month) | Yes | Yes | |
| IntervalDay, IntervalHour, IntervalMinute, IntervalSecond | DayTimeIntervalType | Yes | No | Specific interval type is used |
| JSON, Variant | VariantType | Yes | No | Requires Spark 4.0+ and ClickHouse 25.3+. Can be read as StringType with spark.clickhouse.read.jsonAs=string |
| Object | | No | | |
| Nested | | No | | |
| Tuple | StructType | Yes | No | Supports both named and unnamed tuples. Named tuples map to struct fields by name; unnamed tuples use _1, _2, etc. Supports nested structs and nullable fields |
| Point | | No | | |
| Polygon | | No | | |
| MultiPolygon | | No | | |
| Ring | | No | | |
| IntervalQuarter | | No | | |
| IntervalWeek | | No | | |
| Decimal256 | | No | | |
| AggregateFunction | | No | | |
| SimpleAggregateFunction | | No | | |
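
The integer rows above map each unsigned ClickHouse type to a wider signed Spark type where one exists. The following is a minimal sketch, using only integer-width arithmetic, that checks which of those mappings can hold the full ClickHouse value range. The `MAPPINGS` dictionary is copied from the table; the helper names are illustrative and not part of the connector. How the connector treats values that fall outside the target range is not stated in this document.

```python
from typing import Tuple

# Bit widths of Spark's signed integer types.
SPARK_BITS = {"ByteType": 8, "ShortType": 16, "IntegerType": 32, "LongType": 64}

# Integer rows copied from the read-mapping table above.
MAPPINGS = {
    "Int8": "ByteType",
    "UInt8": "ShortType",
    "Int16": "ShortType",
    "UInt16": "IntegerType",
    "Int32": "IntegerType",
    "UInt32": "LongType",
    "Int64": "LongType",
    "UInt64": "LongType",
}


def clickhouse_range(type_name: str) -> Tuple[int, int]:
    """Return (min, max) for a ClickHouse IntN or UIntN type name."""
    unsigned = type_name.startswith("UInt")
    bits = int(type_name[4:] if unsigned else type_name[3:])
    if unsigned:
        return 0, 2**bits - 1
    return -(2 ** (bits - 1)), 2 ** (bits - 1) - 1


def fits(type_name: str) -> bool:
    """True if every value of the ClickHouse type fits the mapped Spark type."""
    low, high = clickhouse_range(type_name)
    bits = SPARK_BITS[MAPPINGS[type_name]]
    return -(2 ** (bits - 1)) <= low and high <= 2 ** (bits - 1) - 1


for ch_type, spark_type in MAPPINGS.items():
    print(ch_type, "->", spark_type, "| full range fits:", fits(ch_type))
```

Under this check, UInt64 is the only integer row whose full range does not fit its Spark type, because its maximum (2^64 - 1) exceeds the LongType maximum (2^63 - 1). Columns that may hold such values deserve extra attention when reading.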

Inserting data from Spark into ClickHouse

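| Spark Data Type | ClickHouse Data Type | Supported | Is Primitive | Notes |
| --- | --- | --- | --- | --- |
| BooleanType | Bool | Yes | Yes | Mapped to Bool type (not UInt8) since version 0.9.0 |
| ByteType | Int8 | Yes | Yes | |
| ShortType | Int16 | Yes | Yes | |
| IntegerType | Int32 | Yes | Yes | |
| LongType | Int64 | Yes | Yes | |
| FloatType | Float32 | Yes | Yes | |
| DoubleType | Float64 | Yes | Yes | |
| StringType | String | Yes | Yes | |
| VarcharType | String | Yes | Yes | |
| CharType | String | Yes | Yes | |
| DecimalType | Decimal(p, s) | Yes | Yes | Precision and scale up to Decimal128 |
| DateType | Date | Yes | Yes | |
| TimestampType | DateTime | Yes | Yes | |
| ArrayType (list, tuple, or array) | Array | Yes | No | Array element type is also converted |
| MapType | Map | Yes | No | Keys are limited to StringType |
| StructType | Tuple | Yes | No | Converted to named Tuple with field names. |
| VariantType | JSON or Variant | Yes | No | Requires Spark 4.0+ and ClickHouse 25.3+. Defaults to JSON type. Use the clickhouse.column.<name>.variant_types property to specify Variant with multiple types. |
| Object | | No | | |
| Nested | | No | | |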

Supported ClickHouse server versions

The connector communicates with ClickHouse exclusively over HTTP. There is no minimum version enforced at runtime, but the following version requirements apply for specific features:

| Feature | Minimum ClickHouse Version |
| --- | --- |
| General HTTP connectivity | 20.7+ |
| spark.clickhouse.read.splitByPartitionId (partition-id-based filtering) | 21.6+ |
| VariantType / JSON type support | 25.3+ |

For production deployments, we recommend running ClickHouse 23.x or later. The connector is tested against the latest stable ClickHouse releases.
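
Because no minimum version is enforced at runtime, it can help to check the server version yourself before enabling a feature. The sketch below compares a version string, such as the one returned by `SELECT version()` in ClickHouse, against the minimums in the table above. The feature keys and function names are illustrative and not part of the connector.

```python
from typing import Tuple

# Minimum (major, minor) versions copied from the table above.
# The dictionary keys are illustrative labels, not connector settings.
FEATURE_MIN_VERSION = {
    "http": (20, 7),
    "splitByPartitionId": (21, 6),
    "variant_json": (25, 3),
}


def parse_version(version: str) -> Tuple[int, int]:
    """Extract (major, minor) from a string such as '24.8.4.13'."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def supports(server_version: str, feature: str) -> bool:
    """True if the server version meets the feature's minimum version."""
    return parse_version(server_version) >= FEATURE_MIN_VERSION[feature]


# Example: a 24.8 server supports partition-id filtering but not VariantType.
print(supports("24.8.4.13", "splitByPartitionId"))
print(supports("24.8.4.13", "variant_json"))
```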

Push-down operations

The connector implements the Spark DataSource V2 push-down interfaces, meaning the following operations are translated into SQL and executed on the ClickHouse server rather than in Spark memory. This significantly reduces data transfer and improves query performance.

| Push-down Type | Spark Interface | What gets pushed |
| --- | --- | --- |
| Column pruning | SupportsPushDownRequiredColumns | Only the columns selected by the query are fetched from ClickHouse. SELECT col1, col2 avoids transferring all columns. |
| Filter predicates | SupportsPushDownFilters | WHERE conditions using standard comparison operators, IN, IS NULL, LIKE, etc. Unsupported expressions fall back to Spark-side evaluation. |
| Limit | SupportsPushDownLimit | LIMIT N is sent to ClickHouse, preventing full table scans for small result sets. |
| Aggregations | SupportsPushDownAggregates | GROUP BY + aggregate functions (SUM, COUNT, MIN, MAX, AVG) are executed in ClickHouse. A probe query (WHERE 1=0) is first sent to determine the output schema. |
| Runtime filters | SupportsRuntimeFiltering | Dynamic filters produced during query execution (e.g. from broadcast joins) are applied at scan time. Must be enabled explicitly. |

To enable runtime filtering:

spark.conf.set("spark.clickhouse.read.runtimeFilter.enabled", "true")
Note

Complex filter expressions that Spark cannot compile to SQL (e.g., user-defined functions, unsupported predicates) are not pushed down. They are returned to Spark and evaluated in memory after the data is fetched. Use EXPLAIN in Spark to see which filters were actually pushed down.
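
One way to inspect the plan from PySpark is to capture the text printed by `DataFrame.explain` and keep only the lines that mention push-down. The helper below is a sketch: `explain_lines` is a hypothetical name, and the default keyword `"Pushed"` is an assumption, since the exact labels in the plan depend on how the scan node describes itself.

```python
import io
from contextlib import redirect_stdout
from typing import List


def explain_lines(df, keyword: str = "Pushed") -> List[str]:
    """Return the lines of df.explain() output that contain `keyword`.

    `df` is expected to be a PySpark DataFrame; explain() prints the plan
    to stdout, so the output is captured and filtered here.
    """
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        df.explain(mode="formatted")
    return [line.strip() for line in buffer.getvalue().splitlines()
            if keyword in line]


# Usage with a live session (not executed here):
#   df = spark.sql("SELECT id FROM clickhouse.default.my_table WHERE id > 1")
#   for line in explain_lines(df):
#       print(line)
```

If the helper returns nothing for a query with a WHERE clause, print the full plan and look for how the scan reports its filters before concluding that nothing was pushed down.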

Connector parallelism

Understanding how the connector maps Spark tasks to ClickHouse shards and partitions is important for tuning performance.

Read parallelism

The connector creates one Spark input partition per ClickHouse physical partition (data part group). The number of Spark read tasks equals the number of distinct partition values in the ClickHouse table.

| Scenario | Spark tasks created |
| --- | --- |
| MergeTree table with N distinct partitions | N tasks, one per partition, all targeting the same ClickHouse node |
| Distributed table with spark.clickhouse.read.distributed.convertLocal=true (default) | (number of shards) × (partitions per shard) tasks, each targeting a specific shard node directly |
| Distributed table with spark.clickhouse.read.distributed.convertLocal=false | 1 task, reading through the Distributed coordinator node |

For a table with no PARTITION BY clause, the connector creates one Spark partition per ClickHouse data part group, which may result in many small tasks. Use PARTITION BY in your ClickHouse table to control parallelism granularity.
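
The scenarios in the table can be turned into a small estimate of how many read tasks to expect. This is a sketch of the arithmetic only; `estimate_read_tasks` is a hypothetical helper and does not query ClickHouse.

```python
def estimate_read_tasks(partitions_per_shard: int,
                        shards: int = 1,
                        distributed: bool = False,
                        convert_local: bool = True) -> int:
    """Estimate Spark read tasks following the scenario table above."""
    if not distributed:
        # Plain MergeTree table: one task per distinct partition.
        return partitions_per_shard
    if convert_local:
        # Distributed table read shard by shard.
        return shards * partitions_per_shard
    # Distributed table read through the coordinator node.
    return 1


# A 4-shard cluster with one partition per day over a year.
print(estimate_read_tasks(365, shards=4, distributed=True))
```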

Write parallelism

Before writing, Spark reshuffles the DataFrame to match ClickHouse's distribution requirements:

  1. Repartition by partition key (spark.clickhouse.write.repartitionByPartition=true, default): Spark groups rows by ClickHouse partition key values so that all rows for the same partition land in the same Spark task.
  2. Sort within partition: Rows are locally sorted by [sharding_key, partition_key, order_by_key] to produce optimally-ordered inserts.
  3. Write tasks: Each Spark task writes its rows to ClickHouse in batches of spark.clickhouse.write.batchSize (default 10,000).
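
The three steps can be illustrated with a pure-Python simulation. This is a simplified sketch, not the connector's implementation: it assumes one group per partition key value, whereas Spark may place several partition values in the same task, and `plan_write` and its arguments are hypothetical names.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterator, List, Tuple

Row = Dict[str, Any]


def plan_write(rows: List[Row],
               partition_key: Callable[[Row], Any],
               sort_key: Callable[[Row], Any],
               batch_size: int = 10_000) -> Iterator[Tuple[Any, List[Row]]]:
    """Yield (partition value, batch) pairs in the order they would be written."""
    # Step 1: group rows by partition key value.
    groups: Dict[Any, List[Row]] = defaultdict(list)
    for row in rows:
        groups[partition_key(row)].append(row)
    for key, group in groups.items():
        # Step 2: sort locally within each group.
        ordered = sorted(group, key=sort_key)
        # Step 3: emit the rows in batches of at most batch_size.
        for start in range(0, len(ordered), batch_size):
            yield key, ordered[start:start + batch_size]


sample = [
    {"day": "2024-01-02", "id": 3},
    {"day": "2024-01-01", "id": 2},
    {"day": "2024-01-01", "id": 1},
    {"day": "2024-01-01", "id": 5},
]
for day, batch in plan_write(sample, lambda r: r["day"], lambda r: r["id"], batch_size=2):
    print(day, [row["id"] for row in batch])
```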

For Distributed tables, when spark.clickhouse.write.distributed.useClusterNodes=true (default), each Spark task can write to multiple shard nodes in parallel. When spark.clickhouse.write.distributed.convertLocal=true, Spark computes the sharding key and routes each row directly to the correct shard's local table.

To control the number of write tasks explicitly:

spark.conf.set("spark.clickhouse.write.repartitionNum", "16")  # 16 write tasks

Working with Distributed tables

When using ClickHouse Distributed tables with the connector, there is an important networking and architecture consideration:

The connector bypasses the Distributed engine and connects directly to each shard node.

This means:

  1. All shard hostnames must be reachable from Spark executors. The connector reads cluster topology from system.clusters on the coordinator node, then opens direct connections to each shard. If the shard hostnames returned by system.clusters are internal cluster names not resolvable from outside, reads and writes will fail.
  2. Inserts go to local tables: When writing, data is inserted directly into the local MergeTree table on each shard (not through the Distributed table). This is more efficient but requires that the local table exists on every shard and that Spark can connect to each shard directly.
  3. Schema must be consistent across all shards: Since Spark reads schema from one node, the table structure must be identical on all shards.
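
Before running a job, you can check whether the shard hostnames resolve from the machine that will connect to them. The sketch below assumes you have already collected the hostnames, for example from the `host_name` column of `system.clusters`; `unresolvable_hosts` is a hypothetical helper. It only tests DNS resolution, not port reachability, and it must run where the executors run to be meaningful.

```python
import socket
from typing import Callable, Iterable, List


def unresolvable_hosts(hosts: Iterable[str],
                       resolve: Callable = socket.getaddrinfo) -> List[str]:
    """Return the hostnames that fail DNS resolution on this machine."""
    failed = []
    for host in hosts:
        try:
            resolve(host, None)
        except socket.gaierror:
            failed.append(host)
    return failed


# Usage (hostnames below are placeholders):
#   bad = unresolvable_hosts(["shard-1.example.internal", "shard-2.example.internal"])
#   if bad:
#       print("Not resolvable from this host:", bad)
```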

To use coordinator-only routing (simpler networking, less parallelism):

spark.clickhouse.read.distributed.convertLocal   false
spark.clickhouse.write.distributed.useClusterNodes  false
spark.clickhouse.write.distributed.convertLocal  false

With these settings, all reads and writes go through the single coordinator node using the Distributed engine, and only the coordinator needs to be accessible from Spark.

Passing query settings and Java client options

The connector uses ClickHouse's HTTP interface via the ClickHouse Java client. You can pass arbitrary ClickHouse session settings and HTTP client options using the option.<key> prefix.

Via Catalog API

Add option.<key> entries to spark-defaults.conf or your Spark session configuration:

spark.sql.catalog.clickhouse.option.ssl                  false
spark.sql.catalog.clickhouse.option.async                false
spark.sql.catalog.clickhouse.option.client_name          spark
spark.sql.catalog.clickhouse.option.custom_http_params   async_insert=1,wait_for_async_insert=1

The custom_http_params key lets you pass multiple ClickHouse server settings as a comma-separated list of key=value pairs. These are appended to every HTTP request as query parameters.
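
When several settings are involved, building the value from a dictionary avoids formatting mistakes. This is a minimal sketch; `build_custom_http_params` is a hypothetical helper. It rejects keys and values containing commas or equals signs, because this document does not describe how such characters would be escaped.

```python
from typing import Dict, Union


def build_custom_http_params(settings: Dict[str, Union[str, int]]) -> str:
    """Join settings into the comma-separated key=value form described above."""
    parts = []
    for key, value in settings.items():
        text = str(value)
        if any(ch in key or ch in text for ch in ",="):
            raise ValueError(f"unsupported character in setting {key!r}")
        parts.append(f"{key}={text}")
    return ",".join(parts)


params = build_custom_http_params({"async_insert": 1, "wait_for_async_insert": 1})
print(params)
```

The resulting string can then be passed as the value of `spark.sql.catalog.clickhouse.option.custom_http_params` or of the `option.custom_http_params` read/write option.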

Via TableProvider API

df = spark.read \
    .format("clickhouse") \
    .option("host", "your-host") \
    .option("database", "default") \
    .option("table", "my_table") \
    .option("option.custom_http_params", "max_execution_time=300,max_memory_usage=10000000000") \
    .load()

Common settings

| Setting | Option key | Purpose |
| --- | --- | --- |
| async_insert=1,wait_for_async_insert=1 | option.custom_http_params | Enable ClickHouse async inserts; reduces memory pressure during high-frequency writes |
| insert_deduplicate=0 | option.custom_http_params | Disable deduplication for idempotent write pipelines |
| max_insert_block_size=1048576 | option.custom_http_params | Control max block size for inserts |
| max_execution_time=300 | option.custom_http_params | Extend query timeout (seconds) for large reads |
| session_timeout=60 | option.custom_http_params | Extend HTTP session timeout |
| ssl | option.ssl | Enable TLS (true/false) |
| client_name | option.client_name | Tag requests with a client name visible in system.query_log |

Performance tuning

Read performance

| Tuning | Configuration | Notes |
| --- | --- | --- |
| Use binary read format | spark.clickhouse.read.format=binary | Binary (Arrow) format is faster than JSON for large reads. Default is json for wider compatibility. |
| Column pruning | Use explicit SELECT col1, col2 | Avoid SELECT *; the connector pushes column selection to ClickHouse. |
| Filter push-down | Keep WHERE clauses on native ClickHouse types | String, numeric, and DateTime filters are pushed down. Complex expressions are evaluated in Spark. |
| Parallel reads across shards | spark.clickhouse.read.distributed.convertLocal=true (default) | Enables one Spark task per shard partition. Ensure all shard hosts are accessible. |
| Runtime filtering | spark.clickhouse.read.runtimeFilter.enabled=true | Allows dynamic filters from broadcast joins to reduce data fetched. |
| Compression | spark.clickhouse.read.compression.codec=lz4 (default) | LZ4 reduces network transfer; disable only if CPU is the bottleneck. |

Write performance

| Tuning | Configuration | Notes |
| --- | --- | --- |
| Increase batch size | spark.clickhouse.write.batchSize=50000 | Default is 10,000. Larger batches reduce round-trips. Reduce if you see "Broken Pipe" errors. |
| Arrow write format | spark.clickhouse.write.format=arrow (default) | Arrow is faster than JSON for most data types. Use json only for Variant/JSON column types. |
| Compression | spark.clickhouse.write.compression.codec=lz4 (default) | Reduces network transfer during writes. |
| Async inserts | option.custom_http_params=async_insert=1,wait_for_async_insert=1 | Shifts buffering to ClickHouse, reducing memory pressure on the server for high-frequency writes. |
| Pre-sort data | spark.clickhouse.write.localSortByKey=true (default) | Local sort before writing reduces MergeTree merge pressure. |
| Repartition by partition | spark.clickhouse.write.repartitionByPartition=true (default) | Groups rows by partition before writing, reducing the number of parts created. |
| Explicit partition count | spark.clickhouse.write.repartitionNum=N | Set if you want to control write parallelism explicitly. |
| Strict distribution | spark.clickhouse.write.repartitionStrictly=true | Forces exact distribution matching. Required only if data ordering is critical (Spark 3.4+). |

Recommended starting configuration for bulk loads

spark.conf.set("spark.clickhouse.write.batchSize", "50000")
spark.conf.set("spark.clickhouse.write.format", "arrow")
spark.conf.set("spark.clickhouse.write.compression.codec", "lz4")
spark.conf.set("spark.clickhouse.write.repartitionByPartition", "true")
spark.conf.set("spark.clickhouse.write.localSortByKey", "true")
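
If you prefer to set these values when the job is submitted rather than in code, the same settings can be rendered as `--conf key=value` arguments for `spark-submit`. The sketch below copies the values from the configuration above; `BULK_LOAD_CONF` and `to_spark_submit_args` are hypothetical names.

```python
from typing import Dict, List

# Values copied from the recommended starting configuration above.
BULK_LOAD_CONF: Dict[str, str] = {
    "spark.clickhouse.write.batchSize": "50000",
    "spark.clickhouse.write.format": "arrow",
    "spark.clickhouse.write.compression.codec": "lz4",
    "spark.clickhouse.write.repartitionByPartition": "true",
    "spark.clickhouse.write.localSortByKey": "true",
}


def to_spark_submit_args(conf: Dict[str, str]) -> List[str]:
    """Render a settings dictionary as spark-submit --conf arguments."""
    args: List[str] = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(to_spark_submit_args(BULK_LOAD_CONF)))
```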

Troubleshooting

Broken pipe or connection reset during write

Symptom: CHServerException: [HTTP] Broken pipe (Write failed) or Connection reset by peer during INSERT.

Cause: ClickHouse closed the connection mid-stream, typically due to memory pressure on the ClickHouse server when the batch is too large.

Fix:

  1. Reduce spark.clickhouse.write.batchSize. Start with 1,000–5,000 and increase gradually.
  2. Enable async inserts to shift buffering to ClickHouse:
    spark.conf.set("spark.sql.catalog.clickhouse.option.custom_http_params",
                   "async_insert=1,wait_for_async_insert=1")
    
  3. Increase max_memory_usage on the ClickHouse server if possible.
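
To increase the batch size gradually, as step 1 suggests, you can plan a sequence of sizes to try across successive runs. The sketch below is one possible schedule; the doubling factor and the name `batch_size_ladder` are assumptions, not connector behavior.

```python
from typing import List


def batch_size_ladder(start: int = 1_000, target: int = 50_000,
                      factor: int = 2) -> List[int]:
    """Return increasing batch sizes from `start` up to and including `target`."""
    if start <= 0 or factor <= 1:
        raise ValueError("start must be positive and factor greater than 1")
    sizes = []
    size = start
    while size < target:
        sizes.append(size)
        size *= factor
    sizes.append(target)
    return sizes


# Try each size in turn, keeping the largest one that writes without errors:
#   spark.conf.set("spark.clickhouse.write.batchSize", str(size))
print(batch_size_ladder(1_000, 10_000))
```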

Schema inference WHERE 1=0 queries

Symptom: A SELECT ... WHERE 1=0 query is sent to ClickHouse every time a DataFrame is used.

Cause: When aggregation push-down is triggered, the connector sends a probe query to ClickHouse to determine the output schema of the pushed-down aggregation. This is by design and only occurs when aggregation push-down is active.

Fix: If this causes unacceptable load, avoid triggering aggregation push-down by handling the aggregation in Spark instead:

# Instead of letting ClickHouse handle the aggregation:
df = spark.sql("SELECT sum(value) FROM clickhouse.default.my_table GROUP BY id")

# Handle the aggregation in Spark memory:
df = spark.sql("SELECT * FROM clickhouse.default.my_table")
result = df.groupBy("id").agg({"value": "sum"})

Cannot connect to shard hostname

Symptom: Reads or writes fail with connection errors referencing shard hostnames that are not the coordinator node.

Cause: The connector reads cluster topology from system.clusters and tries to connect directly to each shard. If shard hostnames are internal cluster DNS names not resolvable from Spark executors, the connections will fail.

Fix:

  • Configure DNS so all shard hostnames are resolvable from Spark executors.
  • Alternatively, use coordinator-only routing (less parallelism):
    spark.clickhouse.read.distributed.convertLocal   false
    spark.clickhouse.write.distributed.useClusterNodes  false
    

Too many Spark tasks when reading a partitioned Distributed table

Symptom: Spark creates far more tasks than expected (e.g., thousands) when reading a Distributed table.

Cause: With spark.clickhouse.read.distributed.convertLocal=true (default), one Spark partition is created per (shard × ClickHouse partition). A 4-shard cluster with a table partitioned by day over 1 year = 4 × 365 = 1,460 Spark tasks.

Fix:

  • Use spark.clickhouse.read.distributed.convertLocal=false to read through the coordinator (1 task total).
  • Apply a partition filter in your query so only relevant partitions are read, reducing the partition list the connector discovers.

Write fails with unsupported partition expression

Symptom: AnalysisException referencing an unsupported transform (e.g. months(key)) when writing to a table with PARTITION BY toYYYYMM(key).

Cause: Spark cannot evaluate complex ClickHouse partition expressions as partitioning transforms.

Fix: Set spark.clickhouse.ignoreUnsupportedTransform=true (the default). The connector will log a warning and skip the partition-level clustering requirement. ClickHouse will still handle routing correctly when writing to the Distributed table.

Note

If you also have spark.clickhouse.write.distributed.convertLocal=true, ignoring unsupported sharding keys can cause incorrect data distribution. In that case, either use a supported sharding key or set spark.clickhouse.write.distributed.convertLocal.allowUnsupportedSharding=true only if you understand the implications.


Table not found or stale schema after DDL

Symptom: After creating or altering a ClickHouse table, Spark still sees the old schema or raises a "table not found" error.

Cause: Spark caches catalog metadata. For ClickHouse Cloud, replication to all nodes may also take a moment.

Fix:

spark.catalog.refreshTable("clickhouse.database.table_name")

Contributing and support

To contribute to the project or report an issue, visit our GitHub repository, where you can open an issue, suggest improvements, or submit a pull request. Please check the contribution guidelines in the repository before starting. Thank you for helping improve the ClickHouse Spark connector!