This repository has been archived by the owner on Nov 15, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase.py
113 lines (84 loc) · 3.14 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import datetime
import typing
from sqlalchemy import (
and_,
create_engine,
Column,
ForeignKey,
LargeBinary,
Integer,
String,
DateTime,
)
from sqlalchemy.orm import relationship, Session
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import JSON
from m4m_sync.utils import DateTimeRange
Base = declarative_base()
class Controller(Base):
__tablename__ = "controllers"
id = Column(String(32), primary_key=True)
name = Column(String, nullable=False)
mac = Column(String, nullable=False)
class Sensor(Base):
__tablename__ = "sensors"
id = Column(String(32), primary_key=True)
name = Column(String, nullable=False)
controller_id = Column(Integer, nullable=False)
sensor_data = relationship(
"SensorData",
uselist=True,
lazy="noload"
)
class SensorData(Base):
__tablename__ = "sensor_data"
id = Column(Integer, primary_key=True)
sensor_id = Column(String(32), ForeignKey("sensors.id"), nullable=False)
data = Column(JSON, nullable=False)
sign = Column(LargeBinary)
signer = Column(LargeBinary)
sensor = relationship(
"Sensor",
uselist=False,
lazy="noload",
)
class UserInfo(Base):
__tablename__ = "users_info"
id = Column(Integer, primary_key=True)
encrypt_key = Column(String)
class UserSocialTokens(Base):
__tablename__ = 'users_social_tokens'
user_id = Column(Integer, ForeignKey('users.id'), primary_key=True, nullable=False)
yandex_disk = Column(String)
class DatabaseManager:
def __init__(self, db_uri):
self._db_uri = db_uri
def _create_session(self):
if not hasattr(self, "__session"):
setattr(self, "__session", Session(create_engine(self._db_uri)))
return getattr(self, "__session")
def get_controllers(self) -> typing.List[Controller]:
return self._create_session().query(Controller).all()
def get_sensors(self, controller: Controller) -> typing.List[Sensor]:
return self._create_session().query(Sensor).filter_by(controller_id=controller.id).all()
def get_sensor_data(self, sensor_id: str, datetime_range: DateTimeRange) -> typing.List[SensorData]:
return self._create_session().query(SensorData).filter(
and_(
SensorData.sensor_id == sensor_id,
SensorData.data["timestamp"].astext.cast(DateTime) >= datetime_range.start,
SensorData.data["timestamp"].astext.cast(DateTime) <= datetime_range.end,
),
).all()
def get_first_sensor_data_date(self, sensor_id: str) -> datetime.datetime:
data = self._create_session().query(SensorData.data["timestamp"].astext.cast(DateTime)) \
.filter(SensorData.sensor_id == sensor_id) \
.order_by(SensorData.id.asc()) \
.limit(1) \
.all()
if len(data):
return data[0][0]
def get_encryption_key(self):
key, = self._create_session().query(UserInfo.encrypt_key).one()
return key
def get_tokens(self):
return self._create_session().query(UserSocialTokens).one()