001    /**
002     * Copyright (C) 2012 FuseSource, Inc.
003     * http://fusesource.com
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     *    http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.fusesource.hawtdispatch;
019    
020    import java.util.LinkedList;
021    import java.util.concurrent.Executor;
022    
023    import static org.fusesource.hawtdispatch.Dispatch.*;
024    
025    /**
026     * Sends runnable tasks to a DispatchQueue via a an EventAggregator
027     * so that they first batch up on the sender side before being
028     * sent to the DispatchQueue which then executes that tasks.
029     *
030     * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
031     */
032    public class AggregatingExecutor implements Executor {
033    
034        final DispatchQueue queue;
035        final CustomDispatchSource<Runnable, LinkedList<Runnable>> source;
036    
037        public AggregatingExecutor(DispatchQueue queue) {
038            this.queue = queue;
039            this.source = createSource(EventAggregators.<Runnable>linkedList(), queue);
040            this.source.setEventHandler(new Task() {
041                public void run() {
042                    for (Runnable runnable: source.getData() ) {
043                        try {
044                            runnable.run();
045                        } catch (Exception e) {
046                          Thread thread = Thread.currentThread();
047                          thread.getUncaughtExceptionHandler().uncaughtException(thread, e);
048                        }
049                    }
050                }
051            });
052            this.source.resume();
053        }
054    
055    
056        public void suspend() {
057            source.suspend();
058        }
059    
060        public void resume() {
061            source.resume();
062        }
063    
064        public void execute(Runnable task) {
065            if (getCurrentQueue() == null) {
066                queue.execute(new TaskWrapper(task));
067            } else {
068                source.merge(task);
069            }
070        }
071    
072    }