top of page
Foto do escritorReginaldo Silva

Databricks - Delta Lake - Particionar ou não, eis a questão! Guia definitivo!

Fala dataholics, hoje vamos mergulhar nas profundidades do particionamento e otimização de tabelas no Delta Lake, falaremos sobre como funciona o particionamento de dados na tabela Delta e alguns dos seus prós e contras.


Conhecido como particionamento Hive-Style, não é algo novo no nosso mundo de Big Data, basicamente trata-se de distribuir os dados em pastas fisicamente diferentes para facilitar as leituras que utilizam o filtro daquela partição, irei entrar em mais detalhes a frente, a pergunta é, ainda vale a pena particionar dessa maneira? A partir de qual tamanho vale a pena?



O que veremos hoje:

  • Como funciona o particionamento

  • Por que o ZORDER não funciona na coluna do particionamento

  • ZORDER vs Particionamento - Tabela 16 bilhões de registros

  • Entendendo como a engine Delta funciona em tabelas particionadas

  • Particionamento para melhorar concorrência

  • Conclusão

 

Como funciona o particionamento?


Vamos a um exemplo bem prático para clarear as ideias.

Abaixo estamos criando um dataframe simples para gerarmos alguns exemplos.

Exemplo dos dados, informações fake de covid-19, no total temos 5165 registros.


Agora estou criando 2 tabelas no formato PARQUET com o mesmo dado, uma particionada por Country e outra sem particionamento.


Tabela sem particionamento:

Note que temos apenas 1 arquivo Parquet, devido à quantidade de registros, tudo coube em um único arquivo.

Tabela particionada por Country, note que temos 1 pasta para cada país.


A lógica é muito simples, se aplicarmos um filtro de Country na nossa query, a engine do Spark sabe qual pasta ele deve ler e consequentemente pular(data skipping) as demais, isso melhora muito a performance da query, pois, ao invés de ler todas as pastas e arquivos ele listará apenas a pasta correta, reduzindo leitura no storage.

Por outro lado, se escolhido um campo muito granular você sofrera do famoso problema de small files, isso pode prejudicar muito a performance do Spark, exemplo se você quiser fazer uma leitura da tabela inteira a performance será extremamente prejudicada, pois, ao invés de ler um único arquivo, precisará listar pasta por pasta e ler todos os arquivos, mesmo que os arquivos sejam menores isso é muito custoso, logo, tenha cuidado na escolha do campo para particionamento da tabela.

Vejamos isso na prática.


Query na tabela particionada, rodou em 0.90 segundos.


Query na tabela sem partição, rodou em 0.27 segundos.

Uai, então não vale a pena particionar? Calma, calma.

Essa tabela do exemplo é muito pequena para termos um ganho de performance, contudo, olharemos por outro angulo:


Se abrirmos as estatísticas da query sem particionamento notaremos o seguinte, ela leu 101 KB e 5165 registros, ou seja, toda a tabela (Full scan), nesse caso se trata apenas de 1 arquivo, por isso foi extremamente rápido.


Agora vejamos a tabela com particionamento, note que ela leu apenas 4.1 KB e apenas 1 registro, nesse caso foi devido a essa partição de fato só ter 1 registro.


Ou seja, em grandes proporções, a query com particionamento tende a ser muito melhor do que a primeira, pois, a primeira tende sempre a fazer uma leitura muito maior, já a segunda vai ler apenas as pastas corretas.


Vale ressaltar que mesmo em formato PARQUET pode ocorrer o data skipping, nesse exemplo da tabela sem particionamento tudo estava no mesmo arquivo, se quebrar ele em 5 ou mais arquivos, é possível que a engine evite um full scan, baseado em algumas estatísticas dos arquivos PARQUET.


Entendeu o benefício do particionamento Hive-Style? Mas será que essa mesma lógica se aplica para tabelas Delta, pois, nas tabelas Delta temos o Transaction Log que armazena informações de onde estão todos os arquivos da tabela, melhorando drasticamente a leitura em relação aos seus métodos antecessores.

 

Por que o ZORDER não funciona na coluna do particionamento?


Antes de entrar na questão da performance gostaria de mostrar uma curiosidade sobre o particionamento, você já tomou esse erro ao tentar aplicar ZORDER em uma coluna usada no particionamento? Sabe o motivo disso acontecer?


