org.apache.activemq.store.kahadb
Class MessageDatabase

java.lang.Object
  extended by org.apache.activemq.store.kahadb.MessageDatabase
All Implemented Interfaces:
BrokerServiceAware
Direct Known Subclasses:
KahaDBStore

public class MessageDatabase
extends Object
implements BrokerServiceAware


Nested Class Summary
protected static class MessageDatabase.MessageKeysMarshaller
           
protected  class MessageDatabase.Metadata
           
protected  class MessageDatabase.StoredDestinationMarshaller
           
 
Field Summary
protected  Thread checkpointThread
           
static int CLOSED_STATE
           
protected  boolean deleteAllMessages
           
protected  File directory
           
protected  boolean enableJournalDiskSyncs
           
protected  boolean failIfDatabaseIsLocked
           
protected  Object indexMutex
           
protected  LinkedHashMap<TransactionId,ArrayList<org.apache.activemq.store.kahadb.MessageDatabase.Operation>> inflightTransactions
           
protected  Journal journal
           
static int LOG_SLOW_ACCESS_TIME
           
protected  MessageDatabase.Metadata metadata
           
protected  org.apache.activemq.store.kahadb.MessageDatabase.MetadataMarshaller metadataMarshaller
           
static int OPEN_STATE
           
protected  AtomicBoolean opened
           
protected  PageFile pageFile
           
protected  LinkedHashMap<TransactionId,ArrayList<org.apache.activemq.store.kahadb.MessageDatabase.Operation>> preparedTransactions
           
static String PROPERTY_LOG_SLOW_ACCESS_TIME
           
protected  AtomicBoolean started
           
 
Constructor Summary
MessageDatabase()
           
 
Method Summary
 void checkpoint(Callback closure)
           
protected  void checkpointCleanup(boolean cleanup)
           
 void close()
           
 long getCheckpointInterval()
           
 long getCleanupInterval()
           
 File getDirectory()
           
 int getIndexCacheSize()
           
 int getIndexWriteBatchSize()
           
 Journal getJournal()
           
 HashSet<Integer> getJournalFilesBeingReplicated()
           
 int getJournalMaxFileLength()
           
 int getJournalMaxWriteBatchSize()
           
 Location getLastUpdatePosition()
           
 PageFile getPageFile()
           
protected  org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination getStoredDestination(KahaDestination destination, Transaction tx)
           
 void incrementalRecover()
           
 boolean isCheckForCorruptJournalFiles()
           
 boolean isChecksumJournalFiles()
           
 boolean isDeleteAllMessages()
           
 boolean isEnableJournalDiskSyncs()
           
 boolean isFailIfDatabaseIsLocked()
           
 boolean isIgnoreMissingJournalfiles()
           
 void load()
           
 JournalCommand load(Location location)
          Loads a previously stored JournalMessage
 void open()
           
protected  void process(KahaCommitCommand command, Location location)
           
protected  void process(KahaPrepareCommand command, Location location)
           
protected  void process(KahaRemoveDestinationCommand command, Location location)
           
protected  void process(KahaRemoveMessageCommand command, Location location)
           
protected  void process(KahaRollbackCommand command, Location location)
           
protected  void process(KahaSubscriptionCommand command, Location location)
           
protected  void recoverIndex(Transaction tx)
           
 void setBrokerService(BrokerService brokerService)
           
 void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles)
           
 void setCheckpointInterval(long checkpointInterval)
           
 void setChecksumJournalFiles(boolean checksumJournalFiles)
           
 void setCleanupInterval(long cleanupInterval)
           
 void setDeleteAllMessages(boolean deleteAllMessages)
           
 void setDirectory(File directory)
           
 void setEnableIndexWriteAsync(boolean enableIndexWriteAsync)
           
 void setEnableJournalDiskSyncs(boolean syncWrites)
           
 void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked)
           
 void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles)
           
 void setIndexCacheSize(int indexCacheSize)
           
 void setIndexWriteBatchSize(int setIndexWriteBatchSize)
           
 void setJournalMaxFileLength(int journalMaxFileLength)
           
 void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize)
           
 void start()
           
 void stop()
           
 Location store(JournalCommand data)
           
 Location store(JournalCommand data, boolean sync)
          All updates are funneled through this method.
 void unload()
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

PROPERTY_LOG_SLOW_ACCESS_TIME

public static final String PROPERTY_LOG_SLOW_ACCESS_TIME
See Also:
Constant Field Values

LOG_SLOW_ACCESS_TIME

public static final int LOG_SLOW_ACCESS_TIME

CLOSED_STATE

public static final int CLOSED_STATE
See Also:
Constant Field Values

OPEN_STATE

public static final int OPEN_STATE
See Also:
Constant Field Values

pageFile

protected PageFile pageFile

journal

protected Journal journal

metadata

protected MessageDatabase.Metadata metadata

metadataMarshaller

protected org.apache.activemq.store.kahadb.MessageDatabase.MetadataMarshaller metadataMarshaller

failIfDatabaseIsLocked

protected boolean failIfDatabaseIsLocked

deleteAllMessages

protected boolean deleteAllMessages

directory

protected File directory

checkpointThread

protected Thread checkpointThread

enableJournalDiskSyncs

protected boolean enableJournalDiskSyncs

started

protected AtomicBoolean started

opened

protected AtomicBoolean opened

indexMutex

protected final Object indexMutex

inflightTransactions

protected final LinkedHashMap<TransactionId,ArrayList<org.apache.activemq.store.kahadb.MessageDatabase.Operation>> inflightTransactions

preparedTransactions

protected final LinkedHashMap<TransactionId,ArrayList<org.apache.activemq.store.kahadb.MessageDatabase.Operation>> preparedTransactions
Constructor Detail

MessageDatabase

public MessageDatabase()
Method Detail

start

public void start()
           throws Exception
Throws:
Exception

stop

public void stop()
          throws Exception
Throws:
Exception

open

public void open()
          throws IOException
Throws:
IOException

load

public void load()
          throws IOException
Throws:
IOException

close

public void close()
           throws IOException,
                  InterruptedException
Throws:
IOException
InterruptedException

unload

public void unload()
            throws IOException,
                   InterruptedException
Throws:
IOException
InterruptedException

recoverIndex

protected void recoverIndex(Transaction tx)
                     throws IOException
Throws:
IOException

incrementalRecover

public void incrementalRecover()
                        throws IOException
Throws:
IOException

getLastUpdatePosition

public Location getLastUpdatePosition()
                               throws IOException
Throws:
IOException

checkpointCleanup

protected void checkpointCleanup(boolean cleanup)
                          throws IOException
Throws:
IOException

checkpoint

public void checkpoint(Callback closure)
                throws Exception
Throws:
Exception

store

public Location store(JournalCommand data)
               throws IOException
Throws:
IOException

store

public Location store(JournalCommand data,
                      boolean sync)
               throws IOException
All updates are funneled through this method. The updates are converted to a JournalMessage which is logged to the journal, and then the data from the JournalMessage is used to update the index, just as it would be during a recovery process.

Throws:
IOException

load

public JournalCommand load(Location location)
                    throws IOException
Loads a previously stored JournalMessage

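Parameters:
location - the journal location of the previously stored message to load
Returns:
the JournalCommand read from the journal at the given location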
Throws:
IOException

process

protected void process(KahaRemoveMessageCommand command,
                       Location location)
                throws IOException
Throws:
IOException

process

protected void process(KahaRemoveDestinationCommand command,
                       Location location)
                throws IOException
Throws:
IOException

process

protected void process(KahaSubscriptionCommand command,
                       Location location)
                throws IOException
Throws:
IOException

process

protected void process(KahaCommitCommand command,
                       Location location)
                throws IOException
Throws:
IOException

process

protected void process(KahaPrepareCommand command,
                       Location location)

process

protected void process(KahaRollbackCommand command,
                       Location location)

getJournalFilesBeingReplicated

public HashSet<Integer> getJournalFilesBeingReplicated()

getStoredDestination

protected org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination getStoredDestination(KahaDestination destination,
                                                                                                  Transaction tx)
                                                                                           throws IOException
Throws:
IOException

getJournalMaxWriteBatchSize

public int getJournalMaxWriteBatchSize()

setJournalMaxWriteBatchSize

public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize)

getDirectory

public File getDirectory()

setDirectory

public void setDirectory(File directory)

isDeleteAllMessages

public boolean isDeleteAllMessages()

setDeleteAllMessages

public void setDeleteAllMessages(boolean deleteAllMessages)

setIndexWriteBatchSize

public void setIndexWriteBatchSize(int setIndexWriteBatchSize)

getIndexWriteBatchSize

public int getIndexWriteBatchSize()

setEnableIndexWriteAsync

public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync)

isEnableJournalDiskSyncs

public boolean isEnableJournalDiskSyncs()

setEnableJournalDiskSyncs

public void setEnableJournalDiskSyncs(boolean syncWrites)

getCheckpointInterval

public long getCheckpointInterval()

setCheckpointInterval

public void setCheckpointInterval(long checkpointInterval)

getCleanupInterval

public long getCleanupInterval()

setCleanupInterval

public void setCleanupInterval(long cleanupInterval)

setJournalMaxFileLength

public void setJournalMaxFileLength(int journalMaxFileLength)

getJournalMaxFileLength

public int getJournalMaxFileLength()

getPageFile

public PageFile getPageFile()

getJournal

public Journal getJournal()

isFailIfDatabaseIsLocked

public boolean isFailIfDatabaseIsLocked()

setFailIfDatabaseIsLocked

public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked)

isIgnoreMissingJournalfiles

public boolean isIgnoreMissingJournalfiles()

setIgnoreMissingJournalfiles

public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles)

getIndexCacheSize

public int getIndexCacheSize()

setIndexCacheSize

public void setIndexCacheSize(int indexCacheSize)

isCheckForCorruptJournalFiles

public boolean isCheckForCorruptJournalFiles()

setCheckForCorruptJournalFiles

public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles)

isChecksumJournalFiles

public boolean isChecksumJournalFiles()

setChecksumJournalFiles

public void setChecksumJournalFiles(boolean checksumJournalFiles)

setBrokerService

public void setBrokerService(BrokerService brokerService)
Specified by:
setBrokerService in interface BrokerServiceAware


Copyright © 2005-2011 Apache Software Foundation. All Rights Reserved.