top of page

Databricks - Workflows - If Else Condition e Dynamic Values

Fala dataholics, mais um post sobre Databricks o pnultimo de 2023 (sim já estamos encerrando o ano rs), hoje falaremos sobre uma novidade bem legal no Workflows, a possibilidade de criar uma task com If else condition.


O Databricks Workflows vem recebendo uma aceleração fora do normal, a Databricks esta soltando feature atrás de feature, uma mais legal que a outra, o Workflows esta criando corpo e robustez, sem ousadia da minha parte, Airflow que se cuide rsrs, brincadeiras a parte utilizo as duas ferramentas e claro o Airflow tem suas vantagens.


O que veremos nesse post:

  • Databricks Workflows

  • Workflows vs Airflow?

  • Novas funcionalidades

  • Dynamic Values

  • Tipos de tarefas

  • If Else Condition Task

  • Chamando queries SQL e passando parâmetros

  • Salvando logs dos Jobs em uma tabela Delta

  • Alerta via e-mail com task SQL

  • Resumo da utilização do Workflows


 

Em resumo, o Databricks Workflows é uma ferramenta integrada na plataforma Databricks para orquestração eficiente de fluxos de trabalho de dados (famosos Jobs\DAGs). Ele permite aos desenvolvedores\engenheiros organizar e automatizar tarefas e processos, exemplo, agendar uma ingestão de dados, processamento dos dados e disponibilização pro usuário final, tudo no mesmo fluxo (Job) e gerenciando as dependências, você pode estar acostumado com outras ferramentas como Azure Data Factory ou Airflow.


O Databricks esta evoluindo o Workflows para competir com esses grandes orquestradores, mas com a vantagem de estar totalmente integrado com o Databricks, é claro que essa vantagem pode ser também uma desvantagem, pois, terá dificuldade de orquestrar coisas fora do Databricks, embora, essa seja a estratégia, trazer tudo para dentro da plataforma, se você está 100% em Databricks sem dúvidas o uso do Workflows é sensacional.


Enquanto o Databricks Workflows e o Apache Airflow compartilham a missão de gerenciar fluxos de trabalho(Jobs ou DAGs), há nuances distintas em suas abordagens. O Airflow é um orquestrador agnóstico, oferecendo flexibilidade para integrar com diversos ambientes. Ele é altamente configurável e amplamente utilizado para orquestrar tarefas em ecossistemas heterogêneos, o Airflow é baseado na linguagem python, então todos os seus Jobs\DAGs são escritos em python.


Ambos oferecem funcionalidades comuns, como agendamento, monitoramento e execução de tarefas. A escolha entre eles dependerá das necessidades específicas de cada ambiente de dados.

Em linhas gerais, papo reto, se você usa Databricks como centralizador da sua estratégia de dados, não tem o que discutir, o Workflows é o seu caminho. Se você tem um ecossistema mais diverso, com N ferramentas, o Airflow tende a ser mais flexível, embora o Databricks consegue agendar scripts python assim como o Airflow, o que mudará é a complexidade de implementação, pois, o Airflow já tem muita coisa pronta.


Também vale ressaltar que o Airflow já é uma ferramenta robusta e bem estável no mercado, por outro lado, Workflows é acelerado por uma das maiores empresas de dados do mundo rs, então vamos esperar por novidades de ambas.


Usei o Airflow de comparação pelas semelhanças inclusive da interface gráfica, mas o Workflows pode competir com todos os orquestradores do mercado.


Dado o contexto, falaremos de uma nova feature lançada recentemente no Workflows, a possibilidade de criar uma Task de IF ELSE, que pode nos ajudar a criar fluxos mais complexos e robustos, no exemplo de hoje explicarei como funciona e mostrar um exemplo bem legal.


Você saberia quem é quem só pelo visual?



Qualquer semelhança é mera coincidência.

 

Workflows Features


Sem dúvidas o Workflows vem nos surpreendendo com novas features frequentemente, embora hoje falaremos de uma específica, outas features como:

  • Lineage integrado na interface gráfica

  • Agendamento dinâmico com CRON

  • Job parameters para todas as tasks

  • Enfileiramento de execuções

  • Alerta de tempo de execução

  • Notificação ao nível de task

  • Notificação somente no último Retry (Sensacional)

  • Tempo de duração ao nível de task

  • Integração com GIT - Chamando notebooks de branches especificas (Sensacional)

  • ... Entre várias outras bem legais


Atualmente você pode criar um Job com diversas tasks, podendo criar fluxos completos e complexos de orquestração, o tipo das tasks pode variar sendo as opções abaixo:

  • Notebook: Chamando um notebook Databricks

  • Python Script: Agendando um script python (Airflow rs)

  • Python Wheel: Agendando um Wheel script

  • SQL: Agendando um script SQL no Warehouse (SQL Endpoint)

  • Delta Live Tables: Agendando um pipeline Delta Live Tables

  • dbt: Agendando um pipeline DBT

  • JAR: Agendando um JAR (Para quem gosta de Java)

  • Spark Submit

  • Run JOB: Chamando outro Job no Workflow (Isso é muito massa e novo)

  • IF ELSE Condition: Nosso tema de hoje



Só esse print acima já podemos notar a robustez dessa ferramenta.

 

IF ELSE Condition + Dynamic Values


Temos um novo tipo de task que vem para trazer dinamicidade para nossos Jobs no Workflows, com o IF Else Condition podemos fazer validações durante nosso fluxo de dados e decidir o que fazer com o fluxo se determinado comportamento acontecer.


