-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
159 lines (120 loc) · 4.6 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import json
import logging
from flask import Flask, request
from flask_cors import CORS
import boto3
import csv
from datetime import datetime, timedelta, timezone
from io import StringIO
from awsgi import response
# Flask application object that the route decorators in this module attach to.
app = Flask(__name__)
# Enable CORS on every path ("/*") for any origin ("*"), with credentials
# support switched on.
# NOTE(review): a wildcard origin combined with supports_credentials=True is
# generally discouraged for credentialed requests — confirm this is intended.
CORS(app, resources={r"/*": {"origins": "*"}}, supports_credentials=True)
# Initialize the S3 client
s3 = boto3.client('s3')
# Name of the S3 bucket passed to the get_object/put_object calls in this module.
BUCKET_NAME = 'ugo-people-tracker'
@app.route('/people/register', methods=['POST'])
def register():
    """Record a 'register' action for the person named in the request body.

    Expects a JSON object body with a non-empty ``name`` field. When present,
    the action is handed to ``log_action`` and a 200 response is returned;
    otherwise a 400 response carrying an error message is returned.

    Returns:
        A ``(body, status, headers)`` tuple whose body is a JSON string.
    """
    # silent=True returns None instead of raising when the body is missing,
    # has the wrong content type, or is not valid JSON, so those requests get
    # the same JSON 400 as a request without a name.
    payload = request.get_json(silent=True)
    # A valid JSON body may still be a list/string/number; only objects have fields.
    name = payload.get('name') if isinstance(payload, dict) else None
    if not name:
        return json.dumps({"error": "Name is required"}), 400, {'Content-Type': 'application/json'}
    log_action(name, 'register')
    return json.dumps({"status": "registered"}), 200, {'Content-Type': 'application/json'}
@app.route('/people/unregister', methods=['POST'])
def unregister():
    """Record an 'unregister' action for the person named in the request body.

    Expects a JSON object body with a non-empty ``name`` field. When present,
    the action is handed to ``log_action`` and a 200 response is returned;
    otherwise a 400 response carrying an error message is returned.

    Returns:
        A ``(body, status, headers)`` tuple whose body is a JSON string.
    """
    # silent=True returns None instead of raising when the body is missing,
    # has the wrong content type, or is not valid JSON, so those requests get
    # the same JSON 400 as a request without a name.
    payload = request.get_json(silent=True)
    # A valid JSON body may still be a list/string/number; only objects have fields.
    name = payload.get('name') if isinstance(payload, dict) else None
    if not name:
        return json.dumps({"error": "Name is required"}), 400, {'Content-Type': 'application/json'}
    log_action(name, 'unregister')
    return json.dumps({"status": "unregistered"}), 200, {'Content-Type': 'application/json'}
@app.route('/people/status', methods=['GET'])
def status():
    """Report who is currently registered and when the log last changed.

    The JSON body contains:
      * ``status``: "open" when at least one occupant is registered, else "closed"
      * ``occupants``: the names returned by ``get_current_occupants``
      * ``count``: how many occupants there are
      * ``last_update``: the value returned by ``get_last_update_time``

    Returns:
        A ``(body, status, headers)`` tuple whose body is a JSON string.
    """
    present = get_current_occupants()
    updated_at = get_last_update_time()
    payload = {
        "status": "open" if present else "closed",
        "occupants": list(present),
        "count": len(present),
        "last_update": updated_at,  # time-of-day string from the log, or None
    }
    return json.dumps(payload), 200, {'Content-Type': 'application/json'}
def log_action(name, action):
    """Timestamp an action and append it to the log.

    Builds the row ``[date, time, name, action]`` — date as ISO ``YYYY-MM-DD``
    and time as ``HH:MM:SS``, both read from the current instant expressed at
    a fixed UTC+1 offset — and passes it to ``append_log_to_s3``.

    Args:
        name: Value stored in the row's name column.
        action: Value stored in the row's action column.
    """
    # Asking for "now" in a fixed +01:00 zone gives the same wall-clock fields
    # as taking the UTC time and adding one hour.
    stamp = datetime.now(timezone(timedelta(hours=1)))
    row = [
        stamp.date().isoformat(),
        stamp.strftime('%H:%M:%S'),
        name,
        action,
    ]
    append_log_to_s3(row)
