#!/usr/bin/ruby
#########################
# es_rolling_restart.rb #
#########################
# Authors: Ashton Davis ([email protected])
# - a significant expansion upon / rewrite of https://github.com/adamenger/rolypoly
###
# Purpose: Gracefully restart an elasticsearch cluster.
###
# License: Apache License v2.0 -- http://www.apache.org/licenses/LICENSE-2.0
# This script is provided as-is, with no implicit or explicit warranty.
# Use at your own discretion.
###
# Requirements:
# OS:
# - Linux
# RUBY:
# - 2.x
# GEMS:
# - elasticsearch
# - elasticsearch-api
# - elasticsearch-transport
# - httparty
# - colorize
# - net-ssh
###
# Notes on usage: This script will use your username and your default ssh settings.
# By default it will use your private key. I don't recommend doing this with passwords.
###
# Notes on development: I mean to add username and private key options - just haven't gotten there yet.
###
# Contribution: Feel free to fork and submit PRs - I'm 100% for making this script better.
############
# INCLUDES #
############
# Gems
require 'elasticsearch'
require 'httparty'
require 'colorize'
require 'net/ssh' # net-ssh is a gem, not part of the standard library
# Built-in
require 'socket'
#
###############
# HOST GROUPS #
###############
# Format:
# var_name = {
# name => 'string',
# master => [ 'array', 'of', 'masters' ],
# data => [ 'array', 'of', 'hosts' ],
# client => [ 'array', 'of', 'hosts' ]
# }
# If all nodes are both master and data, just specify data - but leave the empty arrays.
# Note: add each new host group to the list_of_clusters array (see the commented example below).
# EXAMPLE CLUSTER
cluster1 = {
'name' => 'Production Cluster',
'master' => [
'es-master-01.yourcompany.com',
'es-master-02.yourcompany.com',
'es-master-03.yourcompany.com'
],
'data' => [
    'es-data-01.yourcompany.com',
    'es-data-02.yourcompany.com',
    'es-data-03.yourcompany.com',
    'es-data-04.yourcompany.com',
    'es-data-05.yourcompany.com'
],
'client' => [
'es-client-01.yourcompany.com'
]
}
list_of_clusters = [ cluster1 ]
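# An illustrative second host group (hypothetical hostnames, not real infrastructure) -
# uncomment, adjust, and add it to list_of_clusters to manage more than one cluster:
# cluster2 = {
#   'name'   => 'Staging Cluster',
#   'master' => [],
#   'data'   => [ 'es-stage-01.yourcompany.com', 'es-stage-02.yourcompany.com' ],
#   'client' => []
# }
# list_of_clusters = [ cluster1, cluster2 ]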
logfile = "/tmp/rolypoly_progress"
# END CONFIGURATION SECTION
# Start the clock!
start_time = Time.now.to_i
# Check to make sure sync_id is present
# AND that replica shards match the primary's sync_id
def check_sync_id(node)
# Create an empty array
$to_sync = []
# Hash containing Sync IDs. Format:
# { index: { shard: { primary: $val, replicas: [ $val, $val ] } } }
sync_ids = Hash.new
# Create client object
client = Elasticsearch::Client.new(host: node)
# List of indices via cat api
myindices = client.cat.indices h: 'i'
# create array from string, split on newline
myindices = myindices.split("\n")
puts 'Checking sync ids'
# For each index...
myindices.each do |index|
# Strip whitespace
index = index.strip
# Create a hash for this index to contain shards and their sync ids
sync_ids[index] = Hash.new
#puts "Data for index '#{index}'"
# create object with index data
stats = client.indices.stats index: index, level: 'shards'
# For each shard id...
stats['indices'][index]['shards'].each_key do |shardid|
#puts "Data for shard #{shardid}"
# Create the shardid hash to contain sync ids
sync_ids[index][shardid] = Hash.new
# This is my hacked way of getting the array id for reporting.
iter = 0
# Create an array for replica ids
sync_ids[index][shardid]['replicas'] = Array.new
# Each shard id contains an array of shards. For each one...
stats['indices'][index]['shards'][shardid].each do |shard|
#puts "Stats for shard #{shardid}:#{iter}"
# This object contains the sync_id for this particular shard.
sync_id_obj = shard['commit']['user_data']['sync_id']
# If the shard has no sync_id, mark it
if sync_id_obj.nil?
#puts "INDEX: #{index} SHARD: #{shardid}:#{iter} - missing sync_id!"
$to_sync.push(index)
elsif shard['routing']['primary'] == true
#puts "#{index} primary shard sync_id = #{sync_id_obj}"
sync_ids[index][shardid]['primary'] = sync_id_obj
else
sync_ids[index][shardid]['replicas'].push(sync_id_obj)
end
        iter += 1
      end
      # Now check each replica's sync ID against the primary's for consistency.
sync_ids[index][shardid]['replicas'].each do |replica|
primary = sync_ids[index][shardid]['primary']
# If the replica's sync ID doesn't match the primary shard's sync id...
if replica != primary
puts " |- Mismatch on index #{index} shard #{shardid}".red
puts " |- Primary: #{primary} Replica: #{replica}".red
# Add this index to the list of indices that need to be flushed.
$to_sync.push(index)
end
end
end
end
if $to_sync.any?
puts " |- Indices that need syncing:".yellow
puts " |- #{$to_sync.uniq}"
end
end
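# Illustrative usage (example host taken from the sample config above): check_sync_id
# populates the global $to_sync with indices whose shards lack a sync_id or whose
# replica sync_ids disagree with the primary, which can then be flushed, e.g.:
#   check_sync_id('es-data-01.yourcompany.com')
#   $to_sync.uniq.each { |i| sync_flush('es-data-01.yourcompany.com', i) }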
def get_relocating_shards(client)
  # Returns true once the cluster has settled: green status, no relocating and no unassigned shards.
  # Fetch health once so all three checks see the same snapshot.
  health = client.cluster.health
  health['relocating_shards'] == 0 && health['status'] == "green" && health['unassigned_shards'] == 0
end
def sync_flush(node, index)
  # Send a `$index/_flush/synced` request so shard status is clean for restart
  client = Elasticsearch::Client.new(host: node)
  puts " |- Executing a sync flush on index #{index}."
  client.indices.flush_synced(index: index)
end
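# For reference, a sketch of the REST request behind sync_flush (exact endpoint
# depends on your Elasticsearch version; on newer releases synced flush is
# deprecated in favor of a plain flush):
#   POST /<index>/_flush/synced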
def disable_allocation(client)
# Disable shard allocation so things don't get jumbled
puts " |- Disabling shard allocation on the cluster"
client.cluster.put_settings body: { transient: { 'cluster.routing.allocation.enable' => 'none' } }
end
def enable_allocation(client)
# Enable shard allocation following successful restart
puts " |- Enabling shard allocation on the cluster"
client.cluster.put_settings body: { transient: { 'cluster.routing.allocation.enable' => 'all' } }
end
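# For reference, a sketch of the equivalent cluster-settings REST request issued by
# disable_allocation/enable_allocation (swap "none"/"all" as appropriate):
#   PUT /_cluster/settings
#   { "transient": { "cluster.routing.allocation.enable": "none" } }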
def wait_for_relocating_shards(node, client)
# Sleep for two seconds at a time and recheck shard status
print " |- Waiting for shards to settle on #{node} "
until get_relocating_shards(client) do
print ". ".red
sleep 2
end
puts ""
end
def restart_node(node, reboot)
  # Using SSH, send a reboot or service restart command
  current_user = ENV['USER']
  if reboot
    puts " |- Sending reboot request to #{node}..."
    Net::SSH.start(node, current_user) do |ssh|
ssh.exec!("sudo shutdown now -r")
end
puts " |- Done.".green
else
puts " |- Sending restart request to #{node}..."
    Net::SSH.start(node, current_user) do |ssh|
ssh.exec!("sudo service elasticsearch-01 restart")
# It's not necessary to output the service restart, but you can uncomment this if you want to see it.
# NOTE: Make sure to comment out the line above, or you'll run the restart twice.
#output = ssh.exec!("sudo service elasticsearch-01 restart")
#puts output
end
puts " |- Done.".green
end
end
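# The development note in the header mentions adding username/private-key options.
# A minimal sketch of what that could look like using Net::SSH options
# (ssh_user and ssh_key_path are hypothetical names, not defined in this script):
#   Net::SSH.start(node, ssh_user, keys: [ssh_key_path], non_interactive: true) do |ssh|
#     ssh.exec!("sudo service elasticsearch-01 restart")
#   end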
def wait_for_http(node)
# Wait for the node to respond again so we can check status
puts " |- Waiting for elasticsearch to accept connections on #{node}:9200"
until test_http(node) do
print "."
sleep 1
end
end
def test_http(node)
# Ping port 9200 until it responds.
response = HTTParty.get("http://#{node}:9200", timeout: 1)
if response['tagline'] == "You Know, for Search"
true
end
# A rebooting host may time out or be unreachable rather than refuse the connection
rescue Net::OpenTimeout, Net::ReadTimeout, Errno::ECONNREFUSED, Errno::EHOSTUNREACH, SocketError
sleep 1
false
end
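# For reference, a healthy node answers GET http://<node>:9200 with a small JSON
# document whose "tagline" is "You Know, for Search"; test_http treats anything
# else (or a connection error) as "not up yet".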
def file_to_array(file)
  # Load the progress file (if present) into an array of already-restarted nodes.
  nodelist = []
  if File.exist?(file)
    File.open(file) do |f|
      f.each_line { |line| nodelist.push line.chomp }
    end
  end
  nodelist
end
def already_done(nodelist, node)
# Check if this node is in the list of completed nodes.
if nodelist.nil?
return false
elsif nodelist.include?(node)
return true
else
return false
end
end
def append_to_file(node, file)
# Add each node to the end of the log file as they are completed.
f = File.new(file, 'a')
f.puts node
f.close
end
def delete_file(file)
# Delete the log file when complete
File.delete(file)
end
def pick_cluster (clusters)
  # Choose the cluster you wish to restart.
  print "Selection: ".blue
  answer = gets.chomp.to_i
  if answer == 0 then pick_cluster(clusters) # 'a string'.to_i results in 0, so re-prompt
  else clusters[answer - 1] end # out-of-range picks return nil and the caller re-prompts
end
def list_clusters (clusters)
# List the available clusters.
i = 1
clusters.each do |cname|
puts "\t#{i}) #{cname['name']}"
i = i + 1
end
end
def print_cluster (cluster)
# Print masters, if any exist
if cluster['master'].any?
cluster['master'].each do |node|
puts "\t #{node}"
end
end
# Print clients, if any exist
if cluster['client'].any?
cluster['client'].each do |node|
puts "\t #{node}"
end
end
# Print data nodes
cluster['data'].each do |node|
puts "\t #{node}"
end
end
def ask_reboot()
# Ask whether or not to reboot
print "Perform server reboot? ".blue
answer = gets.chomp
  if answer =~ /^(y|Y)/
    return true
  elsif answer =~ /^(n|N)/
    return false
  else
puts "Invalid selection. Please choose y[es] or n[o]".red
ask_reboot()
end
end
#############
# GET INPUT #
#############
puts "*~*~* ELASTICSEARCH CLUSTER RESTART *~*~*".yellow
puts "This script will run either a service restart or a server reboot on the " \
"hostgroup of your choosing. \n \n"
puts "Please choose from the following options: "
list_clusters(list_of_clusters)
# Don't go on until we have a valid answer.
until elasticsearch_cluster = pick_cluster(list_of_clusters)
puts "Invalid selection. Please try again.".red
end
reboot = ask_reboot()
# Brief on what will happen
puts "This script will run a rolling restart of these hosts:"
print_cluster(elasticsearch_cluster)
print "Is this okay? (y/N): ".blue
# Die if the answer isn't y|Y(es)
unless gets.chomp =~ /^(y|Y)/
puts "Exiting."
exit
end
# Look in a file for already-done nodes.
# (for continuing when the script fails mid-roll)
if File.exist?(logfile)
previously_run = file_to_array(logfile)
end
#################
# SANITY CHECKS #
#################
# Quickly grab the first host in the array and run some sanity checks.
cluster_rep = elasticsearch_cluster['data'][0]
client = Elasticsearch::Client.new(host: cluster_rep)
# Fetch cluster health once so all the checks below see the same snapshot
health = client.cluster.health
# Check for shards in relocation status, die if greater than 0
if health['relocating_shards'] > 0
  puts "Cluster is rebalancing - there are currently #{health['relocating_shards']} shards relocating. Quitting..."
  exit 1
end
# Check for cluster status, die if not green.
if health['status'] != "green"
  puts "Cluster health is not green, discontinuing."
  exit 1
end
# Check for unassigned shards, die if greater than 0
if health['unassigned_shards'] > 0
  puts "There are unassigned shards - please handle that before trying a rolling restart. Aborting."
  exit 1
end
# Check for sync_id, and if it doesn't exist, sync_flush
check_sync_id(cluster_rep)
if $to_sync.any?
$to_sync.uniq.each do |index|
puts "Executing a sync flush on #{index}."
    begin
      client.indices.flush_synced(index: index)
    rescue
      puts " |- Failed for #{index} (there are probably active jobs)".red
    else
      puts " |- Synced #{index}".green
    end
end
check_sync_id(cluster_rep)
if $to_sync.any?
print "There are still indices that aren't sync'd, would you like to continue? (y/N): ".blue
if gets.chomp =~ /^(y|Y)/
puts "Continuing..."
else
puts "Exiting script.".red
exit 1
end
end
else
puts "No indices need syncing.".green
end
###########
# DO WORK #
###########
## MASTER
# loop through our cluster and restart all master nodes sequentially
if elasticsearch_cluster['master'].any?
elasticsearch_cluster['master'].each do |node|
puts "Processing #{node}".yellow
if !already_done(previously_run, node)
client = Elasticsearch::Client.new(host: node)
# Send the restart command
restart_node(node, reboot)
# Wait for node to shutdown
puts " |- Waiting 15s for node to initiate shutdown..."
sleep 15
# Wait for the node to come back
wait_for_http(node)
# Add it to the log
append_to_file(node, logfile)
puts " |- Complete. Logging completion.".green
else
# Skip it if it's already been done.
puts " |- #{node} is already finished, skipping...".green
end
end
end
## CLIENT
# loop through our cluster and restart all client nodes sequentially
if elasticsearch_cluster['client'].any?
elasticsearch_cluster['client'].each do |node|
puts "Processing #{node}".yellow
if !already_done(previously_run, node)
client = Elasticsearch::Client.new(host: node)
      # Send the restart command
      restart_node(node, reboot)
      # Wait for node to shutdown
      puts " |- Waiting 15s for node to initiate shutdown..."
      sleep 15
      # Wait for the node to come back
      wait_for_http(node)
# Add it to the log
append_to_file(node, logfile)
puts " |- Complete. Logging completion.".green
else
# Skip it if it's already been done
puts " |- #{node} is already finished, skipping...".green
end
end
end
## DATA
# loop through our cluster and restart all data nodes sequentially
elasticsearch_cluster['data'].each do |node|
puts "Processing #{node}".yellow
if !already_done(previously_run, node)
client = Elasticsearch::Client.new(host: node)
# Disable shard allocation
disable_allocation(client)
# Send the restart command
restart_node(node, reboot)
# Wait for node to shutdown
puts " |- Waiting 15s for node to initiate shutdown..."
sleep 15
# Wait for the node to come back
wait_for_http(node)
# Reenable shard allocation
enable_allocation(client)
# Wait for the shards to settle before moving on
wait_for_relocating_shards(node, client)
# Add it to the log
append_to_file(node, logfile)
puts " |- Complete. Logging completion.".green
else
# Skip it if it's already been done
puts " |- #{node} is already finished, skipping...".green
end
end
##########
# FINISH #
##########
# Report the total time
puts "Total restart time: #{Time.now.to_i - start_time}s"
# Clear the log file
delete_file(logfile)