Перейти к основному содержимому
Перейти к основному содержимому

Двигатель таблиц Distributed

осторожно

Чтобы создать движок таблиц Distributed в облаке, вы можете использовать функции таблиц remote и remoteSecure. Синтаксис Distributed(...) не может быть использован в ClickHouse Cloud.

Таблицы с движком Distributed не хранят собственные данные, но позволяют выполнять распределенную обработку запросов на нескольких серверах. Чтение автоматически параллелизуется. Во время чтения используются индексы таблицы на удалённых серверах, если они есть.

Создание таблицы

Из таблицы

Когда таблица Distributed указывает на таблицу на текущем сервере, вы можете принять схему этой таблицы:

Параметры Distributed

cluster

cluster - имя кластера в конфигурационном файле сервера

database

database - имя удаленной базы данных

table

table - имя удаленной таблицы

sharding_key

sharding_key - (по желанию) ключ шардирования

Указание sharding_key необходимо для следующего:

  • Для INSERT в распределенную таблицу (так как движок таблицы нуждается в sharding_key, чтобы определить, как разделить данные). Однако, если настройка insert_distributed_one_random_shard включена, то INSERT не нуждаются в ключе шардирования.
  • Для использования с optimize_skip_unused_shards, так как sharding_key необходим для определения, какие шары должны быть запрошены.

policy_name

policy_name - (по желанию) имя политики, оно будет использоваться для хранения временных файлов для фоновой отправки.

Смотрите также

Настройки Distributed

fsync_after_insert

fsync_after_insert - выполнить fsync для данных файла после фоновой вставки в Distributed. Гарантирует, что ОС записала все введенные данные на диск инициаторного узла.

fsync_directories

fsync_directories - выполнить fsync для каталогов. Гарантирует, что ОС обновила метаданные каталога после операций, связанных с фоновой вставкой в распределенной таблице (после вставки, после отправки данных на шары и т.д.).

skip_unavailable_shards

skip_unavailable_shards - Если true, ClickHouse безмолвно пропускает недоступные шары. Шар помечается как недоступный, когда: 1) Шар не может быть достигнут из-за сбоя соединения. 2) Шар невозможно разрешить через DNS. 3) Таблица не существует на шаре. По умолчанию false.

bytes_to_throw_insert

bytes_to_throw_insert - если больше этого количества сжатых байтов будет ожидать фоновой вставки, произойдёт исключение. 0 - не выбрасывать. По умолчанию 0.

bytes_to_delay_insert

bytes_to_delay_insert - если больше этого количества сжатых байтов будет ожидать фоновой вставки, запрос будет задержан. 0 - не задерживать. По умолчанию 0.

max_delay_to_insert

max_delay_to_insert - максимальная задержка вставки данных в распределенную таблицу в секундах, если имеется много ожидающих байтов для фоновой отправки. По умолчанию 60.

background_insert_batch

background_insert_batch - то же самое, что и distributed_background_insert_batch

background_insert_split_batch_on_failure

background_insert_split_batch_on_failure - то же самое, что и distributed_background_insert_split_batch_on_failure

background_insert_sleep_time_ms

background_insert_sleep_time_ms - то же самое, что и distributed_background_insert_sleep_time_ms

background_insert_max_sleep_time_ms

background_insert_max_sleep_time_ms - то же самое, что и distributed_background_insert_max_sleep_time_ms

flush_on_detach

flush_on_detach - Сбросить данные на удаленные узлы при DETACH/DROP/выключении сервера. По умолчанию true.

примечание

Настройки надёжности (fsync_...):

  • Влияют только на фоновые вставки (т.е. distributed_foreground_insert=false), когда данные сначала хранятся на диске инициаторного узла, а затем в фоне отправляются на шары.
  • Могут значительно снизить производительность вставок.
  • Влияют на запись данных, хранящихся внутри папки распределенной таблицы в узел, который принял вашу вставку. Если вам нужны гарантии записи данных в подлежащие таблицы MergeTree - смотрите настройки надёжности (...fsync...) в system.merge_tree_settings.

Для Настроек предела вставок (..._insert) смотрите также:

  • Настройка distributed_foreground_insert
  • Настройка prefer_localhost_replica
  • bytes_to_throw_insert обрабатывается перед bytes_to_delay_insert, так что вы не должны устанавливать его меньше, чем bytes_to_delay_insert.

Пример

Данные будут считываться со всех серверов в кластере logs, из таблицы default.hits, расположенной на каждом сервере в кластере. Данные не только считываются, но и частично обрабатываются на удалённых серверах (насколько это возможно). Например, для запроса с GROUP BY данные будут агрегистрироваться на удалённых серверах, и промежуточные состояния агрегатных функций будут отправляться на сервер запрашивающего. Затем данные будут дополнительно агрегироваться.

Вместо имени базы данных вы можете использовать константное выражение, которое возвращает строку. Например: currentDatabase().

Кластеры

Кластеры настраиваются в конфигурационном файле сервера:

