Apache Airflow 02 – Conceitos principais

Os principais conceitos do Airflow que serão explicados neste artigo serão: DAGs, operators, sensors e hooks.

O que vou explicar será sobre como eles se encaixam no Airflow e como utilizá-los no seu dia a dia, até mesmo vamos construir nossa primeira DAG. Eu vou tentar ensinar a partir dos partes ao invés de destrinchar o todo em partes, tem muita coisa pra aprender então recomendo reler o artigo caso você realmente queira aprender Apache airflow.

Operators

Operators são os principais componentes “fazedores” de algo dentro do Airflow. Basicamente o que se busca em um operator é realização uma operação.

Exemplos de Operator: PythonOperator (que chama um código Python), BashOperator (Que chama e executa um script bash) ou BigQueryOperator (que executa uma query no bigquery e salva o resultado em uma tabela também no BigQuery), segue a classe base do Operator (sempre bom ler): https://github.com/apache/airflow/blob/master/airflow/models/baseoperator.py

Sensors

Sensor são os “monitores” do airflow, basicamente, qualquer monitoramento que você queira fazer em relação à algo, é feito por um Sensor, ele responde perguntas do tipo:

  • “Tem um arquivo novo no folder X do S3?”
  • “A tabela Y foi modificada no BigQuery?”
  • “Algum usuário novo foi inserido na tabela W no Banco SQL?”

Ou qualquer outro sensor que você possa criar, a gente vai falar mais sobre isso no pŕoximo post ;).

Hooks

Agora eu vou passar para uma outra fase da explicação dos conceitos do airflow, isso aqui é mais voltado para quem vai cuidar (DevOps/ Data Engineer) do airflow do que propriamente de quem vai utilizar ou desenvolver para ele.

Tasks

Tasks são as instâncias em execução ou executadas em uma DAG, por exemplo, se um sensor rodar, é uma task, se um operator rodar também é uma task.

Pode-se dizer que uma execução de DAG é formada pela execução das suas tasks, o mais importante é toda task possui uma trigger rule [4]. E as possíveis trigger rules são:

  • all_success
    • é o valor default, a task só vai rodar se todas as suas dependências rodarem com sucesso (caso ela não possua dependência, ela irá rodar).
  • all_failed
    • Quando todas as dependências falham. Um bom caso é quando você quer checar se todas as partições de uma pipeline falharam, por exemplo.
  • one_failed
    • Roda caso uma de suas dependências falhe.
  • all_done
    • Roda independente do que acontecer com suas dependências.

Tem outras mas elas não servem pra agora, na documentação vocês podem explorar melhor.

Scheduler

O scheduler é o responsável por organizar em qual momento as tasks serão executadas. Os possíveis status [1] de uma task que são mais comuns:

  • Queued
    • Quando a tarefa tá pronta para ser executada mas ainda não há recursos disponíveis para rodá-la
  • Running
    • autoexplicativo.
  • Failed
    • autoexplicativo.
  • Failed_upstream
    • É quando alguma dependência da task falhou, então ela fica impedida de executar por uma “falha anterior”.
  • Skipped
    • Isso acontece quando por algum motivo o airflow decidiu não executar aquela task.
  • Success
    • autoexplicativo
  • up_for_retry
    • Isso acontece após a falha, com o airflow, se uma task falhar, o airflow pode “tentar ela novamente”, a quantidade e o intervalo que isso vai acontecer é configurável.

Dags

Dags é o principal conceito ao desenvolver algo em Airflow, e uma DAG nada mais é do que a representação de uma pipeline que você cria. Segue abaixo um exemplo de uma simples pipeline que criei:

E o código necessário para criá-la eu vou mostrar na seção abaixo onde eu vou tentar colocar todos os conceitos juntos.

Nossa primeira DAG

Por fim, mas não menos importante, eu só quero dar uma exemplo legal de como o airflow pode ser útil e de como ele funciona em um cenário real.

A dag que a gente vai criar vai fazer o seguinte:

  1. Checar de 5 em 5 minutos se há alguma mudança em um CSV
  2. Se houver, rodar pipeline de dados e escrever os resultados em um banco de dados, senão, fazer nada.

Para o ambiente de teste local vou utilizar o famoso projeto/imagem docker [3] do airflow do grande puckel [4]. Isso vai permitir testar o airflow sem precisar baixar os arquivos e instalar todas as resenhas. Também vou utilizar uma imagem Docker do postgres. Pra facilitar, dentro do repositório vai ter um script setup.sh onde é só rodar ele que você vai tá prontinho pra rodar o exemplo daqui.

A primeira coisa que eu vou codar é a definição de execução da DAG:

O primeiro parâmetro é o nome da DAG, o segundo é o intervalo que a DAG será executada, no nosso caso, como planejado nas especificações, vai ser de 5 em 5 minutos e ai é algo legal, no schedule interval do arflow, você pode utilizar tanto a função “timedelta” como uma string cron “20 * * * *” ,por exemplo.

E por último o start_date, esta é a data que a sua pipeline supostamente começou/irá começar, cuidado com esta data, se ela for no futuro, sua pipeline não irá executar e se for no passado, ela irá executar a quantidade de vezes necessária até o dia atual, segue como isso acontece ( e quem toma de checar isso é o scheduler!):

R = rodou

R1 = rodou a 1º vez

Rn-1 = a penúltima vez que rodou

Rn = a execução que tá acontecendo “agora”

Percebam o detalhe: A primeira execução só acontece APÓS o primeiro intervalo de tempo acontece, ou seja, se você quer que sua pipeline rode diariamente a partir de hoje, você deve colocar o start_date para ontem e o schedule_interval para 1 dia.

