Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

硬改版本,不建议合并,有短期需求的自取。 #131

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# hardcode
1. 修复部分因为Postgres jsonb为空导致的segmentation fault
2. 增加对于Postgres命令的配置文件和线程数的命令行参数支持
3. 跳过copy_table_data复制报错,因为源库可以用权限来限制要同步的表
4. 增加自动清理同步临时表,仅保留最近1000条
5. 跳过因为未知原因导致的SQL为空的问题
6. 跳过因为未知原因导致不该有的增量的INSERT报错

# dbsync 项目

dbsync 项目目标是围绕 PostgreSQL Greenplum ,实现易用的数据的互迁功能。
Expand Down
42 changes: 39 additions & 3 deletions dbsync/dbsync-pgsql2pgsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "pg_logicaldecode.h"
#include "pgsync.h"
#include "ini.h"
#include <unistd.h>

int
main(int argc, char **argv)
Expand All @@ -22,10 +23,45 @@ main(int argc, char **argv)
char *local = NULL;
void *cfg = NULL;

cfg = init_config("my.cfg");
int num_thread = 5;
int res_getopt = 0;
char *ini_file = "my.cfg";

while ((res_getopt = getopt(argc, argv, "l:j:h")) != -1)
{
switch (res_getopt)
{
case 'l':
ini_file = optarg;
break;
case 'j':
fprintf(stderr, "ini_file=%s, optarg=%s\n", ini_file, optarg);
num_thread = atoi(optarg);
if(num_thread == 0)
{
num_thread = 5;
}
break;
case ':':
fprintf(stderr, "No value specified for -%c\n", optopt);
break;
case 'h':
fprintf(stderr, "Usage: -l <ini file> -j <thread number> -h\n");
fprintf(stderr, "\n -l specifies a file like my.cfg;\n -j specifies number of threads to do the job;\n -h display this usage manual\n");
return 0;
case '?':
fprintf(stderr, "Unsupported option: %c", optopt);
break;
default:
fprintf(stderr, "Parameter parsing error: %c", res_getopt);
return -1;
}
}

cfg = init_config(ini_file);
if (cfg == NULL)
{
fprintf(stderr, "read config file error, insufficient permissions or my.cfg does not exist");
fprintf(stderr, "read config file error, insufficient permissions or %s does not exist\n", ini_file);
return 1;
}

Expand All @@ -39,6 +75,6 @@ main(int argc, char **argv)
return 1;
}

return db_sync_main(src, desc, local ,5);
return db_sync_main(src, desc, local ,num_thread);
}

135 changes: 90 additions & 45 deletions dbsync/pgsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,21 @@ copy_table_data(void *arg)
res1 = PQexec(origin_conn, query.data);
if (PQresultStatus(res1) != PGRES_COPY_OUT)
{
fprintf(stderr,"table copy failed Query '%s': %s",
query.data, PQerrorMessage(origin_conn));
goto exit;
// dance365
//fprintf(stderr,"table copy failed Query '%s': %s",
// query.data, PQerrorMessage(origin_conn));
//goto exit;

args->count++;
curr->count++;

finish_copy_origin_tx(origin_conn);
finish_copy_target_tx(target_conn);
curr->complete = true;
PQclear(res1);
resetStringInfo(&query);

continue;
}

/* Build COPY FROM query. */
Expand Down Expand Up @@ -757,19 +769,19 @@ logical_decoding_receive_thread(void *arg)
}

paramValues[0] = buffer->data;
res = PQexecPrepared(local_conn, stmtname, 1, paramValues, NULL, NULL, 1);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "exec prepare INSERT INTO sync_sqls failed: %s", PQerrorMessage(local_conn));
time_to_abort = true;
goto exit;
}
PQclear(res);
res = PQexecPrepared(local_conn, stmtname, 1, paramValues, NULL, NULL, 1);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "exec prepare INSERT INTO sync_sqls failed: %s", PQerrorMessage(local_conn));
time_to_abort = true;
goto exit;
}
PQclear(res);

hander->flushpos = hander->recvpos;
if(msg->type == MSGKIND_COMMIT)
{
res = PQexec(local_conn, "END");
res = PQexec(local_conn, "END");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "decoding receive thread commit a local trans failed: %s", PQerrorMessage(local_conn));
Expand Down Expand Up @@ -805,7 +817,7 @@ logical_decoding_apply_thread(void *arg)
PGconn *local_conn_u = NULL;
PGconn *apply_conn = NULL;
Oid type[1];
PGresult *resreader = NULL;
PGresult *resreader = NULL;
PGresult *applyres = NULL;
int pgversion;
bool is_gp = false;
Expand Down Expand Up @@ -851,7 +863,8 @@ logical_decoding_apply_thread(void *arg)
char tmp[16];
int n_commit = 0;
int sqltype = SQL_TYPE_BEGIN;


int n_delete = 0;
sprintf(tmp, INT64_FORMAT, apply_id);
paramValues[0] = tmp;

Expand Down Expand Up @@ -880,7 +893,7 @@ logical_decoding_apply_thread(void *arg)
goto exit;
}
PQclear(resreader);

while(!time_to_abort)
{
resreader = PQexec(local_conn, "FETCH FROM ali_decoder_cursor");
Expand Down Expand Up @@ -926,49 +939,81 @@ logical_decoding_apply_thread(void *arg)
sqltype = SQL_TYPE_OTHER_STATMENT;
}

