
Una canalización de datos ETL (Extraer, Transformar y Cargar) es un conjunto de procesos que se utilizan para extraer, transformar y cargar datos de un origen a un destino.
La fuente de los datos puede provenir de una o varias fuentes, como una llamada a una API, archivos CSV, información dentro de una base de datos y muchas más. Tomamos estas fuentes de información y luego las transformamos de manera que pueda ser utilizada inmediatamente por otro cliente, usuario o desarrollador,
En la práctica, una canalización ETL se ejecuta con poca frecuencia. Por lo general, hay grandes cantidades de información que no está filtrada y es difícil de procesar y limpiar, lo que a menudo requiere una gran cantidad de tiempo y recursos para transformar.
¿Por qué necesitamos una canalización ETL?
Imagina que tuviéramos una colección de datos que quisiéramos utilizar. Pero el problema es que estos datos no están limpios, falta información y son inconsistentes. Una solución es tener un programa que limpie y transforme esta información de manera que:
- No haya información faltante
- Los datos son consistentes
- Los datos se cargan rápidamente en otro programa
En la industria actual hay muchos datos sin procesar y sin filtrar con los dispositivos inteligentes, comunidades en línea y el comercio electrónico. Pero la mayor parte se desperdicia porque es demasiado difícil de interpretar debido a lo complicado que es. Las canalizaciones ETL existen para combatir esto mediante la automatización de la recopilación y transformación de datos para que los analistas puedan utilizarlos para obtener información empresarial.
La canalización ETL
Hay muchas herramientas y tecnologías diferentes que podemos utilizar para construir nuestra canalización de datos. Esto depende completamente de cómo se recopilen nuestros datos. En nuestro ejemplo, recopilaremos datos sin procesar de GitHub de la Universidad John Hopkins. Específicamente, veremos los informes diarios. Los datos abarcan desde el 22 de enero de 2020 hasta el 22 de diciembre de 2020 a partir de la redacción de este artículo.
Nuestra pila tecnológica incluirá Python usando las bibliotecas Requests y Pandas. También usaremos SQLite para almacenar esta información.
1. Extraer
Antes de que podamos hacer cualquier tipo de transformación de datos, ¡necesitamos tener datos! Los datos se recopilarán de Github de la Universidad John Hopkins. Estamos especialmente interesados en los informes diarios. Desde la documentación de la API de Github, podemos enviar una solicitud GET a este punto final para recibir una respuesta JSON que nos indique las URL de cada archivo csv para que podamos leerlo directamente.
# CONSTANTES
OWNER = 'CSSEGISandData'
REPO = 'COVID-19'
PATH = 'csse_covid_19_data/csse_covid_19_daily_reports'
URL = f'https://api.github.com/repos/{OWNER}/{REPO}/contents/{PATH}'
- OWNER es propietario del repositorio de GitHub
- REPO es el nombre del repositorio
- PATH es la ruta de la carpeta en la cual se encuentran los archivos CSV
- La URL es el punto final de la API que se usa para enviar la solicitud GET
Usando la libreria Requests de Python interpretaremos la respuesta JSON y examinaremos la información que necesitamos. Para mayor claridad, la siguiente imagen es la respuesta devuelta utilizando un programa llamado Postman. Se devolverá una respuesta similar utilizando la biblioteca Request de Python.

