From 536c16e7f20961b4e55651f305a2159f4fb76eea Mon Sep 17 00:00:00 2001 From: gautamgambhir97 <140384949+gautamgambhir97@users.noreply.github.com> Date: Tue, 27 Aug 2024 13:39:15 +0530 Subject: [PATCH] feat(docs): added example of utilize the postgresql database with the agent (#900) Co-authored-by: felix.bucsa <72919584+FelixNicolaeBucsa@users.noreply.github.com> --- pages/examples/intermediate/_meta.json | 5 + .../postgres-database-with-an-agent.mdx | 431 ++++++++++++++++++ 2 files changed, 436 insertions(+) create mode 100644 pages/examples/intermediate/postgres-database-with-an-agent.mdx diff --git a/pages/examples/intermediate/_meta.json b/pages/examples/intermediate/_meta.json index cc8b1a53f..b6b694978 100644 --- a/pages/examples/intermediate/_meta.json +++ b/pages/examples/intermediate/_meta.json @@ -133,5 +133,10 @@ "title": "Shopping Assistance Agent using on_query ", "tags": ["Intermediate", "Python", "Functions", "OpenAI", "Use Cases"], "timestamp": true + }, + "postgres-database-with-an-agent": { + "title": "Utilize the PostgreSQL database with the Agent.", + "tags": ["Intermediate", "Python", "Docker", "PostgreSQL", "Use Cases"], + "timestamp": true } } diff --git a/pages/examples/intermediate/postgres-database-with-an-agent.mdx b/pages/examples/intermediate/postgres-database-with-an-agent.mdx new file mode 100644 index 000000000..e1b1dffd8 --- /dev/null +++ b/pages/examples/intermediate/postgres-database-with-an-agent.mdx @@ -0,0 +1,431 @@ +# Utilize the PostgreSQL database with the Agent + +## Introduction + +In this documentation example, we demonstrate how to use the uAgent library to create agents that interact with PostgreSQL data within a Docker Compose setup. In this scenario, one agent handles the insertion of employee data into the PostgreSQL database, while another agent retrieves this data. This example illustrates the seamless integration between agents, PostgreSQL, and Docker, showcasing how to manage data flow and communication in a distributed system. + +### Supporting Documents + + - [Almanac contract overview ↗️](../../references/contracts/uagents-almanac/almanac-overview). + - [How to create an agent ↗️](../../guides/agents/create-a-uagent). + - [Registering in the Almanac Contract ↗️](../../guides/agents/register-in-almanac). + - [Creating an interval task ↗️](/guides/agents/interval-task) + - [Communicating with other agents ↗️](/guides/agents/communicating-with-other-agents) + +## Pre-requisites + +- **Python:** Download and install from [Python official website ↗️](https://www.python.org/downloads/). +- **Poetry:** Install by following the instructions on [Poetry's official website ↗️](https://python-poetry.org/docs/#installation). +- **Docker:** Download and install from [Docker official website ↗️](https://docs.docker.com/get-docker/). +- **Docker Compose:** Download and install from [Docker Compose official documentation ↗️](https://docs.docker.com/compose/). + +## Project Structure + +``` +.postgres-database-with-an-agent  +├── docker-compose.yml +├── Dockerfile +├── README.md +└── src + ├── constants.py + ├── db + │ ├── db_connection.py + │ ├── __init__.py + │ ├── models + │ │ └── models.py + │ └── schemas + │ └── employees.sql + └── main.py +``` + +## Agent with PostgreSQL database + +### Set up the PostgreSQL connection with Docker using Docker Compose + + +This section details the files involved in setting up the PostgreSQL connection within your project. + +**`db_connection.py`**: Contains the functions to establish and close the connection to the PostgreSQL database using the psycopg2 library. The `create_connection` function connects to the database with provided credentials, while the `close_connection` function ensures the connection is safely terminated. + +```py copy filename="db_connection.py" +import psycopg2 +from psycopg2 import OperationalError + + +def create_connection(dbname, user, password, host="localhost", port="5432"): + """ + Create a connection to the PostgreSQL database. + + :param dbname: Name of the database + :param user: Database user + :param password: User's password + :param host: Database host + :param port: Database port + :return: Connection object or None if connection fails + """ + try: + conn = psycopg2.connect( + dbname=dbname, user=user, password=password, host=host, port=port + ) + print("Connection successful") + return conn + except OperationalError as error: + print(f"Error connecting to PostgreSQL database: {error}") + return None + + +def close_connection(conn): + if conn: + conn.close() + print("Connection closed") +``` + +**`docker-compose.yml`**: Configures the Docker services, including the PostgreSQL database and the application. It defines the environment variables for the database connection and maps the database schema from the host to the container. + +```yml copy +version: "3.8" +services: + db: + container_name: postgres_container + image: postgres + restart: always + environment: + POSTGRES_USER: ${DB_USER} + POSTGRES_PASSWORD: ${DB_PASSWORD} + POSTGRES_DB: ${DB_NAME} + volumes: + - "postgres:/var/lib/postgresql/data" + - ./src/db/schemas/employees.sql:/docker-entrypoint-initdb.d/employees.sql + ports: + - "5432:5432" + networks: + - agent_network + app: + build: . + container_name: poetry_app + volumes: + - .:/app + ports: + - "8000:8000" + depends_on: + - db + networks: + - agent_network + environment: + DB_USER: ${DB_USER} + DB_PASSWORD: ${DB_PASSWORD} + DB_NAME: ${DB_NAME} + command: poetry run python ./src/main.py + +volumes: + postgres: + +networks: + agent_network: + driver: bridge +``` + +**`Dockerfile`**: Builds the application container, installing dependencies via Poetry and setting up the environment to run the application. The container exposes port 8000 for the application service. + +```docker copy +FROM python:3.12-slim +ENV PATH="$PATH:/root/.local/bin" + +RUN apt-get update && \ + apt-get install -y curl gcc && \ + curl -sSL https://install.python-poetry.org/ | python3 - + +WORKDIR /app +ADD pyproject.toml poetry.lock /app/ +RUN poetry install +ADD . /app +EXPOSE 8000 + +ENTRYPOINT ["poetry", "run"] +CMD ["python", "main.py"] +``` + + +### Defining the Employees Database Schema and Model + +The `Employees` class defines a model representing employee data as a dictionary. The `GetEmployees` class represents a model used to request employee information, with a flag indicating whether a response is expected. + +The `employees.sql` script defines a schema for an Employees table in a PostgreSQL database, if it doesn't already exist. This table includes columns for employee ID, first name, last name, birth date, and salary. + + + +```py copy filename="models.py" +from uagents import Model + + +class Employees(Model): + employees_data: dict + + +class GetEmployees(Model): + reply_back: bool +``` + +```query copy filename="employees.sql" +CREATE TABLE IF NOT EXISTS Employees ( + EmployeeID INT PRIMARY KEY, + FirstName VARCHAR(50), + LastName VARCHAR(50), + BirthDate DATE, + Salary DECIMAL(10, 2) +); +``` + +### Postgres data with agent + +This script sets up and runs two agents, `db_insert_agent` and `db_fetch_agent`, which interact with a PostgreSQL database to manage employee data. The agents use asynchronous event handling to fetch and insert employee information into the database. + +#### Database Connection + +- The `create_connection` function is used to establish a connection to the PostgreSQL database using parameters defined in `db_params`. + +#### Agents + +- **`db_insert_agent`**: Responsible for inserting employee data into the database. +- **`db_fetch_agent`**: Responsible for fetching and reporting employee data from the database. + +#### Event Handlers + +- **`on_startup` (db_fetch_agent)**: + - Triggered when `db_fetch_agent` starts. + - Retrieves and logs the PostgreSQL database version. + - Sends employee data to `db_insert_agent` if the version retrieval is successful. + +- **`handle_employee_data` (db_insert_agent)**: + - Handles messages with employee data. + - Inserts the received employee data into the `Employees` table in the database. + +- **`fetch_all_employee_details` (db_fetch_agent)**: + - Handles messages requesting all employee details. + - Retrieves all employee records from the `Employees` table and logs the data. + +#### Database Operations + +- **Fetching Database Version**: Uses the query `SELECT version();` to get the PostgreSQL version. +- **Inserting Employee Data**: Executes an `INSERT` query to add employee records to the `Employees` table. +- **Fetching Employee Details**: Executes a `SELECT * FROM Employees` query to retrieve all employee records. + +#### Execution + +- Initializes a `Bureau` instance. +- Adds both agents (`db_insert_agent` and `db_fetch_agent`) to the bureau. +- Runs the bureau, which starts the agents and their event handlers. + +#### Usage + +1. **Startup**: On startup, `db_fetch_agent` will log the database version and send employee data to `db_insert_agent`. +2. **Inserting Data**: `db_insert_agent` will insert received employee data into the database. +3. **Fetching Data**: `db_fetch_agent` will fetch and log all employee details upon request. + + +```py copy filename="main.py" +from db.db_connection import create_connection +from uagents import Agent, Context, Bureau +from db.models.models import Employees, GetEmployees +from constants import employees_data +from constants import db_params, DB_FETCH_AGENT_ADDRESS + + +def get_db_version(): + """ + Retrieves the PostgreSQL database version. + + :return: Database version string or None if retrieval fails + """ + conn = create_connection(**db_params) + if conn: + try: + cursor = conn.cursor() + cursor.execute("SELECT version();") + db_version = cursor.fetchone() + cursor.close() + return db_version + except Exception as error: + print(f"Error executing query: {error}") + return None + + +db_insert_agent = Agent(name="db_inserter", seed="db_inserter_seed_phrase") +db_fetch_agent = Agent(name="db_fetcher", seed="db_fetcher_seed_phrase") + +DB_FETCH_AGENT_ADDRESS = DB_FETCH_AGENT_ADDRESS + + +@db_fetch_agent.on_event("startup") +async def on_startup(ctx: Context): + """ + Event handler triggered on agent startup to fetch database version and send employee data. + + :param ctx: Context object for handling agent events + """ + db_version = get_db_version() + if db_version: + ctx.logger.info( + f"Hello, I'm agent {db_insert_agent.name} and my address is {db_insert_agent.address}. PostgreSQL database version: {db_version[0]}" + ) + await ctx.send(DB_FETCH_AGENT_ADDRESS, Employees(employees_data=employees_data)) + else: + ctx.logger.info( + f"Hello, I'm agent {db_insert_agent.name} and my address is {db_insert_agent.address}. Could not retrieve the database version." + ) + + +@db_insert_agent.on_message(model=Employees, replies=GetEmployees) +async def handle_employee_data(ctx: Context, sender: str, msg: Employees): + """ + Handler for inserting employee data into the database. + + :param ctx: Context object for handling agent events + :param sender: Sender of the message + :param msg: Message containing employee data + """ + ctx.logger.info(f"Received request from {sender} {msg.dict()}") + employee_data = msg.employees_data + conn = create_connection(**db_params) + if conn: + try: + cursor = conn.cursor() + insert_query = """ + INSERT INTO Employees (EmployeeID, FirstName, LastName, BirthDate, Salary) + VALUES (%s, %s, %s, TO_DATE(%s, 'DD-MM-YYYY'), %s) + """ + cursor.execute( + insert_query, + ( + employee_data["EmployeeID"], + employee_data["FirstName"], + employee_data["LastName"], + employee_data["BirthDate"], + employee_data["Salary"], + ), + ) + REPLY_BACK = True + conn.commit() + cursor.close() + ctx.logger.info(f"Inserted employee data: {employee_data}") + await ctx.send(sender, GetEmployees(reply_back=REPLY_BACK)) + except Exception as error: + ctx.logger.error(f"Error inserting employee data: {error}") + else: + ctx.logger.error("Could not connect to the database.") + + +@db_fetch_agent.on_message(model=GetEmployees) +async def fetch_all_employee_details(ctx: Context, sender: str, msg: GetEmployees): + """ + Handler for fetching all employee details from the database. + + :param ctx: Context object for handling agent events + :param sender: Sender of the message + :param msg: Message triggering the fetch operation + """ + if msg.reply_back: + conn = create_connection(**db_params) + if conn: + try: + cursor = conn.cursor() + query = "SELECT * FROM Employees" + cursor.execute(query) + all_employees = cursor.fetchall() + cursor.close() + + employees_list = [] + for employee in all_employees: + employee_info = { + "EmployeeID": employee[0], + "FirstName": employee[1], + "LastName": employee[2], + "BirthDate": employee[3].strftime("%d-%m-%Y"), + "Salary": employee[4], + } + employees_list.append(employee_info) + ctx.logger.info(f"Retrieved all employee data: {employees_list}") + except Exception as error: + ctx.logger.error(f"Error retrieving employee data: {error}") + else: + ctx.logger.error("Could not connect to the database.") + + +bureau = Bureau() +bureau.add(db_insert_agent) +bureau.add(db_fetch_agent) + +if __name__ == "__main__": + bureau.run() + +``` + + +This constant file initializes a dictionary for storing employee data and configures the database connection parameters using environment variables. It also defines a constant for the address of the database fetch agent. + +```py copy filename="constants.py" +import os + +employees_data = { + "EmployeeID": "", + "FirstName": "", + "LastName": "", + "BirthDate": "", + "Salary": 0, +} + +db_params = { + "dbname": os.getenv("DB_NAME"), + "user": os.getenv("DB_USER"), + "password": os.getenv("DB_PASSWORD"), + "host": "db", + "port": "5432", +} + +DB_FETCH_AGENT_ADDRESS = ( + "agent1qwg0h3gx2kvqmwadlg0j4r258r7amcfskx2mudz92ztjmtfdclygxrh5esu" +) + +``` + +## Poetry Dependencies + +```pyproject.toml copy filename="pyproject.toml" +[tool.poetry.dependencies] +python = "^3.10" +psycopg2-binary = "^2.9.9" +uagents = { version = "^0.13.0", python = ">=3.10,<3.13" } +``` + +## How to Run This Example + +#### Update the Required environment variables + +```env copy filename=".env.example" +DB_USER= +DB_PASSWORD= +DB_NAME= +``` + +#### Instructions to execute the example. + +- Navigate to the root Folder of Example. +- Update the constant file with new entries to store in the database +- Run `docker-compose build` +- Run `docker-compose up` + + + +## Expected Output + +``` +poetry_app | Connection successful +poetry_app | INFO: [db_fetcher]: Hello, I'm agent db_inserter and my address is agent1qwg0h3gx2kvqmwadlg0j4r258r7amcfskx2mudz92ztjmtfdclygxrh5esu. PostgreSQL database version: PostgreSQL 16.3 (Debian 16.3-1.pgdg120+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit +poetry_app | INFO: [db_inserter]: Received request from agent1qv470qn3vfgn3dwe5z90m8u6qvtn6chrgm4urfzdg2v9qyagln6sgnh4wwg {'employees_data': {'EmployeeID': '0', 'FirstName': 'john', 'LastName': 'wick', 'BirthDate': '29-08-1999', 'Salary': 50000}} +poetry_app | Connection successful +poetry_app | INFO: [db_inserter]: Inserted employee data: {'EmployeeID': '0', 'FirstName': 'john', 'LastName': 'wick', 'BirthDate': '29-08-1999', 'Salary': 50000} +poetry_app | Connection successful +poetry_app | INFO: [db_fetcher]: Retrieved all employee data: [{'EmployeeID': 0, 'FirstName': 'john', 'LastName': 'wick', 'BirthDate': '29-08-1999', 'Salary': Decimal('50000.00')}] +poetry_app | INFO: [bureau]: Starting server on http://0.0.0.0:8000 (Press CTRL+C to quit) +``` +