/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.journal;

import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages DataFiles
 */
public class Journal {
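    /*
     * Minimal usage sketch (hedged, not a canonical walkthrough; the directory
     * path and payload are illustrative, everything else is this class's API):
     *
     *   Journal journal = new Journal();
     *   journal.setDirectory(new File("/var/data/journal"));
     *   journal.start();
     *   Location loc = journal.write(new ByteSequence("payload".getBytes("UTF-8")), true);
     *   ByteSequence record = journal.read(loc);
     *   journal.close();
     */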
    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));

    private static final int PREALLOC_CHUNK_SIZE = 1024*1024;

    // RECORD_HEAD_SPACE = 4 byte record length + 1 byte record type
    public static final int RECORD_HEAD_SPACE = 4 + 1;

    public static final byte USER_RECORD_TYPE = 1;
    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
    // Batch Control Item holds a 4 byte size of the batch and an 8 byte checksum of the batch.
    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8;
    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
    public static final byte[] EMPTY_BATCH_CONTROL_RECORD = createEmptyBatchControlRecordHeader();
    public static final int EOF_INT = ByteBuffer.wrap(new byte[]{'-', 'q', 'M', 'a'}).getInt();
    public static final byte EOF_EOT = '4';
    public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();
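
    // On-disk layout of a batch control record, as assembled by the helper
    // methods below (sizes follow from the constants above; total is
    // BATCH_CONTROL_RECORD_SIZE = 5 + 11 + 4 + 8 = 28 bytes):
    //
    //   [4 bytes record length][1 byte type = 2]["WRITE BATCH" magic, 11 bytes]
    //   [4 bytes batch size][8 bytes Adler32 checksum][batch payload ...]
    //
    // The EOF record is simply EOF_INT (4 bytes) followed by EOF_EOT (1 byte).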

    private ScheduledExecutorService scheduler;

    // tackle corruption when checksum is disabled or corrupted with zeros; minimize data loss
    public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
        DataFile dataFile = getDataFile(recoveryPosition);
        // with corruption on recovery we have no faith in the content - skip to the next batch record or eof
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            int nextOffset = findNextBatchRecord(reader, recoveryPosition.getOffset() + 1);
            Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset >= 0 ? nextOffset - 1 : dataFile.getLength() - 1);
            LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);

            // skip corruption on getNextLocation
            recoveryPosition.setOffset((int) sequence.getLast() + 1);
            recoveryPosition.setSize(-1);

            dataFile.corruptedBlocks.add(sequence);
        } catch (IOException e) {
            // best effort: an unreadable region is treated the same as corruption
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
    }

    public DataFileAccessorPool getAccessorPool() {
        return accessorPool;
    }

    public enum PreallocationStrategy {
        SPARSE_FILE,
        OS_KERNEL_COPY,
        ZEROS,
        CHUNKED_ZEROS;
    }

    public enum PreallocationScope {
        ENTIRE_JOURNAL,
        ENTIRE_JOURNAL_ASYNC,
        NONE;
    }

    public enum JournalDiskSyncStrategy {
        ALWAYS,
        PERIODIC,
        NEVER;
    }
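
    // Hedged configuration sketch using the enums above (the corresponding
    // setters are defined further down in this class):
    //
    //   journal.setPreallocationScope(PreallocationScope.ENTIRE_JOURNAL_ASYNC);
    //   journal.setPreallocationStrategy(PreallocationStrategy.CHUNKED_ZEROS);
    //   journal.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC);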

    private static byte[] createBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create batch control record header.", e);
        }
    }

    private static byte[] createEmptyBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            os.writeInt(0);
            os.writeLong(0L);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create empty batch control record header.", e);
        }
    }

    private static byte[] createEofBatchAndLocationRecord() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(EOF_INT);
            os.writeByte(EOF_EOT);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create eof header.", e);
        }
    }

    public static final String DEFAULT_DIRECTORY = ".";
    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
    public static final String DEFAULT_FILE_PREFIX = "db-";
    public static final String DEFAULT_FILE_SUFFIX = ".log";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;

    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);

    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();

    protected File directory = new File(DEFAULT_DIRECTORY);
    protected File directoryArchive;
    private boolean directoryArchiveOverridden = false;

    protected String filePrefix = DEFAULT_FILE_PREFIX;
    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
    protected boolean started;

    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;

    protected FileAppender appender;
    protected DataFileAccessorPool accessorPool;

    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();

    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
    protected ScheduledFuture<?> cleanupTask;
    protected AtomicLong totalLength = new AtomicLong();
    protected boolean archiveDataLogs;
    private ReplicationTarget replicationTarget;
    protected boolean checksum;
    protected boolean checkForCorruptionOnStartup;
    protected boolean enableAsyncDiskSync = true;
    private int nextDataFileId = 1;
    private final Object dataFileIdLock = new Object();
    private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
    private volatile DataFile nextDataFile;

    protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
    protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
    private File osKernelCopyTemplateFile = null;
    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;

    public interface DataFileRemovedListener {
        void fileRemoved(DataFile datafile);
    }

    private DataFileRemovedListener dataFileRemovedListener;

    public synchronized void start() throws IOException {
        if (started) {
            return;
        }

        long start = System.currentTimeMillis();
        accessorPool = new DataFileAccessorPool(this);
        started = true;

        appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);

        File[] files = directory.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
            }
        });

        if (files != null) {
            for (File file : files) {
                try {
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length() - fileSuffix.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    totalLength.addAndGet(dataFile.getLength());
                } catch (NumberFormatException e) {
                    // Ignore files that do not match the pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            LinkedList<DataFile> l = new LinkedList<>(fileMap.values());
            Collections.sort(l);
            for (DataFile df : l) {
                if (df.getLength() == 0) {
                    // possibly the result of a previous failed write
                    LOG.info("ignoring zero length, partially initialised journal data file: " + df);
                    continue;
                } else if (l.getLast().equals(df) && isUnusedPreallocated(df)) {
                    continue;
                }
                dataFiles.addLast(df);
                fileByFileMap.put(df.getFile(), df);

                if (isCheckForCorruptionOnStartup()) {
                    lastAppendLocation.set(recoveryCheck(df));
                }
            }
        }

        if (preallocationScope != PreallocationScope.NONE && preallocationStrategy == PreallocationStrategy.OS_KERNEL_COPY) {
            // create a template file that will be used to pre-allocate the journal files
            if (osKernelCopyTemplateFile == null) {
                osKernelCopyTemplateFile = createJournalTemplateFile();
            }
        }

        scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread schedulerThread = new Thread(r);
                schedulerThread.setName("ActiveMQ Journal Scheduled executor");
                schedulerThread.setDaemon(true);
                return schedulerThread;
            }
        });

        // init current write file
        if (dataFiles.isEmpty()) {
            nextDataFileId = 1;
            rotateWriteFile();
        } else {
            currentDataFile.set(dataFiles.getTail());
            nextDataFileId = currentDataFile.get().dataFileId + 1;
        }

        if (lastAppendLocation.get() == null) {
            DataFile df = dataFiles.getTail();
            lastAppendLocation.set(recoveryCheck(df));
        }

        // ensure we don't report unused space of last journal file in size metric
        int lastFileLength = dataFiles.getTail().getLength();
        if (totalLength.get() > lastFileLength && lastAppendLocation.get().getOffset() > 0) {
            totalLength.addAndGet(lastAppendLocation.get().getOffset() - lastFileLength);
        }

        cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                cleanup();
            }
        }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS);

        long end = System.currentTimeMillis();
        LOG.trace("Startup took: " + (end - start) + " ms");
    }

    public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {

        if (PreallocationScope.NONE != preallocationScope) {

            if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
                doPreallocationKernelCopy(file);
            } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
                doPreallocationZeros(file);
            } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) {
                doPreallocationChunkedZeros(file);
            } else {
                doPreallocationSparseFile(file);
            }
        }
    }
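
    // Summary of the preallocation strategies implemented below:
    //  - SPARSE_FILE: writes the EOF record at the start and again just before
    //    maxFileLength, leaving the middle for the filesystem to allocate lazily.
    //  - ZEROS: writes a single maxFileLength-sized buffer of zeros.
    //  - CHUNKED_ZEROS: writes zeros in PREALLOC_CHUNK_SIZE (1MB) slices to avoid
    //    allocating one large buffer.
    //  - OS_KERNEL_COPY: transfers a pre-built template file to the new journal
    //    file via FileChannel.transferTo, keeping the copy in the kernel.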

    private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
        final ByteBuffer journalEof = ByteBuffer.wrap(EOF_RECORD);
        try {
            FileChannel channel = file.getChannel();
            channel.position(0);
            channel.write(journalEof);
            channel.position(maxFileLength - EOF_RECORD.length);
            journalEof.rewind();
            channel.write(journalEof);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with sparse file", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with sparse file", e);
        }
    }

    private void doPreallocationZeros(RecoverableRandomAccessFile file) {
        ByteBuffer buffer = ByteBuffer.allocate(maxFileLength);
        buffer.put(EOF_RECORD);
        buffer.rewind();
        try {
            FileChannel channel = file.getChannel();
            channel.write(buffer);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros", e);
        }
    }

    private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {
        // try-with-resources ensures the template file handle is closed even if the transfer fails
        try (RandomAccessFile templateRaf = new RandomAccessFile(osKernelCopyTemplateFile, "rw")) {
            templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with kernel copy", ignored);
        } catch (FileNotFoundException e) {
            LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e);
        }
    }

    private File createJournalTemplateFile() {
        String fileName = "db-log.template";
        File rc = new File(directory, fileName);
        try (RandomAccessFile templateRaf = new RandomAccessFile(rc, "rw")) {
            templateRaf.getChannel().write(ByteBuffer.wrap(EOF_RECORD));
            templateRaf.setLength(maxFileLength);
            templateRaf.getChannel().force(true);
        } catch (FileNotFoundException e) {
            // reference the file being created; osKernelCopyTemplateFile may still be null here
            LOG.error("Could not open the template file on disk at " + rc.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not initialize the journal template file at " + rc.getAbsolutePath(), e);
        }
        return rc;
    }

    private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {

        ByteBuffer buffer = ByteBuffer.allocate(PREALLOC_CHUNK_SIZE);
        buffer.put(EOF_RECORD);
        buffer.rewind();

        try {
            FileChannel channel = file.getChannel();

            int remLen = maxFileLength;
            while (remLen > 0) {
                if (remLen < buffer.remaining()) {
                    buffer.limit(remLen);
                }
                int writeLen = channel.write(buffer);
                remLen -= writeLen;
                buffer.rewind();
            }

            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
        }
    }

    private static byte[] bytes(String string) {
        try {
            return string.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
        int firstBatchRecordSize = -1;
        if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
            Location location = new Location();
            location.setDataFileId(dataFile.getDataFileId());
            location.setOffset(0);

            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                firstBatchRecordSize = checkBatchRecord(reader, location.getOffset());
            } catch (Exception ignored) {
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }
        }
        // a zero-sized first batch record is the EOF marker at offset 0, i.e. a
        // preallocated file that was never written to
        return firstBatchRecordSize == 0;
    }

    protected Location recoveryCheck(DataFile dataFile) throws IOException {
        Location location = new Location();
        location.setDataFileId(dataFile.getDataFileId());
        location.setOffset(0);

        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            while (true) {
                int size = checkBatchRecord(reader, location.getOffset());
                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) {
                    if (size == 0) {
                        // eof batch record
                        break;
                    }
                    location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
                } else {

                    // Perhaps it's just some corruption... scan through the
                    // file to find the next valid batch record. We
                    // may have subsequent valid batch records.
                    int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1);
                    if (nextOffset >= 0) {
                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                        LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);
                        dataFile.corruptedBlocks.add(sequence);
                        location.setOffset(nextOffset);
                    } else {
                        break;
                    }
                }
            }

        } catch (IOException e) {
            // best effort: treat an unreadable tail the same as the end of valid data
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }

        int existingLen = dataFile.getLength();
        dataFile.setLength(location.getOffset());
        if (existingLen > dataFile.getLength()) {
            totalLength.addAndGet(dataFile.getLength() - existingLen);
        }

        if (!dataFile.corruptedBlocks.isEmpty()) {
            // Is the end of the data file corrupted?
            if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) {
                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
            }
        }

        return location;
    }

    private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
        byte[] data = new byte[1024 * 4];
        ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));

        int pos = 0;
        while (true) {
            pos = bs.indexOf(header, pos);
            if (pos >= 0) {
                return offset + pos;
            } else {
                // need to load the next data chunk in..
                if (bs.length != data.length) {
                    // If we had a short read then we were at EOF
                    return -1;
                }
                offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length;
                bs = new ByteSequence(data, 0, reader.read(offset, data));
                pos = 0;
            }
        }
    }

    public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        byte[] controlRecord = new byte[BATCH_CONTROL_RECORD_SIZE];

        try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord)) {

            reader.readFully(offset, controlRecord);

            // check for journal eof
            if (Arrays.equals(EOF_RECORD, Arrays.copyOfRange(controlRecord, 0, EOF_RECORD.length))) {
                // eof batch
                return 0;
            }

            // Assert that it's a batch record.
            for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
                if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
                    return -1;
                }
            }

            int size = controlIs.readInt();
            if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
                return -1;
            }

            if (isChecksum()) {

                long expectedChecksum = controlIs.readLong();
                if (expectedChecksum == 0) {
                    // Checksumming was not enabled when the record was stored,
                    // so we can't validate the record.
                    return size;
                }

                byte[] data = new byte[size];
                reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data);

                Checksum checksum = new Adler32();
                checksum.update(data, 0, data.length);

                if (expectedChecksum != checksum.getValue()) {
                    return -1;
                }
            }
            return size;
        }
    }
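
    // For reference, the checksum validated above covers the batch payload
    // alone. A hedged sketch of the writer-side computation (the actual append
    // path lives in DataFileAppender; batchPayload is illustrative):
    //
    //   Checksum checksum = new Adler32();
    //   checksum.update(batchPayload, 0, batchPayload.length);
    //   long storedChecksum = checksum.getValue(); // fills the 8-byte checksum
    //                                              // slot of the control record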

    void addToTotalLength(int size) {
        totalLength.addAndGet(size);
    }

    public long length() {
        return totalLength.get();
    }

    private void rotateWriteFile() throws IOException {
        synchronized (dataFileIdLock) {
            DataFile dataFile = nextDataFile;
            if (dataFile == null) {
                dataFile = newDataFile();
            }
            synchronized (currentDataFile) {
                fileMap.put(dataFile.getDataFileId(), dataFile);
                fileByFileMap.put(dataFile.getFile(), dataFile);
                dataFiles.addLast(dataFile);
                currentDataFile.set(dataFile);
            }
            nextDataFile = null;
        }
        if (PreallocationScope.ENTIRE_JOURNAL_ASYNC == preallocationScope) {
            preAllocateNextDataFileFuture = scheduler.submit(preAllocateNextDataFileTask);
        }
    }

    private Runnable preAllocateNextDataFileTask = new Runnable() {
        @Override
        public void run() {
            if (nextDataFile == null) {
                synchronized (dataFileIdLock) {
                    try {
                        nextDataFile = newDataFile();
                    } catch (IOException e) {
                        LOG.warn("Failed to proactively allocate data file", e);
                    }
                }
            }
        }
    };

    private volatile Future<?> preAllocateNextDataFileFuture;

    private DataFile newDataFile() throws IOException {
        int nextNum = nextDataFileId++;
        File file = getFile(nextNum);
        DataFile nextWriteFile = new DataFile(file, nextNum);
        preallocateEntireJournalDataFile(nextWriteFile.appendRandomAccessFile());
        return nextWriteFile;
    }

    public DataFile reserveDataFile() {
        synchronized (dataFileIdLock) {
            int nextNum = nextDataFileId++;
            File file = getFile(nextNum);
            DataFile reservedDataFile = new DataFile(file, nextNum);
            synchronized (currentDataFile) {
                fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
                fileByFileMap.put(file, reservedDataFile);
                if (dataFiles.isEmpty()) {
                    dataFiles.addLast(reservedDataFile);
                } else {
                    dataFiles.getTail().linkBefore(reservedDataFile);
                }
            }
            return reservedDataFile;
        }
    }

    public File getFile(int nextNum) {
        String fileName = filePrefix + nextNum + fileSuffix;
        File file = new File(directory, fileName);
        return file;
    }

    DataFile getDataFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = null;
        synchronized (currentDataFile) {
            dataFile = fileMap.get(key);
        }
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile;
    }

    public void close() throws IOException {
        synchronized (this) {
            if (!started) {
                return;
            }
            cleanupTask.cancel(true);
            if (preAllocateNextDataFileFuture != null) {
                preAllocateNextDataFileFuture.cancel(true);
            }
            ThreadPoolUtils.shutdownGraceful(scheduler, 4000);
            accessorPool.close();
        }
        // the appender can be calling back to the journal, blocking a close AMQ-5620
        appender.close();
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            dataFiles.clear();
            lastAppendLocation.set(null);
            started = false;
        }
    }

    public synchronized void cleanup() {
        if (accessorPool != null) {
            accessorPool.disposeUnused();
        }
    }

    public synchronized boolean delete() throws IOException {

        // Close all open file handles...
        appender.close();
        accessorPool.close();

        boolean result = true;
        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
            DataFile dataFile = i.next();
            result &= dataFile.delete();
        }

        if (preAllocateNextDataFileFuture != null) {
            preAllocateNextDataFileFuture.cancel(true);
        }
        synchronized (dataFileIdLock) {
            if (nextDataFile != null) {
                nextDataFile.delete();
                nextDataFile = null;
            }
        }

        totalLength.set(0);
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            lastAppendLocation.set(null);
            dataFiles = new LinkedNodeList<DataFile>();
        }
        // reopen open file handles...
        accessorPool = new DataFileAccessorPool(this);
        appender = new DataFileAppender(this);
        return result;
    }

    public void removeDataFiles(Set<Integer> files) throws IOException {
        for (Integer key : files) {
            // Can't remove the data file (or subsequent files) that is currently being written to.
            if (key >= lastAppendLocation.get().getDataFileId()) {
                continue;
            }
            DataFile dataFile = null;
            synchronized (currentDataFile) {
                dataFile = fileMap.remove(key);
                if (dataFile != null) {
                    fileByFileMap.remove(dataFile.getFile());
                    dataFile.unlink();
                }
            }
            if (dataFile != null) {
                forceRemoveDataFile(dataFile);
            }
        }
    }

    private void forceRemoveDataFile(DataFile dataFile) throws IOException {
        accessorPool.disposeDataFileAccessors(dataFile);
        totalLength.addAndGet(-dataFile.getLength());
        if (archiveDataLogs) {
            File directoryArchive = getDirectoryArchive();
            if (directoryArchive.exists()) {
                LOG.debug("Archive directory exists: {}", directoryArchive);
            } else {
                LOG.debug("Archive directory [{}] does not exist - creating it now",
                        directoryArchive.getAbsolutePath());
                IOHelper.mkdirs(directoryArchive);
            }
            LOG.debug("Moving data file {} to {}", dataFile, directoryArchive.getCanonicalPath());
            dataFile.move(directoryArchive);
            LOG.debug("Successfully moved data file");
        } else {
            LOG.debug("Deleting data file: {}", dataFile);
            if (dataFile.delete()) {
                LOG.debug("Discarded data file: {}", dataFile);
            } else {
                LOG.warn("Failed to discard data file : {}", dataFile.getFile());
            }
        }
        if (dataFileRemovedListener != null) {
            dataFileRemovedListener.fileRemoved(dataFile);
        }
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    @Override
    public String toString() {
        return directory.toString();
    }

    public Location getNextLocation(Location location) throws IOException, IllegalStateException {
        return getNextLocation(location, null);
    }

    public Location getNextLocation(Location location, Location limit) throws IOException, IllegalStateException {
        Location cur = null;
        while (true) {
            if (cur == null) {
                if (location == null) {
                    DataFile head = null;
                    synchronized (currentDataFile) {
                        head = dataFiles.getHead();
                    }
                    if (head == null) {
                        return null;
                    }
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset; a size of -1 means the size is not
                    // yet known, so stay at the current offset.
                    cur = new Location(location);
                    if (location.getSize() != -1) {
                        cur.setOffset(location.getOffset() + location.getSize());
                    }
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            DataFile dataFile = getDataFile(cur);

            // Did it go into the next file?
            if (dataFile.getLength() <= cur.getOffset()) {
                synchronized (currentDataFile) {
                    dataFile = dataFile.getNext();
                }
                if (dataFile == null) {
                    return null;
                } else {
                    cur.setDataFileId(dataFile.getDataFileId().intValue());
                    cur.setOffset(0);
                    if (limit != null && cur.compareTo(limit) >= 0) {
                        LOG.trace("reached limit: {} at: {}", limit, cur);
                        return null;
                    }
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } catch (EOFException eof) {
                LOG.trace("EOF on next: " + location + ", cur: " + cur);
                throw eof;
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            Sequence corruptedRange = dataFile.corruptedBlocks.get(cur.getOffset());
            if (corruptedRange != null) {
                // skip corruption
                cur.setSize((int) corruptedRange.range());
            } else if ((cur.getSize() == EOF_INT && cur.getType() == EOF_EOT) ||
                    (cur.getType() == 0 && cur.getSize() == 0)) {
                // eof - jump to next datafile
                // EOF_INT and EOF_EOT replace 0,0 - we need to react to both for
                // replay of existing journals
                // possibly journal is larger than maxFileLength after config change
                cur.setSize(EOF_RECORD.length);
                cur.setOffset(Math.max(maxFileLength, dataFile.getLength()));
            } else if (cur.getType() == USER_RECORD_TYPE) {
                // Only return user records.
                return cur;
            }
        }
    }
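
    /*
     * Replay sketch (hedged): getNextLocation(null) seeds a scan at the head of
     * the journal, and subsequent calls step through user records while
     * transparently skipping EOF markers and corrupted ranges.
     *
     *   Location pos = journal.getNextLocation(null);
     *   while (pos != null) {
     *       ByteSequence record = journal.read(pos);
     *       // process record ...
     *       pos = journal.getNextLocation(pos);
     *   }
     */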

    public ByteSequence read(Location location) throws IOException, IllegalStateException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        ByteSequence rc = null;
        try {
            rc = reader.readRecord(location);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
        return rc;
    }

    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
        return loc;
    }

    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
        return loc;
    }

    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
        try {
            updater.updateRecord(location, data, sync);
        } finally {
            accessorPool.closeDataFileAccessor(updater);
        }
    }

    public PreallocationStrategy getPreallocationStrategy() {
        return preallocationStrategy;
    }

    public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) {
        this.preallocationStrategy = preallocationStrategy;
    }

    public PreallocationScope getPreallocationScope() {
        return preallocationScope;
    }

    public void setPreallocationScope(PreallocationScope preallocationScope) {
        this.preallocationScope = preallocationScope;
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public String getFilePrefix() {
        return filePrefix;
    }

    public void setFilePrefix(String filePrefix) {
        this.filePrefix = filePrefix;
    }

    public Map<WriteKey, WriteCommand> getInflightWrites() {
        return inflightWrites;
    }

    public Location getLastAppendLocation() {
        return lastAppendLocation.get();
    }

    public void setLastAppendLocation(Location lastSyncedLocation) {
        this.lastAppendLocation.set(lastSyncedLocation);
    }

    public File getDirectoryArchive() {
        if (!directoryArchiveOverridden && (directoryArchive == null)) {
            // create the directoryArchive relative to the journal location
            directoryArchive = new File(directory.getAbsolutePath() +
                    File.separator + DEFAULT_ARCHIVE_DIRECTORY);
        }
        return directoryArchive;
    }

    public void setDirectoryArchive(File directoryArchive) {
        directoryArchiveOverridden = true;
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    public DataFile getDataFileById(int dataFileId) {
        synchronized (currentDataFile) {
            return fileMap.get(Integer.valueOf(dataFileId));
        }
    }

    public DataFile getCurrentDataFile(int capacity) throws IOException {
        // First just acquire the currentDataFile lock and return if no rotation is needed
        synchronized (currentDataFile) {
            if (currentDataFile.get().getLength() + capacity < maxFileLength) {
                return currentDataFile.get();
            }
        }

        // AMQ-6545 - if rotation is needed, acquire dataFileIdLock first to prevent deadlocks,
        // then re-check whether rotation is needed
        synchronized (dataFileIdLock) {
            synchronized (currentDataFile) {
                if (currentDataFile.get().getLength() + capacity >= maxFileLength) {
                    rotateWriteFile();
                }
                return currentDataFile.get();
            }
        }
    }

    public Integer getCurrentDataFileId() {
        synchronized (currentDataFile) {
            return currentDataFile.get().getDataFileId();
        }
    }

    /**
     * Get a set of files - only valid after start()
     *
     * @return files currently being used
     */
    public Set<File> getFiles() {
        synchronized (currentDataFile) {
            return fileByFileMap.keySet();
        }
    }

    public Map<Integer, DataFile> getFileMap() {
        synchronized (currentDataFile) {
            return new TreeMap<Integer, DataFile>(fileMap);
        }
    }

    public long getDiskSize() {
        return totalLength.get();
    }

    public void setReplicationTarget(ReplicationTarget replicationTarget) {
        this.replicationTarget = replicationTarget;
    }

    public ReplicationTarget getReplicationTarget() {
        return replicationTarget;
    }

    public String getFileSuffix() {
        return fileSuffix;
    }

    public void setFileSuffix(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    public boolean isChecksum() {
        return checksum;
    }

    public void setChecksum(boolean checksumWrites) {
        this.checksum = checksumWrites;
    }

    public boolean isCheckForCorruptionOnStartup() {
        return checkForCorruptionOnStartup;
    }

    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
        this.totalLength = storeSizeAccumulator;
    }

    public void setEnableAsyncDiskSync(boolean val) {
        this.enableAsyncDiskSync = val;
    }

    public boolean isEnableAsyncDiskSync() {
        return enableAsyncDiskSync;
    }

    public JournalDiskSyncStrategy getJournalDiskSyncStrategy() {
        return journalDiskSyncStrategy;
    }

    public void setJournalDiskSyncStrategy(JournalDiskSyncStrategy journalDiskSyncStrategy) {
        this.journalDiskSyncStrategy = journalDiskSyncStrategy;
    }

    public boolean isJournalDiskSyncPeriodic() {
        return JournalDiskSyncStrategy.PERIODIC.equals(journalDiskSyncStrategy);
    }

    public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
        this.dataFileRemovedListener = dataFileRemovedListener;
    }

    public static class WriteCommand extends LinkedNode<WriteCommand> {
        public final Location location;
        public final ByteSequence data;
        final boolean sync;
        public final Runnable onComplete;

        public WriteCommand(Location location, ByteSequence data, boolean sync) {
            this.location = location;
            this.data = data;
            this.sync = sync;
            this.onComplete = null;
        }

        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
            this.location = location;
            this.data = data;
            this.onComplete = onComplete;
            this.sync = false;
        }
    }

    public static class WriteKey {
        private final int file;
        private final long offset;
        private final int hash;

        public WriteKey(Location item) {
            file = item.getDataFileId();
            offset = item.getOffset();
            // TODO: see if we can build a better hash
            hash = (int)(file ^ offset);
        }

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof WriteKey) {
                WriteKey di = (WriteKey)obj;
                return di.file == file && di.offset == offset;
            }
            return false;
        }
    }
}