Здесь определён кластер с именем logs, который состоит из двух шардов, каждый из которых содержит две реплики. Шар относится к серверам, которые содержат различные части данных (для чтения всех данных вы должны получить доступ ко всем шарам). Реплики - это дублирующие серверы (для чтения всех данных вы можете получить доступ к данным на любой из реплик).

Имена кластеров не должны содержать точки.

Параметры host, port и, по желанию, user, password, secure, compression, bind_host указаны для каждого сервера:

  • host – Адрес удаленного сервера. Вы можете использовать либо доменное имя, либо IPv4 или IPv6 адрес. Если вы указываете доменное имя, сервер выполняет DNS-запрос, когда он запускается, а результат сохраняется, пока сервер работает. Если DNS-запрос не удаётся, сервер не запускается. Если вы измените DNS-запись, перезапустите сервер.
  • port – TCP порт для активности мессенджера (tcp_port в конфигурации, обычно установлен на 9000). Не путать с http_port.
  • user – Имя пользователя для подключения к удалённому серверу. Значение по умолчанию - пользователь default. Этот пользователь должен иметь доступ к подключению к указанному серверу. Доступ настраивается в файле users.xml. Для получения дополнительной информации смотрите раздел Права доступа.
  • password – Пароль для подключения к удаленному серверу (не скрыт). Значение по умолчанию: пустая строка.
  • secure - Использовать ли защищенное SSL/TLS соединение. Обычно также требует указания порта (порт по умолчанию для безопасности 9440). Сервер должен слушать на <tcp_port_secure>9440</tcp_port_secure> и быть настроен с правильными сертификатами.
  • compression - Использовать сжатие данных. Значение по умолчанию: true.
  • bind_host - Исходный адрес, который следует использовать при подключении к удалённому серверу с этого узла. Поддерживается только IPv4 адрес. Предназначен для использования в сложных сценариях развертывания, когда необходимо установить исходный IP-адрес, используемый запросами ClickHouse.

При указании реплик одна из доступных реплик будет выбрана для каждого из шардов при чтении. Вы можете настроить алгоритм балансировки нагрузки (предпочтение для доступа к какой реплике) – см. настройку load_balancing. Если соединение с сервером не установлено, будет предпринята попытка подключения с коротким временем ожидания. Если соединение не удалось, будет выбрана следующая реплика и так далее для всех реплик. Если попытка соединения не удалась для всех реплик, эта попытка будет повторена тем же образом несколько раз. Это способствует устойчивости, но не обеспечивает полной отказоустойчивости: удаленный сервер может принять соединение, но может не работать или работать плохо.

Вы можете указать только один из шардов (в этом случае обработка запроса должна называться удаленной, а не распределенной) или любое количество шардов. В каждом шаре вы можете указать от одной до любого количества реплик. Вы можете указать разное количество реплик для каждого шара.

Вы можете указать столько кластеров, сколько требуется в конфигурации.

Чтобы просмотреть свои кластеры, используйте таблицу system.clusters.

Двигатель Distributed позволяет работать с кластером так же, как с локальным сервером. Однако конфигурацию кластера нельзя указывать динамически, она должна быть настроена в конфигурационном файле сервера. Обычно все серверы в кластере будут иметь одинаковую конфигурацию кластера (хотя это не требуется). Кластеры из конфигурационного файла обновляются на лету, без перезапуска сервера.

Если вам нужно отправлять запрос к неизвестному набору шардов и реплик каждый раз, вам не нужно создавать таблицу Distributed – используйте функция таблицы remote вместо этого. См. раздел Функции таблиц.

Запись данных

Существует два метода записи данных в кластер:

Во-первых, вы можете определить, на какие серверы записывать какие данные, и выполнять запись непосредственно на каждом шаре. Другими словами, выполняйте прямые операторы INSERT на удаленных таблицах в кластере, на который указывает таблица Distributed. Это наиболее гибкое решение, так как вы можете использовать любую схему шардирования, даже ту, которая является нетривиальной из-за требований предметной области. Это также наиболее оптимальное решение, поскольку данные могут записываться на разные шары полностью независимо.

Во-вторых, вы можете выполнять операторы INSERT на таблице Distributed. В этом случае таблица самостоятельно распределит вставленные данные между серверами. Для записи в таблицу Distributed необходимо, чтобы был настроен параметр sharding_key (если шардов больше одного).

Каждый шар может иметь <weight>, определенный в конфигурационном файле. Значение по умолчанию - 1. Данные распределяются между шарами по количеству, пропорциональному весу шара. Все веса шардов суммируются, затем вес каждого шара делится на общий, чтобы определить пропорцию каждого шара. Например, если имеется два шара, и первый имеет вес 1, а второй имеет вес 2, первый получит одну треть (1 / 3) вставленных строк, а второй получит две трети (2 / 3).

