Skip to content

Commit

Permalink
added mock for webhook payment
Browse files Browse the repository at this point in the history
  • Loading branch information
rafixcs committed Oct 15, 2024
1 parent 58421e1 commit 4b7304b
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 1 deletion.
Empty file added mock-payment-server/__init__.py
Empty file.
76 changes: 76 additions & 0 deletions mock-payment-server/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from pydantic import BaseModel
import uvicorn
from fastapi import Body, FastAPI, Request
from typing import Annotated, Literal
import httpx

fake_db = {}

async def set_webhook_url_for_user_payment(user_id: int, webhook_url: str):
fake_db[user_id] = webhook_url


async def get_webhook_urls():
return list(fake_db.values())

class Payment(BaseModel):
user_id: str
user_amount: float
total_sale_amount: float

class WebHookResponse(BaseModel):
status: Literal["ok", "error"]

app = FastAPI()

@app.webhooks.post("payment-status-webhook")
async def payment_webhook(paymeny_data: Payment):
"""_summary_
When a payment is requested we'll send a POST request with this
data to the URL that was registerd for the event "payment-status-webhook" via "set-webhook"
endpoint.
Args:
paymeny_data (Payment): _description_
"""

pass


@app.post("/set-webhook")
async def set_webhook(
user_id: int,
webhook_url: Annotated[
str,
Body(
description="See info about webhook at 'Webhooks\\payment-status-webhook' section",
embed=True,
)
],
):
await set_webhook_url_for_user_payment(user_id, webhook_url)
return "ok"


@app.post("/user-payment")
async def user_paymeny(paymeny_info: Payment):
result = "error"
if paymeny_info.user_amount > paymeny_info.total_sale_amount:
result = "ok"
else:
result = "error"

notification_data = {
"payment_status": result
}

webhook_urls = await get_webhook_urls()
async with httpx.AsyncClient() as client:
for webhook_url in webhook_urls:
resp = await client.post(webhook_url, json=notification_data)
if (resp.status_code != 200):
print(f"Failed to send notification to {webhook_url}")
return notification_data

if __name__ == "__main__":
uvicorn.run("server:app", host="127.0.0.1", port=8001, reload=True)
Empty file added src/api/drivers/__init__.py
Empty file.
Empty file.
32 changes: 32 additions & 0 deletions src/api/drivers/webhook/payment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from fastapi import APIRouter, HTTPException, Request, FastAPI
import httpx
import asyncio

router = APIRouter()

@router.post("/api/v1/payment-status")
async def receive_payment_status(request: Request):
try:
data = await request.json()
result = data["payment_status"]
print(f"Payment result {result}")
except Exception as exc:
raise HTTPException(status_code=500, detail="Server Error")

WEBHOOK_URL = "http://localhost:8001/set-webhook"

async def register_webhook_payment_server():
params = {
"user_id": "0",
}
body = {
"webhook_url": "http://localhost:8000/api/v1/payment-status"
}

async with httpx.AsyncClient() as client:
resp = await client.post(WEBHOOK_URL, params=params, json=body)
print(resp)
if (resp.status_code != 200):
print(f"Failed to register webhook in the payment mock server: {WEBHOOK_URL}")

asyncio.run(register_webhook_payment_server())
2 changes: 1 addition & 1 deletion src/product/ports/unit_of_work_interface.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod

from product.ports.repository_interface import IProductRepository
#from product.ports.repository_interface import IProductRepository


class IProductUnitOfWork(ABC):
Expand Down

0 comments on commit 4b7304b

Please sign in to comment.