public class MyThreadPoolExecutor implements Executor {
private final Logger logger = LoggerFactory.getLogger(MyThreadPoolExecutor.class);
private final AtomicInteger ctl = new AtomicInteger(0);
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private final BlockingQueue<Runnable> workQueue;
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) { this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; }
@Override public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); }
int c = ctl.get(); if (c < corePoolSize) { if (!addWorker(command)) { reject(); } return; } if (!workQueue.offer(command)) { if (!addWorker(command)) { reject(); } } }
private boolean addWorker(Runnable task) { int c = ctl.get(); if (c >= maximumPoolSize) { return false; }
Worker worker = new Worker(task); worker.thread.start(); ctl.incrementAndGet(); return true; }
private void reject() { throw new RejectedExecutionException( "cannot execute task, thread count: " + ctl.get() + ", workQueue size: " + workQueue.size()); }
private final class Worker implements Runnable {
final Thread thread; Runnable firstTask;
public Worker(Runnable firstTask) { this.thread = new Thread(this); this.firstTask = firstTask; }
@Override public void run() { Runnable task = firstTask;
try { while (task != null || (task = getTask()) != null) { task.run(); if (ctl.get() > maximumPoolSize) { break; } task = null; } } finally { ctl.decrementAndGet(); } }
private Runnable getTask() { for (; ; ) { try { logger.info("workQueue size: {}", workQueue.size()); return workQueue.take(); } catch (InterruptedException e) { logger.error("InterruptedException", e); } } } } }
|