本篇文章主要介绍了"java.util.concurrent 用法实例详解 -中的java源码",主要涉及到方面的内容,对于Javajrs看球网直播吧_低调看直播体育app软件下载_低调看体育直播感兴趣的同学可以参考一下:
package service;/*** 线程池服务类* * @author EX-LIUQI001**/
public class ThreadPoolSer...
package service;
/**
* 线程池服务类
*
* @author EX-LIUQI001
*
*/
public class ThreadPoolService {
/**
* 默认线程池大小
*/
public static final int DEFAULT_POOL_SIZE = 5;
/**
* 默认一个任务的超时时间,单位为毫秒
*/
public static final long DEFAULT_TASK_TIMEOUT = 1000;
private int poolSize = DEFAULT_POOL_SIZE;
private ExecutorService executorService;
/**
* 根据给定大小创建线程池
*/
public ThreadPoolService(int poolSize) {
setPoolSize(poolSize);
}
/**
* 使用线程池中的线程来执行任务
*/
public void execute(Runnable task) {
executorService.execute(task);
}
/**
* 在线程池中执行所有给定的任务并取回运行结果,使用默认超时时间
*
* @see #invokeAll(List, long)
*/
public List invokeAll(List tasks) {
return invokeAll(tasks, DEFAULT_TASK_TIMEOUT * tasks.size());
}
/**
* 在线程池中执行所有给定的任务并取回运行结果
*
* @param timeout
* 以毫秒为单位的超时时间,小于0表示不设定超时
* @see java.util.concurrent.ExecutorService#invokeAll(java.util.Collection)
*/
public List invokeAll(List tasks, long timeout) {
List nodes = new ArrayList(tasks.size());
try {
List<>> futures = null;
if (timeout < 0) {
futures = executorService.invokeAll(tasks);
} else {
futures = executorService.invokeAll(tasks, timeout,
TimeUnit.MILLISECONDS);
}
for (Future future : futures) {
try {
nodes.add(future.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return nodes;
}
/**
* 关闭当前ExecutorService
*
* @param timeout
* 以毫秒为单位的超时时间
*/
public void destoryExecutorService(long timeout) {
if (executorService != null && !executorService.isShutdown()) {
try {
executorService
.awaitTermination(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.shutdown();
}
}
/**
* 关闭当前ExecutorService,随后根据poolSize创建新的ExecutorService
*/
public void createExecutorService() {
destoryExecutorService(1000);
executorService = Executors.newFixedThreadPool(poolSize);
}
/**
* 调整线程池大小
*
* @see #createExecutorService()
*/
public void setPoolSize(int poolSize) {
this.poolSize = poolSize;
createExecutorService();
}
}
package service;
/**
* 节点类
*
* @author EX-LIUQI001
*
*/
public class Node {
private String name;
private String wsdl;
private String result = "PASS";
private String[] dependencies = new String[] {};
private Lock lock = new ReentrantLock();
/**
* 默认构造方法
*/
public Node() {
}
/**
* 构造节点对象,设置名称及WSDL
*/
public Node(String name, String wsdl) {
this.name = name;
this.wsdl = wsdl;
}
/**
* 返回包含节点名称、WSDL以及验证结果的字符串
*/
@Override
public String toString() {
String toString = "Node: " + name + " WSDL: " + wsdl + " Result: "
+ result;
return toString;
}
// Getter & Setter
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getWsdl() {
return wsdl;
}
public void setWsdl(String wsdl) {
this.wsdl = wsdl;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
public String[] getDependencies() {
return dependencies;
}
public void setDependencies(String[] dependencies) {
this.dependencies = dependencies;
}
public Lock getLock() {
return lock;
}
}
package service;
/**
* 执行验证的任务类
*
* @author EX-LIUQI001
*
*/
public class ValidationTask implements Callable {
private static Logger logger = Logger.getLogger("ValidationTask");
private String wsdl;
/**
* 构造方法,传入节点的WSDL
*/
public ValidationTask(String wsdl) {
this.wsdl = wsdl;
}
/**
* 执行针对某个节点的验证
* 如果正有别的线程在执行同一节点的验证则等待其结果,不重复执行验证
*/
@Override
public Node call() throws Exception {
Node node = ValidationService.NODE_MAP.get(wsdl);
Lock lock = null;
logger.info("开始验证节点:" + wsdl);
if (node != null) {
lock = node.getLock();
if (lock.tryLock()) {
// 当前没有其他线程验证该节点
logger.info("当前没有其他线程验证节点" + node.getName() + "[" + wsdl + "]");
try {
Node result = MockNodeValidator.validateNode(wsdl);
mergeNode(result, node);
} finally {
lock.unlock();
}
} else {
// 当前有别的线程正在验证该节点,等待结果
logger.info("当前有别的线程正在验证节点" + node.getName() + "[" + wsdl
+ "],等待结果");
lock.lock();
lock.unlock();
}
} else {
// 从未进行过验证,这种情况应该只出现在系统启动初期
// 这时是在做初始化,不应该有冲突发生
logger.info("首次验证节点:" + wsdl);
node = MockNodeValidator.validateNode(wsdl);
ValidationService.NODE_MAP.put(wsdl, node);
}
logger.info("节点" + node.getName() + "[" + wsdl + "]验证结束,验证结果:"
+ node.getResult());
return node;
}
/**
* 将src的内容合并进dest节点中,不进行深度拷贝
*/
private Node mergeNode(Node src, Node dest) {
dest.setName(src.getName());
dest.setWsdl(src.getWsdl());
dest.setDependencies(src.getDependencies());
dest.setResult(src.getResult());
return dest;
}
}
package service;
/**
* 有界缓冲区
*
* @author EX-LIUQI001
*
*/
public class BoundedBuffer {
private static final Lock lock = new ReentrantLock();
private static final Condition notFull = lock.newCondition();
private static final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length)
putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length)
takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
package service;
/**
* 执行验证的服务类
*
* @author EX-LIUQI001
*
*/
public class ValidationService {
/**
* 全局节点表
*/
public static final Map NODE_MAP = new ConcurrentHashMap();
private ThreadPoolService threadPoolService;
public ValidationService(ThreadPoolService threadPoolService) {
this.threadPoolService = threadPoolService;
}
/**
* 给出一个入口节点的WSDL,通过广度遍历的方式验证与其相关的各个节点
*
* @param wsdl
* 入口节点WSDL
*/
public void validate(List wsdl) {
List visitedNodes = new ArrayList();
List nextRoundNodes = new ArrayList();
nextRoundNodes.addAll(wsdl);
while (nextRoundNodes.size() > 0) {
List tasks = getTasks(nextRoundNodes);
List nodes = threadPoolService.invokeAll(tasks);
visitedNodes.addAll(nextRoundNodes);
nextRoundNodes.clear();
getNextRoundNodes(nodes, visitedNodes, nextRoundNodes);
}
}
private List getNextRoundNodes(List nodes,
List visitedNodes, List nextRoundNodes) {
for (Node node : nodes) {
for (String wsdl : node.getDependencies()) {
if (!visitedNodes.contains(wsdl)) {
nextRoundNodes.add(wsdl);
}
}
}
return nextRoundNodes;
}
private List getTasks(List nodes) {
List tasks = new ArrayList(nodes.size());
for (String wsdl : nodes) {
tasks.add(new ValidationTask(wsdl));
}
return tasks;
}
}
package service;
/**
* 模拟执行节点验证的Mock类
*
* @author EX-LIUQI001
*
*/
public class MockNodeValidator {
public static final List ENTRIES = new ArrayList();
private static final Map NODE_MAP = new HashMap();
private static AtomicInteger count = new AtomicInteger(0);
private static Logger logger = Logger.getLogger("MockNodeValidator");
/*
* 构造模拟数据
*/
static {
Node node0 = new Node("NODE0", "http://node0/check?wsdl"); // 入口0
Node node1 = new Node("NODE1", "http://node1/check?wsdl");
Node node2 = new Node("NODE2", "http://node2/check?wsdl");
Node node3 = new Node("NODE3", "http://node3/check?wsdl");
Node node4 = new Node("NODE4", "http://node4/check?wsdl");
Node node5 = new Node("NODE5", "http://node5/check?wsdl");
Node node6 = new Node("NODE6", "http://node6/check?wsdl"); // 入口1
Node node7 = new Node("NODE7", "http://node7/check?wsdl");
Node node8 = new Node("NODE8", "http://node8/check?wsdl");
Node node9 = new Node("NODE9", "http://node9/check?wsdl");
node0.setDependencies(new String[] { node1.getWsdl(), node2.getWsdl() });
node1.setDependencies(new String[] { node3.getWsdl(), node4.getWsdl() });
node2.setDependencies(new String[] { node5.getWsdl() });
node6.setDependencies(new String[] { node7.getWsdl(), node8.getWsdl() });
node7.setDependencies(new String[] { node5.getWsdl(), node9.getWsdl() });
node8.setDependencies(new String[] { node3.getWsdl(), node4.getWsdl() });
node2.setResult("FAILED");
NODE_MAP.put(node0.getWsdl(), node0);
NODE_MAP.put(node1.getWsdl(), node1);
NODE_MAP.put(node2.getWsdl(), node2);
NODE_MAP.put(node3.getWsdl(), node3);
NODE_MAP.put(node4.getWsdl(), node4);
NODE_MAP.put(node5.getWsdl(), node5);
NODE_MAP.put(node6.getWsdl(), node6);
NODE_MAP.put(node7.getWsdl(), node7);
NODE_MAP.put(node8.getWsdl(), node8);
NODE_MAP.put(node9.getWsdl(), node9);
ENTRIES.add(node0);
ENTRIES.add(node6);
}
/**
* 模拟执行远程验证返回节点,每次调用等待500ms
*/
public static Node validateNode(String wsdl) {
Node node = cloneNode(NODE_MAP.get(wsdl));
logger.info("验证节点" + node.getName() + "[" + node.getWsdl() + "]");
count.getAndIncrement();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return node;
}
/**
* 获得计数器的值
*/
public static int getCount() {
return count.intValue();
}
/**
* 克隆一个新的Node对象(未执行深度克隆)
*/
public static Node cloneNode(Node originalNode) {
Node newNode = new Node();
newNode.setName(originalNode.getName());
newNode.setWsdl(originalNode.getWsdl());
newNode.setResult(originalNode.getResult());
newNode.setDependencies(originalNode.getDependencies());
return newNode;
}
}
package service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import service.Node;
import service.ThreadPoolService;
import service.ValidationService;
/**
* 模拟执行这个环境的验证
*
* @author EX-LIUQI001
*
*/
public class ValidationStarter implements Runnable {
private List entries;
private ValidationService validationService;
private CountDownLatch signal;
public ValidationStarter(List entries,
ValidationService validationService, CountDownLatch signal) {
this.entries = entries;
this.validationService = validationService;
this.signal = signal;
}
@Override
public void run() {
validationService.validate(entries);
signal.countDown();
}
/**
* 线程池大小为10,初始化执行一次,随后并发三个验证
*/
public static void main(String[] args) {
ThreadPoolService threadPoolService = new ThreadPoolService(10);
ValidationService validationService = new ValidationService(
threadPoolService);
List entries = new ArrayList();
CountDownLatch signal = new CountDownLatch(3);
long start;
long stop;
for (Node node : MockNodeValidator.ENTRIES) {
entries.add(node.getWsdl());
}
start = System.currentTimeMillis();
validationService.validate(entries);
threadPoolService.execute(new ValidationStarter(entries,
validationService, signal));
threadPoolService.execute(new ValidationStarter(entries,
validationService, signal));
threadPoolService.execute(new ValidationStarter(entries,
validationService, signal));
try {
signal.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
stop = System.currentTimeMillis();
threadPoolService.destoryExecutorService(1000);
System.out.println("实际执行验证次数: " + MockNodeValidator.getCount());
System.out.println("实际执行时间: " + (stop - start) + "ms");
}
}
以上就介绍了java.util.concurrent 用法实例详解 -中的java源码,包括了方面的内容,希望对Javajrs看球网直播吧_低调看直播体育app软件下载_低调看体育直播有兴趣的朋友有所帮助。
本文网址链接:http://www.codes51.com/article/detail_104626.html