001    /**
002     * Copyright (C) 2012 FuseSource, Inc.
003     * http://fusesource.com
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     *    http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.fusesource.hawtdispatch.transport;
019    
020    import org.fusesource.hawtdispatch.Dispatch;
021    import org.fusesource.hawtdispatch.Task;
022    
023    import java.util.concurrent.TimeUnit;
024    
025    /**
026     * <p>A HeartBeatMonitor can be used to watch the read and write
027     * activity of a transport and raise events when the write side
028     * or read side has been idle too long.</p>
029     *
030     * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
031     */
032    public class HeartBeatMonitor {
033    
034        Transport transport;
035        long initialWriteCheckDelay;
036        long initialReadCheckDelay;
037        long writeInterval;
038        long readInterval;
039    
040        Task onKeepAlive = Dispatch.NOOP;
041        Task onDead = Dispatch.NOOP;
042    
043        short session = 0;
044    
045        boolean readSuspendedInterval;
046        short readSuspendCount;
047    
048        public void suspendRead() {
049            readSuspendCount++;
050            readSuspendedInterval = true;
051        }
052    
053        public void resumeRead() {
054            readSuspendCount--;
055        }
056    
057        private void schedule(final short session, long interval, final Task func) {
058            if (this.session == session) {
059                transport.getDispatchQueue().executeAfter(interval, TimeUnit.MILLISECONDS, new Task() {
060                    public void run() {
061                        if (HeartBeatMonitor.this.session == session) {
062                            func.run();
063                        }
064                    }
065                });
066            }
067        }
068    
069        private void scheduleCheckWrites(final short session) {
070            final ProtocolCodec codec = transport.getProtocolCodec();
071            Task func;
072            if (codec == null) {
073                func = new Task() {
074                    public void run() {
075                        scheduleCheckWrites(session);
076                    }
077                };
078            } else {
079                final long lastWriteCounter = codec.getWriteCounter();
080                func = new Task() {
081                    public void run() {
082                        if (lastWriteCounter == codec.getWriteCounter()) {
083                            onKeepAlive.run();
084                        }
085                        scheduleCheckWrites(session);
086                    }
087                };
088            }
089            schedule(session, writeInterval, func);
090        }
091    
092        private void scheduleCheckReads(final short session) {
093            final ProtocolCodec codec = transport.getProtocolCodec();
094            Task func;
095            if (codec == null) {
096                func = new Task() {
097                    public void run() {
098                        scheduleCheckReads(session);
099                    }
100                };
101            } else {
102                final long lastReadCounter = codec.getReadCounter();
103                func = new Task() {
104                    public void run() {
105                        if (lastReadCounter == codec.getReadCounter() && !readSuspendedInterval && readSuspendCount == 0) {
106                            onDead.run();
107                        }
108                        readSuspendedInterval = false;
109                        scheduleCheckReads(session);
110                    }
111                };
112            }
113            schedule(session, readInterval, func);
114        }
115    
116        public void start() {
117            session++;
118            readSuspendedInterval = false;
119            if (writeInterval != 0) {
120                if (initialWriteCheckDelay != 0) {
121                    transport.getDispatchQueue().executeAfter(initialWriteCheckDelay, TimeUnit.MILLISECONDS, new Task() {
122                        public void run() {
123                            scheduleCheckWrites(session);
124                        }
125                    });
126                } else {
127                    scheduleCheckWrites(session);
128                }
129            }
130            if (readInterval != 0) {
131                if (initialReadCheckDelay != 0) {
132                    transport.getDispatchQueue().executeAfter(initialReadCheckDelay, TimeUnit.MILLISECONDS, new Task() {
133                        public void run() {
134                            scheduleCheckReads(session);
135                        }
136                    });
137                } else {
138                    scheduleCheckReads(session);
139                }
140            }
141        }
142    
143        public void stop() {
144            session++;
145        }
146    
147    
148        public long getInitialReadCheckDelay() {
149            return initialReadCheckDelay;
150        }
151    
152        public void setInitialReadCheckDelay(long initialReadCheckDelay) {
153            this.initialReadCheckDelay = initialReadCheckDelay;
154        }
155    
156        public long getInitialWriteCheckDelay() {
157            return initialWriteCheckDelay;
158        }
159    
160        public void setInitialWriteCheckDelay(long initialWriteCheckDelay) {
161            this.initialWriteCheckDelay = initialWriteCheckDelay;
162        }
163    
164        public Task getOnDead() {
165            return onDead;
166        }
167    
168        public void setOnDead(Task onDead) {
169            this.onDead = onDead;
170        }
171    
172        public Task getOnKeepAlive() {
173            return onKeepAlive;
174        }
175    
176        public void setOnKeepAlive(Task onKeepAlive) {
177            this.onKeepAlive = onKeepAlive;
178        }
179    
180        public long getWriteInterval() {
181            return writeInterval;
182        }
183    
184        public void setWriteInterval(long writeInterval) {
185            this.writeInterval = writeInterval;
186        }
187    
188        public Transport getTransport() {
189            return transport;
190        }
191    
192        public void setTransport(Transport transport) {
193            this.transport = transport;
194        }
195    
196        public long getReadInterval() {
197            return readInterval;
198        }
199    
200        public void setReadInterval(long readInterval) {
201            this.readInterval = readInterval;
202        }
203    }