001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.amq;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.io.RandomAccessFile;
022    import java.nio.channels.FileLock;
023    import java.util.Date;
024    import java.util.HashSet;
025    import java.util.Iterator;
026    import java.util.Map;
027    import java.util.Set;
028    import java.util.concurrent.ConcurrentHashMap;
029    import java.util.concurrent.CountDownLatch;
030    import java.util.concurrent.atomic.AtomicBoolean;
031    import java.util.concurrent.atomic.AtomicInteger;
032    import java.util.concurrent.atomic.AtomicLong;
033    
034    import org.apache.activeio.journal.Journal;
035    import org.apache.activemq.broker.BrokerService;
036    import org.apache.activemq.broker.BrokerServiceAware;
037    import org.apache.activemq.broker.ConnectionContext;
038    import org.apache.activemq.command.ActiveMQDestination;
039    import org.apache.activemq.command.ActiveMQQueue;
040    import org.apache.activemq.command.ActiveMQTopic;
041    import org.apache.activemq.command.DataStructure;
042    import org.apache.activemq.command.JournalQueueAck;
043    import org.apache.activemq.command.JournalTopicAck;
044    import org.apache.activemq.command.JournalTrace;
045    import org.apache.activemq.command.JournalTransaction;
046    import org.apache.activemq.command.Message;
047    import org.apache.activemq.command.SubscriptionInfo;
048    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
049    import org.apache.activemq.kaha.impl.async.AsyncDataManager;
050    import org.apache.activemq.kaha.impl.async.Location;
051    import org.apache.activemq.kaha.impl.index.hash.HashIndex;
052    import org.apache.activemq.openwire.OpenWireFormat;
053    import org.apache.activemq.store.MessageStore;
054    import org.apache.activemq.store.PersistenceAdapter;
055    import org.apache.activemq.store.ReferenceStore;
056    import org.apache.activemq.store.ReferenceStoreAdapter;
057    import org.apache.activemq.store.TopicMessageStore;
058    import org.apache.activemq.store.TopicReferenceStore;
059    import org.apache.activemq.store.TransactionStore;
060    import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
061    import org.apache.activemq.thread.DefaultThreadPools;
062    import org.apache.activemq.thread.Scheduler;
063    import org.apache.activemq.thread.Task;
064    import org.apache.activemq.thread.TaskRunner;
065    import org.apache.activemq.thread.TaskRunnerFactory;
066    import org.apache.activemq.usage.SystemUsage;
067    import org.apache.activemq.usage.Usage;
068    import org.apache.activemq.usage.UsageListener;
069    import org.apache.activemq.util.ByteSequence;
070    import org.apache.activemq.util.IOExceptionSupport;
071    import org.apache.activemq.util.IOHelper;
072    import org.apache.activemq.wireformat.WireFormat;
073    import org.apache.commons.logging.Log;
074    import org.apache.commons.logging.LogFactory;
075    
076    
077    /**
078     * An implementation of {@link PersistenceAdapter} designed for use with a
079     * {@link Journal} and then check pointing asynchronously on a timeout with some
080     * other long term persistent storage.
081     * 
082     * @org.apache.xbean.XBean element="amqPersistenceAdapter"
083     * @version $Revision: 1.17 $
084     */
085    public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
086    
    private static final Log LOG = LogFactory.getLog(AMQPersistenceAdapter.class);
    // Shared scheduler used by start()/stop() to run and cancel the periodic checkpoint and cleanup tasks.
    private static final Scheduler scheduler = Scheduler.getInstance();
    // Message stores created so far, keyed by destination; filled by the create*MessageStore methods.
    private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
    private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>();
    // NOTE(review): presumably the prefix of the system properties behind the two flags below - confirm
    // against the static initializer, which is not part of this declaration block.
    private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
    // Blank finals: both are assigned elsewhere in the class.
    private static final boolean BROKEN_FILE_LOCK;
    private static final boolean DISABLE_LOCKING;
    // Milliseconds start() sleeps between attempts to lock the journal.
    private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
    // The journal; created lazily in start() when not already set.
    private AsyncDataManager asyncDataManager;
    // Long term reference store; created lazily in start() and set to null again by stop().
    private ReferenceStoreAdapter referenceStoreAdapter;
    private TaskRunnerFactory taskRunnerFactory;
    // Used to marshal/unmarshal the commands written to and read from the journal.
    private WireFormat wireFormat = new OpenWireFormat();
    private SystemUsage usageManager;
    // Period handed to the scheduler for the periodic checkpoint task (presumably milliseconds - confirm).
    private long checkpointInterval = 1000 * 20;
    private int maxCheckpointMessageAddSize = 1024 * 4;
    private AMQTransactionStore transactionStore = new AMQTransactionStore(this);
    // Runs doCheckpoint(); woken up by checkpoint(boolean).
    private TaskRunner checkpointTask;
    // Replaced at the start of every doCheckpoint() run; the replaced latch is counted down when that
    // run finishes, which is what checkpoint(true) waits for.
    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
    // Makes start() and stop() no-ops when the adapter is already in the requested state.
    private final AtomicBoolean started = new AtomicBoolean(false);
    private Runnable periodicCheckpointTask;
    private Runnable periodicCleanupTask;
    // When set, start() deletes the journal and the reference store contents, then clears the flag.
    private boolean deleteAllMessages;
    private boolean syncOnWrite;
    private boolean syncOnTransaction=true;
    private String brokerName = "";
    // Root directory of the store; resolved in start() when not configured.
    private File directory;
    // Defaults to "<directory>/archive"; only created when archiveDataLogs is set.
    private File directoryArchive;
    private BrokerService brokerService;
    private AtomicLong storeSize = new AtomicLong();
    private boolean persistentIndex=true;
    private boolean useNio = true;
    private boolean archiveDataLogs=false;
    // Period handed to the scheduler for the periodic cleanup task.
    private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL;
    private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
    private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
    private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
    private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
    private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
    private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
    private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH;
    // Per message store, the data file ids it still has in progress; cleanup() never consolidates those files.
    private Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> ();
    private String directoryPath = "";
    // Opened in start() on the "lock" file inside the store directory and closed in stop().
    private RandomAccessFile lockFile;
    private FileLock lock;
    private boolean disableLocking = DISABLE_LOCKING;
    // When true, start() makes a single attempt to lock the journal instead of retrying until it succeeds.
        private boolean failIfJournalIsLocked;
    private boolean lockLogged;
    private boolean lockAquired;
    // Recovery switches evaluated in start(): force always recovers, the other one only when the
    // reference store reports itself as not valid.
    private boolean recoverReferenceStore=true;
    private boolean forceRecoverReferenceStore=false;