applyres = PQexec(apply_conn, ssql);
if (PQresultStatus(applyres) != PGRES_COMMAND_OK)
// dance365
if(strcmp(ssql, "") == 0)
{
char *sqlstate = PQresultErrorField(applyres, PG_DIAG_SQLSTATE);
int errcode = 0;
fprintf(stderr, "exec apply id %s, sql %s failed: %s\n", PQgetvalue(resreader, 0, 0), ssql, PQerrorMessage(apply_conn));
errcode = atoi(sqlstate);
if (errcode == ERROR_DUPLICATE_KEY && sqltype == SQL_TYPE_FIRST_STATMENT)
PQclear(resreader);
fprintf(stderr, "dance365: fixed sql is empty cause segmention fault\n");
}
else
{
applyres = PQexec(apply_conn, ssql);
if (PQresultStatus(applyres) != PGRES_COMMAND_OK)
{
PQclear(applyres);
applyres = PQexec(apply_conn, "END");
if (PQresultStatus(applyres) != PGRES_COMMAND_OK)
char *sqlstate = PQresultErrorField(applyres, PG_DIAG_SQLSTATE);
int errcode = 0;
// dance365: close for speed ?
//fprintf(stderr, "exec apply id %s, sql %s failed: %s\n", PQgetvalue(resreader, 0, 0), ssql, PQerrorMessage(apply_conn));
errcode = atoi(sqlstate);
if (errcode == ERROR_DUPLICATE_KEY && sqltype == SQL_TYPE_FIRST_STATMENT)
{
goto exit;
PQclear(applyres);
applyres = PQexec(apply_conn, "END");
if (PQresultStatus(applyres) != PGRES_COMMAND_OK)
{
goto exit;
}
PQclear(applyres);
applyres = PQexec(apply_conn, "BEGIN");
if (PQresultStatus(applyres) != PGRES_COMMAND_OK)
{
goto exit;
}
sqltype = SQL_TYPE_BEGIN;
}
PQclear(applyres);
applyres = PQexec(apply_conn, "BEGIN");
if (PQresultStatus(applyres) != PGRES_COMMAND_OK)
else
{
goto exit;
// dance365
// fprintf(stderr, "dance365: continue when insert into error\n");
// PQclear(resreader);
// PQclear(applyres);
// goto exit;
}
sqltype = SQL_TYPE_BEGIN;
}
else

if (sqltype == SQL_TYPE_COMMIT)
{
PQclear(resreader);
PQclear(applyres);
goto exit;
n_commit++;
n_delete++;
apply_id = atoll(PQgetvalue(resreader, 0, 0));
if(n_commit == 5)
{
n_commit = 0;
update_task_status(local_conn_u, false, false, false, apply_id);

// dance365
// clean sync_sqls
// if(apply_id >= 0)
// {
// ExecuteSqlStatement(local_conn_u, "delete from sync_sqls where id < (select apply_id - 1000 from db_sync_status limit 1)");
// }
}
if(n_delete == 200)
{
n_delete = 0;
ExecuteSqlStatement(local_conn_u, "delete from sync_sqls where id < (select apply_id - 1000 from db_sync_status limit 1)");
}
}
PQclear(resreader);
PQclear(applyres);
}

if (sqltype == SQL_TYPE_COMMIT)
if(n_delete > 0)
{
n_commit++;
apply_id = atoll(PQgetvalue(resreader, 0, 0));
if(n_commit == 5)
{
n_commit = 0;
update_task_status(local_conn_u, false, false, false, apply_id);
}
n_delete = 0;
ExecuteSqlStatement(local_conn_u, "delete from sync_sqls where id < (select apply_id - 1000 from db_sync_status limit 1)");
}
PQclear(resreader);
PQclear(applyres);

}
}

Expand Down
27 changes: 26 additions & 1 deletion dbsync/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,13 @@ append_update_statement_key_not_change(Decoder_handler *hander, ALI_PG_DECODE_ME
}
}

// dance365
if(strcmp(msg->atttype[i], "jsonb") == 0 && new_tuple->svalues[i] == NULL)
{
fprintf(stderr, "dance365: continue when type is jsonb && new value is null.%s\n", where->data);
continue;
}

if (first)
{
first = false;
Expand Down Expand Up @@ -1734,7 +1741,25 @@ quote_literal_local(Decoder_handler *hander, const char *rawstr, char *type, PQE
return;
}

len = strlen(rawstr);

// dance365
if(rawstr == NULL)
{
len = 0;
fprintf(stderr, "dance365: fixed rawstr is null cause segmentation fault\n");
if(strcmp(type, "jsonb") == 0)
{
fprintf(stderr, "dance365: fixed jsonb '' cause jsonb sql format error\n");
appendPQExpBuffer(buffer, "'{}'");
return;
}
}
else
{
len = strlen(rawstr);
}


resetStringInfo(s);
appendStringInfoSpaces(s, len * 2 + 3);

Expand Down
4 changes: 4 additions & 0 deletions postgres/ali_decoding_9.4/ali_decoding.control
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
comment = 'ali_decoding'
default_version = '0.0.1'
module_pathname = '$libdir/ali_decoding'
relocatable = true
Binary file added postgres/ali_decoding_9.4/ali_decoding.so
Binary file not shown.
7 changes: 7 additions & 0 deletions postgres/install.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/binbash
docker stop postgres9.4
docker cp postgresql.conf postgres9.4:/var/lib/postgresql/data/postgresql.conf
docker cp pg_hba.conf postgres9.4:/var/lib/postgresql/data/pg_hba.conf
docker cp ali_decoding_9.4/ali_decoding.so postgres9.4:/usr/lib/postgresql/9.4/lib/
docker cp ali_decoding_9.4/ali_decoding.control postgres9.4:/usr/share/postgresql/9.4/extension/
docker start postgres9.4
Loading