Executando DAG com DbtTaskGroup Operator no BigQuery
Uma das features sensacionais que o time da astronomer implementou na lib cosmos [PyPi] foi o DbtTaskOperator, um recurso similar ao que…
Uma das features sensacionais que o time da astronomer implementou na lib cosmos [PyPi] foi o DbtTaskOperator, um recurso similar ao que tem no DBT Cloud[link] que è a dependencias entre modelos. O intuito desse pequeno exemplo è mostrar como emplementar no BigQuery Warehouse utilizando o Cloud Composer[AirFlow Gerenciado]. Para isso vamos precisar instalar no Composer as seguintes libs.
# requirements.txt
astronomer-cosmos==1.3.2
dbt-bigquery==1.5.4
A forma mais simples de instalar os libs acima è fazendo direto nas configuracoes do Composer ou utilizando o gcloud cli
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--update-pypi-packages-from-file requirements.txt
Para isso vamos utilizar o Dataset GOLD
O nosso profile.yml ficara assim
example_context:
outputs:
prd:
dataset: GOLD
job_execution_timeout_seconds: 300
job_retries: 1
location: us-central1
method: oauth
priority: interactive
project: bigo-engineeging
threads: 1
type: bigquery
target: dev
Nosso dbt_project.yml
name: 'example_context'
version: '1.5'
config-version: 2
profile: 'example_context'
model-paths: ['models']
test-paths: ['tests']
seed-paths: ['seeds']
snapshot-paths: ['snapshots']
target-path:
clean-targets:y
- 'target'
- 'dbt_packages'
models:
warehouse:
STAGE:
+materialized: table
Estrutura do projeto.
Coisa bem simplezona.
Nossa diretorio modelo ficara assim
Nosso schema.yml ficara assim
version: 2
sources:
- name: SILVER
tables:
- name: table_mock
models:
- name: table_mock_data
- name: table_sample_cleaning
- name: table_sample_ordening
- name: table_sample_union
temos uma tabela como origem: table_mock e a partir delas vamos criar 3 views[models] na nossa camada GOLD.
Essa essa è a nossa origem dos dados
Agora vamos a implementacao da DAG
Mas antes tem um detalhe, os arquivos do dbt precisam esta no diretorio das dags do Composer, precisam ficar assim
O que acontece, è que o Operador do Dbt vai procurar os arquivos nesse diretorio.
DAG Code
from airflow import models
from airflow.operators.empty import EmptyOperator
from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig
from datetime import datetime
from pathlib import Path
DBT_PATH = "/home/airflow/gcs/dags/dbt"
DBT_PROFILE = "example_context"
DBT_TARGETS = "dev"
# Profile enviroment
profile_config = ProfileConfig(
profile_name=DBT_PROFILE,
target_name=DBT_TARGETS,
profiles_yml_filepath=Path(f'{DBT_PATH}/profiles.yml')
)
# Models dbt
project_config = ProjectConfig(
dbt_project_path=DBT_PATH,
models_relative_path="models"
)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 2
}
with models.DAG(
dag_id=f"dbt_my_models",
schedule_interval=None,
start_date=datetime(2024, 5, 13),
is_paused_upon_creation=True,
catchup=False,
tags=["dbttaskgroup", "sample"],
) as dag:
start = EmptyOperator(task_id="start")
dbt_running_models = DbtTaskGroup(
group_id="dbt_running_models",
project_config=project_config,
profile_config=profile_config,
default_args={"retries": 2},
)
end = EmptyOperator(task_id="end")
start >> dbt_running_models >> end
Perceba que na variavel DBT_PATH temos o bucket das DAG montado dentro do Cluster do Kubernetes no diretorio /home/airflow/gcs, facilitando assim a leitura do projeto pelo Operador DbtTaskGroup. Vamos a execucao da DAG
Prontinho, temos a visao das dependencias de cada modelo, e por fim nossos dados no BigQuery
Um detalhe super importante, voce deve esta se perguntando, como aconteceu essa comunicacao com o BigQuery? Nao deveria ter uma account service? sem essa account service qual seria o outro meio ?? aqui temos o Pulo do gato. Olhe o arquivos profiles.yml, nele temos o method: oauth ou seja, quando usamos o Composer automaticamente temos essa comunicacao com os demais servicos integrado por meio do Admin/connections que è criados no momento da criacao do Composer, que seja de forma manual ou via Terraform. E tambem o usuario que esta executando a DAG precisa ter permissoes no contexto do BigQuery [nao lembro de cabeca as Roles]. Bem, espero que tenham gostado. Sei que existem outras formas de fazer esse exemplo que trouxe aqui. Fiquem a vontade para comentar e fazerem sugestoes. Para entender melhor o DbtTaskGroup da uma olhada nesses links
https://astronomer.github.io/astronomer-cosmos/configuration/operator-args.html
https://github.com/astronomer/astronomer-cosmos
Bye!