-
Notifications
You must be signed in to change notification settings - Fork 1k
Phoenix-7568 - Adding Replication Log Replay Implementation #2278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Phoenix-7568 - Adding Replication Log Replay Implementation #2278
Conversation
9e066e8 to
276fc91
Compare
|
Force pushed due to rebase with upstream feature branch. |
|
Looks like there are lots of blank, checkstyle, and spotbugs issues. Can we fix those first? Or are they false positives? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements major replication log replay infrastructure components for Phoenix, providing HA-aware standby cluster functionality for processing replication logs with state-aware processing.
Key changes:
- Added state-aware replication log replay service and discovery components
- Implemented shard-based directory management for distributed log file processing
- Added comprehensive metrics tracking for replication operations
Reviewed Changes
Copilot reviewed 25 out of 26 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| ReplicationLogReplayService.java | Singleton service managing replication replay for all HA groups with configurable scheduling |
| ReplicationLogReplay.java | HA group-specific replay coordinator with singleton pattern and lifecycle management |
| ReplicationLogDiscoveryReplay.java | State-aware discovery implementation handling SYNC/DEGRADED/SYNCED_RECOVERY states with listener integration |
| ReplicationShardDirectoryManager.java | Time-based shard directory mapping for distributed file processing |
| ReplicationLogTracker.java | File lifecycle management with retry logic and UUID-based tracking |
| ReplicationRound.java | Time window representation for batch processing |
| Various metrics classes | Comprehensive monitoring and JMX integration |
| Test files | Extensive unit test coverage for all major components |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
...-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
Outdated
Show resolved
Hide resolved
...-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardLogGroupWriter.java
Show resolved
Hide resolved
|
|
||
| @Override | ||
| protected void processRound(ReplicationRound replicationRound) throws IOException { | ||
| System.out.println("Processing Round: " + replicationRound); |
Copilot
AI
Oct 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using System.out.println() for logging in test code. Consider using a proper logging framework (SLF4J) for consistency with the rest of the codebase.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely don't do this.
| // Simulate state change by listener after certain number of rounds | ||
| roundsProcessed++; | ||
| if (stateChangeAfterRounds > 0 && roundsProcessed == stateChangeAfterRounds && newStateAfterRounds != null) { | ||
| System.out.println("Rounds Processed: " + roundsProcessed + " - " + newStateAfterRounds); |
Copilot
AI
Oct 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using System.out.println() for logging in test code. Consider using a proper logging framework (SLF4J) for consistency with the rest of the codebase.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same.
| System.out.println("Processed files"); | ||
| for (Path file : processedFiles) { | ||
| System.out.println(file); | ||
| } | ||
|
|
Copilot
AI
Oct 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using System.out.println() for debug output in test code. This debug output should be removed or replaced with proper logging to avoid cluttering test output.
| System.out.println("Processed files"); | |
| for (Path file : processedFiles) { | |
| System.out.println(file); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Address checkstyle, spotbugs, and copilot findings, please.
Sure, addressed all of those (except false positives of spotbugs). |
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
Show resolved
Hide resolved
| * Enum representing the type of replication log directory. | ||
| * IN: Directory created on standby cluster for Incoming replication log files | ||
| * OUT: Directory created on primary cluster for Outgoing replication log files | ||
| */ | ||
| public enum DirectoryType { | ||
| IN("in"), | ||
| OUT("out"); | ||
|
|
||
| private final String name; | ||
|
|
||
| DirectoryType(final String name) { | ||
| this.name = name; | ||
| } | ||
|
|
||
| public String getName() { | ||
| return this.name; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should get rid of this enum. It is not really an enum. What if we use a different terminology. These are just string constants but they don't need to be defined here but at a higher level. The ReplicationLogTracker doesn't need to know about it. It should take a path as input and just work with it.
...ver/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscovery.java
Show resolved
Hide resolved
| MetricsReplicationLogDiscoveryImpl.METRICS_CONTEXT, | ||
| MetricsReplicationLogDiscoveryReplayImpl.METRICS_JMX_CONTEXT | ||
| + ",haGroup=" + haGroupName); | ||
| super.groupMetricsContext = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this also be moved to the constructor instead of a standalone call ?
...main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogTrackerReplayImpl.java
Show resolved
Hide resolved
...in/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java
Show resolved
Hide resolved
...in/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java
Show resolved
Hide resolved
| Optional<Long> minTimestampFromInProgressFiles = | ||
| getMinTimestampFromInProgressFiles(); | ||
| if (minTimestampFromInProgressFiles.isPresent()) { | ||
| LOG.info("Initializing lastRoundProcessed from IN PROGRESS files with minimum " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IN should not be hardcoded here
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
Outdated
Show resolved
Hide resolved
fdc75f4 to
c0cb77b
Compare
|
Forced push due to rebase with upstream changes. |
|
|
||
| protected String getInProgressLogSubDirectoryName() { | ||
| return getNewLogSubDirectoryName() + "_progress"; | ||
| return getInSubDirectoryName() + "_progress"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be a more generic name instead of getInSubDirectoryName
|
@Himanshu-g81 There are test failures which need to be fixed [ERROR] ReplicationLogDiscoveryTest.testProcessInProgressDirectoryWithIntermittentFailure » OutOfMemory Java heap space |
High Level Description of major Replication Replay Componenets added in this PR
ReplicationLogReplayService - A singleton class that has single thread which gets all the HA groups and start Replication Replay for each using ReplicationReplay.get(conf, replicationGroup).startReplay(); every 60 seconds (configurable). Note that startReplay() of ReplicationReplay is idempotent. This is hooked into RS start / stop path in PhoenixRegionServerEndpoint.java
ReplicationLogReplay - Responsible for handling replication replay lifecycle for single HA Group. It initialize the file system from which replay needs to be done for this HA Group. The init method also initialize ReplicationReplayLogDiscovery (and respective ReplicationLogFileTrackerReplay for the group) - Responsibilites of these 2 componenets are described below.
ReplicationLogFileTracker - Abstract class to deal with all file system interactions (getNewFiles, markInProgress, markCompleted, markInProgress). It has one implementation currently for standby cluster (ReplicationLogReplayFileTracker) overriding directory (IN directory) and metric source. Similar implementation can be added for store and forward mode (with OUT as directory) and custom metric source.
ReplicationLogDiscovery - Abstract class responsible for logic of processing files round by round. It contains ReplicationLogFileTracker. It creates a thread pool (with all properties configurable, i.e. thread count, scheduling interval, etc) to process the log files round by round (details - this is salesforce internal doc, will update once design is published). The process method is abstract and implemented for standby cluster replay (as ReplicationReplayLogDiscovery) to apply mutations on target. Similar implementation can be added for store and forward mode - to just copy file to standby cluster.
ReplicationShardDirectoryManager - Encapsulates the logic of shard directory management (for both active and DR cluster). Only root directory needs to be given during initialization. Changes to leverage it in source are also part of this PR.
Simplified Sequence Diagram
