參考资料: 阿里巴巴开源项目 CobarClient 源代码实现。
分享作者:闫建忠
分享时间:2014年5月7日
—————————————————————————————
并行调度封装类设计: BXexample.java
package org.hdht.business.ordermanager.quartzjob; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.exception.ExceptionUtils; import org.springframework.dao.ConcurrencyFailureException; public class BXexample { private static ExecutorService createCustomExecutorService(int poolSize, final String method) { int coreSize = Runtime.getRuntime().availableProcessors();//返回系统CUP数量 if (poolSize < coreSize) { coreSize = poolSize; } ThreadFactory tf = new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r, "thread created at BXexample method [" + method + "]"); t.setDaemon(true); return t; } }; BlockingQueue<Runnable> queueToUse = new LinkedBlockingQueue<Runnable>(); final ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, poolSize, 60, TimeUnit.SECONDS, queueToUse, tf, new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } public static <T> List<T> getSubListPage(List<T> list, int skip,int pageSize) { if (list == null || list.isEmpty()) { return null; } int startIndex = skip; int endIndex = skip + pageSize; if (startIndex > endIndex || startIndex > list.size()) { return null; } if (endIndex > list.size()) { endIndex = list.size(); } return list.subList(startIndex, endIndex); } public static void BXfunction(Collection<?> paramCollection,final ExectueCallBack ecb){ //构建运行器 ExecutorService executor = createCustomExecutorService(Runtime.getRuntime().availableProcessors(), "batchExecuteProjection"); try { //监视器 final CountDownLatch latch = new CountDownLatch(paramCollection.size()); final StringBuffer exceptionStaktrace = new StringBuffer(); Iterator<?> iter = paramCollection.iterator(); while (iter.hasNext()) { final Object entity = iter.next(); Runnable task = new Runnable() { public void run() { try { ecb.doExectue(entity); } catch (Throwable t) { exceptionStaktrace.append(ExceptionUtils.getFullStackTrace(t)); } finally { latch.countDown(); } } }; executor.execute(task);//并行调度 } try { latch.await();//监视器等待全部线程运行完成 } catch (InterruptedException e) { //调度异常 throw new ConcurrencyFailureException( "unexpected interruption when re-arranging parameter collection into sub-collections ",e); } if (exceptionStaktrace.length() > 0) { //业务异常 throw new ConcurrencyFailureException( "unpected exception when re-arranging parameter collection, check previous log for details.\n"+ exceptionStaktrace); } } finally { executor.shutdown();//运行器关闭 } } }
回调接口类设计:ExectueCallBack.java
package org.hdht.business.ordermanager.quartzjob; public interface ExectueCallBack { void doExectue(Object executor) throws Exception; }
演示样例(hello 演示样例)
public static void main(String[] args) { List<String> paramCollection = new ArrayList<String>(); paramCollection.add("9"); paramCollection.add("2"); paramCollection.add("18"); paramCollection.add("7"); paramCollection.add("6"); paramCollection.add("1"); paramCollection.add("3"); paramCollection.add("4"); paramCollection.add("14"); paramCollection.add("13"); int freesize = 3;//当前处理能力 for(int i=0;i<paramCollection.size();i=i+freesize){ List<String> tl = BXexample.getSubListPage(paramCollection, i, freesize); BXexample.BXfunction(tl,new ExectueCallBack() { public void doExectue(Object executor) throws Exception { int k = Integer.parseInt((String)executor); for(int i=0;i<k*10000000;i++){ //运行循环 } System.out.println(k+":hello world"); } }); } }
演示样例(实际业务应用演示样例)
/** * 并行调度相关处理 * * 按卫星*日期 ,将待处理的任务分解为 卫星+日期 粒度的子任务 加入到paramMapList列表中 */ List<Map<String, Object>> paramMapList = new ArrayList<Map<String, Object>>(); for (Iterator<OrderParamSatellite> iterator = paramSatellites.iterator(); iterator.hasNext();) { OrderParamSatellite paramSatellite = iterator.next(); paramMapList.addAll(this.getParamMapList(paramSatellite)); } //依据集群最大处理能力,分页处理任务列表,作为list截取的步长 int fsize = HostServerQueue.getInstance().freeSize(); for(int i=0;i<paramMapList.size();i=i+fsize){ List<Map<String, Object>> tl = BXexample.getSubListPage(paramMapList, i, fsize); //并行调度 BXexample.BXfunction(tl,new ExectueCallBack(){ public void doExectue(Object executor) throws Exception { ExecuteOrderBTask((Map<String, Object>)executor); } }); //动态查找空暇节点数量,即集群最大处理能力 fsize = HostServerQueue.getInstance().freeSize(); }