-
Notifications
You must be signed in to change notification settings - Fork 0
/
eventhubreceiver.py
63 lines (53 loc) · 2.03 KB
/
eventhubreceiver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#### #!/usr/bin/env python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
An example to show receiving events from an Event Hub partition.
"""
import os
import sys
import logging
import time
from azure.eventhub import EventHubClient, Receiver, Offset
# import examples
# logger = examples.get_logger(logging.INFO)
# Address can be in either of these formats:
# "amqps://<URL-encoded-SAS-policy>:<URL-encoded-SAS-key>@<mynamespace>.servicebus.windows.net/myeventhub"
# "amqps://<namespace>.servicebus.windows.net/myeventhub"
ADDRESS = os.environ.get('EVENT_HUB_ADDRESS')
# SAS policy and key are not required if they are encoded in the URL
# Endpoint=sb://<namespace>.servicebus.windows.net/;
USER = os.environ.get('EVENT_HUB_SAS_POLICY')
KEY = os.environ.get('EVENT_HUB_SAS_KEY')
CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "0"
total = 0
last_sn = -1
last_offset = "-1"
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=5000, offset=OFFSET)
client.run()
start_time = time.time()
print('starting listener for 5000s')
batch = receiver.receive()
#batch = receiver.receive(timeout=5000)
while batch:
for event_data in batch:
last_offset = event_data.offset
last_sn = event_data.sequence_number
print("Received: {}, {}".format(last_offset.value, last_sn))
print(event_data.body_as_str())
total += 1
batch = receiver.receive(timeout=5000)
end_time = time.time()
client.stop()
run_time = end_time - start_time
print("Received {} messages in {} seconds".format(total, run_time))
except KeyboardInterrupt:
pass
finally:
client.stop()