I need a class that helps me to run set of tasks in the same order of submission (like serialized execution). Yet I don’t want to use a single-thread executor as I have a large collection of different set of tasks that tasks from different set can be run in parallel. If I use one single-thread executor, it loses efficiency that tasks from different sets no longer be run in parallel. If I use one single-thread executor per task set, then I’ll end up with lots of threads, which is not ideal.
Out of necessity, I created the following class, OrderedExecutor.
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* An executor that make sure tasks submitted with the same key
* will be executed in the same order as task submission
* (order of calling the {@link #submit(Object, Runnable)} method).
*
* Tasks submitted will be run in the given {@link Executor}.
* There is no restriction on how many threads in the given {@link Executor}
* needs to have (it can be single thread executor as well as a cached thread pool).
*
* If there are more than one thread in the given {@link Executor}, tasks
* submitted with different keys may be executed in parallel, but never
* for tasks submitted with the same key.
*
* * @param <K> type of keys.
*/
public class OrderedExecutor<K> {
private final Executor executor;
private final Map<K, Task> tasks;
/**
* Constructs a {@code OrderedExecutor}.
*
* @param executor tasks will be run in this executor.
*/
public OrderedExecutor(Executor executor) {
this.executor = executor;
this.tasks = new HashMap<K, Task>();
}
/**
* Adds a new task to run for the given key.
*
* @param key the key for applying tasks ordering.
* @param runnable the task to run.
*/
public synchronized void submit(K key, Runnable runnable) {
Task task = tasks.get(key);
if (task == null) {
task = new Task();
tasks.put(key, task);
}
task.add(runnable);
}
/**
* Private inner class for running tasks for each key.
* Each key submitted will have one instance of this class.
*/
private class Task implements Runnable {
private final Lock lock;
private final Queue<Runnable> queue;
Task() {
this.lock = new ReentrantLock();
this.queue = new LinkedList<Runnable>();
}
public void add(Runnable runnable) {
boolean runTask;
lock.lock();
try {
// Run only if no job is running.
runTask = queue.isEmpty();
queue.offer(runnable);
} finally {
lock.unlock();
}
if (runTask) {
executor.execute(this);
}
}
@Override
public void run() {
Runnable runnable;
boolean runAgain;
lock.lock();
try {
runnable = queue.poll();
} finally {
// Run this job again if there are more tasks.
runAgain = !queue.isEmpty();
lock.unlock();
}
try {
runnable.run();
} catch (Exception ex) {
ex.printStackTrace();
}
if (runAgain) {
executor.execute(this);
}
}
}
}
Please feel free to use it if you find yourself in the same situation as me.