ModernDataEngineering: Building a Robust Data Pipeline with Proxy Rotation, Kafka, MongoDB, Redis, Logstash, Elasticsearch, and MinIO for Efficient Web Scraping
Proxies and Rotating User Agents: To overcome anti-scraping measures, our system combines proxies with rotating user agents. Proxies mask the scraper’s IP address, making it difficult for websites to detect and block it, while rotating user-agent strings further disguise the scraper by simulating requests from different browsers and devices.
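As a sketch of this technique in Python with the requests library (the proxy addresses and user-agent strings below are placeholder assumptions), each request goes out through a random proxy with a random user agent:

import random
import requests

# Placeholder pools; in this pipeline the live proxy list is kept in Redis.
PROXIES = [
    "http://203.0.113.10:8080",  # example addresses (TEST-NET range)
    "http://203.0.113.11:3128",
]
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15",
]

def fetch(url: str) -> requests.Response:
    # Pick a fresh proxy and user agent for every request.
    proxy = random.choice(PROXIES)
    headers = {"User-Agent": random.choice(USER_AGENTS)}
    return requests.get(url, headers=headers,
                        proxies={"http": proxy, "https": proxy}, timeout=10)

print(fetch("https://example.com/rss").status_code)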
Storing Proxies in Redis: Valid proxies are crucial for uninterrupted scraping. Our system stores and manages these proxies in a Redis database. Redis, known for its high performance, acts as an efficient, in-memory data store for managing our proxy pool. This setup allows quick access and updating of proxy lists, ensuring that our scraping agents always have access to working proxies.
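A minimal sketch of such a pool with the redis-py client, assuming a local Redis instance and a set key named "proxies" (both illustrative):

import redis

# Assumes a local Redis instance; host, port, and key name are illustrative.
r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def store_proxies(proxies):
    # A set ignores duplicates, so revalidated proxies are not double-counted.
    if proxies:
        r.sadd("proxies", *proxies)

def get_random_proxy():
    # Returns None when the pool is empty.
    return r.srandmember("proxies")

def drop_proxy(proxy):
    # Evict a proxy that failed a request so scrapers stop picking it.
    r.srem("proxies", proxy)

store_proxies(["http://203.0.113.10:8080", "http://203.0.113.11:3128"])
print(get_random_proxy())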
Extracting News from RSS Feeds: The system is configured to extract news from various RSS feeds. RSS, a web feed that allows users and applications to access updates to websites in a standardized, computer-readable format, is an excellent source for automated news aggregation.
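A minimal sketch of RSS extraction using the feedparser library; the feed URL and the selected fields are illustrative assumptions, not the project's configured sources:

import feedparser  # pip install feedparser

# Illustrative feed; the project reads from its own configured sources.
FEED_URL = "http://feeds.bbci.co.uk/news/rss.xml"

def extract_items(feed_url):
    # Normalize each RSS entry into a flat dict of the fields we keep.
    parsed = feedparser.parse(feed_url)
    return [
        {
            "title": entry.get("title"),
            "description": entry.get("summary"),
            "link": entry.get("link"),
            "published": entry.get("published"),
        }
        for entry in parsed.entries
    ]

for item in extract_items(FEED_URL):
    print(item["title"])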
Quality Validation and Kafka Integration: Once the news is extracted, its quality is validated. The validated news data is then published to a Kafka topic (Kafka A). Kafka, a distributed streaming platform, is used here for its ability to handle high-throughput data feeds, ensuring efficient and reliable data transfer.
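A sketch of this step with the kafka-python client; the broker address, the topic name ("topic_a"), and the validation rule are assumptions for illustration:

import json
from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",  # assumed broker address
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

REQUIRED_FIELDS = ("title", "link", "published")  # illustrative quality rule

def is_valid(item):
    # Reject items with missing or empty required fields.
    return all(item.get(field) for field in REQUIRED_FIELDS)

def publish(item):
    # Only validated items reach the topic; the rest are dropped.
    if is_valid(item):
        producer.send("topic_a", value=item)

publish({"title": "Example", "link": "https://example.com", "published": "2024-01-01"})
producer.flush()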
MongoDB Integration with Kafka Connect: Kafka Connect Mongo Sink consumes data from Kafka topic A and stores it in MongoDB.
MongoDB, a NoSQL database, is ideal for handling large volumes of unstructured data. The upsert functionality, based on the _id field, ensures that the data in MongoDB is current and avoids duplicates.
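A hedged sketch of how such a sink could be registered through the Kafka Connect REST API from Python; the connector name, connection URI, database, collection, and topic are illustrative assumptions, not the project's actual configuration:

import json
import requests

mongo_sink = {
    "name": "mongo-sink",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "topic_a",
        "connection.uri": "mongodb://mongo:27017",  # assumed URI
        "database": "news",
        "collection": "articles",
        # Take _id from the message value so repeated records upsert
        # instead of piling up as duplicates.
        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy",
    },
}

resp = requests.post(
    "http://localhost:8083/connectors",  # Kafka Connect REST endpoint
    headers={"Content-Type": "application/json"},
    data=json.dumps(mongo_sink),
)
print(resp.status_code, resp.text)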
Data Accessibility in FastAPI: The collected data in MongoDB is made accessible through FastAPI with OAuth 2.0 authentication, providing secure and efficient access for administrators.
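A minimal sketch of OAuth 2.0 bearer-token protection in FastAPI; the token check is a placeholder (a real deployment would verify a signed JWT), and the route path is illustrative:

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")  # token endpoint assumed

def get_current_admin(token: str = Depends(oauth2_scheme)) -> str:
    # Placeholder check; a real deployment would decode and verify a JWT.
    if token != "valid-admin-token":  # illustrative only
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
        )
    return "admin"

@app.get("/api/v1/admin/news")
def list_news(user: str = Depends(get_current_admin)):
    # The real service would query MongoDB here.
    return {"items": [], "requested_by": user}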
Logstash and Elasticsearch Integration: Logstash monitors MongoDB replica sets for document changes, capturing these as events. These events are then indexed in Elasticsearch, a powerful search and analytics engine. This integration allows for real-time data analysis and quick search capabilities.
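Once documents are indexed, they can be queried directly; here is a small sketch with the official Elasticsearch Python client, assuming an index named "news" (the actual index is defined by the Logstash pipeline in ingest_pipeline.conf):

from elasticsearch import Elasticsearch  # pip install elasticsearch

# Address and index name are assumptions for illustration.
es = Elasticsearch("http://localhost:9200")

result = es.search(
    index="news",
    query={"match": {"title": "Arsenal"}},  # full-text match on one field
    size=5,
)
for hit in result["hits"]["hits"]:
    print(hit["_score"], hit["_source"].get("title"))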
Data Persistence with Kafka Connect S3-Minio Sink: To ensure data persistence, Kafka Connect S3-Minio Sink is employed. It consumes records from Kafka topic A and stores them in MinIO, a high-performance object storage system. This step is crucial for long-term data storage and backup.
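A sketch of what the corresponding sink configuration might look like; it would be registered with the same POST to the Connect REST API as the MongoDB sink above, and the bucket name, endpoint, region, and flush size are illustrative assumptions:

minio_sink = {
    "name": "minio-sink",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "topic_a",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "s3.bucket.name": "news-archive",   # assumed bucket
        "s3.region": "us-east-1",           # still expected by the S3 client
        "store.url": "http://minio:9000",   # point the S3 client at MinIO
        "flush.size": "100",                # records per object written to MinIO
    },
}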
Elasticsearch for Public Search: The data collected and indexed in Elasticsearch is made publicly accessible through FastAPI. This setup allows users to perform fast and efficient searches across the aggregated data.
Here are some example API calls and their intended functionality:
Basic Request Without Any Parameters:
- Fetches all news items without applying any specific search criteria or language filter.
- Example API Call: GET http://localhost:8000/api/v1/news/
Search with a General Keyword:
- Searches across multiple fields (such as title, description, and author) using a general keyword.
- Example API Call: GET http://localhost:8000/api/v1/news/?search=Arsenal
- Returns news items where the word “Arsenal” appears in the title, description, or author.
Search in a Specific Field:
- Targets a specific field for searching with a keyword.
- Example API Call: GET http://localhost:8000/api/v1/news/?search=title|Milan
- Returns news items where the title contains the word “Milan”.
Filter by Language:
- Filters news items by a specific language.
- Example API Call: GET http://localhost:8000/api/v1/news/?language=en
- Returns news items where the language is English ("en").
Combining General Search with Language Filter:
- Performs a general keyword search while also applying a language filter.
- Example API Call: GET http://localhost:8000/api/v1/news/?search=Arsenal&language=en
- Returns English-language items containing “Arsenal”.
Combining Specific Field Search with Language Filter:
- Combines a specific field search with a language filter.
- Example API Call: GET http://localhost:8000/api/v1/news/?search=description|defender&language=en
- Returns news items where the description contains “defender” and the language is English.
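These queries can also be issued from Python; a minimal sketch using the requests library, assuming the endpoint returns a JSON list of articles (the response shape is an assumption):

import requests

BASE_URL = "http://localhost:8000/api/v1/news/"

# General keyword search restricted to English-language articles.
response = requests.get(BASE_URL, params={"search": "Arsenal", "language": "en"})
response.raise_for_status()
for article in response.json():  # assumed to be a list of article objects
    print(article.get("title"))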
This guide provides step-by-step instructions for setting up and running the "ModernDataEngineerPipeline" project.
Start by cloning the repository from GitHub:
git clone https://github.com/Stefen-Taime/ModernDataEngineerPipeline
cd ModernDataEngineerPipeline
Use docker-compose to build and start the services:
docker-compose up --build -d
You can use managed MongoDB and Redis clusters from MongoDB Atlas and Redis Cloud (both offer free trial tiers), or deploy local MongoDB and Redis clusters with Docker.
cd src
Execute proxy_handler.py to retrieve proxies and store them in Redis:
python proxy_handler.py
Use rss_handler.py to produce messages to Kafka:
python rss_handler.py
Add the two JSON sink connectors found in the connect folder via Confluent Connect, or register them through the Connect API, for example:
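The file name below is illustrative; use the actual files from the connect folder:
curl -X POST -H "Content-Type: application/json" --data @connect/mongo-sink.json http://localhost:8083/connectors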
Run Logstash inside its Docker container:
docker exec -it <container_id> /bin/bash -c "mkdir -p ~/logstash_data && bin/logstash -f pipeline/ingest_pipeline.conf --path.data /usr/share/logstash/logstash_data"
Finally, start the API:
cd api
python main.py
With these steps complete, the "ModernDataEngineerPipeline" project is fully set up and running.