001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.ra; 018 019 import java.io.Serializable; 020 021 import javax.jms.BytesMessage; 022 import javax.jms.Destination; 023 import javax.jms.JMSException; 024 import javax.jms.MapMessage; 025 import javax.jms.Message; 026 import javax.jms.MessageConsumer; 027 import javax.jms.MessageListener; 028 import javax.jms.MessageProducer; 029 import javax.jms.ObjectMessage; 030 import javax.jms.Queue; 031 import javax.jms.QueueBrowser; 032 import javax.jms.QueueReceiver; 033 import javax.jms.QueueSender; 034 import javax.jms.QueueSession; 035 import javax.jms.Session; 036 import javax.jms.StreamMessage; 037 import javax.jms.TemporaryQueue; 038 import javax.jms.TemporaryTopic; 039 import javax.jms.TextMessage; 040 import javax.jms.Topic; 041 import javax.jms.TopicPublisher; 042 import javax.jms.TopicSession; 043 import javax.jms.TopicSubscriber; 044 045 /** 046 * A {@link Session} implementation which can be used with the ActiveMQ JCA 047 * Resource Adapter to publish messages using the same JMS session that is used 048 * to dispatch messages. 049 * 050 * @version $Revision$ 051 */ 052 public class InboundSessionProxy implements Session, QueueSession, TopicSession { 053 054 private InboundContext sessionAndProducer; 055 056 public Session getSession() throws JMSException { 057 return getSessionAndProducer().getSession(); 058 } 059 060 public QueueSession getQueueSession() throws JMSException { 061 Session session = getSession(); 062 if (session instanceof QueueSession) { 063 return (QueueSession)session; 064 } else { 065 throw new JMSException("The underlying JMS Session does not support QueueSession semantics: " + session); 066 } 067 } 068 069 public TopicSession getTopicSession() throws JMSException { 070 Session session = getSession(); 071 if (session instanceof TopicSession) { 072 return (TopicSession)session; 073 } else { 074 throw new JMSException("The underlying JMS Session does not support TopicSession semantics: " + session); 075 } 076 } 077 078 public InboundContext getSessionAndProducer() throws JMSException { 079 if (sessionAndProducer == null) { 080 sessionAndProducer = InboundContextSupport.getActiveSessionAndProducer(); 081 if (sessionAndProducer == null) { 082 throw new JMSException("No currently active Session. This JMS provider cannot be used outside a MessageListener.onMessage() invocation"); 083 } 084 } 085 return sessionAndProducer; 086 } 087 088 public MessageProducer createProducer(Destination destination) throws JMSException { 089 return new InboundMessageProducerProxy(getSessionAndProducer().getMessageProducer(), destination); 090 } 091 092 public void close() throws JMSException { 093 // we don't allow users to close this session 094 // as its used by the JCA container 095 } 096 097 public void commit() throws JMSException { 098 // the JCA container will handle transactions 099 } 100 101 public void rollback() throws JMSException { 102 // the JCA container will handle transactions 103 } 104 105 public void recover() throws JMSException { 106 // the JCA container will handle recovery 107 } 108 109 public void run() { 110 try { 111 getSession().run(); 112 } catch (JMSException e) { 113 throw new RuntimeException("Failed to run() on session due to: " + e, e); 114 } 115 } 116 117 // Straightforward delegation methods 118 // ------------------------------------------------------------------------- 119 120 public QueueBrowser createBrowser(Queue queue) throws JMSException { 121 return getSession().createBrowser(queue); 122 } 123 124 public QueueBrowser createBrowser(Queue queue, String s) throws JMSException { 125 return getSession().createBrowser(queue, s); 126 } 127 128 public BytesMessage createBytesMessage() throws JMSException { 129 return getSession().createBytesMessage(); 130 } 131 132 public MessageConsumer createConsumer(Destination destination) throws JMSException { 133 return getSession().createConsumer(destination); 134 } 135 136 public MessageConsumer createConsumer(Destination destination, String s) throws JMSException { 137 return getSession().createConsumer(destination, s); 138 } 139 140 public MessageConsumer createConsumer(Destination destination, String s, boolean b) throws JMSException { 141 return getSession().createConsumer(destination, s, b); 142 } 143 144 public TopicSubscriber createDurableSubscriber(Topic topic, String s) throws JMSException { 145 return getSession().createDurableSubscriber(topic, s); 146 } 147 148 public TopicSubscriber createDurableSubscriber(Topic topic, String s, String s1, boolean b) throws JMSException { 149 return getSession().createDurableSubscriber(topic, s, s1, b); 150 } 151 152 public MapMessage createMapMessage() throws JMSException { 153 return getSession().createMapMessage(); 154 } 155 156 public Message createMessage() throws JMSException { 157 return getSession().createMessage(); 158 } 159 160 public ObjectMessage createObjectMessage() throws JMSException { 161 return getSession().createObjectMessage(); 162 } 163 164 public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException { 165 return getSession().createObjectMessage(serializable); 166 } 167 168 public Queue createQueue(String s) throws JMSException { 169 return getSession().createQueue(s); 170 } 171 172 public StreamMessage createStreamMessage() throws JMSException { 173 return getSession().createStreamMessage(); 174 } 175 176 public TemporaryQueue createTemporaryQueue() throws JMSException { 177 return getSession().createTemporaryQueue(); 178 } 179 180 public TemporaryTopic createTemporaryTopic() throws JMSException { 181 return getSession().createTemporaryTopic(); 182 } 183 184 public TextMessage createTextMessage() throws JMSException { 185 return getSession().createTextMessage(); 186 } 187 188 public TextMessage createTextMessage(String s) throws JMSException { 189 return getSession().createTextMessage(s); 190 } 191 192 public Topic createTopic(String s) throws JMSException { 193 return getSession().createTopic(s); 194 } 195 196 public int getAcknowledgeMode() throws JMSException { 197 return getSession().getAcknowledgeMode(); 198 } 199 200 public MessageListener getMessageListener() throws JMSException { 201 return getSession().getMessageListener(); 202 } 203 204 public boolean getTransacted() throws JMSException { 205 return getSession().getTransacted(); 206 } 207 208 public void setMessageListener(MessageListener messageListener) throws JMSException { 209 getSession().setMessageListener(messageListener); 210 } 211 212 public void unsubscribe(String s) throws JMSException { 213 getSession().unsubscribe(s); 214 } 215 216 public QueueReceiver createReceiver(Queue queue) throws JMSException { 217 return getQueueSession().createReceiver(queue); 218 } 219 220 public QueueReceiver createReceiver(Queue queue, String s) throws JMSException { 221 return getQueueSession().createReceiver(queue, s); 222 } 223 224 public QueueSender createSender(Queue queue) throws JMSException { 225 return new InboundMessageProducerProxy(getSessionAndProducer().getMessageProducer(), queue); 226 } 227 228 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 229 return getTopicSession().createSubscriber(topic); 230 } 231 232 public TopicSubscriber createSubscriber(Topic topic, String s, boolean b) throws JMSException { 233 return getTopicSession().createSubscriber(topic, s, b); 234 } 235 236 public TopicPublisher createPublisher(Topic topic) throws JMSException { 237 return getTopicSession().createPublisher(topic); 238 } 239 240 public String toString() { 241 try { 242 return "InboundSessionProxy { " + getSession() + " }"; 243 } catch (JMSException e) { 244 return "InboundSessionProxy { null }"; 245 } 246 } 247 248 }