Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions pipelines/utils/crawler_anatel/banda_larga_fixa/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from pipelines.constants import constants
from pipelines.utils.crawler_anatel.banda_larga_fixa.tasks import (
join_tables_in_function,
get_max_date_in_table_microdados
get_max_date_in_table_microdados,
get_year_and_unzip,
)
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
Expand Down Expand Up @@ -39,7 +40,7 @@
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)

ano = Parameter("ano", default=2024, required=False)
ano = Parameter("ano", default=None, required=False)

update_metadata = Parameter("update_metadata", default=True, required=False)

Expand All @@ -50,7 +51,14 @@
wait=table_id,
)

update_tables = get_max_date_in_table_microdados(ano=ano)
#####
# Function dynamic parameters
# https://discourse.prefect.io/t/my-parameter-value-shows-the-same-date-every-day-how-can-i-set-parameter-value-dynamically/99
#####

new_ano = get_year_and_unzip(day=ano)

update_tables = get_max_date_in_table_microdados(ano=new_ano, table_id=table_id, upstream_tasks=[new_ano])

get_max_date = check_if_data_is_outdated(
dataset_id = dataset_id,
Expand All @@ -59,7 +67,9 @@
date_format = "%Y-%m")

with case(get_max_date, True):
filepath = join_tables_in_function(table_id = table_id, ano=ano, upstream_tasks=[get_max_date])
filepath = join_tables_in_function(
table_id=table_id, ano=new_ano, upstream_tasks=[get_max_date]
)

wait_upload_table = create_table_and_upload_to_gcs(
data_path=filepath,
Expand Down Expand Up @@ -115,4 +125,4 @@
)

flow_anatel_banda_larga_fixa.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
flow_anatel_banda_larga_fixa.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
flow_anatel_banda_larga_fixa.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
49 changes: 38 additions & 11 deletions pipelines/utils/crawler_anatel/banda_larga_fixa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
treatment_br,
treatment_uf,
treatment_municipio,
unzip_file
unzip_file,
get_year,
)
from pipelines.utils.utils import log, to_partitions

Expand Down Expand Up @@ -42,19 +43,45 @@ def join_tables_in_function(table_id: str, ano):
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def get_max_date_in_table_microdados(ano: int):
unzip_file()
log("Obtendo a data máxima do arquivo microdados da Anatel")
df = pd.read_csv(
f"{anatel_constants.INPUT_PATH.value}Acessos_Banda_Larga_Fixa_{ano}.csv",
def get_max_date_in_table_microdados(table_id: str, ano: int):
if table_id == "microdados":
log("Obtendo a data máxima do arquivo microdados da Anatel")
df = pd.read_csv(
f"{anatel_constants.INPUT_PATH.value}Acessos_Banda_Larga_Fixa_{ano}.csv",
sep=";",
encoding="utf-8",
dtype=str
)
df['data'] = df['Ano'] + '-' + df['Mês']

df['data'] = pd.to_datetime(df['data'], format="%Y-%m")

log(df['data'].max())

return df['data'].max()

else:
log(f"{anatel_constants.INPUT_PATH.value}Densidade_Telefonia_Movel.csv")

df = pd.read_csv(
f"{anatel_constants.INPUT_PATH.value}Densidade_Telefonia_Movel.csv",
sep=";",
encoding="utf-8",
dtype=str
)
df['data'] = df['Ano'] + '-' + df['Mês']
)
df['data'] = df['Ano'] + '-' + df['Mês']

df['data'] = pd.to_datetime(df['data'], format="%Y-%m")

log(df['data'].max())

return df['data'].max()

df['data'] = pd.to_datetime(df['data'], format="%Y-%m")

log(df['data'].max())
@task
def get_year_and_unzip(day):
    """Run ``unzip_file()`` and resolve the year the flow should process.

    Args:
        day: Year supplied through the flow's ``ano`` parameter, or ``None``
            to fall back to the value returned by ``get_year()``.

    Returns:
        ``day`` unchanged when it is not ``None``; otherwise the result of
        ``get_year()``.
    """
    # The tasks that consume this value read CSV files from INPUT_PATH, and
    # no other task in this flow calls unzip_file(), so it must run even when
    # the year is given explicitly.
    log("Download dos dados...")
    unzip_file()

    # An explicitly requested year takes precedence over the detected one;
    # previously it was silently discarded.
    if day is not None:
        return day
    return get_year()
