> This engine provides integration with the Amazon S3 ecosystem and allows streaming imports. Similar to the Kafka and RabbitMQ engines, but provides S3-specific features.

# S3Queue table engine

This engine provides integration with the [Amazon S3](https://aws.amazon.com/s3/) ecosystem and allows streaming import. It is similar to the [Kafka](/reference/engines/table-engines/integrations/kafka) and [RabbitMQ](/reference/engines/table-engines/integrations/rabbitmq) engines, but provides S3-specific features.

Note this behavior, described in the [original PR for S3Queue implementation](https://github.com/ClickHouse/ClickHouse/pull/49086/files#diff-e1106769c9c8fbe48dd84f18310ef1a250f2c248800fde97586b3104e9cd6af8R183): as soon as a `MATERIALIZED VIEW` is attached to the engine, the S3Queue table engine starts collecting data in the background.

<h2 id="creating-a-table">
  Create table
</h2>

```sql theme={null}
CREATE TABLE s3_queue_engine_table (name String, value UInt32)
    ENGINE = S3Queue(path, [NOSIGN, | aws_access_key_id, aws_secret_access_key,] format, [compression], [headers], [extra_credentials])
    [SETTINGS]
    [mode = '',]
    [after_processing = 'keep',]
    [keeper_path = '',]
    [loading_retries = 0,]
    [processing_threads_num = 16,]
    [parallel_inserts = false,]
    [enable_logging_to_queue_log = true,]
    [last_processed_path = '',]
    [tracked_files_limit = 1000,]
    [tracked_file_ttl_sec = 0,]
    [polling_min_timeout_ms = 1000,]
    [polling_max_timeout_ms = 10000,]
    [polling_backoff_ms = 0,]
    [cleanup_interval_min_ms = 10000,]
    [cleanup_interval_max_ms = 30000,]
    [buckets = 0,]
    [list_objects_batch_size = 1000,]
    [enable_hash_ring_filtering = 0,]
    [max_processed_files_before_commit = 100,]
    [max_processed_rows_before_commit = 0,]
    [max_processed_bytes_before_commit = 0,]
    [max_processing_time_sec_before_commit = 0,]
```

<Warning>
  Before `24.7`, the `s3queue_` prefix is required for all settings apart from `mode`, `after_processing` and `keeper_path`.
</Warning>

**Engine parameters**

`S3Queue` parameters are the same as those supported by the `S3` table engine. See the parameters section [here](/reference/engines/table-engines/integrations/s3#parameters).

**Example**

```sql theme={null}
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered';
```

Using named collections:

```xml theme={null}
<clickhouse>
    <named_collections>
        <s3queue_conf>
            <url>https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*</url>
            <access_key_id>test</access_key_id>
            <secret_access_key>test</secret_access_key>
        </s3queue_conf>
    </named_collections>
</clickhouse>
```

```sql theme={null}
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue(s3queue_conf, format = 'CSV', compression_method = 'gzip')
SETTINGS
    mode = 'ordered';
```

<h2 id="settings">
  Settings
</h2>

To get the list of settings configured for a table, use the `system.s3_queue_settings` table. Available from `24.10`.
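A minimal sketch of such a lookup follows. The `table` filter column and the table name `s3queue_engine_table` are assumptions for illustration; check the actual column set with `DESCRIBE TABLE system.s3_queue_settings` on your version before relying on it.

```sql theme={null}
-- Assumption: the system table exposes a `table` column that can be filtered on.
SELECT *
FROM system.s3_queue_settings
WHERE table = 's3queue_engine_table';
```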

<Info>
  **Setting Names (24.7+)**

  Starting from version 24.7, S3Queue settings can be specified with or without the `s3queue_` prefix:

  * **Modern syntax** (24.7+): `processing_threads_num`, `tracked_file_ttl_sec`, etc.
  * **Legacy syntax** (all versions): `s3queue_processing_threads_num`, `s3queue_tracked_file_ttl_sec`, etc.

  Both forms are supported in 24.7+. The examples on this page use the modern syntax with no prefix.
</Info>
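As a rough illustration of the two spellings described above, the sketch below declares the same queue twice, once per syntax. The table names `s3queue_modern` and `s3queue_legacy` and the setting values are hypothetical.

```sql theme={null}
-- Modern syntax (24.7+): no prefix on any setting.
CREATE TABLE s3queue_modern (name String, value UInt32)
ENGINE = S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered',
    processing_threads_num = 4,
    tracked_file_ttl_sec = 3600;

-- Legacy syntax: s3queue_ prefix on every setting except
-- mode, after_processing and keeper_path.
CREATE TABLE s3queue_legacy (name String, value UInt32)
ENGINE = S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered',
    s3queue_processing_threads_num = 4,
    s3queue_tracked_file_ttl_sec = 3600;
```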

<h3 id="mode">
  Mode
</h3>

Possible values:

* unordered — The set of all already processed files is tracked with persistent nodes in ZooKeeper.
* ordered — Files are processed in lexicographic order. This means that if a file named `BBB` was processed at some point and a file named `AA` is added to the bucket later, it is ignored. Only the maximum name (in the lexicographic sense) of the successfully consumed files, and the names of files that will be retried after an unsuccessful loading attempt, are stored in ZooKeeper.

Default value: `ordered` in versions before 24.6. Starting with 24.6 there is no default value, and the setting must be specified explicitly. For tables created on earlier versions the default value remains `ordered` for compatibility.
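The ordering constraint of `ordered` mode can be explored with plain string comparisons. The sketch below uses ClickHouse's bytewise string comparison as a stand-in for the engine's lexicographic ordering, which is an assumption; the file names are hypothetical.

```sql theme={null}
SELECT
    'AA' < 'BBB'                  AS aa_before_bbb,     -- 1: 'AA' arriving after 'BBB' would be ignored
    'file-10.csv' < 'file-9.csv'  AS unpadded_counter,  -- 1: file-10 sorts before file-9, so it would be skipped
    'file-09.csv' < 'file-10.csv' AS padded_counter;    -- 1: zero padding keeps name order aligned with arrival order
```

Zero-padded counters or timestamp-based names are a common way to make later files compare greater than earlier ones.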

<h3 id="after_processing">
  `after_processing`
</h3>

How to handle a file after it has been processed successfully.

Possible values:

* keep.
* delete.
* move.
* tag.

Default value: `keep`.

Move requires additional settings. In case of a move within the same bucket, a new path prefix must be provided as `after_processing_move_prefix`.

Move to another S3 bucket requires the target bucket URI as `after_processing_move_uri`, S3 credentials as `after_processing_move_access_key_id` and `after_processing_move_secret_access_key`.

Example:

```sql theme={null}
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered',
    after_processing = 'move',
    after_processing_retries = 20,
    after_processing_move_prefix = 'dst_prefix',
    after_processing_move_uri = 'https://clickhouse-public-datasets.s3.amazonaws.com/dst-bucket',
    after_processing_move_access_key_id = 'test',
    after_processing_move_secret_access_key = 'test';
```

Move from an Azure container to another Azure container requires the Blob Storage connection string as `after_processing_move_connection_string` and the container name as `after_processing_move_container`. See [the AzureQueue settings](/reference/engines/table-engines/integrations/azure-queue#settings).

Tagging requires tag key and value provided as `after_processing_tag_key` and `after_processing_tag_value`.
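A tagging setup might look like the sketch below. The table name and the tag key and value are hypothetical; only the setting names come from this page.

```sql theme={null}
CREATE TABLE s3queue_tagged (name String, value UInt32)
ENGINE = S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered',
    after_processing = 'tag',
    -- Hypothetical tag applied to each successfully processed object.
    after_processing_tag_key = 'clickhouse_status',
    after_processing_tag_value = 'processed';
```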

<h3 id="after_processing_retries">
  `after_processing_retries`
</h3>

Number of retries for the requested after-processing action, before giving up.

Possible values:

* Non-negative integer.

Default value: `10`.

<h3 id="after_processing_move_access_key_id">
  `after_processing_move_access_key_id`
</h3>

Access Key ID for S3 bucket to move successfully processed files to, if the destination is another S3 bucket.

Possible values:

* String.

Default value: empty string.

<h3 id="after_processing_move_prefix">
  `after_processing_move_prefix`
</h3>

Path prefix to move successfully processed files to. It applies both when moving within the same bucket and when moving to another bucket.

Possible values:

* String.

Default value: empty string.

<h3 id="after_processing_move_secret_access_key">
  `after_processing_move_secret_access_key`
</h3>

Secret Access Key for S3 bucket to move successfully processed files to, if the destination is another S3 bucket.

Possible values:

* String.

Default value: empty string.

<h3 id="after_processing_move_uri">
  `after_processing_move_uri`
</h3>

URI of S3 bucket to move successfully processed files to, if the destination is another S3 bucket.

Possible values:

* String.

Default value: empty string.

<h3 id="after_processing_tag_key">
  `after_processing_tag_key`
</h3>

Tag key to apply to successfully processed files, if `after_processing='tag'`.

Possible values:

* String.

Default value: empty string.

<h3 id="after_processing_tag_value">
  `after_processing_tag_value`
</h3>

Tag value to apply to successfully processed files, if `after_processing='tag'`.

Possible values:

* String.

Default value: empty string.

<h3 id="keeper_path">
  `keeper_path`
</h3>

Path to the queue metadata in ZooKeeper. If not specified explicitly, ClickHouse builds the path from `s3queue_default_zookeeper_path`, the database UUID, and the table UUID. Absolute values (starting with `/`) are used as provided, while relative values are appended to the configured prefix. Macros such as `{database}` or `{uuid}` are expanded before the engine connects to ZooKeeper.

To target an auxiliary ZooKeeper cluster, prefix the value with the configured name, for example `analytics_keeper:/clickhouse/queue/orders`. The name must exist in `<auxiliary_zookeepers>`; otherwise the engine reports `Unknown auxiliary ZooKeeper name ...`. The full string (including the prefix) is preserved in `SHOW CREATE TABLE` so the statement can be replicated verbatim.

Possible values:

* String.

Default value: `/`.
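The two forms of `keeper_path` described above can be sketched as follows. The table names and the absolute path are hypothetical, and the second statement assumes an `analytics_keeper` entry exists under `<auxiliary_zookeepers>` in the server configuration.

```sql theme={null}
-- Absolute path: used exactly as provided.
CREATE TABLE s3queue_orders (name String, value UInt32)
ENGINE = S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'ordered',
    keeper_path = '/clickhouse/s3queue/orders';

-- Auxiliary ZooKeeper cluster: the name before the colon must be configured,
-- otherwise the engine reports an unknown auxiliary ZooKeeper name.
CREATE TABLE s3queue_orders_aux (name String, value UInt32)
ENGINE = S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'ordered',
    keeper_path = 'analytics_keeper:/clickhouse/queue/orders';
```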

<h3 id="loading_retries">
  `loading_retries`
</h3>

Retry file loading up to the specified number of times. By default, there are no retries.

Possible values:

* Non-negative integer.

Default value: `0`.

<h3 id="processing_threads_num">
  `processing_threads_num`
</h3>

Number of threads to perform processing. Applies only for `Unordered` mode.

Default value: Number of CPUs or 16.

<h3 id="parallel_inserts">
  `parallel_inserts`
</h3>

By default, `processing_threads_num` produces a single `INSERT`, so only file download and parsing run in multiple threads.
This limits parallelism, so for better throughput use `parallel_inserts=true`, which allows data to be inserted in parallel (keep in mind that this results in a higher number of generated data parts for the MergeTree family).

<Note>
  `INSERT`s will be spawned with respect to `max_process*_before_commit` settings.
</Note>

Default value: `false`.
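A possible configuration that combines parallel inserts with a commit threshold is sketched below. The table name and the numeric values are hypothetical and would need tuning for a real workload.

```sql theme={null}
CREATE TABLE s3queue_parallel (name String, value UInt32)
ENGINE = S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered',
    processing_threads_num = 8,
    -- Insert in parallel instead of funnelling all threads into one INSERT.
    parallel_inserts = true,
    -- Larger commits reduce the number of small parts created in the target table.
    max_processed_rows_before_commit = 1000000;
```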

<h3 id="enable_logging_to_s3queue_log">
  `enable_logging_to_s3queue_log`
</h3>

Enable logging to `system.s3queue_log`.

Default value: `0`.

<h3 id="polling_min_timeout_ms">
  `polling_min_timeout_ms`
</h3>

Specifies the minimum time, in milliseconds, that ClickHouse waits before making the next polling attempt.

Possible values:

* Positive integer.

Default value: `1000`.

<h3 id="polling_max_timeout_ms">
  `polling_max_timeout_ms`
</h3>

Defines the maximum time, in milliseconds, that ClickHouse waits before initiating the next polling attempt.

Possible values:

* Positive integer.

Default value: `10000`.

<h3 id="polling_backoff_ms">
  `polling_backoff_ms`
</h3>

Determines the additional wait time added to the previous polling interval when no new files are found. The next poll occurs after the sum of the previous interval and this backoff value, or the maximum interval, whichever is lower.

Possible values:

* Non-negative integer.

Default value: `0`.
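The interplay of the three polling settings can be worked through with a small query. This is a sketch of the rule as described above, assuming the first wait equals `polling_min_timeout_ms`; the backoff value of 2000 is hypothetical.

```sql theme={null}
WITH
    1000  AS polling_min_timeout_ms,
    10000 AS polling_max_timeout_ms,
    2000  AS polling_backoff_ms
SELECT
    number AS consecutive_empty_polls,
    -- Each empty poll adds the backoff, capped at the maximum interval.
    least(polling_min_timeout_ms + number * polling_backoff_ms, polling_max_timeout_ms) AS wait_ms
FROM numbers(6);
-- wait_ms: 1000, 3000, 5000, 7000, 9000, 10000
```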

<h3 id="tracked_files_limit">
  `tracked_files_limit`
</h3>

Limits the number of ZooKeeper nodes when `unordered` mode is used; has no effect in `ordered` mode.
When the limit is reached, the oldest processed files are deleted from the ZooKeeper node and processed again.

Possible values:

* Positive integer.

Default value: `1000`.

<h3 id="tracked_file_ttl_sec">
  `tracked_file_ttl_sec`
</h3>

Maximum number of seconds to store processed files in the ZooKeeper node (stored forever by default) in `unordered` mode; has no effect in `ordered` mode.
After the specified number of seconds, the file is re-imported.

Possible values:

* Non-negative integer.

Default value: `0`.
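Both tracking limits can be set together on an `unordered` queue, as in the sketch below. The table name and values are hypothetical. Because files that fall out of tracking are processed again if they still match the path, the values should be chosen with the expected lifetime of files in the bucket in mind.

```sql theme={null}
CREATE TABLE s3queue_bounded (name String, value UInt32)
ENGINE = S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered',
    -- Keep at most 100000 processed-file nodes in ZooKeeper.
    tracked_files_limit = 100000,
    -- Forget processed files after 7 days (7 * 24 * 3600 seconds).
    tracked_file_ttl_sec = 604800;
```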

<h3 id="cleanup_interval_min_ms">
  `cleanup_interval_min_ms`
</h3>

For 'Ordered' mode. Defines the minimum rescheduling interval for the background task that maintains the tracked file TTL and the maximum tracked files set.

Default value: `10000`.

<h3 id="cleanup_interval_max_ms">
  `cleanup_interval_max_ms`
</h3>

For 'Ordered' mode. Defines the maximum rescheduling interval for the background task that maintains the tracked file TTL and the maximum tracked files set.

Default value: `30000`.

<h3 id="buckets">
  `buckets`
</h3>

For 'Ordered' mode. Available since `24.6`. If there are several replicas of an S3Queue table, each working with the same metadata directory in Keeper, the value of `buckets` must be at least the number of replicas. If the `processing_threads_num` setting is used as well, it makes sense to increase the value of `buckets` even further, as it defines the actual parallelism of `S3Queue` processing.

<h3 id="use_persistent_processing_nodes">
  `use_persistent_processing_nodes`
</h3>

By default, an S3Queue table uses ephemeral processing nodes, which can lead to duplicate data if the ZooKeeper session expires after S3Queue has started processing files but before it commits them in ZooKeeper. Enabling this setting eliminates the possibility of duplicates in case of an expired Keeper session.

<h3 id="persistent_processing_nodes_ttl_seconds">
  `persistent_processing_nodes_ttl_seconds`
</h3>

After a non-graceful server termination with `use_persistent_processing_nodes` enabled, some processing nodes may be left behind. This setting defines the period of time after which these processing nodes can safely be cleaned up.

Default value: `3600` (1 hour).
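A sketch of enabling persistent processing nodes with a shorter cleanup period follows. The table name and the 600-second value are hypothetical.

```sql theme={null}
CREATE TABLE s3queue_persistent_nodes (name String, value UInt32)
ENGINE = S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'ordered',
    -- Avoid duplicates when the Keeper session expires mid-processing.
    use_persistent_processing_nodes = 1,
    -- Leftover processing nodes become eligible for cleanup after 10 minutes.
    persistent_processing_nodes_ttl_seconds = 600;
```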

<h2 id="s3-settings">
  S3-related settings
</h2>

The engine supports all S3-related settings. For more information about S3 settings, see [here](/reference/engines/table-engines/integrations/s3).

<h2 id="s3-role-based-access">
  S3 role-based access
</h2>

The S3Queue table engine supports role-based access.
Refer to the documentation [here](/products/cloud/guides/data-sources/accessing-s3-data-securely) for steps to configure a role to access your bucket.

Once the role is configured, a `roleARN` can be passed via an `extra_credentials` parameter as shown below:

```sql theme={null}
CREATE TABLE s3_table
(
    ts DateTime,
    value UInt64
)
ENGINE = S3Queue(
                'https://<your_bucket>/*.csv',
                extra_credentials(role_arn = 'arn:aws:iam::111111111111:role/<your_role>'),
                'CSV')
SETTINGS
    ...
```

<h2 id="ordered-mode">
  S3Queue ordered mode
</h2>

`S3Queue` `ordered` processing mode stores less metadata in ZooKeeper, but has the limitation that files added later must have alphanumerically greater names.

In `ordered` mode, as in `unordered` mode, `S3Queue` supports the `(s3queue_)processing_threads_num` setting (the `s3queue_` prefix is optional), which controls the number of threads that process `S3` files locally on the server.

For `ordered` mode without partitioning, ClickHouse may resume S3 listing from the last processed key to avoid re-listing the full prefix history. In bucketed ordered mode, the resume point is conservatively chosen as the smallest processed key across all buckets to avoid skipping unprocessed files.
This resume-listing optimization is used only for S3-backed queues in ordered mode without partitioning (not for AzureQueue or when `partitioning_mode` is set).

In addition, `ordered` mode introduces the `(s3queue_)buckets` setting, which defines the number of "logical threads", that is, processing units. In a distributed scenario, where several servers host `S3Queue` table replicas, each processing thread on each replica tries to lock a certain `bucket` for processing, and each `bucket` is assigned a set of files by hash of the file name. It is therefore highly recommended to set `(s3queue_)buckets` to at least the number of replicas. Having more buckets than replicas is fine. The optimal value is `number_of_replicas` multiplied by `(s3queue_)processing_threads_num`.

The setting `(s3queue_)processing_threads_num` is not recommended for use before version `24.6`.
The setting `(s3queue_)buckets` is available starting with version `24.6`.
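The sizing recommendation for `buckets` can be sketched for a hypothetical deployment of 3 replicas with 4 processing threads each, giving 3 * 4 = 12 buckets. The table name, the `keeper_path` value and the replica count are assumptions; the statement would be run on every replica so that all of them share the same metadata path.

```sql theme={null}
CREATE TABLE s3queue_ordered_replicated (name String, value UInt32)
ENGINE = S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'ordered',
    -- Same path on all replicas so they coordinate through shared metadata.
    keeper_path = '/clickhouse/s3queue/ordered_replicated',
    processing_threads_num = 4,
    -- number_of_replicas (3, assumed) * processing_threads_num (4).
    buckets = 12;
```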

<h2 id="select">
  SELECT from S3Queue table engine
</h2>

`SELECT` queries are forbidden by default on S3Queue tables. This follows the common queue pattern, where data is read once and then removed from the queue, and prevents accidental data loss.
However, a direct `SELECT` can sometimes be useful, for example for debugging. To allow it, set the setting `stream_like_engine_allow_direct_select` to `True`.
The S3Queue engine also has a special setting for `SELECT` queries: `commit_on_select`. Set it to `False` to preserve data in the queue after reading, or `True` to remove it.
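A direct read for debugging might look like the sketch below, which enables `stream_like_engine_allow_direct_select` for a single query. It assumes the `s3queue_engine_table` from the earlier examples exists; `commit_on_select` is left out because this page does not state where that setting is specified.

```sql theme={null}
SELECT name, value
FROM s3queue_engine_table
LIMIT 10
SETTINGS stream_like_engine_allow_direct_select = 1;
```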

<h2 id="description">
  Description
</h2>

`SELECT` is not particularly useful for streaming import (except for debugging), because each file can be imported only once. It is more practical to create real-time threads using [materialized views](/reference/statements/create/view). To do this:

1. Use the engine to create a table that consumes from the specified path in S3, and treat it as a data stream.
2. Create a table with the desired structure.
3. Create a materialized view that converts data from the engine and puts it into a previously created table.

When the `MATERIALIZED VIEW` joins the engine, it starts collecting data in the background.

Example:

```sql theme={null}
  CREATE TABLE s3queue_engine_table (name String, value UInt32)
    ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
    SETTINGS
        mode = 'unordered';

  CREATE TABLE stats (name String, value UInt32)
    ENGINE = MergeTree() ORDER BY name;

  CREATE MATERIALIZED VIEW consumer TO stats
    AS SELECT name, value FROM s3queue_engine_table;

  SELECT * FROM stats ORDER BY name;
```

<h2 id="virtual-columns">
  Virtual columns
</h2>

* `_path` — Path to the file.
* `_file` — Name of the file.
* `_size` — Size of the file.
* `_time` — Time of the file creation.

For more information about virtual columns see [here](/reference/engines/table-engines#table_engines-virtual_columns).
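Virtual columns can be captured in a materialized view to record where each row came from. The sketch below assumes the `s3queue_engine_table` from the earlier examples; the target table `stats_with_source`, the view name and the `source_*` column names are hypothetical.

```sql theme={null}
CREATE TABLE stats_with_source
(
    name String,
    value UInt32,
    source_path String,
    source_file String
)
ENGINE = MergeTree() ORDER BY name;

-- Virtual columns must be selected explicitly; they are not included in SELECT *.
CREATE MATERIALIZED VIEW consumer_with_source TO stats_with_source
AS SELECT
    name,
    value,
    _path AS source_path,
    _file AS source_file
FROM s3queue_engine_table;
```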

<h2 id="wildcards-in-path">
  Wildcards in path
</h2>

The `path` argument can specify multiple files using bash-like wildcards. To be processed, a file must exist and match the whole path pattern. The list of files is determined during `SELECT` (not at `CREATE` time).

* `*` — Substitutes any number of any characters except `/`, including the empty string.
* `**` — Substitutes any number of any characters, including `/` and the empty string.
* `?` — Substitutes any single character.
* `{some_string,another_string,yet_another_one}` — Substitutes any of strings `'some_string', 'another_string', 'yet_another_one'`.
* `{N..M}` — Substitutes any number in range from N to M including both borders. N and M can have leading zeroes e.g. `000..078`.

Constructions with `{}` are similar to the [remote](/reference/functions/table-functions/remote) table function.
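As an illustration of combining these wildcards, the sketch below assumes a hypothetical bucket layout such as `logs/2023/part-001.csv.gz`. The `logs/` prefix, the year range and the table name are not from the source.

```sql theme={null}
CREATE TABLE s3queue_wildcards (name String, value UInt32)
ENGINE = S3Queue(
    -- {2023..2024} matches either year directory; ??? matches exactly three characters.
    'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/logs/{2023..2024}/part-???.csv.gz',
    'CSV',
    'gzip')
SETTINGS
    mode = 'unordered';
```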

<h2 id="limitations">
  Limitations
</h2>

1. Duplicated rows can occur when:

* an exception happens during parsing in the middle of file processing and retries are enabled via `s3queue_loading_retries`;

* `S3Queue` is configured on multiple servers pointing to the same path in ZooKeeper and the Keeper session expires before one server manages to commit a processed file, so another server may take over a file that the first server has partially or fully processed. This no longer applies since version 25.8 if `use_persistent_processing_nodes = 1`;

* the server terminates abnormally.

2. If `S3Queue` is configured on multiple servers pointing to the same path in ZooKeeper and `Ordered` mode is used, `s3queue_loading_retries` does not work. This will be fixed soon.

<h2 id="introspection">
  Introspection
</h2>

For introspection use `system.s3queue_metadata_cache` stateless table and `system.s3queue_log` persistent table.

1. `system.s3queue_metadata_cache`. This table is not persistent and shows in-memory state of `S3Queue`: which files are currently being processed, which files are processed or failed.

```sql theme={null}
┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE TABLE system.s3queue_metadata_cache
(
    `database` String,
    `table` String,
    `file_name` String,
    `rows_processed` UInt64,
    `status` String,
    `processing_start_time` Nullable(DateTime),
    `processing_end_time` Nullable(DateTime),
    `ProfileEvents` Map(String, UInt64),
    `exception` String
)
ENGINE = SystemS3Queue
COMMENT 'Contains in-memory state of S3Queue metadata and currently processed rows per file.' │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```

Example:

```sql theme={null}

SELECT *
FROM system.s3queue_metadata_cache

Row 1:
──────
zookeeper_path:        /clickhouse/s3queue/25ea5621-ae8c-40c7-96d0-cec959c5ab88/3b3f66a1-9866-4c2e-ba78-b6bfa154207e
file_name:             wikistat/original/pageviews-20150501-030000.gz
rows_processed:        5068534
status:                Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time:   2023-10-13 13:10:31
ProfileEvents:         {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5068534,'SelectedBytes':198132283,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':2480,'S3QueueSetFileProcessedMicroseconds':9985,'S3QueuePullMicroseconds':273776,'LogTest':17}
exception:
```
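To look only at files that are not yet done or that failed, a filter on `status` can be used. This is a sketch based on the columns listed in the table definition above; the only status value confirmed on this page for this table is `Processed`, so the query excludes it rather than naming other states.

```sql theme={null}
SELECT
    file_name,
    status,
    rows_processed,
    processing_start_time,
    exception
FROM system.s3queue_metadata_cache
WHERE status != 'Processed'
ORDER BY processing_start_time DESC;
```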

2. `system.s3queue_log`. Persistent table. Has the same information as `system.s3queue_metadata_cache`, but for `processed` and `failed` files.

The table has the following structure:

```sql theme={null}
SHOW CREATE TABLE system.s3queue_log

Query id: 0ad619c3-0f2a-4ee4-8b40-c73d86e04314

┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE TABLE system.s3queue_log
(
    `event_date` Date,
    `event_time` DateTime,
    `table_uuid` String,
    `file_name` String,
    `rows_processed` UInt64,
    `status` Enum8('Processed' = 0, 'Failed' = 1),
    `processing_start_time` Nullable(DateTime),
    `processing_end_time` Nullable(DateTime),
    `ProfileEvents` Map(String, UInt64),
    `exception` String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time) │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```

To use `system.s3queue_log`, define its configuration in the server config file:

```xml theme={null}
    <s3queue_log>
        <database>system</database>
        <table>s3queue_log</table>
    </s3queue_log>
```

Example:

```sql theme={null}
SELECT *
FROM system.s3queue_log

Row 1:
──────
event_date:            2023-10-13
event_time:            2023-10-13 13:10:12
table_uuid:
file_name:             wikistat/original/pageviews-20150501-020000.gz
rows_processed:        5112621
status:                Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time:   2023-10-13 13:10:12
ProfileEvents:         {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5112621,'SelectedBytes':198577687,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':1934,'S3QueueSetFileProcessedMicroseconds':17063,'S3QueuePullMicroseconds':5841972,'LogTest':17}
exception:
```
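A typical troubleshooting query against this table lists the most recent failures together with their exception text. The sketch uses only columns and the `Failed` status value from the table definition above; the `LIMIT` is arbitrary.

```sql theme={null}
SELECT
    event_time,
    file_name,
    exception
FROM system.s3queue_log
WHERE status = 'Failed'
ORDER BY event_time DESC
LIMIT 10;
```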
