/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;

/**
 * {@link PersistenceAdapter} implementation layered on top of {@link TempMessageDatabase}.
 * <p>
 * Updates are turned into Kaha command objects and handed to the
 * <code>process*</code> methods, which are not defined in this file (presumably
 * inherited from {@link TempMessageDatabase}). Reads run inside a page file
 * transaction while holding <code>indexMutex</code>.
 * <p>
 * Several adapter operations are intentionally empty here:
 * {@link #setBrokerName(String)}, {@link #setUsageManager(SystemUsage)},
 * {@link #removeQueueMessageStore(ActiveMQQueue)},
 * {@link #removeTopicMessageStore(ActiveMQTopic)}, {@link #deleteAllMessages()} and
 * {@link #checkpoint(boolean)}. The adapter level transaction methods always throw
 * an {@link IOException} with the message "Not yet implemented.".
 */
public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter {

    /** Wire format used to marshal and unmarshal messages, acks and subscription infos. */
    private final WireFormat wireFormat = new OpenWireFormat();

    /**
     * No-op; the broker name is ignored by this store.
     *
     * @param brokerName ignored
     */
    public void setBrokerName(String brokerName) {
    }

    /**
     * No-op; the usage manager is ignored by this store.
     *
     * @param usageManager ignored
     */
    public void setUsageManager(SystemUsage usageManager) {
    }

    /**
     * Creates a transaction store that delegates commit, prepare and rollback to
     * <code>processCommit</code>, <code>processPrepare</code> and <code>processRollback</code>.
     *
     * @return a new transaction store bound to this adapter
     * @throws IOException declared by the interface; not thrown by this method itself
     */
    public TransactionStore createTransactionStore() throws IOException {
        return new TransactionStore() {

            public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
                // wasPrepared is not consulted; the commit is processed the same way either way.
                processCommit(txid);
            }

            public void prepare(TransactionId txid) throws IOException {
                processPrepare(txid);
            }

            public void rollback(TransactionId txid) throws IOException {
                processRollback(txid);
            }

            /**
             * Reports every entry of <code>preparedTransactions</code> to the listener
             * together with the messages it added and the acks it recorded.
             * <p>
             * Each key is cast to {@link XATransactionId}; a key of any other type
             * results in a {@link ClassCastException}. Any operation that is not an
             * <code>AddOpperation</code> is treated as a <code>RemoveOpperation</code>.
             */
            public void recover(TransactionRecoveryListener listener) throws IOException {
                for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
                    XATransactionId xid = (XATransactionId) entry.getKey();
                    ArrayList<Message> messageList = new ArrayList<Message>();
                    ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();

                    for (Operation op : entry.getValue()) {
                        if (op.getClass() == AddOpperation.class) {
                            AddOpperation addOp = (AddOpperation) op;
                            Message msg = (Message) wireFormat.unmarshal(
                                    new DataInputStream(addOp.getCommand().getMessage().newInput()));
                            messageList.add(msg);
                        } else {
                            RemoveOpperation rmOp = (RemoveOpperation) op;
                            MessageAck ack = (MessageAck) wireFormat.unmarshal(
                                    new DataInputStream(rmOp.getCommand().getAck().newInput()));
                            ackList.add(ack);
                        }
                    }

                    Message[] addedMessages = messageList.toArray(new Message[messageList.size()]);
                    MessageAck[] acks = ackList.toArray(new MessageAck[ackList.size()]);
                    listener.recover(xid, addedMessages, acks);
                }
            }

            public void start() throws Exception {
            }

            public void stop() throws Exception {
            }
        };
    }

    /**
     * Message store for a single destination.
     * <p>
     * Reads are executed in a page file transaction while holding
     * <code>indexMutex</code>; writes are issued as Kaha commands.
     */
    public class KahaDBMessageStore extends AbstractMessageStore {

        /** Kaha representation of the destination served by this store. */
        protected KahaDestination dest;

        /** Position in the order index where the next batch recovery starts. */
        long cursorPos = 0;

        /**
         * @param destination the destination served by this store; it is converted with
         *        {@link TempKahaDBStore#convert(ActiveMQDestination)}, which yields
         *        <code>null</code> for an unrecognised destination type
         */
        public KahaDBMessageStore(ActiveMQDestination destination) {
            super(destination);
            this.dest = convert(destination);
        }

        /**
         * @return the destination this store was created for
         */
        public ActiveMQDestination getDestination() {
            return destination;
        }

        /**
         * Issues an add command for the message, passing along its transaction id
         * and its marshalled form.
         *
         * @param context not used by this implementation
         * @param message the message to store
         * @throws IOException if processing the command fails
         */
        public void addMessage(ConnectionContext context, Message message) throws IOException {
            KahaAddMessageCommand command = new KahaAddMessageCommand();
            command.setDestination(dest);
            command.setMessageId(message.getMessageId().toString());
            processAdd(command, message.getTransactionId(), wireFormat.marshal(message));
        }

        /**
         * Issues a remove command for the last message id carried by the ack.
         *
         * @param context not used by this implementation
         * @param ack the acknowledgement identifying the message and its transaction
         * @throws IOException if processing the command fails
         */
        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
            command.setDestination(dest);
            command.setMessageId(ack.getLastMessageId().toString());
            processRemove(command, ack.getTransactionId());
        }

        /**
         * Issues a remove-destination command for this store's destination.
         *
         * @param context not used by this implementation
         * @throws IOException if processing the command fails
         */
        public void removeAllMessages(ConnectionContext context) throws IOException {
            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
            command.setDestination(dest);
            process(command);
        }

        /**
         * Looks a message up by id.
         *
         * @param identity id of the wanted message
         * @return the unmarshalled message, or <code>null</code> when the id is not in
         *         the message id index
         * @throws IOException if the index lookup or unmarshalling fails
         */
        public Message getMessage(MessageId identity) throws IOException {
            final String key = identity.toString();

            // Hopefully one day the page file supports concurrent read operations... but for now we must
            // externally synchronize...
            ByteSequence data;
            synchronized (indexMutex) {
                data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>() {
                    public ByteSequence execute(Transaction tx) throws IOException {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        Long sequence = sd.messageIdIndex.get(tx, key);
                        if (sequence == null) {
                            return null;
                        }
                        return sd.orderIndex.get(tx, sequence).data;
                    }
                });
            }
            if (data == null) {
                return null;
            }

            return (Message) wireFormat.unmarshal(data);
        }

        /**
         * Counts the messages of this destination by walking the whole message id index.
         *
         * @return number of entries in the message id index
         * @throws IOException if reading the index fails
         */
        public int getMessageCount() throws IOException {
            synchronized (indexMutex) {
                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
                    public Integer execute(Transaction tx) throws IOException {
                        // Iterate through all index entries to get a count of messages in the destination.
                        StoredDestination sd = getStoredDestination(dest, tx);
                        int rc = 0;
                        for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) {
                            iterator.next();
                            rc++;
                        }
                        return rc;
                    }
                });
            }
        }

        /**
         * Passes every message in the order index to the listener.
         *
         * @param listener receives each unmarshalled message
         * @throws Exception if reading the index, unmarshalling or the listener fails
         */
        public void recover(final MessageRecoveryListener listener) throws Exception {
            synchronized (indexMutex) {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    public void execute(Transaction tx) throws Exception {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
                            Entry<Long, MessageRecord> entry = iterator.next();
                            listener.recoverMessage((Message) wireFormat.unmarshal(entry.getValue().data));
                        }
                    }
                });
            }
        }

        /**
         * Passes up to <code>maxReturned</code> messages to the listener, starting at the
         * current batch cursor, and then moves the cursor just past the last message
         * delivered. At least one message is delivered if any is available, because the
         * limit is checked after each delivery.
         *
         * @param maxReturned maximum number of messages to deliver in this batch
         * @param listener receives each unmarshalled message
         * @throws Exception if reading the index, unmarshalling or the listener fails
         */
        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
            synchronized (indexMutex) {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    public void execute(Transaction tx) throws Exception {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        Entry<Long, MessageRecord> entry = null;
                        int counter = 0;
                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
                            entry = iterator.next();
                            listener.recoverMessage((Message) wireFormat.unmarshal(entry.getValue().data));
                            counter++;
                            if (counter >= maxReturned) {
                                break;
                            }
                        }
                        if (entry != null) {
                            cursorPos = entry.getKey() + 1;
                        }
                    }
                });
            }
        }

        /**
         * Rewinds the batch cursor to the start of the order index.
         */
        public void resetBatching() {
            cursorPos = 0;
        }

        /**
         * Positions the batch cursor just after the given message. The cursor is left
         * untouched when the id is not present in the message id index.
         *
         * @param identity id of the message after which batching should resume
         * @throws IOException if the index lookup fails
         */
        @Override
        public void setBatch(MessageId identity) throws IOException {
            final String key = identity.toString();

            // Hopefully one day the page file supports concurrent read operations... but for now we must
            // externally synchronize...
            Long location;
            synchronized (indexMutex) {
                location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
                    public Long execute(Transaction tx) throws IOException {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        return sd.messageIdIndex.get(tx, key);
                    }
                });
            }
            if (location != null) {
                cursorPos = location + 1;
            }
        }

        /**
         * No-op; memory usage is not tracked by this store.
         *
         * @param memoeyUSage ignored
         */
        public void setMemoryUsage(MemoryUsage memoeyUSage) {
        }

        public void start() throws Exception {
        }

        public void stop() throws Exception {
        }
    }

    /**
     * Topic flavour of {@link KahaDBMessageStore} that additionally manages durable
     * subscriptions, keyed by {@link TempKahaDBStore#subscriptionKey(String, String)}.
     */
    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {

        /**
         * @param destination the topic served by this store
         */
        public KahaDBTopicMessageStore(ActiveMQTopic destination) {
            super(destination);
        }

        /**
         * Issues a remove command for the given message on behalf of one subscription.
         *
         * @param context not used by this implementation
         * @param clientId client id of the subscription
         * @param subscriptionName name of the subscription
         * @param messageId id of the acknowledged message
         * @throws IOException if processing the command fails
         */
        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
            command.setDestination(dest);
            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
            command.setMessageId(messageId.toString());
            // We are not passed a transaction info.. so we can't participate in a transaction.
            // Looks like a design issue with the TopicMessageStore interface. Also we can't recover the original ack
            // to pass back to the XA recover method. Hence no transaction info is set on the command
            // and the remove is processed with a null transaction id.
            processRemove(command, null);
        }

        /**
         * Issues a subscription command carrying the marshalled subscription info.
         *
         * @param subscriptionInfo description of the subscription to add
         * @param retroactive value stored in the command's retroactive flag
         * @throws IOException if marshalling or processing the command fails
         */
        public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
            command.setDestination(dest);
            command.setSubscriptionKey(subscriptionKey);
            command.setRetroactive(retroactive);
            ByteSequence packet = wireFormat.marshal(subscriptionInfo);
            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
            process(command);
        }

        /**
         * Issues a subscription command that carries only the destination and the
         * subscription key.
         * NOTE(review): presumably the command processor treats a command without
         * subscription info as a delete - confirm against the process implementation.
         *
         * @param clientId client id of the subscription
         * @param subscriptionName name of the subscription
         * @throws IOException if processing the command fails
         */
        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
            command.setDestination(dest);
            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
            process(command);
        }

        /**
         * Reads and unmarshals every subscription stored for this topic.
         *
         * @return the stored subscriptions; empty when there are none
         * @throws IOException if reading the index or unmarshalling fails
         */
        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
            synchronized (indexMutex) {
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    public void execute(Transaction tx) throws IOException {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
                            SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(
                                    new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()));
                            subscriptions.add(info);
                        }
                    }
                });
            }

            return subscriptions.toArray(new SubscriptionInfo[subscriptions.size()]);
        }

        /**
         * Looks up one subscription.
         *
         * @param clientId client id of the subscription
         * @param subscriptionName name of the subscription
         * @return the unmarshalled subscription info, or <code>null</code> when no
         *         subscription is stored under that key
         * @throws IOException if reading the index or unmarshalling fails
         */
        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
            synchronized (indexMutex) {
                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
                    public SubscriptionInfo execute(Transaction tx) throws IOException {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
                        if (command == null) {
                            return null;
                        }
                        return (SubscriptionInfo) wireFormat.unmarshal(
                                new DataInputStream(command.getSubscriptionInfo().newInput()));
                    }
                });
            }
        }

        /**
         * Counts the order index entries that follow the subscription's last ack position.
         *
         * @param clientId client id of the subscription
         * @param subscriptionName name of the subscription
         * @return the number of entries after the last ack, or 0 when no ack position
         *         is stored for the subscription
         * @throws IOException if reading the index fails
         */
        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
            synchronized (indexMutex) {
                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
                    public Integer execute(Transaction tx) throws IOException {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        Long lastAck = sd.subscriptionAcks.get(tx, subscriptionKey);
                        if (lastAck == null) {
                            // The subscription might not exist.
                            return 0;
                        }
                        long startPos = lastAck + 1;

                        int counter = 0;
                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, startPos); iterator.hasNext();) {
                            iterator.next();
                            counter++;
                        }
                        return counter;
                    }
                });
            }
        }

        /**
         * Passes every message after the subscription's last ack position to the listener.
         * Nothing is recovered when no ack position is stored for the subscription.
         *
         * @param clientId client id of the subscription
         * @param subscriptionName name of the subscription
         * @param listener receives each unmarshalled message
         * @throws Exception if reading the index, unmarshalling or the listener fails
         */
        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
            synchronized (indexMutex) {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    public void execute(Transaction tx) throws Exception {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        Long lastAck = sd.subscriptionAcks.get(tx, subscriptionKey);
                        if (lastAck == null) {
                            // The subscription might not exist; unboxing null here would
                            // throw a NullPointerException, so there is nothing to recover.
                            return;
                        }
                        long startPos = lastAck + 1;

                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, startPos); iterator.hasNext();) {
                            Entry<Long, MessageRecord> entry = iterator.next();
                            listener.recoverMessage((Message) wireFormat.unmarshal(entry.getValue().data));
                        }
                    }
                });
            }
        }

        /**
         * Passes up to <code>maxReturned</code> messages to the listener for one
         * subscription. The batch starts at the subscription's cached cursor or, when
         * none is cached, just after its last ack position. Afterwards the cached cursor
         * is moved just past the last message delivered. Nothing is recovered when
         * neither a cursor nor an ack position exists for the subscription.
         *
         * @param clientId client id of the subscription
         * @param subscriptionName name of the subscription
         * @param maxReturned maximum number of messages to deliver in this batch
         * @param listener receives each unmarshalled message
         * @throws Exception if reading the index, unmarshalling or the listener fails
         */
        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
            synchronized (indexMutex) {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    public void execute(Transaction tx) throws Exception {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        Long startPos = sd.subscriptionCursors.get(subscriptionKey);
                        if (startPos == null) {
                            Long lastAck = sd.subscriptionAcks.get(tx, subscriptionKey);
                            if (lastAck == null) {
                                // The subscription might not exist; unboxing null here
                                // would throw a NullPointerException.
                                return;
                            }
                            startPos = lastAck + 1;
                        }

                        Entry<Long, MessageRecord> entry = null;
                        int counter = 0;
                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, startPos); iterator.hasNext();) {
                            entry = iterator.next();
                            listener.recoverMessage((Message) wireFormat.unmarshal(entry.getValue().data));
                            counter++;
                            if (counter >= maxReturned) {
                                break;
                            }
                        }
                        if (entry != null) {
                            sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1);
                        }
                    }
                });
            }
        }

        /**
         * Drops the cached batch cursor of one subscription so the next batch starts
         * again after the last ack position.
         *
         * @param clientId client id of the subscription
         * @param subscriptionName name of the subscription
         * @throws RuntimeException wrapping any {@link IOException} raised while
         *         accessing the stored destination
         */
        public void resetBatching(String clientId, String subscriptionName) {
            try {
                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
                synchronized (indexMutex) {
                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                        public void execute(Transaction tx) throws IOException {
                            StoredDestination sd = getStoredDestination(dest, tx);
                            sd.subscriptionCursors.remove(subscriptionKey);
                        }
                    });
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Builds the key under which a subscription is stored.
     *
     * @param clientId client id of the subscription
     * @param subscriptionName name of the subscription
     * @return <code>clientId + ":" + subscriptionName</code>
     */
    String subscriptionKey(String clientId, String subscriptionName) {
        return clientId + ":" + subscriptionName;
    }

    /**
     * @param destination the queue to create a store for
     * @return a new {@link KahaDBMessageStore}; instances are not cached here
     * @throws IOException declared by the interface; not thrown by this method itself
     */
    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
        return new KahaDBMessageStore(destination);
    }

    /**
     * @param destination the topic to create a store for
     * @return a new {@link KahaDBTopicMessageStore}; instances are not cached here
     * @throws IOException declared by the interface; not thrown by this method itself
     */
    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
        return new KahaDBTopicMessageStore(destination);
    }

    /**
     * Cleanup method to remove any state associated with the given destination.
     * This method does not stop the message store (it might not be cached).
     * This implementation does nothing.
     *
     * @param destination Destination to forget
     */
    public void removeQueueMessageStore(ActiveMQQueue destination) {
    }

    /**
     * Cleanup method to remove any state associated with the given destination.
     * This method does not stop the message store (it might not be cached).
     * This implementation does nothing.
     *
     * @param destination Destination to forget
     */
    public void removeTopicMessageStore(ActiveMQTopic destination) {
    }

    /**
     * No-op in this implementation.
     *
     * @throws IOException declared by the interface; not thrown by this method itself
     */
    public void deleteAllMessages() throws IOException {
    }

    /**
     * Lists every destination found in the destinations index.
     *
     * @return the destinations, converted from their stored string keys
     * @throws RuntimeException wrapping any {@link IOException} raised while reading the index
     */
    public Set<ActiveMQDestination> getDestinations() {
        try {
            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
            synchronized (indexMutex) {
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    public void execute(Transaction tx) throws IOException {
                        for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) {
                            Entry<String, StoredDestination> entry = iterator.next();
                            rc.add(convert(entry.getKey()));
                        }
                    }
                });
            }
            return rc;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @return always 0 in this implementation
     * @throws IOException declared by the interface; not thrown by this method itself
     */
    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0;
    }

    /**
     * @return 0 while the store is not started, otherwise the disk size reported by the page file
     * @throws RuntimeException wrapping any {@link IOException} raised by the page file
     */
    public long size() {
        if (!started.get()) {
            return 0;
        }
        try {
            return pageFile.getDiskSize();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Not supported.
     *
     * @throws IOException always, with the message "Not yet implemented."
     */
    public void beginTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }

    /**
     * Not supported.
     *
     * @throws IOException always, with the message "Not yet implemented."
     */
    public void commitTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }

    /**
     * Not supported.
     *
     * @throws IOException always, with the message "Not yet implemented."
     */
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }

    /**
     * No-op in this implementation.
     *
     * @param sync ignored
     * @throws IOException declared by the interface; not thrown by this method itself
     */
    public void checkpoint(boolean sync) throws IOException {
    }

    ///////////////////////////////////////////////////////////////////
    // Internal conversion methods.
    ///////////////////////////////////////////////////////////////////

    /**
     * Copies the data file id and offset of a journal location into a {@link KahaLocation}.
     *
     * @param location the journal location to convert
     * @return a new Kaha location with the same log id and offset
     */
    KahaLocation convert(Location location) {
        KahaLocation rc = new KahaLocation();
        rc.setLogId(location.getDataFileId());
        rc.setOffset(location.getOffset());
        return rc;
    }

    /**
     * Converts an ActiveMQ destination into its Kaha representation.
     *
     * @param dest the destination to convert
     * @return the Kaha destination, or <code>null</code> when the destination type is
     *         not queue, topic, temp queue or temp topic
     */
    KahaDestination convert(ActiveMQDestination dest) {
        KahaDestination rc = new KahaDestination();
        rc.setName(dest.getPhysicalName());
        switch (dest.getDestinationType()) {
        case ActiveMQDestination.QUEUE_TYPE:
            rc.setType(DestinationType.QUEUE);
            return rc;
        case ActiveMQDestination.TOPIC_TYPE:
            rc.setType(DestinationType.TOPIC);
            return rc;
        case ActiveMQDestination.TEMP_QUEUE_TYPE:
            rc.setType(DestinationType.TEMP_QUEUE);
            return rc;
        case ActiveMQDestination.TEMP_TOPIC_TYPE:
            rc.setType(DestinationType.TEMP_TOPIC);
            return rc;
        default:
            return null;
        }
    }

    /**
     * Parses a stored destination key of the form <code>&lt;type number&gt;:&lt;name&gt;</code>.
     *
     * @param dest the stored key
     * @return the matching ActiveMQ destination
     * @throws IllegalArgumentException if the key has no ':' separator or the type is
     *         not one of the four handled destination types
     * @throws NumberFormatException if the part before the first ':' is not an integer
     */
    ActiveMQDestination convert(String dest) {
        int p = dest.indexOf(":");
        if (p < 0) {
            throw new IllegalArgumentException("Not in the valid destination format");
        }
        int type = Integer.parseInt(dest.substring(0, p));
        String name = dest.substring(p + 1);

        switch (KahaDestination.DestinationType.valueOf(type)) {
        case QUEUE:
            return new ActiveMQQueue(name);
        case TOPIC:
            return new ActiveMQTopic(name);
        case TEMP_QUEUE:
            return new ActiveMQTempQueue(name);
        case TEMP_TOPIC:
            return new ActiveMQTempTopic(name);
        default:
            throw new IllegalArgumentException("Not in the valid destination format");
        }
    }

}