17 changes: 15 additions & 2 deletions pipelines/utils/crawler_anatel/banda_larga_fixa/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ def treatment_uf(table_id:str):
)



def treatment_municipio(table_id:str):
log("Iniciando o tratamento do arquivo densidade municipio da Anatel")
df = pd.read_csv(
Expand All @@ -211,4 +210,18 @@ def treatment_municipio(table_id:str):
df_municipio,
partition_columns=["ano"],
savepath=anatel_constants.TABLES_OUTPUT_PATH.value[table_id],
)
)


def get_year():
    """Return the highest year found in the file names under INPUT_PATH.

    Each file name is split on ``"_"`` and its fifth token is inspected. The
    token counts as a year when, ignoring a trailing file extension, it is
    exactly four digits.

    Returns:
        str: The largest matching year token.

    Raises:
        ValueError: If no file name in the directory carries a year token.
    """
    input_path = anatel_constants.INPUT_PATH.value
    years = []
    for filename in os.listdir(input_path):
        parts = filename.split("_")
        if len(parts) <= 4:
            continue
        # Strip the extension so a token such as "2024.csv" is recognised the
        # same way as a bare "2024".
        token = os.path.splitext(parts[4])[0]
        # Require digits: letters sort after digits, so a non-numeric
        # four-character token would otherwise win the max() below.
        if len(token) == 4 and token.isdigit():
            years.append(token)

    if not years:
        raise ValueError(
            f"Nenhum arquivo com ano encontrado em {input_path}"
        )

    max_year = max(years)
    log(f"Ano máximo: {max_year}")
    return max_year
36 changes: 23 additions & 13 deletions pipelines/utils/crawler_anatel/telefonia_movel/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
from pipelines.constants import constants
from pipelines.utils.crawler_anatel.telefonia_movel.tasks import (
join_tables_in_function,
get_max_date_in_table_microdados
get_max_date_in_table_microdados,
get_year_full,
get_semester,
unzip,
)
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
Expand All @@ -23,9 +26,7 @@
rename_current_flow_run_dataset_table,
)

with Flow(
name="BD template - Anatel Telefonia Móvel", code_owners=["trick"]
) as flow_anatel_telefonia_movel:
with Flow(name="BD template - Anatel Telefonia Móvel", code_owners=["trick"]) as flow_anatel_telefonia_movel:
# Parameters
dataset_id = Parameter(
"dataset_id", default="br_anatel_telefonia_movel", required=True
Expand All @@ -42,9 +43,9 @@
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)

ano = Parameter("ano", default=2024, required=False)
ano = Parameter("ano", default=None, required=False)

semestre = Parameter("semestre", default=1, required=False)
semestre = Parameter("semestre", default=None, required=False)

update_metadata = Parameter("update_metadata", default=True, required=False)

Expand All @@ -54,8 +55,17 @@
table_id=table_id,
wait=table_id,
)
#####
# Function dynamic parameters
# https://discourse.prefect.io/t/my-parameter-value-shows-the-same-date-every-day-how-can-i-set-parameter-value-dynamically/99
#####
unzip_task = unzip()
new_year = get_year_full(ano, upstream_tasks=[unzip_task])
new_semester = get_semester(semestre, upstream_tasks=[new_year])

update_tables = get_max_date_in_table_microdados(ano=ano, semestre=semestre)
update_tables = get_max_date_in_table_microdados(
table_id = table_id, ano=new_year, semestre=new_semester, upstream_tasks=[new_year, new_semester]
)

get_max_date = check_if_data_is_outdated(
dataset_id = dataset_id,
Expand All @@ -67,11 +77,11 @@
with case(get_max_date, True):

filepath = join_tables_in_function(
table_id = table_id,
ano=ano,
semestre=semestre,
upstream_tasks=[get_max_date]
)
table_id=table_id,
ano=new_year,
semestre=new_semester,
upstream_tasks=[get_max_date],
)

wait_upload_table = create_table_and_upload_to_gcs(
data_path=filepath,
Expand Down Expand Up @@ -127,4 +137,4 @@
)

flow_anatel_telefonia_movel.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
flow_anatel_telefonia_movel.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
flow_anatel_telefonia_movel.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
74 changes: 64 additions & 10 deletions pipelines/utils/crawler_anatel/telefonia_movel/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
clean_csv_brasil,
clean_csv_municipio,
clean_csv_uf,
get_year

)
from pipelines.utils.utils import log, to_partitions