Aliado a esta feature temos o retorno do status das Tasks dependentes, então nossas condições podem se basear nos seguintes status:

  • All Succeeded: Se todas as tasks derem sucesso

  • At least one succeeded: Pelo menos uma com sucesso, ou seja, se duas tasks rodarem e 1 teve sucesso e 1 teve falha, essa condição é atendida.

  • None failed: As dependências não terem falhado, ou seja, falhas podem ter acontecido, mas as dependências da tasks em questão tiveram sucesso.

  • All done: Se todas as tasks rodarem independente de falha ou sucesso

  • At least one failed: Pelo menos uma falha, se 10 tasks rodaram e 1 falhou, atendará essa condição.

  • All failed: Somente se todas as tasks falharem



Para demonstração de hoje, criaremos um Job no Worflows que fara as seguintes tarefas:

  1. Realiza um INSERT na camada Bronze

  2. Checa o Status da task

  3. Se finalizou com sucesso, faz um insert na Gold

  4. Se retornou com falha, faz um insert no Log e dispara um e-mail



A beleza dessa task de IF Condition é que você pode decidir o rumo do seu fluxo mesmo quando uma task falhar, num fluxo normal se uma task falha, as demais são puladas, como exemplo abaixo:


Vamos para o ambiente: A primeira task é relativamente simples, note que ela chama um notebook Databricks.


Configurações em destaque aqui para você que não está acostumado:

Podemos configurar um RETRY para casos de falha na Task e definir um intervalo entre o Retry, isso já ajuda na resiliência do seu Job.


Nas configurações de notificação da task podemos configurar para ela gerar um alerta somente quando o último RETRY falhar, isso ajuda a evitar alertas desnecessários.


Agora vamos para nossa task de IF ELSE:

Note algumas configurações interessantes como:

Run If Dependencies: All Done (se todas executarem independente de falha ou sucesso)

Type: IF/ELSE Condition, usando para checar o status da task anterior usando o nome da task de referência.

Browse Dynamic values: Usando variáveis dinâmicas disponibilizadas pelo ambiente


Nessa task acima estamos validando se o status da Task anterior chamada "Insert_and_SQL" finalizou com sucesso, se sim faremos a execução da Gold e se não, faremos outro fluxo.


Os Dynamic Values são variáveis de ambiente que podemos usar de forma dinâmica em nossos jobs, como o exemplo acima de recuperar o status da task anterior ou recuperar o Job ID, Job Name, Job Run ID entre outras informações como abaixo.


Sendo assim, aproveitei os Dynamic Values para inserir logs da execução em uma tabela de monitoramento.


No script abaixo observe 2 coisas, nossa tabela está comentada apenas para informativo do schema dela, note que no scritp estamos usando variáveis do Databricks Warehouse (SQL Endpoint) usando o formato {{var}}.

Na parte visual ele já apresenta os campos para você preencher manualmente e testar, contudo, no nosso exemplo enviarei via parâmetros na task do Job.



Abaixo a task criada para geração de log, estou usando um cluster Warehouse para rodar um script SQL e enviando os parâmetros necessários todos com Dynamic Values.


Destaque para esse trecho, no campo de dependências, agora você tem duas opções para a mesma task, CheckStatus (true) e CheckStatus (false).

No caso da nossa condição estamos validando se o status da task anterior deu sucesso, logo como a task deu falha, o resultado dessa condição é FALSE, então o fluxo do False será disparado agora.


Resultado da execução da task:

Task anterior falhou, entrou na Task CheckStatus e retornou False para a validação.


Resultado da task IF\ELSE:


Resultado da task de geração de log em caso de falha:

Note que os valores são preenchidos em tempo de execução no lugar dos Dynamic Values.

Detalhe na task de alerta:

Só ressaltando que a task tem alerta nativo para os seguintes eventos: Quando a task Iniciar, falhar, finalizar com sucesso ou estourar o timeout, você pode configurar esse alerta nativamente na interface gráfica, conforme o print la no começa na task notifications.


Nesse exemplo estou usando uma task de alerta, embora, note que quando você utiliza task do tipo SQL, podemos chamar uma query, dashboard, alerta ou arquivo.

O Alerta foi apenas um exemplo do que podemos fazer com nosso fluxo.



Aqui é o fluxo completando com sucesso, quando nenhuma task falha (baseado na nossa lógica desse exemplo):

Destaque para o Lineage no canto direito, mostrando toda a linhagem desse Job.


Fluxo quando a task principal falhar:


Abaixo é a nossa tabela de logs criado para o exemplo, podemos criar um monitoramento com base nos logs e Dynamic Values para entender o comportamento dos nossos jobs, claro que isso poderia ser feio via API, contudo, aqui é mais uma forma que podemos customizar da nossa maneira.


 

Em resumo, o Databricks Workflows vem ganhando corpo para poder dar mais flexibilidade para os nossos pipelines dentro do Databricks. A feature IF\ELSE traz dinamicidade para nossos fluxos e dando mais poder ao desenvolvedor, seria até possível orquestrar coisas fora do Databricks, mas o Airflow e outras ferramentas ainda teriam vantagem sobre o Workflows nesse quesito.

 

Faz sentido usar outra ferramenta de orquestração quando estamos usando Databricks?

Se sua estratégia está dentro do Databricks, como é a realidade de muitas empresas, use e abuse do Workflows, mas não diria que qualquer outra ferramenta é dispensável, afinal, o Workflows é uma criança perto das demais, mas, fazendo muito nem o que se propõe a fazer.


Espero que tenha gostado dessa nova feature do Workflow.


Fique bem e até a próxima.


Referencias:





646 visualizações0 comentário

Comments


Post: Blog2 Post
bottom of page