Commit

Merge branch 'master' into ShowTableStorageFormatToNereids

rijeshkp authored Nov 6, 2024
2 parents f3729c0 + 699509d commit c36ee4b
Showing 16 changed files with 131 additions and 33 deletions.
6 changes: 6 additions & 0 deletions cloud/src/meta-service/meta_service_helper.h
@@ -92,6 +92,12 @@ void finish_rpc(std::string_view func_name, brpc::Controller* ctrl, Response* re
VLOG_DEBUG << "finish " << func_name << " from " << ctrl->remote_side()
<< " response=" << res->ShortDebugString();
} else if constexpr (std::is_same_v<Response, GetDeleteBitmapResponse>) {
if (res->status().code() != MetaServiceCode::OK) {
res->clear_rowset_ids();
res->clear_segment_ids();
res->clear_versions();
res->clear_segment_delete_bitmaps();
}
LOG(INFO) << "finish " << func_name << " from " << ctrl->remote_side()
<< " status=" << res->status().ShortDebugString()
<< " delete_bitmap_size=" << res->segment_delete_bitmaps_size();
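Note: the added guard strips the delete-bitmap payload from the response whenever the status is not OK, so error replies stay small and the delete_bitmap_size logged above reflects what the caller actually receives. A minimal sketch of the same guard in Java (hypothetical Response holder, not the generated protobuf class):

class ScrubOnErrorDemo {
    static final int OK = 0;

    static class Response {
        int statusCode = 1; // non-OK for the demo
        java.util.List<Long> versions = new java.util.ArrayList<>();
        java.util.List<byte[]> segmentDeleteBitmaps = new java.util.ArrayList<>();
    }

    // On any non-OK status, drop the bulky payload fields before the
    // response is logged and returned, mirroring the C++ clear_*() calls.
    static void finish(Response res) {
        if (res.statusCode != OK) {
            res.versions.clear();
            res.segmentDeleteBitmaps.clear();
        }
        System.out.println("delete_bitmap_size=" + res.segmentDeleteBitmaps.size());
    }

    public static void main(String[] args) {
        finish(new Response()); // prints delete_bitmap_size=0
    }
}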
1 change: 1 addition & 0 deletions conf/ldap.conf
@@ -30,6 +30,7 @@
# ldap_user_basedn - Search base for users.
# ldap_user_filter - User lookup filter, the placeholder {login} will be replaced by the user supplied login.
# ldap_group_basedn - Search base for groups.
# ldap_group_filter - Group lookup filter, the placeholder {login} will be replaced by the user supplied login. Example: "(&(memberUid={login}))"
## step2: Restart fe, and use root or admin account to log in to doris.
## step3: Execute sql statement to set ldap admin password:
# set ldap_admin_password = 'password';
@@ -1208,6 +1208,14 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int report_queue_size = 100;

// if the number of resend agent tasks exceeds report_resend_batch_task_num_per_rpc, split them into multiple RPCs
@ConfField(mutable = true, masterOnly = true, description = {
"重新发送 agent task 时,单次 RPC 分配给每个be的任务最大个数,默认值为10000个。",
"The maximum number of batched tasks per RPC assigned to each BE when resending agent tasks, "
+ "the default value is 10000."
})
public static int report_resend_batch_task_num_per_rpc = 10000;

/**
* If set to true, metric collector will be run as a daemon timer to collect metrics at fix interval
*/
@@ -66,6 +66,12 @@ public class LdapConfig extends ConfigBase {
@ConfigBase.ConfField
public static String ldap_group_basedn = "";

/**
* Group lookup filter, the placeholder {login} will be replaced by the user supplied login.
*/
@ConfigBase.ConfField
public static String ldap_group_filter = "";

/**
* The user LDAP information cache time.
* After timeout, the user information will be retrieved from the LDAP service again.
@@ -170,6 +170,8 @@ public void setTypeRead(boolean isTypeRead) {

public abstract boolean isCancelled();

public abstract boolean isFinished();

public abstract Status updateRepo(Repository repo);

public static AbstractJob read(DataInput in) throws IOException {
@@ -110,10 +110,10 @@ public class BackupHandler extends MasterDaemon implements Writable {

private Env env;

// map to store backup info, key is label name, value is Pair<meta, info>, meta && info is bytes
// this map not present in persist && only in fe master memory
// map to store backup info, key is label name, value is the BackupJob
// this map not present in persist && only in fe memory
// one table only keep one snapshot info, only keep last
private final Map<String, Snapshot> localSnapshots = new HashMap<>();
private final Map<String, BackupJob> localSnapshots = new HashMap<>();
private ReadWriteLock localSnapshotsLock = new ReentrantReadWriteLock();

public BackupHandler() {
@@ -168,6 +168,7 @@ private boolean init() {
return false;
}
}

isInit = true;
return true;
}
@@ -558,11 +559,15 @@ private void addBackupOrRestoreJob(long dbId, AbstractJob job) {
return;
}

List<String> removedLabels = Lists.newArrayList();
jobLock.lock();
try {
Deque<AbstractJob> jobs = dbIdToBackupOrRestoreJobs.computeIfAbsent(dbId, k -> Lists.newLinkedList());
while (jobs.size() >= Config.max_backup_restore_job_num_per_db) {
jobs.removeFirst();
AbstractJob removedJob = jobs.removeFirst();
if (removedJob instanceof BackupJob && ((BackupJob) removedJob).isLocalSnapshot()) {
removedLabels.add(removedJob.getLabel());
}
}
AbstractJob lastJob = jobs.peekLast();

@@ -575,6 +580,17 @@ private void addBackupOrRestoreJob(long dbId, AbstractJob job) {
} finally {
jobLock.unlock();
}

if (job.isFinished() && job instanceof BackupJob) {
// Save snapshot to local repo, when reload backupHandler from image.
BackupJob backupJob = (BackupJob) job;
if (backupJob.isLocalSnapshot()) {
addSnapshot(backupJob.getLabel(), backupJob);
}
}
for (String label : removedLabels) {
removeSnapshot(label);
}
}

private List<AbstractJob> getAllCurrentJobs() {
@@ -813,22 +829,42 @@ public boolean report(TTaskType type, long jobId, long taskId, int finishedNum,
return false;
}

public void addSnapshot(String labelName, Snapshot snapshot) {
public void addSnapshot(String labelName, BackupJob backupJob) {
assert backupJob.isFinished();

LOG.info("add snapshot {} to local repo", labelName);
localSnapshotsLock.writeLock().lock();
try {
localSnapshots.put(labelName, snapshot);
localSnapshots.put(labelName, backupJob);
} finally {
localSnapshotsLock.writeLock().unlock();
}
}

public void removeSnapshot(String labelName) {
LOG.info("remove snapshot {} from local repo", labelName);
localSnapshotsLock.writeLock().lock();
try {
localSnapshots.remove(labelName);
} finally {
localSnapshotsLock.writeLock().unlock();
}
}

public Snapshot getSnapshot(String labelName) {
BackupJob backupJob;
localSnapshotsLock.readLock().lock();
try {
return localSnapshots.get(labelName);
backupJob = localSnapshots.get(labelName);
} finally {
localSnapshotsLock.readLock().unlock();
}

if (backupJob == null) {
return null;
}

return backupJob.getSnapshot();
}

public static BackupHandler read(DataInput in) throws IOException {
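Note: in addBackupOrRestoreJob above, labels of evicted local-snapshot jobs are collected while jobLock is held, but removeSnapshot (which takes localSnapshotsLock) only runs after jobLock is released, so the two locks are never held together. A minimal sketch of that deferred-cleanup ordering (class and field names here are illustrative, not from the Doris code):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class EvictionDemo {
    private final ReentrantLock jobLock = new ReentrantLock();
    private final Deque<String> jobs = new ArrayDeque<>();

    void addJob(String label, int cap) {
        List<String> evictedLabels = new ArrayList<>();
        jobLock.lock();
        try {
            while (jobs.size() >= cap) {
                evictedLabels.add(jobs.removeFirst()); // collect only; no snapshot work under jobLock
            }
            jobs.addLast(label);
        } finally {
            jobLock.unlock();
        }
        // Snapshot cleanup runs outside jobLock, as removeSnapshot() does above.
        for (String evicted : evictedLabels) {
            System.out.println("remove snapshot " + evicted);
        }
    }
}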
45 changes: 30 additions & 15 deletions fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -130,9 +130,6 @@ public enum BackupJobState {
@SerializedName("prop")
private Map<String, String> properties = Maps.newHashMap();

private byte[] metaInfoBytes = null;
private byte[] jobInfoBytes = null;

public BackupJob() {
super(JobType.BACKUP);
}
@@ -344,11 +341,7 @@ public synchronized boolean finishSnapshotUploadTask(UploadTask task, TFinishTas

@Override
public synchronized void replayRun() {
LOG.info("replay run backup job: {}", this);
if (state == BackupJobState.FINISHED && repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes);
env.getBackupHandler().addSnapshot(label, snapshot);
}
// nothing to do
}

@Override
@@ -366,6 +359,11 @@ public boolean isCancelled() {
return state == BackupJobState.CANCELLED;
}

@Override
public boolean isFinished() {
return state == BackupJobState.FINISHED;
}

@Override
public synchronized Status updateRepo(Repository repo) {
this.repo = repo;
@@ -839,8 +837,6 @@ private void saveMetaInfo() {
}
backupMeta.writeToFile(metaInfoFile);
localMetaInfoFilePath = metaInfoFile.getAbsolutePath();
// read meta info to metaInfoBytes
metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());

// 3. save job info file
Map<Long, Long> tableCommitSeqMap = Maps.newHashMap();
@@ -867,8 +863,6 @@ private void saveMetaInfo() {
}
jobInfo.writeToFile(jobInfoFile);
localJobInfoFilePath = jobInfoFile.getAbsolutePath();
// read job info to jobInfoBytes
jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
} catch (Exception e) {
status = new Status(ErrCode.COMMON_ERROR, "failed to save meta info and job info file: " + e.getMessage());
return;
@@ -922,7 +916,6 @@ private void uploadMetaAndJobInfoFile() {
}
}


finishedTime = System.currentTimeMillis();
state = BackupJobState.FINISHED;

@@ -931,8 +924,7 @@ private void uploadMetaAndJobInfoFile() {
LOG.info("job is finished. {}", this);

if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes);
env.getBackupHandler().addSnapshot(label, snapshot);
env.getBackupHandler().addSnapshot(label, this);
return;
}
}
@@ -1025,6 +1017,29 @@ private void cancelInternal() {
LOG.info("finished to cancel backup job. current state: {}. {}", curState.name(), this);
}

public boolean isLocalSnapshot() {
return repoId == Repository.KEEP_ON_LOCAL_REPO_ID;
}

// read meta and job info bytes from disk, and return the snapshot
public synchronized Snapshot getSnapshot() {
if (state != BackupJobState.FINISHED || repoId != Repository.KEEP_ON_LOCAL_REPO_ID) {
return null;
}

try {
File metaInfoFile = new File(localMetaInfoFilePath);
File jobInfoFile = new File(localJobInfoFilePath);
byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
return new Snapshot(label, metaInfoBytes, jobInfoBytes);
} catch (IOException e) {
LOG.warn("failed to load meta info and job info file, meta info file {}, job info file {}: ",
localMetaInfoFilePath, localJobInfoFilePath, e);
return null;
}
}

public synchronized List<String> getInfo() {
List<String> info = Lists.newArrayList();
info.add(String.valueOf(jobId));
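Note: with metaInfoBytes/jobInfoBytes removed, a finished local backup job no longer pins its snapshot bytes in FE memory; getSnapshot rebuilds the Snapshot from the files at localMetaInfoFilePath and localJobInfoFilePath on demand. A minimal sketch of that lazy-read trade-off (paths and class names are placeholders):

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

class LazySnapshotDemo {
    private final String metaPath;
    private final String jobInfoPath;

    LazySnapshotDemo(String metaPath, String jobInfoPath) {
        this.metaPath = metaPath;
        this.jobInfoPath = jobInfoPath;
    }

    // Read both files on every call instead of caching the bytes,
    // trading a little disk I/O for a smaller FE heap footprint.
    byte[][] load() {
        try {
            byte[] meta = Files.readAllBytes(new File(metaPath).toPath());
            byte[] jobInfo = Files.readAllBytes(new File(jobInfoPath).toPath());
            return new byte[][] {meta, jobInfo};
        } catch (IOException e) {
            return null; // caller treats unreadable files as "no snapshot"
        }
    }
}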
@@ -396,6 +396,11 @@ public boolean isCancelled() {
return state == RestoreJobState.CANCELLED;
}

@Override
public boolean isFinished() {
return state == RestoreJobState.FINISHED;
}

@Override
public synchronized Status updateRepo(Repository repo) {
this.repo = repo;
@@ -619,7 +619,7 @@ private static void taskReport(long backendId, Map<TTaskType, Set<Long>> running

List<AgentTask> diffTasks = AgentTaskQueue.getDiffTasks(backendId, runningTasks);

AgentBatchTask batchTask = new AgentBatchTask();
AgentBatchTask batchTask = new AgentBatchTask(Config.report_resend_batch_task_num_per_rpc);
long taskReportTime = System.currentTimeMillis();
for (AgentTask task : diffTasks) {
// these tasks no need to do diff
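Note: with the cap passed to AgentBatchTask, a diff of N resend tasks goes out in ceil(N / report_resend_batch_task_num_per_rpc) RPCs per backend instead of one unbounded request. A rough sketch of the splitting arithmetic only (AgentBatchTask's real internals are not reproduced here):

import java.util.ArrayList;
import java.util.List;

class BatchSplitDemo {
    // Split tasks into chunks of at most batchSize, one chunk per RPC.
    static <T> List<List<T>> split(List<T> tasks, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < tasks.size(); i += batchSize) {
            batches.add(tasks.subList(i, Math.min(i + batchSize, tasks.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> tasks = new ArrayList<>();
        for (int i = 0; i < 25000; i++) {
            tasks.add(i);
        }
        // With the default cap of 10000, 25000 tasks become 3 batches.
        System.out.println(split(tasks, 10000).size()); // prints 3
    }
}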
@@ -159,9 +159,21 @@ List<String> getGroups(String userName) {
if (userDn == null) {
return groups;
}
List<String> groupDns = getDn(org.springframework.ldap.query.LdapQueryBuilder.query()
List<String> groupDns;

// Support Open Directory implementations
// If no group filter is configured, it defaults to querying groups based on the attribute 'member'
// for standard LDAP implementations
if (!LdapConfig.ldap_group_filter.isEmpty()) {
groupDns = getDn(org.springframework.ldap.query.LdapQueryBuilder.query()
.base(LdapConfig.ldap_group_basedn)
.filter(getGroupFilter(LdapConfig.ldap_group_filter, userName)));
} else {
groupDns = getDn(org.springframework.ldap.query.LdapQueryBuilder.query()
.base(LdapConfig.ldap_group_basedn)
.where("member").is(userDn));
}

if (groupDns == null) {
return groups;
}
@@ -209,4 +221,8 @@ protected String doMapFromContext(DirContextOperations ctx) {
private String getUserFilter(String userFilter, String userName) {
return userFilter.replaceAll("\\{login}", userName);
}

private String getGroupFilter(String groupFilter, String userName) {
return groupFilter.replaceAll("\\{login}", userName);
}
}
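Note: both helpers do plain string substitution; every {login} placeholder in the configured filter is replaced with the user's login before the LDAP query runs. A quick illustration of what getGroupFilter produces for the example filter from conf/ldap.conf (the user name is made up):

class FilterDemo {
    public static void main(String[] args) {
        String template = "(&(memberUid={login}))";
        // "\\{login}" is a regex matching the literal text {login},
        // the same pattern getUserFilter/getGroupFilter use.
        String filter = template.replaceAll("\\{login}", "alice");
        System.out.println(filter); // prints (&(memberUid=alice))
    }
}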
@@ -2908,15 +2908,18 @@ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String c
}

// Step 3: get snapshot
String label = request.getLabelName();
TGetSnapshotResult result = new TGetSnapshotResult();
result.setStatus(new TStatus(TStatusCode.OK));
Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(request.getLabelName());
Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(label);
if (snapshot == null) {
result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST);
result.getStatus().addToErrorMsgs("snapshot not exist");
result.getStatus().addToErrorMsgs(String.format("snapshot %s not exist", label));
} else {
result.setMeta(snapshot.getMeta());
result.setJobInfo(snapshot.getJobInfo());
LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}",
label, snapshot.getMeta().length, snapshot.getJobInfo().length);
}

return result;
2 changes: 1 addition & 1 deletion gensrc/thrift/Descriptors.thrift
@@ -256,7 +256,7 @@ struct TOlapTableSchemaParam {
11: optional string auto_increment_column
12: optional i32 auto_increment_column_unique_id = -1
13: optional Types.TInvertedIndexFileStorageFormat inverted_index_file_storage_format = Types.TInvertedIndexFileStorageFormat.V1
14: optional Types.TUniqueKeyUpdateMode unique_key_update_mode = Types.TUniqueKeyUpdateMode.UPSERT
14: optional Types.TUniqueKeyUpdateMode unique_key_update_mode
15: optional i32 sequence_map_col_unique_id = -1
}

@@ -37,7 +37,7 @@
1

-- !select_timestamp2 --
11
1

-- !select_date --
1
@@ -86,7 +86,7 @@ B
1

-- !select_timestamp2 --
11
1

-- !select_date --
1
@@ -39,7 +39,7 @@
1

-- !select_timestamp2 --
11
1

-- !select_date --
1
@@ -90,7 +90,7 @@ B
1

-- !select_timestamp2 --
11
1

-- !select_date --
1
@@ -257,7 +257,7 @@ suite("test_primary_key_partial_update", "p0") {

qt_select_timestamp "select count(*) from ${tableName} where `ctime` > \"1970-01-01\""

sql "set time_zone = 'America/New_York'"
sql "set time_zone = 'Asia/Tokyo'"

Thread.sleep(5000)

@@ -207,7 +207,7 @@ suite("test_primary_key_partial_update", "p0") {

qt_select_timestamp "select count(*) from ${tableName} where `ctime` > \"1970-01-01\""

sql "set time_zone = 'America/New_York'"
sql "set time_zone = 'Asia/Tokyo'"

Thread.sleep(5000)