Expand Down Expand Up @@ -46,19 +48,71 @@ def join_tables_in_function(table_id, semestre, ano):
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def get_max_date_in_table_microdados(ano: int, semestre: int):
unzip_file()
log("Obtendo a data máxima da tabela microdados...")
df = pd.read_csv(
f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_{semestre}S.csv",
def get_max_date_in_table_microdados(table_id, ano, semestre):

if table_id == 'microdados':
log("Obtendo a data máxima da tabela microdados...")
log(
f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_{semestre}S.csv"
)
df = pd.read_csv(
f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_{semestre}S.csv",
sep=";",
encoding="utf-8",
dtype=str
)
df['data'] = df['Ano'] + '-' + df['Mês']

df['data'] = pd.to_datetime(df['data'], format="%Y-%m")

log(df['data'].max())

return df['data'].max()

else:
log(f"{anatel_constants.INPUT_PATH.value}Densidade_Telefonia_Movel.csv")

df = pd.read_csv(
f"{anatel_constants.INPUT_PATH.value}Densidade_Telefonia_Movel.csv",
sep=";",
encoding="utf-8",
dtype=str
)
df['data'] = df['Ano'] + '-' + df['Mês']
)
df['data'] = df['Ano'] + '-' + df['Mês']

df['data'] = pd.to_datetime(df['data'], format="%Y-%m")

log(df['data'].max())

return df['data'].max()


@task(
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def unzip():
    """Run ``unzip_file()`` as a task and hand back whatever it returns.

    Retries and the delay between them come from the project constants
    passed to the decorator.
    """
    result = unzip_file()
    return result


@task
def get_year_full(year):
    """Resolve the year the flow should process.

    Args:
        year: Year supplied through the flow's ``ano`` parameter, or ``None``
            to fall back to the value returned by ``get_year()``.

    Returns:
        ``year`` unchanged when it is not ``None``; otherwise the result of
        ``get_year()``.
    """
    log("Obtendo o ano...")
    # Pass an explicit year through; previously the task returned None in
    # that case and downstream file names were built with "None".
    if year is not None:
        return year
    return get_year()

log(df['data'].max())

return df['data'].max()
@task
def get_semester(semester):
    """Resolve the semester the flow should process.

    Args:
        semester: Semester supplied through the flow's ``semestre``
            parameter, or ``None`` to detect it from the files on disk.

    Returns:
        ``semester`` unchanged when it is not ``None``. Otherwise ``2`` if a
        ``..._{ano}_2S.csv`` file exists under INPUT_PATH for the year
        returned by ``get_year()``, else ``1``.
    """
    log("Obtendo o semestre...")
    # Pass an explicit semester through; previously the task returned None in
    # that case.
    if semester is not None:
        return semester

    # NOTE(review): detection probes the latest year found on disk, not a
    # year passed to the flow — confirm this is the intended pairing.
    ano = get_year()
    second_semester_file = (
        f"{anatel_constants.INPUT_PATH.value}Acessos_Telefonia_Movel_{ano}_2S.csv"
    )
    if os.path.exists(second_semester_file):
        log("Segundo semestre")
        return 2

    log("Primeiro semestre")
    return 1
16 changes: 15 additions & 1 deletion pipelines/utils/crawler_anatel/telefonia_movel/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,18 @@ def clean_csv_municipio(table_id):
sep=",",
encoding="utf-8",
na_rep="",
)
)


def get_year():
    """Return the highest year found in the file names under INPUT_PATH.

    Each file name is split on ``"_"`` and its fourth token is inspected. The
    token counts as a year when it is exactly four digits.

    Returns:
        str: The largest matching year token.

    Raises:
        ValueError: If no file name in the directory carries a year token.
    """
    input_path = anatel_constants.INPUT_PATH.value
    years = []
    for filename in os.listdir(input_path):
        parts = filename.split("_")
        if len(parts) <= 3:
            continue
        token = parts[3]
        # Require digits: letters sort after digits, so a non-numeric
        # four-character token would otherwise win the max() below.
        if len(token) == 4 and token.isdigit():
            years.append(token)

    if not years:
        raise ValueError(
            f"Nenhum arquivo com ano encontrado em {input_path}"
        )

    max_year = max(years)
    log(f"Ano máximo: {max_year}")
    return max_year
Loading