Splitting a Task List with a Custom Thread Pool

Requirement

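To reduce network overhead, the sender packs the data into one large List and sends it to the receiver for processing. If the receiver submits that List to a ThreadPoolExecutor as a single task, only one thread processes the entire List, which takes a long time. The work therefore needs to be split and distributed across multiple threads.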
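To make the problem concrete, here is a minimal sketch that is not part of the original code. The class name `SingleTaskDemo`, the pool size of 4, and the 400-element list are assumptions chosen for illustration. It counts how many distinct pool threads touch the data when the list is submitted as one task versus in chunks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names and sizes are illustrative assumptions.
class SingleTaskDemo {

    // Submits the list in chunks of chunkSize and returns how many distinct pool threads ran a chunk.
    static int threadsUsed(List<Integer> data, int chunkSize) throws InterruptedException {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int start = 0; start < data.size(); start += chunkSize) {
            List<Integer> chunk = data.subList(start, Math.min(start + chunkSize, data.size()));
            pool.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                chunk.forEach(x -> { /* process one element */ });
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return threadNames.size();
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 400; i++) data.add(i);
        System.out.println("whole list as one task: " + threadsUsed(data, 400) + " thread(s)");
        System.out.println("chunks of 100:          " + threadsUsed(data, 100) + " thread(s)");
    }
}
```

With the whole list submitted as one task, a single thread does all the work; splitting it lets up to four pool threads share it.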

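Implementation

Task splitting is implemented by extending ThreadPoolExecutor. Each pool instance also starts two helper threads: a task dispatch thread and an idle detection thread. Large lists are split in addTask, and the resulting tasks are placed in a blocking buffer queue. The dispatch thread takes tasks from that queue and hands them to the pool for execution, blocking while the pool's work queue is full. The idle detection thread notifies the dispatch thread once the work queue has room again, so that dispatching can resume.

The code is as follows: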

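import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DataProcessThreadPoolManager extends ThreadPoolExecutor {
    // Buffer queue holding tasks that have not yet been handed to the pool.
    private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1000);
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private int splitSize = 100; // number of list elements per chunk
    private int workQueueCapacity = 0; // capacity of the pool's work queue (assumes it is bounded and empty at construction)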

    public DataProcessThreadPoolManager(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, BlockingQueue<Runnable> queue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        workQueueCapacity = workQueue.remainingCapacity();
        if (queue != null) this.queue = queue; // use the caller-supplied buffer queue
        init();
    }

    public DataProcessThreadPoolManager(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, BlockingQueue<Runnable> queue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        workQueueCapacity = workQueue.remainingCapacity();
        if (queue != null) this.queue = queue; // use the caller-supplied buffer queue
        init();
    }

    public DataProcessThreadPoolManager(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, int splitSize) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        workQueueCapacity = workQueue.remainingCapacity();
        this.splitSize = splitSize;
        init();
    }

    public DataProcessThreadPoolManager(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        workQueueCapacity = workQueue.remainingCapacity();
        init();
    }

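    private void init() {
        // Task dispatch thread: moves tasks from the buffer queue into the pool.
        Thread dispatcher = new Thread(() -> {
            while (!isShutdown()) {
                try {
                    allocateTask();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and stop
                    return;
                } catch (Exception e) {
                    log.error("Task dispatch thread failed", e);
                }
            }
        }, "task-dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();

        // Idle detection thread: wakes the dispatcher once the work queue has room.
        Thread idleDetector = new Thread(() -> {
            while (!isShutdown()) {
                try {
                    // Acquire the lock before the inner try so unlock() only runs when the lock is held.
                    lock.lockInterruptibly();
                    try {
                        if (getQueue().size() < workQueueCapacity) {
                            notFull.signal();
                        }
                    } finally {
                        lock.unlock();
                    }
                    TimeUnit.MILLISECONDS.sleep(10); // short pause instead of a busy spin
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "idle-detector");
        idleDetector.setDaemon(true);
        idleDetector.start();
    }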

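    public <T> void addTask(List<T> list, Consumer<List<T>> consumer) {
        try {
            if (list != null && list.size() > splitSize) {
                // Split the list into chunks of at most splitSize elements.
                splitData(list, consumer);
            } else {
                // Wrap the list in a Runnable so the pool can execute it later.
                Runnable task = () -> consumer.accept(list);
                queue.put(task); // blocks while the buffer queue is full
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
            log.error("Failed to add task to the thread pool", e);
        }
    }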

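    public void allocateTask() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // The pool's work queue is full: pause dispatching until signalled.
            // Loop rather than test once, because await() can wake up spuriously.
            while (getQueue().size() >= workQueueCapacity) {
                notFull.await();
            }

            Runnable task = queue.take(); // blocks until a task is available, never returns null
            execute(task);
        } finally {
            lock.unlock();
        }
    }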

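    /**
     * Task decomposition: splits one large List into several smaller Lists.
     *
     * @param list     the full list of data to process
     * @param consumer the handler applied to each sub-list
     */
    private <T> void splitData(List<T> list, Consumer<List<T>> consumer) {
        int size = list.size();
        // Number of chunks, rounded up.
        int totalPages = (size + splitSize - 1) / splitSize;
        int start = 0;
        for (int i = 1; i <= totalPages; i++) {
            int end = Math.min(i * splitSize, size);
            // subList returns a view, so the source list must not be modified while tasks are pending.
            addTask(list.subList(start, end), consumer);
            start = end;
        }
    }
}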
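The boundary arithmetic in splitData can be checked in isolation. The sketch below is a standalone illustration; the class `ChunkBoundsDemo` and the method `chunkSizes` are hypothetical names and not part of the pool class. It applies the same round-up division and start/end bookkeeping and reports the size of each chunk.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only.
class ChunkBoundsDemo {

    // Returns the sizes of the chunks produced when `size` elements are split into groups of `splitSize`.
    static List<Integer> chunkSizes(int size, int splitSize) {
        List<Integer> sizes = new ArrayList<>();
        int totalPages = size / splitSize;
        if (totalPages * splitSize < size) {
            totalPages += 1; // one extra, partially filled chunk
        }
        int start = 0;
        for (int i = 1; i <= totalPages; i++) {
            int end = Math.min(i * splitSize, size);
            sizes.add(end - start);
            start = end;
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(250, 100)); // prints [100, 100, 50]
        System.out.println(chunkSizes(200, 100)); // prints [100, 100]
    }
}
```

A 250-element list with a split size of 100 therefore becomes three tasks, the last of which carries the remaining 50 elements.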

Usage:

// Instantiate the custom thread pool
private final static DataProcessThreadPoolManager BASE_DATA_THREAD_POOL = new DataProcessThreadPoolManager(100, 100, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.DiscardPolicy(), 100);
// Add the List to be processed via addTask (pass the actual data you want to handle, not a Runnable)
BASE_DATA_THREAD_POOL.addTask(tournamentBaseData.getData(), list -> tournamentService.handle(list));

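The custom thread pool above provides the following features:

1. Task list splitting: a large List is split into several smaller Lists, and each smaller List is executed as its own task on a pool thread.

2. Blocking when the pool is busy: while the pool's work queue is full, the dispatch thread waits instead of submitting, so the rejection policy is not triggered and no task is dropped.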
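For comparison, a different and widely used way to get a similar block-when-busy effect without helper threads is a RejectedExecutionHandler that puts the rejected task into the work queue, which blocks the submitting thread until there is room. This is not the approach taken in the class above; it is a minimal sketch under assumed names (`BlockingSubmitDemo`, `runAll`) and assumed sizes (2 threads, a queue of 2).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names and sizes are illustrative assumptions.
class BlockingSubmitDemo {

    // Rejection handler that blocks the submitter instead of dropping the task.
    static final RejectedExecutionHandler BLOCK_WHEN_FULL = (task, executor) -> {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("pool is shut down");
        }
        try {
            executor.getQueue().put(task); // waits until the work queue has room
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("interrupted while waiting for queue space", e);
        }
    };

    // Submits taskCount small tasks to a deliberately tiny pool and returns how many actually ran.
    static int runAll(int taskCount) throws InterruptedException {
        AtomicInteger executed = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2), BLOCK_WHEN_FULL);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(5); // simulate work so the queue actually fills up
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                executed.incrementAndGet();
            });
        }
        pool.shutdown(); // already queued tasks still run after shutdown()
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return executed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("executed: " + runAll(20));
    }
}
```

All 20 tasks are expected to run even though the pool can hold only 4 at a time. One caveat of this technique: a task that is put into the queue while the pool is shutting down may never run, and the isShutdown check narrows that window without closing it.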