Каждый шар может иметь параметр internal_replication, определенный в конфигурационном файле. Если этот параметр установлен в true, операция записи выбирает первую здоровую реплику и записывает данные в неё. Используйте это, если таблицы, лежащие в основе таблицы Distributed, являются реплицируемыми таблицами (например, любая из движков таблиц Replicated*MergeTree). Одна из реплик таблицы получит запись, и она будет автоматически реплицирована на другие реплики.

Если internal_replication установлен в false (по умолчанию), данные записываются во все реплики. В этом случае таблица Distributed сама реплицирует данные. Это хуже, чем использование реплицируемых таблиц, потому что консистентность реплик не проверяется и со временем они будут содержать немного разные данные.

Чтобы выбрать шар, в который отправляется строка данных, анализируется выражение шардирования, и берётся его остаток от деления на общий вес шардов. Затем строка отправляется в тот шар, который соответствует полупромежутку остатков от prev_weights до prev_weights + weight, где prev_weights - это общий вес шардов с наименьшим номером, а weight - это вес этого шара. Например, если имеется два шара, и первый имеет вес 9, тогда как второй имеет вес 10, строка будет отправлена в первый шар для остатков из диапазона [0, 9), а во второй для остатков из диапазона [9, 19).

Выражение шардирования может быть любым выражением из констант и столбцов таблицы, которое возвращает целое число. Например, вы можете использовать выражение rand() для случайного распределения данных или UserID для распределения по остатку от деления ID пользователя (в этом случае данные одного пользователя будут находиться на одном шаре, что упрощает выполнение IN и JOIN по пользователям). Если один из столбцов распределен недостаточно равномерно, вы можете обернуть его в хеш-функцию, например, intHash64(UserID).

Простое деление по остаткам - это ограниченное решение для шардирования и не всегда подходит. Оно работает для средних и больших объемов данных (десятки серверов), но не для очень больших объемов данных (сотни серверов и более). В последнем случае используйте схему шардирования, необходимую для предметной области, а не записи в таблицах Distributed.

Вы должны беспокоиться о схеме шардирования в следующих случаях:

  • Используются запросы, которые требуют объединения данных (IN или JOIN) по определенному ключу. Если данные шардированы по этому ключу, вы можете использовать локальный IN или JOIN, вместо GLOBAL IN или GLOBAL JOIN, что в разы эффективнее.
  • Используется большое количество серверов (сотни или более) с множеством мелких запросов, например, запросов на данные отдельных клиентов (например, веб-сайты, рекламодатели или партнёры). Чтобы мелкие запросы не затрагивали весь кластер, имеет смысл размещать данные для одного клиента на одном шаре. В качестве альтернативы можно настроить двухуровневое шардирование: разделить весь кластер на "слои", где слой может состоять из нескольких шардов. Данные для одного клиента находятся на одном слое, но к слою могут добавляться шары по мере необходимости, и данные распределяются случайным образом внутри них. Таблицы Distributed создаются для каждого слоя, а одна общая распределенная таблица создается для глобальных запросов.

Данные записываются в фоне. При вставке в таблицу блок данных просто записывается в локальную файловую систему. Данные отправляются на удаленные серверы в фоне, как можно скорее. Периодичность отправки данных регулируется настройками distributed_background_insert_sleep_time_ms и distributed_background_insert_max_sleep_time_ms. Двигатель Distributed отправляет каждый файл с вставленными данными отдельно, но вы можете включить пакетную отправку файлов с помощью настройки distributed_background_insert_batch. Эта настройка улучшает производительность кластера, более эффективно используя ресурсы локального сервера и сети. Вы должны проверить, что данные отправляются успешно, проверив список файлов (данные, ожидающие отправки) в каталоге таблицы: /var/lib/clickhouse/data/database/table/. Число потоков, выполняющих задачи в фоне, может быть установлено с помощью настройки background_distributed_schedule_pool_size.

Если сервер перестал существовать или произошел грубый перезапуск (например, из-за аппаратного сбоя) после вставки в таблицу Distributed, вставленные данные могут быть потеряны. Если в каталоге таблицы обнаружена повреждённая часть данных, она перемещается в подкаталог broken и больше не используется.

Чтение данных

При запросе таблицы Distributed запросы SELECT отправляются на все шары и работают независимо от того, как данные распределены по шарам (они могут быть распределены совершенно случайно). Когда вы добавляете новый шар, вам не нужно переносить старые данные в него. Вместо этого вы можете записывать новые данные в него, используя более высокий вес – данные будут распределены немного неравномерно, но запросы будут работать корректно и эффективно.

Когда опция max_parallel_replicas активирована, обработка запросов параллелизуется по всем репликам в пределах одного шара. Для больше информации ознакомьтесь с разделом max_parallel_replicas.

Чтобы узнать больше о том, как обрабатываются распределенные запросы in и global in, обратитесь к этой документации.

Виртуальные столбцы

_shard_num

_shard_num — Содержит значение shard_num из таблицы system.clusters. Тип: UInt32.

примечание

Поскольку функции таблиц remote и cluster внутренне создают временную таблицу Distributed, _shard_num также доступен там.

Смотрите также