Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ public CompletableFuture<Acknowledge> sendSlotReport(
WorkerResourceSpec workerResourceSpec =
WorkerResourceSpec.fromTotalResourceProfile(
workerTypeWorkerRegistration.getTotalResourceProfile(),
slotReport.getNumSlotStatus());
workerTypeWorkerRegistration.getNumberSlots());
onWorkerRegistered(workerTypeWorkerRegistration.getWorker(), workerResourceSpec);
} else if (registrationResult == SlotManager.RegistrationResult.REJECTED) {
closeTaskManagerConnection(
Expand Down Expand Up @@ -1083,7 +1083,8 @@ private RegistrationResponse registerTaskExecutorInternal(
taskExecutorRegistration.getMemoryConfiguration(),
taskExecutorRegistration.getTotalResourceProfile(),
taskExecutorRegistration.getDefaultSlotResourceProfile(),
taskExecutorRegistration.getNodeId());
taskExecutorRegistration.getNodeId(),
taskExecutorRegistration.getNumberSlots());

log.info(
"Registering TaskManager with ResourceID {} ({}) at ResourceManager",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ public class TaskExecutorRegistration implements Serializable {
*/
private final String nodeId;

/** Number of slots in static slot allocation. */
private final int numberSlots;

public TaskExecutorRegistration(
final String taskExecutorAddress,
final ResourceID resourceId,
Expand All @@ -73,7 +76,8 @@ public TaskExecutorRegistration(
final TaskExecutorMemoryConfiguration memoryConfiguration,
final ResourceProfile defaultSlotResourceProfile,
final ResourceProfile totalResourceProfile,
final String nodeId) {
final String nodeId,
final int numberSlots) {
this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
this.resourceId = checkNotNull(resourceId);
this.dataPort = dataPort;
Expand All @@ -83,6 +87,7 @@ public TaskExecutorRegistration(
this.defaultSlotResourceProfile = checkNotNull(defaultSlotResourceProfile);
this.totalResourceProfile = checkNotNull(totalResourceProfile);
this.nodeId = checkNotNull(nodeId);
this.numberSlots = numberSlots;
}

public String getTaskExecutorAddress() {
Expand Down Expand Up @@ -120,4 +125,8 @@ public ResourceProfile getTotalResourceProfile() {
/**
 * Returns the node id that was handed to the constructor.
 *
 * @return the node id; the constructor passes it through {@code checkNotNull} before storing it
 */
public String getNodeId() {
return nodeId;
}

/**
 * Returns the number of slots in static slot allocation, as supplied to the constructor.
 *
 * @return the slot count; the constructor stores the value as given, without range checks
 */
public int getNumberSlots() {
return numberSlots;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class WorkerRegistration<WorkerType extends ResourceIDRetrievable>

private final String nodeId;

/** Slot count passed to the constructor and exposed through {@link #getNumberSlots()}. */
private final int numberSlots;

public WorkerRegistration(
TaskExecutorGateway taskExecutorGateway,
WorkerType worker,
Expand All @@ -54,7 +56,8 @@ public WorkerRegistration(
TaskExecutorMemoryConfiguration memoryConfiguration,
ResourceProfile totalResourceProfile,
ResourceProfile defaultSlotResourceProfile,
String nodeId) {
String nodeId,
int numberSlots) {

super(worker.getResourceID(), taskExecutorGateway);

Expand All @@ -66,6 +69,7 @@ public WorkerRegistration(
this.totalResourceProfile = Preconditions.checkNotNull(totalResourceProfile);
this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);
this.nodeId = Preconditions.checkNotNull(nodeId);
this.numberSlots = numberSlots;
}

public WorkerType getWorker() {
Expand Down Expand Up @@ -99,4 +103,8 @@ public ResourceProfile getTotalResourceProfile() {
/**
 * Returns the node id that was handed to the constructor.
 *
 * @return the node id; the constructor passes it through {@code Preconditions.checkNotNull}
 *     before storing it
 */
public String getNodeId() {
return nodeId;
}

/**
 * Returns the slot count that was handed to the constructor.
 *
 * @return the slot count; the constructor stores the value as given, without range checks
 */
public int getNumberSlots() {
return numberSlots;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1568,7 +1568,8 @@ private void connectToResourceManager() {
memoryConfiguration,
taskManagerConfiguration.getDefaultSlotResourceProfile(),
taskManagerConfiguration.getTotalResourceProfile(),
unresolvedTaskManagerLocation.getNodeId());
unresolvedTaskManagerLocation.getNodeId(),
taskManagerConfiguration.getNumberSlots());

resourceManagerConnection =
new TaskExecutorToResourceManagerConnection(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ static void registerTaskExecutor(
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
ResourceProfile.ZERO,
ResourceProfile.ZERO,
taskExecutorAddress);
taskExecutorAddress,
1);
final CompletableFuture<RegistrationResponse> registrationFuture =
resourceManagerGateway.registerTaskExecutor(
taskExecutorRegistration, TestingUtils.TIMEOUT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ void testDelayedRegisterTaskExecutor() throws Exception {
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
DEFAULT_SLOT_PROFILE,
DEFAULT_SLOT_PROFILE,
taskExecutorGateway.getAddress());
taskExecutorGateway.getAddress(),
1);

CompletableFuture<RegistrationResponse> firstFuture =
rmGateway.registerTaskExecutor(taskExecutorRegistration, fastTimeout);
Expand Down Expand Up @@ -287,7 +288,8 @@ void testDisconnectTaskExecutor() throws Exception {
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
DEFAULT_SLOT_PROFILE,
DEFAULT_SLOT_PROFILE.multiply(numberSlots),
taskExecutorGateway.getAddress());
taskExecutorGateway.getAddress(),
numberSlots);
final RegistrationResponse registrationResponse =
rmGateway.registerTaskExecutor(taskExecutorRegistration, TIMEOUT).get();
assertThat(registrationResponse).isInstanceOf(TaskExecutorRegistrationSuccess.class);
Expand Down Expand Up @@ -364,7 +366,8 @@ private CompletableFuture<RegistrationResponse> registerTaskExecutor(
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
DEFAULT_SLOT_PROFILE,
DEFAULT_SLOT_PROFILE,
taskExecutorAddress),
taskExecutorAddress,
1),
TIMEOUT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ private void registerTaskExecutor(
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
ResourceProfile.ZERO,
ResourceProfile.ZERO,
taskExecutorAddress);
taskExecutorAddress,
1);
final CompletableFuture<RegistrationResponse> registrationFuture =
resourceManagerGateway.registerTaskExecutor(
taskExecutorRegistration, TestingUtils.TIMEOUT);
Expand Down Expand Up @@ -767,7 +768,8 @@ private void registerTaskExecutorAndSlot(
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
ResourceProfile.fromResources(1, 1024),
ResourceProfile.fromResources(1, 1024).multiply(slotCount),
taskExecutorGateway.getAddress());
taskExecutorGateway.getAddress(),
slotCount);
RegistrationResponse registrationResult =
resourceManagerGateway
.registerTaskExecutor(taskExecutorRegistration, TestingUtils.TIMEOUT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1245,7 +1245,8 @@ CompletableFuture<RegistrationResponse> registerTaskExecutor(
TESTING_CONFIG,
ResourceProfile.ZERO,
ResourceProfile.ZERO,
resourceID.toString());
resourceID.toString(),
1);

return resourceManager
.getSelfGateway(ResourceManagerGateway.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
Expand Down Expand Up @@ -63,8 +66,19 @@ class TaskExecutorRecoveryTest {
new EachCallbackWrapper<>(rpcServiceExtension);

@Test
void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir)
throws Exception {
void testRecoveredTaskExecutorWillRestoreAllocationStateWithFixedSlotRequest(
@TempDir File tempDir) throws Exception {
// Fixed-slot variant: useDynamicRequest = false, so the shared helper keeps the slot id taken
// from the first reported slot status and expects 2 slots in the recovered slot report.
testRecoveredTaskExecutorWillRestoreAllocationState(tempDir, false);
}

/**
 * Dynamic-slot variant of the recovery scenario: runs the shared helper with {@code
 * useDynamicRequest = true}, in which case the helper derives the allocated slot id via {@code
 * SlotID.getDynamicSlotID(...)} and expects 3 slots in the recovered slot report.
 */
@Test
void testRecoveredTaskExecutorWillRestoreAllocationStateWithDynamicSlotRequest(
@TempDir File tempDir) throws Exception {
testRecoveredTaskExecutorWillRestoreAllocationState(tempDir, true);
}

private void testRecoveredTaskExecutorWillRestoreAllocationState(
File tempDir, boolean useDynamicRequest) throws Exception {
final ResourceID resourceId = ResourceID.generate();

final Configuration configuration = new Configuration();
Expand All @@ -82,6 +96,20 @@ void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir)
return CompletableFuture.completedFuture(Acknowledge.get());
});

final ArrayBlockingQueue<TaskExecutorRegistration> taskExecutorRegistrations =
new ArrayBlockingQueue<>(2);

testingResourceManagerGateway.setRegisterTaskExecutorFunction(
taskExecutorRegistration -> {
taskExecutorRegistrations.offer(taskExecutorRegistration);
return CompletableFuture.completedFuture(
new TaskExecutorRegistrationSuccess(
new InstanceID(),
taskExecutorRegistration.getResourceId(),
new ClusterInformation("localhost", 1234),
null));
});

final TestingRpcService rpcService = rpcServiceExtension.getTestingRpcService();
rpcService.registerGateway(
testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
Expand Down Expand Up @@ -118,8 +146,14 @@ void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir)

assertThat(slotReport.getNumSlotStatus(), is(2));

final TaskExecutorRegistration taskExecutorRegistration = taskExecutorRegistrations.take();
assertThat(taskExecutorRegistration.getNumberSlots(), is(2));

final SlotStatus slotStatus = slotReport.iterator().next();
final SlotID allocatedSlotID = slotStatus.getSlotID();
final SlotID allocatedSlotID =
useDynamicRequest
? SlotID.getDynamicSlotID(slotStatus.getSlotID().getResourceID())
: slotStatus.getSlotID();

final AllocationID allocationId = new AllocationID();
taskExecutorGateway
Expand Down Expand Up @@ -160,16 +194,26 @@ void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir)
recoveredTaskExecutor.start();

final TaskExecutorSlotReport recoveredSlotReport = queue.take();

final int expectedNumberOfSlots = useDynamicRequest ? 3 : 2;
assertThat(
recoveredSlotReport.getSlotReport().getNumSlotStatus(), is(expectedNumberOfSlots));
for (SlotStatus status : recoveredSlotReport.getSlotReport()) {
if (status.getSlotID().equals(allocatedSlotID)) {
boolean isAllocatedSlot =
useDynamicRequest
? status.getSlotID().getSlotNumber() == 2
: status.getSlotID().equals(allocatedSlotID);
if (isAllocatedSlot) {
assertThat(status.getJobID(), is(jobId));
assertThat(status.getAllocationID(), is(allocationId));
} else {
assertThat(status.getJobID(), is(nullValue()));
}
}

final TaskExecutorRegistration recoveredTaskExecutorRegistration =
taskExecutorRegistrations.take();
assertThat(recoveredTaskExecutorRegistration.getNumberSlots(), is(2));

final Collection<SlotOffer> take = offeredSlots.take();

assertThat(take, hasSize(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ void testResourceManagerRegistrationIsRejected() {
TASK_MANAGER_MEMORY_CONFIGURATION,
ResourceProfile.ZERO,
ResourceProfile.ZERO,
TASK_MANAGER_NODE_ID);
TASK_MANAGER_NODE_ID,
1);
return new TaskExecutorToResourceManagerConnection(
LOGGER,
rpcService,
Expand Down