Recolecta y analiza el Big Data de las Noticias con tu propio Data Lake

En este artículo mostrare como ejecutar en tu equipo personal los componentes básicos de un Data Lake o Lago de Datos en español, con esfuerzo mínimo gracias a la tecnología de contenedores de Docker (https://www.docker.com/). Por cierto si aun no lo tienes instalado, este es buen momento para que lo hagas, en una publicación anterior de este Blog (https://abxda.medium.com/geo-big-data-con-datos-censales-2de6250772a5), hay algunas instrucciones para que lo puedas instalar en un equipo con Windows 10 Professional. Gracias a esta tecnología bastará con ejecutar una sola línea en tu terminal y algo paciencia, tendrás todos los componentes básicos de un Mini Data Lake ejecutándose en tu computadora personal. Con todo eso corriendo en tu Laptop podrás recolectar sistemáticamente miles de artículos de noticias usando la tecnología llamada Apache Airflow (https://airflow.apache.org), creando una base de datos semiestructurada. Todas las noticias recolectadas serán almacenadas en un deposito de objetos basados en la tecnología S3 de Amazon, pero corriendo localmente en tu propio Data Lake con tecnología de Software Libre (https://min.io). Después de eso realizaremos análisis de ese Big Data recolectado mediante PySpark usando Cuadernos de Jupyter Lab (https://jupyter.org), con lo que realizaremos algunos resúmenes y le daremos estructura a los datos para ser puestos en la base de datos relacional PostgreSql (https://www.postgresql.org). Por último, conectaremos una herramienta de Inteligencia de Negocios de última generación para analizar visualmente los datos analizados (https://superset.apache.org). Aunque no se verá en este tutorial, también se incluyo en el Data Lake una tecnología para generar APIs de Datos, que nos permitirá realizar productos de Datos a la Medida, aunque eso será material para otro tutorial 😃

¿Qué es un Data Lake?

Antes de continuar, vale la pena establecer lo que se entiende por un Data Lake, al final de este artículo podrás encontrar las referencias usadas en esta definición. Se entiende que la finalidad básica de un Lago de Datos es almacenar todos los datos que una organización produce. Permitiendo su incorporación con la menor fricción posible, aceptando datos sin modelar, semiestructurados o incluso no estructurados cómo archivos de texto e imágenes. Por lo que los datos se encuentran accesibles para su análisis tan pronto como se incorporan. De ahí que el sistema de almacenamiento usado es altamente relevante, pues requiere una capacidad de adaptarse dinámicamente a las demandas de espacio y flexibilidad en los tipos de objetos a almacenar. Existen varias formas de implementar dichos requerimientos para el componente de Almacenamiento, en el caso de este artículo me incliné por un almacenamiento de objetos de alto desempeño de código abierto compatible con Amazon S3 llamado MinIO (https://min.io/), el cual puede ser instalado y operado localmente. Si en algún momento de la evolución de tu Lago de Datos, se requiere transitar a la nube parte del almacenamiento, no será necesario reescribir todo el código que ya se tenga para consumir los objetos almacenados en los servidores de almacenamiento distribuido. Simplemente se reemplazaría la dirección de Internet usada y el resto de los protocolos de comunicación se mantienen sin cambios.

Además del sistema de almacenamiento, un lago de datos requiere una estrategia de recolección sistemática de datos, que se denomina Ingestión de Datos, en este artículo se implementó usando Apache Airflow (https://airflow.apache.org/) la cual es una plataforma de gestión de flujos de trabajo basada en Python, la cual permite lanzar, calendarizar y monitorear procesos continuos de recolección, análisis y manejo de datos en general. Para los expertos en sistemas equivale a una versión de la herramienta de Linux llamada Cron (https://es.wikipedia.org/wiki/Cron_(Unix)) re-imaginada para el siglo XXI. Adicionalmente en la implementación concreta de nuestro Lago de Datos, usaremos la una versión extendida de la herramienta llamada Googler (https://github.com/jarun/googler) la cual ofrece la característica de poder consultar a Google y varios de sus servicios como lo son Búsquedas Web, Noticias y Videos desde la consola del sistema operativo, es decir sin usar explícitamente un navegador web. Es decir, combinando Airflow y Googler, automatizamos la consulta y almacenamiento de búsquedas en Google. El autor de este artículo modificó Googler (https://github.com/abxda/googler) para que también descargue todo el contenido de las noticias, no solamente el resumen. Por lo que la ingestión será mucho mayor y se tendrá más texto para analizar.

Posteriormente será necesario aplicar técnicas de Ciencia de Datos para generar valor a partir de los datos recolectados en el sistema de almacenamiento de nuestro Lago de Datos. Derivando al siguiente componente al que se denomina Procesamiento. Aquí usamos la tecnología Apache Spark (https://spark.apache.org/) en su variante de Python (PySpark) complementada con SparkSQL, para simplificar el procesamiento de la información semiestructurada recolectada en formato JSON. Es en este componente que ocurre la mayor parte del procesamiento Big Data, así como la aplicación de técnicas de Machine Learning con la finalidad de extraer valor de los datos recolectados. En este componente se generan resúmenes y se le dota de estructura a los datos, con lo que pueden ser colocados en almacenamientos estructurados mas tradicionales, como puede ser una Base de Datos SQL el cual es el siguiente componente de nuestro Lago de Datos.

Es a la Base de Datos SQL que se transfieren los resultados de los diferentes análisis que se realicen, elegimos PostgreSQL como la base de datos debido a su robustez y los avances en eficiencia de la versión 13. Es a este motor de Datos que se conectan los componentes de Entrega de Datos y Autoservicio de Inteligencia de Negocios que son los últimos componentes de nuestra interpretación de un Lago de Datos.

El Autoservicio de Inteligencia de Negocios es una de las salidas que permite la exploración de los resultados producto del análisis de Big Data, así como la generación Tableros de Control y Visualizaciones atractivas que soporten la toma de decisiones para el resto de la organización. Aquí elegimos la tecnología Apache Superset (https://superset.apache.org/) que permite realizar exploración de datos interactiva y visualmente atractiva.

También incluimos un componente para la Entrega de datos mediante APIs REST con la tecnología FastAPI (https://fastapi.tiangolo.com/). Aunque en este artículo no se usa, dejamos preparada la infraestructura para la creación de productos de datos a la medida, a partir de los hallazgos de nuestro Lago de Datos, pero eso será en otro artículo.

Por último nuestro Lago de Datos no incorpora a los componentes de Gobernanza, Seguridad de Datos y Administración de Metadatos. Dichos componentes dependen de las reglas y tecnologías adoptadas por cada organización, por lo que en esta versión las dejamos fuera. Sin embargo son parte de la definición completa de un Lago de Datos.

La Siguiente imagen muestra los componentes de la implementación actual:

Estructura de nuestro mini data lake

Recolección de Noticias del Covid-19 en México

Al momento de escribir este artículo es marzo de 2021 y nos encontramos atravesando una contingencia sanitaria de proporciones mundiales, aún con un desenlace incierto. Por lo que es relevante poder monitorear toda la corriente de noticias que se generan día con día, incluso poder recolectar noticias generadas en meses pasados para poner en contexto lo sucedido. Por lo que en este ejercicio realizaremos la recolección de miles de artículos de la región de México desde el 1 de enero de 2020 hasta el día que ejecutes tu recolección. El patrón de búsqueda utilizado es: “Covid México” y analizaremos el contenido de las noticias para generar visualizaciones de resumen que permitan explorar de mejor manera el universo de información que se está generando. Obviamente el recolector está preparado para ser ajustado a nuevas fechas y cualquier patrón de búsqueda. Para mas detalles de los parámetros del buscador puede profundizar en el manual de usuario de Googler : https://github.com/jarun/googler. En el caso de este artículo se entregará una versión modificada que también descarga el texto completo de la noticia para poder ser analizado. Se preservan las URL’ s originales para poder citar apropiadamente los hallazgos.

Puesta en Marcha y Operación de Nuestro Mini Data Lake

La implementación de este lago de datos se subió a GitHub, por lo que podrás acceder al código de todas las definiciones de Docker que componen esta propuesta. Si eres un lector inquieto y autogestivo en tu aprendizaje, incluso podrás modificar y agregar o quitar componentes a tu gusto. Para poder descargar el código requieres tener instalado Git en tu equipo, eso se logra descargando el instalador en la siguiente liga: https://git-scm.com/download/win dónde puedes descargar el instalador haciendo clic en la liga que se muestra:

Descarga la versión mas adecuada, si no estas seguro descarga la de 32-bit

procede a su instalación con todas las opciones por defecto, para confirmar que se ha instalado correctamente abre una ventana de PowerShell y ejecuta el siguiente comando:

git --version

Se espera una salida similar a la siguiente:

Ya que tienes GIT instalado, podrás descargar todas las definiciones necesarias para ejecutar tu Lago de Datos, para lo cual requieres desplazarte a el directorio definitivo donde se descargara el código. por ejemplo en windows sería a D:\

Desplazándose a el directorio destinatario del código del lago de datos

Por último puedes descargar el código de la siguiente forma:

git clone https://github.com/abxda/mini-data-lake.git
Se descarga todo el código a tu equipo en el directorio: mini-data-lake

El siguiente paso depende de que exista Docker en ejecución en tu maquina, en el artículo anterior, se dieron las instrucciones para la instalación de Docker, este la ultima oportunidad para instalarlo si quieres seguir adelante.

Para verificar que esta corriendo debes poder ver la ballenita en la barra de iconos de tu equipo, por ejemplo en mi caso aún no está corriendo!!!

No está la ballenita de Docker

Entonces hay que activarlo, buscándolo en la barra de tareas y abriendolo:

Tardará un poco, pero una vez que termine, podrás identificar que Docker está corriendo y lograras ver su icono en la barra de notificaciones:

Ahora sigue la instrucción que compilara y levantará todos los servidores de nuestro Lago de Datos. Tenemos que estar en el directorio donde descargamos los el código y ejecutar la sigueinte instrucción en PowerShell:

docker-compose up --build -d
Docker se encargará de descargar y compilar todo lo necesario.

La primera vez tardará aproximadamente 1 hora, dependiendo de tu conexión a Internet y la velocidad de tu equipo, porque implica descargar y configurar por primera vez todo lo necesario, es buen momento de tomarse un café o ver un capitulo de tu serie favorita.

Cuando tengas la siguiente salida, sabrás que ha terminado:

Posteriormente solamente bastará la siguiente linea para iniciar todo y tardara unos segundos, pues ya tienes todo instalado en tu equipo.

docker-compose up --build -d

La linea anterior será necesaria la próxima ocasión que quieras iniciar tu Lago de Datos. Por el momento ya está corriendo todo. Puedes verificarlo abriendo el Dashboard de Docker:

Ahí podrás ver el grupo de servicios corriendo y lisos para usarse:

Servicios del Lago de Datos Corriendo Satisfactoriamente.

El siguiente paso es usar Apache Airflow para descargar noticias según algún criterio de búsqueda especifico. Ahora hay que abrir la siguiente liga: http://localhost:7777/admin/ en donde encontraremos esta aplicacion web:

El grafo pre-programado para recolectar noticias requiere que establezcamos los parámetros básicos a partir de un archivo de variables que descargamos con el resto del código, por lo que hora hay que incorporarlo de la siguiente manera:

Incorporación de variables y ajuste del número de noticias a recolectar.

Antes de iniciar podemos ir al servidor de almacenamiento de objetos en la siguiente dirección: http://localhost:9000/ dónde podemos ver que esta corriendo y sin ningun archivo en este momento:

usuario es minio-access-key password es minio-secret-key

Al entrar confirmamos que se encuentra vacío, Airflow se encargara de llenarlo con los documentos que procederá a descargar en unos momentos mas, en cuanto activemos el recolector.

Regresemos a Airflow en http://localhost:7777/ y activamos la recolección:

Activar recolección en Airflow

Después de unos 25 minutos aproximadamente tendremos la siguiente gráfica en la vista de árbol del grafo de Airflow que acabamos de activar:

La gráfica indica que se descargaron todas las noticias y se subieron a el almacén de objetos.

Ahora podemos visitar nuestro almacén de objetos donde vamos a ver todos los archivos en un nuevo Bucket creado por Airflow en la ruta: /news/covid_mexico. Dicha ruta contiene los archivos JSON con la información de todas las noticias que se pudieron recolectar. Para poder identificar mejor cada archivo se estableció el patrón AÑO-MES-DÍA.json, lo cual indica que dentro de cada archivo se encuentran las noticias descargadas. Ahora procederemos a explorar el resultado de nuestra recolección.

Ahora vamos a procesar los datos con el un cuaderno de Jupyter preprogramado, hay que abrir la siguiente ruta en nuestro equipo http://127.0.0.1:8888/lab?token=m1n1lake

y nos dirigimos al cuaderno como lo indica la siguiente imagen:

El cuaderno incluye un patrón de programación que se conecta a el almacen de objetos y lee todos los archivos descargados, en la siguiente linea podemos ver la ruta de conexión:

PySpark se conecta a el deposito de objetos y carga todas las noticias descargadas en s3a://news/covid_mexico/*.json

Podemos explorar la catidad de noticias descargadas e incluso el esquema con el que se guardó en los archivos JSON:

Esquema de los archivos descargados.

En el cuaderno realice algunas limpiezas básicas, por supuesto que el lector puede ajustar el proceso a su gusto aquí una muestra de la información con un nuevo campo de sitio, que nos permite observar el origen de las noticias:

Ejemplo de descarga después de un análisis sencillo.

Ahora tenemos una tabla, ajustada y con algunos procesos de limpieza, podemos guardar el DataFrame en Postgresql y empezar a preparar el camino para conectar los datos a una herramienta de inteligencia de negocios como Apache Superset. Los parámetros de conexión son los siguientes:

Conexión a una tabla de Postgresql

Es importante aclarar que la base de datos shared y el usuario shared se crearon en el proceso de inicialización, por eso es que podemos conectarnos. El password es el bien conocido: changeme1234 que tambien se guardó en la variable de entorno SHARED_PASSWORD.

En la siguiente linea guardamos el DataFrame de PySpark en la base de datos de PostgreSQL y sobre-escribimos cualquier cosa que se llame: tb_news_covid_mexico_date_text. El cual será el nombre de la primera tabla que cargaremos en nuestro motor SQL.

En el resto del cuaderno, seleccioné las 15 palabras mas significativas de cada artículo usando el criterio de TFIDF, quedando con una tabla como la siguiente, la cual se cargará también a PostgreSQL. La inspiración para el cálculo de TFIDF la obtuve de los siguientes artículos: https://sigdelta.com/blog/word-count-in-spark-with-a-pinch-of-tf-idf/ & https://sigdelta.com/blog/word-count-in-spark-with-a-pinch-of-tf-idf-continued/ .

De los campos generados n_w tiene el total de veces que una palabra aparece en el artículo dónde se encontró. Junto con el campo palabra podemos realizar nubes de palabras y otras visualizaciones.

por último guardamos el resultado en la base de datos PostgreSQL en la tabla llamada: tb_news_covid_mexico_palabras_top_tfidf.

Podemos verificar que los datos se agregaron correctamente, abriendo el administrador de PostgreSQL llamado pgAdmin 4 en la siguiente ruta: http://localhost:5050/ . Para conectarnos el usuario es: pgadmin4@pgadmin.org y el password: admin :

Conexión a pgAdmin 4

Una vez conectados podemos crear una conexión a la base de datos donde acabamos de cargar las tablas con el análisis básico de las noticias recolectadas. Como ya dijimos arriba los parámetros de conexión son Base de Datos: shared; Usuario: shared; Password: changeme1234 y dirección postgres. Veamos la forma de conectarnos:

Conexión a la base de datos que incluye los datos procesados con PySpark.

Ahora vamos a crear visualizaciones de los datos a partir de Apache Superset. Esta herramienta se encuentra en : http://localhost:8088/login/ las credenciales de acceso son: usuario: admin y password: changeme1234

Para poder crear visualizaciones primero se requiere dar de alta la base de datos que tiene la información que queremos explorar, nuestro caso es la bien conocida shared 😉

Dando de alta una base de datos en Superset.

Ahora será necesario dar de alta las tablas con las que estaremos trabajando:

Ahora dando de alta una tabla para analizar en Superset.

Es importante que se den de alta las dos tablas generadas con PySpark:

Las dos tablas ahora están disponibles para realizar gráficas y tableros de control.

Primero realizaremos un filtro, el cual será el control que nos permitirá ajustar los datos que estaremos visualizando, es interesante notar que siempre partiremos de la vista de tablas, para realizar controles o gráficas. Ademas crearemos un Tablero de Control llamado Noticias COVID-19.

Creación de un Filtro a partir de las fechas y los sitios que publican las noticias.

Ahora realizaremos una nube de palabras con las palabras mas significativas, la cual ahora depende del la configuración que el usuario haga del filtro mientras interactúa con el Tablero.

Creación de la Nube de Palabras, vinculada al filtro.

Ahora realizaremos un conteo de la cantidad de artículos por sitio de noticias.

Ahora se agregaron la cantidad de artículos por sitio de noticias

Por último agregaremos un histograma de noticias diarias, de acuerdo al periodo de análisis:

Incorporamos un histograma que cuenta el numero de noticias detectadas por día.

Con eso logramos generar un Tablero de Control que nos permite explorar y analizar las noticias que han tocado el tema del COVID durante el ultimo cuatrimestre del 2021.

Conclusión

En este artículo-tutorial logramos levantar un Lago de Datos desde cero, y recolectar miles de noticias desde el 1 de enero del 2020 hasta marzo del 2021. Analizamos los datos recolectados conectándonos al depósito de objetos S3 de nuestra implementación. Y por último generamos una visualización interactiva de los datos analizados.

Solamente queda que el lector replique por sí mismo el ejercicio y profundice por su cuenta en todas las tecnologías aplicadas aquí.

Gracias por leerme !

Abel Coronado

Referencias

Errores Detectados

Puede suceder que no se levanten los servicios de Airflow y Superset debido a un error de fin de línea en los archivos de inicialización de la base de datos postgresql:

El siguiente paso implica realizar el cambio de fin de linea, una forma es mediante el editor Sublime: https://www.sublimetext.com/ de la siguiente manera:

Cambio de fin de línea a UNIX

Para que nuestro pequeño cluster detecte el cambio hay que realizar las siguientes instrucciones en Powershell:

docker-compose stop
docker-compose rm
#aceptar la eliminación de los contenedores indicando: (y)
docker volume rm mini-data-lake_postgres_volume
docker-compose up --build -d

Con los ajustes indicados debería quedar funcional nuestro cluster.

Saludos

Father-Husband-Data Scientist-Philosopher-Entrepreneur-Professor PhD c. in Data Science-MSc Stats #R #Scala #Spark #SatelliteImagery #Python #BigData #Nerd

Get the Medium app