DeltaIllegalArgumentException: country is a partition column. Z-Ordering can only be performed on data columns


Para cada coluna adicionada no particionamento é criado uma estrutura de pastas aninhadas na mesma sequência em que é declarado no script, uma vez que essa coluna é fixada no nome da pasta e você vai navegando até o nível mais profundo, quando você chegar no arquivo PARQUET esse arquivo não precisar conter as colunas usadas no particionamento, uma vez que ela já está informada no caminho do arquivo, resumindo, as colunas usadas no particionamento não existem dentro do arquivo de dados final.


Note a estrutura de pastas criada:

table=PatientInfoDeltaParticionada/

country=Canada/

province=Seoul/

city=Jongno-gu/

sex=female/

O arquivo fica no último nível, agora como podemos comprovar que essas colunas não estão no arquivo de fato?


Copiarei esse arquivo PARQUET para pasta raiz e leremos diretamente o arquivo.

Note que os campos Country, Province, City e Sex não estão dentro desse arquivo PARTQUET, logo se você copiar esse arquivo para fora da pasta particionada ele perde essas colunas e não fará mais sentido.

Obs: Estou fazendo a leitura do arquivo usando SQL, através do hint parquet.`caminho`


Reginaldo, então se eu colocar todas as colunas da tabela no particionamento, não teremos arquivos PARQUET no último nível? Na teoria seria isso, mas a engine não permite colocar todos os campos, pelo menos 1 campo precisa ficar fora do particionamento, logo sempre teremos arquivos no último nível.


Isso explica o motivo de você não conseguir rodar um ZORDER nas colunas do particionamento, pois, fisicamente elas não existem, elas estão expostas no caminho da pasta com seus respectivos valores, capitou essa?

 

ZORDER vs Particionamento


Em linhas gerais ambas técnicas possibilitam pular (data skipping) arquivos e não realizar uma leitura massiva de dados, a aplicação e funcionamento diferem, mas são focados para melhorar a performance no geral.


Contudo, o modelo de particionamento nasceu bem antes do Delta Lake, naquela época ele fazia muito mais sentido do que nos tempos atuais, com o Delta Lake muitas melhorias foram aplicadas como remover a operação de List do storage, sem o Delta a engine o Spark precisa fazer uma operação de list nas pastas antes de ler os arquivos, isso é extremamente custoso para tabelas grandes. Com o Delta Lake temos o transaction log que armazena o endereço de todos os arquivos da versão atual da tabela e também dos arquivos removidos (time travel), evitando a necessidade de uma operação List custosa ele faz um GET direto nos arquivos necessários, esses metadados trouxeram uma performance gigantesca para o Delta Lake em relação a estruturas como tabelas em formato PARQUET.


Com técnicas de compactação e ordenação de dados o uso do particionamento pode ser tornar irrelevante para a maioria dos casos, sendo a recomendação inicial para pensar em particionamento a partir de 1 TB de dados, menos do que isso as técnicas com OPTIMIZE e ZORDER (Liquid clustering) podem ser fortes aliados.


Exemplo de dados no Transaction Log:

Se você é um explorador do Transaction Log já está familiarizado com algumas informações encontradas la dentro, basicamente para cada operação(INSERT, UPDATE, DELETE, etc) na sua tabela a engine gera um JSON (log) informando tudo o que aconteceu naquela operação exemplo, informando quais arquivos foram adicionados e arquivos marcados como removidos da tabela, esses marcados como removidos, não são excluídos automaticamente, fica disponível para Time Travel até que uma operação de vacuum seja executada.


No bloco de arquivos adicionados encontramos algumas informações interessantes como o nome do arquivo, tamanho, data e hora da ocorrência e estatísticas do arquivo.


Dentro desse campo estatísticas temo 3 informações muito relevantes, como:

  • numRecords - Quantidade de registro dentro daquele arquivo

  • minValues - O menor valor para cada coluna

  • minValues - O maior valor para cada coluna

Com base nessas informações, a engine pode listar apenas os arquivos necessários para aquela leitura, pulando arquivos desnecessários, mesmo sem a tabela estar particionada.


Vamos a um caso prático bem interessante, vou utilizar uma tabela de 100 GB e 16 bilhões de registros, rodando queries nela através do particionamento, outra sem usar particionamento e clonar essa tabela removendo o particionamento, mas, aplicando o ZORDER.