137    
138        public String getBrokerName() {
139            return this.brokerName;
140        }
141    
142        public void setBrokerName(String brokerName) {
143            this.brokerName = brokerName;
144            if (this.referenceStoreAdapter != null) {
145                this.referenceStoreAdapter.setBrokerName(brokerName);
146            }
147        }
148    
149        public BrokerService getBrokerService() {
150            return brokerService;
151        }
152    
153        public void setBrokerService(BrokerService brokerService) {
154            this.brokerService = brokerService;
155        }
156    
    /**
     * Starts the adapter. Calling it on an already started adapter is a no-op.
     * <p>
     * In order, this method: resolves and creates the store directory, opens
     * the "lock" file and calls {@code lock()}, creates any collaborator that
     * was not injected (journal, reference store adapter, task runner factory),
     * locks and starts the journal, optionally wipes all data, starts the
     * reference store, creates the checkpoint worker, recovers state (either a
     * full journal replay or just the saved prepared transactions) and finally
     * schedules the periodic checkpoint and cleanup tasks.
     *
     * @throws Exception if any of the steps above fails; note that the
     *         {@code started} flag has already been set at that point
     */
    public synchronized void start() throws Exception {
        if (!started.compareAndSet(false, true)) {
            return;
        }
        // Resolve the store directory: prefer the broker's data directory, otherwise fall back to
        // "<default data dir>/<broker name>/amqstore".
        if (this.directory == null) {
            if (brokerService != null) {
                this.directory = brokerService.getBrokerDataDirectory();
               
            } else {
                this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName));
                this.directory = new File(directory, "amqstore");
                this.directoryPath=directory.getAbsolutePath();
            }
        }
        if (this.directoryArchive == null) {
            this.directoryArchive = new File(this.directory,"archive");
        }
        IOHelper.mkdirs(this.directory);
        // The lock file has to be opened before lock() is called.
        lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
        lock();
        LOG.info("AMQStore starting using directory: " + directory); 
        if (archiveDataLogs) {
            IOHelper.mkdirs(this.directoryArchive);
        }

        if (this.usageManager != null) {
            this.usageManager.getMemoryUsage().addUsageListener(this);
        }
        // Create the collaborators that were not injected before start().
        if (asyncDataManager == null) {
            asyncDataManager = createAsyncDataManager();
        }
        if (referenceStoreAdapter == null) {
            referenceStoreAdapter = createReferenceStoreAdapter();
        }
        referenceStoreAdapter.setDirectory(new File(directory, "kr-store"));
        referenceStoreAdapter.setBrokerName(getBrokerName());
        referenceStoreAdapter.setUsageManager(usageManager);
        referenceStoreAdapter.setMaxDataFileLength(getMaxReferenceFileLength());
        if (taskRunnerFactory == null) {
            taskRunnerFactory = createTaskRunnerFactory();
        }
        
        // Lock the journal: either a single attempt whose failure propagates, or retry forever
        // with a fixed delay between attempts.
        if (failIfJournalIsLocked) {
            asyncDataManager.lock();
        } else {
            while (true) {
                try {
                    asyncDataManager.lock();
                    break;
                } catch (IOException e) {
                    LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.", e);
                    try {
                        Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
                    } catch (InterruptedException e1) {
                        // NOTE(review): the interrupt is swallowed, so this loop cannot be cancelled by
                        // interrupting the thread. Merely restoring the interrupt flag here would make
                        // every following sleep() fail immediately and turn the loop into a busy spin.
                    }
                }
            }
        }
        
        asyncDataManager.start();
        if (deleteAllMessages) {
            // Wipe the journal, leave a "DELETED" trace record as the new mark, then wipe the reference store.
            asyncDataManager.delete();
            try {
                JournalTrace trace = new JournalTrace();
                trace.setMessage("DELETED " + new Date());
                Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
                asyncDataManager.setMark(location, true);
                LOG.info("Journal deleted: ");
                // One-shot flag: cleared once the journal has been deleted.
                deleteAllMessages = false;
            } catch (IOException e) {
                throw e;
            } catch (Throwable e) {
                throw IOExceptionSupport.create(e);
            }
            referenceStoreAdapter.deleteAllMessages();
        }
        referenceStoreAdapter.start();
        Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
        LOG.info("Active data files: " + files);
        // Worker that performs the actual checkpoint whenever checkpoint(boolean) wakes it up.
        checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {

            public boolean iterate() {
                doCheckpoint();
                // false: do not iterate again until the next wakeup.
                return false;
            }
        }, "ActiveMQ Journal Checkpoint Worker");
        createTransactionStore();

        // Skipping the journal scan reduces startup time, but XA transactions live only in the
        // transaction log and not in the reference store, and they still need to be recovered
        // when the broker starts up. Hence: replay the journal when recovery is forced or the
        // reference store is not valid; otherwise only restore the prepared transactions that
        // stop() saved into the reference store.

        if (isForceRecoverReferenceStore()
                || (isRecoverReferenceStore() && !referenceStoreAdapter
                        .isStoreValid())) {
            LOG.warn("The ReferenceStore is not valid - recovering ...");
            recover();
            LOG.info("Finished recovering the ReferenceStore");
        } else {
            Location location = writeTraceMessage("RECOVERED " + new Date(),
                    true);
            asyncDataManager.setMark(location, true);
            // recover transactions
            getTransactionStore().setPreparedTransactions(
                    referenceStoreAdapter.retrievePreparedState());
        }

        // Do a checkpoint periodically.
        periodicCheckpointTask = new Runnable() {

            public void run() {
                checkpoint(false);
            }
        };
        scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval());
        // Consolidate journal data files periodically.
        periodicCleanupTask = new Runnable() {

            public void run() {
                cleanup();
            }
        };
        scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval());
        
        // NOTE(review): both flags are maintained outside this method (presumably by lock()); the
        // broker is only notified when the lock was acquired and that fact was flagged for logging.
        if (lockAquired && lockLogged) {
            LOG.info("Aquired lock for AMQ Store" + getDirectory());
            if (brokerService != null) {
                brokerService.getBroker().nowMasterBroker();
            }
        }

    }
