> ## Documentation Index
> Fetch the complete documentation index at: https://private-7c7dfe99-mintlify-8a08bda2.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

> Documentation for the AvroConfluent format

# AvroConfluent

| Input | Output | Alias |
| ----- | ------ | ----- |
| ✔     | ✔      |       |

<h2 id="description">
  Description
</h2>

[Apache Avro](https://avro.apache.org/) is a row-oriented serialization format that uses binary encoding for efficient data processing. The `AvroConfluent` format supports reading and writing Avro-encoded messages using the [Confluent Schema Registry](https://docs.confluent.io/current/schema-registry/index.html) (or API-compatible services).

Each message uses the Confluent wire format: a magic byte (`0x00`) followed by a 4-byte big-endian schema ID, followed by the Avro binary datum. When reading, ClickHouse resolves the schema ID by querying the registry. When writing, ClickHouse registers the schema derived from the output columns and prepends the resulting ID to each row. Schemas are cached for optimal performance.

<a id="data-types-matching" />

<h2 id="data-type-mapping">
  Data type mapping
</h2>

The table below shows all data types supported by the Apache Avro format, and their corresponding ClickHouse [data types](/reference/data-types) in `INSERT` and `SELECT` queries.

| Avro data type `INSERT`                     | ClickHouse data type                                                                            | Avro data type `SELECT`          |
| ------------------------------------------- | ----------------------------------------------------------------------------------------------- | -------------------------------- |
| `boolean`, `int`, `long`, `float`, `double` | [Int(8\16\32)](/reference/data-types/int-uint), [UInt(8\16\32)](/reference/data-types/int-uint) | `int`                            |
| `boolean`, `int`, `long`, `float`, `double` | [Int64](/reference/data-types/int-uint), [UInt64](/reference/data-types/int-uint)               | `long`                           |
| `boolean`, `int`, `long`, `float`, `double` | [Float32](/reference/data-types/float)                                                          | `float`                          |
| `boolean`, `int`, `long`, `float`, `double` | [Float64](/reference/data-types/float)                                                          | `double`                         |
| `bytes`, `string`, `fixed`, `enum`          | [String](/reference/data-types/string)                                                          | `bytes` or `string` \*           |
| `bytes`, `string`, `fixed`                  | [FixedString(N)](/reference/data-types/fixedstring)                                             | `fixed(N)`                       |
| `enum`                                      | [Enum(8\16)](/reference/data-types/enum)                                                        | `enum`                           |
| `array(T)`                                  | [Array(T)](/reference/data-types/array)                                                         | `array(T)`                       |
| `map(V, K)`                                 | [Map(V, K)](/reference/data-types/map)                                                          | `map(string, K)`                 |
| `union(null, T)`, `union(T, null)`          | [Nullable(T)](/reference/data-types/date)                                                       | `union(null, T)`                 |
| `union(T1, T2, …)` \*\*                     | [Variant(T1, T2, …)](/reference/data-types/variant)                                             | `union(T1, T2, …)` \*\*          |
| `null`                                      | [Nullable(Nothing)](/reference/data-types/special-data-types/nothing)                           | `null`                           |
| `int (date)` \*\*\*                         | [Date](/reference/data-types/date), [Date32](/reference/data-types/date32)                      | `int (date)` \*\*\*              |
| `long (timestamp-millis)` \*\*\*            | [DateTime64(3)](/reference/data-types/datetime)                                                 | `long (timestamp-millis)` \*\*\* |
| `long (timestamp-micros)` \*\*\*            | [DateTime64(6)](/reference/data-types/datetime)                                                 | `long (timestamp-micros)` \*\*\* |
| `bytes (decimal)`  \*\*\*                   | [DateTime64(N)](/reference/data-types/datetime)                                                 | `bytes (decimal)`  \*\*\*        |
| `int`                                       | [IPv4](/reference/data-types/ipv4)                                                              | `int`                            |
| `fixed(16)`                                 | [IPv6](/reference/data-types/ipv6)                                                              | `fixed(16)`                      |
| `bytes (decimal)` \*\*\*                    | [Decimal(P, S)](/reference/data-types/decimal)                                                  | `bytes (decimal)` \*\*\*         |
| `string (uuid)` \*\*\*                      | [UUID](/reference/data-types/uuid)                                                              | `string (uuid)` \*\*\*           |
| `fixed(16)`                                 | [Int128/UInt128](/reference/data-types/int-uint)                                                | `fixed(16)`                      |
| `fixed(32)`                                 | [Int256/UInt256](/reference/data-types/int-uint)                                                | `fixed(32)`                      |
| `record`                                    | [Tuple](/reference/data-types/tuple)                                                            | `record`                         |

\* `bytes` is default, controlled by setting [`output_format_avro_string_column_pattern`](/reference/settings/formats#output_format_avro_string_column_pattern)

\*\*  The [Variant type](/reference/data-types/variant) implicitly accepts `null` as a field value, so for example the Avro `union(T1, T2, null)` will be converted to `Variant(T1, T2)`.
As a result, when producing Avro from ClickHouse, we have to always include the `null` type to the Avro `union` type set as we don't know if any value is actually `null` during the schema inference.

\*\*\* [Avro logical types](https://avro.apache.org/docs/current/spec.html#Logical+Types)

Unsupported Avro logical data types:

* `time-millis`
* `time-micros`
* `duration`

<h2 id="format-settings">
  Format settings
</h2>

[//]: # "NOTE These settings can be set at a session-level, but this isn't common and documenting it too prominently can be confusing to users."

| Setting                                          | Description                                                                                                                                                             | Default |
| ------------------------------------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| `input_format_avro_allow_missing_fields`         | Whether to use a default value instead of throwing an error when a field is not found in the schema.                                                                    | `0`     |
| `input_format_avro_null_as_default`              | Whether to use a default value instead of throwing an error when inserting a `null` value into a non-nullable column.                                                   | `0`     |
| `format_avro_schema_registry_url`                | The Confluent Schema Registry URL. For basic authentication, URL-encoded credentials can be included directly in the URL path.                                          |         |
| `format_avro_schema_registry_connection_timeout` | Connection timeout in seconds for the Schema Registry HTTP client (used for both schema fetch and registration). Must be greater than 0 and less than 600 (10 minutes). | `1`     |
| `format_avro_schema_registry_send_timeout`       | Send timeout in seconds for the Schema Registry HTTP client. Must be greater than 0 and less than 600 (10 minutes).                                                     | `1`     |
| `format_avro_schema_registry_receive_timeout`    | Receive timeout in seconds for the Schema Registry HTTP client. Must be greater than 0 and less than 600 (10 minutes).                                                  | `1`     |
| `output_format_avro_confluent_subject`           | For output: the subject name under which the schema is registered in the Schema Registry. Required when writing.                                                        |         |
| `output_format_avro_string_column_pattern`       | For output: regexp of String columns to serialize as Avro `string` (default is `bytes`).                                                                                |         |

<h2 id="examples">
  Examples
</h2>

<h3 id="reading-from-kafka">
  Reading from Kafka
</h3>

To read an Avro-encoded Kafka topic using the [Kafka table engine](/reference/engines/table-engines/integrations/kafka), use the `format_avro_schema_registry_url` setting to provide the URL of the schema registry.

```sql theme={null}
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;
```

<h3 id="writing-to-kafka">
  Writing to Kafka
</h3>

To write AvroConfluent messages to a Kafka topic, set both the schema registry URL and the subject name. The schema is automatically registered with the registry on the first write.

```sql theme={null}
CREATE TABLE topic1_sink
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url',
output_format_avro_confluent_subject = 'topic1-value';

INSERT INTO topic1_sink VALUES ('hello', 'world');
```

<h4 id="using-basic-authentication">
  Using basic authentication
</h4>

If your schema registry requires basic authentication (e.g., if you're using Confluent Cloud), you can provide URL-encoded credentials in the `format_avro_schema_registry_url` setting.

```sql theme={null}
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';
```

<h2 id="troubleshooting">
  Troubleshooting
</h2>

To monitor ingestion progress and debug errors with the Kafka consumer, you can query the [`system.kafka_consumers` system table](/reference/system-tables/kafka_consumers). If your deployment has multiple replicas (e.g., ClickHouse Cloud), you must use the [`clusterAllReplicas`](/reference/functions/table-functions/cluster) table function.

```sql theme={null}
SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;
```

If you run into schema resolution issues, you can use [kafkacat](https://github.com/edenhill/kafkacat) with [clickhouse-local](/concepts/features/tools-and-utilities/clickhouse-local) to troubleshoot:

```bash theme={null}
$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c
```
