001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.stomp; 018 019 import java.io.IOException; 020 import java.io.OutputStreamWriter; 021 import java.io.PrintWriter; 022 import java.util.HashMap; 023 import java.util.Iterator; 024 import java.util.Map; 025 import java.util.concurrent.ConcurrentHashMap; 026 import java.util.concurrent.atomic.AtomicBoolean; 027 028 import javax.jms.JMSException; 029 030 import org.apache.activemq.command.ActiveMQDestination; 031 import org.apache.activemq.command.ActiveMQMessage; 032 import org.apache.activemq.command.ActiveMQTempQueue; 033 import org.apache.activemq.command.ActiveMQTempTopic; 034 import org.apache.activemq.command.Command; 035 import org.apache.activemq.command.ConnectionError; 036 import org.apache.activemq.command.ConnectionId; 037 import org.apache.activemq.command.ConnectionInfo; 038 import org.apache.activemq.command.ConsumerId; 039 import org.apache.activemq.command.ConsumerInfo; 040 import org.apache.activemq.command.DestinationInfo; 041 import org.apache.activemq.command.ExceptionResponse; 042 import org.apache.activemq.command.LocalTransactionId; 043 import org.apache.activemq.command.MessageAck; 044 import org.apache.activemq.command.MessageDispatch; 045 import org.apache.activemq.command.MessageId; 046 import org.apache.activemq.command.ProducerId; 047 import org.apache.activemq.command.ProducerInfo; 048 import org.apache.activemq.command.RemoveSubscriptionInfo; 049 import org.apache.activemq.command.Response; 050 import org.apache.activemq.command.SessionId; 051 import org.apache.activemq.command.SessionInfo; 052 import org.apache.activemq.command.ShutdownInfo; 053 import org.apache.activemq.command.TransactionId; 054 import org.apache.activemq.command.TransactionInfo; 055 import org.apache.activemq.util.ByteArrayOutputStream; 056 import org.apache.activemq.util.FactoryFinder; 057 import org.apache.activemq.util.IOExceptionSupport; 058 import org.apache.activemq.util.IdGenerator; 059 import org.apache.activemq.util.IntrospectionSupport; 060 import org.apache.activemq.util.LongSequenceGenerator; 061 import org.springframework.context.ApplicationContext; 062 import org.springframework.context.ApplicationContextAware; 063 064 /** 065 * @author <a href="http://hiramchirino.com">chirino</a> 066 */ 067 public class ProtocolConverter { 068 069 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); 070 071 private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); 072 private final SessionId sessionId = new SessionId(connectionId, -1); 073 private final ProducerId producerId = new ProducerId(sessionId, 1); 074 075 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 076 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); 077 private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); 078 private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator(); 079 080 private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); 081 private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>(); 082 private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>(); 083 private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>(); 084 private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>(); 085 private final StompTransportFilter transportFilter; 086 087 private final Object commnadIdMutex = new Object(); 088 private int lastCommandId; 089 private final AtomicBoolean connected = new AtomicBoolean(false); 090 private final FrameTranslator frameTranslator; 091 private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/"); 092 private final ApplicationContext applicationContext; 093 094 public ProtocolConverter(StompTransportFilter stompTransportFilter, FrameTranslator translator, ApplicationContext applicationContext) { 095 this.transportFilter = stompTransportFilter; 096 this.frameTranslator = translator; 097 this.applicationContext = applicationContext; 098 } 099 100 protected int generateCommandId() { 101 synchronized (commnadIdMutex) { 102 return lastCommandId++; 103 } 104 } 105 106 protected ResponseHandler createResponseHandler(final StompFrame command) { 107 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 108 if (receiptId != null) { 109 return new ResponseHandler() { 110 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 111 if (response.isException()) { 112 // Generally a command can fail.. but that does not invalidate the connection. 113 // We report back the failure but we don't close the connection. 114 Throwable exception = ((ExceptionResponse)response).getException(); 115 handleException(exception, command); 116 } else { 117 StompFrame sc = new StompFrame(); 118 sc.setAction(Stomp.Responses.RECEIPT); 119 sc.setHeaders(new HashMap<String, String>(1)); 120 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 121 transportFilter.sendToStomp(sc); 122 } 123 } 124 }; 125 } 126 return null; 127 } 128 129 protected void sendToActiveMQ(Command command, ResponseHandler handler) { 130 command.setCommandId(generateCommandId()); 131 if (handler != null) { 132 command.setResponseRequired(true); 133 resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); 134 } 135 transportFilter.sendToActiveMQ(command); 136 } 137 138 protected void sendToStomp(StompFrame command) throws IOException { 139 transportFilter.sendToStomp(command); 140 } 141 142 protected FrameTranslator findTranslator(String header) { 143 FrameTranslator translator = frameTranslator; 144 try { 145 if (header != null) { 146 translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER 147 .newInstance(header); 148 if (translator instanceof ApplicationContextAware) { 149 ((ApplicationContextAware)translator).setApplicationContext(applicationContext); 150 } 151 } 152 } catch (Exception ignore) { 153 // if anything goes wrong use the default translator 154 } 155 156 return translator; 157 } 158 159 /** 160 * Convert a stomp command 161 * 162 * @param command 163 */ 164 public void onStompCommand(StompFrame command) throws IOException, JMSException { 165 try { 166 167 if (command.getClass() == StompFrameError.class) { 168 throw ((StompFrameError)command).getException(); 169 } 170 171 String action = command.getAction(); 172 if (action.startsWith(Stomp.Commands.SEND)) { 173 onStompSend(command); 174 } else if (action.startsWith(Stomp.Commands.ACK)) { 175 onStompAck(command); 176 } else if (action.startsWith(Stomp.Commands.BEGIN)) { 177 onStompBegin(command); 178 } else if (action.startsWith(Stomp.Commands.COMMIT)) { 179 onStompCommit(command); 180 } else if (action.startsWith(Stomp.Commands.ABORT)) { 181 onStompAbort(command); 182 } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) { 183 onStompSubscribe(command); 184 } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) { 185 onStompUnsubscribe(command); 186 } else if (action.startsWith(Stomp.Commands.CONNECT)) { 187 onStompConnect(command); 188 } else if (action.startsWith(Stomp.Commands.DISCONNECT)) { 189 onStompDisconnect(command); 190 } else { 191 throw new ProtocolException("Unknown STOMP action: " + action); 192 } 193 194 } catch (ProtocolException e) { 195 handleException(e, command); 196 // Some protocol errors can cause the connection to get closed. 197 if( e.isFatal() ) { 198 getTransportFilter().onException(e); 199 } 200 } 201 } 202 203 protected void handleException(Throwable exception, StompFrame command) throws IOException { 204 // Let the stomp client know about any protocol errors. 205 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 206 PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8")); 207 exception.printStackTrace(stream); 208 stream.close(); 209 210 HashMap<String, String> headers = new HashMap<String, String>(); 211 headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage()); 212 213 if (command != null) { 214 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 215 if (receiptId != null) { 216 headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 217 } 218 } 219 220 StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray()); 221 sendToStomp(errorMessage); 222 } 223 224 protected void onStompSend(StompFrame command) throws IOException, JMSException { 225 checkConnected(); 226 227 Map<String, String> headers = command.getHeaders(); 228 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 229 headers.remove("transaction"); 230 231 ActiveMQMessage message = convertMessage(command); 232 233 message.setProducerId(producerId); 234 MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId()); 235 message.setMessageId(id); 236 message.setJMSTimestamp(System.currentTimeMillis()); 237 238 if (stompTx != null) { 239 TransactionId activemqTx = transactions.get(stompTx); 240 if (activemqTx == null) { 241 throw new ProtocolException("Invalid transaction id: " + stompTx); 242 } 243 message.setTransactionId(activemqTx); 244 } 245 246 message.onSend(); 247 sendToActiveMQ(message, createResponseHandler(command)); 248 249 } 250 251 protected void onStompAck(StompFrame command) throws ProtocolException { 252 checkConnected(); 253 254 // TODO: acking with just a message id is very bogus 255 // since the same message id could have been sent to 2 different 256 // subscriptions 257 // on the same stomp connection. For example, when 2 subs are created on 258 // the same topic. 259 260 Map<String, String> headers = command.getHeaders(); 261 String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID); 262 if (messageId == null) { 263 throw new ProtocolException("ACK received without a message-id to acknowledge!"); 264 } 265 266 TransactionId activemqTx = null; 267 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 268 if (stompTx != null) { 269 activemqTx = transactions.get(stompTx); 270 if (activemqTx == null) { 271 throw new ProtocolException("Invalid transaction id: " + stompTx); 272 } 273 } 274 275 boolean acked = false; 276 for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { 277 StompSubscription sub = iter.next(); 278 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx); 279 if (ack != null) { 280 ack.setTransactionId(activemqTx); 281 sendToActiveMQ(ack, createResponseHandler(command)); 282 acked = true; 283 break; 284 } 285 } 286 287 if (!acked) { 288 throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]"); 289 } 290 291 } 292 293 protected void onStompBegin(StompFrame command) throws ProtocolException { 294 checkConnected(); 295 296 Map<String, String> headers = command.getHeaders(); 297 298 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 299 300 if (!headers.containsKey(Stomp.Headers.TRANSACTION)) { 301 throw new ProtocolException("Must specify the transaction you are beginning"); 302 } 303 304 if (transactions.get(stompTx) != null) { 305 throw new ProtocolException("The transaction was allready started: " + stompTx); 306 } 307 308 LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId()); 309 transactions.put(stompTx, activemqTx); 310 311 TransactionInfo tx = new TransactionInfo(); 312 tx.setConnectionId(connectionId); 313 tx.setTransactionId(activemqTx); 314 tx.setType(TransactionInfo.BEGIN); 315 316 sendToActiveMQ(tx, createResponseHandler(command)); 317 318 } 319 320 protected void onStompCommit(StompFrame command) throws ProtocolException { 321 checkConnected(); 322 323 Map<String, String> headers = command.getHeaders(); 324 325 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 326 if (stompTx == null) { 327 throw new ProtocolException("Must specify the transaction you are committing"); 328 } 329 330 TransactionId activemqTx = transactions.remove(stompTx); 331 if (activemqTx == null) { 332 throw new ProtocolException("Invalid transaction id: " + stompTx); 333 } 334 335 for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { 336 StompSubscription sub = iter.next(); 337 sub.onStompCommit(activemqTx); 338 } 339 340 TransactionInfo tx = new TransactionInfo(); 341 tx.setConnectionId(connectionId); 342 tx.setTransactionId(activemqTx); 343 tx.setType(TransactionInfo.COMMIT_ONE_PHASE); 344 345 sendToActiveMQ(tx, createResponseHandler(command)); 346 347 } 348 349 protected void onStompAbort(StompFrame command) throws ProtocolException { 350 checkConnected(); 351 Map<String, String> headers = command.getHeaders(); 352 353 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 354 if (stompTx == null) { 355 throw new ProtocolException("Must specify the transaction you are committing"); 356 } 357 358 TransactionId activemqTx = transactions.remove(stompTx); 359 if (activemqTx == null) { 360 throw new ProtocolException("Invalid transaction id: " + stompTx); 361 } 362 for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { 363 StompSubscription sub = iter.next(); 364 try { 365 sub.onStompAbort(activemqTx); 366 } catch (Exception e) { 367 throw new ProtocolException("Transaction abort failed", false, e); 368 } 369 } 370 371 TransactionInfo tx = new TransactionInfo(); 372 tx.setConnectionId(connectionId); 373 tx.setTransactionId(activemqTx); 374 tx.setType(TransactionInfo.ROLLBACK); 375 376 sendToActiveMQ(tx, createResponseHandler(command)); 377 378 } 379 380 protected void onStompSubscribe(StompFrame command) throws ProtocolException { 381 checkConnected(); 382 FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)); 383 Map<String, String> headers = command.getHeaders(); 384 385 String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID); 386 String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION); 387 388 ActiveMQDestination actualDest = translator.convertDestination(this, destination); 389 ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); 390 ConsumerInfo consumerInfo = new ConsumerInfo(id); 391 consumerInfo.setPrefetchSize(1000); 392 consumerInfo.setDispatchAsync(true); 393 394 String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR); 395 consumerInfo.setSelector(selector); 396 397 IntrospectionSupport.setProperties(consumerInfo, headers, "activemq."); 398 399 consumerInfo.setDestination(translator.convertDestination(this, destination)); 400 401 StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION)); 402 stompSubscription.setDestination(actualDest); 403 404 String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE); 405 if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) { 406 stompSubscription.setAckMode(StompSubscription.CLIENT_ACK); 407 } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) { 408 stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK); 409 } else { 410 stompSubscription.setAckMode(StompSubscription.AUTO_ACK); 411 } 412 413 subscriptionsByConsumerId.put(id, stompSubscription); 414 sendToActiveMQ(consumerInfo, createResponseHandler(command)); 415 416 } 417 418 protected void onStompUnsubscribe(StompFrame command) throws ProtocolException { 419 checkConnected(); 420 Map<String, String> headers = command.getHeaders(); 421 422 ActiveMQDestination destination = null; 423 Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION); 424 if (o != null) { 425 destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o); 426 } 427 428 String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID); 429 430 if (subscriptionId == null && destination == null) { 431 throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from"); 432 } 433 434 // check if it is a durable subscription 435 String durable = command.getHeaders().get("activemq.subscriptionName"); 436 if (durable != null) { 437 RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); 438 info.setClientId(durable); 439 info.setSubscriptionName(durable); 440 info.setConnectionId(connectionId); 441 sendToActiveMQ(info, createResponseHandler(command)); 442 return; 443 } 444 445 // TODO: Unsubscribing using a destination is a bit wierd if multiple 446 // subscriptions 447 // are created with the same destination. Perhaps this should be 448 // removed. 449 // 450 for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { 451 StompSubscription sub = iter.next(); 452 if ((subscriptionId != null && subscriptionId.equals(sub.getSubscriptionId())) || (destination != null && destination.equals(sub.getDestination()))) { 453 sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); 454 iter.remove(); 455 return; 456 } 457 } 458 459 throw new ProtocolException("No subscription matched."); 460 } 461 462 ConnectionInfo connectionInfo = new ConnectionInfo(); 463 464 protected void onStompConnect(final StompFrame command) throws ProtocolException { 465 466 if (connected.get()) { 467 throw new ProtocolException("Allready connected."); 468 } 469 470 final Map<String, String> headers = command.getHeaders(); 471 472 // allow anyone to login for now 473 String login = headers.get(Stomp.Headers.Connect.LOGIN); 474 String passcode = headers.get(Stomp.Headers.Connect.PASSCODE); 475 String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID); 476 477 478 IntrospectionSupport.setProperties(connectionInfo, headers, "activemq."); 479 480 connectionInfo.setConnectionId(connectionId); 481 if (clientId != null) { 482 connectionInfo.setClientId(clientId); 483 } else { 484 connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString()); 485 } 486 487 connectionInfo.setResponseRequired(true); 488 connectionInfo.setUserName(login); 489 connectionInfo.setPassword(passcode); 490 connectionInfo.setTransportContext(transportFilter.getPeerCertificates()); 491 492 sendToActiveMQ(connectionInfo, new ResponseHandler() { 493 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 494 495 if (response.isException()) { 496 // If the connection attempt fails we close the socket. 497 Throwable exception = ((ExceptionResponse)response).getException(); 498 handleException(exception, command); 499 getTransportFilter().onException(IOExceptionSupport.create(exception)); 500 return; 501 } 502 503 final SessionInfo sessionInfo = new SessionInfo(sessionId); 504 sendToActiveMQ(sessionInfo, null); 505 506 final ProducerInfo producerInfo = new ProducerInfo(producerId); 507 sendToActiveMQ(producerInfo, new ResponseHandler() { 508 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 509 510 if (response.isException()) { 511 // If the connection attempt fails we close the socket. 512 Throwable exception = ((ExceptionResponse)response).getException(); 513 handleException(exception, command); 514 getTransportFilter().onException(IOExceptionSupport.create(exception)); 515 } 516 517 connected.set(true); 518 HashMap<String, String> responseHeaders = new HashMap<String, String>(); 519 520 responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId()); 521 String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID); 522 if (requestId == null) { 523 // TODO legacy 524 requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED); 525 } 526 if (requestId != null) { 527 // TODO legacy 528 responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId); 529 responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId); 530 } 531 532 StompFrame sc = new StompFrame(); 533 sc.setAction(Stomp.Responses.CONNECTED); 534 sc.setHeaders(responseHeaders); 535 sendToStomp(sc); 536 } 537 }); 538 539 } 540 }); 541 } 542 543 protected void onStompDisconnect(StompFrame command) throws ProtocolException { 544 checkConnected(); 545 sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command)); 546 sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command)); 547 connected.set(false); 548 } 549 550 protected void checkConnected() throws ProtocolException { 551 if (!connected.get()) { 552 throw new ProtocolException("Not connected."); 553 } 554 } 555 556 /** 557 * Dispatch a ActiveMQ command 558 * 559 * @param command 560 * @throws IOException 561 */ 562 public void onActiveMQCommand(Command command) throws IOException, JMSException { 563 if (command.isResponse()) { 564 565 Response response = (Response)command; 566 ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId())); 567 if (rh != null) { 568 rh.onResponse(this, response); 569 } else { 570 // Pass down any unexpected errors. Should this close the connection? 571 if (response.isException()) { 572 Throwable exception = ((ExceptionResponse)response).getException(); 573 handleException(exception, null); 574 } 575 } 576 } else if (command.isMessageDispatch()) { 577 578 MessageDispatch md = (MessageDispatch)command; 579 StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId()); 580 if (sub != null) { 581 sub.onMessageDispatch(md); 582 } 583 } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { 584 // Pass down any unexpected async errors. Should this close the connection? 585 Throwable exception = ((ConnectionError)command).getException(); 586 handleException(exception, null); 587 } 588 } 589 590 public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException { 591 ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command); 592 return msg; 593 } 594 595 public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException { 596 if (ignoreTransformation == true) { 597 return frameTranslator.convertMessage(this, message); 598 } else { 599 return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message); 600 } 601 } 602 603 public StompTransportFilter getTransportFilter() { 604 return transportFilter; 605 } 606 607 public ActiveMQDestination createTempQueue(String name) { 608 ActiveMQDestination rc = tempDestinations.get(name); 609 if( rc == null ) { 610 rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId()); 611 sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); 612 tempDestinations.put(name, rc); 613 } 614 return rc; 615 } 616 617 public ActiveMQDestination createTempTopic(String name) { 618 ActiveMQDestination rc = tempDestinations.get(name); 619 if( rc == null ) { 620 rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId()); 621 sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); 622 tempDestinations.put(name, rc); 623 tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name); 624 } 625 return rc; 626 } 627 628 public String getCreatedTempDestinationName(ActiveMQDestination destination) { 629 return tempDestinationAmqToStompMap.get(destination.getQualifiedName()); 630 } 631 }