001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport; 018 019 import java.io.IOException; 020 import java.net.URI; 021 022 import org.apache.activemq.util.ServiceSupport; 023 import org.apache.commons.logging.Log; 024 import org.apache.commons.logging.LogFactory; 025 026 /** 027 * A useful base class for transport implementations. 028 * 029 * @version $Revision: 1.1 $ 030 */ 031 public abstract class TransportSupport extends ServiceSupport implements Transport { 032 private static final Log LOG = LogFactory.getLog(TransportSupport.class); 033 034 TransportListener transportListener; 035 036 /** 037 * Returns the current transport listener 038 */ 039 public TransportListener getTransportListener() { 040 return transportListener; 041 } 042 043 /** 044 * Registers an inbound command listener 045 * 046 * @param commandListener 047 */ 048 public void setTransportListener(TransportListener commandListener) { 049 this.transportListener = commandListener; 050 } 051 052 /** 053 * narrow acceptance 054 * 055 * @param target 056 * @return 'this' if assignable 057 */ 058 public <T> T narrow(Class<T> target) { 059 boolean assignableFrom = target.isAssignableFrom(getClass()); 060 if (assignableFrom) { 061 return target.cast(this); 062 } 063 return null; 064 } 065 066 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 067 throw new AssertionError("Unsupported Method"); 068 } 069 070 public Object request(Object command) throws IOException { 071 throw new AssertionError("Unsupported Method"); 072 } 073 074 public Object request(Object command, int timeout) throws IOException { 075 throw new AssertionError("Unsupported Method"); 076 } 077 078 /** 079 * Process the inbound command 080 */ 081 public void doConsume(Object command) { 082 if (command != null) { 083 if (transportListener != null) { 084 transportListener.onCommand(command); 085 } else { 086 LOG.error("No transportListener available to process inbound command: " + command); 087 } 088 } 089 } 090 091 /** 092 * Passes any IO exceptions into the transport listener 093 */ 094 public void onException(IOException e) { 095 if (transportListener != null) { 096 try { 097 transportListener.onException(e); 098 } catch (RuntimeException e2) { 099 // Handle any unexpected runtime exceptions by debug logging them. 100 LOG.debug("Unexpected runtime exception: "+e2, e2); 101 } 102 } 103 } 104 105 protected void checkStarted() throws IOException { 106 if (!isStarted()) { 107 throw new IOException("The transport is not running."); 108 } 109 } 110 111 public boolean isFaultTolerant() { 112 return false; 113 } 114 115 116 public void reconnect(URI uri) throws IOException { 117 throw new IOException("Not supported"); 118 } 119 120 public boolean isDisposed() { 121 return isStopped(); 122 } 123 124 public boolean isConnected() { 125 return isStarted(); 126 } 127 128 }