top of page

Databricks - Workflows - Compartilhando resultados entre Tasks - Dynamic Values

Fala dataholics, o post de hoje é uma dica rápida, mas bem legal e útil na orquestração dos seus pipelines dentro do Databricks.


Já precisou pegar o resultado de uma task anterior e passar para a seguinte? Melhor ainda, já precisou avaliar o resultado de uma Task e dependendo desse resultado mudar o fluxo do pipeline? Sim, veremos isso hoje.


O que veremos nesse post:

  • Databricks Utilities

  • Task Values Subutility

  • Dynamic Values e Widgets

  • If/Else Condition


 

Databricks Utilities


Compartilhar o resultado de uma task para outra não é nada novo, já temos disponível o dbutils.jobs.taskValues há um tempo, apesar de pouco conhecido e divulgado, mas além do Job utilities mostrarei uma evolução, deixando nossos Jobs mais dinâmicos e flexíveis usando Dynamic Values e If\Condition.


Já havia escrito sobre If\condition nesse post, dá uma conferida la também:

Com certeza você já conhece o Databricks Utilities, mas talvez não por esse nome e sim por dbutils, é tão comum no nosso dia a dia para diversas operações, como listar arquivos em uma pasta, manipular pastas, manipular Secrets, exemplo:


Dentro do Databricks Utilities podemos dizer que temos alguns módulos, sendo eles:

  • dbutils.credentials - Manipulação de credenciais

  • dbutils.data - Preview

  • dbutils.fs - Manipulação do file system(O mais usado)

  • dbutils.jobs - Manipulação de tasks de um Job - Iremos usar esse

  • dbutils.library - Manipulação de bibliotecas

  • dbutils.notebook - Manipulação de resultados do notebook (Muito usado)

  • dbutils.secrets - Manipulação de secrets

  • dbutils.widgets - Manipulação de widgets

Depois do FS que para mim é o mais usado e famoso, talvez o segundo mais usado seria o Notebook, exemplo, o comando abaixo interrompe a execução do notebook.

dbutils.notebook.exit('Pare o notebook aqui')

 

Compartilhar resultado entre Tasks


Vamos ao tema do nosso post, agora sabemos que temos um Jobs Utilities para manipular o resultado dos nossos notebooks e compartilhar valores entre tasks.


Nos Jobs utilities temos o taskValues subutilities que podemos usar para Setar e Recuperar o resultado de uma task: SET:

dbutils.jobs.taskValues.set(key = "Task2Key", value = 'OlaMundo!')

GET:

dbutils.jobs.taskValues.get(taskKey = "Task1", key = "Task1Key")

Com isso, você seta uma chave e valor como sendo valores de uma Task especifica e pode recuperar esse valor em qualquer outra Task do Job.


Agora temos uma nova maneira de recuperar esses valores das tasks usando Dynamic Values e Widgets, deixando nossos Jobs mais dinâmicos, sem precisar fixar nome de Tasks no seu código e essa é a recomendação a partir de agora.


Podemos recuperar o valor da task anterior com Dynamic Values e passar para nossa task como um parâmetro de entrada:


Primeiro, criaremos nosso notebook de exemplo, esse notebook será chamado na primeira task e ele preencherá 3 chaves:

  • Task1Key: Ola Mundo - Sou a Task1

  • Task2Key: Segue o baile! (Em caso de passar no Try/Except)

  • Task3Key: Deu não, volta! (Em caso de entrar no Except)

Observe que para setar essas chaves estamos usando o Job Subutilities:

dbutils.jobs.taskValues.set()

Agora vamos para o segundo Notebook, nesse notebook estamos recuperando o valor de 2 maneiras.

  1. Usando Job Subutilities (Não é mais o recomendado): dbutils.jobs.taskValues.get()

  2. Usando Dynamic Values e Widgets (Recomendado): getArgument("task1Value") ou dbutils.widgets.get("task1Value")


A primeira célula recupera os dados configurados no primeiro notebook, mas de onde vem os valores recuperados na segunda célula?

Bom vamos ao Workflows e criar nosso Job agora.


Na Task1 não temos novidade, apenas uma chamada simples de notebook.


Agora na Task2 que chama o segundo Notebook temos esses parâmetros de entrada, aqui declaramos 4 parâmetros utilizando Dynamic Values para recuperar resultados da Task1.

Note que colocamos na expressão a referência da Task1.


Dica, se for colocar muitos parâmetros ou regras mais complexas, você pode usar o modo JSON:


Porém, antes de executar a Task2 quero saber se o resultado da Task1 foi o retorno esperado, caso não seja, executaremos a Task3, para isso vamos usar um IF/ele Condition.

A configuração dessa task é bem simples, primeiro ela depende da Task1, depois ela avalia o valor retornado pela expressão "{{tasks.[Task1].values.[Task2Key]}}", caso essa expressão retorno o valor "Segue o baile!" seguimos para a Task2, caso o valor seja diferente seguimos para a Task3.

Com isso conseguimos controlar o fluxo baseado em resultado de tasks anteriores, deixando nossos pipelines mais flexíveis e dentro do Notebook não preciso fixar o nome da task que estou buscando o valor, simplesmente recupera do Widget.

Essa é uma execução com Sucesso, ou seja, retornando TRUE no IF/Else:


Resultado do Notebook 2:


Após executado, podemos abrir a task do IF/Else e ver os valores retornados, isso é extremamente útil para Debug.


Agora simularemos um fluxo de erro, para isso acontecer precisamos forçar um erro nessa etapa do TRY/EXCEPT, para isso, adicionei um assert para forçar uma exceção, logo essa operação entrará no Excpet e os valores preenchidos serão outros.

Fluxo com erro:


Resultado da Task IF/ELSE:

Retorno da Task3:

Veja que estamos imprimindo a mensagem de erro do primeiro notebook aqui.


Note que na Task do IF/ELSE temos o fluxo de saída tanto para True como para False:


Essa configuração é feita nas dependências das Tasks, você escolhe se irá vincular com o retorno do True ou False da task de IF/ELSE.


 

Resumo


Post de hoje foi um complemento desse Post sobre IF/ELSE condition:

Hoje vimos como compartilhar valores entre tasks de maneira mais dinâmica e flexível utilizando Jobs Subutilities e como controlar o fluxo dos nossos Jobs baseado em resultados de Tasks anteriores.


Espero que tenha curtido.


Fique bem e até a próxima.


Referências:



145 visualizações0 comentário

Posts recentes

Ver tudo
Post: Blog2 Post
bottom of page