Executor that maintains execution order by group

I need a class that helps me to run set of tasks in the same order of submission (like serialized execution). Yet I don’t want to use a single-thread executor as I have a large collection of different set of tasks that tasks from different set can be run in parallel. If I use one single-thread executor, it loses efficiency that tasks from different sets no longer be run in parallel. If I use one single-thread executor per task set, then I’ll end up with lots of threads, which is not ideal.

Out of necessity, I created the following class, OrderedExecutor.

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * An executor that make sure tasks submitted with the same key
 * will be executed in the same order as task submission
 * (order of calling the {@link #submit(Object, Runnable)} method).
 *
 * Tasks submitted will be run in the given {@link Executor}.
 * There is no restriction on how many threads in the given {@link Executor}
 * needs to have (it can be single thread executor as well as a cached thread pool).
 *
 * If there are more than one thread in the given {@link Executor}, tasks
 * submitted with different keys may be executed in parallel, but never
 * for tasks submitted with the same key.
 * 
 * * @param <K> type of keys.
 */
public class OrderedExecutor<K> {

    private final Executor executor;
    private final Map<K, Task> tasks;

    /**
     * Constructs a {@code OrderedExecutor}.
     *
     * @param executor tasks will be run in this executor.
     */
    public OrderedExecutor(Executor executor) {
        this.executor = executor;
        this.tasks = new HashMap<K, Task>();
    }

    /**
     * Adds a new task to run for the given key.
     *
     * @param key the key for applying tasks ordering.
     * @param runnable the task to run.
     */
    public synchronized void submit(K key, Runnable runnable) {
        Task task = tasks.get(key);
        if (task == null) {
            task = new Task();
            tasks.put(key, task);
        }
        task.add(runnable);
    }

    /**
     * Private inner class for running tasks for each key.
     * Each key submitted will have one instance of this class.
     */
    private class Task implements Runnable {

        private final Lock lock;
        private final Queue<Runnable> queue;

        Task() {
            this.lock = new ReentrantLock();
            this.queue = new LinkedList<Runnable>();
        }

        public void add(Runnable runnable) {
            boolean runTask;
            lock.lock();
            try {
                // Run only if no job is running.
                runTask = queue.isEmpty();
                queue.offer(runnable);
            } finally {
                lock.unlock();
            }
            if (runTask) {
                executor.execute(this);
            }
        }

        @Override
        public void run() {
            // Pick a task to run.
            Runnable runnable;
            lock.lock();
            try {
                runnable = queue.peek();
            } finally {
                lock.unlock();
            }
            try {
                runnable.run();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            // Check to see if there are queued task, if yes, submit for execution.
            lock.lock();
            try {
                queue.poll();
                if (!queue.isEmpty()) {
                    executor.execute(this);
                }
            } finally {
                lock.unlock();
            }
        }
    }
}

Please feel free to use it if you find yourself in the same situation as me.

Advertisements

6 thoughts on “Executor that maintains execution order by group

  1. There is a possibility that two tasks with the same key will be processed simultaneously. Consider situation where run() method polled task from queue, queue become empty but task execution actually is in process. So, in that moment add() method can fire execution of one more run() for the same very key, because queue was empty.

    What you might need is to only peek queue, not poll in run(), and remove head of queue only after runnable.run() was completed.

  2. i believe the tasks map will leak for the life of the executor, it never gets removed after all the tasks have been executed (the task is still there but with an empty queue)

    • That’s a valid comment. The original intend of the class is not for infinite set of keys. It was designed to handle server side network protocol in a way that the key represents an IO thread, which comes from a fixed size thread pool.
      It’s easy enough to modify it to support infinite set of keys by simply passing the key to the Task class and removing itself from the tasks map if the queue is empty in the run() method (line 104).

      • That was my initial thought as well, but I think there would still be a very tight race condition if something gets submitted while its being removed, since only the submit is synchronized. I ended up avoiding the issue by adding synchronized add/remove key methods that limit access to the tasks map.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s