/**
 * Copyright (C) 2012 FuseSource, Inc.
 * http://fusesource.com
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.fusesource.hawtdispatch.transport;

import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.Task;

import java.util.concurrent.TimeUnit;

/**
 * <p>A HeartBeatMonitor can be used to watch the read and write
 * activity of a transport and raise events when the write side
 * or read side has been idle too long.</p>
 *
 * <p>Activity is detected by sampling the write/read counters of the
 * transport's {@link ProtocolCodec} at the start of an interval and comparing
 * them with the counters at the end of the interval. All timers are scheduled
 * on the transport's dispatch queue.</p>
 *
 * <p>This class performs no synchronization of its own; presumably all calls
 * are expected to be made from the transport's dispatch queue (verify against
 * callers).</p>
 *
 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
 */
public class HeartBeatMonitor {

    Transport transport;
    long initialWriteCheckDelay;
    long initialReadCheckDelay;
    long writeInterval;
    long readInterval;

    Task onKeepAlive = Dispatch.NOOP;
    Task onDead = Dispatch.NOOP;

    /**
     * Identifies the current start/stop generation. Every {@link #start()} and
     * {@link #stop()} increments it; timers created for an older value become no-ops.
     */
    short session = 0;

    /** Set when reads were suspended at some point during the current read interval. */
    boolean readSuspendedInterval;
    /** Number of {@link #suspendRead()} calls not yet matched by {@link #resumeRead()}. */
    short readSuspendCount;

    /**
     * Records that reading has been suspended. While the suspend count is non-zero,
     * and for the read interval in which a suspension occurred, an idle read side
     * does not trigger the {@code onDead} task.
     */
    public void suspendRead() {
        readSuspendCount++;
        readSuspendedInterval = true;
    }

    /**
     * Undoes one {@link #suspendRead()} call. Calls are not validated here, so the
     * caller is responsible for keeping suspend/resume calls balanced.
     */
    public void resumeRead() {
        readSuspendCount--;
    }

    /**
     * Runs {@code func} on the transport's dispatch queue after {@code interval}
     * milliseconds, provided {@code session} is still the current session both now
     * and at the time the timer fires.
     *
     * @param session  the session the timer belongs to
     * @param interval delay in milliseconds
     * @param func     the task to run if the session is still current
     */
    private void schedule(final short session, long interval, final Task func) {
        if (this.session == session) {
            transport.getDispatchQueue().executeAfter(interval, TimeUnit.MILLISECONDS, new Task() {
                public void run() {
                    // The monitor may have been stopped or restarted while we were waiting.
                    if (HeartBeatMonitor.this.session == session) {
                        func.run();
                    }
                }
            });
        }
    }

    /**
     * Schedules one write-idle check {@code writeInterval} milliseconds from now and
     * re-arms itself afterwards. If the codec's write counter has not changed over
     * the interval the {@code onKeepAlive} task is run. When the transport has no
     * codec yet, the check is simply retried after the next interval.
     *
     * @param session the session this check chain belongs to
     */
    private void scheduleCheckWrites(final short session) {
        final ProtocolCodec codec = transport.getProtocolCodec();
        Task func;
        if (codec == null) {
            func = new Task() {
                public void run() {
                    scheduleCheckWrites(session);
                }
            };
        } else {
            // Sample taken at the start of the interval.
            final long lastWriteCounter = codec.getWriteCounter();
            func = new Task() {
                public void run() {
                    if (lastWriteCounter == codec.getWriteCounter()) {
                        onKeepAlive.run();
                    }
                    scheduleCheckWrites(session);
                }
            };
        }
        schedule(session, writeInterval, func);
    }

    /**
     * Schedules one read-idle check {@code readInterval} milliseconds from now and
     * re-arms itself afterwards. If the codec's read counter has not changed over
     * the interval, and reads were neither suspended during the interval nor are
     * suspended now, the {@code onDead} task is run. When the transport has no codec
     * yet, the check is simply retried after the next interval.
     *
     * @param session the session this check chain belongs to
     */
    private void scheduleCheckReads(final short session) {
        final ProtocolCodec codec = transport.getProtocolCodec();
        Task func;
        if (codec == null) {
            func = new Task() {
                public void run() {
                    scheduleCheckReads(session);
                }
            };
        } else {
            // Sample taken at the start of the interval.
            final long lastReadCounter = codec.getReadCounter();
            func = new Task() {
                public void run() {
                    if (lastReadCounter == codec.getReadCounter() && !readSuspendedInterval && readSuspendCount == 0) {
                        onDead.run();
                    }
                    // A new interval starts now; forget suspensions seen in the previous one.
                    readSuspendedInterval = false;
                    scheduleCheckReads(session);
                }
            };
        }
        schedule(session, readInterval, func);
    }

