YARN-11843. Fix potential deadlock when auto-correction of container allocation is enabled #7855

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Status: Open. Wants to merge 4 commits into trunk.

@@ -96,7 +96,7 @@ public void handle(Event event) {
   }

   @Override
-  protected boolean isDrained() {
+  public boolean isDrained() {
     synchronized (mutex) {
       return drained;
     }
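
Making isDrained() public lets code outside the dispatcher poll the drained flag directly. A minimal sketch of the polling pattern this enables, mirroring the test change further down in this PR (the setup lines are assumptions; the actual test reuses the scheduler's existing dispatcher rather than creating one):

    DrainDispatcher dispatcher = new DrainDispatcher();
    dispatcher.init(new Configuration());
    dispatcher.start();

    // ... dispatch the events under test ...

    // Poll the now-public isDrained() every 500 ms, timing out after 3000 ms.
    GenericTestUtils.waitFor(() -> dispatcher.isDrained(), 500, 3000);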

@@ -722,11 +722,10 @@ protected void autoCorrectContainerAllocation(List<ResourceRequest> resourceRequ
     if (allocatedContainers != null) {
       for (RMContainer rmContainer : allocatedContainers) {
         if (extraContainers > 0) {
-          // Change the state of the container from ALLOCATED to EXPIRED since it is not required.
+          // Change the state of the container from ALLOCATED to RELEASED
+          // since it is not required.
           LOG.debug("Removing extra container:{}", rmContainer.getContainer());
-          completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus(
-              rmContainer.getContainerId(), SchedulerUtils.EXPIRED_CONTAINER),
-              RMContainerEventType.EXPIRE);
+          asyncContainerRelease(rmContainer);

Contributor:

Will this change the diagnostic message reported when the container finishes?

Contributor Author:

Yes, the message will be updated from "Container expired since it was unused" to "Container released by application", which I believe is more appropriate.

Contributor:

> Yes, the message will be updated from "Container expired since it was unused" to "Container released by application", which I believe is more appropriate.

No, I think the old one is clearer.

Contributor Author:

The old message, "expired since it was unused", is not accurate here: the container is released because of over-allocation, not because a waiting period expired. The new message, "Container released by application", is already used in scenarios where the application actively releases a container, and I think over-allocation can be included in that scope. @shameersss1 could you please share your thoughts on this?

           application.newlyAllocatedContainers.remove(rmContainer);
           extraContainers--;
         }

@@ -770,10 +769,10 @@ private void recoverResourceRequestForContainer(RMContainer rmContainer) {
     }

     // when auto correct container allocation is enabled, there can be a case when extra containers
-    // go to expired state from allocated state. When such scenario happens do not re-attempt the
+    // go to released state from allocated state. When such scenario happens do not re-attempt the
     // container request since this is expected.
     if (autoCorrectContainerAllocation &&
-        RMContainerState.EXPIRED.equals(rmContainer.getState())) {
+        RMContainerState.RELEASED.equals(rmContainer.getState())) {
       return;
     }

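The synchronous completedContainer(..., RMContainerEventType.EXPIRE) call above is replaced by asyncContainerRelease(rmContainer), whose body is not part of this diff. A minimal sketch of what such an asynchronous release could look like, assuming it simply dispatches a RELEASED event through the RM's async dispatcher so that container completion is processed outside the scheduler lock (the lock interaction behind the potential deadlock named in the PR title):

    // Sketch only, not the PR's actual implementation: hand the extra
    // container's release off to the async dispatcher instead of completing
    // it synchronously while the scheduler lock is held.
    protected void asyncContainerRelease(RMContainer container) {
      rmContext.getDispatcher().getEventHandler().handle(
          new RMContainerEvent(container.getContainerId(),
              RMContainerEventType.RELEASED));
    }

A container released this way ends up in the RELEASED state rather than EXPIRED, which is why the guard in recoverResourceRequestForContainer above now checks RMContainerState.RELEASED.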

@@ -42,6 +42,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.util.Lists;
 import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.util.Time;

@@ -86,6 +87,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;

@@ -291,7 +293,7 @@ private void testMaximumAllocationVCoresHelper(
    */
   @ParameterizedTest(name = "{0}")
   @MethodSource("getParameters")
-  public void testAutoCorrectContainerAllocation(SchedulerType type) throws IOException {
+  public void testAutoCorrectContainerAllocation(SchedulerType type) throws Exception {
     initTestAbstractYarnScheduler(type);
     Configuration conf = new Configuration(getConf());
     conf.setBoolean(YarnConfiguration.RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION, true);

@@ -372,7 +374,8 @@ private RMContainer createMockRMContainer(int containerId, NodeId nodeId,
     Container container = mock(Container.class);

     // Mock the Container instance with the specified parameters
-    when(container.getResource()).thenReturn(Resource.newInstance(memory, 1));
+    Resource resource = Resource.newInstance(memory, 1);
+    when(container.getResource()).thenReturn(resource);
     when(container.getPriority()).thenReturn(priority);
     when(container.getId()).thenReturn(ContainerId.newContainerId(appAttemptId, containerId));
     when(container.getNodeId()).thenReturn(nodeId);

@@ -388,6 +391,10 @@
     when(rmContainer.getContainer()).thenReturn(container);
     when(rmContainer.getContainerId()).thenReturn(
         ContainerId.newContainerId(appAttemptId, containerId));
+    ResourceRequest resourceRequest =
+        BuilderUtils.newResourceRequest(priority, "*", resource, 1);
+    when(rmContainer.getContainerRequest()).thenReturn(
+        new ContainerRequest(ImmutableList.of(resourceRequest)));

     return rmContainer;
   }

@@ -470,7 +477,7 @@ private void testContainerAskAndNewlyAllocatedContainerOne(AbstractYarnScheduler
    */
   private void testContainerAskZeroAndNewlyAllocatedContainerOne(AbstractYarnScheduler scheduler,
       SchedulerApplicationAttempt application, SchedulerNode schedulerNode, NodeId nodeId,
-      Priority priority, ApplicationAttemptId appAttemptId) {
+      Priority priority, ApplicationAttemptId appAttemptId) throws Exception {
     // Create a resource request with 0 containers, 1024 MB memory, and GUARANTEED execution type
     ResourceRequest resourceRequest = createResourceRequest(1024, 1,
         0, priority, 0L,

@@ -485,14 +492,29 @@ private void testContainerAskZeroAndNewlyAllocatedContainerOne(AbstractYarnSched
     // Add the RMContainer to the newly allocated containers of the application
     application.addToNewlyAllocatedContainers(schedulerNode, rmContainer1);

+    // Simulate the container is released and expect it won't be recovered
+    // when calling AbstractYarnScheduler#recoverResourceRequestForContainer
+    when(rmContainer1.getState()).thenReturn(RMContainerState.RELEASED);
+
     // Call the autoCorrectContainerAllocation method
     scheduler.autoCorrectContainerAllocation(containerAsk, application);

+    // Make sure all events are handled
+    Dispatcher dispatcher = scheduler.rmContext.getDispatcher();
+    if (dispatcher instanceof DrainDispatcher) {
+      GenericTestUtils.waitFor(
+          () -> ((DrainDispatcher) dispatcher).isDrained(),
+          500, 3000);
+    }
+
     // Assert that the container ask remains 0
     assertEquals(0, resourceRequest.getNumContainers());

     // Assert that there are no newly allocated containers
     assertEquals(0, application.pullNewlyAllocatedContainers().size());
+
+    // Assert that the next pending ask is null
+    assertNull(application.appSchedulingInfo.getNextPendingAsk());
   }

   /**