293    
294        public void stop() throws Exception {
295    
296            if (!started.compareAndSet(true, false)) {
297                return;
298            }
299            unlock();
300            if (lockFile != null) {
301                lockFile.close();
302                lockFile = null;
303            }
304            this.usageManager.getMemoryUsage().removeUsageListener(this);
305            synchronized (this) {
306                scheduler.cancel(periodicCheckpointTask);
307                scheduler.cancel(periodicCleanupTask);
308            }
309            Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
310            while (queueIterator.hasNext()) {
311                AMQMessageStore ms = queueIterator.next();
312                ms.stop();
313            }
314            Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
315            while (topicIterator.hasNext()) {
316                final AMQTopicMessageStore ms = topicIterator.next();
317                ms.stop();
318            }
319            // Take one final checkpoint and stop checkpoint processing.
320            checkpoint(true);
321            synchronized (this) {
322                checkpointTask.shutdown();
323            }
324            referenceStoreAdapter.savePreparedState(getTransactionStore().getPreparedTransactions());
325            queues.clear();
326            topics.clear();
327            IOException firstException = null;
328            referenceStoreAdapter.stop();
329            referenceStoreAdapter = null;
330            try {
331                LOG.debug("Journal close");
332                asyncDataManager.close();
333            } catch (Exception e) {
334                firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
335            }
336            if (firstException != null) {
337                throw firstException;
338            }
339        }
340    
341        /**
342         * When we checkpoint we move all the journalled data to long term storage.
343         * 
344         * @param sync
345         */
346        public void checkpoint(boolean sync) {
347            try {
348                if (asyncDataManager == null) {
349                    throw new IllegalStateException("Journal is closed.");
350                }
351                CountDownLatch latch = null;
352                synchronized (this) {
353                    latch = nextCheckpointCountDownLatch;
354                    checkpointTask.wakeup();
355                }
356                if (sync) {
357                    if (LOG.isDebugEnabled()) {
358                        LOG.debug("Waitng for checkpoint to complete.");
359                    }
360                    latch.await();
361                }
362                referenceStoreAdapter.checkpoint(sync);
363            } catch (InterruptedException e) {
364                Thread.currentThread().interrupt();
365                LOG.warn("Request to start checkpoint failed: " + e, e);
366            } catch (IOException e) {
367                LOG.error("checkpoint failed: " + e, e);
368            }
369        }
370    
371        /**
372         * This does the actual checkpoint.
373         * 
374         * @return true if successful
375         */
376        public boolean doCheckpoint() {
377            CountDownLatch latch = null;
378            synchronized (this) {
379                latch = nextCheckpointCountDownLatch;
380                nextCheckpointCountDownLatch = new CountDownLatch(1);
381            }
382            try {
383                if (LOG.isDebugEnabled()) {
384                    LOG.debug("Checkpoint started.");
385                }
386    
387                Location currentMark = asyncDataManager.getMark();
388                Location newMark = currentMark;
389                Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
390                while (queueIterator.hasNext()) {
391                    final AMQMessageStore ms = queueIterator.next();
392                    Location mark = (Location)ms.getMark();
393                    if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
394                        newMark = mark;
395                    }
396                }
397                Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
398                while (topicIterator.hasNext()) {
399                    final AMQTopicMessageStore ms = topicIterator.next();
400                    Location mark = (Location)ms.getMark();
401                    if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
402                        newMark = mark;
403                    }
404                }
405                try {
406                    if (newMark != currentMark) {
407                        if (LOG.isDebugEnabled()) {
408                            LOG.debug("Marking journal at: " + newMark);
409                        }
410                        asyncDataManager.setMark(newMark, false);
411                        writeTraceMessage("CHECKPOINT " + new Date(), true);
412                    }
413                } catch (Exception e) {
414                    LOG.error("Failed to mark the Journal: " + e, e);
415                }
416                if (LOG.isDebugEnabled()) {
417                    LOG.debug("Checkpoint done.");
418                }
419            } finally {
420                latch.countDown();
421            }
422            return true;
423        }
424    
425        /**
426         * Cleans up the data files
427         * @throws IOException
428         */
429        public void cleanup() {
430            try {
431                Set<Integer>inProgress = new HashSet<Integer>();
432                if (LOG.isDebugEnabled()) {
433                    LOG.debug("dataFilesInProgress.values: (" + dataFilesInProgress.values().size() + ") " + dataFilesInProgress.values());
434                }      
435                for (Map<Integer, AtomicInteger> set: dataFilesInProgress.values()) {
436                    inProgress.addAll(set.keySet());
437                }
438                Integer lastDataFile = asyncDataManager.getCurrentDataFileId();   
439                inProgress.add(lastDataFile);
440                lastDataFile = asyncDataManager.getMark().getDataFileId();
441                inProgress.addAll(referenceStoreAdapter.getReferenceFileIdsInUse());
442                Location lastActiveTx = transactionStore.checkpoint();
443                if (lastActiveTx != null) {
444                    lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId());
445                }
446                LOG.debug("lastDataFile: " + lastDataFile);
447                asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1);
448            } catch (IOException e) {
449                LOG.error("Could not cleanup data files: " + e, e);
450            }
451        }
452    
453        public Set<ActiveMQDestination> getDestinations() {
454            Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
455            destinations.addAll(queues.keySet());
456            destinations.addAll(topics.keySet());
457            return destinations;
458        }
459    
460        MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
461            if (destination.isQueue()) {
462                return createQueueMessageStore((ActiveMQQueue)destination);
463            } else {
464                return createTopicMessageStore((ActiveMQTopic)destination);
465            }
466        }
467    
468        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
469            AMQMessageStore store = queues.get(destination);
470            if (store == null) {
471                ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
472                store = new AMQMessageStore(this, checkpointStore, destination);
473                try {
474                    store.start();
475                } catch (Exception e) {
476                    throw IOExceptionSupport.create(e);
477                }
478                queues.put(destination, store);
479            }
480            return store;
481        }
482    
483        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
484            AMQTopicMessageStore store = topics.get(destinationName);
485            if (store == null) {
486                TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
487                store = new AMQTopicMessageStore(this,checkpointStore, destinationName);
488                try {
489                    store.start();
490                } catch (Exception e) {
491                    throw IOExceptionSupport.create(e);
492                }
493                topics.put(destinationName, store);
494            }
495            return store;
496        }
497    
498        /**
499         * Cleanup method to remove any state associated with the given destination
500         *
501         * @param destination
502         */
503        public void removeQueueMessageStore(ActiveMQQueue destination) {
504            AMQMessageStore store= queues.remove(destination);
505            referenceStoreAdapter.removeQueueMessageStore(destination);
506        }
507    
508        /**
509         * Cleanup method to remove any state associated with the given destination
510         *
511         * @param destination
512         */
513        public void removeTopicMessageStore(ActiveMQTopic destination) {
514            topics.remove(destination);
515        }
516    
517        public TransactionStore createTransactionStore() throws IOException {
518            return transactionStore;
519        }
520    
521        public long getLastMessageBrokerSequenceId() throws IOException {
522            return referenceStoreAdapter.getLastMessageBrokerSequenceId();
523        }
524    
525        public void beginTransaction(ConnectionContext context) throws IOException {
526            referenceStoreAdapter.beginTransaction(context);
527        }
528    
529        public void commitTransaction(ConnectionContext context) throws IOException {
530            referenceStoreAdapter.commitTransaction(context);
531        }
532    
533        public void rollbackTransaction(ConnectionContext context) throws IOException {
534            referenceStoreAdapter.rollbackTransaction(context);
535        }
536        
537        public boolean isPersistentIndex() {
538                    return persistentIndex;
539            }
540    
541            public void setPersistentIndex(boolean persistentIndex) {
542                    this.persistentIndex = persistentIndex;
543            }
544    
545        /**
546         * @param location
547         * @return
548         * @throws IOException
549         */
550        public DataStructure readCommand(Location location) throws IOException {
551            try {
552                ByteSequence packet = asyncDataManager.read(location);
553                return (DataStructure)wireFormat.unmarshal(packet);
554            } catch (IOException e) {
555                throw createReadException(location, e);
556            }
557        }
558    
    /**
     * Rebuilds the reference store from the journal: the stored messages are
     * cleared and every journal record is read and replayed in order.
     * Afterwards a "RECOVERED" trace record is written and becomes the new
     * journal mark.
     * <p>
     * Operations that belong to a transaction are collected in the transaction
     * store and only replayed when the matching commit record is reached.
     * A failure while replaying a transaction record is logged and the scan
     * continues; failures on other record types abort the recovery.
     *
     * @throws IOException if the journal cannot be read or a non-transaction
     *         record cannot be replayed
     * @throws IllegalStateException declared by the signature; not thrown
     *         directly by this method
     */
    private void recover() throws IllegalStateException, IOException {
        referenceStoreAdapter.clearMessages();
        Location pos = null;
        // Number of journal operations that were actually re-applied.
        int redoCounter = 0;
        LOG.info("Journal Recovery Started from: " + asyncDataManager);
        long start = System.currentTimeMillis();
        ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
        // While we have records in the journal.
        // NOTE(review): pos starts out as null - presumably that yields the first record; confirm
        // against AsyncDataManager.getNextLocation.
        while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
            ByteSequence data = asyncDataManager.read(pos);
            DataStructure c = (DataStructure)wireFormat.unmarshal(data);
            if (c instanceof Message) {
                Message message = (Message)c;
                AMQMessageStore store = (AMQMessageStore)createMessageStore(message.getDestination());
                if (message.isInTransaction()) {
                    // Held back until the commit record of its transaction is replayed.
                    transactionStore.addMessage(store, message, pos);
                } else {
                    if (store.replayAddMessage(context, message, pos)) {
                        redoCounter++;
                    }
                }
            } else {
                switch (c.getDataStructureType()) {
                case SubscriptionInfo.DATA_STRUCTURE_TYPE: {
                    referenceStoreAdapter.recoverSubscription((SubscriptionInfo)c);
                }
                    break;
                case JournalQueueAck.DATA_STRUCTURE_TYPE: {
                    JournalQueueAck command = (JournalQueueAck)c;
                    AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination());
                    if (command.getMessageAck().isInTransaction()) {
                        transactionStore.removeMessage(store, command.getMessageAck(), pos);
                    } else {
                        if (store.replayRemoveMessage(context, command.getMessageAck())) {
                            redoCounter++;
                        }
                    }
                }
                    break;
                case JournalTopicAck.DATA_STRUCTURE_TYPE: {
                    JournalTopicAck command = (JournalTopicAck)c;
                    AMQTopicMessageStore store = (AMQTopicMessageStore)createMessageStore(command.getDestination());
                    if (command.getTransactionId() != null) {
                        transactionStore.acknowledge(store, command, pos);
                    } else {
                        if (store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId())) {
                            redoCounter++;
                        }
                    }
                }
                    break;
                case JournalTransaction.DATA_STRUCTURE_TYPE: {
                    JournalTransaction command = (JournalTransaction)c;
                    try {
                        // Try to replay the packet.
                        switch (command.getType()) {
                        case JournalTransaction.XA_PREPARE:
                            transactionStore.replayPrepare(command.getTransactionId());
                            break;
                        case JournalTransaction.XA_COMMIT:
                        case JournalTransaction.LOCAL_COMMIT:
                            AMQTx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
                            if (tx == null) {
                                // We may be trying to replay a commit that was already committed.
                                break;
                            }
                            // Replay the committed operations.
                            // NOTE(review): the result of this call is unused; it looks redundant with
                            // the call in the loop header below.
                            tx.getOperations();
                            for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
                                AMQTxOperation op = (AMQTxOperation)iter.next();
                                if (op.replay(this, context)) {
                                    redoCounter++;
                                }
                            }
                            break;
                        case JournalTransaction.LOCAL_ROLLBACK:
                        case JournalTransaction.XA_ROLLBACK:
                            transactionStore.replayRollback(command.getTransactionId());
                            break;
                        default:
                            // Caught by the handler just below, so this is logged and not propagated.
                            throw new IOException("Invalid journal command type: " + command.getType());
                        }
                    } catch (IOException e) {
                        // A transaction record that cannot be replayed is logged and skipped.
                        LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
                    }
                }
                    break;
                case JournalTrace.DATA_STRUCTURE_TYPE:
                    JournalTrace trace = (JournalTrace)c;
                    LOG.debug("TRACE Entry: " + trace.getMessage());
                    break;
                default:
                    LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
                }
            }
        }
        // Record the end of the recovery in the journal and make that record the new mark.
        Location location = writeTraceMessage("RECOVERED " + new Date(), true);
        asyncDataManager.setMark(location, true);
        long end = System.currentTimeMillis();
        LOG.info("Recovered " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
    }