    /**
     * Starts (or restarts) monitoring. Any checks belonging to a previous session are
     * invalidated. Write checks are enabled when {@code writeInterval} is non-zero and
     * read checks when {@code readInterval} is non-zero; each may be preceded by its
     * configured initial delay.
     */
    public void start() {
        session++;
        // Snapshot the session id for this start() call. The delayed tasks below must
        // not read the field when they eventually run: a stop() (or stop()+start())
        // issued during the initial delay changes the field, and reading it late would
        // let a cancelled monitor start checking anyway, or run duplicate check chains.
        final short startedSession = session;
        readSuspendedInterval = false;

        if (writeInterval != 0) {
            if (initialWriteCheckDelay != 0) {
                // schedule() drops the task if the session changed during the delay.
                schedule(startedSession, initialWriteCheckDelay, new Task() {
                    public void run() {
                        scheduleCheckWrites(startedSession);
                    }
                });
            } else {
                scheduleCheckWrites(startedSession);
            }
        }

        if (readInterval != 0) {
            if (initialReadCheckDelay != 0) {
                schedule(startedSession, initialReadCheckDelay, new Task() {
                    public void run() {
                        scheduleCheckReads(startedSession);
                    }
                });
            } else {
                scheduleCheckReads(startedSession);
            }
        }
    }

    /**
     * Stops monitoring by moving to a new session, which turns every timer scheduled
     * for the previous session into a no-op.
     */
    public void stop() {
        session++;
    }

    /** @return the delay in milliseconds before the first read check is scheduled */
    public long getInitialReadCheckDelay() {
        return initialReadCheckDelay;
    }

    /** @param initialReadCheckDelay delay in milliseconds before the first read check; 0 for none */
    public void setInitialReadCheckDelay(long initialReadCheckDelay) {
        this.initialReadCheckDelay = initialReadCheckDelay;
    }

    /** @return the delay in milliseconds before the first write check is scheduled */
    public long getInitialWriteCheckDelay() {
        return initialWriteCheckDelay;
    }

    /** @param initialWriteCheckDelay delay in milliseconds before the first write check; 0 for none */
    public void setInitialWriteCheckDelay(long initialWriteCheckDelay) {
        this.initialWriteCheckDelay = initialWriteCheckDelay;
    }

    /** @return the task run when the read side has been idle for a full read interval */
    public Task getOnDead() {
        return onDead;
    }

    /** @param onDead the task to run when the read side has been idle for a full read interval */
    public void setOnDead(Task onDead) {
        this.onDead = onDead;
    }

    /** @return the task run when the write side has been idle for a full write interval */
    public Task getOnKeepAlive() {
        return onKeepAlive;
    }

    /** @param onKeepAlive the task to run when the write side has been idle for a full write interval */
    public void setOnKeepAlive(Task onKeepAlive) {
        this.onKeepAlive = onKeepAlive;
    }

    /** @return the write check interval in milliseconds; 0 disables write checks */
    public long getWriteInterval() {
        return writeInterval;
    }

    /** @param writeInterval the write check interval in milliseconds; 0 disables write checks */
    public void setWriteInterval(long writeInterval) {
        this.writeInterval = writeInterval;
    }

    /** @return the transport being monitored */
    public Transport getTransport() {
        return transport;
    }

    /** @param transport the transport whose codec counters and dispatch queue are used */
    public void setTransport(Transport transport) {
        this.transport = transport;
    }

    /** @return the read check interval in milliseconds; 0 disables read checks */
    public long getReadInterval() {
        return readInterval;
    }

    /** @param readInterval the read check interval in milliseconds; 0 disables read checks */
    public void setReadInterval(long readInterval) {
        this.readInterval = readInterval;
    }
}