From fad1b5c072266ac8d575b01f1b30c661adb69b92 Mon Sep 17 00:00:00 2001 From: story Date: Sat, 23 Dec 2023 15:50:56 +0800 Subject: [PATCH] canal auto scan --- .../Canal\351\203\250\347\275\262.md" | 192 ++++++++++++++++-- 1 file changed, 178 insertions(+), 14 deletions(-) diff --git "a/docs/linux/applications/Canal\351\203\250\347\275\262.md" "b/docs/linux/applications/Canal\351\203\250\347\275\262.md" index b6615e48d..29c809dd7 100644 --- "a/docs/linux/applications/Canal\351\203\250\347\275\262.md" +++ "b/docs/linux/applications/Canal\351\203\250\347\275\262.md" @@ -35,9 +35,7 @@ mkdir -p ~/Downloads/canal/admin && tar -zxvf canal.admin-1.1.4.tar.gz -C ~/Down mysql -uroot -p < ~/Downloads/canal/admin/conf/canal_manager.sql ``` - - - ![image-20210617000438523](https://storyxc.com/images/blog//image-20210617000438523.png) +![image-20210617000438523](https://storyxc.com/images/blog//image-20210617000438523.png) - 创建`canal`用户并授权`canal`链接 MySQL 账号具有作为 MySQL slave 的权限 @@ -48,11 +46,9 @@ mkdir -p ~/Downloads/canal/admin && tar -zxvf canal.admin-1.1.4.tar.gz -C ~/Down FLUSH PRIVILEGES; ``` - - - 修改conf文件夹中的application.yml +修改conf文件夹中的application.yml - ![image-20231223000510198](https://storyxc.com/images/blog/b0c9b225-f1ee-4bec-b2b2-fe32122ff354.png) +![image-20231223000510198](https://storyxc.com/images/blog/b0c9b225-f1ee-4bec-b2b2-fe32122ff354.png) - 执行admin/bin目录的startup.sh @@ -60,7 +56,6 @@ mkdir -p ~/Downloads/canal/admin && tar -zxvf canal.admin-1.1.4.tar.gz -C ~/Down ![image-20210617001200010](https://storyxc.com/images/blog//image-20210617001200010.png) - - 使用默认账号密码 admin/123456即可登录 @@ -72,8 +67,6 @@ mkdir -p ~/Downloads/canal/admin && tar -zxvf canal.admin-1.1.4.tar.gz -C ~/Down ![image-20210617001457272](https://storyxc.com/images/blog//image-20210617001457272.png) - - > > canal-admin的核心模型主要有: > > 1. instance,对应canal-server里的instance,一个最小的订阅mysql的队列 @@ -84,7 +77,10 @@ mkdir -p ~/Downloads/canal/admin && tar -zxvf canal.admin-1.1.4.tar.gz -C ~/Down > > 1. instance是最原始的业务订阅诉求,它会和 server/集群 这两个面向资源服务属性的进行关联,比如instance A绑定到server A上或者集群 A上, > 2. 有了任务和资源的绑定关系后,对应的资源服务就会接收到这个任务配置,在对应的资源上动态加载instance,并提供服务 -> - 动态加载的过程,对应配置文件中的autoScan配置,只不过基于canal-admin之后可就以变为远程的web操作,而不需要在机器上运维配置文件 + > + +- 动态加载的过程,对应配置文件中的autoScan配置,只不过基于canal-admin之后可就以变为远程的web操作,而不需要在机器上运维配置文件 + > 3. 将server抽象成资源之后,原本canal-server运行所需要的canal.properties/instance.properties配置文件就需要在web ui上进行统一运维,每个server只需要以最基本的启动配置 (比如知道一下canal-admin的manager地址,以及访问配置的账号、密码即可) - 新建server,按照图中配置即可 @@ -247,6 +243,175 @@ canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf" canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf" ``` +:::tip + +`canal.auto.scan`如果设置为true,`canal.destinations`可以不填写,server会自动扫描instance然后启动 + +```java +// CanalController +// 初始化monitor机制 +autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN)); +if (autoScan) { + defaultAction = new InstanceAction() { + public void start(String destination) { + InstanceConfig config = instanceConfigs.get(destination); + if (config == null) { + // 重新读取一下instance config + config = parseInstanceConfig(properties, destination); + instanceConfigs.put(destination, config); + } + if (!embededCanalServer.isStart(destination)) { + // HA机制启动 + ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); + if (!config.getLazy() && !runningMonitor.isStart()) { + runningMonitor.start(); + } + } + logger.info("auto notify start {} successful.", destination); + } + //... + } +} + +instanceConfigMonitors = MigrateMap.makeComputingMap(new Function() { + + public InstanceConfigMonitor apply(InstanceMode mode) { + int scanInterval = Integer.valueOf(getProperty(properties, + CanalConstants.CANAL_AUTO_SCAN_INTERVAL, + "5")); + + if (mode.isSpring()) { + SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor(); + monitor.setScanIntervalInSecond(scanInterval); + monitor.setDefaultAction(defaultAction); + // 设置conf目录,默认是user.dir + conf目录组成 + String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR); + if (StringUtils.isEmpty(rootDir)) { + rootDir = "../conf"; + } + + if (StringUtils.equals("otter-canal", System.getProperty("appName"))) { + monitor.setRootConf(rootDir); + } else { + // eclipse debug模式 + monitor.setRootConf("src/main/resources/"); + } + return monitor; + } else if (mode.isManager()) { + ManagerInstanceConfigMonitor monitor = new ManagerInstanceConfigMonitor(); + monitor.setScanIntervalInSecond(scanInterval); + monitor.setDefaultAction(defaultAction); + String managerAddress = getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER); + monitor.setConfigClient(getManagerClient(managerAddress)); + return monitor; + } else { + throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor"); + } + } +}); + + +// CanalController.start() +public void start() throws Throwable { + // ... + // 尝试启动一下非lazy状态的通道 + for (Map.Entry entry : instanceConfigs.entrySet()) { + final String destination = entry.getKey(); + InstanceConfig config = entry.getValue(); + // 创建destination的工作节点 + if (!embededCanalServer.isStart(destination)) { + // HA机制启动 + ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); + if (!config.getLazy() && !runningMonitor.isStart()) { + runningMonitor.start(); + } + } + + if (autoScan) { + instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction); + } + } + + if (autoScan) { + instanceConfigMonitors.get(globalInstanceConfig.getMode()).start(); + for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) { + if (!monitor.isStart()) { + monitor.start(); + } + } + } + // ... +} + +// 然后会调用ManagerInstanceConfigMonitor的start方法,start方法会启动一个定时任务,每隔scanInterval秒调用scan方法 +public void start() { + super.start(); + executor.scheduleWithFixedDelay(new Runnable() { + + public void run() { + try { + scan(); + if (isFirst) { + isFirst = false; + } + } catch (Throwable e) { + logger.error("scan failed", e); + } + } + + }, 0, scanIntervalInSecond, TimeUnit.SECONDS); +} + +// scan方法中会通过configClient调用canal-admin的接口获取instance的配置信息, +// 最后对instance进行stop/reload/start操作 + +private void scan() { + String instances = configClient.findInstances(null); + final List is = Lists.newArrayList(StringUtils.split(instances, ',')); + List start = Lists.newArrayList(); + List stop = Lists.newArrayList(); + List restart = Lists.newArrayList(); + for (String instance : is) { + if (!configs.containsKey(instance)) { + PlainCanal newPlainCanal = configClient.findInstance(instance, null); + if (newPlainCanal != null) { + configs.put(instance, newPlainCanal); + start.add(instance); + } + } else { + PlainCanal plainCanal = configs.get(instance); + PlainCanal newPlainCanal = configClient.findInstance(instance, plainCanal.getMd5()); + if (newPlainCanal != null) { + // 配置有变化 + restart.add(instance); + configs.put(instance, newPlainCanal); + } + } + } + + configs.forEach((instance, plainCanal) -> { + if (!is.contains(instance)) { + stop.add(instance); + } + }); + + stop.forEach(instance -> { + notifyStop(instance); + }); + + restart.forEach(instance -> { + notifyReload(instance); + }); + + start.forEach(instance -> { + notifyStart(instance); + }); + +} +``` + +::: + ## canal deployer - 解压 @@ -268,9 +433,9 @@ mkdir -p ~/Downloads/canal/deployer && tar -zxvf canal.deployer-1.1.4.tar.gz -C :::tip - 1. 注意ip前后不能有空格,不然会无法启动netty server从而无法启动canal server,应该是后台没做trim + 1. 注意ip前后不能有空格,不然会无法启动netty server从而无法启动canal server,应该是后台没做trim - 2. 如果不填写`canal.ip`和`canal.register.ip`两个配置项,代码中将通过`AddressUtils.getHostIp()`获取本机的ip地址,如果本地有docker/orbstack等创建的虚拟网络设备会导致启动canal-server后识别到多个server且是不同的ip(docker0网桥或orbstack容器等的ip),比较膈应人。([#issue47](https://github.com/alibaba/canal/issues/47)) + 2. 如果不填写`canal.ip`和`canal.register.ip`两个配置项,代码中将通过`AddressUtils.getHostIp()`获取本机的ip地址,如果本地有docker/orbstack等创建的虚拟网络设备会导致启动canal-server后识别到多个server且是不同的ip(docker0网桥或orbstack容器等的ip),比较膈应人。([#issue47](https://github.com/alibaba/canal/issues/47)) 源码: @@ -292,7 +457,6 @@ mkdir -p ~/Downloads/canal/deployer && tar -zxvf canal.deployer-1.1.4.tar.gz -C ::: - - 执行bin目录下的startup.sh