Lo que se devuelve es una lista con la informacion para cada archivo en el repositorio de github. Estamos interesados en "download_url". Usaremos este "download_url" para descargar estos datos directamente usando Pandas y luego transformarlos como mejor nos parezca. El siguiente script de comandos recopila todas las "URL de descarga" en una lista.
download_urls = []
response = requests.get(URL)
for data in tqdm(response.json()):
if data['name'].endswith('.csv'):
download_urls.append(data['download_url'])
2 . Transformar
¡Ahora tenemos una lista de enlaces directos a nuestros archivos CSV! Podemos leer estas URL directamente usando pandas.read_csv(url).
Echando un vistazo a la información, nos interesa mirar valores específicos. Si estos valores no están dentro del DataFrame, reemplazaremos/agregaremos estas columnas con valores vacíos.
Algunas etiquetas son inconsistentes. Definen el mismo tipo de datos, pero reciben un nombre diferente. Por ejemplo, "Last Update" también se denomina "Last_Update". También redefiniremos estas etiquetas para mantener la coherencia. Esta es una canalización ETL muy básica, por lo que solo consideraremos una pequeña cantidad de etiquetas. Estas etiquetas serán
labels = ['Province_State', 'Country_Region', 'Last_Update', 'Confirmed', 'Deaths', 'Recovered']
# Lista de etiquetas a ser renombradas
relabel = {
# 'Last Update': 'Last_Update',
'Country/Region': 'Country_Region',
'Lat': 'Latitude',
'Long_': 'Longitude',
'Province/State': 'Province_State',
}
def factor_dataframe(dat, filename):
""" Refactorizar el dataframe para ser montado a base datos SQL
como un DataFrame de pandas
"""
# renombrar etiquetas
for label in dat:
if label in relabel:
dat = dat.rename(columns = {label: relabel[label]})
# retornar un dataframe con estos parametros
labels = ['Province_State', 'Country_Region', 'Last_Update', 'Confirmed', 'Deaths', 'Recovered']
# nombre del archivo es el datetime
if 'Last_Update' not in dat:
dat['Last_Update'] = pd.to_datetime(filename)
# remplazar columnas vacias con nan
for label in labels:
if label not in dat:
dat[label] = np.nan
return dat[labels]
El código anterior transforma los datos ya recopilados como un DataFrame de Pandas, y hace que el DataFrame sea consistente al agregar los datos y volver a etiquetar columnas. Con nuestros datos limpios, filtrados y procesados, el siguiente paso es cargar los datos en una base de datos.
3. Cargar
Podemos cargar directamente nuestro DataFrame de Pandas en una base de datos SQL usando pandas.DataFrame.to_sql.
Si la tabla dentro de la base de datos ya se ha creado, reemplácela por una nueva. Esta no es la forma adecuada para hacerlo en producción, pero para la sencillez de este ejercicio es suficiente. El siguiente código describe cómo conectarse a una base de datos SQLite usando sqlite3 y cargar los datos a la base de datos usando Pandas.
def upload_to_sql(filenames, db_name, debug=False):
conn = sqlite3.connect(f"{db_name}.db")
if debug:
print("Uploading into database")
for i, file_path in tqdm(list(enumerate(filenames))):
dat = pd.read_csv(file_path)
# renombrar etiquetas
filename = os.path.basename(file_path).split('.')[0]
dat = factor_dataframe(dat, filename)
# escribir datos a sqlite
if i == 0: # si es la primera entrada y el nombre de la tabla ya existe, entonces remplazar
dat.to_sql(db_name, con=conn, index = False, if_exists='replace')
else: # de lo contrario añadir a la tabla existente
dat.to_sql(db_name, con=conn, index = False, if_exists='append')
# cargar datos a sqlite
upload_to_sql(download_urls, 'example', debug=True)
De esta forma ya tenemos un código funcional para realizar una canalización de datos ETL, a continuación se muestra el código completo.
import requests
import pandas as pd
import numpy as np
import os
import sqlite3
from tqdm.auto import tqdm
# CONSTANTES
OWNER = 'CSSEGISandData'
REPO = 'COVID-19'
PATH = 'csse_covid_19_data/csse_covid_19_daily_reports'
URL = f'https://api.github.com/repos/{OWNER}/{REPO}/contents/{PATH}'
print(f'Downloading paths from {URL}')
download_urls = []
response = requests.get(URL)
for data in tqdm(response.json()):
if data['name'].endswith('.csv'):
download_urls.append(data['download_url'])
# Lista de etiquetas a ser renombradas
relabel = {
# 'Last Update': 'Last_Update',
'Country/Region': 'Country_Region',
'Lat': 'Latitude',
'Long_': 'Longitude',
'Province/State': 'Province_State',
}
def factor_dataframe(dat, filename):
""" Refactorizar el dataframe para ser montado a base datos SQL
como un DataFrame de pandas
"""
# renombrar etiquetas
for label in dat:
if label in relabel:
dat = dat.rename(columns = {label: relabel[label]})
# retornar un dataframe con estos parametros
labels = ['Province_State', 'Country_Region', 'Last_Update', 'Confirmed', 'Deaths', 'Recovered']
# nombre del archivo es el datetime
if 'Last_Update' not in dat:
dat['Last_Update'] = pd.to_datetime(filename)
# remplazar columnas vacias con nan
for label in labels:
if label not in dat:
dat[label] = np.nan
return dat[labels]
def upload_to_sql(filenames, db_name, debug=False):
conn = sqlite3.connect(f"{db_name}.db")
if debug:
print("Uploading into database")
for i, file_path in tqdm(list(enumerate(filenames))):
dat = pd.read_csv(file_path)
# renombrar etiquetas
filename = os.path.basename(file_path).split('.')[0]
dat = factor_dataframe(dat, filename)
# escribir datos a sqlite
if i == 0: # si es la primera entrada y el nombre de la tabla ya existe, entonces remplazar
dat.to_sql(db_name, con=conn, index = False, if_exists='replace')
else: # de lo contrario añadir a la tabla existente
dat.to_sql(db_name, con=conn, index = False, if_exists='append')
# cargar datos a sqlite
upload_to_sql(download_urls, 'example', debug=True)
Contenido del articulo
- Comentarios
Comentarios
No hay comentarios. Inicia sesión para comentar.