forked from airbnb/optica
-
Notifications
You must be signed in to change notification settings - Fork 0
/
store.rb
275 lines (228 loc) · 7.33 KB
/
store.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
require 'zk'
require 'oj'
require 'hash_deep_merge'
class Store
  # ZooKeeper-backed store of per-node JSON data for optica, with an
  # optional in-process read cache. The cache is refreshed two ways:
  # periodically by a background thread, and immediately whenever a
  # child node joins or leaves (via a ZK child watcher).
  #
  # External collaborators defined elsewhere in the application:
  #   STATSD - statsd client used for timing/gauge metrics (global constant)
  #   $EXIT  - global flag set when the process is shutting down

  attr_reader :ips

  # Default of 0 disables the staleness check in check_cache_age.
  DEFAULT_CACHE_STALE_AGE = 0

  # opts keys read here and in setup_cache:
  #   'log'                  - logger instance (not validated here)
  #   'zk_path'              - zookeeper connection string (required)
  #   'cache_enabled'        - truthy to enable the read cache
  #   'cache_stale_age'      - max cache age in seconds before nodes() errors (0 = no limit)
  #   'cache_fetch_interval' - seconds between background refreshes (default 20)
  #
  # Raises ArgumentError if 'zk_path' is missing. The ZK connection is
  # deferred to start().
  def initialize(opts)
    @log = opts['log']
    unless opts['zk_path']
      raise ArgumentError, "missing required argument 'zk_path'"
    else
      @path = opts['zk_path']
    end
    @zk = nil
    setup_cache(opts)
  end

  # Initialize all cache-related state (no ZK interaction yet).
  #
  # We use a daemon that refreshes cache every N (tunable)
  # seconds. In addition, we subscribe to all children joining/leaving
  # events. This is less frequent because normally no one would constantly
  # add/remove machines. So whenever a join/leave event happens, we immediately
  # refresh cache. This way we guarantee that whenever we add/remove
  # machines, cache will always have the right set of machines.
  def setup_cache(opts)
    @cache_enabled = !!opts['cache_enabled']
    @cache_stale_age = opts['cache_stale_age'] || DEFAULT_CACHE_STALE_AGE
    # zk watcher for node joins/leaves
    @cache_root_watcher = nil
    # mutex for atomically updating cached results
    @cache_mutex = Mutex.new
    @cache_results = {}
    # daemon that'll fetch from zk periodically
    @cache_fetch_thread = nil
    # flag that controls if fetch daemon should run
    @cache_fetch_thread_should_run = false
    # how long we serve cached data
    @cache_fetch_interval = (opts['cache_fetch_interval'] || 20).to_i
    # timestamp that prevents setting cache result with stale data
    @cache_results_last_fetched_time = Time.now
  end

  # Connect to zookeeper, install state/connection callbacks, and — if
  # caching is enabled — install the child watcher, do an initial cache
  # load, and start the background refresh thread.
  def start()
    @log.info "waiting to connect to zookeeper at #{@path}"
    @zk = ZK.new(@path)
    @zk.on_state_change do |event|
      @log.info "zk state changed, state=#{@zk.state}, session_id=#{session_id}"
    end
    @zk.ping?
    @log.info "ZK connection established successfully. session_id=#{session_id}"

    # We have to re-add all watchers and refresh cache if we reconnect to a new server.
    @zk.on_connected do |event|
      @log.info "ZK connection re-established. session_id=#{session_id}"
      if @cache_enabled
        @log.info "Resetting watchers and re-syncing cache. session_id=#{session_id}"
        setup_watchers
        reload_instances
      end
    end

    if @cache_enabled
      setup_watchers
      reload_instances
      start_fetch_thread
    end
  end

  # Current ZK session id formatted as a hex string, or nil if it cannot
  # be read. NOTE(review): the inline `rescue nil` swallows every error;
  # tolerated here because the value is only used to decorate log lines.
  def session_id
    '0x%x' % @zk.session_id rescue nil
  end

  # Tear down the cache machinery: unsubscribe the child watcher, signal
  # the fetch thread to stop, and join it. Safe to call when caching was
  # never started (all guards are nil-checked).
  def stop_cache_related()
    @cache_root_watcher.unsubscribe if @cache_root_watcher
    @cache_root_watcher = nil
    @cache_fetch_thread_should_run = false
    # The thread sleeps up to @cache_fetch_interval seconds per loop, so
    # this join can block for up to that long.
    @cache_fetch_thread.join if @cache_fetch_thread
    @cache_fetch_thread = nil
  end

  # Full shutdown: stop cache machinery, then close the ZK connection.
  def stop()
    @log.warn "stopping the store"
    stop_cache_related
    @zk.close() if @zk
    @zk = nil
  end

  # Return the full map of node name => node data. Reads straight from
  # zookeeper when caching is disabled; otherwise serves the cached map
  # after verifying it is not too stale (check_cache_age raises if it is).
  def nodes()
    STATSD.time('optica.store.get_nodes') do
      return load_instances_from_zk unless @cache_enabled
      check_cache_age
      @cache_results
    end
  end

  # Read every child of the ZK root and return { child_name => data }.
  # Sets a watch on the root's children as a side effect.
  def load_instances_from_zk()
    @log.info "Reading instances from zk:"
    from_server = {}

    begin
      @zk.children('/', :watch => true).each do |child|
        from_server[child] = get_node("/#{child}")
      end
    rescue Exception => e
      # NOTE(review): `rescue Exception` is deliberately broad here — the
      # error is always re-raised below; the rescue exists only to force a
      # client re-init first (see next comment).
      #
      # ZK client library caches DNS names of ZK nodes and it resets the
      # cache only when the client object is initialized, or set_servers
      # method is called. Set_servers is not exposed in ruby library, so
      # we force re-init the underlying client object here to make sure
      # we always connect to the current IP addresses.
      @zk.reopen

      @log.error "unexpected error reading from zk! #{e.inspect}"
      raise e
    end

    from_server
  end

  # Deep-merge `data` into the existing data stored for `node` and write
  # the result back to ZK, creating the znode if it does not yet exist.
  # Returns the merged hash.
  def add(node, data)
    child = "/#{node}"

    # deep-merge the old and new data
    prev_data = get_node(child)
    new_data = prev_data.deep_merge(data)
    json_data = Oj.dump(new_data)

    @log.debug "writing to zk at #{child} with #{json_data}"

    begin
      STATSD.time('optica.zookeeper.set') do
        @zk.set(child, json_data)
      end
      new_data
    rescue ZK::Exceptions::NoNode => e
      # First write for this node: fall back to creating the znode.
      STATSD.time('optica.zookeeper.create') do
        @zk.create(child, :data => json_data)
      end
      new_data
    rescue Exception => e
      # Broad on purpose: force client re-init (DNS cache — see
      # load_instances_from_zk), then re-raise.
      @zk.reopen
      @log.error "unexpected error writing to zk! #{e.inspect}"
      raise e
    end
  end

  # Delete the znode for `node`. Missing nodes are ignored
  # (:ignore => :no_node), so deletion is idempotent.
  def delete(node)
    @log.info "deleting node #{node}"

    begin
      STATSD.time('optica.zookeeper.delete') do
        @zk.delete("/" + node, :ignore => :no_node)
      end
    rescue Exception => e
      # Broad on purpose: force client re-init, then re-raise.
      @zk.reopen
      @log.error "unexpected error deleting nodes in zk! #{e.inspect}"
      raise e
    end
  end

  # Health check used by the service: false when shutting down ($EXIT),
  # when no ZK client exists, or when the client reports disconnected.
  def healthy?()
    healthy = true
    if $EXIT
      @log.warn 'not healthy because stopping...'
      healthy = false
    elsif not @zk
      @log.warn 'not healthy because no zookeeper...'
      healthy = false
    elsif not @zk.connected?
      @log.warn 'not healthy because zookeeper not connected...'
      healthy = false
    end
    return healthy
  end

  private

  # Fetch and JSON-parse a single znode's data. Returns {} when the node
  # has disappeared or held unparseable data (the latter also deletes the
  # bad node). Any other error forces a client re-init and is re-raised.
  def get_node(node)
    begin
      data, stat = STATSD.time('optica.zookeeper.get') do
        @zk.get(node)
      end
      STATSD.time('optica.json.parse') do
        Oj.load(data)
      end
    rescue ZK::Exceptions::NoNode
      # Node removed between listing children and reading it — not an error.
      @log.info "node #{node} disappeared"
      {}
    rescue JSON::ParserError
      # NOTE(review): Oj in its default mode raises Oj::ParseError, which is
      # not a JSON::ParserError — this rescue only matches if Oj is configured
      # in a JSON-compatible mode. Confirm the app's Oj configuration.
      @log.warn "removing invalid node #{node}: data failed to parse (#{data.inspect})"
      delete(node)
      {}
    rescue Exception => e
      # Broad on purpose: force client re-init, then re-raise.
      @zk.reopen
      @log.error "unexpected error reading from zk! #{e.inspect}"
      raise e
    end
  end

  # immediately update cache if node joins/leaves
  def setup_watchers
    return if @zk.nil?
    @cache_root_watcher = @zk.register("/", :only => :child) do |event|
      @log.info "Children added/deleted"
      reload_instances
    end
  end

  # Report the cache's age to statsd and raise if it exceeds the
  # configured threshold (@cache_stale_age; 0 disables the check).
  # NOTE(review): `raise msg` with a String raises a RuntimeError carrying
  # that message — callers of nodes() will see RuntimeError, not a custom
  # error class.
  def check_cache_age
    return unless @cache_enabled
    cache_age = Time.new.to_i - @cache_results_last_fetched_time.to_i
    STATSD.gauge 'optica.store.cache.age', cache_age
    if @cache_stale_age > 0 && cache_age > @cache_stale_age
      msg = "cache age exceeds threshold: #{cache_age} > #{@cache_stale_age}"
      @log.error msg
      raise msg
    end
  end

  # Refresh @cache_results from zookeeper.
  #
  # Here we use local time to prevent a race condition.
  # Basically cache fetch thread or zookeeper watch callback
  # both will call this to refresh cache. Depending on which
  # finishes first our cache will get set by the slower one.
  # So in order to prevent setting cache to an older result,
  # we set both cache and the timestamp of that version fetched.
  # Since timestamp will be monotonically increasing, we are
  # sure that cache set will always have newer versions.
  def reload_instances()
    # Timestamp is captured BEFORE the (slow) ZK read so the
    # compare-and-set below orders results by when they started fetching.
    fetch_start_time = Time.now
    instances = load_instances_from_zk.freeze
    @cache_mutex.synchronize do
      if fetch_start_time > @cache_results_last_fetched_time then
        @cache_results_last_fetched_time = fetch_start_time
        @cache_results = instances
      end
    end
  end

  # Start the daemon thread that refreshes the cache every
  # @cache_fetch_interval seconds until stop_cache_related clears the
  # run flag.
  def start_fetch_thread()
    @cache_fetch_thread_should_run = true
    @cache_fetch_thread = Thread.new do
      while @cache_fetch_thread_should_run do
        begin
          # Inline `rescue nil` on sleep/reload keeps the loop alive on
          # transient failures; the reload is retried next iteration.
          sleep(@cache_fetch_interval) rescue nil
          @log.info "Cache fetch thread now fetches from zk..."
          reload_instances rescue nil
          # check_cache_age may raise (RuntimeError); that is caught just
          # below and logged, so a stale cache does not kill the thread.
          check_cache_age
        rescue => ex
          @log.warn "Caught exception in cache fetch thread: #{ex} #{ex.backtrace}"
        end
      end
    end
  end
end