-
Notifications
You must be signed in to change notification settings - Fork 11
/
serve_kinesis.py
71 lines (53 loc) · 2.76 KB
/
serve_kinesis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import os
import json
import boto3
from time import sleep
import logging
from model_loader import ModelLoader
from model_service import ModelService
# Runtime configuration, each value overridable through an environment
# variable of the same name; the second argument is the fallback used when
# the variable is not set.
PREDICTIONS_STREAM_NAME = os.environ.get('PREDICTIONS_STREAM_NAME', 'predictions')  # stream read by this service
RESULTS_STREAM_NAME = os.environ.get('RESULTS_STREAM_NAME', 'results')  # stream predictions are written to
KINESIS_ADDRESS = os.environ.get('KINESIS_ADDRESS', 'http://127.0.0.1:4566')  # Kinesis endpoint URL
RUN_ID = os.environ.get('RUN_ID', "c68164ce869e4c90a7c93752436e9bc7")  # run id handed to the model loader
# Configure root logging at INFO and obtain this module's logger.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

# Load the model (and its companion object `dv`) for the configured run and
# wrap both in the service used for feature preparation and prediction.
model, dv = ModelLoader().load_model_from_mlflow(RUN_ID)
model_service = ModelService(model, dv)

# Kinesis client pointed at the configured endpoint; the region is fixed.
kinesis_client = boto3.client('kinesis', endpoint_url=KINESIS_ADDRESS, region_name='eu-west-1')

# Look up the predictions stream and pick its first listed shard.
# NOTE(review): only that one shard is ever read - confirm the stream is
# single-shard.
response = kinesis_client.describe_stream(StreamName=PREDICTIONS_STREAM_NAME)
first_shard = response['StreamDescription']['Shards'][0]
my_shard_id = first_shard['ShardId']

# Request a 'LATEST' iterator for that shard.
shard_iterator = kinesis_client.get_shard_iterator(
    StreamName=PREDICTIONS_STREAM_NAME,
    ShardId=my_shard_id,
    ShardIteratorType='LATEST',
)
my_shard_iterator = shard_iterator['ShardIterator']

# First fetch (at most 2 records); the response seeds the listener loop.
record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator, Limit=2)
def publish_result(partition_key, result):
    """Write ``result`` to the results stream as a JSON document.

    Args:
        partition_key: Value used as the record's partition key; it is
            converted with ``str()`` before being sent.
        result: JSON-serializable object to publish.

    The record is sent through the module-level ``kinesis_client`` to the
    stream named by ``RESULTS_STREAM_NAME``. Nothing is returned.
    """
    serialized = json.dumps(result)
    key = str(partition_key)
    kinesis_client.put_record(StreamName=RESULTS_STREAM_NAME, Data=serialized, PartitionKey=key)
def _handle_record(record):
    """Score a single Kinesis record and publish the prediction.

    Args:
        record: One entry of a ``get_records`` response; its ``'Data'``
            field is parsed as JSON and passed to the model service.

    A record whose payload is not valid JSON is logged and skipped so that
    one bad message does not stop the listener.
    """
    log.info(f"Received event: {record['Data']}")
    try:
        payload = json.loads(record['Data'])
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
        log.exception("Skipping record with invalid JSON payload.")
        return

    features = model_service.prepare_features(payload)
    pred = model_service.predict(features)
    result = {
        # The prediction is round-tripped through str() before float();
        # presumably `pred` is a scalar - TODO confirm against ModelService.
        'churn chance': float(str(pred)),
        'model_run_id': RUN_ID
    }
    log.info(f"Response: {result}")
    publish_result("partition1", result)


log.info("Starting Kinesis listener.")

# Example of pushing a test event onto the input stream:
# aws kinesis put-record \
# --stream-name predictions --endpoint-url=http://localhost:4566 \
# --partition-key 1 \
# --data "ewogICAgICAiY3VzdG9tZXJfYWdlIjogMTAwLAogICAgICAiZ2VuZGVyIjogIkYiLAogICAgICAiZGVwZW5kZW50X2NvdW50IjogMiwKICAgICAgImVkdWNhdGlvbl9sZXZlbCI6IDIsCiAgICAgICJtYXJpdGFsX3N0YXR1cyI6ICJtYXJyaWVkIiwKICAgICAgImluY29tZV9jYXRlZ29yeSI6IDIsCiAgICAgICJjYXJkX2NhdGVnb3J5IjogImJsdWUiLAogICAgICAibW9udGhzX29uX2Jvb2siOiA2LAogICAgICAidG90YWxfcmVsYXRpb25zaGlwX2NvdW50IjogMywKICAgICAgImNyZWRpdF9saW1pdCI6IDQwMDAsCiAgICAgICJ0b3RhbF9yZXZvbHZpbmdfYmFsIjogMjUwMAogICAgfQ=="

# Poll loop. `record_response` already holds the batch fetched at start-up,
# so each pass handles the batch in hand *before* asking for the next one;
# fetching first would silently drop that initial batch.
while True:
    for record in record_response['Records']:
        _handle_record(record)

    # No next iterator in the response means there is nothing more to read
    # from this shard, so the listener stops.
    next_shard_iterator = record_response.get('NextShardIterator')
    if not next_shard_iterator:
        break

    # Pause between polls to avoid hammering the endpoint.
    sleep(5)
    record_response = kinesis_client.get_records(ShardIterator=next_shard_iterator, Limit=2)