Skip to content

Commit fdc75f4

Browse files
author
Himanshu Gwalani
committed
Addressing review comments
1 parent 61a4178 commit fdc75f4

File tree

9 files changed

+590
-167
lines changed

9 files changed

+590
-167
lines changed

phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -296,14 +296,15 @@ protected void processNewFilesForRound(ReplicationRound replicationRound) throws
296296
protected void processInProgressDirectory() throws IOException {
297297
// Increase the count for number of times in progress directory is processed
298298
getMetrics().incrementNumInProgressDirectoryProcessed();
299-
LOG.info("Starting in progress directory processing");
299+
LOG.info("Starting {} directory processing", replicationLogTracker.getInProgressLogSubDirectoryName());
300300
long startTime = EnvironmentEdgeManager.currentTime();
301301
long oldestTimestampToProcess = replicationLogTracker.getReplicationShardDirectoryManager()
302302
.getNearestRoundStartTimestamp(EnvironmentEdgeManager.currentTime())
303303
- getReplayIntervalSeconds() * 1000L;
304304
List<Path> files = replicationLogTracker.getOlderInProgressFiles(oldestTimestampToProcess);
305-
LOG.info("Number of In Progress files with oldestTimestampToProcess {} is {}",
306-
oldestTimestampToProcess, files.size());
305+
LOG.info("Number of {} files with oldestTimestampToProcess {} is {}",
306+
replicationLogTracker.getInProgressLogSubDirectoryName(), oldestTimestampToProcess,
307+
files.size());
307308
while (!files.isEmpty()) {
308309
processOneRandomFile(files);
309310
files = replicationLogTracker.getOlderInProgressFiles(oldestTimestampToProcess);
@@ -325,7 +326,7 @@ private void processOneRandomFile(final List<Path> files) throws IOException {
325326
try {
326327
optionalInProgressFilePath = replicationLogTracker.markInProgress(file);
327328
if (optionalInProgressFilePath.isPresent()) {
328-
processFile(file);
329+
processFile(optionalInProgressFilePath.get());
329330
replicationLogTracker.markCompleted(optionalInProgressFilePath.get());
330331
}
331332
} catch (IOException exception) {
@@ -363,15 +364,15 @@ protected void initializeLastRoundProcessed() throws IOException {
363364
Optional<Long> minTimestampFromInProgressFiles =
364365
getMinTimestampFromInProgressFiles();
365366
if (minTimestampFromInProgressFiles.isPresent()) {
366-
LOG.info("Initializing lastRoundProcessed from IN PROGRESS files with minimum "
367-
+ "timestamp as {}", minTimestampFromInProgressFiles.get());
367+
LOG.info("Initializing lastRoundProcessed from {} files with minimum "
368+
+ "timestamp as {}", replicationLogTracker.getInProgressLogSubDirectoryName(), minTimestampFromInProgressFiles.get());
368369
this.lastRoundProcessed = replicationLogTracker.getReplicationShardDirectoryManager()
369370
.getReplicationRoundFromEndTime(minTimestampFromInProgressFiles.get());
370371
} else {
371372
Optional<Long> minTimestampFromNewFiles = getMinTimestampFromNewFiles();
372373
if (minTimestampFromNewFiles.isPresent()) {
373-
LOG.info("Initializing lastRoundProcessed from IN files with minimum timestamp "
374-
+ "as {}", minTimestampFromNewFiles.get());
374+
LOG.info("Initializing lastRoundProcessed from {} files with minimum timestamp "
375+
+ "as {}", replicationLogTracker.getInSubDirectoryName(), minTimestampFromNewFiles.get());
375376
this.lastRoundProcessed = replicationLogTracker
376377
.getReplicationShardDirectoryManager()
377378
.getReplicationRoundFromEndTime(minTimestampFromNewFiles.get());

phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java

Lines changed: 17 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.phoenix.replication;
1919

2020
import java.io.IOException;
21-
import java.net.URI;
2221
import java.util.ArrayList;
2322
import java.util.Collections;
2423
import java.util.List;
@@ -64,36 +63,33 @@ public class ReplicationLogTracker {
6463
*/
6564
private static final long DEFAULT_FILE_DELETE_RETRY_DELAY_MS = 1000L;
6665

67-
private final URI rootURI;
68-
private final DirectoryType directoryType;
6966
private final FileSystem fileSystem;
7067
private Path inProgressDirPath;
71-
private ReplicationShardDirectoryManager replicationShardDirectoryManager;
68+
private final ReplicationShardDirectoryManager replicationShardDirectoryManager;
7269
protected final Configuration conf;
7370
protected final String haGroupName;
7471
protected MetricsReplicationLogTracker metrics;
7572

76-
public ReplicationLogTracker(final Configuration conf, final String haGroupName,
77-
final FileSystem fileSystem, final URI rootURI, final DirectoryType directoryType,
73+
public ReplicationLogTracker(final Configuration conf, final String haGroupName, final FileSystem fileSystem,
74+
final ReplicationShardDirectoryManager replicationShardDirectoryManager,
7875
final MetricsReplicationLogTracker metrics) {
7976
this.conf = conf;
80-
this.fileSystem = fileSystem;
8177
this.haGroupName = haGroupName;
82-
this.rootURI = rootURI;
83-
this.directoryType = directoryType;
78+
this.fileSystem = fileSystem;
79+
this.replicationShardDirectoryManager = replicationShardDirectoryManager;
8480
this.metrics = metrics;
8581
}
8682

87-
protected String getNewLogSubDirectoryName() {
88-
return this.directoryType.getName();
89-
}
90-
9183
protected MetricsReplicationLogTracker getMetricsSource() {
9284
return this.metrics;
9385
}
9486

87+
protected String getInSubDirectoryName() {
88+
return getReplicationShardDirectoryManager().getRootDirectoryPath().getName();
89+
}
90+
9591
protected String getInProgressLogSubDirectoryName() {
96-
return getNewLogSubDirectoryName() + "_progress";
92+
return getInSubDirectoryName() + "_progress";
9793
}
9894

9995
/**
@@ -102,11 +98,11 @@ protected String getInProgressLogSubDirectoryName() {
10298
* exist.
10399
*/
104100
public void init() throws IOException {
105-
Path newFilesDirectory = new Path(new Path(rootURI.getPath(), haGroupName),
106-
getNewLogSubDirectoryName());
107-
this.replicationShardDirectoryManager = new ReplicationShardDirectoryManager(conf,
108-
newFilesDirectory);
109-
this.inProgressDirPath = new Path(new Path(rootURI.getPath(), haGroupName),
101+
// Path newFilesDirectory = new Path(new Path(rootURI.getPath(), haGroupName),
102+
// getNewLogSubDirectoryName());
103+
// this.replicationShardDirectoryManager = new ReplicationShardDirectoryManager(conf,
104+
// newFilesDirectory);
105+
this.inProgressDirPath = new Path(getReplicationShardDirectoryManager().getRootDirectoryPath().getParent(),
110106
getInProgressLogSubDirectoryName());
111107
createDirectoryIfNotExists(inProgressDirPath);
112108
}
@@ -358,14 +354,14 @@ protected Optional<Path> markInProgress(final Path file) {
358354
newNameBuilder.append(parts[i]);
359355
}
360356
String extension = fileName.substring(fileName.lastIndexOf("."));
361-
newNameBuilder.append("_").append(UUID.randomUUID().toString()).append(extension);
357+
newNameBuilder.append("_").append(UUID.randomUUID()).append(extension);
362358
newFileName = newNameBuilder.toString();
363359
targetDirectory = file.getParent();
364360
} else {
365361
// File is not in in-progress directory, add UUID and move to IN_PROGRESS directory
366362
String baseName = fileName.substring(0, fileName.lastIndexOf("."));
367363
String extension = fileName.substring(fileName.lastIndexOf("."));
368-
newFileName = baseName + "_" + UUID.randomUUID().toString() + extension;
364+
newFileName = baseName + "_" + UUID.randomUUID() + extension;
369365
targetDirectory = getInProgressDirPath();
370366
}
371367

@@ -495,24 +491,4 @@ private void createDirectoryIfNotExists(Path directoryPath) throws IOException {
495491
}
496492
}
497493
}
498-
499-
/**
500-
* Enum representing the type of replication log directory.
501-
* IN: Directory created on standby cluster for Incoming replication log files
502-
* OUT: Directory created on primary cluster for Outgoing replication log files
503-
*/
504-
public enum DirectoryType {
505-
IN("in"),
506-
OUT("out");
507-
508-
private final String name;
509-
510-
DirectoryType(final String name) {
511-
this.name = name;
512-
}
513-
514-
public String getName() {
515-
return this.name;
516-
}
517-
}
518494
}

phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationShardDirectoryManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,10 @@ public class ReplicationShardDirectoryManager {
8282

8383
private final Path shardDirectoryPath;
8484

85+
private final Path rootDirectoryPath;
86+
8587
public ReplicationShardDirectoryManager(final Configuration conf, final Path rootPath) {
88+
this.rootDirectoryPath = rootPath;
8689
this.shardDirectoryPath = new Path(rootPath.toUri().getPath(),
8790
REPLICATION_SHARD_SUB_DIRECTORY_NAME);
8891
this.numShards = conf.getInt(REPLICATION_NUM_SHARDS_KEY, DEFAULT_REPLICATION_NUM_SHARDS);
@@ -189,6 +192,10 @@ public Path getShardDirectoryPath() {
189192
return this.shardDirectoryPath;
190193
}
191194

195+
public Path getRootDirectoryPath() {
196+
return this.rootDirectoryPath;
197+
}
198+
192199
public int getNumShards() {
193200
return this.numShards;
194201
}

phoenix-core-server/src/main/java/org/apache/phoenix/replication/StandbyLogGroupWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.hadoop.fs.Path;
2828
import org.apache.phoenix.replication.log.LogFileWriter;
2929
import org.apache.phoenix.replication.log.LogFileWriterContext;
30+
import org.apache.phoenix.replication.reader.ReplicationLogReplay;
3031
import org.apache.phoenix.util.EnvironmentEdgeManager;
3132
import org.slf4j.Logger;
3233
import org.slf4j.LoggerFactory;
@@ -75,7 +76,7 @@ protected void initializeFileSystems() throws IOException {
7576
@Override
7677
protected void initializeReplicationShardDirectoryManager() {
7778
this.haGroupLogFilesPath = new Path(new Path(standbyUrl.getPath(),
78-
logGroup.getHaGroupName()), ReplicationLogTracker.DirectoryType.IN.getName());
79+
logGroup.getHaGroupName()), ReplicationLogReplay.IN_DIRECTORY_NAME);
7980
this.replicationShardDirectoryManager = new ReplicationShardDirectoryManager(
8081
logGroup.getConfiguration(), haGroupLogFilesPath);
8182
}

phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,5 @@ public MetricsReplicationLogDiscoveryReplayImpl(final String haGroupName) {
3232
MetricsReplicationLogDiscoveryImpl.METRICS_CONTEXT,
3333
MetricsReplicationLogDiscoveryReplayImpl.METRICS_JMX_CONTEXT
3434
+ ",haGroup=" + haGroupName);
35-
super.groupMetricsContext =
36-
MetricsReplicationLogDiscoveryReplayImpl.METRICS_JMX_CONTEXT
37-
+ ",haGroup=" + haGroupName;
3835
}
3936
}

phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.hadoop.fs.FileSystem;
2727
import org.apache.hadoop.fs.Path;
2828
import org.apache.phoenix.replication.ReplicationLogTracker;
29+
import org.apache.phoenix.replication.ReplicationShardDirectoryManager;
2930
import org.apache.phoenix.replication.metrics.MetricsReplicationLogTrackerReplayImpl;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
@@ -46,6 +47,7 @@ public class ReplicationLogReplay {
4647
public static final String REPLICATION_LOG_REPLAY_HDFS_URL_KEY =
4748
"phoenix.replication.log.replay.hdfs.url";
4849

50+
public static final String IN_DIRECTORY_NAME = "in";
4951
/**
5052
* Singleton instances per group name
5153
*/
@@ -106,8 +108,11 @@ public void stopReplay() throws IOException {
106108
*/
107109
protected void init() throws IOException {
108110
initializeFileSystem();
111+
Path newFilesDirectory = new Path(new Path(rootURI.getPath(), haGroupName), ReplicationLogReplay.IN_DIRECTORY_NAME);
112+
ReplicationShardDirectoryManager replicationShardDirectoryManager =
113+
new ReplicationShardDirectoryManager(conf, newFilesDirectory);
109114
ReplicationLogTracker replicationLogReplayFileTracker = new ReplicationLogTracker(
110-
conf, haGroupName, fileSystem, rootURI, ReplicationLogTracker.DirectoryType.IN,
115+
conf, haGroupName, fileSystem, replicationShardDirectoryManager,
111116
new MetricsReplicationLogTrackerReplayImpl(haGroupName));
112117
replicationLogReplayFileTracker.init();
113118
this.replicationLogDiscoveryReplay =

0 commit comments

Comments
 (0)