001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import org.apache.activeio.journal.Journal;
020    import org.apache.activemq.broker.BrokerService;
021    import org.apache.activemq.broker.BrokerServiceAware;
022    import org.apache.activemq.broker.ConnectionContext;
023    import org.apache.activemq.command.ActiveMQDestination;
024    import org.apache.activemq.command.ActiveMQQueue;
025    import org.apache.activemq.command.ActiveMQTopic;
026    import org.apache.activemq.store.MessageStore;
027    import org.apache.activemq.store.PersistenceAdapter;
028    import org.apache.activemq.store.TopicMessageStore;
029    import org.apache.activemq.store.TransactionStore;
030    import org.apache.activemq.usage.SystemUsage;
031    import java.io.File;
032    import java.io.IOException;
033    import java.util.Set;
034    /**
035     * An implementation of {@link PersistenceAdapter} designed for use with a
036     * {@link Journal} and then check pointing asynchronously on a timeout with some
037     * other long term persistent storage.
038     * 
039     * @org.apache.xbean.XBean element="kahaDB"
040     * @version $Revision: 1.17 $
041     */
042    public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
043        private KahaDBStore letter = new KahaDBStore();
044        
045    
046        /**
047         * @param context
048         * @throws IOException
049         * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
050         */
051        public void beginTransaction(ConnectionContext context) throws IOException {
052            this.letter.beginTransaction(context);
053        }
054    
055        /**
056         * @param sync
057         * @throws IOException
058         * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
059         */
060        public void checkpoint(boolean sync) throws IOException {
061            this.letter.checkpoint(sync);
062        }
063    
064        /**
065         * @param context
066         * @throws IOException
067         * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
068         */
069        public void commitTransaction(ConnectionContext context) throws IOException {
070            this.letter.commitTransaction(context);
071        }
072    
073        /**
074         * @param destination
075         * @return MessageStore
076         * @throws IOException
077         * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
078         */
079        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
080            return this.letter.createQueueMessageStore(destination);
081        }
082    
083        /**
084         * @param destination
085         * @return TopicMessageStore
086         * @throws IOException
087         * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
088         */
089        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
090            return this.letter.createTopicMessageStore(destination);
091        }
092    
093        /**
094         * @return TrandactionStore
095         * @throws IOException
096         * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
097         */
098        public TransactionStore createTransactionStore() throws IOException {
099            return this.letter.createTransactionStore();
100        }
101    
102        /**
103         * @throws IOException
104         * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
105         */
106        public void deleteAllMessages() throws IOException {
107            this.letter.deleteAllMessages();
108        }
109    
110        /**
111         * @return destinations
112         * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
113         */
114        public Set<ActiveMQDestination> getDestinations() {
115            return this.letter.getDestinations();
116        }
117    
118        /**
119         * @return lastMessageBrokerSequenceId
120         * @throws IOException
121         * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
122         */
123        public long getLastMessageBrokerSequenceId() throws IOException {
124            return this.letter.getLastMessageBrokerSequenceId();
125        }
126    
127        /**
128         * @param destination
129         * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
130         */
131        public void removeQueueMessageStore(ActiveMQQueue destination) {
132            this.letter.removeQueueMessageStore(destination);
133        }
134    
135        /**
136         * @param destination
137         * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
138         */
139        public void removeTopicMessageStore(ActiveMQTopic destination) {
140            this.letter.removeTopicMessageStore(destination);
141        }
142    
143        /**
144         * @param context
145         * @throws IOException
146         * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
147         */
148        public void rollbackTransaction(ConnectionContext context) throws IOException {
149            this.letter.rollbackTransaction(context);
150        }
151    
152        /**
153         * @param brokerName
154         * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
155         */
156        public void setBrokerName(String brokerName) {
157            this.letter.setBrokerName(brokerName);
158        }
159    
160        
161    
162        /**
163         * @param usageManager
164         * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
165         */
166        public void setUsageManager(SystemUsage usageManager) {
167            this.letter.setUsageManager(usageManager);
168        }
169    
170        /**
171         * @return the size of the store
172         * @see org.apache.activemq.store.PersistenceAdapter#size()
173         */
174        public long size() {
175            return this.letter.size();
176        }
177    
178        /**
179         * @throws Exception
180         * @see org.apache.activemq.Service#start()
181         */
182        public void start() throws Exception {
183            this.letter.start();
184        }
185    
186        /**
187         * @throws Exception
188         * @see org.apache.activemq.Service#stop()
189         */
190        public void stop() throws Exception {
191            this.letter.stop();
192        }
193    
194        /**
195         * Get the journalMaxFileLength
196         * @return the journalMaxFileLength
197         */
198        public int getJournalMaxFileLength() {
199            return this.letter.getJournalMaxFileLength();
200        }
201    
202        /**
203         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
204         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
205         */
206        public void setJournalMaxFileLength(int journalMaxFileLength) {
207            this.letter.setJournalMaxFileLength(journalMaxFileLength);
208        }
209    
210        /**
211         * Get the checkpointInterval
212         * @return the checkpointInterval
213         */
214        public long getCheckpointInterval() {
215            return this.letter.getCheckpointInterval();
216        }
217    
218        /**
219         * Set the checkpointInterval
220         * @param checkpointInterval the checkpointInterval to set
221         */
222        public void setCheckpointInterval(long checkpointInterval) {
223            this.letter.setCheckpointInterval(checkpointInterval);
224        }
225    
226        /**
227         * Get the cleanupInterval
228         * @return the cleanupInterval
229         */
230        public long getCleanupInterval() {
231            return this.letter.getCleanupInterval();
232        }
233    
234        /**
235         * Set the cleanupInterval
236         * @param cleanupInterval the cleanupInterval to set
237         */
238        public void setCleanupInterval(long cleanupInterval) {
239            this.letter.setCleanupInterval(cleanupInterval);
240        }
241    
242        /**
243         * Get the indexWriteBatchSize
244         * @return the indexWriteBatchSize
245         */
246        public int getIndexWriteBatchSize() {
247            return this.letter.getIndexWriteBatchSize();
248        }
249    
250        /**
251         * Set the indexWriteBatchSize
252         * @param indexWriteBatchSize the indexWriteBatchSize to set
253         */
254        public void setIndexWriteBatchSize(int indexWriteBatchSize) {
255            this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
256        }
257    
258        /**
259         * Get the journalMaxWriteBatchSize
260         * @return the journalMaxWriteBatchSize
261         */
262        public int getJournalMaxWriteBatchSize() {
263            return this.letter.getJournalMaxWriteBatchSize();
264        }
265    
266        /**
267         * Set the journalMaxWriteBatchSize
268         * @param journalMaxWriteBatchSize the journalMaxWriteBatchSize to set
269         */
270        public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
271            this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
272        }
273        
274        /**
275         * Get the enableIndexWriteAsync
276         * @return the enableIndexWriteAsync
277         */
278        public boolean isEnableIndexWriteAsync() {
279            return this.letter.isEnableIndexWriteAsync();
280        }
281    
282        /**
283         * Set the enableIndexWriteAsync
284         * @param enableIndexWriteAsync the enableIndexWriteAsync to set
285         */
286        public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
287            this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
288        }
289    
290        /**
291         * Get the directory
292         * @return the directory
293         */
294        public File getDirectory() {
295            return this.letter.getDirectory();
296        }
297        
298        /**
299         * @param dir
300         * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
301         */
302        public void setDirectory(File dir) {
303            this.letter.setDirectory(dir);
304        }
305    
306        /**
307         * Get the enableJournalDiskSyncs
308         * @return the enableJournalDiskSyncs
309         */
310        public boolean isEnableJournalDiskSyncs() {
311            return this.letter.isEnableJournalDiskSyncs();
312        }
313    
314        /**
315         * Set the enableJournalDiskSyncs
316         * @param enableJournalDiskSyncs the enableJournalDiskSyncs to set
317         */
318        public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
319            this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
320        }
321        
322        /**
323         * Get the indexCacheSize
324         * @return the indexCacheSize
325         */
326        public int getIndexCacheSize() {
327            return this.letter.getIndexCacheSize();
328        }
329    
330        /**
331         * Set the indexCacheSize
332         * @param indexCacheSize the indexCacheSize to set
333         */
334        public void setIndexCacheSize(int indexCacheSize) {
335            this.letter.setIndexCacheSize(indexCacheSize);
336        }
337        
338        /**
339         * Get the ignoreMissingJournalfiles
340         * @return the ignoreMissingJournalfiles
341         */
342        public boolean isIgnoreMissingJournalfiles() {
343            return this.letter.isIgnoreMissingJournalfiles();
344        }
345    
346        /**
347         * Set the ignoreMissingJournalfiles
348         * @param ignoreMissingJournalfiles the ignoreMissingJournalfiles to set
349         */
350        public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
351            this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
352        }
353    
354        public boolean isChecksumJournalFiles() {
355            return letter.isChecksumJournalFiles();
356        }
357    
358        public boolean isCheckForCorruptJournalFiles() {
359            return letter.isCheckForCorruptJournalFiles();
360        }
361    
362        public void setChecksumJournalFiles(boolean checksumJournalFiles) {
363            letter.setChecksumJournalFiles(checksumJournalFiles);
364        }
365    
366        public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
367            letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
368        }
369    
370            public void setBrokerService(BrokerService brokerService) {
371                    letter.setBrokerService(brokerService);
372            }
373    }