001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.journal;
018
019import java.io.File;
020import java.io.IOException;
021
022import org.apache.activeio.journal.Journal;
023import org.apache.activeio.journal.active.JournalImpl;
024import org.apache.activeio.journal.active.JournalLockedException;
025import org.apache.activemq.store.PersistenceAdapter;
026import org.apache.activemq.store.PersistenceAdapterFactory;
027import org.apache.activemq.store.jdbc.DataSourceSupport;
028import org.apache.activemq.store.jdbc.JDBCAdapter;
029import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
030import org.apache.activemq.store.jdbc.Statements;
031import org.apache.activemq.thread.TaskRunnerFactory;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * Factory class that can create PersistenceAdapter objects.
037 * 
038 * @org.apache.xbean.XBean
039 * 
040 */
041public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory {
042
043    private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
044
045    private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapterFactory.class);
046
047    private int journalLogFileSize = 1024 * 1024 * 20;
048    private int journalLogFiles = 2;
049    private TaskRunnerFactory taskRunnerFactory;
050    private Journal journal;
051    private boolean useJournal = true;
052    private boolean useQuickJournal;
053    private File journalArchiveDirectory;
054    private boolean failIfJournalIsLocked;
055    private int journalThreadPriority = Thread.MAX_PRIORITY;
056    private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
057    private boolean useDedicatedTaskRunner;
058
059    public PersistenceAdapter createPersistenceAdapter() throws IOException {
060        jdbcPersistenceAdapter.setDataSource(getDataSource());
061
062        if (!useJournal) {
063            return jdbcPersistenceAdapter;
064        }
065        JournalPersistenceAdapter result =  new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
066        result.setDirectory(getDataDirectoryFile());
067        return result;
068
069    }
070
071    public int getJournalLogFiles() {
072        return journalLogFiles;
073    }
074
075    /**
076     * Sets the number of journal log files to use
077     */
078    public void setJournalLogFiles(int journalLogFiles) {
079        this.journalLogFiles = journalLogFiles;
080    }
081
082    public int getJournalLogFileSize() {
083        return journalLogFileSize;
084    }
085
086    /**
087     * Sets the size of the journal log files
088     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
089     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
090     */
091    public void setJournalLogFileSize(int journalLogFileSize) {
092        this.journalLogFileSize = journalLogFileSize;
093    }
094
095    public JDBCPersistenceAdapter getJdbcAdapter() {
096        return jdbcPersistenceAdapter;
097    }
098
099    public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) {
100        this.jdbcPersistenceAdapter = jdbcAdapter;
101    }
102
103    public boolean isUseJournal() {
104        return useJournal;
105    }
106
107    /**
108     * Enables or disables the use of the journal. The default is to use the
109     * journal
110     * 
111     * @param useJournal
112     */
113    public void setUseJournal(boolean useJournal) {
114        this.useJournal = useJournal;
115    }
116
117    public boolean isUseDedicatedTaskRunner() {
118        return useDedicatedTaskRunner;
119    }
120    
121    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
122        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
123    }
124    
125    public TaskRunnerFactory getTaskRunnerFactory() {
126        if (taskRunnerFactory == null) {
127            taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority,
128                                                      true, 1000, isUseDedicatedTaskRunner());
129        }
130        return taskRunnerFactory;
131    }
132
133    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
134        this.taskRunnerFactory = taskRunnerFactory;
135    }
136
137    public Journal getJournal() throws IOException {
138        if (journal == null) {
139            createJournal();
140        }
141        return journal;
142    }
143
144    public void setJournal(Journal journal) {
145        this.journal = journal;
146    }
147
148    public File getJournalArchiveDirectory() {
149        if (journalArchiveDirectory == null && useQuickJournal) {
150            journalArchiveDirectory = new File(getDataDirectoryFile(), "journal");
151        }
152        return journalArchiveDirectory;
153    }
154
155    public void setJournalArchiveDirectory(File journalArchiveDirectory) {
156        this.journalArchiveDirectory = journalArchiveDirectory;
157    }
158
159    public boolean isUseQuickJournal() {
160        return useQuickJournal;
161    }
162
163    /**
164     * Enables or disables the use of quick journal, which keeps messages in the
165     * journal and just stores a reference to the messages in JDBC. Defaults to
166     * false so that messages actually reside long term in the JDBC database.
167     */
168    public void setUseQuickJournal(boolean useQuickJournal) {
169        this.useQuickJournal = useQuickJournal;
170    }
171
172    public JDBCAdapter getAdapter() throws IOException {
173        return jdbcPersistenceAdapter.getAdapter();
174    }
175
176    public void setAdapter(JDBCAdapter adapter) {
177        jdbcPersistenceAdapter.setAdapter(adapter);
178    }
179
180    public Statements getStatements() {
181        return jdbcPersistenceAdapter.getStatements();
182    }
183
184    public void setStatements(Statements statements) {
185        jdbcPersistenceAdapter.setStatements(statements);
186    }
187
188    public boolean isUseDatabaseLock() {
189        return jdbcPersistenceAdapter.isUseDatabaseLock();
190    }
191
192    /**
193     * Sets whether or not an exclusive database lock should be used to enable
194     * JDBC Master/Slave. Enabled by default.
195     */
196    public void setUseDatabaseLock(boolean useDatabaseLock) {
197        jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock);
198    }
199
200    public boolean isCreateTablesOnStartup() {
201        return jdbcPersistenceAdapter.isCreateTablesOnStartup();
202    }
203
204    /**
205     * Sets whether or not tables are created on startup
206     */
207    public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
208        jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup);
209    }
210
211    public int getJournalThreadPriority() {
212        return journalThreadPriority;
213    }
214
215    /**
216     * Sets the thread priority of the journal thread
217     */
218    public void setJournalThreadPriority(int journalThreadPriority) {
219        this.journalThreadPriority = journalThreadPriority;
220    }
221
222    /**
223     * @throws IOException
224     */
225    protected void createJournal() throws IOException {
226        File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile();
227        if (failIfJournalIsLocked) {
228            journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
229                                      getJournalArchiveDirectory());
230        } else {
231            while (true) {
232                try {
233                    journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
234                                              getJournalArchiveDirectory());
235                    break;
236                } catch (JournalLockedException e) {
237                    LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000)
238                             + " seconds for the journal to be unlocked.");
239                    try {
240                        Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
241                    } catch (InterruptedException e1) {
242                    }
243                }
244            }
245        }
246    }
247
248}