序
本文主要研究一下flink的AbstractNonHaServices
HighAvailabilityServices
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
public interface HighAvailabilityServices extends AutoCloseable {
// ------------------------------------------------------------------------
// Constants
// ------------------------------------------------------------------------
/**
* This UUID should be used when no proper leader election happens, but a simple
* pre-configured leader is used. That is for example the case in non-highly-available
* standalone setups.
*/
UUID DEFAULT_LEADER_ID = new UUID(0, 0);
/**
* This JobID should be used to identify the old JobManager when using the
* {@link HighAvailabilityServices}. With the new mode every JobMaster will have a
* distinct JobID assigned.
*/
JobID DEFAULT_JOB_ID = new JobID(0L, 0L);
// ------------------------------------------------------------------------
// Services
// ------------------------------------------------------------------------
/**
* Gets the leader retriever for the cluster's resource manager.
*/
LeaderRetrievalService getResourceManagerLeaderRetriever();
/**
* Gets the leader retriever for the dispatcher. This leader retrieval service
* is not always accessible.
*/
LeaderRetrievalService getDispatcherLeaderRetriever();
/**
* Gets the leader retriever for the job JobMaster which is responsible for the given job
*
* @param jobID The identifier of the job.
* @return Leader retrieval service to retrieve the job manager for the given job
* @deprecated This method should only be used by the legacy code where the JobManager acts as the master.
*/
@Deprecated
LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);
/**
* Gets the leader retriever for the job JobMaster which is responsible for the given job
*
* @param jobID The identifier of the job.
* @param defaultJobManagerAddress JobManager address which will be returned by
* a static leader retrieval service.
* @return Leader retrieval service to retrieve the job manager for the given job
*/
LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);
LeaderRetrievalService getWebMonitorLeaderRetriever();
/**
* Gets the leader election service for the cluster's resource manager.
*
* @return Leader election service for the resource manager leader election
*/
LeaderElectionService getResourceManagerLeaderElectionService();
/**
* Gets the leader election service for the cluster's dispatcher.
*
* @return Leader election service for the dispatcher leader election
*/
LeaderElectionService getDispatcherLeaderElectionService();
/**
* Gets the leader election service for the given job.
*
* @param jobID The identifier of the job running the election.
* @return Leader election service for the job manager leader election
*/
LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
LeaderElectionService getWebMonitorLeaderElectionService();
/**
* Gets the checkpoint recovery factory for the job manager
*
* @return Checkpoint recovery factory
*/
CheckpointRecoveryFactory getCheckpointRecoveryFactory();
/**
* Gets the submitted job graph store for the job manager
*
* @return Submitted job graph store
* @throws Exception if the submitted job graph store could not be created
*/
SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
/**
* Gets the registry that holds information about whether jobs are currently running.
*
* @return Running job registry to retrieve running jobs
*/
RunningJobsRegistry getRunningJobsRegistry() throws Exception;
/**
* Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
*
* @return Blob store
* @throws IOException if the blob store could not be created
*/
BlobStore createBlobStore() throws IOException;
// ------------------------------------------------------------------------
// Shutdown and Cleanup
// ------------------------------------------------------------------------
/**
* Closes the high availability services, releasing all resources.
*
* <p>This method <b>does not delete or clean up</b> any data stored in external stores
* (file systems, ZooKeeper, etc). Another instance of the high availability
* services will be able to recover the job.
*
* <p>If an exception occurs during closing services, this method will attempt to
* continue closing other services and report exceptions only after all services
* have been attempted to be closed.
*
* @throws Exception Thrown, if an exception occurred while closing these services.
*/
@Override
void close() throws Exception;
/**
* Closes the high availability services (releasing all resources) and deletes
* all data stored by these services in external stores.
*
* <p>After this method was called, the any job or session that was managed by
* these high availability services will be unrecoverable.
*
* <p>If an exception occurs during cleanup, this method will attempt to
* continue the cleanup and report exceptions only after all cleanup steps have
* been attempted.
*
* @throws Exception Thrown, if an exception occurred while closing these services
* or cleaning up data stored by them.
*/
void closeAndCleanupAllData() throws Exception;
}
- HighAvailabilityServices定义了highly-available所需的各种services的get方法,它有两个直接子类,一个是ZooKeeperHaServices,一个是AbstractNonHaServices
AbstractNonHaServices
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
public abstract class AbstractNonHaServices implements HighAvailabilityServices {
protected final Object lock = new Object();
private final RunningJobsRegistry runningJobsRegistry;
private final VoidBlobStore voidBlobStore;
private boolean shutdown;
public AbstractNonHaServices() {
this.runningJobsRegistry = new StandaloneRunningJobsRegistry();
this.voidBlobStore = new VoidBlobStore();
shutdown = false;
}
// ----------------------------------------------------------------------
// HighAvailabilityServices method implementations
// ----------------------------------------------------------------------
@Override
public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
synchronized (lock) {
checkNotShutdown();
return new StandaloneCheckpointRecoveryFactory();
}
}
@Override
public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
synchronized (lock) {
checkNotShutdown();
return new StandaloneSubmittedJobGraphStore();
}
}
@Override
public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
synchronized (lock) {
checkNotShutdown();
return runningJobsRegistry;
}
}
@Override
public BlobStore createBlobStore() throws IOException {
synchronized (lock) {
checkNotShutdown();
return voidBlobStore;
}
}
@Override
public void close() throws Exception {
synchronized (lock) {
if (!shutdown) {
shutdown = true;
}
}
}
@Override
public void closeAndCleanupAllData() throws Exception {
// this stores no data, so this method is the same as 'close()'
close();
}
// ----------------------------------------------------------------------
// Helper methods
// ----------------------------------------------------------------------
@GuardedBy("lock")
protected void checkNotShutdown() {
checkState(!shutdown, "high availability services are shut down");
}
protected boolean isShutDown() {
return shutdown;
}
}
- AbstractNonHaServices实现了HighAvailabilityServices的getCheckpointRecoveryFactory、getSubmittedJobGraphStore、getRunningJobsRegistry、createBlobStore、close、closeAndCleanupAllData方法;它有两个子类,分别是EmbeddedHaServices及StandaloneHaServices
EmbeddedHaServices
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
public class EmbeddedHaServices extends AbstractNonHaServices {
private final Executor executor;
private final EmbeddedLeaderService resourceManagerLeaderService;
private final EmbeddedLeaderService dispatcherLeaderService;
private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;
private final EmbeddedLeaderService webMonitorLeaderService;
public EmbeddedHaServices(Executor executor) {
this.executor = Preconditions.checkNotNull(executor);
this.resourceManagerLeaderService = new EmbeddedLeaderService(executor);
this.dispatcherLeaderService = new EmbeddedLeaderService(executor);
this.jobManagerLeaderServices = new HashMap<>();
this.webMonitorLeaderService = new EmbeddedLeaderService(executor);
}
// ------------------------------------------------------------------------
// services
// ------------------------------------------------------------------------
@Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
return resourceManagerLeaderService.createLeaderRetrievalService();
}
@Override
public LeaderRetrievalService getDispatcherLeaderRetriever() {
return dispatcherLeaderService.createLeaderRetrievalService();
}
@Override
public LeaderElectionService getResourceManagerLeaderElectionService() {
return resourceManagerLeaderService.createLeaderElectionService();
}
@Override
public LeaderElectionService getDispatcherLeaderElectionService() {
return dispatcherLeaderService.createLeaderElectionService();
}
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
checkNotNull(jobID);
synchronized (lock) {
checkNotShutdown();
EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
return service.createLeaderRetrievalService();
}
}
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
return getJobManagerLeaderRetriever(jobID);
}
@Override
public LeaderRetrievalService getWebMonitorLeaderRetriever() {
return webMonitorLeaderService.createLeaderRetrievalService();
}
@Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
checkNotNull(jobID);
synchronized (lock) {
checkNotShutdown();
EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
return service.createLeaderElectionService();
}
}
@Override
public LeaderElectionService getWebMonitorLeaderElectionService() {
return webMonitorLeaderService.createLeaderElectionService();
}
// ------------------------------------------------------------------------
// internal
// ------------------------------------------------------------------------
@GuardedBy("lock")
private EmbeddedLeaderService getOrCreateJobManagerService(JobID jobID) {
EmbeddedLeaderService service = jobManagerLeaderServices.get(jobID);
if (service == null) {
service = new EmbeddedLeaderService(executor);
jobManagerLeaderServices.put(jobID, service);
}
return service;
}
// ------------------------------------------------------------------------
// shutdown
// ------------------------------------------------------------------------
@Override
public void close() throws Exception {
synchronized (lock) {
if (!isShutDown()) {
// stop all job manager leader services
for (EmbeddedLeaderService service : jobManagerLeaderServices.values()) {
service.shutdown();
}
jobManagerLeaderServices.clear();
resourceManagerLeaderService.shutdown();
webMonitorLeaderService.shutdown();
}
super.close();
}
}
}
- EmbeddedHaServices继承了AbstractNonHaServices,它是对HighAvailabilityServices接口在ResourceManager, JobManagers, TaskManagers运行在同一个进程的non-high-availability场景下的实现,FlinkMiniCluster使用的就是EmbeddedHaServices
StandaloneHaServices
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
public class StandaloneHaServices extends AbstractNonHaServices {
/** The constant name of the ResourceManager RPC endpoint */
private static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager";
/** The fix address of the ResourceManager */
private final String resourceManagerAddress;
/** The fix address of the Dispatcher */
private final String dispatcherAddress;
/** The fix address of the JobManager */
private final String jobManagerAddress;
private final String webMonitorAddress;
/**
* Creates a new services class for the fix pre-defined leaders.
*
* @param resourceManagerAddress The fix address of the ResourceManager
* @param webMonitorAddress
*/
public StandaloneHaServices(
String resourceManagerAddress,
String dispatcherAddress,
String jobManagerAddress,
String webMonitorAddress) {
this.resourceManagerAddress = checkNotNull(resourceManagerAddress, "resourceManagerAddress");
this.dispatcherAddress = checkNotNull(dispatcherAddress, "dispatcherAddress");
this.jobManagerAddress = checkNotNull(jobManagerAddress, "jobManagerAddress");
this.webMonitorAddress = checkNotNull(webMonitorAddress, webMonitorAddress);
}
// ------------------------------------------------------------------------
// Services
// ------------------------------------------------------------------------
@Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
synchronized (lock) {
checkNotShutdown();
return new StandaloneLeaderRetrievalService(resourceManagerAddress, DEFAULT_LEADER_ID);
}
}
@Override
public LeaderRetrievalService getDispatcherLeaderRetriever() {
synchronized (lock) {
checkNotShutdown();
return new StandaloneLeaderRetrievalService(dispatcherAddress, DEFAULT_LEADER_ID);
}
}
@Override
public LeaderElectionService getResourceManagerLeaderElectionService() {
synchronized (lock) {
checkNotShutdown();
return new StandaloneLeaderElectionService();
}
}
@Override
public LeaderElectionService getDispatcherLeaderElectionService() {
synchronized (lock) {
checkNotShutdown();
return new StandaloneLeaderElectionService();
}
}
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
synchronized (lock) {
checkNotShutdown();
return new StandaloneLeaderRetrievalService(jobManagerAddress, DEFAULT_LEADER_ID);
}
}
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
synchronized (lock) {
checkNotShutdown();
return new StandaloneLeaderRetrievalService(defaultJobManagerAddress, DEFAULT_LEADER_ID);
}
}
@Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
synchronized (lock) {
checkNotShutdown();
return new StandaloneLeaderElectionService();
}
}
@Override
public LeaderRetrievalService getWebMonitorLeaderRetriever() {
synchronized (lock) {
checkNotShutdown();
return new StandaloneLeaderRetrievalService(webMonitorAddress, DEFAULT_LEADER_ID);
}
}
@Override
public LeaderElectionService getWebMonitorLeaderElectionService() {
synchronized (lock) {
checkNotShutdown();
return new StandaloneLeaderElectionService();
}
}
}
- StandaloneHaServices继承了AbstractNonHaServices,它是对HighAvailabilityServices接口在non-high-availability场景下的实现,ClusterEntrypoint在highAvailabilityMode为NONE的时候使用的是StandaloneHaServices
小结
- HighAvailabilityServices定义了highly-available所需的各种services的get方法,它有两个直接子类,一个是ZooKeeperHaServices,一个是AbstractNonHaServices
- AbstractNonHaServices实现了HighAvailabilityServices的getCheckpointRecoveryFactory、getSubmittedJobGraphStore、getRunningJobsRegistry、createBlobStore、close、closeAndCleanupAllData方法;它有两个子类,分别是EmbeddedHaServices及StandaloneHaServices
- EmbeddedHaServices继承了AbstractNonHaServices,它是对HighAvailabilityServices接口在ResourceManager, JobManagers, TaskManagers运行在同一个进程的non-high-availability场景下的实现,FlinkMiniCluster使用的就是EmbeddedHaServices;StandaloneHaServices继承了AbstractNonHaServices,它是对HighAvailabilityServices接口在non-high-availability场景下的实现,ClusterEntrypoint在highAvailabilityMode为NONE的时候使用的是StandaloneHaServices