- La idea es aventar TODOS, TODOS, TODOS nuestros datos ahí.
- Obviamente de manera ordenada y catalogada para saber día de la carga, tamaño de la carga, fuente, etc.
- Esto se llama metadata (i.e. data about your data), y es crítico para limpieza de datos y su catalogación en el lake.
- Cuando necesitamos hacer un análisis, la metadata nos dice qué tenemos sin tener que voltear a ver los archivotes (i.e. ver el catálogo en lugar del producto).
- El datalake es un sistema de archivos distribuído y replicado. Usualmente en una variante de HDFS (Hadoop File System)
- Se carga un archivo
- Se parte en bloques de 64MB
- Se hacen 3 copias de cada bloque
- Se reparten en un cluster de máquinas (mínimo óptimo de 4: 3 workers + 1 master o main)
- Las máqunas no son hardware especializado, y se pueden armar con hardware commodity, es decir, normalito, incluso con partes de compus "de yonke"
- El 1er datalake de Grupo Expansión y de TERAN/TBWA fue armado con sparte parts del almacén.
- De ese modo, cuando se cae 1 máquina worker, todos los archivos siguen disponibles
- BA.
- RA.
- TÍ.
- SI.
- MO!!!!!!!
- Podemos incorporar máquinas de diferentes capacidades al cluster.
- Y de esta manera podemos crecer storage y poder de cómputo por separado (aunque no tanto, aún debemos agregar una máquina completa).
- Mencioné que es baratísimo?🤣
- Mucha chamba para configurar el cluster más mínimo
- Cómputo y storage no están suficientemente separados porque aún tenemos que agregar máquinas completas al cluster.
- Complejidad administrativa el crecer el cluster y agregar máquinas completas
- Mantenimiento y troubleshooting pesadísimo
- Todos los datos tienen la misma prioridad e importancia
AWS Lake Formation. De los productos más chonchos de AWS.
Qué nos da AWS Lake Formation?
- Secciones dedicadas para limpieza y procesamiento de datos de menor refinamiento a mayor refinamiento.
- zona
bronze
para raw data, osea, la que se ingiere desde nuestras fuentes transaccionales - zona
silver
para data procesada y limpiada - zona
gold
para agregados y sumarizados para presentar directo a herramientas de BI
- zona
- Secciones dedicadas para diferentes frecuencias de uso de datos, de mayor frecuencia hasta el "glaciar", donde están los datos de menor uso. La morgue, pues☠️
- Separación total de cómputo y storage: podemos aumentar disco, o aumentar procesamiento totalmente por separado.
- Servicio administrado: 0 configuración de archivos, 0 mantenimiento.
- Aún hay que hacer algunas maromas con redes en AWS, pero nada que ver con la config de bajo nivel que se requiere cuando hacemos esto on premises, osea, local, no en la nube.
- Un PostgreSQL en un EC2
- Una tabla de juguete
- Una función Lambda para insertar datos random en la tabla de juguete
- Un evento de EventBridge para realizar una inserción cada 2 mins
- Todo lo anterior es para simular una BD transaccionar con operatividad e I/O de numerosas veces al día
- Crear un bucket de S3
- Crear un Data Lake con AWS Lake Formation
- Asociar bucket de S3 con Data Lake
- Usar un Blueprint de AWS Glue (el servicio de ETLs de AWS) para "ingerir" data de forma incremental (copiar datos nuevos cada X tiempo desde la tabla transaccional de PostgreSQL)
- El mismo Blueprint va a crear una base de datos de "catálogos" con metadata de nuestra tabla origen
- Copiar la data incremental al bucket de S3 en la zona/directorio
bronze
- Usar un 2o job de AWS Glue para leer los datos del bucket de S3, limpiarlos y guardarlos en la zona
silver
- Usar el mismo job de AWS Glue para leer los datos de la zona/directorio
silver
, agregarlos, formar cubos de info, y guardarlos en la zona/directoriogold
- Visualizar la data en AWS Quicksight, la herramienta de visualización de AWS
Pasos para crear el Lake:
Como este producto no lo tenemos con nuestro usr de AWS Academy, vamos a tener que crear un nuevo usr con su cuenta de correo.
Este usuario va a ser el root, pero esto no es suficiente. El usr root no puede ser el administrador del data lake. Tenemos que crear otro usuario, al cual llamaremos datalakeadmin
y le asignaremos rol de administrador.**
Vamos a hacer lo mismo con los siguientes permisos:
- AWSGlueConsoleFullAccess
- CloudWatchLogsReadOnlyAccess
- AmazonAthenaFullAccess
Y vamos a agregar 2 in-line policies:
Y pegar los siguientes policies, por separado, sin olvidar de reemplazar <account-id>
por el número que sacamos en el paso 0.4 más abajo.
- In-line policy
LakeFormationSLR
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iam:CreateServiceLinkedRole",
"Resource": "*",
"Condition": {
"StringEquals": {
"iam:AWSServiceName": "lakeformation.amazonaws.com"
}
}
},
{
"Effect": "Allow",
"Action": [
"iam:PutRolePolicy"
],
"Resource": "arn:aws:iam::<account-id>:role/aws-service-role/lakeformation.amazonaws.com/AWSServiceRoleForLakeFormationDataAccess"
}
]
}
- In-line policy
UserPassRole
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "PassRolePermissions",
"Effect": "Allow",
"Action": [
"iam:PassRole"
],
"Resource": [
"arn:aws:iam::<account-id>:role/LakeFormationWorkflowRole"
]
}
]
}
Y asignar al usuario que acabamos de crear como administrador
Vamos a hacer logout y vamos a volver a entrar a la consola de AWS con este usuario recién creado.
Para volver a hacer login con este usuario no root y que está asociado a nuestra cuenta, debemos de fijarnos bien en nuestro account id. Lo podemos ver acá:
Una vez que lo tengamos guardado, entonces podemos hacer logout y entrar con la nueva cuenta:
Aquí es donde vamos a poner nuestro account id:
Posiblemente les pida cambiar el password. Cámbienlo.
Lake Formation va a preguntarles que asignen un administrador y si quieren ser uds (osea, su usuario). Obvio acepten los defaults.
Ya estamos adentro de la consola de Lake Formation. Ahora debemos configurar más perimsos para nuestro usuario nuevo:
Ahora vamos a crear un bucket de S3 que servirá como data lake.
S3 (Simple Storage Service) es el servicio de almacenamiento genérico de AWS. Podemos meter lo que sea ahí.
La unidad mínima de S3 es el bucket.
Tenemos que crear 1 bucket con 3 áreas (como directorios):
- uno para bronze (la data de nuestras fuentes JUSTO como es ingestada)
- uno para silver (la data refinada, con registros individuales, pero unificada)
- uno para golden (la data agregada, como "cubos de información")
De *TODO AWS.
En cuanto a la región, puede ser donde uds quieran.
Lo mismo para silver
y gold
.
Dentro de bronze
, crear directorio ingest
.
Dentro de silver
, crear directorio output
.
Con esto terminamos hasta este momento con S3. Vamos ahora a simular una BD transaccional.
Para simular estos movimientos transaccionales, vamos a crear una tabla y disparar un evento cada X tiempo para que se inserte 1 nuevo registro en ella.
Vamos a configurar los siguientes componentes en AWS:
- Una instancia de EC2 con una elastic ip
- Un PostgreSQL dentro de esa instancia de EC2, de acuerdo a esta guía
- Si estás instalando PostgreSQL 14, el setting en el archivo
pg_hba.conf
debe serhost all all trust
en lugar dehost all all md5
.
- Si estás instalando PostgreSQL 14, el setting en el archivo
- Crear una tabla de juguete que representará nuestros sistemas transaccionales
- Crear una lambda function que insertará en dicha tabla de juguete
- Configurar un evento en AWS EventBridge para que se dispare cada 2 min y llame a la lambda function de arriba
CREATE TABLE random_data (
id serial4 NOT NULL,
valor text NULL,
fecha timestamp NULL,
CONSTRAINT random_data_pkey PRIMARY KEY (id)
);
Pueden descargar el código fuente de la lambda de mi repo: https://github.com/xuxoramos/lambda-transactionaldb-insert
Al descargar este repo, deben:
- modificar el archivo
/db.ini
y capturar sus credenciales de su instalación de PostgreSQL - zipear el contenido del repo
- subirlo a la creación de la función lambda como se muestra acá abajo
Chequen su tabla, debe haber un registro cada 2 mins:
Antes de regresar a Lake Formation, debemos crear un endpoint de nuestra VPC de AWS.
Cada vez que nosotros creamos recursos en AWS, se crea una Virtual Private Cloud, que es una estructura de networking dentr de la cual cae todo lo que creamos.
Sin embargo, hay recursos de AWS, sobre todo los servicios administrados (i.e. los que son plataformas más que infraestructura, como RDS en lugar de EC2 + PostgreSQL o DocumentDB en lugar de EC2 + MongoDB) que no tienen VPC.
AWS Lake Formation es un servicio adminsitrado, y va a ingerir datos desde un PostgreSQL en una EC2. EC2 está en una VPC, por lo que para tener acceso a ella, debemos de crear un VPC Endpoint para que Lake Formation pueda a través de él llegar a nuestro PostgreSQL en la EC2.
Y listo!
En AWS, como en todas las nubes, todo, absolutamente todo se corre con un usuario o rol asignado.
En AWS hay Service Roles que son un conjunto de permisos con los que se ejeuta un servicio.
LakeFormationWorkflowRole
es el rol con el que se van a corren los procesos de Glue, que es la herramienta de ETLs de AWS y con la cual vamos a "ingerir" datos desde nuestro PostgreSQL en EC2 para copiarlos a nuestro Data Lake.
Necesitamos darle permiso a este rol nuevo de que acceda sin restricciones a S3, y que haga las veces de administrador.
Para hacer esto necesitamos crear un rol:
Finalmente, agregamos el siguiente inline policy:
Y pegar lo siguiente.
<account_id>
por el account id que obtuvieron en el paso 0.4
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"lakeformation:GetDataAccess",
"lakeformation:GrantPermissions"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": ["iam:PassRole"],
"Resource": [
"arn:aws:iam::<account-id>:role/LakeFormationWorkflowRole"
]
}
]
}
Listo, ahora si regresamos a LakeFormation!
us-east-2
, u "Ohio".
Tenemos ahora una BD vacía.
Con qué la vamos a llenar?
Vamos a dejar que Lake Formation vaya a nuestra BD transaccional de juguete en PostgreSQL, la examine, saque sus datos y sus metadatos, y los meta al data lake:
Recordemos que AWS Glue es la herramienta de AWS para hacer ETLs, es decir, los procesos que pasan datos de un lugar a otro.
En la siguiente pantalla vamos a capturar la info de nuestra BD en PostgreSQL.
La VPC y la subnet las obtenemos de la máquina EC2 donde vive nuestro PostgreSQL.
Esto se debe a que la Elastic IP es visible hacia el mundo exterior, PERO el AWS Glue y demás servicios ESTÁN EN EL INTERIOR de la red de AWS! Por lo que Glue NUNCA va a alcanzar esa IP pública, por lo que tiene que utilizar la interna!
Y de dónde sacamos la IP interna? De la consola de EC2.
Después de dar click en el botón de Finish, podemos probar la conexión:
Va la explicación:
- Vamos a elegir "Incremental Database" porque, dado que es una "tabla viva" (aunque con datos de juguete), necesitaremos que los datos se copien periódicamente para "mantenerlos frescos".
- La fuente de importación va a ser la conexión que acabamos de crear en la consola de Glue
- La ruta va a ser
database/schema/table
, en nuestro casotransactionaldb/public/random_data
Continúa la explicación:
- En la sección de Incremental Data debemos poner los detalles de la tabla que le ayudará a Glue a saber qué renglones ya se copiaron y qué renglones aún no.
- Table name es la tabla que contiene el campo que nos va a ayudar a distinguir un renglón de otro
- Bookmark Keys es el campo de esa tabla que nos va a decir si un registro ya fue copiado o no.
- En la sección de "Import Target", en la parte de Target Database vamos a asignar la BD dentro del lake que creamos en el paso 4.2
- El _storage location se va a llenar solo.
- Data format tenemos la opción de CSV o Parquet. Parquet es la mejor opción para fuentes de datos con millones de registros. Es un formato columnar de archivo similar a como los crea MonetDB.
Finalmente, con estos datos terminamos de definir nuestro workflow
- Podríamos definir una CRON Expression al estilo Unix, pero para este ejemplo lo ejecutaremos On Demand
- Nombramos el workflow como deseemos
- Las tablas de catálogo se les agrega el prefijo
incremental
Ya que está creado el workflow, AWS nos preguntará si queremos arrancarlo. Digámos que si.
Cómo podemos ver si está corriendo o qué caramba está haciendo?
Debemos ir a AWS Glue y examinar el "grafo de ejecución" del ETL que acabamos de hacer:
Esperamos unos minutos a que se termine de ejecutar...en este caso, 10 mins es suficiente...
🥳
El error más común es el definir la frecuencia de ejecución de los workflows de tal forma que queden muy cerca una de la siguiente.
Por ejemplo, si definiéramos esta ejecución cada 30 mins tendríamos lo siguiente:
Dejamos corriendo el workflow y nos encontramos con esto:
Vamos a inspeccionar las ejecuciones del workflow:
Examinemos la última ejecución:
Vemos que la fase de crawling tuvo un error.
La fase de crawling es en donde Lake Formation inspecciona nuestro PostgreSQL, la tabla, los registros, y las estructuras para en automático generar los catálogos de metadatos y preparar la extracción de datos de SQL a archivos Parquet como lo especificamos en el paso 5.2.
Si hacemos click en esa fase del grafo de ejecución, podemos ver el error:
Parece que 2 ejecuciones se comenzaron a "pisar las agujetas".
Esto sucede cuando el espacio entre ejecuciones del workflow no es suficiente para dejar terminar a uno cuando ya está comenzando otro.
Pero esto significa que la ejecución anterior debió haber terminado, no? Veamos.
Entonces seguramente de todas nuestras ejecuciones, tenemos una que si terminó, y otra que no, y así sucesivamente. Esto se debe a que no dejamos tiempo suficiente entre ejecuciones.
Cómo lo corregimos? Vamos a tener que eliminar completamente el blueprint y crear otro con la frecuencia adecuada. Esto es lo más certero que intentar modificar el parámetro en Glue.
Esto se logra repitiendo desde el paso 5.2.
Ya que el blueprint tuvo una ejecución exitosa, vamos a ver el resultado en la zona bronze
de nuestro data lake en S3:
Vemos aquí 2 tablas creadas por Lake Formation:
- una con Classification en
postgresql
, que no es mas que la descripción de nuestra tabla dentro de Lake Formation, como lo podemos ver cuando le damos click:
- otra con Classification en
s3://lakeformation-nosql4ds/bronze/ingest/incremental_transactionaldb_public_random_data/
, que es la materialización de la tabla de PostgreSQL en archivos Parquet.
Demos click en la liga Location para ir a S3 :
Como estamos ejecutando un workflow de un blueprint incremental, esto significa que la 1a ejecución del workflow nos traería toda la BD hasta ese punto, y ejecuciones subsecuentes nos traerían al datalake solamente los incrementos o deltas, es decir, los registros creados o presentes desde la última ejecución hasta la siguiente.
Los archivos parquet son columnares binarios, por lo que no serviría de mucho descargar uno y explorar su interior. Más bien debemos de explorar esta data con otra herramienta, pero antes, vamos a arreglar la frecuencia de ejecución de nuestro workflow.
Vamos ahora a examinar la data en nuestro data lake:
Athena es un servicio de AWS que nos permite correrle queries tipo SQL a archivos que estén guardados en S3.
Vamos a ir al home de este servicio:
Vemos que el costo es de $5 USD por terabyte. Este es uno de los servicios más caros de AWS, y con justa razón, imagínense aventarle lo que sea al S3 y poderle tirar queries con SQL normalito! Esta funcionalidad es poderosa.
Este es el home de Athena. Como podemos ver, han pasado 3 cosas interesantes:
- Ya nos puso el data source seteado a
AwsDataCatalog
. Este es el catálogo creado automágicamente por Lake Formation cuando ejecutamos el workflow, particularmente en la fase de crawling. - Como solo tenemos 1 base de datos en nuestro
AwsDataCatalog
creado por Lake Formation, pues Athena nos la asigna por default. Esta BDtransactionaldb-ingest
fue creada por nosotros en el paso 4.2 arriba - El resultado de los queries tiene que caer en algún lado, no es como DBeaver donde el resultado solo se muestra en pantalla y ya, y es por eso que Athena nos está "sugiriendo" que **antes de que corramos nuestro 1er query, que asignemos un lugar en S3 para los resultados".
- Y como lo deben estar imaginando, este resultado deberá caer en la zona/capa
silver
de nuestro data lake. - por qué? porque estos queries van a refinar/limpiar/procesar lo que ha caído como data "cruda" o raw en la zona
bronze
- Y como ya dijimos, eso va en
silver
.
- Y como lo deben estar imaginando, este resultado deberá caer en la zona/capa
Una vez que tenemos seleccionada la zona silver
, vamos a imaginar la siguiente pregunta.
Qué tan diferente es el promedio de la distancia de Levenshtein de los registros cuyo
value
comienza con 'D' contra los registros cuyovalue
comienza con 'Z'?
Como se los enseñé el semestre pasado, vamos a partir el problema en cachos:
- Seleccionar todos los registros cuyo
valor
comience con 'D' - Aplicar la window function
lag
para poder comparar 2 camposvalue
en el mismo registro - Aplicar función
levenshtein
a ambos camposvalue
y ponerlo en una columnaleven_dist
- Guardar en zona
silver
- Correr función
avg
a columnaleven_dist
. - Repetir todos los pasos con registros cuyo
valor
comience con 'Z'
silver
y estamos agregando, y los agregados, ortodoxamente, van en gold
, pero no lo vamos a hacer aquí porque si no se haría laaaaaargo el tutorial.
El query que nos va a ayudar a resolver todo el merequetengue de arriba es el siguiente. Para más info, ver la documentación de Athena respecto a funciones y mis apuntes del semestre pasado 😠 tanto de window functions como de common table expressions:
with lagged_values_z as
(
SELECT id, valor,
lag(valor,1) OVER (ORDER BY id) AS prev_valor
FROM "catalog__transactionaldb_public_random_data"
where valor like 'Z%'
ORDER BY id
),
lagged_values_d as
(
SELECT id, valor,
lag(valor,1) OVER (ORDER BY id) AS prev_valor
FROM "catalog__transactionaldb_public_random_data"
where valor like 'D%'
ORDER BY id
),
leven_dist_d as
(
select id, valor, prev_valor, 'd' as start_with, levenshtein_distance(valor, prev_valor) as leven_dist_valor
from lagged_values_d
),
leven_dist_z as
(
select id, valor, prev_valor, 'z' as start_with, levenshtein_distance(valor, prev_valor) as leven_dist_valor
from lagged_values_z
),
all_levens as
(
select * from leven_dist_d
union
select * from leven_dist_z
)
select start_with, avg(leven_dist_valor) as avg_leven_dist
from all_levens
group by start_with;
Vamos a ejecutarlo:
Luego vamos a guardarlo...
...como vista
Y vamos a configurar el guardado con las siguientes opciones:
output
de la zona silver
en nuestro bucket de S3 no existe y debemos crearlo ANTES de crear la tabla!
Athena nos va a mostrar un preview de como va a crear la tabla desde nuestro query, y solo damos click en Create table
.
Y listo!
Lo que acabamos de hacer es súuuuuuper poderoso. Recordemos que la BD se compone de archivos parquet. Los archivos parquet tienen una estructura columnar, PERO NO SABEN NADA DEL TIPO DE DATO QUE GUARDAN.
Cómo es posible, entonces, que con select
podamos hacer operaciones numéricas, aritméticas, o de strings sobre estos datos si no tienen un esquema definido?
La respuesta está en una funcionalidad poderosa de los componentes de lectura SQL de los data lakes llamado schema-on-read, esto es, no necesitamos definir la estructura de los datos que vamos a consumir sino hasta que los consumimos, a diferencia de una BD SQL relacional, donde el esquema, es decir, la estructura, está definida desde que estamod diseñando la BD en una diagrama Entidad-Relación.
Ahora si, vamos a visualizar ahora esta tabla con AWS Quicksight!
Primero accedamos a Quicksight.
Y pidamos acceso al servicio.
Muy vivillos, los de AWS nos quieren enjaretar una suscripción Enterprise, y no conformes con eso, están subrepresentando el botón para las suscripciones estándar usando un viejo truco de UX. No vamos a caer en su trampa y vamos a darle click en Standard arriba a la derecha:
Vamos ahora a configurar Quicksight con las siguientes opciones.
Más abajo debemos configurar a qué tendrá acceso Quicksight. Lo más importante es que tenga acceso al bucket de S3 donde tenemos nuestro Data Lake.
Una vez que nuestra cuenta de Qucksight esté lista, y nos brinquemos el tutorial, vamos a tener unas visualizaciones y datasets pre-hechos como ejemplo.
Vamos a dar click en Datasets, luego en New Dataset.
Vemos que tenemos muchísimas opciones para conectarnos. Vamos a dar click en Athena:
Y nos va a pedir que nombremos el data source. De dónde sale ese [primary]?
De acá:
Vamos ahora a seleccionar la tabla que acabamos de crear con los resultados del query de la sección 6:
Vamos a setear estas opciones. Es importante mencionar que nos conviene dejar la conf de SPICE porque nos va a ayudar a refrescar la visualización que vamos a crear en caso de que la data cambie.
Y listo. Quicksight nos va a seleccionar la mejor visualización para nuestra gráfica:
😠 TANTO PARA UNA GRÁFICA DE 2 BARRAS?!?!?! 😠
Tengan en cuenta que esto es un ejemplo de juguete. En un setting empresarial van a tener cientos de tablas, decenas de gráficas, y veintenas de queries y analíticos, lo que justifica el uso del data lake. Lo más importante es que una vez que terminamos todo este flujo, ya se queda forever, y entonces habremos construido un pipeline que va desde datos crudos hasta datos refinados.
Los data lakes son caros. En AWS lo más caro es el servicio de Glue (ETL), sobre todo porque cataloga, importa y organiza la info automática y periódicamente. Esto reemplaza a un grupo pequeño de ingenieros de datos programando queries y ETL jobs sin ningún problema.
Solo por esta demo, AWS me mandó esta factura:
En qué gastamos $270 USD?
Como podemos ver, menos de $10 USD por la maquinita donde tenemos nuestro PostgreSQL.