001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.vm; 018 019 import java.io.IOException; 020 import java.io.InterruptedIOException; 021 import java.net.URI; 022 import java.util.concurrent.LinkedBlockingQueue; 023 import java.util.concurrent.atomic.AtomicBoolean; 024 import java.util.concurrent.atomic.AtomicLong; 025 026 import org.apache.activemq.thread.Task; 027 import org.apache.activemq.thread.TaskRunner; 028 import org.apache.activemq.thread.TaskRunnerFactory; 029 import org.apache.activemq.thread.Valve; 030 import org.apache.activemq.transport.FutureResponse; 031 import org.apache.activemq.transport.ResponseCallback; 032 import org.apache.activemq.transport.Transport; 033 import org.apache.activemq.transport.TransportDisposedIOException; 034 import org.apache.activemq.transport.TransportListener; 035 import org.apache.activemq.util.IOExceptionSupport; 036 037 038 /** 039 * A Transport implementation that uses direct method invocations. 040 * 041 * @version $Revision$ 042 */ 043 public class VMTransport implements Transport, Task { 044 045 private static final Object DISCONNECT = new Object(); 046 private static final AtomicLong NEXT_ID = new AtomicLong(0); 047 // still possible to configure dedicated task runner through system property but not programmatically 048 private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000, false); 049 protected VMTransport peer; 050 protected TransportListener transportListener; 051 protected boolean disposed; 052 protected boolean marshal; 053 protected boolean network; 054 protected boolean async = true; 055 protected int asyncQueueDepth = 2000; 056 protected LinkedBlockingQueue<Object> messageQueue; 057 protected boolean started; 058 protected final URI location; 059 protected final long id; 060 private TaskRunner taskRunner; 061 private final Object lazyInitMutext = new Object(); 062 private final Valve enqueueValve = new Valve(true); 063 private final AtomicBoolean stopping = new AtomicBoolean(); 064 private volatile int receiveCounter; 065 066 public VMTransport(URI location) { 067 this.location = location; 068 this.id = NEXT_ID.getAndIncrement(); 069 } 070 071 public void setPeer(VMTransport peer) { 072 this.peer = peer; 073 } 074 075 public void oneway(Object command) throws IOException { 076 if (disposed) { 077 throw new TransportDisposedIOException("Transport disposed."); 078 } 079 if (peer == null) { 080 throw new IOException("Peer not connected."); 081 } 082 083 084 TransportListener transportListener=null; 085 try { 086 // Disable the peer from changing his state while we try to enqueue onto him. 087 peer.enqueueValve.increment(); 088 089 if (peer.disposed || peer.stopping.get()) { 090 throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."); 091 } 092 093 if (peer.started) { 094 if (peer.async) { 095 peer.getMessageQueue().put(command); 096 peer.wakeup(); 097 } else { 098 transportListener = peer.transportListener; 099 } 100 } else { 101 peer.getMessageQueue().put(command); 102 } 103 104 } catch (InterruptedException e) { 105 InterruptedIOException iioe = new InterruptedIOException(e.getMessage()); 106 iioe.initCause(e); 107 throw iioe; 108 } finally { 109 // Allow the peer to change state again... 110 peer.enqueueValve.decrement(); 111 } 112 113 if( transportListener!=null ) { 114 if( command == DISCONNECT ) { 115 transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); 116 } else { 117 peer.receiveCounter++; 118 transportListener.onCommand(command); 119 } 120 } 121 } 122 123 public void start() throws Exception { 124 if (transportListener == null) { 125 throw new IOException("TransportListener not set."); 126 } 127 try { 128 enqueueValve.turnOff(); 129 if (messageQueue != null && !async) { 130 Object command; 131 while ((command = messageQueue.poll()) != null && !stopping.get() ) { 132 receiveCounter++; 133 transportListener.onCommand(command); 134 } 135 } 136 started = true; 137 wakeup(); 138 } finally { 139 enqueueValve.turnOn(); 140 } 141 // If we get stopped while starting up, then do the actual stop now 142 // that the enqueueValve is back on. 143 if( stopping.get() ) { 144 stop(); 145 } 146 } 147 148 public void stop() throws Exception { 149 stopping.set(true); 150 151 // If stop() is called while being start()ed.. then we can't stop until we return to the start() method. 152 if( enqueueValve.isOn() ) { 153 154 TaskRunner tr = null; 155 try { 156 enqueueValve.turnOff(); 157 if (!disposed) { 158 started = false; 159 disposed = true; 160 if (taskRunner != null) { 161 tr = taskRunner; 162 taskRunner = null; 163 } 164 } 165 } finally { 166 stopping.set(false); 167 enqueueValve.turnOn(); 168 } 169 if (tr != null) { 170 tr.shutdown(1000); 171 } 172 // let the peer know that we are disconnecting.. 173 try { 174 oneway(DISCONNECT); 175 } catch (Exception ignore) { 176 } 177 } 178 } 179 180 /** 181 * @see org.apache.activemq.thread.Task#iterate() 182 */ 183 public boolean iterate() { 184 185 final TransportListener tl; 186 try { 187 // Disable changing the state variables while we are running... 188 enqueueValve.increment(); 189 tl = transportListener; 190 if (!started || disposed || tl == null || stopping.get()) { 191 if( stopping.get() ) { 192 // drain the queue it since folks could be blocked putting on to 193 // it and that would not allow the stop() method for finishing up. 194 getMessageQueue().clear(); 195 } 196 return false; 197 } 198 } catch (InterruptedException e) { 199 return false; 200 } finally { 201 enqueueValve.decrement(); 202 } 203 204 LinkedBlockingQueue<Object> mq = getMessageQueue(); 205 Object command = mq.poll(); 206 if (command != null) { 207 if( command == DISCONNECT ) { 208 tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); 209 } else { 210 tl.onCommand(command); 211 } 212 return !mq.isEmpty(); 213 } else { 214 return false; 215 } 216 217 } 218 219 public void setTransportListener(TransportListener commandListener) { 220 try { 221 try { 222 enqueueValve.turnOff(); 223 this.transportListener = commandListener; 224 wakeup(); 225 } finally { 226 enqueueValve.turnOn(); 227 } 228 } catch (InterruptedException e) { 229 throw new RuntimeException(e); 230 } 231 } 232 233 private LinkedBlockingQueue<Object> getMessageQueue() { 234 synchronized (lazyInitMutext) { 235 if (messageQueue == null) { 236 messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth); 237 } 238 return messageQueue; 239 } 240 } 241 242 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 243 throw new AssertionError("Unsupported Method"); 244 } 245 246 public Object request(Object command) throws IOException { 247 throw new AssertionError("Unsupported Method"); 248 } 249 250 public Object request(Object command, int timeout) throws IOException { 251 throw new AssertionError("Unsupported Method"); 252 } 253 254 public TransportListener getTransportListener() { 255 return transportListener; 256 } 257 258 public <T> T narrow(Class<T> target) { 259 if (target.isAssignableFrom(getClass())) { 260 return target.cast(this); 261 } 262 return null; 263 } 264 265 public boolean isMarshal() { 266 return marshal; 267 } 268 269 public void setMarshal(boolean marshal) { 270 this.marshal = marshal; 271 } 272 273 public boolean isNetwork() { 274 return network; 275 } 276 277 public void setNetwork(boolean network) { 278 this.network = network; 279 } 280 281 public String toString() { 282 return location + "#" + id; 283 } 284 285 public String getRemoteAddress() { 286 if (peer != null) { 287 return peer.toString(); 288 } 289 return null; 290 } 291 292 /** 293 * @return the async 294 */ 295 public boolean isAsync() { 296 return async; 297 } 298 299 /** 300 * @param async the async to set 301 */ 302 public void setAsync(boolean async) { 303 this.async = async; 304 } 305 306 /** 307 * @return the asyncQueueDepth 308 */ 309 public int getAsyncQueueDepth() { 310 return asyncQueueDepth; 311 } 312 313 /** 314 * @param asyncQueueDepth the asyncQueueDepth to set 315 */ 316 public void setAsyncQueueDepth(int asyncQueueDepth) { 317 this.asyncQueueDepth = asyncQueueDepth; 318 } 319 320 protected void wakeup() { 321 if (async) { 322 synchronized (lazyInitMutext) { 323 if (taskRunner == null) { 324 taskRunner = TASK_RUNNER_FACTORY.createTaskRunner(this, "VMTransport: " + toString()); 325 } 326 } 327 try { 328 taskRunner.wakeup(); 329 } catch (InterruptedException e) { 330 Thread.currentThread().interrupt(); 331 } 332 } 333 } 334 335 public boolean isFaultTolerant() { 336 return false; 337 } 338 339 public boolean isDisposed() { 340 return disposed; 341 } 342 343 public boolean isConnected() { 344 return started; 345 } 346 347 public void reconnect(URI uri) throws IOException { 348 throw new IOException("Not supported"); 349 } 350 351 public int getReceiveCounter() { 352 return receiveCounter; 353 } 354 }