Sobre os dados que eu criei para este tutorial eles são estes aqui:

Ele é um dataset com as ntoas dos estudantes, o que eu vou fazer no pipeline é simplesmente modificar a coluna aprovado para ser preenchida (imagine um caso de missing data).

Depois disso, eu vou codar o primeiro sensor: como não há sensor para detectar mudança em um arquivo, eu criei uma lógica bem fuleira para checar se o arquivo existe e se o número de estudantes mudou.

Eu utilizei um código Python e para isso tem um dos primeiros Operators do airflow, o PythonSensor, ele implementa a seguinte função:

def detect_new_student(path_to_students, path_to_record):
    if not os.path.exists(path_to_students):
        logging.info("File for students doesn't exist")
        return False
    student_df = pandas.read_csv(path_to_students)
    amount_of_students_fp = open(path_to_record, 'r+')
    line = amount_of_students_fp.readline()
    old_amount_of_students = int(line)
    amount_of_students = student_df.shape[0]
    if amount_of_students > old_amount_of_students:
        logging.info(f"There are {amount_of_students - old_amount_of_students} new students.")
        amount_of_students_fp.seek(0)
        amount_of_students_fp.write(str(amount_of_students))
        amount_of_students_fp.truncate()
        return True
    return False

E aqui está o código do sensor:

Simples né? Basicamente você só precisa dizer o ID da task (tem de ser único dentro da DAG), de quanto em quanto tempo o sensor vai checar, aqui seriam de 10 em 10 segundos e por último, você passa a função e os parâmetros para ela em “op_kwargs”.

Com isso, o sensor só retorna sucesso caso haja uma linha nova no arquivo students.csv e atualiza meu “banco” de dados com a quantidade atual de linhas, caso haja sucesso na atualização, ai sim a task seguinte é executada, nessa task é que eu transformo os dados e preencho a coluna “aprovado”, segue o código que define o operador:

Igualzinho ao anterior em estrutura mas o agora é um operador, então não tem nada de intervalo de checagem ou soft_fail, é simplesmente a função e o que tem de fazer.

A única coisa extra aqui é o “provide_context”, que é por um motivo: XCom. XCom é a forma como task passam mensagens entre si no airflow. Então se você quer mandar informações (geralmente pequenas) de uma task para outra, você utiliza o XCom.

E aqui está a função:

Basicamente eu só faço preencher o campo aprovado e depois criar a query SQL de insert. E no final, eu adiciono uma mensagem XCom para o operador. Detalhe, eu não digo para qual operador vou enviar, eu simplesmente adiciono na task a mensagem e qualquer outra task pode ler a mensagem dela.

E por último mas não menos importante: enviar os dados para o banco Postgres. Eu vou fazer uma gambiarrar e criar a tabela necessária no postgres que vem acompanhando o airflow no puckel, então vai ser nojento mesmo kkkk.

Segue o operador, não precisa de mais nada fora isso que tá escrito aqui:

E essa é a graça, o airflow possui muitos operadores bons e você pode criar o seu próprio, pra uma biblioteca que você tá usando, quem sabe? E é o que a gente vai fazer no próximo post.

Para executar a DAG, você vai seguir os seguintes passos:

  1. Baixar o código-fonte do repositório: https://github.com/fbormann/blog-posts/tree/airflow-series-02).
  2. Checar se possui docker e docker-compose
  3. rodar setup.sh (instalar o docker se você ainda não o tiver).
  4. docker-compose -f docker-compose-LocalExecutor.yml exec postgres bash
    1. Dentro dele rodar “psql -p 5432 -U airflow -d airflow”
    2. copiar a query sql/create_students_table.sql e apertar enter e ai só sair digitando “\q” e depois “exit”.
  5. Abrir o navegador em localhost:8080
  6. Ligar a DAG (tem um botão ON enorme na lista) e ser feliz. O exemplo já veio com uns dados faltando e tá tudo certo.
  7. Caso você queira testar a pipeline novamente, é só você adicionar uma linha ao arquivo students.csv com dados válidos.

Você pode chcar a execução dos operadores nos logs e para isso é só clicar na DAG, depois “Tree view”, depois no quadradinho da task verdinha e depois em “Logs”, segue como funciona a tela de “Tree view após várias execuções da pipeline”, os status estão na direita:

Para terminar, segue como é difícil definir dependências no airflow:

Pronto, com isso, você consegue definir a estrutura que eu coloquei na primeira imagem sobre a DAG. E é isso pessoal, deu muito trabalho pra fazer mas espero que isso sirva para vocês mandarem como onboarding pra alguém que tá aprendendo airflow haha.

Fontes

[1] https://cwiki.apache.org/confluence/display/AIRFLOW/Scheduler+Basics

[2] https://github.com/apache/airflow

[3] https://github.com/puckel/docker-airflow

[4] https://airflow.apache.org/docs/stable/concepts.html#trigger-rules

Um comentário em “Apache Airflow 02 – Conceitos principais

Deixe um comentário

Preencha os seus dados abaixo ou clique em um ícone para log in:

Logotipo do WordPress.com

Você está comentando utilizando sua conta WordPress.com. Sair /  Alterar )

Foto do Google

Você está comentando utilizando sua conta Google. Sair /  Alterar )

Imagem do Twitter

Você está comentando utilizando sua conta Twitter. Sair /  Alterar )

Foto do Facebook

Você está comentando utilizando sua conta Facebook. Sair /  Alterar )

Conectando a %s