670    
671        private IOException createReadException(Location location, Exception e) {
672            return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
673        }
674    
675        protected IOException createWriteException(DataStructure packet, Exception e) {
676            return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
677        }
678    
679        protected IOException createWriteException(String command, Exception e) {
680            return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
681        }
682    
683        protected IOException createRecoveryFailedException(Exception e) {
684            return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
685        }
686    
687        /**
688         * @param command
689         * @param syncHint
690         * @return
691         * @throws IOException
692         */
693        public Location writeCommand(DataStructure command, boolean syncHint) throws IOException {
694            return writeCommand(command, syncHint,false);
695        }
696        
697        public Location writeCommand(DataStructure command, boolean syncHint,boolean forceSync) throws IOException {
698            try {
699                    return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite)));
700            } catch (IOException ioe) {
701                    LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe);
702                    brokerService.handleIOException(ioe);
703                    throw ioe;
704            }
705        }
706    
707        private Location writeTraceMessage(String message, boolean sync) throws IOException {
708            JournalTrace trace = new JournalTrace();
709            trace.setMessage(message);
710            return writeCommand(trace, sync);
711        }
712    
713        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
714            newPercentUsage = (newPercentUsage / 10) * 10;
715            oldPercentUsage = (oldPercentUsage / 10) * 10;
716            if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
717                checkpoint(false);
718            }
719        }
720    
721        public AMQTransactionStore getTransactionStore() {
722            return transactionStore;
723        }
724    
725        public synchronized void deleteAllMessages() throws IOException {
726            deleteAllMessages = true;
727        }
728    
729        public String toString() {
730            return "AMQPersistenceAdapter(" + directory + ")";
731        }
732    
733        // /////////////////////////////////////////////////////////////////
734        // Subclass overridables
735        // /////////////////////////////////////////////////////////////////
736        protected AsyncDataManager createAsyncDataManager() {
737            AsyncDataManager manager = new AsyncDataManager(storeSize);
738            manager.setDirectory(new File(directory, "journal"));
739            manager.setDirectoryArchive(getDirectoryArchive());
740            manager.setArchiveDataLogs(isArchiveDataLogs());
741            manager.setMaxFileLength(maxFileLength);
742            manager.setUseNio(useNio);    
743            return manager;
744        }
745    
746        protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
747            KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize);
748            adaptor.setPersistentIndex(isPersistentIndex());
749            adaptor.setIndexBinSize(getIndexBinSize());
750            adaptor.setIndexKeySize(getIndexKeySize());
751            adaptor.setIndexPageSize(getIndexPageSize());
752            adaptor.setIndexMaxBinSize(getIndexMaxBinSize());
753            adaptor.setIndexLoadFactor(getIndexLoadFactor());
754            return adaptor;
755        }
756    
757        protected TaskRunnerFactory createTaskRunnerFactory() {
758            return DefaultThreadPools.getDefaultTaskRunnerFactory();
759        }
760    
761        // /////////////////////////////////////////////////////////////////
762        // Property Accessors
763        // /////////////////////////////////////////////////////////////////
764        public AsyncDataManager getAsyncDataManager() {
765            return asyncDataManager;
766        }
767    
768        public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
769            this.asyncDataManager = asyncDataManager;
770        }
771    
772        public ReferenceStoreAdapter getReferenceStoreAdapter() {
773            return referenceStoreAdapter;
774        }
775    
776        public TaskRunnerFactory getTaskRunnerFactory() {
777            return taskRunnerFactory;
778        }
779    
780        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
781            this.taskRunnerFactory = taskRunnerFactory;
782        }
783    
784        /**
785         * @return Returns the wireFormat.
786         */
787        public WireFormat getWireFormat() {
788            return wireFormat;
789        }
790    
791        public void setWireFormat(WireFormat wireFormat) {
792            this.wireFormat = wireFormat;
793        }
794    
795        public SystemUsage getUsageManager() {
796            return usageManager;
797        }
798    
799        public void setUsageManager(SystemUsage usageManager) {
800            this.usageManager = usageManager;
801        }
802    
803        public int getMaxCheckpointMessageAddSize() {
804            return maxCheckpointMessageAddSize;
805        }
806    
807        /** 
808         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
809         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
810         */
811        public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
812            this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
813        }
814    
815       
816        public synchronized File getDirectory() {
817            return directory;
818        }
819    
820        public synchronized void setDirectory(File directory) {
821            this.directory = directory;
822        }
823    
824        public boolean isSyncOnWrite() {
825            return this.syncOnWrite;
826        }
827    
828        public void setSyncOnWrite(boolean syncOnWrite) {
829            this.syncOnWrite = syncOnWrite;
830        }
831        
832        public boolean isSyncOnTransaction() {
833            return syncOnTransaction;
834        }
835    
836        public void setSyncOnTransaction(boolean syncOnTransaction) {
837            this.syncOnTransaction = syncOnTransaction;
838        }
839    
840        /**
841         * @param referenceStoreAdapter the referenceStoreAdapter to set
842         */
843        public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
844            this.referenceStoreAdapter = referenceStoreAdapter;
845        }
846        
847        public long size(){
848            return storeSize.get();
849        }
850    
851            public boolean isUseNio() {
852                    return useNio;
853            }
854    
855            public void setUseNio(boolean useNio) {
856                    this.useNio = useNio;
857            }
858    
859            public int getMaxFileLength() {
860                    return maxFileLength;
861            }
862    
863             /**
864          * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
865          * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
866          */
867            public void setMaxFileLength(int maxFileLength) {
868                    this.maxFileLength = maxFileLength;
869            }
870            
871            public long getCleanupInterval() {
872            return cleanupInterval;
873        }
874    
875        public void setCleanupInterval(long cleanupInterval) {
876            this.cleanupInterval = cleanupInterval;
877        }
878    
879        public long getCheckpointInterval() {
880            return checkpointInterval;
881        }
882    
883        public void setCheckpointInterval(long checkpointInterval) {
884            this.checkpointInterval = checkpointInterval;
885        }
886        
887        public int getIndexBinSize() {
888            return indexBinSize;
889        }
890    
891        public void setIndexBinSize(int indexBinSize) {
892            this.indexBinSize = indexBinSize;
893        }
894    
895        public int getIndexKeySize() {
896            return indexKeySize;
897        }
898    
899        public void setIndexKeySize(int indexKeySize) {
900            this.indexKeySize = indexKeySize;
901        }
902    
903        public int getIndexPageSize() {
904            return indexPageSize;
905        }
906        
907        public int getIndexMaxBinSize() {
908            return indexMaxBinSize;
909        }
910    
911        public void setIndexMaxBinSize(int maxBinSize) {
912            this.indexMaxBinSize = maxBinSize;
913        }
914    
915        /**
916         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
917         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
918         */
919        public void setIndexPageSize(int indexPageSize) {
920            this.indexPageSize = indexPageSize;
921        }
922        
923        public void setIndexLoadFactor(int factor){
924            this.indexLoadFactor=factor;    
925        }
926        
927        public int getIndexLoadFactor(){
928            return this.indexLoadFactor;
929        }
930        
931        public int getMaxReferenceFileLength() {
932            return maxReferenceFileLength;
933        }
934    
935        /**
936         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
937         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
938         */
939        public void setMaxReferenceFileLength(int maxReferenceFileLength) {
940            this.maxReferenceFileLength = maxReferenceFileLength;
941        }
942        
943        public File getDirectoryArchive() {
944            return directoryArchive;
945        }
946    
947        public void setDirectoryArchive(File directoryArchive) {
948            this.directoryArchive = directoryArchive;
949        }
950    
951        public boolean isArchiveDataLogs() {
952            return archiveDataLogs;
953        }
954    
955        public void setArchiveDataLogs(boolean archiveDataLogs) {
956            this.archiveDataLogs = archiveDataLogs;
957        }  
958        
959        public boolean isDisableLocking() {
960            return disableLocking;
961        }
962    
963        public void setDisableLocking(boolean disableLocking) {
964            this.disableLocking = disableLocking;
965        }
966        
967        /**
968         * @return the recoverReferenceStore
969         */
970        public boolean isRecoverReferenceStore() {
971            return recoverReferenceStore;
972        }
973    
974        /**
975         * @param recoverReferenceStore the recoverReferenceStore to set
976         */
977        public void setRecoverReferenceStore(boolean recoverReferenceStore) {
978            this.recoverReferenceStore = recoverReferenceStore;
979        }
980    
981        /**
982         * @return the forceRecoverReferenceStore
983         */
984        public boolean isForceRecoverReferenceStore() {
985            return forceRecoverReferenceStore;
986        }
987    
988        /**
989         * @param forceRecoverReferenceStore the forceRecoverReferenceStore to set
990         */
991        public void setForceRecoverReferenceStore(boolean forceRecoverReferenceStore) {
992            this.forceRecoverReferenceStore = forceRecoverReferenceStore;
993        }
994    
995            
996            protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
997                Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
998                if (map == null) {
999                    map = new ConcurrentHashMap<Integer, AtomicInteger>();
1000                    dataFilesInProgress.put(store, map);
1001                }
1002                AtomicInteger count = map.get(dataFileId);
1003                if (count == null) {
1004                    count = new AtomicInteger(0);
1005                    map.put(dataFileId, count);
1006                }
1007                count.incrementAndGet();
1008            }
1009            
1010            protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) {
1011            Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
1012            if (map != null) {
1013                AtomicInteger count = map.get(dataFileId);
1014                if (count != null) {
1015                    int newCount = count.decrementAndGet(); 
1016                    if (newCount <=0) {
1017                        map.remove(dataFileId);
1018                    }
1019                }
1020                if (map.isEmpty()) {
1021                    dataFilesInProgress.remove(store);
1022                }
1023            }
1024        }
1025            
1026            
1027            
1028            protected void lock() throws Exception {
1029            lockLogged = false;
1030            lockAquired = false;
1031            do {
1032                if (doLock()) {
1033                    lockAquired = true;
1034                } else {
1035                    if (!lockLogged) {
1036                        LOG.warn("Waiting to Lock the Store " + getDirectory());
1037                        lockLogged = true;
1038                    }
1039                    Thread.sleep(1000);
1040                }
1041    
1042            } while (!lockAquired && !disableLocking);
1043        }
1044            
1045            private synchronized void unlock() throws IOException {
1046            if (!disableLocking && (null != lock)) {
1047                //clear property doesn't work on some platforms
1048                System.getProperties().remove(getPropertyKey());
1049                System.clearProperty(getPropertyKey());
1050                assert(System.getProperty(getPropertyKey())==null);
1051                if (lock.isValid()) {
1052                    lock.release();
1053                    lock.channel().close();
1054                    
1055                }
1056                lock = null;
1057            }
1058        }
1059    
1060            
1061            protected boolean doLock() throws IOException {
1062                boolean result = true;
1063                if (!disableLocking && directory != null && lock == null) {
1064                String key = getPropertyKey();
1065                String property = System.getProperty(key);
1066                if (null == property) {
1067                    if (!BROKEN_FILE_LOCK) {
1068                        lock = lockFile.getChannel().tryLock();
1069                        if (lock == null) {
1070                            result = false;
1071                        } else {
1072                            System.setProperty(key, new Date().toString());
1073                        }
1074                    }
1075                } else { // already locked
1076                    result = false;
1077                }
1078            }
1079                return result;
1080            }
1081            
1082            private String getPropertyKey() throws IOException {
1083            return getClass().getName() + ".lock." + directory.getCanonicalPath();
1084        }
1085            
1086            static {
1087                BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
1088                        + ".FileLockBroken",
1089                        "false"));
1090                DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
1091                       + ".DisableLocking",
1092                       "false"));
1093            }
1094    }