-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathRtp_cluster_member.py
151 lines (135 loc) · 5.51 KB
/
Rtp_cluster_member.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# Copyright (c) 2009-2014 Sippy Software, Inc. All rights reserved.
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation and/or
# other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from sippy.Rtp_proxy_client import Rtp_proxy_client
from sippy.Time.Timeout import TimeoutInact
class rc_filter(object):
a = None
b = None
lastval = None
def __init__(self, fcoef, initval = 0.0):
self.lastval = initval
self.a = 1.0 - fcoef
self.b = fcoef
def apply(self, x):
self.lastval = (self.a * x) + (self.b * self.lastval)
return self.lastval
def get(self):
return self.lastval
class Rtp_cluster_member(Rtp_proxy_client):
name = None
status = 'ACTIVE'
capacity = 4000
weight = 100
wan_address = None
lan_address = None
call_id_map = None
call_id_map_old = None
on_state_change = None
on_active_update = None
timer = None
global_config = None
asess_filtered = None
cmd_out_address = None
stats_cache = None
def __init__(self, name, global_config, address, cmd_out_address, **kwargs):
self.call_id_map = []
self.call_id_map_old = []
self.name = name
self.global_config = global_config
self.asess_filtered = rc_filter(0.5)
self.cmd_out_address = cmd_out_address
self.stats_cache = {}
if cmd_out_address != None:
bind_address = (cmd_out_address, 0)
else:
bind_address = None
Rtp_proxy_client.__init__(self, global_config, address,
bind_address = bind_address, **kwargs)
self.timer = TimeoutInact(self.call_id_map_aging, 600, -1)
self.timer.spread_runs(0.1)
self.timer.go()
def reconnect(self, address):
if self.cmd_out_address != None:
bind_address = (self.cmd_out_address, 0)
else:
bind_address = None
Rtp_proxy_client.reconnect(self, address, bind_address = bind_address)
def isYours(self, call_id):
if call_id in self.call_id_map:
self.call_id_map.remove(call_id)
self.call_id_map.insert(0, call_id)
return True
if call_id not in self.call_id_map_old:
return False
self.call_id_map_old.remove(call_id)
self.call_id_map.insert(0, call_id)
return True
def bind_session(self, call_id, cmd_type):
if cmd_type != 'D':
self.call_id_map.insert(0, call_id)
else:
self.call_id_map_old.insert(0, call_id)
def unbind_session(self, call_id):
self.call_id_map.remove(call_id)
self.call_id_map_old.insert(0, call_id)
def go_online(self):
#print 'go_online', self
online_pre = self.online
# Rtp_proxy_client may or may not decide to change actual status
Rtp_proxy_client.go_online(self)
if online_pre or not self.online:
return
self.global_config['_sip_logger'].write('RTPproxy "%s" has changed ' \
'state from offline to online' % self.name)
if self.on_state_change != None:
self.on_state_change(self, True)
#print 'exit go_online', self, self.online
def go_offline(self):
#print 'go_offline', self
if self.online:
self.global_config['_sip_logger'].write('RTPproxy "%s" has changed ' \
'state from online to offline' % self.name)
self.stats_cache = {}
if self.on_state_change != None:
self.on_state_change(self, False)
Rtp_proxy_client.go_offline(self)
def update_active(self, active_sessions, *more_args):
self.asess_filtered.apply(active_sessions)
if self.active_sessions != active_sessions and self.on_active_update != None:
self.on_active_update(self, active_sessions)
Rtp_proxy_client.update_active(self, active_sessions, *more_args)
def call_id_map_aging(self):
if self.shut_down:
self.timer.cancel()
return
if len(self.call_id_map) < 1000:
# Do not age if there are less than 1000 calls in the list
self.call_id_map_old = []
return
new_len = int(len(self.call_id_map) / 2)
self.call_id_map_old = self.call_id_map[new_len:]
del self.call_id_map[new_len:]
def get_caputil(self):
return (self.asess_filtered.get() / self.capacity)