-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconstants.py
69 lines (53 loc) · 2.08 KB
/
constants.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import typing as t
from os import getenv
from datetime import datetime
from logger import log
def __history_datetime_setter(_history_datetime: str) -> t.Union[str, datetime]:
history_datetime = ''
try:
if _history_datetime:
history_datetime = datetime.strptime(_history_datetime, '%Y-%m-%dT%H:%M:%S')
except ValueError as e:
log.error(f'Error while parsing HISTORY_DATETIME: {e}')
except Exception as e:
log.error(f'Exception while setting HISTORY_DATETIME: {e}')
finally:
return history_datetime
def __bot_token_setter(_bot_token: str) -> str:
if not _bot_token:
raise ValueError(f'BOT_TOKEN is empty. Stop script')
else:
return _bot_token
def __guild_setter(_guild: str) -> str:
if not _guild:
raise ValueError(f'GUILD is empty. Stop script')
else:
return _guild
def __channels_setter(_channels: str) -> list:
# CHANNELS shouldn't be empty to not collect and store all msgs from all server channels including possible trash
channels = list()
if not _channels:
raise ValueError(f'CHANNELS is empty. Stop script')
for channel_id in set(_channels.split(',')):
try:
channels.append(int(channel_id))
except ValueError as e:
log.error(f'Error while parsing CHANNELS: {e}. Stop script')
raise
except Exception as e:
log.error(f'Exception while setting CHANNELS: {e}. Stop script')
raise
return channels
SCRAPING_HISTORY_INTERVAL = 86400
SCRAPING_UPDATES_INTERVAL = 300
QUEUE_SIZE_MULTIPLIER = 100
MESSAGE_BATCH_SIZE = 1000
HEALTH_CHECK_INTERVAL = getenv('HEALTH_CHECK_INTERVAL', "")
INDEX_NAME = getenv('INDEX', "")
TYPE_NAME = '_doc'
ELASTICSEARCH_HOST = getenv('ELASTICSEARCH_HOST', "")
ELASTICSEARCH_PORT = int(getenv('ELASTICSEARCH_PORT', ""))
HISTORICAL_RUN_START_DATE = __history_datetime_setter(getenv('HISTORICAL_RUN_START_DATE', ""))
BOT_TOKEN = __bot_token_setter(getenv('BOT_TOKEN', ""))
GUILD = __guild_setter(getenv('GUILD', ""))
CHANNELS = __channels_setter(getenv('CHANNELS', ""))