重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
import java.util.List;
创新互联长期为上千客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为中牟企业提供专业的成都做网站、网站制作,中牟网站改版等技术服务。拥有10年丰富建站经验和众多成功案例,为您定制开发。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* 并发处理器
* 适用于如下场景(举例):
* 一个任务队列, 有150个任务需要并发处理,使用此对象,可以每次并发执行20次(可设置),则总共串行执行8次并发,可获取执行结果
*
* @param T 类型T限制为任务Callable使用的数据对象和返回结果的数据对象为同一个bean
*/
public class ConcurrentExcutorT
{
/** 非空,所有任务数组 */
private CallableT[] tasks;
/** 非空,每次并发需要处理的任务数 */
private int numb;
/** 可选,存放返回结果,这里有个限制,泛型T必须为Callable返回的类型T */
private ListT result;
/**
* 无参构造
*/
public ConcurrentExcutor()
{
super();
}
/**
* 不需要返回结果的任务用此创建对象
* @param tasks
* @param numb
*/
public ConcurrentExcutor(CallableT[] tasks, int numb)
{
super();
this.tasks = tasks;
this.numb = numb;
}
/**
* 需要结果集用此方法创建对象
* @param tasks
* @param numb
* @param result
*/
public ConcurrentExcutor(CallableT[] tasks, int numb, ListT result)
{
super();
this.tasks = tasks;
this.numb = numb;
this.result = result;
}
public void excute()
{
// 参数校验
if(tasks == null || numb 1)
{
return;
}
// 待处理的任务数
int num = tasks.length;
if(num == 0)
{
return;
}
// 第一层循环,每numb条数据作为一次并发
for(int i=0; i(int)Math.floor(num/numb) + 1; i++)
{
// 用于记录此次numb条任务的处理结果
Future[] futureArray;
if(numb num)
{
futureArray = new Future[num];
}
else
{
futureArray = new Future[numb];
}
// 创建线程容器
ExecutorService es = Executors.newCachedThreadPool();
// 第二层循环,针对这numb条数据进行处理
for(int j=i*numb; j(i+1)*numb; j++)
{
// 如果超出数组长度,退出循环
if(j + 1 num)
{
break;
}
// 执行任务,并设置Future到数组中
futureArray[j%numb] = es.submit(tasks[j]);
}
// 将结果放入result中
if (result != null)
{
for (int j = 0; j futureArray.length; j++)
{
try
{
if(futureArray[j] != null)
{
Object o = futureArray[j].get();
result.add((T)o);
}
}
catch (InterruptedException e)
{
System.out.println("处理Future时发生InterruptedException异常,目标Future为: " + futureArray[j].toString());
e.printStackTrace();
}
catch (ExecutionException e)
{
System.out.println("处理Future时发生ExecutionException异常,目标Future为: " + futureArray[j].toString());
e.printStackTrace();
}
}
}
es.shutdown();
}
}
接口 java.util.concurrent.ExecutorService 表述了异步执行的机制,并且可以让任务在后台执行。一个 ExecutorService 实例因此特别像一个线程池。事实上,在 java.util.concurrent 包中的 ExecutorService 的实现就是一个线程池的实现。
这里有一个简单的使用Java 实现的 ExectorService 样例:
使用 newFixedThreadPool() 工厂方法创建一个 ExecutorService ,上述代码创建了一个可以容纳10个线程任务的线程池。其次,向 execute() 方法中传递一个异步的 Runnable 接口的实现,这样做会让 ExecutorService 中的某个线程执行这个 Runnable 线程。
什么是线程池?
总归为:池化技术 ---》数据库连接池 缓存架构 缓存池 线程池 内存池,连接池,这种思想演变成缓存架构技术--- JDK设计思想有千丝万缕的联系
首先我们从最核心的ThreadPoolExecutor类中的方法讲起,然后再讲述它的实现原理,接着给出了它的使用示例,最后讨论了一下如何合理配置线程池的大小。
Java 中的 ThreadPoolExecutor 类
java.uitl.concurrent.ThreadPoolExecutor 类是线程池中最核心的一个类,因此如果要透彻地了解Java 中的线程池,必须先了解这个类。下面我们来看一下 ThreadPoolExecutor 类的具体实现源码。
在 ThreadPoolExecutor 类中提供了四个构造方法:
从上面的代码可以得知,ThreadPoolExecutor 继承了 AbstractExecutorService 类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。
下面解释下一下构造器中各个参数的含义:
corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads() 或者 prestartCoreThread()方法,从这 2 个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建 corePoolSize 个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列当中;
maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于 corePoolSize 时,keepAliveTime 才会起作用,直到线程池中的线程数不大于 corePoolSize,即当线程池中的线程数大于 corePoolSize 时,如果一个线程空闲的时间达到 keepAliveTime,则会终止,直到线程池中的线程数不超过 corePoolSize。但是如果调用了 allowCoreThreadTimeOut(boolean) 方法,在线程池中的线程数不大于 corePoolSize 时,keepAliveTime 参数也会起作用,直到线程池中的线程数为0;
unit:参数 keepAliveTime 的时间单位,有 7 种取值,在 TimeUnit 类中有 7 种静态属性:
workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue 和 PriorityBlockingQueue 使用较少,一般使用 LinkedBlockingQueue 和 Synchronous。线程池的排队策略与 BlockingQueue 有关。
threadFactory:线程工厂,主要用来创建线程;
handler:表示当拒绝处理任务时的策略,有以下四种取值:
具体参数的配置与线程池的关系将在下一节讲述。
从上面给出的 ThreadPoolExecutor 类的代码可以知道,ThreadPoolExecutor 继承了AbstractExecutorService,我们来看一下 AbstractExecutorService 的实现:
AbstractExecutorService 是一个抽象类,它实现了 ExecutorService 接口。
我们接着看 ExecutorService 接口的实现:
而 ExecutorService 又是继承了 Executor 接口,我们看一下 Executor 接口的实现: