-
Notifications
You must be signed in to change notification settings - Fork 0
/
PMIQuoteProviderApp.siddhi
75 lines (63 loc) · 2.37 KB
/
PMIQuoteProviderApp.siddhi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@App:name('PMIQuoteProviderApp')
@App:description('Provide PMI quotes for validated loan applications through the approved insurance vendor')
@App:asyncAPI("""asyncapi: 2.0.0
info:
title: InsuranceQuotesApp
version: 1.0.0
description: This exposes an API from WSO2 SI
servers:
production:
url: 'localhost:8025'
protocol: ws
security: []
channels:
/insurance:
subscribe:
message:
$ref: '#/components/messages/PMIQuoteListStreamPayload'
components:
messages:
PMIQuoteListStreamPayload:
payload:
type: object
properties:
loanAppID:
$ref: '#/components/schemas/loanAppID'
customerID:
$ref: '#/components/schemas/customerID'
insuranceRate:
$ref: '#/components/schemas/insuranceRate'
monthlyPMI:
$ref: '#/components/schemas/monthlyPMI'
schemas:
loanAppID:
type: string
customerID:
type: string
insuranceRate:
type: number
monthlyPMI:
type: number
securitySchemes: {}
""")
@sink(type='http-call',publisher.url = "http://localhost:9098/insurance/rate",method = "POST",sink.id = "insurance",
@map(type='json'))
define stream InsuranceRateRequestStream (loanApplicationID string,customerID string);
@sink(type='websocket-server',host = "localhost",port = "8025",
@map(type='xml'))
define stream PMIQuoteListStream (loanAppID string,customerID string,insuranceRate float,monthlyPMI float);
@source(type='kafka',topic.list = "pmi_loan_topic",threading.option = "single.thread",group.id = "group",bootstrap.servers = "localhost:9092",
@map(type='json'))
define stream FilteredLoanStream (loanAppID string,customerID string,total_loan_amount float);
@source(type='http-call-response',sink.id = "insurance",http.status.code = "200",
@map(type='json'))
define stream InsuranceRateResponseStream (loanApplicationID string,insuranceRate float);
@info(name='query1')
from FilteredLoanStream
select loanAppID as loanApplicationID,customerID
insert into InsuranceRateRequestStream;
@info(name='query2')
from FilteredLoanStream#window.time(10 min) as f
join InsuranceRateResponseStream#window.time(10 min) as i on f.loanAppID == i.loanApplicationID
select f.loanAppID as loanAppID,f.customerID as customerID,i.insuranceRate as insuranceRate,f.total_loan_amount * i.insuranceRate as monthlyPMI
insert into PMIQuoteListStream;