Sistema Distribuido com Clickhouse (Cluster)¶
Objetivo¶
Criar um sistema clickhouse que faça a disribuição de consulta em tabelas entre diferentes máquinas para aumento de performance em consulta. Ao contrário de possuir um único processador carregando páginas de uma grande tabela em memória, uma por uma sequencialmente, o objetivo é que multiplos processadores possam trabalhar independentemente e paralelamente com sua fração de tabela, podendo assim diminuir o gargalo de page fault.
Links¶
Dependências¶
Clickhouse, v24.1.5.6-stable, SGDB colunar com paralelismo e distribuido.
Setup inicial de 3 nodes¶
Configure cada uma das máquinas com um hostname próprio e com devido acesso a portas no firewall.
Instale e configure um servidor de clickhouse conforme a documentação. Em seguida, altere o arquivo de configuração EM TODOS OS HOSTS normalmente localizado em /etc/clickhouse-server/config.xml
para a configuração dos nodes.
OBS: Neste tutorial não será necessário editar os arquivos em /etc/clickhouse-keeper/*
e /etc/clickhouse-client/*
Configurar nodes¶
Descomente a tag <listen_host>::</listen_host>
.
Descomente também a tag do zookeeper para que o clickhouse-keeper possa usá-lo:
<zookeeper>
<node>
<host>chnode1.domain.com</host>
<port>9181</port>
</node>
<node>
<host>chnode2.domain.com</host>
<port>9181</port>
</node>
<node>
<host>chnode3.domain.com</host>
<port>9181</port>
</node>
</zookeeper>
Finalmente, adicione a tag <keeper_server>
para que o clickhouse suba internamente um server do clickhouse-keeper (gerenciador).
IMPORTANTE: Além de alterar o hostname conforme o domínio, altere a tag <server_id>
conforme a numeração em raft_configuration
.
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
<raft_logs_level>warning</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>chnode1.domain.com</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>chnode2.domain.com</hostname>
<port>9234</port>
</server>
<server>
<id>3</id>
<hostname>chnode3.domain.com</hostname>
<port>9234</port>
</server>
</raft_configuration>
</keeper_server>
Testando¶
Após configurar todos os arquivos de configuração e reiniciar o serviço com systemctl restart clickhouse-server
é possível testar a conexão entre os nodes com o client do keeper clickhouse-keeper-client
. Ao rodar o comando uma nova interface será aberta, entre com ruok
para realizar a verificação, se retornar imok
então significa que está tudo certo.
Outra opção é acessar o client do servidor com clickhouse-client
e mandar o comando SELECT * FROM system.zookeeper WHERE path IN ('/', '/clickhouse')
. Caso retorne uma tabela com Clickhouse
, task_queue
e tables
então está tudo certo, caso haja um erro de configuração haverá uma mensagem de erro de conexão.
Arquiteturas¶
É possível realizar consultas distribuídas no clickhouse com a engine ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
. Porém existem diferentes maneiras de inserir dados em cada node configurado.
Table Replication¶
Realizando uma cópia completa da tabela original em cada um dos nodes, com a engine de distribuição o próprio clickhouse se da o trabalho de distribuir e orquestrar a consulta.
Embora essa seja uma opção interessante devido a alta tolerância a falhas entre os nodes (falhas de disco e conexão), possui um alto consumo de disco sendo o total de armazenamento utilizado a multiplicação entre o número de replicas e o tamanho da tabela original.
Utilizando MaterializedPostgreSQL ON CLUSTER
APENAS UMA VEZ EM UM DOS HOSTS é possível criar uma replica completa de um banco em postgres para cada um dos nodes, ao rodar o comando automaticamente todos os nodes recebem e executam o mesmo comando de criação do banco portanto todos os hosts também devem possuir acesso a máquina com postgres.
Para configurar replicações em cada node adicione a seguinte configuração ao config.xml
<remote_servers>
...
<cluster_1S_3R>
<shard>
<replica>
<host>chnode1.domain.com</host>
<port>9000</port>
</replica>
<replica>
<host>chnode2.domain.com</host>
<port>9000</port>
</replica>
<replica>
<host>chnode3.domain.com</host>
<port>9000</port>
</replica>
</shard>
</cluster_1S_3R>
</remote_servers>
!!Documentar como distribuir uma query em replicações!!
Table Sharding¶
Utilizando uma função hash, é possivel “quebrar” a tabela original em diferentes fragmentos (shards) e separá-los um para cada node do cluster.
Por conta da possível falha de um dos nodes levar a impossibilidade de consulta da tabela, esta não é uma opção viável para armazenamento de dados. Entretanto, caso os dados estejam protegidos por outro SGDB como PostgreSQL e backups frequentes, esse é um caminho interessante para processamento de dados analíticos (possivelmente um datawarehouse).
Para configurar um cluster de 3 nodos como shards adicione a seguinte configuração ao config.xml
<remote_servers>
<cluster_3S_1R>
<shard>
<replica>
<host>chnode1.domain.com</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>chnode2.domain.com</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>chnode3.domain.com</host>
<port>9000</port>
</replica>
</shard>
</cluster_3S_1R>
</remote_servers>
Reinicie os servidores, acesse o client e verifique com SHOW CLUSTERS
se está tudo funcionando.
Se estiver tudo conforme, aparecerá um registro com o nome do cluster cluster_3S_1R
Finalmente para a distribuição será necessário a criação da tabela com a engine MergeTree
(obrigatoriamente MergeTree) e a criação da mesma tabela com a engine Distributed
. Tabelas MergeTree
serão utilizadas para o armazenamento dos dados e as Distributed
, que não armazenam dados apenas interligam tabelas MergeTree
, para a distribuição tanto na inserção de dados como na consulta. Além disso, para que não seja necessário acessar todos os nodes, um a um, para a criação da tabela, é possivel utilizar a cláusula ON CLUSTER 'cluster_3S_1R'
para que os comandos sejam mandados a todos os nodes.
Criação da tabela MergeTree
:
CREATE TABLE pg_test.lineitem ON CLUSTER 'cluster_3S_1R' (
l_shipdate timestamp,
l_orderkey numeric,
l_discount numeric,
l_extendedprice numeric,
l_suppkey numeric,
l_quantity numeric,
l_returnflag character(1),
l_partkey numeric,
l_linestatus character(1),
l_tax numeric,
l_commitdate timestamp,
l_receiptdate timestamp,
l_shipmode character(10),
l_linenumber numeric,
l_shipinstruct character(25),
l_comment character varying(44) )
ENGINE = MergeTree()
ORDER BY (l_linenumber, l_orderkey)
Criação da tabela Distributed
com chave hash rand()
:
CREATE TABLE pg_test.lineitem_distrib ON CLUSTER 'cluster_3S_1R' AS pg_test.lineitem
ENGINE = Distributed(cluster_3S_1R, pg_test, lineitem, rand());
Inserção na tabela distribuida:
INSERT INTO pg_test.lineitem_distrib SELECT * FROM postgresql('host:port', 'database', 'table', 'user', 'password')
Ao fim da inserção, os dados serão distribuidos entre os nodes nas tabelas MergeTree
. Para realizar consultas distribuidas simplesmente realize a consulta nas tabelas Distributed
e o clickhouse se encarregará do resto.
IN/JOIN Distribuido¶
Para realizar JOIN’s distribuidos será necessário setar a opção distributed_product_mode
com:
SET distributed_product_mode = 'allow'
Adicionar novos hosts¶
Caso seja adicionado novos hosts e reconfigurado a arquitetura na tag <remote_servers>
será necessário refazer a base inteira, uma vez que o clickhouse apenas redireciona o mesmo comando para todos os hosts no cluster especificado.
Diagrama¶
┌───────────┐
│coordenador│
└─────┬─────┘
│
┌─────────┼─────────┐
│ │ │
┌───┴───┐ ┌───┴───┐ ┌───┴───┐
│chnode1│ │chnode2│ │chnode3│
└───────┘ └───────┘ └───────┘