-
Notifications
You must be signed in to change notification settings - Fork 0
/
streams.py
62 lines (51 loc) · 1.89 KB
/
streams.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from queue import Queue
import signal
import sqlite3
from threading import Thread
import os
from dnaStreaming.listener import Listener
def processMessagesQueue(queue):
db = sqlite3.connect('test_listener_python.sqlite3', isolation_level=None)
cursor = db.cursor()
while True:
if not queue.empty():
message = queue.get()
cursor.execute(
'INSERT INTO received_articles (an, ingestion_datetime) VALUES (?, ?)',
(message['an'], message['ingestion_datetime'])
)
print(f"Written: {message['an']}")
queue.task_done()
def main():
try:
assert (subscription_id := os.environ['SUBSCRIPTION_ID_PYTHON']), 'The subscription for python listener is not set'
db = sqlite3.connect('test_listener_python.sqlite3', isolation_level=None)
cursor = db.cursor()
cursor.execute('CREATE TABLE IF NOT EXISTS received_articles (an TEXT, ingestion_datetime TEXT)')
db.close()
the_queue = Queue()
def callback(message, _):
the_queue.put(message)
return True
writer_worker = Thread(target=processMessagesQueue, args=(the_queue,))
writer_worker.setDaemon(True)
writer_worker.start()
listener = Listener()
future = listener.listen_async(callback, subscription_id)
def manage_closing():
if future.running():
future.cancel()
future.result()
the_queue.join()
def signal_handler(signum, frame):
manage_closing()
signal.signal(signal.SIGTERM, signal_handler)
future.result(timeout=18_000)
except Exception as ex:
print(f'Se encontró una excepción {ex}')
except KeyboardInterrupt:
pass
finally:
manage_closing()
if __name__ == '__main__':
main()