Commit 2fad05c8 authored by Miguel Guerrero's avatar Miguel Guerrero

refatora e valida funções de normalização e leitura de dados, adiciona verificações de integridade

parent 303316ff
# Esboço validação
library(dplyr)
library(readxl)
library(stringr)
library(purrr)
library(arrow)
# carrega caminhos para dados
filtra_mudancas <- function(x, padrao) {
x |>
select(matches(padrao)) |>
filter(
if_any(-1, ~ !is.na(.))
) |>
mutate(!!padrao := do.call(coalesce, rev(as.list(across(-1))))) |>
select(
ncm_original = 1,
last_col()
) |>
distinct()
}
normaliza_ncm <- function(x, validar = FALSE, contexto = "NCM") {
x <- as.character(x)
x <- str_trim(x)
x <- str_remove_all(x, "\\.")
if (validar) {
invalidos <- x[!is.na(x) & !str_detect(x, "^\\d{8}$")]
invalidos <- unique(invalidos)
if (length(invalidos) > 0) {
stop(
sprintf(
"%s contem %s NCM(s) com formato invalido. Amostra: %s",
contexto,
length(invalidos),
paste(head(sort(invalidos), 5), collapse = ", ")
),
call. = FALSE
)
}
}
x
}
general_dir <- Sys.getenv("general")
if (!nzchar(general_dir)) {
stop("Variavel de ambiente general vazia.", call. = FALSE)
}
if (!dir.exists(general_dir)) {
stop(
sprintf("Diretorio general nao existe: %s", general_dir),
call. = FALSE
)
}
path_tec <- Sys.getenv("tec")
if (!nzchar(path_tec)) {
stop("Variavel de ambiente tec vazia.", call. = FALSE)
}
if (!file.exists(path_tec)) {
stop(
sprintf("Arquivo TEC nao encontrado: %s", path_tec),
call. = FALSE
)
}
# caminho correlacao normal
path_ncm <- file.path(
Sys.getenv("general"),
general_dir,
"Bases",
"correlacoes",
"mercosul",
......@@ -18,11 +76,8 @@ path_ncm <- file.path(
"correlacao_completa_ncm.xlsx"
)
correlacao_completa_ncm_mercosul <- read_xlsx(path_ncm, guess_max = 1e6) # agora
# caminho correlacao expandida
path_ncm_detalhada <- file.path(
Sys.getenv("general"),
general_dir,
"Bases",
"correlacoes",
"mercosul",
......@@ -30,137 +85,473 @@ path_ncm_detalhada <- file.path(
"correlacao_completa_ncm_detalhada.xlsx"
)
# Leitura de correlacao expandida
correlacao_ncm_detalhada <- read_xlsx(path_ncm_detalhada, guess_max = 1e6)
path_ncms_originais_snapshot <- "dados_gerados/ncms_originais.rds"
path_dataset_ncm_snapshot <- "dados_gerados/dataset_ncm.parquet"
# funcao para obter linhas com mudancas de codigo em cada revisao NCM
filtra_mudancas <- function(x, padrao) {
x |>
select(matches(padrao)) |>
filter(
# seleciona as linhas que tem algum valor diferente de NA com exceção
# de dados na primeira coluna
if_any(-1, ~ !is.na(.))
) |>
# cria uma coluna preenchendo-a com a primeira informação não NA da
# direita para a esquerda
mutate(!!padrao := do.call(coalesce, rev(as.list(across(-1))))) |>
select(
ncm_original = 1,
last_col()
) |>
distinct()
arquivos_necessarios <- c(
correlacao_ncm = path_ncm,
correlacao_ncm_detalhada = path_ncm_detalhada,
tec = path_tec,
ncms_originais_snapshot = path_ncms_originais_snapshot,
dataset_ncm_snapshot = path_dataset_ncm_snapshot
)
arquivos_faltantes <- arquivos_necessarios[!file.exists(arquivos_necessarios)]
if (length(arquivos_faltantes) > 0) {
stop(
sprintf(
"Arquivos obrigatorios ausentes: %s",
paste(
sprintf("%s=%s", names(arquivos_faltantes), arquivos_faltantes),
collapse = "; "
)
),
call. = FALSE
)
}
# cria lista de nomes para usar como input em filtra_mudancas
nomes_para_funcao <- names(correlacao_ncm_detalhada) |>
message("Ambiente ok.")
message("Arquivos obrigatorios encontrados.")
correlacao_ncm_detalhada <- read_xlsx(path_ncm_detalhada, guess_max = 1e6)
correlacao_completa_ncm_raw <- read_xlsx(path_ncm, guess_max = 1e6)
tec <- read_xlsx(path_tec, guess_max = 1e6)
ncms_originais_snapshot <- readRDS(path_ncms_originais_snapshot)
dataset_ncm_snapshot <- read_parquet(path_dataset_ncm_snapshot)
message("Dados carregados.")
revisoes_ncm <- names(correlacao_ncm_detalhada) |>
str_extract("NCM_\\d{4}") |>
discard(is.na) |>
unique()
# obtem lista com as ncms originais das revisoes
ncms_originais <- nomes_para_funcao |>
if (length(revisoes_ncm) == 0) {
stop(
"Nenhuma coluna NCM_#### encontrada na correlacao detalhada.",
call. = FALSE
)
}
anos_revisoes <- as.integer(str_extract(revisoes_ncm, "\\d{4}"))
indice_ultima_revisao <- which.max(anos_revisoes)
coluna_ultima_revisao <- revisoes_ncm[[indice_ultima_revisao]]
nome_ultima_revisao <- str_replace(coluna_ultima_revisao, "_", " ")
ncms_originais <- revisoes_ncm |>
map(~ filtra_mudancas(correlacao_ncm_detalhada, padrao = .x))
names(ncms_originais) <- nomes_para_funcao |>
str_replace("_", " ")
names(ncms_originais) <- str_replace(revisoes_ncm, "_", " ")
if (!nome_ultima_revisao %in% names(ncms_originais)) {
stop(
sprintf(
"Revisao mais recente %s nao foi gerada na correlacao detalhada.",
nome_ultima_revisao
),
call. = FALSE
)
}
if (!is.list(ncms_originais_snapshot) || is.null(names(ncms_originais_snapshot))) {
stop(
"Snapshot anterior de ncms_originais invalido.",
call. = FALSE
)
}
if (indice_ultima_revisao > length(ncms_originais_snapshot)) {
stop(
sprintf(
"Snapshot anterior nao possui o indice %s para %s.",
indice_ultima_revisao,
nome_ultima_revisao
),
call. = FALSE
)
}
# tamanho da lista de ncms originais
tamanho_lista <- length(ncms_originais)
if (!nome_ultima_revisao %in% names(ncms_originais_snapshot)) {
stop(
sprintf(
"Snapshot anterior nao possui a revisao %s.",
nome_ultima_revisao
),
call. = FALSE
)
}
# ultimo elemento da lista de ncms originais
ultima_revisao <- ncms_originais[[tamanho_lista]]
if (!identical(names(ncms_originais_snapshot)[[indice_ultima_revisao]], nome_ultima_revisao)) {
stop(
sprintf(
"Snapshot anterior tem %s no indice %s; esperado %s.",
names(ncms_originais_snapshot)[[indice_ultima_revisao]],
indice_ultima_revisao,
nome_ultima_revisao
),
call. = FALSE
)
}
# ultimo arquivo do projeto ncms_originais
dados_gerados_ncms_originais <- readRDS("dados_gerados/ncms_originais.rds")
ultima_revisao_atual <- ncms_originais[[nome_ultima_revisao]]
ultima_revisao_anterior <- ncms_originais_snapshot[[nome_ultima_revisao]]
if (!all(c("ncm_original", coluna_ultima_revisao) %in% names(ultima_revisao_atual))) {
stop(
sprintf(
"Revisao atual %s nao contem as colunas esperadas.",
nome_ultima_revisao
),
call. = FALSE
)
}
if (!all(c("ncm_original", coluna_ultima_revisao) %in% names(ultima_revisao_anterior))) {
stop(
sprintf(
"Snapshot anterior de %s nao contem as colunas esperadas.",
nome_ultima_revisao
),
call. = FALSE
)
}
dados_gerados_ncms_originais <- dados_gerados_ncms_originais[[tamanho_lista]]
ultima_revisao_atual <- ultima_revisao_atual |>
mutate(
ncm_original = normaliza_ncm(
ncm_original,
validar = TRUE,
contexto = sprintf("%s ncm_original atual", nome_ultima_revisao)
),
!!coluna_ultima_revisao := normaliza_ncm(
.data[[coluna_ultima_revisao]],
validar = TRUE,
contexto = sprintf("%s atual", nome_ultima_revisao)
)
)
ultima_revisao_anterior <- ultima_revisao_anterior |>
mutate(
ncm_original = normaliza_ncm(
ncm_original,
validar = TRUE,
contexto = sprintf("%s ncm_original anterior", nome_ultima_revisao)
),
!!coluna_ultima_revisao := normaliza_ncm(
.data[[coluna_ultima_revisao]],
validar = TRUE,
contexto = sprintf("%s anterior", nome_ultima_revisao)
)
)
message(sprintf("Revisao mais recente identificada: %s.", nome_ultima_revisao))
# compara coluna ncm original de agora com anterior
# indica que codigos que não estavam na revisão original apareceram agora. Isso é
# importante pois alguns códigos podem ter sido alterados agora ou podem ter sido esquecidos
# e não inseridos anteriorermente
setdiff(
ultima_revisao$ncm_original,
dados_gerados_ncms_originais$ncm_original
# importante pois alguns códigos podem ter sido alterados agora ou podem ter sido esquecidos e não inseridos anteriorermente
ncm_original_so_atual <- setdiff(
unique(ultima_revisao_atual$ncm_original),
unique(ultima_revisao_anterior$ncm_original)
)
if (length(ncm_original_so_atual) > 0) {
stop(
sprintf(
"Diferenca em ncm_original para %s: %s codigo(s) so no atual. Amostra: %s",
nome_ultima_revisao,
length(ncm_original_so_atual),
paste(head(sort(ncm_original_so_atual), 5), collapse = ", ")
),
call. = FALSE
)
}
# obter nome da segunda coluna da ultima revisao
nome_segunda_coluna <- ultima_revisao |>
select(-ncm_original) |>
names()
ncm_original_so_anterior <- setdiff(
unique(ultima_revisao_anterior$ncm_original),
unique(ultima_revisao_atual$ncm_original)
)
if (length(ncm_original_so_anterior) > 0) {
warning(
sprintf(
"Diferenca em ncm_original para %s: %s codigo(s) so no anterior. Amostra: %s",
nome_ultima_revisao,
length(ncm_original_so_anterior),
paste(head(sort(ncm_original_so_anterior), 5), collapse = ", ")
),
call. = FALSE
)
}
# compara coluna de mudanças dentro da revisão. Indica códigos que
# apareceram agora em relação ao ultimo arquivo. Aqui vale a pena conferir se
# todos os códigos que aparececem foram de fato criados. Vale a pena conferir se todos os
# códigos criados (olhar nas resoluções) aparecem dentros os códigos seguintes.
setdiff(
ultima_revisao[[nome_segunda_coluna]],
dados_gerados_ncms_originais[[nome_segunda_coluna]]
codigos_revisao_so_atual <- setdiff(
unique(ultima_revisao_atual[[coluna_ultima_revisao]]),
unique(ultima_revisao_anterior[[coluna_ultima_revisao]])
)
if (length(codigos_revisao_so_atual) > 0) {
stop(
sprintf(
"Diferenca em %s: %s codigo(s) so no atual. Amostra: %s",
nome_ultima_revisao,
length(codigos_revisao_so_atual),
paste(head(sort(codigos_revisao_so_atual), 5), collapse = ", ")
),
call. = FALSE
)
}
codigos_revisao_so_anterior <- setdiff(
unique(ultima_revisao_anterior[[coluna_ultima_revisao]]),
unique(ultima_revisao_atual[[coluna_ultima_revisao]])
)
if (length(codigos_revisao_so_anterior) > 0) {
stop(
sprintf(
"Diferenca em %s: %s codigo(s) so no anterior. Amostra: %s",
nome_ultima_revisao,
length(codigos_revisao_so_anterior),
paste(head(sort(codigos_revisao_so_anterior), 5), collapse = ", ")
),
call. = FALSE
)
}
message(sprintf("Comparacao com snapshot de %s ok.", nome_ultima_revisao))
correlacao_completa_ncm <- correlacao_completa_ncm_raw |>
rename_with(~ str_to_upper(.x))
linhas_duplicadas <- nrow(correlacao_completa_ncm) -
nrow(distinct(correlacao_completa_ncm))
if (linhas_duplicadas > 0) {
stop(
sprintf(
"Dataset NCM atual contem %s linha(s) duplicada(s).",
linhas_duplicadas
),
call. = FALSE
)
}
correlacao_completa_ncm <- correlacao_completa_ncm |>
distinct()
colunas_ncm_dataset <- names(correlacao_completa_ncm) |>
str_subset("^NCM_\\d{4}$")
if (length(colunas_ncm_dataset) == 0) {
stop(
"Dataset NCM atual nao contem colunas NCM_####.",
call. = FALSE
)
}
if (!coluna_ultima_revisao %in% colunas_ncm_dataset) {
stop(
sprintf(
"Dataset NCM atual nao contem a coluna %s.",
coluna_ultima_revisao
),
call. = FALSE
)
}
colunas_com_na <- colunas_ncm_dataset[
colSums(is.na(correlacao_completa_ncm[colunas_ncm_dataset])) > 0
]
if (length(colunas_com_na) > 0) {
linhas_com_na <- sum(rowSums(is.na(correlacao_completa_ncm[colunas_ncm_dataset])) > 0)
stop(
sprintf(
"Dataset NCM atual contem NA em %s linha(s). Colunas: %s",
linhas_com_na,
paste(head(colunas_com_na, 5), collapse = ", ")
),
call. = FALSE
)
}
# carrega dados da tec
tec <- read_xlsx(Sys.getenv("tec"), guess_max = 1e6)
correlacao_ncm_atual <- correlacao_completa_ncm |>
rename(ncm = all_of(coluna_ultima_revisao)) |>
mutate(
ncm = normaliza_ncm(
ncm,
validar = TRUE,
contexto = sprintf("Dataset NCM atual %s", nome_ultima_revisao)
)
)
# observa mudanças nos códigos ncm de um arquivo anterior
# e verifica se tem alguma ncm que não está mais na tec vigente.
# Se um código aparecer esse código deve ter sido alterado. Conferir.
dados_gerados_ncms_originais |>
rename(ncm = all_of(nome_segunda_coluna)) |>
mutate(ncm = str_remove_all(ncm, "\\.")) |>
left_join(tec, by = "ncm") |>
message("Dataset NCM atual ok.")
if (!"ncm" %in% names(tec)) {
stop("TEC nao contem a coluna ncm.", call. = FALSE)
}
if (!"descricao_tec" %in% names(tec)) {
stop("TEC nao contem a coluna descricao_tec.", call. = FALSE)
}
if (anyNA(tec$ncm)) {
stop("TEC contem NA na coluna ncm.", call. = FALSE)
}
tec <- tec |>
mutate(
ncm = normaliza_ncm(
ncm,
validar = TRUE,
contexto = "TEC"
)
)
duplicados_tec <- tec$ncm[duplicated(tec$ncm)] |>
unique()
if (length(duplicados_tec) > 0) {
stop(
sprintf(
"TEC contem %s NCM(s) duplicado(s). Amostra: %s",
length(duplicados_tec),
paste(head(sort(duplicados_tec), 5), collapse = ", ")
),
call. = FALSE
)
}
faltantes_snapshot_tec <- ultima_revisao_anterior |>
transmute(
ncm = .data[[coluna_ultima_revisao]]
) |>
distinct() |>
left_join(
tec |>
select(ncm, descricao_tec),
by = "ncm"
) |>
filter(is.na(descricao_tec)) |>
pull(ncm)
#####################################
# compara codigos ncm da tabela de correlacao ncm de agora
# e verifica se todos estão presentes na tec vigente.
if (length(faltantes_snapshot_tec) > 0) {
stop(
sprintf(
"TEC vigente nao cobre %s NCM(s) do snapshot anterior em %s. Amostra: %s",
length(faltantes_snapshot_tec),
nome_ultima_revisao,
paste(head(sort(faltantes_snapshot_tec), 5), collapse = ", ")
),
call. = FALSE
)
}
correlacao_completa <- correlacao_completa_ncm_mercosul |>
faltantes_atual_tec <- correlacao_ncm_atual |>
transmute(ncm) |>
distinct() |>
rename_with(~ str_to_upper(.x)) |>
tidyr::drop_na()
left_join(
tec |>
select(ncm, descricao_tec),
by = "ncm"
) |>
filter(is.na(descricao_tec)) |>
pull(ncm)
correlacao_ncm <- correlacao_completa |>
rename(ncm = all_of(nome_segunda_coluna)) |>
mutate(ncm = str_remove_all(ncm, "\\."))
if (length(faltantes_atual_tec) > 0) {
stop(
sprintf(
"TEC vigente nao cobre %s NCM(s) do dataset atual em %s. Amostra: %s",
length(faltantes_atual_tec),
nome_ultima_revisao,
paste(head(sort(faltantes_atual_tec), 5), collapse = ", ")
),
call. = FALSE
)
}
# contagem de códigos únicos no arquivo com a última revisão
correlacao_ncm |>
select(ncm) |>
distinct()
message("Validacao contra TEC ok.")
# contagem linhas tec
nrow(tec)
if (!coluna_ultima_revisao %in% names(dataset_ncm_snapshot)) {
stop(
sprintf(
"Snapshot dataset_ncm.parquet nao contem a coluna %s.",
coluna_ultima_revisao
),
call. = FALSE
)
}
# verifica se numero de linhas é igual ao número de linhas da tec
tec |>
filter(ncm %in% correlacao_ncm[["ncm"]] |> unique()) |>
nrow()
correlacao_ncm_anterior <- dataset_ncm_snapshot |>
rename(ncm = all_of(coluna_ultima_revisao)) |>
mutate(
ncm = normaliza_ncm(
ncm,
validar = TRUE,
contexto = sprintf("Snapshot dataset_ncm %s", nome_ultima_revisao)
)
)
codigos_entraram_dataset <- setdiff(
unique(correlacao_ncm_atual$ncm),
unique(correlacao_ncm_anterior$ncm)
)
codigos_sairam_dataset <- setdiff(
unique(correlacao_ncm_anterior$ncm),
unique(correlacao_ncm_atual$ncm)
)
# compara codigos ncm da tabela correlação ncm antiga e
# verifica se todos estão presentes na tec vigente.
correlacao_ncm_antes <- read_parquet("dados_gerados/dataset_ncm.parquet") # antes
codigos_entraram_revisao <- setdiff(
unique(ultima_revisao_atual[[coluna_ultima_revisao]]),
unique(ultima_revisao_anterior[[coluna_ultima_revisao]])
)
correlacao_ncm_antes <- correlacao_ncm_antes |>
rename(ncm = all_of(nome_segunda_coluna)) |>
mutate(ncm = str_remove_all(ncm, "\\."))
codigos_sairam_revisao <- setdiff(
unique(ultima_revisao_anterior[[coluna_ultima_revisao]]),
unique(ultima_revisao_atual[[coluna_ultima_revisao]])
)
# um número menor de linhas indica que alguns códigos saíram da tabela de correlação e não estão mais presentes na tec vigente.
tec |>
filter(ncm %in% unique(correlacao_ncm_antes$ncm)) |>
nrow()
entradas_so_dataset <- setdiff(codigos_entraram_dataset, codigos_entraram_revisao)
entradas_so_revisao <- setdiff(codigos_entraram_revisao, codigos_entraram_dataset)
if (length(entradas_so_dataset) > 0 || length(entradas_so_revisao) > 0) {
stop(
sprintf(
paste(
"Entradas de codigos divergem em %s.",
"So no dataset: %s codigo(s). Amostra: %s.",
"So na revisao: %s codigo(s). Amostra: %s"
),
nome_ultima_revisao,
length(entradas_so_dataset),
paste(head(sort(entradas_so_dataset), 5), collapse = ", "),
length(entradas_so_revisao),
paste(head(sort(entradas_so_revisao), 5), collapse = ", ")
),
call. = FALSE
)
}
# verifica codigos que entraram na tabela correlacao ncm
setdiff(correlacao_ncm$ncm, correlacao_ncm_antes$ncm)
# compara com codigos ncms originais antes e depois. Os códigos da linha acima devem
# bater com a linha abaixo (sem pontos)
setdiff(
ultima_revisao[[nome_segunda_coluna]],
dados_gerados_ncms_originais[[nome_segunda_coluna]]
)
saidas_so_dataset <- setdiff(codigos_sairam_dataset, codigos_sairam_revisao)
saidas_so_revisao <- setdiff(codigos_sairam_revisao, codigos_sairam_dataset)
if (length(saidas_so_dataset) > 0 || length(saidas_so_revisao) > 0) {
stop(
sprintf(
paste(
"Saidas de codigos divergem em %s.",
"So no dataset: %s codigo(s). Amostra: %s.",
"So na revisao: %s codigo(s). Amostra: %s"
),
nome_ultima_revisao,
length(saidas_so_dataset),
paste(head(sort(saidas_so_dataset), 5), collapse = ", "),
length(saidas_so_revisao),
paste(head(sort(saidas_so_revisao), 5), collapse = ", ")
),
call. = FALSE
)
}
# codigos que saíram da tabela de correlacao
setdiff(correlacao_ncm_antes$ncm, correlacao_ncm$ncm)
message("Comparacao com dataset_ncm anterior ok.")
message("Validacao concluida.")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment