/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;


/**
 * {@link PersistenceAdapter} built on top of {@link MessageDatabase}.
 * <p>
 * Write operations are turned into Kaha* command objects and handed to
 * {@code store(...)}; read operations run inside a page file transaction
 * while holding {@code indexMutex}, because the page file is accessed under
 * external synchronization throughout this class.
 */
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {

    /** Wire format used to marshal and unmarshal messages, acks and subscription infos. */
    private final WireFormat wireFormat = new OpenWireFormat();

    /** No-op: the broker name is not used by this adapter. */
    public void setBrokerName(String brokerName) {
    }

    /** No-op: the usage manager is not used by this adapter. */
    public void setUsageManager(SystemUsage usageManager) {
    }

    /**
     * Creates a {@link TransactionStore} whose commit, prepare and rollback
     * operations are recorded as journal commands.
     *
     * @return a new transaction store bound to this adapter
     * @throws IOException declared by the interface; not thrown by this method itself
     */
    public TransactionStore createTransactionStore() throws IOException {
        return new TransactionStore() {

            public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
                store(new KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true);
            }

            public void prepare(TransactionId txid) throws IOException {
                store(new KahaPrepareCommand().setTransactionInfo(createTransactionInfo(txid)), true);
            }

            public void rollback(TransactionId txid) throws IOException {
                // Unlike commit/prepare, the rollback record is stored with the second argument false.
                store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(txid)), false);
            }

            /**
             * Hands every entry of {@code preparedTransactions} to the listener,
             * together with the messages and acks that make up the transaction.
             */
            public void recover(TransactionRecoveryListener listener) throws IOException {
                for (Map.Entry<TransactionId, ArrayList<Operation>> prepared : preparedTransactions.entrySet()) {
                    // Keys are cast unconditionally; a non-XA key would raise a ClassCastException here.
                    XATransactionId xid = (XATransactionId) prepared.getKey();
                    ArrayList<Message> messages = new ArrayList<Message>();
                    ArrayList<MessageAck> acks = new ArrayList<MessageAck>();

                    for (Operation op : prepared.getValue()) {
                        if (op.getClass() == AddOpperation.class) {
                            AddOpperation addOp = (AddOpperation) op;
                            Message msg = (Message) wireFormat.unmarshal(
                                    new DataInputStream(addOp.getCommand().getMessage().newInput()));
                            messages.add(msg);
                        } else {
                            // Anything that is not an add is treated as a remove.
                            RemoveOpperation rmOp = (RemoveOpperation) op;
                            MessageAck ack = (MessageAck) wireFormat.unmarshal(
                                    new DataInputStream(rmOp.getCommand().getAck().newInput()));
                            acks.add(ack);
                        }
                    }

                    Message[] addedMessages = messages.toArray(new Message[messages.size()]);
                    MessageAck[] ackArray = acks.toArray(new MessageAck[acks.size()]);
                    listener.recover(xid, addedMessages, ackArray);
                }
            }

            public void start() throws Exception {
            }

            public void stop() throws Exception {
            }
        };
    }

    /**
     * Message store for a single destination. Also serves as the base class
     * of {@link KahaDBTopicMessageStore}.
     */
    public class KahaDBMessageStore extends AbstractMessageStore {

        /** Kaha representation of the destination, computed once at construction. */
        protected KahaDestination dest;

        /** Order-index key at which the next {@link #recoverNextMessages} batch starts. */
        long cursorPos = 0;

        public KahaDBMessageStore(ActiveMQDestination destination) {
            super(destination);
            this.dest = convert(destination);
        }

        @Override
        public ActiveMQDestination getDestination() {
            return destination;
        }

        /**
         * Marshals the message and stores it as a {@link KahaAddMessageCommand}.
         *
         * @param context not used by this implementation
         * @param message message to persist
         * @throws IOException if marshalling or storing fails
         */
        public void addMessage(ConnectionContext context, Message message) throws IOException {
            KahaAddMessageCommand command = new KahaAddMessageCommand();
            command.setDestination(dest);
            command.setMessageId(message.getMessageId().toString());
            command.setTransactionInfo(createTransactionInfo(message.getTransactionId()));

            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));

            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired());
        }

        /**
         * Stores a {@link KahaRemoveMessageCommand} for the ack's last message id.
         *
         * @param context not used by this implementation
         * @param ack     acknowledgement identifying the message to remove
         * @throws IOException if storing fails
         */
        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
            command.setDestination(dest);
            command.setMessageId(ack.getLastMessageId().toString());
            command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()));
            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired());
        }

        /**
         * Stores a {@link KahaRemoveDestinationCommand} for this destination.
         *
         * @param context not used by this implementation
         * @throws IOException if storing fails
         */
        public void removeAllMessages(ConnectionContext context) throws IOException {
            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
            command.setDestination(dest);
            store(command, true);
        }

        /**
         * Looks the message up through the message-id index and loads it from
         * its recorded location.
         *
         * @param identity id of the message to load
         * @return the message, or {@code null} if the id is not in the index
         * @throws IOException if the index or the stored record cannot be read
         */
        public Message getMessage(MessageId identity) throws IOException {
            final String key = identity.toString();

            // Hopefully one day the page file supports concurrent read operations... but for now we must
            // externally synchronize...
            Location location;
            synchronized (indexMutex) {
                location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
                    public Location execute(Transaction tx) throws IOException {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        Long sequence = sd.messageIdIndex.get(tx, key);
                        if (sequence == null) {
                            return null;
                        }
                        return sd.orderIndex.get(tx, sequence).location;
                    }
                });
            }
            if (location == null) {
                return null;
            }

            // The record itself is loaded after the index mutex has been released.
            return loadMessage(location);
        }

        /**
         * Counts the messages of this destination by walking the location index.
         *
         * @return number of entries in the location index
         * @throws IOException if the index cannot be read
         */
        public int getMessageCount() throws IOException {
            synchronized (indexMutex) {
                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
                    public Integer execute(Transaction tx) throws IOException {
                        // Iterate through all index entries to get a count of messages in the destination.
                        StoredDestination sd = getStoredDestination(dest, tx);
                        int count = 0;
                        Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx);
                        while (iterator.hasNext()) {
                            iterator.next();
                            count++;
                        }
                        return count;
                    }
                });
            }
        }

        /**
         * @return {@code true} if the location index of this destination reports itself empty
         * @throws IOException if the index cannot be read
         */
        public boolean isEmpty() throws IOException {
            synchronized (indexMutex) {
                return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
                    public Boolean execute(Transaction tx) throws IOException {
                        // Ask the index directly; no iteration over the entries is needed.
                        StoredDestination sd = getStoredDestination(dest, tx);
                        return sd.locationIndex.isEmpty(tx);
                    }
                });
            }
        }

        /**
         * Passes every message in the order index to the listener.
         * The listener is invoked while {@code indexMutex} is held.
         *
         * @param listener receiver of the recovered messages
         * @throws Exception if reading fails or the listener throws
         */
        public void recover(final MessageRecoveryListener listener) throws Exception {
            synchronized (indexMutex) {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    public void execute(Transaction tx) throws Exception {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
                        while (iterator.hasNext()) {
                            Entry<Long, MessageKeys> entry = iterator.next();
                            listener.recoverMessage(loadMessage(entry.getValue().location));
                        }
                    }
                });
            }
        }

        /**
         * Passes the next batch of messages, starting at the current cursor,
         * to the listener and advances the cursor past the last one delivered.
         *
         * @param maxReturned batch size; the loop stops once this many messages were delivered
         * @param listener    receiver of the recovered messages
         * @throws Exception if reading fails or the listener throws
         */
        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
            synchronized (indexMutex) {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    public void execute(Transaction tx) throws Exception {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        Entry<Long, MessageKeys> last = null;
                        int delivered = 0;
                        Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos);
                        while (iterator.hasNext()) {
                            last = iterator.next();
                            listener.recoverMessage(loadMessage(last.getValue().location));
                            delivered++;
                            // The limit is checked after delivery, so at least one message is
                            // delivered whenever one is available.
                            if (delivered >= maxReturned) {
                                break;
                            }
                        }
                        if (last != null) {
                            cursorPos = last.getKey() + 1;
                        }
                    }
                });
            }
        }

        /** Moves the batching cursor back to the start of the order index. */
        public void resetBatching() {
            cursorPos = 0;
        }

        /**
         * Positions the batching cursor just after the given message. If the
         * id is not in the message-id index the cursor is left unchanged.
         *
         * @param identity id of the last message already processed
         * @throws IOException if the index cannot be read
         */
        @Override
        public void setBatch(MessageId identity) throws IOException {
            final String key = identity.toString();

            // Hopefully one day the page file supports concurrent read operations... but for now we must
            // externally synchronize...
            Long sequence;
            synchronized (indexMutex) {
                sequence = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
                    public Long execute(Transaction tx) throws IOException {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        return sd.messageIdIndex.get(tx, key);
                    }
                });
            }
            if (sequence != null) {
                cursorPos = sequence + 1;
            }
        }

        /** No-op: memory usage is not tracked by this store. */
        @Override
        public void setMemoryUsage(MemoryUsage memoeyUSage) {
        }

        @Override
        public void start() throws Exception {
        }

        @Override
        public void stop() throws Exception {
        }

    }

    /**
     * Topic flavour of {@link KahaDBMessageStore} that adds durable
     * subscription bookkeeping. Subscriptions are addressed by the key
     * produced by {@link KahaDBStore#subscriptionKey(String, String)}.
     */
    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {

        public KahaDBTopicMessageStore(ActiveMQTopic destination) {
            super(destination);
        }

        /**
         * Stores a {@link KahaRemoveMessageCommand} carrying the subscription key.
         *
         * @param context          not used by this implementation
         * @param clientId         client id of the subscription
         * @param subscriptionName name of the subscription
         * @param messageId        id of the acknowledged message
         * @throws IOException if storing fails
         */
        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
            command.setDestination(dest);
            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
            command.setMessageId(messageId.toString());
            // We are not passed a transaction info.. so we can't participate in a transaction.
            // Looks like a design issue with the TopicMessageStore interface.  Also we can't recover the original ack
            // to pass back to the XA recover method.
            // command.setTransactionInfo();
            store(command, false);
        }

        /**
         * Stores a {@link KahaSubscriptionCommand} containing the marshalled subscription info.
         *
         * @param subscriptionInfo subscription to record
         * @param retroactive      copied onto the command
         * @throws IOException if marshalling or storing fails
         */
        public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
            command.setDestination(dest);
            command.setSubscriptionKey(subscriptionKey);
            command.setRetroactive(retroactive);
            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
            store(command, isEnableJournalDiskSyncs());
        }

        /**
         * Stores a {@link KahaSubscriptionCommand} that carries only the
         * destination and subscription key (no subscription info is set).
         *
         * @param clientId         client id of the subscription
         * @param subscriptionName name of the subscription
         * @throws IOException if storing fails
         */
        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
            command.setDestination(dest);
            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
            store(command, isEnableJournalDiskSyncs());
        }

        /**
         * @return every subscription recorded for this destination, unmarshalled; never {@code null}
         * @throws IOException if the index cannot be read or an entry cannot be unmarshalled
         */
        public SubscriptionInfo[] getAllSubscriptions() throws IOException {

            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
            synchronized (indexMutex) {
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    public void execute(Transaction tx) throws IOException {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx);
                        while (iterator.hasNext()) {
                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
                            SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(
                                    new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()));
                            subscriptions.add(info);
                        }
                    }
                });
            }

            return subscriptions.toArray(new SubscriptionInfo[subscriptions.size()]);
        }

        /**
         * @param clientId         client id of the subscription
         * @param subscriptionName name of the subscription
         * @return the recorded subscription info, or {@code null} if none is recorded under that key
         * @throws IOException if the index cannot be read or the entry cannot be unmarshalled
         */
        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
            synchronized (indexMutex) {
                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
                    public SubscriptionInfo execute(Transaction tx) throws IOException {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
                        if (command == null) {
                            return null;
                        }
                        return (SubscriptionInfo) wireFormat.unmarshal(
                                new DataInputStream(command.getSubscriptionInfo().newInput()));
                    }
                });
            }
        }

        /**
         * Counts the order-index entries that come after the subscription's last ack.
         *
         * @param clientId         client id of the subscription
         * @param subscriptionName name of the subscription
         * @return number of entries after the last ack, or 0 if no ack position is recorded
         * @throws IOException if the index cannot be read
         */
        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
            synchronized (indexMutex) {
                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
                    public Integer execute(Transaction tx) throws IOException {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        Long lastAck = sd.subscriptionAcks.get(tx, subscriptionKey);
                        if (lastAck == null) {
                            // The subscription might not exist.
                            return 0;
                        }
                        long start = lastAck.longValue() + 1;

                        int count = 0;
                        Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, start);
                        while (iterator.hasNext()) {
                            iterator.next();
                            count++;
                        }
                        return count;
                    }
                });
            }
        }

        /**
         * Passes every message after the subscription's last ack to the listener.
         * Nothing is delivered when no ack position is recorded for the subscription.
         *
         * @param clientId         client id of the subscription
         * @param subscriptionName name of the subscription
         * @param listener         receiver of the recovered messages
         * @throws Exception if reading fails or the listener throws
         */
        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
            synchronized (indexMutex) {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    public void execute(Transaction tx) throws Exception {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        Long lastAck = sd.subscriptionAcks.get(tx, subscriptionKey);
                        if (lastAck == null) {
                            // The subscription might not exist (same case getMessageCount handles);
                            // unboxing null here would throw a NullPointerException.
                            return;
                        }
                        long start = lastAck.longValue() + 1;

                        Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, start);
                        while (iterator.hasNext()) {
                            Entry<Long, MessageKeys> entry = iterator.next();
                            listener.recoverMessage(loadMessage(entry.getValue().location));
                        }
                    }
                });
            }
        }

        /**
         * Passes the next batch of messages for the subscription to the
         * listener. The batch starts at the cached subscription cursor or, if
         * none is cached, just after the last ack; the cursor is then advanced
         * past the last message delivered. Nothing is delivered when neither a
         * cursor nor an ack position exists.
         *
         * @param clientId         client id of the subscription
         * @param subscriptionName name of the subscription
         * @param maxReturned      batch size; the loop stops once this many messages were delivered
         * @param listener         receiver of the recovered messages
         * @throws Exception if reading fails or the listener throws
         */
        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
            synchronized (indexMutex) {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    public void execute(Transaction tx) throws Exception {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
                        if (cursorPos == null) {
                            Long lastAck = sd.subscriptionAcks.get(tx, subscriptionKey);
                            if (lastAck == null) {
                                // The subscription might not exist; avoid unboxing null.
                                return;
                            }
                            cursorPos = Long.valueOf(lastAck.longValue() + 1);
                        }

                        Entry<Long, MessageKeys> last = null;
                        int delivered = 0;
                        Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos);
                        while (iterator.hasNext()) {
                            last = iterator.next();
                            listener.recoverMessage(loadMessage(last.getValue().location));
                            delivered++;
                            if (delivered >= maxReturned) {
                                break;
                            }
                        }
                        if (last != null) {
                            sd.subscriptionCursors.put(subscriptionKey, last.getKey() + 1);
                        }
                    }
                });
            }
        }

        /**
         * Drops the cached batching cursor of the subscription.
         *
         * @param clientId         client id of the subscription
         * @param subscriptionName name of the subscription
         * @throws RuntimeException wrapping any {@link IOException} raised while accessing the store
         */
        public void resetBatching(String clientId, String subscriptionName) {
            try {
                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
                synchronized (indexMutex) {
                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                        public void execute(Transaction tx) throws IOException {
                            StoredDestination sd = getStoredDestination(dest, tx);
                            sd.subscriptionCursors.remove(subscriptionKey);
                        }
                    });
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Builds the key under which a durable subscription is indexed.
     *
     * @return {@code clientId + ":" + subscriptionName}
     */
    String subscriptionKey(String clientId, String subscriptionName) {
        return clientId + ":" + subscriptionName;
    }

    /** Creates a new {@link KahaDBMessageStore} for the queue; stores are not cached here. */
    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
        return new KahaDBMessageStore(destination);
    }

    /** Creates a new {@link KahaDBTopicMessageStore} for the topic; stores are not cached here. */
    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
        return new KahaDBTopicMessageStore(destination);
    }

    /**
     * Cleanup method to remove any state associated with the given destination.
     * This method does not stop the message store (it might not be cached).
     * This implementation does nothing.
     *
     * @param destination Destination to forget
     */
    public void removeQueueMessageStore(ActiveMQQueue destination) {
    }

    /**
     * Cleanup method to remove any state associated with the given destination.
     * This method does not stop the message store (it might not be cached).
     * This implementation does nothing.
     *
     * @param destination Destination to forget
     */
    public void removeTopicMessageStore(ActiveMQTopic destination) {
    }

    /**
     * Sets the {@code deleteAllMessages} flag. This method itself does not
     * touch the journal or the index.
     */
    public void deleteAllMessages() throws IOException {
        deleteAllMessages = true;
    }

    /**
     * @return the destinations recorded in the metadata index, converted from their stored keys
     * @throws RuntimeException wrapping any {@link IOException} raised while reading the index
     */
    public Set<ActiveMQDestination> getDestinations() {
        try {
            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
            synchronized (indexMutex) {
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    public void execute(Transaction tx) throws IOException {
                        Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx);
                        while (iterator.hasNext()) {
                            Entry<String, StoredDestination> entry = iterator.next();
                            rc.add(convert(entry.getKey()));
                        }
                    }
                });
            }
            return rc;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Always returns 0 in this implementation. */
    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0;
    }

    /**
     * @return combined disk size of the journal and the page file, or 0 if the store has not been started
     * @throws RuntimeException wrapping any {@link IOException} raised while querying the sizes
     */
    public long size() {
        if (!started.get()) {
            return 0;
        }
        try {
            return journal.getDiskSize() + pageFile.getDiskSize();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Not supported: always throws. */
    public void beginTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }

    /** Not supported: always throws. */
    public void commitTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }

    /** Not supported: always throws. */
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }

    /**
     * Delegates to {@code checkpointCleanup(false)}.
     *
     * @param sync currently not consulted by this implementation
     */
    public void checkpoint(boolean sync) throws IOException {
        super.checkpointCleanup(false);
    }


    ///////////////////////////////////////////////////////////////////
    // Internal helper methods.
    ///////////////////////////////////////////////////////////////////

    /**
     * Loads the journal record at the given location and unmarshals the
     * message it carries.
     *
     * @param location journal location of a {@link KahaAddMessageCommand}
     * @return the unmarshalled message
     * @throws IOException if the record cannot be loaded or unmarshalled
     */
    Message loadMessage(Location location) throws IOException {
        KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
        return (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
    }

    ///////////////////////////////////////////////////////////////////
    // Internal conversion methods.
    ///////////////////////////////////////////////////////////////////

    /**
     * Converts a broker transaction id into its Kaha representation.
     *
     * @param txid transaction id, may be {@code null}
     * @return the Kaha transaction info, or {@code null} when {@code txid} is {@code null}
     */
    KahaTransactionInfo createTransactionInfo(TransactionId txid) {
        if (txid == null) {
            return null;
        }
        KahaTransactionInfo rc = new KahaTransactionInfo();

        // Link it up to the previous record that was part of the transaction.
        ArrayList<Operation> tx = inflightTransactions.get(txid);
        if (tx != null) {
            rc.setPreviousEntry(convert(tx.get(tx.size() - 1).location));
        }

        if (txid.isLocalTransaction()) {
            LocalTransactionId t = (LocalTransactionId) txid;
            KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
            kahaTxId.setConnectionId(t.getConnectionId().getValue());
            kahaTxId.setTransacitonId(t.getValue());
            rc.setLocalTransacitonId(kahaTxId);
        } else {
            // Every non-local transaction id is treated as an XA transaction id.
            XATransactionId t = (XATransactionId) txid;
            KahaXATransactionId kahaTxId = new KahaXATransactionId();
            kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
            kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
            kahaTxId.setFormatId(t.getFormatId());
            rc.setXaTransacitonId(kahaTxId);
        }
        return rc;
    }

    /** Copies the data file id and offset of a journal location into a {@link KahaLocation}. */
    KahaLocation convert(Location location) {
        KahaLocation rc = new KahaLocation();
        rc.setLogId(location.getDataFileId());
        rc.setOffset(location.getOffset());
        return rc;
    }

    /**
     * Converts a broker destination into its Kaha representation.
     *
     * @param dest destination to convert
     * @return the Kaha destination, or {@code null} if the destination type is
     *         not one of queue, topic, temp queue or temp topic
     */
    KahaDestination convert(ActiveMQDestination dest) {
        KahaDestination rc = new KahaDestination();
        rc.setName(dest.getPhysicalName());
        switch (dest.getDestinationType()) {
        case ActiveMQDestination.QUEUE_TYPE:
            rc.setType(DestinationType.QUEUE);
            break;
        case ActiveMQDestination.TOPIC_TYPE:
            rc.setType(DestinationType.TOPIC);
            break;
        case ActiveMQDestination.TEMP_QUEUE_TYPE:
            rc.setType(DestinationType.TEMP_QUEUE);
            break;
        case ActiveMQDestination.TEMP_TOPIC_TYPE:
            rc.setType(DestinationType.TEMP_TOPIC);
            break;
        default:
            return null;
        }
        return rc;
    }

    /**
     * Parses a stored destination key of the form {@code <type number>:<name>}.
     *
     * @param dest stored key
     * @return the matching broker destination
     * @throws IllegalArgumentException if the key has no {@code ':'} separator or
     *         its type is not one of the four handled destination types
     */
    ActiveMQDestination convert(String dest) {
        int p = dest.indexOf(":");
        if (p < 0) {
            throw new IllegalArgumentException("Not in the valid destination format");
        }
        int type = Integer.parseInt(dest.substring(0, p));
        String name = dest.substring(p + 1);

        switch (KahaDestination.DestinationType.valueOf(type)) {
        case QUEUE:
            return new ActiveMQQueue(name);
        case TOPIC:
            return new ActiveMQTopic(name);
        case TEMP_QUEUE:
            return new ActiveMQTempQueue(name);
        case TEMP_TOPIC:
            return new ActiveMQTempTopic(name);
        default:
            throw new IllegalArgumentException("Not in the valid destination format");
        }
    }

}