-
Notifications
You must be signed in to change notification settings - Fork 0
/
api.py
197 lines (163 loc) · 6.81 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
from .models import MachineLogModel, MachineChallenge
from .schema import MachineLogSchema
from flask import request, views
from flask_restx import Resource, Namespace
from werkzeug.exceptions import NotFound, abort
from CTFd.api import CTFd_API_v1
from CTFd.api.v1.challenges import Challenge as ChallengeAPI
from CTFd.models import Challenges, Solves
from CTFd.plugins.challenges import get_chal_class
from CTFd.utils import config
from CTFd.utils.dates import ctf_paused
from CTFd.utils.decorators import admins_only, during_ctf_time_only, require_verified_emails
from CTFd.utils.decorators.visibility import check_challenge_visibility
from CTFd.utils.user import authed, get_current_user, is_admin
import logging
machineLogDumper = MachineLogSchema()
logger = logging.getLogger('machine')
class ChallengeAPIMod(ChallengeAPI, views.View):
@check_challenge_visibility
@during_ctf_time_only
@require_verified_emails
def get(self, challenge_id):
resp = super().get(challenge_id)
if is_admin():
return resp
else:
if isinstance(resp, dict):
resp['data'].pop('machine', None)
return resp
else: return resp
@admins_only
def patch(self, challenge_id):
return super().patch(challenge_id)
@admins_only
def delete(self, challenge_id):
return super().delete(challenge_id)
machine_namespace = Namespace("machines")
@machine_namespace.route("")
class MachineList(Resource):
@check_challenge_visibility
@during_ctf_time_only
@require_verified_emails
@machine_namespace.doc(
description="Endpoint to start a machine for a specific Challenge"
)
def post(self):
if authed() is False:
return {"success": True, "data": {"status": "authentication_required"}}, 403
data = request.get_json() or request.form
challenge_id = data.get("challenge_id")
challenge = Challenges.query.filter_by(id = challenge_id).first_or_404()
user = get_current_user()
if is_admin() == False:
if ctf_paused():
return {
"success": True,
"data": {
"status": "paused",
"message": "{} is paused".format(config.ctf_name()),
},
}, 403
if challenge == None or challenge.state == "hidden":
abort(404)
if challenge.state == "locked":
abort(403)
if challenge.requirements:
requirements = challenge.requirements.get("prerequisites", [])
solve_ids = (
Solves.query.with_entities(Solves.challenge_id)
.filter_by(account_id=user.account_id)
.order_by(Solves.challenge_id.asc())
.all()
)
solve_ids = {solve_id for solve_id, in solve_ids}
prereqs = set(requirements)
if solve_ids < prereqs:
abort(403)
chall_class = get_chal_class(challenge.type)
if not hasattr(chall_class, "startmachine"):
abort(400)
try:
machine = chall_class.startmachine(user, challenge)
response = machineLogDumper.dump(machine)
except Exception as e:
logger.error(f"Failed start machine: {user.name} - {challenge_id} - {str(e)}")
return {"success": False, "errors": str(e)}, 500
return {"success": True, "data": response.data}
@admins_only
@machine_namespace.doc(
description="Endpoint to terminate multi machine"
)
def delete(self):
data = request.get_json() or request.form
machine_ids = data.get("machine_ids")
if machine_ids == None or not isinstance(machine_ids, list):
abort(400)
machines = MachineLogModel.query.filter(
MachineLogModel.status == 1,
MachineLogModel.id.in_(machine_ids)
).all()
err = []
for machine in machines:
try:
MachineChallenge.terminatemachine(machine.user, machine.challenge)
except Exception as e:
logger.error(f"Failed bulk terminate machine: {machine.id} - {str(e)}")
err.append(f"ERROR: {machine.id} - {str(e)}")
if len(err) == 0:
return {"success": True}
else:
return {"success": False, "errors": err}, 500
@machine_namespace.route("/<challenge_id>")
class Machine(Resource):
@check_challenge_visibility
@during_ctf_time_only
@require_verified_emails
@machine_namespace.doc(
description="Endpoint to update the status of a machine for a specific challenge"
)
def get(self, challenge_id):
if authed() is False:
return {"success": True, "data": {"status": "authentication_required"}}, 403
user = get_current_user()
challenge = Challenges.query.filter_by(id=challenge_id).first_or_404()
chall_class = get_chal_class(challenge.type)
if not hasattr(chall_class, "updatemachine"):
abort(400)
try:
machine = chall_class.updatemachine(user, challenge)
response = machineLogDumper.dump(machine)
except NotFound as e:
return {'success': False, 'errors': str(e)}
except Exception as e:
logger.error(f"Failed update machine: {user.name} - {challenge_id} - {str(e)}")
return {"success": False, "errors": str(e)}, 500
return {"success": True, "data": response.data}
@check_challenge_visibility
@during_ctf_time_only
@require_verified_emails
@machine_namespace.doc(
description="Endpoint to terminate a machine for a specific challenge"
)
def delete(self, challenge_id):
if authed() is False:
return {"success": True, "data": {"status": "authentication_required"}}, 403
user = get_current_user()
challenge = Challenges.query.filter_by(id=challenge_id).first_or_404()
chall_class = get_chal_class(challenge.type)
if not hasattr(chall_class, "terminatemachine"):
abort(400)
try:
stt = chall_class.terminatemachine(user, challenge)
except Exception as e:
logger.error(f"Failed terminate machine: {user.name} - {challenge_id} - {str(e)}")
return {"success": False, "errors": str(e)}, 500
return {"success": stt}
@machine_namespace.route("/ping")
class MachinePing(Resource):
def get(self):
return {"success": True, "data": "pong"}
def load(app):
app.view_functions['api.challenges_challenge'] = ChallengeAPIMod.as_view('challenges_challenge_mod')
CTFd_API_v1.add_namespace(machine_namespace, "/machines")