Skip to content

Commit

Permalink
style
Browse files Browse the repository at this point in the history
  • Loading branch information
moeinxyz committed Sep 19, 2023
1 parent b2e80a7 commit 9ad7f38
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 133 deletions.
318 changes: 187 additions & 131 deletions src/main/java/io/confluent/connect/hdfs/wal/QFSWAL.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.hdfs.wal;

import io.confluent.connect.hdfs.FileUtils;
Expand All @@ -9,165 +24,206 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class QFSWAL implements WAL {
private static final String LOCK_FILE_EXTENSION = "lock-qfs";
private static final int DEFAULT_LOCK_REFRESH_INTERVAL_IN_MS = 10_000;
private static final int DEFAULT_LOCK_TIMEOUT_IN_MS = 60_000;
private final UUID uuid;
private final Timer timer;
private final HdfsStorage storage;
private final String logsDir;
private final TopicPartition topicPartition;
private final Pattern pattern;
private final String lockDir;
private String lockFile;

private final int lockRefreshIntervalInMs;
private final int lockTimeoutInMs;
private static final Logger log = LoggerFactory.getLogger(QFSWAL.class);

public QFSWAL(String logsDir, TopicPartition topicPartition, HdfsStorage storage) {
this(logsDir, topicPartition, storage, DEFAULT_LOCK_REFRESH_INTERVAL_IN_MS, DEFAULT_LOCK_TIMEOUT_IN_MS);
}

public QFSWAL(String logsDir, TopicPartition topicPartition, HdfsStorage storage, int lockRefreshIntervalInMs, int lockTimeoutInMs) {
this.uuid = UUID.randomUUID();
this.pattern = Pattern.compile(String.format("([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})-(\\d+)(\\.)%s", LOCK_FILE_EXTENSION));
this.timer = new Timer("QFSWAL-Timer", true);
this.storage = storage;
this.logsDir = logsDir;
this.topicPartition = topicPartition;
this.lockDir = FileUtils.directoryName(storage.url(), logsDir, topicPartition);
this.lockFile = this.createOrRenameLockFile();
this.lockRefreshIntervalInMs = lockRefreshIntervalInMs;
this.lockTimeoutInMs = lockTimeoutInMs;
this.startRenamingTimer();
}

private String getNewLockFile() {
return FileUtils.fileName(storage.url(), this.logsDir, this.topicPartition, String.format("%s-%s.%s", this.uuid, System.currentTimeMillis(), LOCK_FILE_EXTENSION));
private static final String LOCK_FILE_EXTENSION = "lock-qfs";
private static final int DEFAULT_LOCK_REFRESH_INTERVAL_IN_MS = 10_000;
private static final int DEFAULT_LOCK_TIMEOUT_IN_MS = 60_000;
private final UUID uuid;
private final Timer timer;
private final HdfsStorage storage;
private final String logsDir;
private final TopicPartition topicPartition;
private final Pattern pattern;
private final String lockDir;
private String lockFile;

private final int lockRefreshIntervalInMs;
private final int lockTimeoutInMs;
private static final Logger log = LoggerFactory.getLogger(QFSWAL.class);

public QFSWAL(String logsDir, TopicPartition topicPartition, HdfsStorage storage) {
this(logsDir,
topicPartition,
storage,
DEFAULT_LOCK_REFRESH_INTERVAL_IN_MS,
DEFAULT_LOCK_TIMEOUT_IN_MS);
}

public QFSWAL(
String logsDir,
TopicPartition topicPartition,
HdfsStorage storage,
int lockRefreshIntervalInMs,
int lockTimeoutInMs
) {
this.uuid = UUID.randomUUID();
this.pattern = Pattern.compile(String.format(
"([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})-(\\d+)(\\.)%s",
LOCK_FILE_EXTENSION));
this.timer = new Timer("QFSWAL-Timer", true);
this.storage = storage;
this.logsDir = logsDir;
this.topicPartition = topicPartition;
this.lockDir = FileUtils.directoryName(storage.url(), logsDir, topicPartition);
this.lockFile = this.createOrRenameLockFile();
this.lockRefreshIntervalInMs = lockRefreshIntervalInMs;
this.lockTimeoutInMs = lockTimeoutInMs;
this.startRenamingTimer();
}

private String getNewLockFile() {
return FileUtils.fileName(storage.url(),
this.logsDir,
this.topicPartition,
String.format("%s-%s.%s",
this.uuid,
System.currentTimeMillis(),
LOCK_FILE_EXTENSION));
}

private String createOrRenameLockFile() {
if (!this.storage.exists(this.lockDir)) {
this.storage.create(this.lockDir);
}

private String createOrRenameLockFile() {
if (!this.storage.exists(this.lockDir)) {
this.storage.create(this.lockDir);
}

List<SimpleEntry<String, Matcher>> lockFiles = this.findLockFiles();
String newLockFile = getNewLockFile();

long liveLocksCount = lockFiles.stream().filter(pair -> Long.parseLong(pair.getValue().group(2)) > System.currentTimeMillis() - this.lockTimeoutInMs).count();

if (liveLocksCount != 0) {
throw new ConnectException("Lock has been acquired by another process");
}

if (lockFiles.isEmpty()) {
this.storage.create(newLockFile, true);
} else {
String oldLockFile = FileUtils.fileName(storage.url(), this.logsDir, this.topicPartition, lockFiles.get(0).getKey());
this.storage.commit(oldLockFile, newLockFile);
}

liveLocksCount = this.findLockFiles().stream().filter(pair -> Long.parseLong(pair.getValue().group(2)) > System.currentTimeMillis() - this.lockTimeoutInMs).count();
List<SimpleEntry<String, Matcher>> lockFiles = this.findLockFiles();
String newLockFile = getNewLockFile();

if (liveLocksCount > 1) {
this.storage.delete(newLockFile);
throw new ConnectException("Lock has been acquired by another process");
}
long liveLocksCount = lockFiles.stream()
.filter(pair -> Long.parseLong(
pair.getValue().group(2))
> System.currentTimeMillis() - this.lockTimeoutInMs
).count();

return newLockFile;
if (liveLocksCount != 0) {
throw new ConnectException("Lock has been acquired by another process");
}


private void startRenamingTimer() {
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
renameLockFile();
}
}, this.lockRefreshIntervalInMs, this.lockTimeoutInMs);
if (lockFiles.isEmpty()) {
this.storage.create(newLockFile, true);
} else {
String oldLockFile = FileUtils.fileName(storage.url(),
this.logsDir,
this.topicPartition,
lockFiles.get(0).getKey());
this.storage.commit(oldLockFile, newLockFile);
}

private void renameLockFile() {
String newLockFile = getNewLockFile();
liveLocksCount = this.findLockFiles()
.stream()
.filter(pair -> Long.parseLong(
pair.getValue().group(2))
> System.currentTimeMillis() - this.lockTimeoutInMs
).count();

try {
this.storage.commit(this.lockFile, newLockFile);
this.lockFile = newLockFile;
} catch (Exception e) {
log.error("Failed to rename the file", e);
}
if (liveLocksCount > 1) {
this.storage.delete(newLockFile);
throw new ConnectException("Lock has been acquired by another process");
}

private List<SimpleEntry<String, Matcher>> findLockFiles() {
List<FileStatus> walFiles = this.storage.list(this.lockDir);

return walFiles.stream()
.filter(fileStatus -> fileStatus.isFile())
.map((Function<FileStatus, Object>) fileStatus -> fileStatus.getPath().getName())
.map(Object::toString)
.map(fileName -> new SimpleEntry<String, Matcher>(fileName, pattern.matcher(fileName)))
.filter(pair -> pair.getValue().matches())
.collect(Collectors.toList());
}
return newLockFile;
}

@Override
public void acquireLease() throws ConnectException {
List<SimpleEntry<String, Matcher>> lockFiles = this.findLockFiles();
if (lockFiles.size() == 0) {
throw new ConnectException("The lock file is not present in the log dir");
}

if (lockFiles.size() > 1) {
throw new ConnectException("More than one lock file reside in the log dir");
}
private void startRenamingTimer() {
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
renameLockFile();
}
}, this.lockRefreshIntervalInMs, this.lockTimeoutInMs);
}

