Skip to content

Commit 23ee238

Browse files
authored
HBASE-29376 ReplicationLogCleaner.preClean/getDeletableFiles should return early when asyncClusterConnection closes during HMaster stopping (#7071)
Signed-off-by: Duo Zhang <[email protected]>
1 parent c18b712 commit 23ee238

File tree

3 files changed

+76
-8
lines changed

3 files changed

+76
-8
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.hadoop.fs.FileStatus;
2929
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
3030
import org.apache.hadoop.hbase.ServerName;
31+
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
3132
import org.apache.hadoop.hbase.master.HMaster;
3233
import org.apache.hadoop.hbase.master.MasterServices;
3334
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
@@ -65,6 +66,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
6566
// queue for a given peer, that why we can use a String peerId as key instead of
6667
// ReplicationQueueId.
6768
private Map<ServerName, Map<String, Map<String, ReplicationGroupOffset>>> replicationOffsets;
69+
private MasterServices masterService;
6870
private ReplicationLogCleanerBarrier barrier;
6971
private ReplicationPeerManager rpm;
7072
private Supplier<Set<ServerName>> getNotFullyDeadServers;
@@ -74,9 +76,12 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
7476

7577
@Override
7678
public void preClean() {
77-
if (this.getConf() == null) {
79+
if (this.getConf() == null || isAsyncClusterConnectionClosedOrNull()) {
80+
LOG.warn(
81+
"Skipping preClean because configuration is null or asyncClusterConnection is unavailable.");
7882
return;
7983
}
84+
8085
try {
8186
if (!rpm.getQueueStorage().hasData()) {
8287
return;
@@ -192,6 +197,13 @@ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
192197
if (this.getConf() == null) {
193198
return files;
194199
}
200+
201+
if (isAsyncClusterConnectionClosedOrNull()) {
202+
LOG.warn("Skip getting deletable files because asyncClusterConnection is unavailable.");
203+
// asyncClusterConnection is unavailable, we shouldn't delete any files.
204+
return Collections.emptyList();
205+
}
206+
195207
try {
196208
if (!rpm.getQueueStorage().hasData()) {
197209
return files;
@@ -251,11 +263,11 @@ public void init(Map<String, Object> params) {
251263
super.init(params);
252264
if (MapUtils.isNotEmpty(params)) {
253265
Object master = params.get(HMaster.MASTER);
254-
if (master != null && master instanceof MasterServices) {
255-
MasterServices m = (MasterServices) master;
256-
barrier = m.getReplicationLogCleanerBarrier();
257-
rpm = m.getReplicationPeerManager();
258-
getNotFullyDeadServers = () -> getNotFullyDeadServers(m);
266+
if (master instanceof MasterServices) {
267+
masterService = (MasterServices) master;
268+
barrier = masterService.getReplicationLogCleanerBarrier();
269+
rpm = masterService.getReplicationPeerManager();
270+
getNotFullyDeadServers = () -> getNotFullyDeadServers(masterService);
259271
return;
260272
}
261273
}
@@ -271,4 +283,13 @@ public void stop(String why) {
271283
public boolean isStopped() {
272284
return this.stopped;
273285
}
286+
287+
/**
288+
* Check if asyncClusterConnection is null or closed.
289+
* @return true if asyncClusterConnection is null or is closed, false otherwise
290+
*/
291+
private boolean isAsyncClusterConnectionClosedOrNull() {
292+
AsyncClusterConnection asyncClusterConnection = masterService.getAsyncClusterConnection();
293+
return asyncClusterConnection == null || asyncClusterConnection.isClosed();
294+
}
274295
}

hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.hadoop.hbase.TableName;
4242
import org.apache.hadoop.hbase.TableNameTestRule;
4343
import org.apache.hadoop.hbase.Waiter;
44+
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
4445
import org.apache.hadoop.hbase.client.TableDescriptor;
4546
import org.apache.hadoop.hbase.master.HMaster;
4647
import org.apache.hadoop.hbase.master.MasterServices;
@@ -134,6 +135,9 @@ public void beforeTest() throws Exception {
134135
when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());
135136
when(masterServices.getReplicationLogCleanerBarrier())
136137
.thenReturn(new ReplicationLogCleanerBarrier());
138+
AsyncClusterConnection asyncClusterConnection = mock(AsyncClusterConnection.class);
139+
when(masterServices.getAsyncClusterConnection()).thenReturn(asyncClusterConnection);
140+
when(asyncClusterConnection.isClosed()).thenReturn(false);
137141
ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
138142
when(masterServices.getReplicationPeerManager()).thenReturn(rpm);
139143
when(rpm.getQueueStorage()).thenReturn(queueStorage);

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.junit.Assert.assertSame;
2424
import static org.junit.Assert.assertTrue;
2525
import static org.mockito.Mockito.mock;
26+
import static org.mockito.Mockito.verify;
2627
import static org.mockito.Mockito.when;
2728

2829
import java.util.ArrayList;
@@ -39,6 +40,7 @@
3940
import org.apache.hadoop.hbase.HBaseClassTestRule;
4041
import org.apache.hadoop.hbase.HBaseConfiguration;
4142
import org.apache.hadoop.hbase.ServerName;
43+
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
4244
import org.apache.hadoop.hbase.master.HMaster;
4345
import org.apache.hadoop.hbase.master.MasterServices;
4446
import org.apache.hadoop.hbase.master.ServerManager;
@@ -61,6 +63,7 @@
6163
import org.junit.ClassRule;
6264
import org.junit.Test;
6365
import org.junit.experimental.categories.Category;
66+
import org.mockito.Mockito;
6467

6568
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
6669

@@ -77,16 +80,21 @@ public class TestReplicationLogCleaner {
7780

7881
private ReplicationLogCleaner cleaner;
7982

83+
private ReplicationPeerManager rpm;
84+
8085
@Before
8186
public void setUp() throws ReplicationException {
8287
services = mock(MasterServices.class);
8388
when(services.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
84-
ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
89+
AsyncClusterConnection asyncClusterConnection = mock(AsyncClusterConnection.class);
90+
when(services.getAsyncClusterConnection()).thenReturn(asyncClusterConnection);
91+
when(asyncClusterConnection.isClosed()).thenReturn(false);
92+
rpm = mock(ReplicationPeerManager.class);
8593
when(services.getReplicationPeerManager()).thenReturn(rpm);
8694
when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
8795
ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class);
8896
when(rpm.getQueueStorage()).thenReturn(rqs);
89-
when(rpm.getQueueStorage().hasData()).thenReturn(true);
97+
when(rqs.hasData()).thenReturn(true);
9098
when(rqs.listAllQueues()).thenReturn(new ArrayList<>());
9199
ServerManager sm = mock(ServerManager.class);
92100
when(services.getServerManager()).thenReturn(sm);
@@ -383,4 +391,39 @@ public void testDeadRegionServerShouldDeleteTwoPeers() throws ReplicationExcepti
383391
assertSame(file, iter.next());
384392
assertFalse(iter.hasNext());
385393
}
394+
395+
@Test
396+
public void testPreCleanWhenAsyncClusterConnectionClosed() throws ReplicationException {
397+
assertFalse(services.getAsyncClusterConnection().isClosed());
398+
verify(services.getAsyncClusterConnection(), Mockito.times(1)).isClosed();
399+
cleaner.preClean();
400+
verify(services.getAsyncClusterConnection(), Mockito.times(2)).isClosed();
401+
verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
402+
403+
when(services.getAsyncClusterConnection().isClosed()).thenReturn(true);
404+
assertTrue(services.getAsyncClusterConnection().isClosed());
405+
verify(services.getAsyncClusterConnection(), Mockito.times(3)).isClosed();
406+
cleaner.preClean();
407+
verify(services.getAsyncClusterConnection(), Mockito.times(4)).isClosed();
408+
// rpm.getQueueStorage().hasData() was not executed, indicating an early return.
409+
verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
410+
}
411+
412+
@Test
413+
public void testGetDeletableFilesWhenAsyncClusterConnectionClosed() throws ReplicationException {
414+
List<FileStatus> files = List.of(new FileStatus());
415+
assertFalse(services.getAsyncClusterConnection().isClosed());
416+
verify(services.getAsyncClusterConnection(), Mockito.times(1)).isClosed();
417+
cleaner.getDeletableFiles(files);
418+
verify(services.getAsyncClusterConnection(), Mockito.times(2)).isClosed();
419+
verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
420+
421+
when(services.getAsyncClusterConnection().isClosed()).thenReturn(true);
422+
assertTrue(services.getAsyncClusterConnection().isClosed());
423+
verify(services.getAsyncClusterConnection(), Mockito.times(3)).isClosed();
424+
cleaner.getDeletableFiles(files);
425+
verify(services.getAsyncClusterConnection(), Mockito.times(4)).isClosed();
426+
// rpm.getQueueStorage().hasData() was not executed, indicating an early return.
427+
verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
428+
}
386429
}

0 commit comments

Comments
 (0)