youyichannel

志于道,据于德,依于仁,游于艺!

0%

实现一个简易线程池

「线程池工作的核心流程」

核心流程:

  • 线程池中有 N 个工作线程
  • 把任务提交给线程池运行
  • 如果线程池已满,把任务放入队列
  • 当线程有空闲时,获取队列中任务来执行
/**
* 自定义线程池
* <p>
* 核心流程:
* <ul>线程池中有 N 个工作线程</ul>
* <ul>把任务提交给线程池运行</ul>
* <ul>如果线程池已满,把任务放入队列</ul>
* <ul>当有线程空闲时,获取队列中任务来执行</ul>
*
* @author <a href="https://github.com/yoyocraft">youyi</a>
*/
public class MyThreadPoolExecutor implements Executor {

private final Logger logger = LoggerFactory.getLogger(MyThreadPoolExecutor.class);

/**
* 记录线程池中线程数量
*/
private final AtomicInteger ctl = new AtomicInteger(0);


/**
* 核心线程数
*/
private volatile int corePoolSize;

/**
* 最大线程数
*/
private volatile int maximumPoolSize;

/**
* 工作队列,阻塞队列
*/
private final BlockingQueue<Runnable> workQueue;

public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
BlockingQueue<Runnable> workQueue) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
}

@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}

int c = ctl.get();
// 线程池中线程数量小于核心线程数,创建新线程执行任务
if (c < corePoolSize) {
// 添加任务失败
if (!addWorker(command)) {
reject();
}
return;
}
// 线程池中线程数量大于等于核心线程数,将任务加入队列
if (!workQueue.offer(command)) {
// 任务队列满,尝试添加新线程
if (!addWorker(command)) {
reject();
}
}
}

private boolean addWorker(Runnable task) {
int c = ctl.get();
if (c >= maximumPoolSize) {
return false;
}

Worker worker = new Worker(task);
worker.thread.start();
ctl.incrementAndGet();
return true;
}


private void reject() {
throw new RejectedExecutionException(
"cannot execute task, thread count: " + ctl.get() + ", workQueue size: "
+ workQueue.size());
}

private final class Worker implements Runnable {

final Thread thread;
Runnable firstTask;

public Worker(Runnable firstTask) {
this.thread = new Thread(this);
this.firstTask = firstTask;
}

@Override
public void run() {
Runnable task = firstTask;

try {
// 执行任务
while (task != null || (task = getTask()) != null) {
task.run();
// 线程池满
if (ctl.get() > maximumPoolSize) {
break;
}
task = null;
}
} finally {
ctl.decrementAndGet();
}
}

private Runnable getTask() {
for (; ; ) {
try {
logger.info("workQueue size: {}", workQueue.size());
return workQueue.take();
} catch (InterruptedException e) {
logger.error("InterruptedException", e);
}
}
}
}
}

测试:

public class MyThreadPoolExecutorTest {

private static final Logger logger = LoggerFactory.getLogger(MyThreadPoolExecutorTest.class);

public static void main(String[] args) {
MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor(2, 2,
new ArrayBlockingQueue<>(10));

for (int i = 0; i < 10; i++) {
int taskNum = i;
myThreadPoolExecutor.execute(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("正在执行任务{}", taskNum);
});
}
}
}

输出:

15:53:13.094 [Thread-0] INFO com.youyi.threadpool.MyThreadPoolExecutorTest -- 正在执行任务0
15:53:13.094 [Thread-1] INFO com.youyi.threadpool.MyThreadPoolExecutorTest -- 正在执行任务1
15:53:13.101 [Thread-0] INFO com.youyi.threadpool.MyThreadPoolExecutor -- workQueue size: 8
15:53:13.101 [Thread-1] INFO com.youyi.threadpool.MyThreadPoolExecutor -- workQueue size: 8
15:53:14.607 [Thread-1] INFO com.youyi.threadpool.MyThreadPoolExecutorTest -- 正在执行任务3
15:53:14.607 [Thread-0] INFO com.youyi.threadpool.MyThreadPoolExecutorTest -- 正在执行任务2
15:53:14.608 [Thread-1] INFO com.youyi.threadpool.MyThreadPoolExecutor -- workQueue size: 6
15:53:14.608 [Thread-0] INFO com.youyi.threadpool.MyThreadPoolExecutor -- workQueue size: 6
15:53:16.110 [Thread-1] INFO com.youyi.threadpool.MyThreadPoolExecutorTest -- 正在执行任务4
15:53:16.111 [Thread-1] INFO com.youyi.threadpool.MyThreadPoolExecutor -- workQueue size: 4
15:53:16.112 [Thread-0] INFO com.youyi.threadpool.MyThreadPoolExecutorTest -- 正在执行任务5
15:53:16.113 [Thread-0] INFO com.youyi.threadpool.MyThreadPoolExecutor -- workQueue size: 3
15:53:17.614 [Thread-0] INFO com.youyi.threadpool.MyThreadPoolExecutorTest -- 正在执行任务7
15:53:17.614 [Thread-1] INFO com.youyi.threadpool.MyThreadPoolExecutorTest -- 正在执行任务6
15:53:17.615 [Thread-0] INFO com.youyi.threadpool.MyThreadPoolExecutor -- workQueue size: 2
15:53:17.615 [Thread-1] INFO com.youyi.threadpool.MyThreadPoolExecutor -- workQueue size: 2
15:53:19.116 [Thread-1] INFO com.youyi.threadpool.MyThreadPoolExecutorTest -- 正在执行任务9
15:53:19.117 [Thread-1] INFO com.youyi.threadpool.MyThreadPoolExecutor -- workQueue size: 0
15:53:19.117 [Thread-0] INFO com.youyi.threadpool.MyThreadPoolExecutorTest -- 正在执行任务8
15:53:19.117 [Thread-0] INFO com.youyi.threadpool.MyThreadPoolExecutor -- workQueue size: 0