Confluent HTTP Sink Connector
HTTP Sink Connector является независимым от типа данных и, следовательно, не требует схемы Kafka, а также поддерживает специфичные для ClickHouse типы данных, такие как Maps и Arrays. Эта дополнительная гибкость требует немного больше усилий при настройке.
Ниже мы опишем простую установку, получая сообщения из одной темы Kafka и вставляя строки в таблицу ClickHouse.
HTTP Connector распространяется под лицензией Confluent Enterprise.
Быстрые шаги
1. Соберите свои данные для подключения
To connect to ClickHouse with HTTP(S) you need this information:
-
The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.
-
The DATABASE NAME: out of the box, there is a database named
default
, use the name of the database that you want to connect to. -
The USERNAME and PASSWORD: out of the box, the username is
default
. Use the username appropriate for your use case.
The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

Choose HTTPS, and the details are available in an example curl
command.

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.
2. Запустите Kafka Connect и HTTP Sink Connector
У вас есть два варианта:
-
Самоуправление: Скачайте пакет Confluent и установите его локально. Следуйте инструкциям по установке коннектора, описанным здесь. Если вы используете метод установки confluent-hub, ваши локальные файлы конфигурации будут обновлены.
-
Confluent Cloud: Полностью управляемая версия HTTP Sink доступна для тех, кто использует Confluent Cloud для хостинга Kafka. Это требует, чтобы ваша среда ClickHouse была доступна из Confluent Cloud.
Следующие примеры используют Confluent Cloud.
3. Создайте целевую таблицу в ClickHouse
Перед тестированием подключения давайте начнем с создания тестовой таблицы в ClickHouse Cloud, эта таблица будет получать данные из Kafka:
4. Настройте HTTP Sink
Создайте тему Kafka и экземпляр HTTP Sink Connector:

Настройте HTTP Sink Connector:
- Укажите имя темы, которую вы создали
- Аутентификация
HTTP Url
- URL ClickHouse Cloud с указанным запросомINSERT
<protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow
. Примечание: запрос должен быть закодирован.Endpoint Authentication type
- BASICAuth username
- имя пользователя ClickHouseAuth password
- пароль ClickHouse
Этот HTTP Url подвержен ошибкам. Убедитесь, что кодирование точное, чтобы избежать проблем.

- Конфигурация
Input Kafka record value format
зависит от ваших исходных данных, но в большинстве случаев это JSON или Avro. Мы предполагаемJSON
в следующих настройках.- В разделе
advanced configurations
:HTTP Request Method
- Установите на POSTRequest Body Format
- jsonBatch batch size
- Согласно рекомендациям ClickHouse, установите это на не менее 1000.Batch json as array
- trueRetry on HTTP codes
- 400-500, но адаптируйте по необходимости, например, это может измениться, если у вас есть HTTP-прокси перед ClickHouse.Maximum Reties
- значение по умолчанию (10) подходит, но можете настроить для более надежных повторов.

5. Тестирование подключения
Создайте сообщение в теме, настроенной вашим HTTP Sink

и проверьте, что созданное сообщение было записано в ваш экземпляр ClickHouse.
Устранение неполадок
HTTP Sink не объединяет сообщения
Согласно документации Sink:
HTTP Sink connector не объединяет запросы для сообщений, содержащих различные значения заголовков Kafka.
- Убедитесь, что ваши записи Kafka имеют одинаковый ключ.
- Когда вы добавляете параметры к URL API HTTP, каждая запись может привести к уникальному URL. По этой причине слияние отключено, когда используются дополнительные параметры URL.
400 Bad Request
CANNOT_PARSE_QUOTED_STRING
Если HTTP Sink не удается выполнить вставку JSON-объекта в колонку String
с сообщением:
Установите настройку input_format_json_read_objects_as_strings=1
в URL в виде закодированной строки SETTINGS%20input_format_json_read_objects_as_strings%3D1
Загрузите набор данных GitHub (необязательно)
Обратите внимание, что этот пример сохраняет массивные поля набора данных Github. Мы предполагаем, что у вас есть пустая тема github в примерах и вы используете kcat для вставки сообщений в Kafka.
1. Подготовьте конфигурацию
Следуйте этим инструкциям для настройки Connect в зависимости от вашего типа установки, учитывая различия между автономным и распределенным кластером. Если вы используете Confluent Cloud, актуальна распределенная настройка.
Самый важный параметр - это http.api.url
. HTTP интерфейс для ClickHouse требует, чтобы вы закодировали оператор INSERT в качестве параметра в URL. Это должно включать формат (JSONEachRow
в данном случае) и целевую базу данных. Формат должен соответствовать данным Kafka, которые будут преобразованы в строку в HTTP полезной нагрузке. Эти параметры должны быть URL закодированы. Пример этого формата для набора данных Github (предполагая, что вы запускаете ClickHouse локально) показан ниже:
Следующие дополнительные параметры актуальны для использования HTTP Sink с ClickHouse. Полный список параметров можно найти здесь:
request.method
- Установите на POSTretry.on.status.codes
- Установите на 400-500, чтобы повторить при любых ошибках. Уточните в зависимости от ожидаемых ошибок в данных.request.body.format
- В большинстве случаев это будет JSON.auth.type
- Установите на BASIC, если у вас есть безопасность с ClickHouse. Другие совместимые с ClickHouse механизмы аутентификации в данный момент не поддерживаются.ssl.enabled
- установите на true, если используете SSL.connection.user
- имя пользователя для ClickHouse.connection.password
- пароль для ClickHouse.batch.max.size
- Количество строк, отправляемых в одном пакете. Убедитесь, что это значение установлено на достаточно большое число. Согласно рекомендациям ClickHouse, значением 1000 следует считать минимум.tasks.max
- HTTP Sink connector поддерживает выполнение одной или нескольких задач. Это может использоваться для увеличения производительности. Вместе с размером пакета это ваши основные способы улучшения производительности.key.converter
- установите в зависимости от типов ваших ключей.value.converter
- установите в зависимости от типа данных в вашей теме. Эти данные не требуют схемы. Формат здесь должен соответствовать формату, указанному в параметреhttp.api.url
. Проще всего использовать JSON и конвертер org.apache.kafka.connect.json.JsonConverter. Также возможно рассматривать значение как строку, используя конвертер org.apache.kafka.connect.storage.StringConverter, хотя это потребует от пользователя извлечения значения в операторах вставки с использованием функций. Также поддерживается формат Avro в ClickHouse, если используется конвертер io.confluent.connect.avro.AvroConverter.
Полный список настроек, включая то, как настроить прокси, повторы и расширенные настройки SSL, можно найти здесь.
Примеры файлов конфигурации для образца данных Github можно найти здесь, предполагая, что Connect запускается в автономном режиме, а Kafka хостится в Confluent Cloud.
2. Создайте таблицу ClickHouse
Убедитесь, что таблица создана. Пример для минимального набора данных github, используя стандартный MergeTree, показан ниже.
3. Добавьте данные в Kafka
Вставьте сообщения в Kafka. Ниже мы используем kcat для вставки 10k сообщений.
Простое чтение из целевой таблицы "Github" должно подтвердить вставку данных.