Skip to content

Commit

Permalink
Merge pull request #45 from ywy2090/feature-milestone2
Browse files Browse the repository at this point in the history
update wedpr scheduler
  • Loading branch information
ywy2090 authored Sep 26, 2024
2 parents 7556ad5 + 7591f69 commit 1692e6b
Show file tree
Hide file tree
Showing 59 changed files with 1,192 additions and 1,330 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.venv

# Prerequisites
*.d

Expand Down
3 changes: 2 additions & 1 deletion python/ppc_common/db_models/computing_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@


class ComputingNodeRecord(db.Model):
__tablename__ = 't_computing_node'
__tablename__ = 'wedpr_computing_node'
id = db.Column(db.String(255), primary_key=True)
url = db.Column(db.String(255))
type = db.Column(db.String(255))
token = db.Column(db.String(255))
loading = db.Column(db.Integer)
42 changes: 28 additions & 14 deletions python/ppc_common/db_models/config.sql
Original file line number Diff line number Diff line change
@@ -1,26 +1,40 @@

CREATE TABLE t_job_worker (
worker_id VARCHAR(100) PRIMARY KEY,
job_id VARCHAR(255) INDEX,
CREATE TABLE wedpr_scheduler_job_worker_table (
worker_id VARCHAR(100),
job_id VARCHAR(255),
type VARCHAR(255),
status VARCHAR(255),
args TEXT,
upstreams TEXT,
inputs_statement TEXT,
outputs TEXT,
create_time BIGINT,
update_time BIGINT
)ENGINE=InnoDB default charset=utf8mb4 default collate=utf8mb4_unicode_ci;
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (worker_id),
INDEX job_id_idx (job_id)
)ENGINE='InnoDB' DEFAULT CHARSET='utf8mb4' COLLATE='utf8mb4_bin' ROW_FORMAT=DYNAMIC;

CREATE TABLE t_computing_node (
id VARCHAR(255) PRIMARY KEY,
CREATE TABLE wedpr_scheduler_job_table (
job_id VARCHAR(255),
request TEXT,
status TEXT,
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (job_id)
)ENGINE='InnoDB' DEFAULT CHARSET='utf8mb4' COLLATE='utf8mb4_bin' ROW_FORMAT=DYNAMIC;

CREATE TABLE wedpr_computing_node (
id VARCHAR(255),
url VARCHAR(255),
type VARCHAR(255),
loading INT
)ENGINE=InnoDB default charset=utf8mb4 default collate=utf8mb4_unicode_ci;
loading INT,
token VARCHAR(255) DEFAULT '',
PRIMARY KEY (id)
)ENGINE='InnoDB' DEFAULT CHARSET='utf8mb4' COLLATE='utf8mb4_bin' ROW_FORMAT=DYNAMIC;


INSERT INTO t_computing_node (id, url, type, loading)
INSERT INTO wedpr_computing_node (id, url, type, loading, token)
VALUES
("001", '127.0.0.1:10200', 'PSI', 0),
("002", '127.0.0.1:10201', 'MPC', 0),
("003", '127.0.0.1:10202', 'MODEL', 0);
("001", '127.0.0.1:10200', 'PSI', 0, ''),
("002", '127.0.0.1:10201', 'MPC', 0, ''),
("003", '127.0.0.1:10202', 'MODEL', 0, '');
10 changes: 6 additions & 4 deletions python/ppc_common/db_models/job_worker_record.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@

from datetime import datetime
from ppc_common.db_models import db


class JobWorkerRecord(db.Model):
__tablename__ = 't_job_worker'
worker_id = db.Column(db.String(100), primary_key=True)
__tablename__ = 'wedpr_scheduler_job_worker_table'
worker_id = db.Column(db.String(127), primary_key=True)
job_id = db.Column(db.String(255), index=True)
type = db.Column(db.String(255))
status = db.Column(db.String(255))
upstreams = db.Column(db.Text)
inputs_statement = db.Column(db.Text)
args = db.Column(db.Text)
outputs = db.Column(db.Text)
create_time = db.Column(db.BigInteger)
update_time = db.Column(db.BigInteger)
create_time = db.Column(db.DateTime, default=datetime.now)
update_time = db.Column(db.DateTime, onupdate=datetime.now)
2 changes: 1 addition & 1 deletion python/ppc_common/deps_services/storage_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ def load(config: dict, logger):
'HDFS_USER', None, config, False)
hdfs_home = common_func.get_config_value(
"HDFS_HOME", None, config, False)
return HdfsStorage(config['HDFS_ENDPOINT'], hdfs_user, hdfs_home)
return HdfsStorage(config['HDFS_URL'], hdfs_user, hdfs_home)
else:
raise Exception('unsupported storage type')
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def _cleanup_finished_threads(self):
for target_id in finished_threads:
with self.lock:
del self.threads[target_id]
self.logger.info(f"Cleanup finished thread {target_id}")
self.logger.info(f"cleanup finished thread {target_id}")

def __del__(self):
self.kill_all()
60 changes: 0 additions & 60 deletions python/ppc_common/ppc_dataset/dataset_helper.py

This file was deleted.

61 changes: 0 additions & 61 deletions python/ppc_common/ppc_dataset/dataset_helper_factory.py

This file was deleted.

30 changes: 0 additions & 30 deletions python/ppc_common/ppc_initialize/dataset_handler_initialize.py

This file was deleted.

126 changes: 0 additions & 126 deletions python/ppc_common/ppc_initialize/tests/dataset_initializer_test.py

This file was deleted.

Loading

0 comments on commit 1692e6b

Please sign in to comment.