def append_log_to_s3(log_entry):
    """Append one row to today's CSV log object in S3.

    The object key is ``<YYYY-MM-DD>-logs.csv`` built from ``datetime.today()``.
    The current object is downloaded, the row is appended, and the complete
    content is uploaded again with a single ``put_object`` call. When the
    object does not exist yet (or is empty) the content starts with a
    ``Date,Time,Name,Action`` header line.

    Args:
        log_entry: Sequence of field values written as a single CSV row.
    """
    header = 'Date,Time,Name,Action\n'  # Log has separate Date and Time columns
    # NOTE(review): the key is derived from datetime.today() while the row's own
    # date column is computed by the caller; the two may disagree around
    # midnight — confirm which clock should drive the file name.
    log_key = f'{datetime.today().strftime("%Y-%m-%d")}-logs.csv'
    try:
        s3response = s3.get_object(Bucket=BUCKET_NAME, Key=log_key)
    except s3.exceptions.NoSuchKey:
        existing_content = header
    else:
        existing_content = s3response['Body'].read().decode('utf-8')

    if not existing_content:
        # A zero-byte object would otherwise get a first data row that the
        # readers then skip as if it were the header.
        existing_content = header
    elif not existing_content.endswith('\n'):
        # Keep the new row from being glued onto an unterminated last line.
        existing_content += '\n'

    csv_buffer = StringIO()
    csv_buffer.write(existing_content)
    csv.writer(csv_buffer).writerow(log_entry)
    # Upload once; the object is rewritten in full with the appended row.
    s3.put_object(Bucket=BUCKET_NAME, Key=log_key, Body=csv_buffer.getvalue())
def get_last_update_time():
    """Return the time column of the last data row in today's log.

    Reads the ``<YYYY-MM-DD>-logs.csv`` object (date from ``datetime.today()``)
    from S3, skips the header line and scans the remaining rows.

    Returns:
        The second column of the last row that has one, or ``None`` when the
        object does not exist or contains no such row.
    """
    log_key = f'{datetime.today().strftime("%Y-%m-%d")}-logs.csv'
    try:
        s3response = s3.get_object(Bucket=BUCKET_NAME, Key=log_key)
    except s3.exceptions.NoSuchKey:
        return None

    csv_content = s3response['Body'].read().decode('utf-8')
    csv_reader = csv.reader(StringIO(csv_content))
    # Skip header; the default keeps an empty object from raising StopIteration.
    next(csv_reader, None)

    last_time = None
    for row in csv_reader:
        # Blank lines are read as [] and truncated rows may lack the time
        # column; ignore them instead of failing with IndexError.
        if len(row) > 1:
            last_time = row[1]
    return last_time
def get_current_occupants():
    """Replay today's log and return the names that are still registered.

    Reads the ``<YYYY-MM-DD>-logs.csv`` object (date from ``datetime.today()``)
    from S3, skips the header line, and applies each row in order: a
    'register' action adds the row's name, an 'unregister' action removes it.

    Returns:
        A set of names; empty when the object does not exist or has no rows.
    """
    log_key = f'{datetime.today().strftime("%Y-%m-%d")}-logs.csv'
    try:
        s3response = s3.get_object(Bucket=BUCKET_NAME, Key=log_key)
    except s3.exceptions.NoSuchKey:
        return set()

    csv_content = s3response['Body'].read().decode('utf-8')
    csv_reader = csv.reader(StringIO(csv_content))
    # Skip header; the default keeps an empty object from raising StopIteration.
    next(csv_reader, None)

    occupants = set()
    for row in csv_reader:
        # Blank lines are read as [] and truncated rows lack the action
        # column; ignore them instead of failing with IndexError.
        if len(row) < 4:
            continue
        person, action = row[2], row[3]
        if action == 'register':
            occupants.add(person)
        elif action == 'unregister':
            # discard() tolerates an unregister without a prior register.
            occupants.discard(person)
    return occupants
# Configure logging: getLogger() without a name returns the root logger, which
# is set here to emit records at INFO level and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# AWS Lambda handler
def lambda_handler(event, context):
    """Log the incoming Lambda event and dispatch it to the Flask app.

    Args:
        event: The Lambda event; it is serialized with ``json.dumps`` for
            logging and, when it contains a ``'body'`` key, that value is
            logged separately as well.
        context: The Lambda context object, passed through unchanged.

    Returns:
        Whatever ``awsgi.response(app, event, context)`` returns.
    """
    # Log the full event (this includes the request body and other information).
    # %-style arguments let the logging framework do the interpolation; the
    # emitted text is the same as with an f-string.
    logger.info("Received event: %s", json.dumps(event))

    # Log only the request body (if present)
    if 'body' in event:
        logger.info("Request body: %s", event['body'])

    # Return awsgi response
    return response(app, event, context)
# For local testing: when this file is executed directly (not imported),
# start the Flask app via app.run with debug mode enabled.
if __name__ == '__main__':
    app.run(debug=True)