001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.amq; 018 019 import java.io.File; 020 import java.io.IOException; 021 import java.io.RandomAccessFile; 022 import java.nio.channels.FileLock; 023 import java.util.Date; 024 import java.util.HashSet; 025 import java.util.Iterator; 026 import java.util.Map; 027 import java.util.Set; 028 import java.util.concurrent.ConcurrentHashMap; 029 import java.util.concurrent.CountDownLatch; 030 import java.util.concurrent.atomic.AtomicBoolean; 031 import java.util.concurrent.atomic.AtomicInteger; 032 import java.util.concurrent.atomic.AtomicLong; 033 034 import org.apache.activeio.journal.Journal; 035 import org.apache.activemq.broker.BrokerService; 036 import org.apache.activemq.broker.BrokerServiceAware; 037 import org.apache.activemq.broker.ConnectionContext; 038 import org.apache.activemq.command.ActiveMQDestination; 039 import org.apache.activemq.command.ActiveMQQueue; 040 import org.apache.activemq.command.ActiveMQTopic; 041 import org.apache.activemq.command.DataStructure; 042 import org.apache.activemq.command.JournalQueueAck; 043 import org.apache.activemq.command.JournalTopicAck; 044 import org.apache.activemq.command.JournalTrace; 045 import org.apache.activemq.command.JournalTransaction; 046 import org.apache.activemq.command.Message; 047 import org.apache.activemq.command.SubscriptionInfo; 048 import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 049 import org.apache.activemq.kaha.impl.async.AsyncDataManager; 050 import org.apache.activemq.kaha.impl.async.Location; 051 import org.apache.activemq.kaha.impl.index.hash.HashIndex; 052 import org.apache.activemq.openwire.OpenWireFormat; 053 import org.apache.activemq.store.MessageStore; 054 import org.apache.activemq.store.PersistenceAdapter; 055 import org.apache.activemq.store.ReferenceStore; 056 import org.apache.activemq.store.ReferenceStoreAdapter; 057 import org.apache.activemq.store.TopicMessageStore; 058 import org.apache.activemq.store.TopicReferenceStore; 059 import org.apache.activemq.store.TransactionStore; 060 import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter; 061 import org.apache.activemq.thread.DefaultThreadPools; 062 import org.apache.activemq.thread.Scheduler; 063 import org.apache.activemq.thread.Task; 064 import org.apache.activemq.thread.TaskRunner; 065 import org.apache.activemq.thread.TaskRunnerFactory; 066 import org.apache.activemq.usage.SystemUsage; 067 import org.apache.activemq.usage.Usage; 068 import org.apache.activemq.usage.UsageListener; 069 import org.apache.activemq.util.ByteSequence; 070 import org.apache.activemq.util.IOExceptionSupport; 071 import org.apache.activemq.util.IOHelper; 072 import org.apache.activemq.wireformat.WireFormat; 073 import org.apache.commons.logging.Log; 074 import org.apache.commons.logging.LogFactory; 075 076 077 /** 078 * An implementation of {@link PersistenceAdapter} designed for use with a 079 * {@link Journal} and then check pointing asynchronously on a timeout with some 080 * other long term persistent storage. 081 * 082 * @org.apache.xbean.XBean element="amqPersistenceAdapter" 083 * @version $Revision: 1.17 $ 084 */ 085 public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware { 086 087 private static final Log LOG = LogFactory.getLog(AMQPersistenceAdapter.class); 088 private static final Scheduler scheduler = Scheduler.getInstance(); 089 private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>(); 090 private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>(); 091 private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq"; 092 private static final boolean BROKEN_FILE_LOCK; 093 private static final boolean DISABLE_LOCKING; 094 private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000; 095 private AsyncDataManager asyncDataManager; 096 private ReferenceStoreAdapter referenceStoreAdapter; 097 private TaskRunnerFactory taskRunnerFactory; 098 private WireFormat wireFormat = new OpenWireFormat(); 099 private SystemUsage usageManager; 100 private long checkpointInterval = 1000 * 20; 101 private int maxCheckpointMessageAddSize = 1024 * 4; 102 private AMQTransactionStore transactionStore = new AMQTransactionStore(this); 103 private TaskRunner checkpointTask; 104 private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); 105 private final AtomicBoolean started = new AtomicBoolean(false); 106 private Runnable periodicCheckpointTask; 107 private Runnable periodicCleanupTask; 108 private boolean deleteAllMessages; 109 private boolean syncOnWrite; 110 private boolean syncOnTransaction=true; 111 private String brokerName = ""; 112 private File directory; 113 private File directoryArchive; 114 private BrokerService brokerService; 115 private AtomicLong storeSize = new AtomicLong(); 116 private boolean persistentIndex=true; 117 private boolean useNio = true; 118 private boolean archiveDataLogs=false; 119 private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL; 120 private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH; 121 private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE; 122 private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE; 123 private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE; 124 private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY; 125 private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR; 126 private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH; 127 private Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> (); 128 private String directoryPath = ""; 129 private RandomAccessFile lockFile; 130 private FileLock lock; 131 private boolean disableLocking = DISABLE_LOCKING; 132 private boolean failIfJournalIsLocked; 133 private boolean lockLogged; 134 private boolean lockAquired; 135 private boolean recoverReferenceStore=true; 136 private boolean forceRecoverReferenceStore=false; 137 138 public String getBrokerName() { 139 return this.brokerName; 140 } 141 142 public void setBrokerName(String brokerName) { 143 this.brokerName = brokerName; 144 if (this.referenceStoreAdapter != null) { 145 this.referenceStoreAdapter.setBrokerName(brokerName); 146 } 147 } 148 149 public BrokerService getBrokerService() { 150 return brokerService; 151 } 152 153 public void setBrokerService(BrokerService brokerService) { 154 this.brokerService = brokerService; 155 } 156 157 public synchronized void start() throws Exception { 158 if (!started.compareAndSet(false, true)) { 159 return; 160 } 161 if (this.directory == null) { 162 if (brokerService != null) { 163 this.directory = brokerService.getBrokerDataDirectory(); 164 165 } else { 166 this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName)); 167 this.directory = new File(directory, "amqstore"); 168 this.directoryPath=directory.getAbsolutePath(); 169 } 170 } 171 if (this.directoryArchive == null) { 172 this.directoryArchive = new File(this.directory,"archive"); 173 } 174 IOHelper.mkdirs(this.directory); 175 lockFile = new RandomAccessFile(new File(directory, "lock"), "rw"); 176 lock(); 177 LOG.info("AMQStore starting using directory: " + directory); 178 if (archiveDataLogs) { 179 IOHelper.mkdirs(this.directoryArchive); 180 } 181 182 if (this.usageManager != null) { 183 this.usageManager.getMemoryUsage().addUsageListener(this); 184 } 185 if (asyncDataManager == null) { 186 asyncDataManager = createAsyncDataManager(); 187 } 188 if (referenceStoreAdapter == null) { 189 referenceStoreAdapter = createReferenceStoreAdapter(); 190 } 191 referenceStoreAdapter.setDirectory(new File(directory, "kr-store")); 192 referenceStoreAdapter.setBrokerName(getBrokerName()); 193 referenceStoreAdapter.setUsageManager(usageManager); 194 referenceStoreAdapter.setMaxDataFileLength(getMaxReferenceFileLength()); 195 if (taskRunnerFactory == null) { 196 taskRunnerFactory = createTaskRunnerFactory(); 197 } 198 199 if (failIfJournalIsLocked) { 200 asyncDataManager.lock(); 201 } else { 202 while (true) { 203 try { 204 asyncDataManager.lock(); 205 break; 206 } catch (IOException e) { 207 LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.", e); 208 try { 209 Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY); 210 } catch (InterruptedException e1) { 211 } 212 } 213 } 214 } 215 216 asyncDataManager.start(); 217 if (deleteAllMessages) { 218 asyncDataManager.delete(); 219 try { 220 JournalTrace trace = new JournalTrace(); 221 trace.setMessage("DELETED " + new Date()); 222 Location location = asyncDataManager.write(wireFormat.marshal(trace), false); 223 asyncDataManager.setMark(location, true); 224 LOG.info("Journal deleted: "); 225 deleteAllMessages = false; 226 } catch (IOException e) { 227 throw e; 228 } catch (Throwable e) { 229 throw IOExceptionSupport.create(e); 230 } 231 referenceStoreAdapter.deleteAllMessages(); 232 } 233 referenceStoreAdapter.start(); 234 Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse(); 235 LOG.info("Active data files: " + files); 236 checkpointTask = taskRunnerFactory.createTaskRunner(new Task() { 237 238 public boolean iterate() { 239 doCheckpoint(); 240 return false; 241 } 242 }, "ActiveMQ Journal Checkpoint Worker"); 243 createTransactionStore(); 244 245 // 246 // The following was attempting to reduce startup times by avoiding the 247 // log 248 // file scanning that recovery performs. The problem with it is that XA 249 // transactions 250 // only live in transaction log and are not stored in the reference 251 // store, but they still 252 // need to be recovered when the broker starts up. 253 254 if (isForceRecoverReferenceStore() 255 || (isRecoverReferenceStore() && !referenceStoreAdapter 256 .isStoreValid())) { 257 LOG.warn("The ReferenceStore is not valid - recovering ..."); 258 recover(); 259 LOG.info("Finished recovering the ReferenceStore"); 260 } else { 261 Location location = writeTraceMessage("RECOVERED " + new Date(), 262 true); 263 asyncDataManager.setMark(location, true); 264 // recover transactions 265 getTransactionStore().setPreparedTransactions( 266 referenceStoreAdapter.retrievePreparedState()); 267 } 268 269 // Do a checkpoint periodically. 270 periodicCheckpointTask = new Runnable() { 271 272 public void run() { 273 checkpoint(false); 274 } 275 }; 276 scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval()); 277 periodicCleanupTask = new Runnable() { 278 279 public void run() { 280 cleanup(); 281 } 282 }; 283 scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval()); 284 285 if (lockAquired && lockLogged) { 286 LOG.info("Aquired lock for AMQ Store" + getDirectory()); 287 if (brokerService != null) { 288 brokerService.getBroker().nowMasterBroker(); 289 } 290 } 291 292 } 293 294 public void stop() throws Exception { 295 296 if (!started.compareAndSet(true, false)) { 297 return; 298 } 299 unlock(); 300 if (lockFile != null) { 301 lockFile.close(); 302 lockFile = null; 303 } 304 this.usageManager.getMemoryUsage().removeUsageListener(this); 305 synchronized (this) { 306 scheduler.cancel(periodicCheckpointTask); 307 scheduler.cancel(periodicCleanupTask); 308 } 309 Iterator<AMQMessageStore> queueIterator = queues.values().iterator(); 310 while (queueIterator.hasNext()) { 311 AMQMessageStore ms = queueIterator.next(); 312 ms.stop(); 313 } 314 Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator(); 315 while (topicIterator.hasNext()) { 316 final AMQTopicMessageStore ms = topicIterator.next(); 317 ms.stop(); 318 } 319 // Take one final checkpoint and stop checkpoint processing. 320 checkpoint(true); 321 synchronized (this) { 322 checkpointTask.shutdown(); 323 } 324 referenceStoreAdapter.savePreparedState(getTransactionStore().getPreparedTransactions()); 325 queues.clear(); 326 topics.clear(); 327 IOException firstException = null; 328 referenceStoreAdapter.stop(); 329 referenceStoreAdapter = null; 330 try { 331 LOG.debug("Journal close"); 332 asyncDataManager.close(); 333 } catch (Exception e) { 334 firstException = IOExceptionSupport.create("Failed to close journals: " + e, e); 335 } 336 if (firstException != null) { 337 throw firstException; 338 } 339 } 340 341 /** 342 * When we checkpoint we move all the journalled data to long term storage. 343 * 344 * @param sync 345 */ 346 public void checkpoint(boolean sync) { 347 try { 348 if (asyncDataManager == null) { 349 throw new IllegalStateException("Journal is closed."); 350 } 351 CountDownLatch latch = null; 352 synchronized (this) { 353 latch = nextCheckpointCountDownLatch; 354 checkpointTask.wakeup(); 355 } 356 if (sync) { 357 if (LOG.isDebugEnabled()) { 358 LOG.debug("Waitng for checkpoint to complete."); 359 } 360 latch.await(); 361 } 362 referenceStoreAdapter.checkpoint(sync); 363 } catch (InterruptedException e) { 364 Thread.currentThread().interrupt(); 365 LOG.warn("Request to start checkpoint failed: " + e, e); 366 } catch (IOException e) { 367 LOG.error("checkpoint failed: " + e, e); 368 } 369 } 370 371 /** 372 * This does the actual checkpoint. 373 * 374 * @return true if successful 375 */ 376 public boolean doCheckpoint() { 377 CountDownLatch latch = null; 378 synchronized (this) { 379 latch = nextCheckpointCountDownLatch; 380 nextCheckpointCountDownLatch = new CountDownLatch(1); 381 } 382 try { 383 if (LOG.isDebugEnabled()) { 384 LOG.debug("Checkpoint started."); 385 } 386 387 Location currentMark = asyncDataManager.getMark(); 388 Location newMark = currentMark; 389 Iterator<AMQMessageStore> queueIterator = queues.values().iterator(); 390 while (queueIterator.hasNext()) { 391 final AMQMessageStore ms = queueIterator.next(); 392 Location mark = (Location)ms.getMark(); 393 if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) { 394 newMark = mark; 395 } 396 } 397 Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator(); 398 while (topicIterator.hasNext()) { 399 final AMQTopicMessageStore ms = topicIterator.next(); 400 Location mark = (Location)ms.getMark(); 401 if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) { 402 newMark = mark; 403 } 404 } 405 try { 406 if (newMark != currentMark) { 407 if (LOG.isDebugEnabled()) { 408 LOG.debug("Marking journal at: " + newMark); 409 } 410 asyncDataManager.setMark(newMark, false); 411 writeTraceMessage("CHECKPOINT " + new Date(), true); 412 } 413 } catch (Exception e) { 414 LOG.error("Failed to mark the Journal: " + e, e); 415 } 416 if (LOG.isDebugEnabled()) { 417 LOG.debug("Checkpoint done."); 418 } 419 } finally { 420 latch.countDown(); 421 } 422 return true; 423 } 424 425 /** 426 * Cleans up the data files 427 * @throws IOException 428 */ 429 public void cleanup() { 430 try { 431 Set<Integer>inProgress = new HashSet<Integer>(); 432 if (LOG.isDebugEnabled()) { 433 LOG.debug("dataFilesInProgress.values: (" + dataFilesInProgress.values().size() + ") " + dataFilesInProgress.values()); 434 } 435 for (Map<Integer, AtomicInteger> set: dataFilesInProgress.values()) { 436 inProgress.addAll(set.keySet()); 437 } 438 Integer lastDataFile = asyncDataManager.getCurrentDataFileId(); 439 inProgress.add(lastDataFile); 440 lastDataFile = asyncDataManager.getMark().getDataFileId(); 441 inProgress.addAll(referenceStoreAdapter.getReferenceFileIdsInUse()); 442 Location lastActiveTx = transactionStore.checkpoint(); 443 if (lastActiveTx != null) { 444 lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId()); 445 } 446 LOG.debug("lastDataFile: " + lastDataFile); 447 asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1); 448 } catch (IOException e) { 449 LOG.error("Could not cleanup data files: " + e, e); 450 } 451 } 452 453 public Set<ActiveMQDestination> getDestinations() { 454 Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations()); 455 destinations.addAll(queues.keySet()); 456 destinations.addAll(topics.keySet()); 457 return destinations; 458 } 459 460 MessageStore createMessageStore(ActiveMQDestination destination) throws IOException { 461 if (destination.isQueue()) { 462 return createQueueMessageStore((ActiveMQQueue)destination); 463 } else { 464 return createTopicMessageStore((ActiveMQTopic)destination); 465 } 466 } 467 468 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 469 AMQMessageStore store = queues.get(destination); 470 if (store == null) { 471 ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination); 472 store = new AMQMessageStore(this, checkpointStore, destination); 473 try { 474 store.start(); 475 } catch (Exception e) { 476 throw IOExceptionSupport.create(e); 477 } 478 queues.put(destination, store); 479 } 480 return store; 481 } 482 483 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException { 484 AMQTopicMessageStore store = topics.get(destinationName); 485 if (store == null) { 486 TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName); 487 store = new AMQTopicMessageStore(this,checkpointStore, destinationName); 488 try { 489 store.start(); 490 } catch (Exception e) { 491 throw IOExceptionSupport.create(e); 492 } 493 topics.put(destinationName, store); 494 } 495 return store; 496 } 497 498 /** 499 * Cleanup method to remove any state associated with the given destination 500 * 501 * @param destination 502 */ 503 public void removeQueueMessageStore(ActiveMQQueue destination) { 504 AMQMessageStore store= queues.remove(destination); 505 referenceStoreAdapter.removeQueueMessageStore(destination); 506 } 507 508 /** 509 * Cleanup method to remove any state associated with the given destination 510 * 511 * @param destination 512 */ 513 public void removeTopicMessageStore(ActiveMQTopic destination) { 514 topics.remove(destination); 515 } 516 517 public TransactionStore createTransactionStore() throws IOException { 518 return transactionStore; 519 } 520 521 public long getLastMessageBrokerSequenceId() throws IOException { 522 return referenceStoreAdapter.getLastMessageBrokerSequenceId(); 523 } 524 525 public void beginTransaction(ConnectionContext context) throws IOException { 526 referenceStoreAdapter.beginTransaction(context); 527 } 528 529 public void commitTransaction(ConnectionContext context) throws IOException { 530 referenceStoreAdapter.commitTransaction(context); 531 } 532 533 public void rollbackTransaction(ConnectionContext context) throws IOException { 534 referenceStoreAdapter.rollbackTransaction(context); 535 } 536 537 public boolean isPersistentIndex() { 538 return persistentIndex; 539 } 540 541 public void setPersistentIndex(boolean persistentIndex) { 542 this.persistentIndex = persistentIndex; 543 } 544 545 /** 546 * @param location 547 * @return 548 * @throws IOException 549 */ 550 public DataStructure readCommand(Location location) throws IOException { 551 try { 552 ByteSequence packet = asyncDataManager.read(location); 553 return (DataStructure)wireFormat.unmarshal(packet); 554 } catch (IOException e) { 555 throw createReadException(location, e); 556 } 557 } 558 559 /** 560 * Move all the messages that were in the journal into long term storage. We 561 * just replay and do a checkpoint. 562 * 563 * @throws IOException 564 * @throws IOException 565 * @throws InvalidLocationException 566 * @throws IllegalStateException 567 */ 568 private void recover() throws IllegalStateException, IOException { 569 referenceStoreAdapter.clearMessages(); 570 Location pos = null; 571 int redoCounter = 0; 572 LOG.info("Journal Recovery Started from: " + asyncDataManager); 573 long start = System.currentTimeMillis(); 574 ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext()); 575 // While we have records in the journal. 576 while ((pos = asyncDataManager.getNextLocation(pos)) != null) { 577 ByteSequence data = asyncDataManager.read(pos); 578 DataStructure c = (DataStructure)wireFormat.unmarshal(data); 579 if (c instanceof Message) { 580 Message message = (Message)c; 581 AMQMessageStore store = (AMQMessageStore)createMessageStore(message.getDestination()); 582 if (message.isInTransaction()) { 583 transactionStore.addMessage(store, message, pos); 584 } else { 585 if (store.replayAddMessage(context, message, pos)) { 586 redoCounter++; 587 } 588 } 589 } else { 590 switch (c.getDataStructureType()) { 591 case SubscriptionInfo.DATA_STRUCTURE_TYPE: { 592 referenceStoreAdapter.recoverSubscription((SubscriptionInfo)c); 593 } 594 break; 595 case JournalQueueAck.DATA_STRUCTURE_TYPE: { 596 JournalQueueAck command = (JournalQueueAck)c; 597 AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination()); 598 if (command.getMessageAck().isInTransaction()) { 599 transactionStore.removeMessage(store, command.getMessageAck(), pos); 600 } else { 601 if (store.replayRemoveMessage(context, command.getMessageAck())) { 602 redoCounter++; 603 } 604 } 605 } 606 break; 607 case JournalTopicAck.DATA_STRUCTURE_TYPE: { 608 JournalTopicAck command = (JournalTopicAck)c; 609 AMQTopicMessageStore store = (AMQTopicMessageStore)createMessageStore(command.getDestination()); 610 if (command.getTransactionId() != null) { 611 transactionStore.acknowledge(store, command, pos); 612 } else { 613 if (store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId())) { 614 redoCounter++; 615 } 616 } 617 } 618 break; 619 case JournalTransaction.DATA_STRUCTURE_TYPE: { 620 JournalTransaction command = (JournalTransaction)c; 621 try { 622 // Try to replay the packet. 623 switch (command.getType()) { 624 case JournalTransaction.XA_PREPARE: 625 transactionStore.replayPrepare(command.getTransactionId()); 626 break; 627 case JournalTransaction.XA_COMMIT: 628 case JournalTransaction.LOCAL_COMMIT: 629 AMQTx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared()); 630 if (tx == null) { 631 break; // We may be trying to replay a commit 632 } 633 // that 634 // was already committed. 635 // Replay the committed operations. 636 tx.getOperations(); 637 for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) { 638 AMQTxOperation op = (AMQTxOperation)iter.next(); 639 if (op.replay(this, context)) { 640 redoCounter++; 641 } 642 } 643 break; 644 case JournalTransaction.LOCAL_ROLLBACK: 645 case JournalTransaction.XA_ROLLBACK: 646 transactionStore.replayRollback(command.getTransactionId()); 647 break; 648 default: 649 throw new IOException("Invalid journal command type: " + command.getType()); 650 } 651 } catch (IOException e) { 652 LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e); 653 } 654 } 655 break; 656 case JournalTrace.DATA_STRUCTURE_TYPE: 657 JournalTrace trace = (JournalTrace)c; 658 LOG.debug("TRACE Entry: " + trace.getMessage()); 659 break; 660 default: 661 LOG.error("Unknown type of record in transaction log which will be discarded: " + c); 662 } 663 } 664 } 665 Location location = writeTraceMessage("RECOVERED " + new Date(), true); 666 asyncDataManager.setMark(location, true); 667 long end = System.currentTimeMillis(); 668 LOG.info("Recovered " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds."); 669 } 670 671 private IOException createReadException(Location location, Exception e) { 672 return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e); 673 } 674 675 protected IOException createWriteException(DataStructure packet, Exception e) { 676 return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e); 677 } 678 679 protected IOException createWriteException(String command, Exception e) { 680 return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e); 681 } 682 683 protected IOException createRecoveryFailedException(Exception e) { 684 return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e); 685 } 686 687 /** 688 * @param command 689 * @param syncHint 690 * @return 691 * @throws IOException 692 */ 693 public Location writeCommand(DataStructure command, boolean syncHint) throws IOException { 694 return writeCommand(command, syncHint,false); 695 } 696 697 public Location writeCommand(DataStructure command, boolean syncHint,boolean forceSync) throws IOException { 698 try { 699 return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite))); 700 } catch (IOException ioe) { 701 LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe); 702 brokerService.handleIOException(ioe); 703 throw ioe; 704 } 705 } 706 707 private Location writeTraceMessage(String message, boolean sync) throws IOException { 708 JournalTrace trace = new JournalTrace(); 709 trace.setMessage(message); 710 return writeCommand(trace, sync); 711 } 712 713 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { 714 newPercentUsage = (newPercentUsage / 10) * 10; 715 oldPercentUsage = (oldPercentUsage / 10) * 10; 716 if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) { 717 checkpoint(false); 718 } 719 } 720 721 public AMQTransactionStore getTransactionStore() { 722 return transactionStore; 723 } 724 725 public synchronized void deleteAllMessages() throws IOException { 726 deleteAllMessages = true; 727 } 728 729 public String toString() { 730 return "AMQPersistenceAdapter(" + directory + ")"; 731 } 732 733 // ///////////////////////////////////////////////////////////////// 734 // Subclass overridables 735 // ///////////////////////////////////////////////////////////////// 736 protected AsyncDataManager createAsyncDataManager() { 737 AsyncDataManager manager = new AsyncDataManager(storeSize); 738 manager.setDirectory(new File(directory, "journal")); 739 manager.setDirectoryArchive(getDirectoryArchive()); 740 manager.setArchiveDataLogs(isArchiveDataLogs()); 741 manager.setMaxFileLength(maxFileLength); 742 manager.setUseNio(useNio); 743 return manager; 744 } 745 746 protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException { 747 KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize); 748 adaptor.setPersistentIndex(isPersistentIndex()); 749 adaptor.setIndexBinSize(getIndexBinSize()); 750 adaptor.setIndexKeySize(getIndexKeySize()); 751 adaptor.setIndexPageSize(getIndexPageSize()); 752 adaptor.setIndexMaxBinSize(getIndexMaxBinSize()); 753 adaptor.setIndexLoadFactor(getIndexLoadFactor()); 754 return adaptor; 755 } 756 757 protected TaskRunnerFactory createTaskRunnerFactory() { 758 return DefaultThreadPools.getDefaultTaskRunnerFactory(); 759 } 760 761 // ///////////////////////////////////////////////////////////////// 762 // Property Accessors 763 // ///////////////////////////////////////////////////////////////// 764 public AsyncDataManager getAsyncDataManager() { 765 return asyncDataManager; 766 } 767 768 public void setAsyncDataManager(AsyncDataManager asyncDataManager) { 769 this.asyncDataManager = asyncDataManager; 770 } 771 772 public ReferenceStoreAdapter getReferenceStoreAdapter() { 773 return referenceStoreAdapter; 774 } 775 776 public TaskRunnerFactory getTaskRunnerFactory() { 777 return taskRunnerFactory; 778 } 779 780 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 781 this.taskRunnerFactory = taskRunnerFactory; 782 } 783 784 /** 785 * @return Returns the wireFormat. 786 */ 787 public WireFormat getWireFormat() { 788 return wireFormat; 789 } 790 791 public void setWireFormat(WireFormat wireFormat) { 792 this.wireFormat = wireFormat; 793 } 794 795 public SystemUsage getUsageManager() { 796 return usageManager; 797 } 798 799 public void setUsageManager(SystemUsage usageManager) { 800 this.usageManager = usageManager; 801 } 802 803 public int getMaxCheckpointMessageAddSize() { 804 return maxCheckpointMessageAddSize; 805 } 806 807 /** 808 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 809 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" 810 */ 811 public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) { 812 this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize; 813 } 814 815 816 public synchronized File getDirectory() { 817 return directory; 818 } 819 820 public synchronized void setDirectory(File directory) { 821 this.directory = directory; 822 } 823 824 public boolean isSyncOnWrite() { 825 return this.syncOnWrite; 826 } 827 828 public void setSyncOnWrite(boolean syncOnWrite) { 829 this.syncOnWrite = syncOnWrite; 830 } 831 832 public boolean isSyncOnTransaction() { 833 return syncOnTransaction; 834 } 835 836 public void setSyncOnTransaction(boolean syncOnTransaction) { 837 this.syncOnTransaction = syncOnTransaction; 838 } 839 840 /** 841 * @param referenceStoreAdapter the referenceStoreAdapter to set 842 */ 843 public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) { 844 this.referenceStoreAdapter = referenceStoreAdapter; 845 } 846 847 public long size(){ 848 return storeSize.get(); 849 } 850 851 public boolean isUseNio() { 852 return useNio; 853 } 854 855 public void setUseNio(boolean useNio) { 856 this.useNio = useNio; 857 } 858 859 public int getMaxFileLength() { 860 return maxFileLength; 861 } 862 863 /** 864 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 865 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 866 */ 867 public void setMaxFileLength(int maxFileLength) { 868 this.maxFileLength = maxFileLength; 869 } 870 871 public long getCleanupInterval() { 872 return cleanupInterval; 873 } 874 875 public void setCleanupInterval(long cleanupInterval) { 876 this.cleanupInterval = cleanupInterval; 877 } 878 879 public long getCheckpointInterval() { 880 return checkpointInterval; 881 } 882 883 public void setCheckpointInterval(long checkpointInterval) { 884 this.checkpointInterval = checkpointInterval; 885 } 886 887 public int getIndexBinSize() { 888 return indexBinSize; 889 } 890 891 public void setIndexBinSize(int indexBinSize) { 892 this.indexBinSize = indexBinSize; 893 } 894 895 public int getIndexKeySize() { 896 return indexKeySize; 897 } 898 899 public void setIndexKeySize(int indexKeySize) { 900 this.indexKeySize = indexKeySize; 901 } 902 903 public int getIndexPageSize() { 904 return indexPageSize; 905 } 906 907 public int getIndexMaxBinSize() { 908 return indexMaxBinSize; 909 } 910 911 public void setIndexMaxBinSize(int maxBinSize) { 912 this.indexMaxBinSize = maxBinSize; 913 } 914 915 /** 916 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 917 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 918 */ 919 public void setIndexPageSize(int indexPageSize) { 920 this.indexPageSize = indexPageSize; 921 } 922 923 public void setIndexLoadFactor(int factor){ 924 this.indexLoadFactor=factor; 925 } 926 927 public int getIndexLoadFactor(){ 928 return this.indexLoadFactor; 929 } 930 931 public int getMaxReferenceFileLength() { 932 return maxReferenceFileLength; 933 } 934 935 /** 936 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 937 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 938 */ 939 public void setMaxReferenceFileLength(int maxReferenceFileLength) { 940 this.maxReferenceFileLength = maxReferenceFileLength; 941 } 942 943 public File getDirectoryArchive() { 944 return directoryArchive; 945 } 946 947 public void setDirectoryArchive(File directoryArchive) { 948 this.directoryArchive = directoryArchive; 949 } 950 951 public boolean isArchiveDataLogs() { 952 return archiveDataLogs; 953 } 954 955 public void setArchiveDataLogs(boolean archiveDataLogs) { 956 this.archiveDataLogs = archiveDataLogs; 957 } 958 959 public boolean isDisableLocking() { 960 return disableLocking; 961 } 962 963 public void setDisableLocking(boolean disableLocking) { 964 this.disableLocking = disableLocking; 965 } 966 967 /** 968 * @return the recoverReferenceStore 969 */ 970 public boolean isRecoverReferenceStore() { 971 return recoverReferenceStore; 972 } 973 974 /** 975 * @param recoverReferenceStore the recoverReferenceStore to set 976 */ 977 public void setRecoverReferenceStore(boolean recoverReferenceStore) { 978 this.recoverReferenceStore = recoverReferenceStore; 979 } 980 981 /** 982 * @return the forceRecoverReferenceStore 983 */ 984 public boolean isForceRecoverReferenceStore() { 985 return forceRecoverReferenceStore; 986 } 987 988 /** 989 * @param forceRecoverReferenceStore the forceRecoverReferenceStore to set 990 */ 991 public void setForceRecoverReferenceStore(boolean forceRecoverReferenceStore) { 992 this.forceRecoverReferenceStore = forceRecoverReferenceStore; 993 } 994 995 996 protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) { 997 Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store); 998 if (map == null) { 999 map = new ConcurrentHashMap<Integer, AtomicInteger>(); 1000 dataFilesInProgress.put(store, map); 1001 } 1002 AtomicInteger count = map.get(dataFileId); 1003 if (count == null) { 1004 count = new AtomicInteger(0); 1005 map.put(dataFileId, count); 1006 } 1007 count.incrementAndGet(); 1008 } 1009 1010 protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) { 1011 Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store); 1012 if (map != null) { 1013 AtomicInteger count = map.get(dataFileId); 1014 if (count != null) { 1015 int newCount = count.decrementAndGet(); 1016 if (newCount <=0) { 1017 map.remove(dataFileId); 1018 } 1019 } 1020 if (map.isEmpty()) { 1021 dataFilesInProgress.remove(store); 1022 } 1023 } 1024 } 1025 1026 1027 1028 protected void lock() throws Exception { 1029 lockLogged = false; 1030 lockAquired = false; 1031 do { 1032 if (doLock()) { 1033 lockAquired = true; 1034 } else { 1035 if (!lockLogged) { 1036 LOG.warn("Waiting to Lock the Store " + getDirectory()); 1037 lockLogged = true; 1038 } 1039 Thread.sleep(1000); 1040 } 1041 1042 } while (!lockAquired && !disableLocking); 1043 } 1044 1045 private synchronized void unlock() throws IOException { 1046 if (!disableLocking && (null != lock)) { 1047 //clear property doesn't work on some platforms 1048 System.getProperties().remove(getPropertyKey()); 1049 System.clearProperty(getPropertyKey()); 1050 assert(System.getProperty(getPropertyKey())==null); 1051 if (lock.isValid()) { 1052 lock.release(); 1053 lock.channel().close(); 1054 1055 } 1056 lock = null; 1057 } 1058 } 1059 1060 1061 protected boolean doLock() throws IOException { 1062 boolean result = true; 1063 if (!disableLocking && directory != null && lock == null) { 1064 String key = getPropertyKey(); 1065 String property = System.getProperty(key); 1066 if (null == property) { 1067 if (!BROKEN_FILE_LOCK) { 1068 lock = lockFile.getChannel().tryLock(); 1069 if (lock == null) { 1070 result = false; 1071 } else { 1072 System.setProperty(key, new Date().toString()); 1073 } 1074 } 1075 } else { // already locked 1076 result = false; 1077 } 1078 } 1079 return result; 1080 } 1081 1082 private String getPropertyKey() throws IOException { 1083 return getClass().getName() + ".lock." + directory.getCanonicalPath(); 1084 } 1085 1086 static { 1087 BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX 1088 + ".FileLockBroken", 1089 "false")); 1090 DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX 1091 + ".DisableLocking", 1092 "false")); 1093 } 1094 }