Teremos essas 2 tabelas:

  • logs2: Tabela sem particionamento, aplicado OPTIMIZE e ZORDER pelo campo ticket_id

  • logs_particionada: Tabela com particionamento pelos campos de ano, mes e dia

Ambas possuem os mesmos dados, exceto que a logs_particionada continua recebendo modificações e pode conter alguns registros a mais, mas, não afeta os testes.


Tabela sem particionamento:


Tabela particionada:

Note que ela possui muito mais arquivos do que a tabela sem particionamento, justamente pelo fato de não poder compactar mais os arquivos por estarem distribuídos em pastas diferentes.


Quantidade de registro total:

Note que ambas possuem mais de 16 bilhões, no entanto, a particionada um pouco mais devido ela ser a tabela produtiva e a outra ser uma cópia.

Temos uma quantidade de registros bem expressiva, depende muito da empresa e ambiente para chegar em volumetrias acima de 1 bilhão de registro.


Query com filtro de particionamento:

Agora farei o filtro de 1x ticket_id especifico, contudo, como sei exatamente a partição desse registro informarei ela na query:

Tempo de execução: 11 segundos, a engine utilizou a estrutura de pastas no Storage a seu favor.


Query sem filtro de particionamento:

Usando apenas o filtro pelo Ticket_id.

Tempo de execução: 3 minutos e 20 segundos.

Um tempo muito mais elevado, isso pelo fato da tabela estar particionada e não estarmos informando as partições na query.

Mesmo sendo uma tabela Delta, usando os metadados no Transaction Log a engine precisa ler muitos arquivos nessa operação, pois os dados podem estar em várias partições\arquivos diferentes, isso ocorre, pois, vários arquivos podem conter um range de IDs parecidos e também precisam ser lidos para garantir a integridade.

Se fosse uma tabela PARQUET, a engine faria uma leitura muito mais agressiva e demoraria consideravelmente mais.


Query na tabela sem particionamento e com ZORDER:

Estamos usando o mesmo filtro das queries anteriores.

Tempo de execução: 8 segundos


Qual a mágica por detrás dos panos?

Como já mencionei anteriormente os metadados possuem informações muito importantes e a engine Delta faz bom proveito disso, vamos fazer uma engenharia reversa para esse caso lendo o arquivo de Log da tabela sem particionamento e com ZORDER.


Na query abaixo utilizei SQL para ler o arquivo JSON do log e expandir as estatísticas dessa tabela, buscando arquivos que possuam aquele range de ID que estou buscando.

Veja bem a imagem acima, estou buscando dentro do Transaction Log todos os arquivos de dados que possui o range do ticket_id 22334863, ele me retornou apenas 1 arquivo, ou seja, em uma tabela de 16 bilhões de registros sem particionamento, uma query que poderia custar muito, irá ler apenas 1 arquivo de dados.


Quer confirmar se acertamos o arquivo?

Usando a função input_file_name() conseguimos saber em qual arquivo se encontra aquele registro e para nossa "surpresa" é o mesmo arquivo: part-00422-3a46ba11-3e72-40b2-ae83-c974f6c48864-c000.snappy.parquet

Obs: Note que essa segunda execução foi até mais rápido, pois, ja estava em cache.


Quantidade de dados lido na tabela SEM particionamento: 152 MB


Quantidade de dados lido na tabela COM particionamento usando filtro do particionamento: 419 MB


A query na tabela particionada sem o filtro de particionamento leu mais de 20 GB, isso demostra que para esse caso a partição é muito pior, uma vez que nem sempre sabemos a partição do dados que queremos consultar.

A query na tabela de 16 milhões de registros sem particionamento foi mais rápida e menos custosa do que na tabela particionada usando os filtros de partição, os filtros e o cluster utilizados foram os mesmos.


Em resumo, a compactação de arquivos em tabelas sem partição + dados ordenados (e estatísticas mais precisas) melhoram tanto a performance que fazem ela ser melhor do que tabelas particionadas.

Tabelas particionadas podem gerar o famoso small file problem, em linhas gerais a engine do Spark se dá melhor em ler apenas 1x arquivo de 1 GB do que 1000x arquivos que somam 1 GB, tabelas particionadas podem causar esse efeito.