String filename = lockFiles.get(0).getKey();
Matcher matcher = lockFiles.get(0).getValue();
private void renameLockFile() {
String newLockFile = getNewLockFile();

if (!matcher.group(1).equals(this.uuid.toString())) {
log.error("UUID of the lock file %s for %s-%s topic-partition does not match %s uuid", filename, this.topicPartition.topic(), this.topicPartition.partition(), this.uuid);
throw new ConnectException("Lock uuid does not match");
}

if (Long.parseLong(matcher.group(2)) < System.currentTimeMillis() - this.lockTimeoutInMs) {
throw new ConnectException("Lock file has not been renamed for more than the threshold");
}
try {
this.storage.commit(this.lockFile, newLockFile);
this.lockFile = newLockFile;
} catch (Exception e) {
log.error("Failed to rename the file", e);
}

@Override
public void append(String s, String s1) throws ConnectException {
this.acquireLease();
}

@Override
public void apply() throws ConnectException {
}

@Override
public void truncate() throws ConnectException {
}

private List<SimpleEntry<String, Matcher>> findLockFiles() {
List<FileStatus> walFiles = this.storage.list(this.lockDir);

return walFiles.stream()
.filter(fileStatus -> fileStatus.isFile())
.map((Function<FileStatus, Object>) fileStatus -> fileStatus.getPath()
.getName())
.map(Object::toString)
.map(fileName -> new SimpleEntry<String, Matcher>(fileName,
pattern.matcher(fileName)))
.filter(pair -> pair.getValue().matches())
.collect(Collectors.toList());
}

@Override
public void acquireLease() throws ConnectException {
List<SimpleEntry<String, Matcher>> lockFiles = this.findLockFiles();
if (lockFiles.size() == 0) {
throw new ConnectException("The lock file is not present in the log dir");
}

@Override
public void close() throws ConnectException {
this.timer.cancel();
if (lockFiles.size() > 1) {
throw new ConnectException("More than one lock file reside in the log dir");
}

@Override
public String getLogFile() {
return null;
String filename = lockFiles.get(0).getKey();
Matcher matcher = lockFiles.get(0).getValue();

if (!matcher.group(1).equals(this.uuid.toString())) {
log.error(
"UUID of the lock file %s for %s-%s topic-partition does not match %s uuid",
filename,
this.topicPartition.topic(),
this.topicPartition.partition(),
this.uuid);
throw new ConnectException("Lock uuid does not match");
}

@Override
public FilePathOffset extractLatestOffset() {
return null;
if (Long.parseLong(matcher.group(2)) < System.currentTimeMillis() - this.lockTimeoutInMs) {
throw new ConnectException(
"Lock file has not been renamed for more than the threshold");
}
}

@Override
public void append(String s, String s1) throws ConnectException {
this.acquireLease();
}

@Override
public void apply() throws ConnectException {
}

@Override
public void truncate() throws ConnectException {
}

@Override
public void close() throws ConnectException {
this.timer.cancel();
}

@Override
public String getLogFile() {
return null;
}

@Override
public FilePathOffset extractLatestOffset() {
return null;
}
}
3 changes: 1 addition & 2 deletions src/test/java/io/confluent/connect/hdfs/wal/QFSWALTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import static org.junit.Assert.*;

public class QFSWALTest extends TestWithMiniDFSCluster {

@Test
public void testLockCreating() throws Exception {
setUp();
Expand All @@ -39,7 +38,7 @@ public void testLockGetsRenamed() throws Exception {
setUp();
HdfsStorage storage = new HdfsStorage(connectorConfig, url);
TopicPartition tp = new TopicPartition("mytopic", 123);
QFSWAL wal = new QFSWAL("/logs", tp, storage, 1000, 2000);
QFSWAL wal = new QFSWAL("/logs", tp, storage, 500, 2000);
String fileName1 = storage.list("/logs/mytopic/123/").get(0).getPath().getName();

Thread.sleep(2000);
Expand Down

0 comments on commit 9ad7f38

Please sign in to comment.