/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.journal;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.journal.JournalTransactionStore.Tx;
import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
075    
076    /**
077     * An implementation of {@link PersistenceAdapter} designed for use with a
078     * {@link Journal} and then check pointing asynchronously on a timeout with some
079     * other long term persistent storage.
080     * 
081     * @org.apache.xbean.XBean
082     * @version $Revision: 1.17 $
083     */
084    public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
085    
086        private BrokerService brokerService;
087            
088        protected static final Scheduler scheduler = Scheduler.getInstance();
089        private static final Log LOG = LogFactory.getLog(JournalPersistenceAdapter.class);
090    
091        private Journal journal;
092        private PersistenceAdapter longTermPersistence;
093    
094        private final WireFormat wireFormat = new OpenWireFormat();
095    
096        private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
097        private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
098    
099        private SystemUsage usageManager;
100        private long checkpointInterval = 1000 * 60 * 5;
101        private long lastCheckpointRequest = System.currentTimeMillis();
102        private long lastCleanup = System.currentTimeMillis();
103        private int maxCheckpointWorkers = 10;
104        private int maxCheckpointMessageAddSize = 1024 * 1024;
105    
106        private JournalTransactionStore transactionStore = new JournalTransactionStore(this);
107        private ThreadPoolExecutor checkpointExecutor;
108    
109        private TaskRunner checkpointTask;
110        private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
111        private boolean fullCheckPoint;
112    
113        private AtomicBoolean started = new AtomicBoolean(false);
114    
115        private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
116    
117        private TaskRunnerFactory taskRunnerFactory;
118    
119        public JournalPersistenceAdapter() {        
120        }
121        
122        public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
123            setJournal(journal);
124            setTaskRunnerFactory(taskRunnerFactory);
125            setPersistenceAdapter(longTermPersistence);
126        }
127    
128        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
129            this.taskRunnerFactory = taskRunnerFactory;
130        }
131    
132        public void setJournal(Journal journal) {
133            this.journal = journal;
134            journal.setJournalEventListener(this);
135        }
136        
137        public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
138            this.longTermPersistence = longTermPersistence;
139        }
140        
141        final Runnable createPeriodicCheckpointTask() {
142            return new Runnable() {
143                public void run() {
144                    long lastTime = 0;
145                    synchronized (this) {
146                        lastTime = lastCheckpointRequest;
147                    }
148                    if (System.currentTimeMillis() > lastTime + checkpointInterval) {
149                        checkpoint(false, true);
150                    }
151                }
152            };
153        }
154    
155        /**
156         * @param usageManager The UsageManager that is controlling the
157         *                destination's memory usage.
158         */
159        public void setUsageManager(SystemUsage usageManager) {
160            this.usageManager = usageManager;
161            longTermPersistence.setUsageManager(usageManager);
162        }
163    
164        public Set<ActiveMQDestination> getDestinations() {
165            Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
166            destinations.addAll(queues.keySet());
167            destinations.addAll(topics.keySet());
168            return destinations;
169        }
170    
171        private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
172            if (destination.isQueue()) {
173                return createQueueMessageStore((ActiveMQQueue)destination);
174            } else {
175                return createTopicMessageStore((ActiveMQTopic)destination);
176            }
177        }
178    
179        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
180            JournalMessageStore store = queues.get(destination);
181            if (store == null) {
182                MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
183                store = new JournalMessageStore(this, checkpointStore, destination);
184                queues.put(destination, store);
185            }
186            return store;
187        }
188    
189        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
190            JournalTopicMessageStore store = topics.get(destinationName);
191            if (store == null) {
192                TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
193                store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
194                topics.put(destinationName, store);
195            }
196            return store;
197        }
198    
199        /**
200         * Cleanup method to remove any state associated with the given destination
201         *
202         * @param destination Destination to forget
203         */
204        public void removeQueueMessageStore(ActiveMQQueue destination) {
205            queues.remove(destination);
206        }
207    
208        /**
209         * Cleanup method to remove any state associated with the given destination
210         *
211         * @param destination Destination to forget
212         */
213        public void removeTopicMessageStore(ActiveMQTopic destination) {
214            topics.remove(destination);
215        }
216    
217        public TransactionStore createTransactionStore() throws IOException {
218            return transactionStore;
219        }
220    
221        public long getLastMessageBrokerSequenceId() throws IOException {
222            return longTermPersistence.getLastMessageBrokerSequenceId();
223        }
224    
225        public void beginTransaction(ConnectionContext context) throws IOException {
226            longTermPersistence.beginTransaction(context);
227        }
228    
229        public void commitTransaction(ConnectionContext context) throws IOException {
230            longTermPersistence.commitTransaction(context);
231        }
232    
233        public void rollbackTransaction(ConnectionContext context) throws IOException {
234            longTermPersistence.rollbackTransaction(context);
235        }
236    
237        public synchronized void start() throws Exception {
238            if (!started.compareAndSet(false, true)) {
239                return;
240            }
241    
242            checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
243                public boolean iterate() {
244                    return doCheckpoint();
245                }
246            }, "ActiveMQ Journal Checkpoint Worker");
247    
248            checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
249                public Thread newThread(Runnable runable) {
250                    Thread t = new Thread(runable, "Journal checkpoint worker");
251                    t.setPriority(7);
252                    return t;
253                }
254            });
255            // checkpointExecutor.allowCoreThreadTimeOut(true);
256    
257            this.usageManager.getMemoryUsage().addUsageListener(this);
258    
259            if (longTermPersistence instanceof JDBCPersistenceAdapter) {
260                // Disabled periodic clean up as it deadlocks with the checkpoint
261                // operations.
262                ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
263            }
264    
265            longTermPersistence.start();
266            createTransactionStore();
267            recover();
268    
269            // Do a checkpoint periodically.
270            scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
271    
272        }
273    
274        public void stop() throws Exception {
275    
276            this.usageManager.getMemoryUsage().removeUsageListener(this);
277            if (!started.compareAndSet(true, false)) {
278                return;
279            }
280    
281            scheduler.cancel(periodicCheckpointTask);
282    
283            // Take one final checkpoint and stop checkpoint processing.
284            checkpoint(true, true);
285            checkpointTask.shutdown();
286            checkpointExecutor.shutdown();
287    
288            queues.clear();
289            topics.clear();
290    
291            IOException firstException = null;
292            try {
293                journal.close();
294            } catch (Exception e) {
295                firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
296            }
297            longTermPersistence.stop();
298    
299            if (firstException != null) {
300                throw firstException;
301            }
302        }
303    
304        // Properties
305        // -------------------------------------------------------------------------
306        public PersistenceAdapter getLongTermPersistence() {
307            return longTermPersistence;
308        }
309    
310        /**
311         * @return Returns the wireFormat.
312         */
313        public WireFormat getWireFormat() {
314            return wireFormat;
315        }
316    
317        // Implementation methods
318        // -------------------------------------------------------------------------
319    
320        /**
321         * The Journal give us a call back so that we can move old data out of the
322         * journal. Taking a checkpoint does this for us.
323         * 
324         * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
325         */
326        public void overflowNotification(RecordLocation safeLocation) {
327            checkpoint(false, true);
328        }
329    
330        /**
331         * When we checkpoint we move all the journalled data to long term storage.
332         * 
333         * @param stopping
334         * @param b
335         */
336        public void checkpoint(boolean sync, boolean fullCheckpoint) {
337            try {
338                if (journal == null) {
339                    throw new IllegalStateException("Journal is closed.");
340                }
341    
342                long now = System.currentTimeMillis();
343                CountDownLatch latch = null;
344                synchronized (this) {
345                    latch = nextCheckpointCountDownLatch;
346                    lastCheckpointRequest = now;
347                    if (fullCheckpoint) {
348                        this.fullCheckPoint = true;
349                    }
350                }
351    
352                checkpointTask.wakeup();
353    
354                if (sync) {
355                    LOG.debug("Waking for checkpoint to complete.");
356                    latch.await();
357                }
358            } catch (InterruptedException e) {
359                Thread.currentThread().interrupt();
360                LOG.warn("Request to start checkpoint failed: " + e, e);
361            }
362        }
363    
364        public void checkpoint(boolean sync) {
365            checkpoint(sync, sync);
366        }
367    
368        /**
369         * This does the actual checkpoint.
370         * 
371         * @return
372         */
373        public boolean doCheckpoint() {
374            CountDownLatch latch = null;
375            boolean fullCheckpoint;
376            synchronized (this) {
377                latch = nextCheckpointCountDownLatch;
378                nextCheckpointCountDownLatch = new CountDownLatch(1);
379                fullCheckpoint = this.fullCheckPoint;
380                this.fullCheckPoint = false;
381            }
382            try {
383    
384                LOG.debug("Checkpoint started.");
385                RecordLocation newMark = null;
386    
387                ArrayList<FutureTask<RecordLocation>> futureTasks = new ArrayList<FutureTask<RecordLocation>>(queues.size() + topics.size());
388    
389                //
390                // We do many partial checkpoints (fullCheckpoint==false) to move
391                // topic messages
392                // to long term store as soon as possible.
393                // 
394                // We want to avoid doing that for queue messages since removes the
395                // come in the same
396                // checkpoint cycle will nullify the previous message add.
397                // Therefore, we only
398                // checkpoint queues on the fullCheckpoint cycles.
399                //
400                if (fullCheckpoint) {
401                    Iterator<JournalMessageStore> iterator = queues.values().iterator();
402                    while (iterator.hasNext()) {
403                        try {
404                            final JournalMessageStore ms = iterator.next();
405                            FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
406                                public RecordLocation call() throws Exception {
407                                    return ms.checkpoint();
408                                }
409                            });
410                            futureTasks.add(task);
411                            checkpointExecutor.execute(task);
412                        } catch (Exception e) {
413                            LOG.error("Failed to checkpoint a message store: " + e, e);
414                        }
415                    }
416                }
417    
418                Iterator<JournalTopicMessageStore> iterator = topics.values().iterator();
419                while (iterator.hasNext()) {
420                    try {
421                        final JournalTopicMessageStore ms = iterator.next();
422                        FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
423                            public RecordLocation call() throws Exception {
424                                return ms.checkpoint();
425                            }
426                        });
427                        futureTasks.add(task);
428                        checkpointExecutor.execute(task);
429                    } catch (Exception e) {
430                        LOG.error("Failed to checkpoint a message store: " + e, e);
431                    }
432                }
433    
434                try {
435                    for (Iterator<FutureTask<RecordLocation>> iter = futureTasks.iterator(); iter.hasNext();) {
436                        FutureTask<RecordLocation> ft = iter.next();
437                        RecordLocation mark = ft.get();
438                        // We only set a newMark on full checkpoints.
439                        if (fullCheckpoint) {
440                            if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
441                                newMark = mark;
442                            }
443                        }
444                    }
445                } catch (Throwable e) {
446                    LOG.error("Failed to checkpoint a message store: " + e, e);
447                }
448    
449                if (fullCheckpoint) {
450                    try {
451                        if (newMark != null) {
452                            LOG.debug("Marking journal at: " + newMark);
453                            journal.setMark(newMark, true);
454                        }
455                    } catch (Exception e) {
456                        LOG.error("Failed to mark the Journal: " + e, e);
457                    }
458    
459                    if (longTermPersistence instanceof JDBCPersistenceAdapter) {
460                        // We may be check pointing more often than the
461                        // checkpointInterval if under high use
462                        // But we don't want to clean up the db that often.
463                        long now = System.currentTimeMillis();
464                        if (now > lastCleanup + checkpointInterval) {
465                            lastCleanup = now;
466                            ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
467                        }
468                    }
469                }
470    
471                LOG.debug("Checkpoint done.");
472            } finally {
473                latch.countDown();
474            }
475            synchronized (this) {
476                return this.fullCheckPoint;
477            }
478    
479        }
480    
481        /**
482         * @param location
483         * @return
484         * @throws IOException
485         */
486        public DataStructure readCommand(RecordLocation location) throws IOException {
487            try {
488                Packet packet = journal.read(location);
489                return (DataStructure)wireFormat.unmarshal(toByteSequence(packet));
490            } catch (InvalidRecordLocationException e) {
491                throw createReadException(location, e);
492            } catch (IOException e) {
493                throw createReadException(location, e);
494            }
495        }
496    
497        /**
498         * Move all the messages that were in the journal into long term storage. We
499         * just replay and do a checkpoint.
500         * 
501         * @throws IOException
502         * @throws IOException
503         * @throws InvalidRecordLocationException
504         * @throws IllegalStateException
505         */
506        private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
507    
508            RecordLocation pos = null;
509            int transactionCounter = 0;
510    
511            LOG.info("Journal Recovery Started from: " + journal);
512            ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
513    
514            // While we have records in the journal.
515            while ((pos = journal.getNextRecordLocation(pos)) != null) {
516                Packet data = journal.read(pos);
517                DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data));
518    
519                if (c instanceof Message) {
520                    Message message = (Message)c;
521                    JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination());
522                    if (message.isInTransaction()) {
523                        transactionStore.addMessage(store, message, pos);
524                    } else {
525                        store.replayAddMessage(context, message);
526                        transactionCounter++;
527                    }
528                } else {
529                    switch (c.getDataStructureType()) {
530                    case JournalQueueAck.DATA_STRUCTURE_TYPE: {
531                        JournalQueueAck command = (JournalQueueAck)c;
532                        JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination());
533                        if (command.getMessageAck().isInTransaction()) {
534                            transactionStore.removeMessage(store, command.getMessageAck(), pos);
535                        } else {
536                            store.replayRemoveMessage(context, command.getMessageAck());
537                            transactionCounter++;
538                        }
539                    }
540                        break;
541                    case JournalTopicAck.DATA_STRUCTURE_TYPE: {
542                        JournalTopicAck command = (JournalTopicAck)c;
543                        JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination());
544                        if (command.getTransactionId() != null) {
545                            transactionStore.acknowledge(store, command, pos);
546                        } else {
547                            store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
548                            transactionCounter++;
549                        }
550                    }
551                        break;
552                    case JournalTransaction.DATA_STRUCTURE_TYPE: {
553                        JournalTransaction command = (JournalTransaction)c;
554                        try {
555                            // Try to replay the packet.
556                            switch (command.getType()) {
557                            case JournalTransaction.XA_PREPARE:
558                                transactionStore.replayPrepare(command.getTransactionId());
559                                break;
560                            case JournalTransaction.XA_COMMIT:
561                            case JournalTransaction.LOCAL_COMMIT:
562                                Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
563                                if (tx == null) {
564                                    break; // We may be trying to replay a commit
565                                }
566                                // that
567                                // was already committed.
568    
569                                // Replay the committed operations.
570                                tx.getOperations();
571                                for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
572                                    TxOperation op = (TxOperation)iter.next();
573                                    if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
574                                        op.store.replayAddMessage(context, (Message)op.data);
575                                    }
576                                    if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
577                                        op.store.replayRemoveMessage(context, (MessageAck)op.data);
578                                    }
579                                    if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
580                                        JournalTopicAck ack = (JournalTopicAck)op.data;
581                                        ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
582                                    }
583                                }
584                                transactionCounter++;
585                                break;
586                            case JournalTransaction.LOCAL_ROLLBACK:
587                            case JournalTransaction.XA_ROLLBACK:
588                                transactionStore.replayRollback(command.getTransactionId());
589                                break;
590                            default:
591                                throw new IOException("Invalid journal command type: " + command.getType());
592                            }
593                        } catch (IOException e) {
594                            LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
595                        }
596                    }
597                        break;
598                    case JournalTrace.DATA_STRUCTURE_TYPE:
599                        JournalTrace trace = (JournalTrace)c;
600                        LOG.debug("TRACE Entry: " + trace.getMessage());
601                        break;
602                    default:
603                        LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
604                    }
605                }
606            }
607    
608            RecordLocation location = writeTraceMessage("RECOVERED", true);
609            journal.setMark(location, true);
610    
611            LOG.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
612        }
613    
614        private IOException createReadException(RecordLocation location, Exception e) {
615            return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
616        }
617    
618        protected IOException createWriteException(DataStructure packet, Exception e) {
619            return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
620        }
621    
622        protected IOException createWriteException(String command, Exception e) {
623            return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
624        }
625    
626        protected IOException createRecoveryFailedException(Exception e) {
627            return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
628        }
629    
630        /**
631         * @param command
632         * @param sync
633         * @return
634         * @throws IOException
635         */
636        public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
637            if (started.get()) {
638                try {
639                        return journal.write(toPacket(wireFormat.marshal(command)), sync);
640                } catch (IOException ioe) {
641                        LOG.error("Cannot write to the journal", ioe);
642                        brokerService.handleIOException(ioe);
643                        throw ioe;
644                }
645            }
646            throw new IOException("closed");
647        }
648    
649        private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
650            JournalTrace trace = new JournalTrace();
651            trace.setMessage(message);
652            return writeCommand(trace, sync);
653        }
654    
655        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
656            newPercentUsage = (newPercentUsage / 10) * 10;
657            oldPercentUsage = (oldPercentUsage / 10) * 10;
658            if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
659                boolean sync = newPercentUsage >= 90;
660                checkpoint(sync, true);
661            }
662        }
663    
664        public JournalTransactionStore getTransactionStore() {
665            return transactionStore;
666        }
667    
668        public void deleteAllMessages() throws IOException {
669            try {
670                JournalTrace trace = new JournalTrace();
671                trace.setMessage("DELETED");
672                RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
673                journal.setMark(location, true);
674                LOG.info("Journal deleted: ");
675            } catch (IOException e) {
676                throw e;
677            } catch (Throwable e) {
678                throw IOExceptionSupport.create(e);
679            }
680            longTermPersistence.deleteAllMessages();
681        }
682    
683        public SystemUsage getUsageManager() {
684            return usageManager;
685        }
686    
687        public int getMaxCheckpointMessageAddSize() {
688            return maxCheckpointMessageAddSize;
689        }
690    
691        public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
692            this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
693        }
694    
695        public int getMaxCheckpointWorkers() {
696            return maxCheckpointWorkers;
697        }
698    
699        public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
700            this.maxCheckpointWorkers = maxCheckpointWorkers;
701        }
702    
703        public boolean isUseExternalMessageReferences() {
704            return false;
705        }
706    
707        public void setUseExternalMessageReferences(boolean enable) {
708            if (enable) {
709                throw new IllegalArgumentException("The journal does not support message references.");
710            }
711        }
712    
713        public Packet toPacket(ByteSequence sequence) {
714            return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
715        }
716    
717        public ByteSequence toByteSequence(Packet packet) {
718            org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
719            return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
720        }
721    
722        public void setBrokerName(String brokerName) {
723            longTermPersistence.setBrokerName(brokerName);
724        }
725    
726        public String toString() {
727            return "JournalPersistenceAdapator(" + longTermPersistence + ")";
728        }
729    
730        public void setDirectory(File dir) {
731        }
732        
733        public long size(){
734            return 0;
735        }
736    
737        public void setBrokerService(BrokerService brokerService) {
738            this.brokerService = brokerService;
739            PersistenceAdapter pa = getLongTermPersistence();
740            if( pa instanceof BrokerServiceAware ) {
741                ((BrokerServiceAware)pa).setBrokerService(brokerService);
742            }
743        }
744    
745    }