-
Notifications
You must be signed in to change notification settings - Fork 11
Client User Guide
Before starting the client, you need to ensure:
- The server has been started, refer to QuickStart
- ZooKeeper has been started and you have obtained its address, zk-ip:zk-port
Execute the git command to get the ptubes code
git clone [email protected]:meituan/ptubes.git
Find the class in the IDE
com.meituan.ptubes.sdk.example.ExampleByMainFunction
Replace the zookeeper address in ExampleByMainFunction
with the real address zk-ip:zk-port, and modify the reader address, subscription list, etc. as needed. An example is as follows:
/**
 * Returns the configuration object for the requested configuration class.
 *
 * <p>When {@code confClass} is assignable from {@code PtubesSdkSubscriptionConfig}, a new
 * subscription config is built from the reader task name, the SDK task name, the
 * ZooKeeper address and the list of subscribed tables. The sections marked
 * {@code // ...} are elided in this excerpt.
 *
 * @param confName  configuration name; in the code shown here it is only used in the error message
 * @param confClass the configuration type being requested
 * @param <T>       the configuration type
 * @return the configuration instance, or {@code null} when no branch shown here matches
 *         or an exception was caught
 */
public <T> T getConfig(String confName, Class<T> confClass) {
// confName is ignored when the task has only one subscription/consumer config implementation.
try {
// ...
if (confClass.isAssignableFrom(PtubesSdkSubscriptionConfig.class)) {
return (T) new PtubesSdkSubscriptionConfig(
"demoR1", // reader taskName, eg. demoR1
taskName, // SDK task name
"127.0.0.1:2181", // change the zookeeper address to a real address.
"test_test.wm_risk_list_account,test_test.test_table" // tables for subscription
);
}
// ...
} catch (Exception e) {
// Failures are reported on the console; the method then falls through and returns null.
System.out.println("getConfig error for confName:{" + confName + "}, confClass:{" + confClass + "}");
e.printStackTrace();
}
return null;
}
Run the compiled client
- Download the compiled SDK client ptubes-sdk-client.tar.gz
- Unzip the client archive
- Check the directory and configuration
The structure of the compressed package is as follows:
drwxr-xr-x 4 admin root 12:00 bin
drwxr-xr-x 5 admin root 12:00 conf
drwxr-xr-x 4 admin root 12:10 lib
The bin directory stores start and stop scripts, the conf directory stores dependent configuration files, and the lib directory stores required packages.
./conf/ptubes_demo_task.properties is the configuration file for the example task; in it, ptubes.sdk.zookeeper.address=127.0.0.1:2181 must be replaced with the real address (zk-ip:zk-port) of the ZooKeeper instance you started
sdk.conf
ptubes.sdk.task.set=ptubes_demo_task // Task names, separated by commas; each task needs its own configuration file
ptubes_demo_task.properties
ptubes.sdk.reader.name=demoR1 // The corresponding Task name in the reader, such as demoR1
ptubes.sdk.task.name=ptubes_demo_task // SDK's own task name
ptubes.sdk.zookeeper.address=127.0.0.1:2181 // The corresponding zk address
ptubes.sdk.subs=test.test_table // Names of the database tables to subscribe to, separated by commas
ptubes.sdk.reader.ip=127.0.0.1:28332 // Corresponding Reader addresses, separated by commas
- Execute
# This command will run all sdk tasks listed in the ./conf/sdk.conf file
# At the same time, the configuration of these tasks also needs to exist in ./conf/${sdkTaskName}.properties
cd bin && sh start.sh
- Stop
# This command will stop all running SDK tasks on the machine
sh stop.sh
Create a new maven project and depend on the ptubes project
- Create a new maven project and add dependencies
<dependency>
<groupId>com.meituan</groupId>
<artifactId>ptubes-sdk</artifactId>
<version>1.0.0</version>
</dependency>
- Add two configuration files to the project's resources directory and adjust their parameters
sdk.conf
ptubes.sdk.task.set=ptubes_demo_task
ptubes_demo_task.properties
# The corresponding Task name in the reader, such as demoR1
ptubes.sdk.reader.name=demoR1
#SDK's own task name
ptubes.sdk.task.name=ptubes_demo_task
# Corresponding zk address
ptubes.sdk.zookeeper.address=127.0.0.1:2181
# Names of the database tables to subscribe to, separated by commas
ptubes.sdk.subs=test.test_table
# Corresponding Reader addresses, separated by commas
ptubes.sdk.reader.ip=127.0.0.1:28332
- Add a new class TestMain
package com.example.buffalo;
import com.meituan.ptubes.common.utils.PbJsonUtil;
import com.meituan.ptubes.sdk.IPtubesConnector;
import com.meituan.ptubes.common.log.LoggerFactory;
import com.meituan.ptubes.sdk.IRdsCdcEventListener;
import com.meituan.ptubes.sdk.RdsCdcEventStatus;
import com.meituan.ptubes.sdk.config.notification.IConfigChangeNotifier;
import com.meituan.ptubes.sdk.RdsCdcConnectorFactory;
import com.meituan.ptubes.sdk.config.notification.SimpleLocalFileConfigChangeNotifier;
import com.meituan.ptubes.sdk.protocol.RdsPacket;
import java.util.List;
/**
 * Minimal example of running a Ptubes SDK task from a {@code main} method.
 *
 * <p>It configures the log directory and log type through system properties, builds a
 * MySQL connector for the task {@code ptubes_demo_task} with a listener that prints each
 * received event, starts the connector, and then prints the connector's monitor info
 * every 30 seconds for 5 minutes.
 */
