From 6f380a40609db391e44f3b82f002f8ad419eeeb2 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Fri, 12 Apr 2024 21:07:26 +1000 Subject: [PATCH] feat: migrate to redis for service discovery (#268) --- docker-compose.yml | 23 +--- shard.lock | 20 ++-- shard.yml | 15 ++- spec/helper.cr | 76 ++++++++++--- spec/mappings/control_system_modules_spec.cr | 6 +- spec/module_manager_spec.cr | 13 +-- src/config.cr | 7 -- src/constants.cr | 7 +- src/core-app.cr | 2 +- src/placeos-core.cr | 5 +- src/placeos-core/healthcheck.cr | 3 - src/placeos-core/module_manager.cr | 106 ++++++++++--------- src/placeos-core/process_manager/edge.cr | 8 +- src/placeos-core/process_manager/local.cr | 8 +- src/placeos-edge/protocol.cr | 10 +- 15 files changed, 159 insertions(+), 150 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index b41d5f7b..b7a1de47 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,10 +6,6 @@ x-deployment-env: &deployment-env ENV: ${ENV:-development} SG_ENV: ${SG_ENV:-development} -x-etcd-client-env: &etcd-client-env - ETCD_HOST: ${ETCD_HOST:-etcd} - ETCD_PORT: ${ETCD_PORT:-2379} - x-redis-client-env: &redis-client-env REDIS_URL: ${REDIS_URL:-redis://redis:6379} @@ -44,7 +40,6 @@ services: - ${PWD}/spec:/app/spec - ${PWD}/src:/app/src depends_on: - - etcd - redis - postgres - migrator @@ -55,29 +50,13 @@ services: # Environment GITHUB_ACTION: ${GITHUB_ACTION:-} # Service Hosts - <<: [*etcd-client-env,*redis-client-env, *postgresdb-client-env,*deployment-env, *build-api-env] + <<: [*redis-client-env, *postgresdb-client-env,*deployment-env, *build-api-env] redis: image: eqalpha/keydb restart: always hostname: redis - etcd: - image: quay.io/coreos/etcd:${ETCD_VERSION:-v3.5.4} - restart: always - hostname: etcd - environment: - ALLOW_NONE_AUTHENTICATION: "yes" - ETCD_NAME: "etcd" - ETCD_INITIAL_ADVERTISE_PEER_URLS: "http://etcd:2380" - ETCD_LISTEN_PEER_URLS: "http://0.0.0.0:2380" - ETCD_LISTEN_CLIENT_URLS: "http://0.0.0.0:2379" - ETCD_ADVERTISE_CLIENT_URLS: "http://etcd:2379" - ETCD_INITIAL_CLUSTER_TOKEN: "etcd-cluster" - ETCD_INITIAL_CLUSTER=etcd: "http://etcd:2380" - ETCD_INITIAL_CLUSTER_STATE: "new" - TZ: $TZ - postgres: hostname: postgres image: postgres diff --git a/shard.lock b/shard.lock index 837bc354..c5338eed 100644 --- a/shard.lock +++ b/shard.lock @@ -25,10 +25,6 @@ shards: git: https://github.com/spider-gazelle/bindata.git version: 2.0.0 - clustering: - git: https://github.com/place-labs/clustering.git - version: 3.1.1 - connect-proxy: git: https://github.com/spider-gazelle/connect-proxy.git version: 2.0.0 @@ -69,10 +65,6 @@ shards: git: https://github.com/arcage/crystal-email.git version: 0.7.0 - etcd: - git: https://github.com/place-labs/crystal-etcd.git - version: 1.2.5 - eventbus: git: https://github.com/spider-gazelle/eventbus.git version: 0.9.9+git.commit.ca8ef0c5e21ee15da079edd5bcea39bee7e07f26 @@ -109,10 +101,6 @@ shards: git: https://github.com/jgaskins/hot_topic.git version: 0.1.0+git.commit.3c901e77b6e000930398738260a2944b6f5785dc - hound-dog: - git: https://github.com/place-labs/hound-dog.git - version: 2.9.1 - http-params-serializable: git: https://github.com/place-labs/http-params-serializable.git version: 0.5.0 @@ -211,7 +199,7 @@ shards: placeos-driver: git: https://github.com/placeos/driver.git - version: 6.9.19 + version: 6.11.0+git.commit.9363dbec039d92ccc0ea5afdbaa549bc6bb14cdf placeos-log-backend: git: https://github.com/place-labs/log-backend.git @@ -219,7 +207,7 @@ shards: placeos-models: git: https://github.com/placeos/models.git - version: 9.42.2 + version: 9.45.0 placeos-resource: git: https://github.com/place-labs/resource.git @@ -245,6 +233,10 @@ shards: git: https://github.com/caspiano/redis-cluster.cr.git version: 0.8.5 + redis_service_manager: + git: https://github.com/place-labs/redis_service_manager.git + version: 3.1.1 + rendezvous-hash: git: https://github.com/caspiano/rendezvous-hash.git version: 0.3.1 diff --git a/shard.yml b/shard.yml index f91b09e1..51c94956 100644 --- a/shard.yml +++ b/shard.yml @@ -13,10 +13,6 @@ dependencies: github: spider-gazelle/action-controller version: ~> 7.2 - clustering: - github: place-labs/clustering - version: ~> 3.0 - git-repository: github: place-labs/git-repository @@ -28,17 +24,13 @@ dependencies: github: crystal-community/hardware version: ~> 0.5 - hound-dog: - github: place-labs/hound-dog - version: ~> 2.5 - log_helper: github: spider-gazelle/log_helper version: ~> 1 placeos-driver: github: placeos/driver - version: ~> 6.1 + branch: master placeos-log-backend: github: place-labs/log-backend @@ -60,6 +52,11 @@ dependencies: github: caspiano/redis-cluster.cr version: ">= 0.8.4" + # clustering service discovery + redis_service_manager: + github: place-labs/redis_service_manager + version: ">= 3.0.0" + responsible: github: place-labs/responsible version: ~> 1.2 diff --git a/spec/helper.cr b/spec/helper.cr index 00d23bd9..fba875e1 100644 --- a/spec/helper.cr +++ b/spec/helper.cr @@ -34,14 +34,16 @@ def clear_tables PlaceOS::Model::Edge.clear end +def clustering_mock + MockClustering.new("core") +end + def discovery_mock - DiscoveryMock.new("core", uri: CORE_URL) + Clustering::Discovery.new(clustering_mock) end def module_manager_mock - discovery = discovery_mock - clustering = MockClustering.new(uri: CORE_URL, discovery: discovery) - PlaceOS::Core::ModuleManager.new(CORE_URL, discovery: discovery, clustering: clustering) + PlaceOS::Core::ModuleManager.new(CORE_URL, clustering: clustering_mock) end macro around_suite(block) @@ -55,7 +57,6 @@ end around_suite ->{ clear_tables - HoundDog::Service.clear_namespace } PgORM::Database.configure { |_| } @@ -151,24 +152,71 @@ def create_resources(process : Bool = true, use_head : Bool = false) {repository, driver, mod, resource_manager} end -class DiscoveryMock < HoundDog::Discovery +# reopen this for specs +class Clustering::Discovery DOES_NOT_MAP = "" def own_node?(key : String) : Bool key != DOES_NOT_MAP end - - def etcd_nodes - [@service_events.node].map &->HoundDog::Discovery.to_hash_value(HoundDog::Service::Node) - end end class MockClustering < Clustering - def start(&stabilize : Array(HoundDog::Service::Node) ->) - @stabilize = stabilize - stabilize.call([discovery.node]) + NODE_ID = "----core1----" + NODE_URI = ENV["CORE_URL"]? || "http://core:3000" + VERSION = "1" + + getter uri : String = NODE_URI + getter ulid : String = NODE_ID + + # registers this node with the cluster as a member + def register : Bool + rendezvous_hash = rendezvous + @version = VERSION + + ready_cb = Proc(Nil).new do + cluster_stable_callbacks.each do |callback| + spawn do + begin + callback.call + rescue error + Log.error(exception: error) { "notifying cluster stable" } + end + end + end + end + + rebalance_callbacks.each do |callback| + spawn do + begin + callback.call(rendezvous_hash, ready_cb) + rescue error + Log.error(exception: error) { "performing rebalance callback" } + end + end + end + true + end + + # removes this node from the cluster as a member + getter unregister : Bool = true + + # is this node registered as part of the cluster + getter? registered : Bool = true + + # is this class watching for changes to the cluster + # this should return true if registered returns true + getter? watching : Bool = true + + # returns the list of known nodes + def rendezvous : RendezvousHash + RendezvousHash.new(nodes: [NODE_URI]) end - def stop + # returns a node_id => URI mapping + def node_hash : Hash(String, URI) + { + NODE_ID => URI.parse(NODE_URI), + } end end diff --git a/spec/mappings/control_system_modules_spec.cr b/spec/mappings/control_system_modules_spec.cr index 61a34c1a..15845d44 100644 --- a/spec/mappings/control_system_modules_spec.cr +++ b/spec/mappings/control_system_modules_spec.cr @@ -15,16 +15,14 @@ module PlaceOS::Core::Mappings end def self.mocked_fail_manager - discovery = discovery_mock - clustering = MockClustering.new(uri: CORE_URL, discovery: discovery) - Mock.new(CORE_URL, discovery: discovery, clustering: clustering) + Mock.new(CORE_URL, clustering_mock) end describe ControlSystemModules, tags: "mappings" do describe ".update_mapping" do it "ignores systems not mapped to node" do control_system = Model::Generator.control_system - control_system.id = DiscoveryMock::DOES_NOT_MAP + control_system.id = Clustering::Discovery::DOES_NOT_MAP control_system_modules = ControlSystemModules.new(module_manager: module_manager_mock, startup: false) control_system_modules.process_resource(:updated, control_system).skipped?.should be_true end diff --git a/spec/module_manager_spec.cr b/spec/module_manager_spec.cr index 744cbf6d..58394e16 100644 --- a/spec/module_manager_spec.cr +++ b/spec/module_manager_spec.cr @@ -53,11 +53,7 @@ module PlaceOS::Core end describe "startup" do - it "registers to etcd" do - # Remove metadata in etcd - namespace = HoundDog.settings.service_namespace - HoundDog.etcd_client.kv.delete_prefix(namespace) - + it "registers to redis" do # Clear relevant tables Model::Driver.clear Model::Module.clear @@ -70,13 +66,14 @@ module PlaceOS::Core sleep 3 # Check that the node is registered in etcd - module_manager.discovery.nodes.map(&.[:name]).should contain(module_manager.discovery.name) + core_uri = URI.parse(CORE_URL) + module_manager.discovery.nodes.should contain(core_uri) - module_manager.discovery.unregister + module_manager.stop sleep 0.1 # Check that the node is no longer registered in etcd - module_manager.discovery.nodes.map(&.[:name]).should_not contain(module_manager.discovery.name) + module_manager.discovery.nodes.should_not contain(core_uri) ensure module_manager.try &.stop end diff --git a/src/config.cr b/src/config.cr index 947afe12..98bce356 100644 --- a/src/config.cr +++ b/src/config.cr @@ -22,13 +22,6 @@ require "./telemetry" # Server required after application controllers require "action-controller/server" -# Configure Service discovery -HoundDog.configure do |settings| - settings.service_namespace = "core" - settings.etcd_host = PlaceOS::Core::ETCD_HOST - settings.etcd_port = PlaceOS::Core::ETCD_PORT -end - # Filter out sensitive params that shouldn't be logged filter_params = ["password", "bearer_token"] diff --git a/src/constants.cr b/src/constants.cr index 08494b6a..8f2a5d69 100644 --- a/src/constants.cr +++ b/src/constants.cr @@ -11,11 +11,12 @@ module PlaceOS::Core DRIVERS = ENV["ENGINE_DRIVERS"]? || File.join(PlaceOS::Compiler.repository_dir, "drivers") - ETCD_HOST = ENV["ETCD_HOST"]? || "localhost" - ETCD_PORT = (ENV["ETCD_PORT"]? || 2379).to_i - REDIS_URL = ENV["REDIS_URL"]? || "redis://localhost:6379" + # seconds before a node is considered offline + # should not be divisible by 3 + CLUSTER_NODE_TTL = (ENV["CLUSTER_NODE_TTL"]? || "20").to_i + # `core` self-registers to etcd with this information. # In k8s we can grab the Pod information from the environment # https://kubernetes.io/docs/tasks/inject-data-application/environment-variable-expose-pod-information/#use-pod-fields-as-values-for-environment-variables diff --git a/src/core-app.cr b/src/core-app.cr index f382648e..420cf9a1 100644 --- a/src/core-app.cr +++ b/src/core-app.cr @@ -99,7 +99,7 @@ Signal::INT.trap &terminate # Docker containers use the term signal Signal::TERM.trap &terminate -# Wait for etcd, redis, and postgres to be ready +# Wait for redis and postgres to be ready PlaceOS::Core.wait_for_resources spawn(same_thread: true) do diff --git a/src/placeos-core.cr b/src/placeos-core.cr index f62295df..6f3f4805 100644 --- a/src/placeos-core.cr +++ b/src/placeos-core.cr @@ -12,6 +12,10 @@ module PlaceOS::Core LOGSTASH_HOST = ENV["LOGSTASH_HOST"]? LOGSTASH_PORT = ENV["LOGSTASH_PORT"]? + # Minimize the number of connections being made to redis + REDIS_LOCK = Driver::RedisStorage.redis_lock + REDIS_CLIENT = Driver::RedisStorage.shared_redis_client + def self.log_backend if !(logstash_host = LOGSTASH_HOST.presence).nil? logstash_port = LOGSTASH_PORT.try(&.to_i?) || abort("LOGSTASH_PORT is either malformed or not present in environment") @@ -48,7 +52,6 @@ module PlaceOS::Core end # Wait for the upstream services to be ready - # - etcd # - redis # - postgres def self.wait_for_resources diff --git a/src/placeos-core/healthcheck.cr b/src/placeos-core/healthcheck.cr index d34d9a8c..d2f832fc 100644 --- a/src/placeos-core/healthcheck.cr +++ b/src/placeos-core/healthcheck.cr @@ -7,9 +7,6 @@ module PlaceOS::Core::Healthcheck Promise.defer { check_resource?("redis") { ::PlaceOS::Driver::RedisStorage.with_redis &.ping } }, - Promise.defer { - check_resource?("etcd") { ModuleManager.instance.discovery.etcd(&.maintenance.status) } - }, Promise.defer { check_resource?("postgres") { pg_healthcheck } }, diff --git a/src/placeos-core/module_manager.cr b/src/placeos-core/module_manager.cr index b25961c1..e927aa15 100644 --- a/src/placeos-core/module_manager.cr +++ b/src/placeos-core/module_manager.cr @@ -1,7 +1,5 @@ -require "clustering" -require "hound-dog" +require "redis_service_manager" require "mutex" -require "redis" require "uri/json" require "placeos-models/control_system" @@ -25,12 +23,12 @@ module PlaceOS::Core class_property uri : URI = URI.new("http", CORE_HOST, CORE_PORT) getter clustering : Clustering - getter discovery : HoundDog::Discovery + getter discovery : Clustering::Discovery protected getter store : DriverStore def stop - clustering.stop + clustering.unregister stop_process_check end @@ -45,8 +43,6 @@ module PlaceOS::Core # Redis channel that cluster leader publishes stable cluster versions to REDIS_VERSION_CHANNEL = "cluster/cluster_version" - getter redis : Redis { Redis.new(url: REDIS_URL) } - # Singleton configured from environment class_getter instance : ModuleManager { ModuleManager.new(uri: self.uri) } @@ -64,19 +60,20 @@ module PlaceOS::Core # - once load complete, mark in etcd that load is complete def initialize( uri : String | URI, - discovery : HoundDog::Discovery? = nil, - clustering : Clustering? = nil, - @redis : Redis? = nil + clustering : Clustering? = nil ) @uri = uri.is_a?(URI) ? uri : URI.parse(uri) ModuleManager.uri = @uri @store = DriverStore.new - @discovery = discovery || HoundDog::Discovery.new(service: "core", uri: @uri) - @clustering = clustering || Clustering.new( - uri: @uri, - discovery: @discovery, + @clustering = clustering || RedisServiceManager.new( + service: "core", + redis: REDIS_CLIENT, + lock: REDIS_LOCK, + uri: @uri.to_s, + ttl: CLUSTER_NODE_TTL ) + @discovery = Clustering::Discovery.new(@clustering) super() end @@ -96,9 +93,14 @@ module PlaceOS::Core # Register core node to the cluster protected def start_clustering - clustering.start(on_stable: ->publish_version(String)) do |nodes| + clustering.on_cluster_stable { publish_version(clustering.version) } + clustering.on_rebalance do |nodes, rebalance_complete_cb| + Log.info { "cluster rebalance in progress" } stabilize(nodes) + rebalance_complete_cb.call + Log.info { "cluster rebalance complete" } end + clustering.register end # Event loop @@ -137,7 +139,9 @@ module PlaceOS::Core def load_module(mod : Model::Module, rendezvous_hash : RendezvousHash = discovery.rendezvous) module_id = mod.id.as(String) - if ModuleManager.core_uri(mod, rendezvous_hash) == uri + allocated_uri = ModuleManager.core_uri(mod, rendezvous_hash) + + if allocated_uri == @clustering.uri driver = mod.driver! driver_id = driver.id.as(String) # repository_folder = driver.repository.not_nil!.folder_name @@ -166,6 +170,8 @@ module PlaceOS::Core # Not on node, but protocol manager exists Log.info { {message: "unloading module no longer on node", module_id: module_id} } unload_module(mod) + else + Log.warn { {message: "load module request invalid. #{allocated_uri.inspect} != #{@clustering.uri.inspect}", module_id: module_id} } end end @@ -314,49 +320,53 @@ module PlaceOS::Core # Run through modules and load to a stable state. # # Uses a semaphore to ensure intermediary cluster events don't trigger stabilization. - def stabilize(nodes : Array(HoundDog::Service::Node)) : Bool + def stabilize(rendezvous_hash : RendezvousHash) : Bool queued_stabilization_events.add(1) stabilize_lock.synchronize do queued_stabilization_events.add(-1) return false unless queued_stabilization_events.get.zero? - Log.debug { {message: "stabilizing", nodes: nodes.to_json} } - - # Create a one off rendezvous hash with nodes from the stabilization event - rendezvous_hash = RendezvousHash.new(nodes: nodes.map(&->HoundDog::Discovery.to_hash_value(HoundDog::Service::Node))) - + Log.debug { {message: "cluster rebalance: stabilizing", nodes: rendezvous_hash.nodes.to_json} } success_count, fail_count = 0_i64, 0_i64 - timeout_period = 5.seconds - waiting = Hash(String?, Promise::DeferredPromise(Nil)).new - - Model::Module.order(id: :asc).all.in_groups_of(STABILIZE_BATCH_SIZE, reuse: true) do |modules| - modules.each.reject(Nil).each do |mod| - waiting[mod.id] = Promise.defer(same_thread: true, timeout: timeout_period) do - begin - load_module(mod, rendezvous_hash) - success_count += 1 - rescue e - Log.error(exception: e) { {message: "failed to load module during stabilization", module_id: mod.id, name: mod.name, custom_name: mod.custom_name} } - fail_count += 1 + + SimpleRetry.try_to(max_attempts: 3, base_interval: 100.milliseconds, max_interval: 1.seconds) do + timeout_period = 5.seconds + waiting = Hash(String?, Promise::DeferredPromise(Nil)).new + + Model::Module.order(id: :asc).all.in_groups_of(STABILIZE_BATCH_SIZE, reuse: true) do |modules| + modules.each.reject(Nil).each do |mod| + waiting[mod.id] = Promise.defer(same_thread: true, timeout: timeout_period) do + begin + load_module(mod, rendezvous_hash) + success_count += 1 + rescue e + Log.error(exception: e) { {message: "cluster rebalance: module load failure", module_id: mod.id, name: mod.name, custom_name: mod.custom_name} } + fail_count += 1 + end + nil end - nil - end - waiting.each do |mod_id, promise| - begin - promise.get - rescue error - fail_count += 1 - Log.error(exception: error) { "load timeout during stabilization: #{mod_id}" } + waiting.each do |mod_id, promise| + begin + promise.get + rescue error + fail_count += 1 + Log.error(exception: error) { {message: "cluster rebalance: module load timeout", module_id: mod.id, name: mod.name, custom_name: mod.custom_name} } + end end + waiting.clear end - waiting.clear end end - Log.info { {message: "finished loading modules stabilization", success: success_count, failure: fail_count} } + Log.info { {message: "cluster rebalance: finished loading modules on node", success: success_count, failure: fail_count} } true end + rescue error + Log.fatal(exception: error) { "cluster rebalance: failed, terminating node" } + stop + sleep 0.5 + exit(1) end # Determine if a module is an edge module and allocated to the current core node. @@ -368,8 +378,8 @@ module PlaceOS::Core # Publish cluster version to redis # def publish_version(cluster_version : String) - redis.publish(REDIS_VERSION_CHANNEL, cluster_version) - + Driver::RedisStorage.with_redis { |redis| redis.publish(REDIS_VERSION_CHANNEL, cluster_version) } + Log.info { "cluster stable, publishing cluster version" } nil end @@ -389,9 +399,7 @@ module PlaceOS::Core end def self.core_uri(mod : Model::Module | String, rendezvous_hash : RendezvousHash) - rendezvous_hash[hash_id(mod)]?.try do |hash_value| - HoundDog::Discovery.from_hash_value(hash_value)[:uri] - end + rendezvous_hash[hash_id(mod)]? end protected getter uri : URI = ModuleManager.uri diff --git a/src/placeos-core/process_manager/edge.cr b/src/placeos-core/process_manager/edge.cr index 60a59e7b..a66e93de 100644 --- a/src/placeos-core/process_manager/edge.cr +++ b/src/placeos-core/process_manager/edge.cr @@ -15,11 +15,9 @@ module PlaceOS::Core getter transport : Transport getter edge_id : String - getter redis : Redis::Client { Redis::Client.boot(REDIS_URL) } - protected getter(store : DriverStore) { DriverStore.new } - def initialize(@edge_id : String, socket : HTTP::WebSocket, @redis : Redis? = nil) + def initialize(@edge_id : String, socket : HTTP::WebSocket) @transport = Transport.new do |(sequence_id, request)| if request.is_a?(Protocol::Client::Request) handle_request(sequence_id, request) @@ -215,10 +213,8 @@ module PlaceOS::Core {{ raise "Edge modules cannot request control systems" }} end - private getter redis_lock = Mutex.new - def on_redis(action : Protocol::RedisAction, hash_id : String, key_name : String, status_value : String?) - redis_lock.synchronize do + Driver::RedisStorage.with_redis do |redis| case action in .hset? value = status_value || "null" diff --git a/src/placeos-core/process_manager/local.cr b/src/placeos-core/process_manager/local.cr index d42af7c4..89110318 100644 --- a/src/placeos-core/process_manager/local.cr +++ b/src/placeos-core/process_manager/local.cr @@ -1,5 +1,5 @@ require "hardware" -require "hound-dog" +require "redis_service_manager" require "../process_manager" require "./common" @@ -9,10 +9,10 @@ module PlaceOS::Core include ProcessManager include Common - private getter discovery : HoundDog::Discovery + private getter discovery : Clustering::Discovery private getter store : DriverStore - def initialize(@discovery : HoundDog::Discovery) + def initialize(@discovery : Clustering::Discovery) @store = DriverStore.new end @@ -199,7 +199,7 @@ module PlaceOS::Core edge_id = Model::Module.find!(module_id).edge_id if Model::Module.has_edge_hint?(module_id) node = edge_id ? discovery.find?(edge_id) : discovery.find?(module_id) raise Error.new("No registered core instances") if node.nil? - node[:uri] + node end end end diff --git a/src/placeos-edge/protocol.cr b/src/placeos-edge/protocol.cr index cd159668..0008679e 100644 --- a/src/placeos-edge/protocol.cr +++ b/src/placeos-edge/protocol.cr @@ -12,17 +12,17 @@ module PlaceOS::Edge::Protocol # Binary messages # class Binary < BinData - enum Status + enum Status : UInt8 Success Fail end endian big - uint64 :sequence_id + field sequence_id : UInt64 - enum_field UInt8, status : Status = Status::Success - int32 :length, value: ->{ key.bytesize } - string :key, length: ->{ length } + field status : Status = Status::Success + field length : Int32, value: ->{ key.bytesize } + field key : String, length: ->{ length } # Keep a reference to the remainder of the message protected setter binary : IO