线程池中如果让具有相同属性的任务按顺序执行

需求

难免有这样的需求:希望具有相同属性的任务在线程池中按顺序执行,而且在线程只要有空闲,任务必须马上执行。

实现一

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.tommy.json.Match;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
* @author Donny
* @createTime 2020-01-10 10:23
* @description 具有相同属性的任务串行执行。
*/
public class OrderedThreadManager2 {
/**
* 存放有序任务
* 每个任务内部有一个阻塞队列,存放所有属性相同的待执行任务,保证它们按顺序执行。
*
* 因为后续可能没有某个key相关的任务到来,会导致存储的SerialTask越来越多,所以这里缓存一段时间就移除。
*/
private Cache<Object,SerialTask> serialTaskCache = CacheBuilder.newBuilder().expireAfterWrite(30,TimeUnit.MINUTES).build();
/**
* 任务执行器
*/
private ThreadPoolExecutor executor;

public OrderedThreadManager2(int min,int max) {
if (min <= 0 || max <= 0) {
throw new IllegalArgumentException("线程数必须>0");
}
ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("ordered-thread-%d").build();
executor = new ThreadPoolExecutor(min,max,60,TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000),threadFactory);
}

/**
* 相同赛事id的任务串行执行
* @param match
* @param consumer
*/
public void executeSeriallyTask(Match match,Consumer<Match> consumer) {
SerialTask task = addTask(match);
try {
task.addTask(()->consumer.accept(match));
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private SerialTask addTask(Match match) {
Object key = match.getId();
SerialTask task = serialTaskCache.getIfPresent(key);
if (null == task) {
synchronized (this) {
task = serialTaskCache.getIfPresent(key);
if (null == task) {
task = new SerialTask();
serialTaskCache.put(key,task);
}
}
}
return task;
}

public void executeTask(List<Match> matches, Consumer<Match> consumer) {
if (matches != null && !matches.isEmpty()) {
matches.forEach(match->executeSeriallyTask(match,consumer));
}
}

private class SerialTask implements Runnable {
// 存储具有相同属性的任务的阻塞队列
private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>(10);
private volatile boolean running = false;

@Override
public void run() {
Runnable r;
for (;;) {
try {
running = true;
r = tasks.poll(50,TimeUnit.MILLISECONDS);
if (r != null) {
r.run();
}
else {
if (tasks.isEmpty()) {
running = false;
return;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

public void addTask(Runnable runnable) throws InterruptedException {
synchronized (this) {
tasks.put(runnable);

// 如果当前SerialTask的任务队列已经是空的了,则立即执行任务
if (!running) {
executor.execute(this);
running = true;
}
}
}
}
}

外部可以调用executeSeriallyTask或executeTask方法来添加一个或多个任务,我这里是以match对象的id属性来区分的,这里其实还可以设计的通用一些,可以外部指定根据哪些属性来区分是不是同一类对象,这一类的任务需要按顺序处理。

实现二

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import com.tommy.json.Match;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import java.util.stream.IntStream;

/**
* @author Donny
* @createTime 2019-09-19 14:05
* @description 按照id取模 相同id的让同一个线程处理 即同一个对象的多次数据串行处理。
*/
public class OrderedThreadManager {
private List<ExecutorService> services;
private int size;

public OrderedThreadManager(int size, ThreadFactory threadFactory) {
if (size <= 0) {
throw new IllegalArgumentException("线程数必须>0");
}

this.size = size;
this.services = new ArrayList<>(size);
IntStream.rangeClosed(1,size).forEach(idx->this.services.add(Executors.newSingleThreadExecutor(threadFactory)));
}

public OrderedThreadManager(int size) {
this(size,ThreadFactorys.RATIO_THREAD_FACTORY);
}

private void route(int id,Runnable task) {
this.services.get(id % size).execute(task);
}

public void route(Match match,Runnable task) {
route(match.getId(),task);
}

public void route(List<Match> matches, Consumer<Match> consumer) {
if (matches != null && !matches.isEmpty()) {
matches.forEach(match->route(match.getId(),()->consumer.accept(match)));
}
}

public void shutdown() {
this.services.forEach(ExecutorService::shutdown);
}
}

外部可以通过route方法的多个重载版本来选择适当的方式调用。

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* @author Donny
* @createTime 2019-09-19 14:13
* @description
*/
public class Test {

public static void main(String[] args) {
RatioService ratioService = new RatioService(new RatioDbSaveThreadManager());
final List<Match> matches = new ArrayList<>();
IntStream.rangeClosed(1,25).forEach(idx-> {
Match match = Match.builder().id(idx % 5).name("match " + idx).build();
matches.add(match);
});

// test1(matches,ratioService);
test2(matches,ratioService);
}

private static void test1(List<Match> matches,RatioService ratioService) {
OrderedThreadManager threadManager = new OrderedThreadManager(3,ThreadFactorys.RATIO_THREAD_FACTORY);
threadManager.route(matches, ratioService::handleOdds);
}

private static void test2(List<Match> matches,RatioService ratioService) {
OrderedThreadManager2 orderedThreadManager = new OrderedThreadManager2(10,10);
orderedThreadManager.executeTask(matches, ratioService::handleOdds);
}
}

第二种实现更简单一些,但坏处也很明显,可能导致线程资源浪费。比如10个任务,可能好几个任务按模取余后相同,导致他们被按顺序处理。这样处理效率就比较低了。

Author: Donny
Link: https://tommy88.top/2020/01/10/线程池中如果让具有相同属性的任务按顺序执行/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.
微信打赏