001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.IOException;
020    import java.io.OutputStreamWriter;
021    import java.io.PrintWriter;
022    import java.util.HashMap;
023    import java.util.Iterator;
024    import java.util.Map;
025    import java.util.concurrent.ConcurrentHashMap;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    
028    import javax.jms.JMSException;
029    
030    import org.apache.activemq.command.ActiveMQDestination;
031    import org.apache.activemq.command.ActiveMQMessage;
032    import org.apache.activemq.command.ActiveMQTempQueue;
033    import org.apache.activemq.command.ActiveMQTempTopic;
034    import org.apache.activemq.command.Command;
035    import org.apache.activemq.command.ConnectionError;
036    import org.apache.activemq.command.ConnectionId;
037    import org.apache.activemq.command.ConnectionInfo;
038    import org.apache.activemq.command.ConsumerId;
039    import org.apache.activemq.command.ConsumerInfo;
040    import org.apache.activemq.command.DestinationInfo;
041    import org.apache.activemq.command.ExceptionResponse;
042    import org.apache.activemq.command.LocalTransactionId;
043    import org.apache.activemq.command.MessageAck;
044    import org.apache.activemq.command.MessageDispatch;
045    import org.apache.activemq.command.MessageId;
046    import org.apache.activemq.command.ProducerId;
047    import org.apache.activemq.command.ProducerInfo;
048    import org.apache.activemq.command.RemoveSubscriptionInfo;
049    import org.apache.activemq.command.Response;
050    import org.apache.activemq.command.SessionId;
051    import org.apache.activemq.command.SessionInfo;
052    import org.apache.activemq.command.ShutdownInfo;
053    import org.apache.activemq.command.TransactionId;
054    import org.apache.activemq.command.TransactionInfo;
055    import org.apache.activemq.util.ByteArrayOutputStream;
056    import org.apache.activemq.util.FactoryFinder;
057    import org.apache.activemq.util.IOExceptionSupport;
058    import org.apache.activemq.util.IdGenerator;
059    import org.apache.activemq.util.IntrospectionSupport;
060    import org.apache.activemq.util.LongSequenceGenerator;
061    import org.springframework.context.ApplicationContext;
062    import org.springframework.context.ApplicationContextAware;
063    
064    /**
065     * @author <a href="http://hiramchirino.com">chirino</a>
066     */
067    public class ProtocolConverter {
068    
069        private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
070    
071        private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
072        private final SessionId sessionId = new SessionId(connectionId, -1);
073        private final ProducerId producerId = new ProducerId(sessionId, 1);
074    
075        private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
076        private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
077        private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
078        private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
079    
080        private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
081        private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
082        private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
083        private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
084        private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
085        private final StompTransportFilter transportFilter;
086    
087        private final Object commnadIdMutex = new Object();
088        private int lastCommandId;
089        private final AtomicBoolean connected = new AtomicBoolean(false);
090        private final FrameTranslator frameTranslator;
091        private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
092        private final ApplicationContext applicationContext;
093    
094        public ProtocolConverter(StompTransportFilter stompTransportFilter, FrameTranslator translator, ApplicationContext applicationContext) {
095            this.transportFilter = stompTransportFilter;
096            this.frameTranslator = translator;
097            this.applicationContext = applicationContext;
098        }
099    
100        protected int generateCommandId() {
101            synchronized (commnadIdMutex) {
102                return lastCommandId++;
103            }
104        }
105    
106        protected ResponseHandler createResponseHandler(final StompFrame command) {
107            final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
108            if (receiptId != null) {
109                return new ResponseHandler() {
110                    public void onResponse(ProtocolConverter converter, Response response) throws IOException {
111                        if (response.isException()) {
112                            // Generally a command can fail.. but that does not invalidate the connection.
113                            // We report back the failure but we don't close the connection.
114                            Throwable exception = ((ExceptionResponse)response).getException();
115                            handleException(exception, command);
116                        } else {
117                            StompFrame sc = new StompFrame();
118                            sc.setAction(Stomp.Responses.RECEIPT);
119                            sc.setHeaders(new HashMap<String, String>(1));
120                            sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
121                            transportFilter.sendToStomp(sc);
122                        }
123                    }
124                };
125            }
126            return null;
127        }
128    
129        protected void sendToActiveMQ(Command command, ResponseHandler handler) {
130            command.setCommandId(generateCommandId());
131            if (handler != null) {
132                command.setResponseRequired(true);
133                resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
134            }
135            transportFilter.sendToActiveMQ(command);
136        }
137    
138        protected void sendToStomp(StompFrame command) throws IOException {
139            transportFilter.sendToStomp(command);
140        }
141        
142        protected FrameTranslator findTranslator(String header) {
143                    FrameTranslator translator = frameTranslator;
144                    try {
145                            if (header != null) {
146                                    translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER
147                                                    .newInstance(header);
148                                    if (translator instanceof ApplicationContextAware) {
149                                            ((ApplicationContextAware)translator).setApplicationContext(applicationContext);
150                                    }
151                            }
152                    } catch (Exception ignore) {
153                            // if anything goes wrong use the default translator
154                    }
155                    
156                    return translator;
157            }
158    
159        /**
160             * Convert a stomp command
161             * 
162             * @param command
163             */
164        public void onStompCommand(StompFrame command) throws IOException, JMSException {
165            try {
166    
167                if (command.getClass() == StompFrameError.class) {
168                    throw ((StompFrameError)command).getException();
169                }
170    
171                String action = command.getAction();
172                if (action.startsWith(Stomp.Commands.SEND)) {
173                    onStompSend(command);
174                } else if (action.startsWith(Stomp.Commands.ACK)) {
175                    onStompAck(command);
176                } else if (action.startsWith(Stomp.Commands.BEGIN)) {
177                    onStompBegin(command);
178                } else if (action.startsWith(Stomp.Commands.COMMIT)) {
179                    onStompCommit(command);
180                } else if (action.startsWith(Stomp.Commands.ABORT)) {
181                    onStompAbort(command);
182                } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
183                    onStompSubscribe(command);
184                } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
185                    onStompUnsubscribe(command);
186                } else if (action.startsWith(Stomp.Commands.CONNECT)) {
187                    onStompConnect(command);
188                } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
189                    onStompDisconnect(command);
190                } else {
191                    throw new ProtocolException("Unknown STOMP action: " + action);
192                }
193    
194            } catch (ProtocolException e) {
195                handleException(e, command);
196                // Some protocol errors can cause the connection to get closed.
197                if( e.isFatal() ) {
198                   getTransportFilter().onException(e);
199                }
200            }
201        }
202        
203        protected void handleException(Throwable exception, StompFrame command) throws IOException {
204            // Let the stomp client know about any protocol errors.
205            ByteArrayOutputStream baos = new ByteArrayOutputStream();
206            PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
207            exception.printStackTrace(stream);
208            stream.close();
209    
210            HashMap<String, String> headers = new HashMap<String, String>();
211            headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
212    
213            if (command != null) {
214                    final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
215                    if (receiptId != null) {
216                            headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
217                    }
218            }
219    
220            StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
221            sendToStomp(errorMessage);
222        }
223    
224        protected void onStompSend(StompFrame command) throws IOException, JMSException {
225            checkConnected();
226    
227            Map<String, String> headers = command.getHeaders();
228            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
229            headers.remove("transaction");
230    
231            ActiveMQMessage message = convertMessage(command);
232    
233            message.setProducerId(producerId);
234            MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
235            message.setMessageId(id);
236            message.setJMSTimestamp(System.currentTimeMillis());
237    
238            if (stompTx != null) {
239                TransactionId activemqTx = transactions.get(stompTx);
240                if (activemqTx == null) {
241                    throw new ProtocolException("Invalid transaction id: " + stompTx);
242                }
243                message.setTransactionId(activemqTx);
244            }
245    
246            message.onSend();
247            sendToActiveMQ(message, createResponseHandler(command));
248    
249        }
250    
251        protected void onStompAck(StompFrame command) throws ProtocolException {
252            checkConnected();
253    
254            // TODO: acking with just a message id is very bogus
255            // since the same message id could have been sent to 2 different
256            // subscriptions
257            // on the same stomp connection. For example, when 2 subs are created on
258            // the same topic.
259    
260            Map<String, String> headers = command.getHeaders();
261            String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
262            if (messageId == null) {
263                throw new ProtocolException("ACK received without a message-id to acknowledge!");
264            }
265    
266            TransactionId activemqTx = null;
267            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
268            if (stompTx != null) {
269                activemqTx = transactions.get(stompTx);
270                if (activemqTx == null) {
271                    throw new ProtocolException("Invalid transaction id: " + stompTx);
272                }
273            }
274    
275            boolean acked = false;
276            for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
277                StompSubscription sub = iter.next();
278                MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
279                if (ack != null) {
280                    ack.setTransactionId(activemqTx);
281                    sendToActiveMQ(ack, createResponseHandler(command));
282                    acked = true;
283                    break;
284                }
285            }
286    
287            if (!acked) {
288                throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
289            }
290    
291        }
292    
293        protected void onStompBegin(StompFrame command) throws ProtocolException {
294            checkConnected();
295    
296            Map<String, String> headers = command.getHeaders();
297    
298            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
299    
300            if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
301                throw new ProtocolException("Must specify the transaction you are beginning");
302            }
303    
304            if (transactions.get(stompTx) != null) {
305                throw new ProtocolException("The transaction was allready started: " + stompTx);
306            }
307    
308            LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
309            transactions.put(stompTx, activemqTx);
310    
311            TransactionInfo tx = new TransactionInfo();
312            tx.setConnectionId(connectionId);
313            tx.setTransactionId(activemqTx);
314            tx.setType(TransactionInfo.BEGIN);
315    
316            sendToActiveMQ(tx, createResponseHandler(command));
317    
318        }
319    
320        protected void onStompCommit(StompFrame command) throws ProtocolException {
321            checkConnected();
322    
323            Map<String, String> headers = command.getHeaders();
324    
325            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
326            if (stompTx == null) {
327                throw new ProtocolException("Must specify the transaction you are committing");
328            }
329    
330            TransactionId activemqTx = transactions.remove(stompTx);
331            if (activemqTx == null) {
332                throw new ProtocolException("Invalid transaction id: " + stompTx);
333            }
334            
335            for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
336                StompSubscription sub = iter.next();
337                sub.onStompCommit(activemqTx);
338            }
339    
340            TransactionInfo tx = new TransactionInfo();
341            tx.setConnectionId(connectionId);
342            tx.setTransactionId(activemqTx);
343            tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
344    
345            sendToActiveMQ(tx, createResponseHandler(command));
346            
347        }
348    
349        protected void onStompAbort(StompFrame command) throws ProtocolException {
350            checkConnected();
351            Map<String, String> headers = command.getHeaders();
352    
353            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
354            if (stompTx == null) {
355                throw new ProtocolException("Must specify the transaction you are committing");
356            }
357    
358            TransactionId activemqTx = transactions.remove(stompTx);
359            if (activemqTx == null) {
360                throw new ProtocolException("Invalid transaction id: " + stompTx);
361            }
362            for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
363                StompSubscription sub = iter.next();
364                try {
365                    sub.onStompAbort(activemqTx);
366                } catch (Exception e) {
367                    throw new ProtocolException("Transaction abort failed", false, e);
368                }
369            }
370    
371            TransactionInfo tx = new TransactionInfo();
372            tx.setConnectionId(connectionId);
373            tx.setTransactionId(activemqTx);
374            tx.setType(TransactionInfo.ROLLBACK);
375    
376            sendToActiveMQ(tx, createResponseHandler(command));
377    
378        }
379    
380        protected void onStompSubscribe(StompFrame command) throws ProtocolException {
381            checkConnected();
382            FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
383            Map<String, String> headers = command.getHeaders();
384    
385            String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
386            String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
387    
388            ActiveMQDestination actualDest = translator.convertDestination(this, destination);
389            ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
390            ConsumerInfo consumerInfo = new ConsumerInfo(id);
391            consumerInfo.setPrefetchSize(1000);
392            consumerInfo.setDispatchAsync(true);
393    
394            String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
395            consumerInfo.setSelector(selector);
396    
397            IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
398    
399            consumerInfo.setDestination(translator.convertDestination(this, destination));
400    
401            StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
402            stompSubscription.setDestination(actualDest);
403    
404            String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
405            if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
406                stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
407            } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
408                stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
409            } else {
410                stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
411            }
412    
413            subscriptionsByConsumerId.put(id, stompSubscription);
414            sendToActiveMQ(consumerInfo, createResponseHandler(command));
415    
416        }
417    
418        protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
419            checkConnected();
420            Map<String, String> headers = command.getHeaders();
421    
422            ActiveMQDestination destination = null;
423            Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
424            if (o != null) {
425                destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o);
426            }
427    
428            String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
429    
430            if (subscriptionId == null && destination == null) {
431                throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
432            }
433           
434            // check if it is a durable subscription
435            String durable = command.getHeaders().get("activemq.subscriptionName"); 
436            if (durable != null) {
437                RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
438                info.setClientId(durable);
439                info.setSubscriptionName(durable);
440                info.setConnectionId(connectionId);
441                sendToActiveMQ(info, createResponseHandler(command));
442                return;
443            }
444    
445            // TODO: Unsubscribing using a destination is a bit wierd if multiple
446            // subscriptions
447            // are created with the same destination. Perhaps this should be
448            // removed.
449            //
450            for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
451                StompSubscription sub = iter.next();
452                if ((subscriptionId != null && subscriptionId.equals(sub.getSubscriptionId())) || (destination != null && destination.equals(sub.getDestination()))) {
453                    sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
454                    iter.remove();
455                    return;
456                }
457            }
458           
459            throw new ProtocolException("No subscription matched.");
460        }
461    
462        ConnectionInfo connectionInfo = new ConnectionInfo();
463        
464        protected void onStompConnect(final StompFrame command) throws ProtocolException {
465    
466            if (connected.get()) {
467                throw new ProtocolException("Allready connected.");
468            }
469    
470            final Map<String, String> headers = command.getHeaders();
471    
472            // allow anyone to login for now
473            String login = headers.get(Stomp.Headers.Connect.LOGIN);
474            String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
475            String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
476    
477    
478            IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
479    
480            connectionInfo.setConnectionId(connectionId);
481            if (clientId != null) {
482                connectionInfo.setClientId(clientId);
483            } else {
484                connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
485            }
486    
487            connectionInfo.setResponseRequired(true);
488            connectionInfo.setUserName(login);
489            connectionInfo.setPassword(passcode);
490            connectionInfo.setTransportContext(transportFilter.getPeerCertificates());
491    
492            sendToActiveMQ(connectionInfo, new ResponseHandler() {
493                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
494    
495                    if (response.isException()) {
496                        // If the connection attempt fails we close the socket.
497                        Throwable exception = ((ExceptionResponse)response).getException();
498                        handleException(exception, command);
499                        getTransportFilter().onException(IOExceptionSupport.create(exception));
500                        return;
501                    }
502    
503                    final SessionInfo sessionInfo = new SessionInfo(sessionId);
504                    sendToActiveMQ(sessionInfo, null);
505    
506                    final ProducerInfo producerInfo = new ProducerInfo(producerId);
507                    sendToActiveMQ(producerInfo, new ResponseHandler() {
508                        public void onResponse(ProtocolConverter converter, Response response) throws IOException {
509                            
510                            if (response.isException()) {
511                                // If the connection attempt fails we close the socket.
512                                Throwable exception = ((ExceptionResponse)response).getException();
513                                handleException(exception, command);
514                                getTransportFilter().onException(IOExceptionSupport.create(exception));
515                            }
516                            
517                            connected.set(true);
518                            HashMap<String, String> responseHeaders = new HashMap<String, String>();
519    
520                            responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
521                            String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID);
522                            if (requestId == null) {
523                                // TODO legacy
524                                requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED);
525                            }
526                            if (requestId != null) {
527                                // TODO legacy
528                                responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
529                                responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
530                            }
531    
532                            StompFrame sc = new StompFrame();
533                            sc.setAction(Stomp.Responses.CONNECTED);
534                            sc.setHeaders(responseHeaders);
535                            sendToStomp(sc);
536                        }
537                    });
538    
539                }
540            });
541        }
542    
543        protected void onStompDisconnect(StompFrame command) throws ProtocolException {
544            checkConnected();
545            sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
546            sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
547            connected.set(false);
548        }
549    
550        protected void checkConnected() throws ProtocolException {
551            if (!connected.get()) {
552                throw new ProtocolException("Not connected.");
553            }
554        }
555    
556        /**
557         * Dispatch a ActiveMQ command
558         * 
559         * @param command
560         * @throws IOException
561         */
562        public void onActiveMQCommand(Command command) throws IOException, JMSException {
563            if (command.isResponse()) {
564    
565                Response response = (Response)command;
566                ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
567                if (rh != null) {
568                    rh.onResponse(this, response);
569                } else {
570                    // Pass down any unexpected errors. Should this close the connection?
571                    if (response.isException()) {
572                        Throwable exception = ((ExceptionResponse)response).getException();
573                        handleException(exception, null);
574                    }
575                }
576            } else if (command.isMessageDispatch()) {
577    
578                MessageDispatch md = (MessageDispatch)command;
579                StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
580                if (sub != null) {
581                    sub.onMessageDispatch(md);
582                }
583            } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
584                // Pass down any unexpected async errors. Should this close the connection?
585                Throwable exception = ((ConnectionError)command).getException();
586                handleException(exception, null);
587            }
588        }
589    
590        public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
591            ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command);
592            return msg;
593        }
594    
595        public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException {
596            if (ignoreTransformation == true) {
597                    return frameTranslator.convertMessage(this, message);
598            } else {
599                    return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message);
600            }
601        }
602    
603        public StompTransportFilter getTransportFilter() {
604            return transportFilter;
605        }
606    
607            public ActiveMQDestination createTempQueue(String name) {
608            ActiveMQDestination rc = tempDestinations.get(name);
609            if( rc == null ) {
610                rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
611                sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
612                tempDestinations.put(name, rc);
613            }        
614            return rc;
615            }
616    
617            public ActiveMQDestination createTempTopic(String name) {
618            ActiveMQDestination rc = tempDestinations.get(name);
619            if( rc == null ) {
620                rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
621                sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
622                tempDestinations.put(name, rc);
623                tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
624            }        
625            return rc;
626            }
627    
628            public String getCreatedTempDestinationName(ActiveMQDestination destination) {
629                    return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
630            }
631    }