top of page
Foto do escritorReginaldo Silva

Databricks - Count(*) vs Count(1)? Como um Count em 100 Bilhões roda em menos de 1 segundo?

Fala dataholics, hoje trago algumas curiosidades da engine Delta, desde que trabalho com dados existe a lenda do Count(1) é mais rápido do que o Count(*), você já ouviu essa também?


Lembro que quando trabalhava como DBA cheguei a escrever um post sobre isso no SQL Server, é bem antigo (#oldbutgold):

Eai o que você acha que é mais rápido? Nesse post veremos isso aqui:

  • Diferenças entre COUNT(*) vs COUNT(1) no Spark

  • Como observar o plano de execução da sua query

  • Como um Count(*) em 100 Bilhões de registros roda em menos de 1 segundo

  • Detalhes sobre a delta engine



 

Bom a resposta para essa pergunta é bem trivial, a performance é a mesma!



Mas, qual motivo de serem a mesma coisa?


Para entender melhor, vamos rodar o comando EXPLAIN para mostrar qual o plano de execução criado para executar essa consulta.

De forma simples, qualquer comando que executamos no Spark é gerado um plano de execução para atingir aquele objetivo, esse plano de execução é baseado nas estatísticas e histogramas disponíveis nos Logs da tabela Delta, com base nessas informações o Spark tenta montar um plano de acesso aos dados que seja eficiente para te entregar os dados no menor tempo possível, sempre existem várias formas de executar a mesma query, então esse é um dos trabalhos mais nobres da engine, otimizar seu plano de execução.

A minha ideia aqui não é explicar o funcionamento do otimizador, até porque ele é bem complexo para um post.


Mas seguindo nossa linha do COUNT(*), veja mais de perto o plano de execução dele:

WTF? Você não esta vendo errado, é isso mesmo, o otimizador troca seu COUNT(*) para COUNT(1), ou seja, eles são o mesmo.


Diferente do SQL Server que troca o COUNT(1) por COUNT(*), aqui é efetuado o inverso, porém, é o mesmo efeito.


Com isso sabemos que o COUNT(*) vs COUNT(1) também é apenas um MITO no Spark.


 

Mas não acaba por aqui, vamos um pouco mais além sobre a DELTA Engine hoje, como um COUNT(*) de 100 bilhões de linhas executa em milissegundos no cluster mais básico de Databricks?

Só para curiosidade, depois faz um COUNT(*) no seu banco de dados OLTP (SQL Server, Oracle...) veja em quanto tempo ele retorna essa operação em tabelas acima de 1 bilhão.


100 Bilhões de linhas em menos de 1 segundo, é até difícil de contar esse numero rsrs.

Obs: Criei uma view para omitir o nome original da tabela.


Na teoria o COUNT() deveria varrer toda minha tabela para contar os registros dentro de cada arquivo PARQUET certo? Essa tabela do print tem 7 TB de dados!


Poxa, Reginaldo, isso é moleza para o Databricks, ele foi feito pra isso né? Processamento em larga escala de dados.

Sim, de fato, o Spark é feito para processamentos de dados em larga escala, mas com a Delta Engine é mesmo necessário varrer esses 7TB? Vou precisar escalar um cluster gigante, gastar dinheiro só para fazer um count(*)?


A resposta é não, como eu disse, essa query em 100 bilhões de registro rodei no menor cluster, 1x node 14 GB de RAM e 4 Cores, então qual é a mágica?


Desde dezembro de 2022 subiu uma release da Delta Engine para otimização dessa operação, você pode conferir nessa Pull Request:


A partir do Delta 2.2.0 temos esse recurso rodando, talvez você nem tenha percebido, claro que tem outras otimizações da engine que também melhoraram muito a performance do Databricks, em geral.


 

Vamos um pouco mais deep?


Se você me acompanha por aqui, já sabe que a Delta table trabalha com versionamento, cada operação gera uma versão nova, novos arquivos adicionados ou marcados como removidos, até aqui ok, vamos ver isso dentro do Delta Log.


Para encontrar a última versão da sua tabela, você pode rodar um describe history.

A minha está na versão 3221.


E vamos para o Storage:

Basicamente o Transaction Log (Delta log) é formado por esses arquivos:

0000000000000000xxxx.json - Log das alterações realizadas naquela versão \ operação

0000000000000000xxxx.crc - Informações sobre schema, histograma e outros metadados daquela versão

0000000000000000xxxx.checkpoint - Geralmente ocorre a cada 10 operações, ele pega todas as versões anteriores .json e compacta dentro de 1 arquivo .PARQUET, para otimizar a leitura.

_last_checkpoint - Utilizado para encontrar qual foi o último checkpoint, esse arquivo é um diferencial para a performance do Delta.


Basicamente com esses arquivos a tabela pode ser reconstruída seguindo essa linha:

  1. Encontrar o último checkpoint através do arquivo _last_checkpoint

  2. Ler todos os arquivos de dados (.PARQUET) referenciados dentro do checkpoint

  3. Ler os demais .JSON posteriores ao checkpoint, ou seja, acima da versão 3221.

Note que ela tem 6768 arquivos na pasta _delta_log, porém, para recriar essa tabela precisaria ler apenas 4 arquivos da _delta_log. Acredite, isso faz a diferença absurda na performance, ainda mais que esses arquivos ficam na memória do cluster, então acessá-los é extremamente rápido.


 

E como esse count(1) é reproduzido? Usarei um exemplo mais simples com uma tabela bem menor para você poder reproduzir e brincar no seu ambiente.


Criar aquela famosa tabela de pacientes usando o sample da Databricks.

Tabela criada com 5165 linhas.


Em seguida, estou inserindo uma nova linha nessa tabela para gerar uma nova versão.


E agora apagando 11 linhas, concluindo 3 operações na nossa tabela Delta para o nosso exemplo.

Com Describe detail sabemos o tamanho dela, ou seja, 2 arquivos.


Com o Describe history as suas 3 versões:

0 - CREATE TABLE

1 - WRITE (INSERT)

2 - DELETE


E no storage, as suas 3 versões, note que ainda não temos Checkpoint, pois, ela ainda não sofreu 10 alterações, se fizermos mais 10 Inserts, será criado um checkpoint agregando todos esses JSONs de log.

O que nos interessa nesse momento são os arquivos JSON, então vamos baixá-los e abrir com o VSCODE para facilitar a visualização.


Arquivo 00000000000000000000.json, criado na primeira versão com o CREATE TABLE.

Note que ele possui um campo chamado STATS, la dentro contem informações da quantidade de registro naquele arquivo adicionado, valores mínimos e máximos para cada coluna, fora o histograma que fica no arquivo CRC, com essas informações que o COUNT fica mais performático.

Não consigo colocar o JSON inteiro aqui, mas deixarei eles no Github se você quiser explorar.


Arquivo 00000000000000000001.json, esse arquivo é do INSERT.

Também foi adicionado um novo arquivo, note no campo numRecords que foi apenas 1 registro.


Arquivo 00000000000000000002.json, esse aqui temos a remoção do primeiro arquivo que possuía os dados deletados (esse arquivo não é deletado do storage, apenas marcado como removido da tabela Delta), depois a criação de 1 novo arquivo sem as informações do DELETE.

Ta começando a clarear por aí? É assim que a Delta Engine funciona, e por isso conseguimos executar UPDATEs, INSERTs, DELETEs, e MERGEs, ela vai criando novos arquivos e movimentando os dados necessários, mantendo os dados antigos disponíveis para TIME TRAVEL.


Veja o campo numRecords, ele foi inserido com 5154 registros.


Vamos à matemática agora:

  • Arquivo 00000000000000000000 campo numRecords: 5165

  • Arquivo 00000000000000000001 campo numRecords: 1

  • Arquivo 00000000000000000002 - Apaga o arquivo com 5165 registros

  • Arquivo 00000000000000000002 campo numRecords: 5154

Totalizando: 5155, certo?


Também podemos reproduzir via código SQL.

Lembrando que esse exemplo está bem simples e não temos checkpoints ainda.


select
  sum(from_json(
    add.stats,'numRecords DOUBLE'
  ).numRecords) as numRecordsAdd
from
json.`abfss://reginaldo@stdts360.dfs.core.windows.net/bronze/PatientInfoDelta/_delta_log/0000000000000000*.json`
  where add is not null
  and add.path NOT IN (
    select remove.path from json.`abfss://reginaldo@stdts360.dfs.core.windows.net/bronze/PatientInfoDelta/_delta_log/0000000000000000*.json`
    where remove is not null
  )

Basicamente, estou realizando as seguintes operações para simular a Delta engine.

  1. Utilizando SQL, estou acessando todos os JSON da pasta Delta Log, usando essa sintaxe 0000000000000000*.json

  2. Na coluna ADD estou pegando o campo STATS que é um JSON e extraindo apenas o campo numRecords dos arquivos que foram adicionadas na tabela

  3. Depois faço um NOT IN para excluir os arquivos REMOVIDOs e somo o restante.


Simples né? Claro que não, esse exemplo está bem básico, e quando temos vários checkpoints? Aí começa a brincadeira e precisamos sair do SQL e ir para um Python ou Scala, mas não entrarei nesse ponto agora, porém os passos que fiz acima é basicamente uma simulação simplificada da Delta engine.


Ou seja, ela não lerá os arquivos de dados (.PARQUET) e fazer um COUNT la dentro, ela lerá os arquivos de LOG e utilizara as estatísticas disponíveis.


Funciona para todos os COUNTS()? Não necessariamente, exemplo se você fizer um COUNT(coluna) em uma coluna com NULLs, ele tentará usar as estatísticas, mas caso não tenha metadados suficiente, pode ser que o otimizador preferia fazer um SCAN nos arquivos de dados.


Bom gente, vou parar por aqui, esse assunto é tão empolgante que se deixar esse post virá um livro rsrs.


Espero que tenha gostado, e que fique claro, COUNT(*) e COUNT(1) são a mesma coisa.


Fique bem e até a próxima.


Link github:


Referências:



1.838 visualizações0 comentário
Post: Blog2 Post
bottom of page