Você também pode aplicar o ZORDER nas tabelas particionadas, mas o efeito não será tão eficiente quanto uma tabela sem particionamento, pois, ele ordenará os dados na partição e pode acontecer de muitas partições possuírem um range de IDs próximos, fazendo a engine ler mais do que era necessário em buscas ou joins por ID.


Um dos benefícios de particionar tabelas muito grandes (> 1 TB) é você poder rodar o OPTMIZE com filtro, ou seja, otimizar somente as partições mais recentes, deixando a operação muito mais rápida, exemplo:

OPTIMIZE events
WHERE date >= current_timestamp()-INTERVAL 1 day
ZORDER BY(eventType)

Espero que essa parte tenha ficado clara, obviamente não da pra explicar toda a engine Delta num único post, mas se você já vem acompanhando por aqui, em outros posts já falamos bastante sobre o funcionamento da engine Delta Lake.


Depois desse post, você entenderá que sair criando partição padrão (default) para todas as tabelas do seu Delta Lake NÃO é nem de longe uma boa prática, pare de particionar à toa a partir de agora, analise cada tabela com cuidado e claro, estude a engine para entender cada vez mais.

 

Particionamento para melhorar concorrência


Uma das funcionalidades do Particionamento, além da performance em alguns cenários de tabelas muito grandes, é o controle de concorrência, às vezes sua tabela não é tão grande, mas tem a necessidade de receber escritas em paralelo, porém, você se depara com o erro a seguir.


Deixarei em uma aba um Update sem Where rodando constantemente.


Agora na segunda aba note que as leituras funcionam normalmente.


Já as escritas temos o seguinte erro.

Files were added to the root of the table by a concurrent update. Please try the operation again


Isso acontece devido ao nível de isolamento que o Databricks utiliza para as tabelas Delta, por default ele trabalha nesse modelo:

Escritas: WriteSerializable - Garante que somente 1 processo esta gravando na tabela por vez, contudo não há controle de Locks, caso isso aconteça um dos processos tomará um erro.

Isso vale para processos que estão gravando no mesmo conjunto de dados.

Para evitar esse caso, usar o particionamento é indicado.


Leituras: Snapshot - Lê a última versão comitada, não há bloqueios, contudo, você pode estar vendo uma versão anterior do dado, pois se uma escrita foi gravada após o início da sua leitura, você não vera essa informação.

Obs: O Databricks possui um nível mais forte de isolamento que é o Serializable, nesse nível as leituras irão tomar erro se uma gravação paralela acontecer.


Para resolver esse problema particionaremos essa tabela pelo campo sex:


Agora rodaremos Updates concorrentes novamente, porém, dessa vez usando a coluna de particionamento para não conflitar as operações:


Nesse loop atualizaremos a partição 'Male'.


No segundo Loop atualizaremos a partição 'Female' e não teremos mais conflitos.


Assim você resolve o problema de escritas concorrentes, embora, mais uma vez tome cuidado, pois, se você particionar por um campo ID, você nunca sofrerá de conflito de escrita, por outro lado, sua performance será completamente degradada devido ao small file problem, não existe almoço grátis, estude cada caso.


 

Conclusão


Em resumo, vimos que o particionamento foi um grande aliado no passado, antes do Delta Lake existir ele era o rei do pedaço, embora com a chegada do Delta o particionamento torna-se irrelevante para a maioria dos casos, na verdade, ele pode piorar sua performance quando comparado a uma tabela sem particionamento com uma rotina de OPTIMIZE e ZORDER (Liquid cluster) bem aplicada.


Como mencionei, não existe almoço grátis e cada caso é um caso, o seu ambiente é único e diferente de todos os benchmarks, não acredite em tudo que vê ou lê por aí, teste e avalie se cabe para o seu ambiente, mas, vimos que o Particionamento já não é tão relevante quando usamos Delta, então avalie com cuidado e faça testes, use como base a recomendação inicial de 1 TB para considerar particionar suas tabelas.


Se for particionar tome cuidado com o problema de muitos arquivos pequenos, selecione bem as colunas e você pode também utilizar OPTIMIZE e ZORDER(em outra coluna claro.) em conjunto para melhorar a performance.


Fica a reflexão para você, particionar ou não particionar, eis a questão!

Leia também os posts relacionados, pois, tem muito mais detalhes sobre o Transaction Log.


Fique bem e até a próxima.


Link para scripts:


Referências:




1.340 visualizações1 comentário
Post: Blog2 Post
bottom of page