Skip to content

Commit fb204f7

Browse files
authored
Phoenix-7568 - Adding Replication Log Replay Implementation (#2278)
Phoenix-7568 Replication Log Replay Implementation
1 parent 520f48b commit fb204f7

26 files changed

+8380
-35
lines changed

phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.phoenix.jdbc.ClusterRoleRecord;
3838
import org.apache.phoenix.jdbc.HAGroupStoreManager;
3939
import org.apache.phoenix.protobuf.ProtobufUtil;
40+
import org.apache.phoenix.replication.reader.ReplicationLogReplayService;
4041
import org.apache.phoenix.util.ClientUtil;
4142
import org.apache.phoenix.util.SchemaUtil;
4243
import org.apache.phoenix.util.ServerUtil;
@@ -62,10 +63,14 @@ public void start(CoprocessorEnvironment env) throws IOException {
6263
this.metricsSource = MetricsPhoenixCoprocessorSourceFactory
6364
.getInstance().getMetadataCachingSource();
6465
this.zkUrl = getLocalZkUrl(conf);
66+
// Start replication log replay
67+
ReplicationLogReplayService.getInstance(conf).start();
6568
}
6669

6770
@Override
6871
public void stop(CoprocessorEnvironment env) throws IOException {
72+
// Stop replication log replay
73+
ReplicationLogReplayService.getInstance(conf).stop();
6974
RegionServerCoprocessor.super.stop(env);
7075
ServerUtil.ConnectionFactory.shutdown();
7176
}

phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java

Lines changed: 502 additions & 0 deletions
Large diffs are not rendered by default.

phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ public class ReplicationLogGroup {
5858
"phoenix.replication.log.standby.hdfs.url";
5959
public static final String REPLICATION_FALLBACK_HDFS_URL_KEY =
6060
"phoenix.replication.log.fallback.hdfs.url";
61-
public static final String REPLICATION_NUM_SHARDS_KEY = "phoenix.replication.log.shards";
62-
public static final int DEFAULT_REPLICATION_NUM_SHARDS = 1000;
63-
public static final int MAX_REPLICATION_NUM_SHARDS = 100000;
61+
public static final String REPLICATION_LOG_ROTATION_TIME_MS_KEY =
62+
"phoenix.replication.log.rotation.time.ms";
63+
public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 * 1000L;
6464
public static final String REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY =
6565
"phoenix.replication.log.rotation.size.bytes";
6666
public static final long DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES = 256 * 1024 * 1024L;
@@ -86,8 +86,7 @@ public class ReplicationLogGroup {
8686
"phoenix.replication.log.retry.delay.ms";
8787
public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L;
8888

89-
public static final String SHARD_DIR_FORMAT = "%05d";
90-
public static final String FILE_NAME_FORMAT = "%d-%s.plog";
89+
public static final String FILE_NAME_FORMAT = "%d_%s.plog";
9190

9291
/** Cache of ReplicationLogGroup instances by HA Group ID */
9392
protected static final ConcurrentHashMap<String, ReplicationLogGroup> INSTANCES =

phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ public abstract class ReplicationLogGroupWriter {
127127
protected Disruptor<LogEvent> disruptor;
128128
protected RingBuffer<LogEvent> ringBuffer;
129129
protected volatile boolean closed = false;
130+
protected ReplicationShardDirectoryManager replicationShardDirectoryManager;
130131

131132
/** The reason for requesting a log rotation. */
132133
protected enum RotationReason {
@@ -179,6 +180,7 @@ protected ReplicationLogGroupWriter(ReplicationLogGroup logGroup) {
179180
/** Initialize the writer. */
180181
public void init() throws IOException {
181182
initializeFileSystems();
183+
initializeReplicationShardDirectoryManager();
182184
// Start time based rotation.
183185
lastRotationTime.set(EnvironmentEdgeManager.currentTimeMillis());
184186
startRotationExecutor();
@@ -251,6 +253,12 @@ public void sync() throws IOException {
251253
/** Initialize file systems needed by this writer implementation. */
252254
protected abstract void initializeFileSystems() throws IOException;
253255

256+
/**
257+
* Initialize the {@link ReplicationShardDirectoryManager} to manage file to shard directory
258+
* mapping
259+
*/
260+
protected abstract void initializeReplicationShardDirectoryManager();
261+
254262
/**
255263
* Create a new log writer for rotation.
256264
*/

0 commit comments

Comments
 (0)