
Commit 580d523

aspeddro, tricktx, folhesgabriel authored
[feat]: upload data to dev and prod (#1182)
## Summary

The process is divided into three tasks:

1. `create_table_dev_and_upload_to_gcs`: upload the data to `basedosdados-dev`
2. `run_dbt`: run `dbt run` and `dbt test` in the dev environment
3. `create_table_prod_gcs_and_run_dbt`: upload the data to `basedosdados-staging` and run only `dbt run`

Basic example:

```python
wait_upload_table = create_table_dev_and_upload_to_gcs(
    data_path=filepath,
    dataset_id=dataset_id,
    table_id=table_id,
    dump_mode="append",
    upstream_tasks=[filepath],
)

wait_for_materialization = run_dbt(
    dataset_id=dataset_id,
    table_id=table_id,
    dbt_command="run/test",
    dbt_alias=dbt_alias,
    upstream_tasks=[wait_upload_table],
)

with case(materialize_after_dump, True):
    wait_upload_prod = create_table_prod_gcs_and_run_dbt(
        data_path=filepath,
        dataset_id=dataset_id,
        table_id=table_id,
        dump_mode="append",
        upstream_tasks=[wait_for_materialization],
    )
```

### Key points

- The `run_dbt` task must receive `dbt_command="run/test"` so that it both runs and tests in dev. The team should watch for this during PR reviews.

### Changes

- The `target` `Parameter` was removed because it became redundant.

### TODO

- [x] Update the flows to pass `dbt_command="run/test"` to `run_dbt`
  - Many flows do not pass `run/test` to the `run_dbt` task; they need to be updated so the tests run in dev. It should look like this:

    ```python
    wait_for_materialization = run_dbt(
        dataset_id=dataset_id,
        table_id=table_id,
        dbt_command="run/test",
        upstream_tasks=[wait_upload_table],
    )
    ```

- [x] Add a task after `run_dbt`
  - This task uploads the data to `basedosdados-staging` and runs `dbt run`:

    ```python
    create_table_prod_gcs_and_run_dbt(
        data_path=path,
        dataset_id=dataset_id,
        table_id=table_id,
        dump_mode="overwrite",
        upstream_tasks=[wait_for_materialization],
    )
    ```

cc @basedosdados/dados

---------

Co-authored-by: Patrick Teixeira <[email protected]>
Co-authored-by: Gabriel Pisa <[email protected]>
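For reference, here is a minimal end-to-end sketch of the three tasks wired into a complete Prefect 1.x flow, following the pattern applied in the diffs below. The flow name and the `crawl` stub are illustrative placeholders, and `Flow`/`Parameter`/`case` are taken from plain Prefect here (the repository's flows use their own decorators and extraction tasks); only the imports from `pipelines.utils.tasks` and the call signatures follow this commit.

```python
from prefect import Flow, Parameter, case, task

from pipelines.utils.tasks import (
    create_table_dev_and_upload_to_gcs,
    create_table_prod_gcs_and_run_dbt,
    run_dbt,
)


@task
def crawl(dataset_id: str, table_id: str) -> str:
    """Placeholder for a flow-specific extraction task; returns the data path."""
    return "/tmp/data.csv"


with Flow("example_upload_dev_and_prod") as flow:  # illustrative flow name
    dataset_id = Parameter("dataset_id", required=True)
    table_id = Parameter("table_id", required=True)
    dbt_alias = Parameter("dbt_alias", default=True, required=False)
    materialize_after_dump = Parameter(
        "materialize_after_dump", default=True, required=False
    )

    filepath = crawl(dataset_id=dataset_id, table_id=table_id)

    # 1. Upload the data to basedosdados-dev.
    wait_upload_table = create_table_dev_and_upload_to_gcs(
        data_path=filepath,
        dataset_id=dataset_id,
        table_id=table_id,
        dump_mode="append",
        upstream_tasks=[filepath],
    )

    # 2. dbt run + test in dev, on every flow run.
    wait_for_materialization = run_dbt(
        dataset_id=dataset_id,
        table_id=table_id,
        dbt_command="run/test",
        dbt_alias=dbt_alias,
        upstream_tasks=[wait_upload_table],
    )

    # 3. Upload to basedosdados-staging and dbt run in prod, only when enabled.
    with case(materialize_after_dump, True):
        wait_upload_prod = create_table_prod_gcs_and_run_dbt(
            data_path=filepath,
            dataset_id=dataset_id,
            table_id=table_id,
            dump_mode="append",
            upstream_tasks=[wait_for_materialization],
        )
```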
1 parent 7e8c8b7 commit 580d523

File tree

59 files changed (+1584 -1439 lines)


models/test_dataset/test_dataset__test_table.sql

Lines changed: 3 additions & 4 deletions

```diff
@@ -9,8 +9,7 @@
 
 select
     safe_cast(ano as int64) ano,
-    safe_cast(mes as int64) mes,
-    safe_cast(sigla_uf as string) sigla_uf,
-    safe_cast(dado as string) dado,
-    3 as col_test
+    safe_cast(github as string) github,
+    safe_cast(idade as int64) idade,
+    safe_cast(sexo as string) sexo
 from {{ set_datalake_project("test_dataset_staging.test_table") }} as t
```

pipelines/crawler/anatel/banda_larga_fixa/flows.py

Lines changed: 17 additions & 18 deletions

```diff
@@ -14,8 +14,8 @@
     update_django_metadata,
 )
 from pipelines.utils.tasks import (
-    create_table_and_upload_to_gcs,
-    download_data_to_gcs,
+    create_table_dev_and_upload_to_gcs,
+    create_table_prod_gcs_and_run_dbt,
     rename_current_flow_run_dataset_table,
     run_dbt,
 )
@@ -31,7 +31,7 @@
         "table_id",
         required=True,
     )
-    target = Parameter("target", default="prod", required=False)
+
     materialize_after_dump = Parameter(
         "materialize_after_dump", default=True, required=False
    )
@@ -72,31 +72,31 @@
         table_id=table_id, ano=new_ano, upstream_tasks=[get_max_date]
     )
 
-    wait_upload_table = create_table_and_upload_to_gcs(
+    wait_upload_table = create_table_dev_and_upload_to_gcs(
         data_path=filepath,
         dataset_id=dataset_id,
         table_id=table_id,
         dump_mode="append",
-        wait=filepath,
         upstream_tasks=[
             filepath
         ],  # Fix: Wrap filepath in a list to make it iterable
     )
 
-    with case(materialize_after_dump, True):
-        wait_for_materialization = run_dbt(
-            dataset_id=dataset_id,
-            table_id=table_id,
-            target=target,
-            dbt_alias=dbt_alias,
-            dbt_command="run/test",
-            disable_elementary=False,
-            upstream_tasks=[wait_upload_table],
-        )
+    wait_for_materialization = run_dbt(
+        dataset_id=dataset_id,
+        table_id=table_id,
+        dbt_alias=dbt_alias,
+        dbt_command="run/test",
+        disable_elementary=False,
+        upstream_tasks=[wait_upload_table],
+    )
 
-        wait_for_dowload_data_to_gcs = download_data_to_gcs(
+    with case(materialize_after_dump, True):
+        wait_upload_prod = create_table_prod_gcs_and_run_dbt(
+            data_path=filepath,
             dataset_id=dataset_id,
             table_id=table_id,
+            dump_mode="append",
             upstream_tasks=[wait_for_materialization],
         )
 
@@ -108,9 +108,8 @@
             date_format="%Y-%m",
             coverage_type="part_bdpro",
             time_delta={"months": 6},
-            prefect_mode=target,
             bq_project="basedosdados",
-            upstream_tasks=[wait_for_dowload_data_to_gcs],
+            upstream_tasks=[wait_upload_prod],
         )
 
 flow_anatel_banda_larga_fixa.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
```
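The same structural change repeats in every flow touched by this commit: `run_dbt` moved out of the `case` block, so the dev run and tests now execute on every flow run, while only the prod promotion stays gated by `materialize_after_dump`. A self-contained sketch of that gating semantics under Prefect 1.x (the task names here are placeholders, not the repository's real tasks):

```python
from prefect import Flow, Parameter, case, task


@task
def run_and_test_in_dev() -> str:
    # Stands in for run_dbt(..., dbt_command="run/test"); now runs unconditionally.
    return "dev run + tests ok"


@task
def promote_to_prod(dev_result: str) -> None:
    # Stands in for create_table_prod_gcs_and_run_dbt; runs only when the switch is on.
    print(f"uploading to basedosdados-staging after: {dev_result}")


with Flow("gating_sketch") as flow:
    materialize_after_dump = Parameter("materialize_after_dump", default=True)
    dev_result = run_and_test_in_dev()        # outside the case block: always executes
    with case(materialize_after_dump, True):  # prod promotion remains conditional
        promote_to_prod(dev_result)

# flow.run() runs both tasks with the default parameter;
# flow.run(parameters={"materialize_after_dump": False}) skips the prod step.
```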

pipelines/crawler/anatel/telefonia_movel/flows.py

Lines changed: 17 additions & 18 deletions

```diff
@@ -18,8 +18,8 @@
     update_django_metadata,
 )
 from pipelines.utils.tasks import (
-    create_table_and_upload_to_gcs,
-    download_data_to_gcs,
+    create_table_dev_and_upload_to_gcs,
+    create_table_prod_gcs_and_run_dbt,
     rename_current_flow_run_dataset_table,
     run_dbt,
 )
@@ -35,7 +35,7 @@
         "table_id",
         required=True,
     )
-    target = Parameter("target", default="prod", required=False)
+
     materialize_after_dump = Parameter(
         "materialize_after_dump", default=True, required=False
    )
@@ -88,31 +88,31 @@
         upstream_tasks=[get_max_date],
     )
 
-    wait_upload_table = create_table_and_upload_to_gcs(
+    wait_upload_table = create_table_dev_and_upload_to_gcs(
         data_path=filepath,
         dataset_id=dataset_id,
         table_id=table_id,
         dump_mode="append",
-        wait=filepath,
         upstream_tasks=[
             filepath
         ],  # Fix: Wrap filepath in a list to make it iterable
     )
 
-    with case(materialize_after_dump, True):
-        wait_for_materialization = run_dbt(
-            dataset_id=dataset_id,
-            table_id=table_id,
-            target=target,
-            dbt_alias=dbt_alias,
-            dbt_command="run/test",
-            disable_elementary=False,
-            upstream_tasks=[wait_upload_table],
-        )
+    wait_for_materialization = run_dbt(
+        dataset_id=dataset_id,
+        table_id=table_id,
+        dbt_alias=dbt_alias,
+        dbt_command="run/test",
+        disable_elementary=False,
+        upstream_tasks=[wait_upload_table],
+    )
 
-        wait_for_dowload_data_to_gcs = download_data_to_gcs(
+    with case(materialize_after_dump, True):
+        wait_upload_prod = create_table_prod_gcs_and_run_dbt(
+            data_path=filepath,
             dataset_id=dataset_id,
             table_id=table_id,
+            dump_mode="append",
             upstream_tasks=[wait_for_materialization],
         )
 
@@ -124,9 +124,8 @@
             date_format="%Y-%m",
             coverage_type="part_bdpro",
             time_delta={"months": 6},
-            prefect_mode=target,
             bq_project="basedosdados",
-            upstream_tasks=[wait_for_dowload_data_to_gcs],
+            upstream_tasks=[wait_upload_prod],
         )
 
 flow_anatel_telefonia_movel.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
```

pipelines/crawler/camara_dados_abertos/flows.py

Lines changed: 17 additions & 16 deletions

```diff
@@ -16,8 +16,8 @@
 from pipelines.utils.decorators import Flow
 from pipelines.utils.metadata.tasks import update_django_metadata
 from pipelines.utils.tasks import (
-    create_table_and_upload_to_gcs,
-    download_data_to_gcs,
+    create_table_dev_and_upload_to_gcs,
+    create_table_prod_gcs_and_run_dbt,
     rename_current_flow_run_dataset_table,
     run_dbt,
 )
@@ -35,7 +35,7 @@
         "table_id",
         required=True,
     )
-    target = Parameter("target", default="prod", required=False)
+
     materialize_after_dump = Parameter(
         "materialize_after_dump", default=True, required=False
    )
@@ -58,34 +58,35 @@
         table_id=table_id,
         upstream_tasks=[rename_flow_run],
     )
-    wait_upload_table = create_table_and_upload_to_gcs(
+    wait_upload_table = create_table_dev_and_upload_to_gcs(
         data_path=filepath,
         dataset_id=dataset_id,
         table_id=table_id,
         dump_mode="append",
-        wait=filepath,
         upstream_tasks=[filepath],
     )
+
+    wait_for_materialization = run_dbt(
+        dataset_id=dataset_id,
+        table_id=table_id,
+        dbt_alias=dbt_alias,
+        dbt_command="run/test",
+        disable_elementary=False,
+        upstream_tasks=[wait_upload_table],
+    )
     with case(materialize_after_dump, True):
-        wait_for_materialization = run_dbt(
-            dataset_id=dataset_id,
-            table_id=table_id,
-            target=target,
-            dbt_alias=dbt_alias,
-            dbt_command="run/test",
-            disable_elementary=False,
-            upstream_tasks=[wait_upload_table],
-        )
-        wait_for_dowload_data_to_gcs = download_data_to_gcs(
+        wait_upload_prod = create_table_prod_gcs_and_run_dbt(
+            data_path=filepath,
             dataset_id=dataset_id,
             table_id=table_id,
+            dump_mode="append",
             upstream_tasks=[wait_for_materialization],
         )
 
     get_table_id_in_update_metadata_variable_dictionary = (
         update_metadata_variable_dictionary(
             table_id=table_id,
-            upstream_tasks=[wait_for_dowload_data_to_gcs],
+            upstream_tasks=[wait_upload_prod],
         )
     )
     with case(update_metadata, True):
```
