001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.journal;
018    
019    import java.io.File;
020    import java.io.IOException;
021    
022    import org.apache.activeio.journal.Journal;
023    import org.apache.activeio.journal.active.JournalImpl;
024    import org.apache.activeio.journal.active.JournalLockedException;
025    import org.apache.activemq.store.PersistenceAdapter;
026    import org.apache.activemq.store.PersistenceAdapterFactory;
027    import org.apache.activemq.store.jdbc.DataSourceSupport;
028    import org.apache.activemq.store.jdbc.JDBCAdapter;
029    import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
030    import org.apache.activemq.store.jdbc.Statements;
031    import org.apache.activemq.thread.TaskRunnerFactory;
032    import org.apache.commons.logging.Log;
033    import org.apache.commons.logging.LogFactory;
034    
035    /**
036     * Factory class that can create PersistenceAdapter objects.
037     * 
038     * @org.apache.xbean.XBean
039     * @version $Revision: 1.4 $
040     */
041    public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory {
042    
043        private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
044    
045        private static final Log LOG = LogFactory.getLog(JournalPersistenceAdapterFactory.class);
046    
047        private int journalLogFileSize = 1024 * 1024 * 20;
048        private int journalLogFiles = 2;
049        private TaskRunnerFactory taskRunnerFactory;
050        private Journal journal;
051        private boolean useJournal = true;
052        private boolean useQuickJournal;
053        private File journalArchiveDirectory;
054        private boolean failIfJournalIsLocked;
055        private int journalThreadPriority = Thread.MAX_PRIORITY;
056        private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
057        private boolean useDedicatedTaskRunner;
058    
059        public PersistenceAdapter createPersistenceAdapter() throws IOException {
060            jdbcPersistenceAdapter.setDataSource(getDataSource());
061    
062            if (!useJournal) {
063                return jdbcPersistenceAdapter;
064            }
065            return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
066    
067        }
068    
069        public int getJournalLogFiles() {
070            return journalLogFiles;
071        }
072    
073        /**
074         * Sets the number of journal log files to use
075         */
076        public void setJournalLogFiles(int journalLogFiles) {
077            this.journalLogFiles = journalLogFiles;
078        }
079    
080        public int getJournalLogFileSize() {
081            return journalLogFileSize;
082        }
083    
084        /**
085         * Sets the size of the journal log files
086         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
087         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
088         */
089        public void setJournalLogFileSize(int journalLogFileSize) {
090            this.journalLogFileSize = journalLogFileSize;
091        }
092    
093        public JDBCPersistenceAdapter getJdbcAdapter() {
094            return jdbcPersistenceAdapter;
095        }
096    
097        public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) {
098            this.jdbcPersistenceAdapter = jdbcAdapter;
099        }
100    
101        public boolean isUseJournal() {
102            return useJournal;
103        }
104    
105        /**
106         * Enables or disables the use of the journal. The default is to use the
107         * journal
108         * 
109         * @param useJournal
110         */
111        public void setUseJournal(boolean useJournal) {
112            this.useJournal = useJournal;
113        }
114    
115        public boolean isUseDedicatedTaskRunner() {
116            return useDedicatedTaskRunner;
117        }
118        
119        public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
120            this.useDedicatedTaskRunner = useDedicatedTaskRunner;
121        }
122        
123        public TaskRunnerFactory getTaskRunnerFactory() {
124            if (taskRunnerFactory == null) {
125                taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority,
126                                                          true, 1000, isUseDedicatedTaskRunner());
127            }
128            return taskRunnerFactory;
129        }
130    
131        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
132            this.taskRunnerFactory = taskRunnerFactory;
133        }
134    
135        public Journal getJournal() throws IOException {
136            if (journal == null) {
137                createJournal();
138            }
139            return journal;
140        }
141    
142        public void setJournal(Journal journal) {
143            this.journal = journal;
144        }
145    
146        public File getJournalArchiveDirectory() {
147            if (journalArchiveDirectory == null && useQuickJournal) {
148                journalArchiveDirectory = new File(getDataDirectoryFile(), "journal");
149            }
150            return journalArchiveDirectory;
151        }
152    
153        public void setJournalArchiveDirectory(File journalArchiveDirectory) {
154            this.journalArchiveDirectory = journalArchiveDirectory;
155        }
156    
157        public boolean isUseQuickJournal() {
158            return useQuickJournal;
159        }
160    
161        /**
162         * Enables or disables the use of quick journal, which keeps messages in the
163         * journal and just stores a reference to the messages in JDBC. Defaults to
164         * false so that messages actually reside long term in the JDBC database.
165         */
166        public void setUseQuickJournal(boolean useQuickJournal) {
167            this.useQuickJournal = useQuickJournal;
168        }
169    
170        public JDBCAdapter getAdapter() throws IOException {
171            return jdbcPersistenceAdapter.getAdapter();
172        }
173    
174        public void setAdapter(JDBCAdapter adapter) {
175            jdbcPersistenceAdapter.setAdapter(adapter);
176        }
177    
178        public Statements getStatements() {
179            return jdbcPersistenceAdapter.getStatements();
180        }
181    
182        public void setStatements(Statements statements) {
183            jdbcPersistenceAdapter.setStatements(statements);
184        }
185    
186        public boolean isUseDatabaseLock() {
187            return jdbcPersistenceAdapter.isUseDatabaseLock();
188        }
189    
190        /**
191         * Sets whether or not an exclusive database lock should be used to enable
192         * JDBC Master/Slave. Enabled by default.
193         */
194        public void setUseDatabaseLock(boolean useDatabaseLock) {
195            jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock);
196        }
197    
198        public boolean isCreateTablesOnStartup() {
199            return jdbcPersistenceAdapter.isCreateTablesOnStartup();
200        }
201    
202        /**
203         * Sets whether or not tables are created on startup
204         */
205        public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
206            jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup);
207        }
208    
209        public int getJournalThreadPriority() {
210            return journalThreadPriority;
211        }
212    
213        /**
214         * Sets the thread priority of the journal thread
215         */
216        public void setJournalThreadPriority(int journalThreadPriority) {
217            this.journalThreadPriority = journalThreadPriority;
218        }
219    
220        /**
221         * @throws IOException
222         */
223        protected void createJournal() throws IOException {
224            File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile();
225            if (failIfJournalIsLocked) {
226                journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
227                                          getJournalArchiveDirectory());
228            } else {
229                while (true) {
230                    try {
231                        journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
232                                                  getJournalArchiveDirectory());
233                        break;
234                    } catch (JournalLockedException e) {
235                        LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000)
236                                 + " seconds for the journal to be unlocked.");
237                        try {
238                            Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
239                        } catch (InterruptedException e1) {
240                        }
241                    }
242                }
243            }
244        }
245    
246    }