public class TestMain {

    /**
     * Starts the demo connector and periodically prints its runtime information.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        String taskName = "ptubes_demo_task";
        // set log file directory
        System.setProperty(LoggerFactory.DEFAULT_LOG_DIR_PROPERTY, TestMain.class.getResource("/").getPath() + "/logs");
        // set log type
        System.setProperty(LoggerFactory.DEFAULT_LOG_TYPE_PROPERTY, "log4j2");
        System.out.println(System.getProperty(LoggerFactory.DEFAULT_LOG_DIR_PROPERTY));

        IPtubesConnector rdsCdcConnector = null;
        IConfigChangeNotifier iConfigChangeNotifier = new SimpleLocalFileConfigChangeNotifier(taskName);
        try {
            rdsCdcConnector = RdsCdcConnectorFactory.buildMySQLConnector(taskName, iConfigChangeNotifier, new IRdsCdcEventListener() {
                @Override
                public RdsCdcEventStatus onEvents(List<RdsPacket.RdsEvent> events) {
                    for (RdsPacket.RdsEvent event : events) {
                        // Print the "id" column before and after the change, then the whole event.
                        System.out.println(event.getRowData()
                            .getBeforeColumnsMap()
                            .get("id"));
                        System.out.println(event.getRowData()
                            .getAfterColumnsMap()
                            .get("id"));
                        System.out.println(PbJsonUtil.printToStringDefaultNull(event));
                    }
                    return RdsCdcEventStatus.SUCCESS;
                }
            });
            rdsCdcConnector.startup();

            long eachSleepTime = 30000L;
            long sleepTimeCount = 300000L;
            do {
                Thread.sleep(eachSleepTime);
                sleepTimeCount -= eachSleepTime;
                try {
                    // How to get the runtime info of an SDK task.
                    // The connector is known to be non-null here: startup() was already called on it.
                    System.out.println(rdsCdcConnector.getConnectorMonitorInfo());
                } catch (Exception e) {
                    // A failed monitor query must not stop the polling loop.
                    System.out.println(e);
                }
            } while (sleepTimeCount > 0);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not silently lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // NOTE(review): the example leaves shutdown disabled, so the connector is not
            // stopped when the polling loop ends; enable the call below if the connector
            // should be stopped here. Confirm the intended behavior.
            // rdsCdcConnector.shutdown();
        }
    }
}
- Execute the TestMain.main method and observe the console output