-
Notifications
You must be signed in to change notification settings - Fork 0
/
api.py
105 lines (85 loc) · 2.06 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from typing import Optional

from fastapi import FastAPI
from pydantic import BaseModel
from redis import Redis
from rq import Queue

from worker import runTask
# FastAPI application instance; the route decorators in this module register on it.
app = FastAPI()

# Redis connection that backs the RQ queue below.
redis_conn = Redis(
    host='apiproj_redis',
    port=6379,
    db=0,
)

# RQ queue named 'my_queue'; the task endpoints enqueue their jobs onto it.
q = Queue(
    'my_queue',
    connection=redis_conn,
)
# Request body class
class Task(BaseModel):
    """Request body describing a task.

    Attributes are declared as pydantic fields:
      * ``owner`` is required.
      * ``description`` may be omitted or explicitly null; it defaults to None.
    """

    owner: str
    # Annotated as Optional so the declared type agrees with the None default
    # and an explicit null in the request body is accepted.
    description: Optional[str] = None
@app.get(
    '/holdings',
    responses={
        200: {
            "content": {"application/csv": {}},
            "description": "Return CSV or JSON",
        },
    },
)
def get_the_holdings_file():
    """Download the holdings file.

    The author's notes mention CSV and JSON formats and gzip. The handler is
    a stub: it performs no work and returns ``None``.
    """
    # NOTE(review): the registered media type for CSV is "text/csv"; confirm
    # whether "application/csv" in the responses mapping is intentional.
    holdings = None  # nothing is produced yet
    return holdings
@app.post('/tasks/holdings/run', status_code=201)
def run_the_holdings_task():
    """Start the holdings file creation task.

    Enqueues ``runTask`` on the module-level queue with the argument
    ``'holdings run'``. Per the author's note, the task consumes the datamart
    ODBC datasource and the local cache.

    Returns:
        dict: ``{'job': <job id>}`` where the id identifies the enqueued job.
    """
    job = q.enqueue(runTask, 'holdings run')
    # Return the job id, not the Job object itself: the Job instance carries a
    # Redis connection and other internals that cannot be encoded as JSON.
    return {'job': job.id}
@app.post('/tasks/cache/update', status_code=201)
def run_the_cache_update_task():
    """Start the cache update task.

    Enqueues ``runTask`` on the module-level queue with the argument
    ``'cache update'``.

    Returns:
        dict: ``{'job': <job id>}`` where the id identifies the enqueued job.
    """
    job = q.enqueue(runTask, 'cache update')
    # Return the job id, not the Job object itself: the Job instance carries a
    # Redis connection and other internals that cannot be encoded as JSON.
    return {'job': job.id}
@app.get('/tasks/holdings')
def get_the_status_of_the_holdings_task():
    """Report the status of the holdings task.

    Stub endpoint: no status lookup is implemented, so the handler always
    returns ``None``.
    """
    status = None  # no lookup implemented yet
    return status
@app.get('/tasks/cache')
def get_the_status_of_the_cache():
    """Report the status of the cache.

    Stub endpoint: no status lookup is implemented, so the handler always
    returns ``None``.
    """
    status = None  # no lookup implemented yet
    return status
@app.get('/queue')
def get_the_status_of_the_task_queue():
    """Report the status of the task queue.

    Stub endpoint: the queue is not inspected, so the handler always returns
    ``None``.
    """
    status = None  # no lookup implemented yet
    return status
@app.get('/test')
def test():
    """Return the fixed payload ``{'hello': 'world'}``."""
    payload = {'hello': 'world'}
    return payload
#
# @app.post('/tasks/{task_name}', status_code=201)
# def addTask(task_name: str):
# """
# Adds tasks to worker queue.
# Expects body as dictionary matching the Group class.
# """
# if task_name not in ('task1', 'task2'):
# raise HTTPException(
# status_code=404, detail='task not found'
# )
# job = q.enqueue(
# runTask,
# task_name
# )
# return {'job': job}
#
#