/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.page;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LFUCache;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A PageFile provides random access to fixed-size disk pages. This object is not thread safe and therefore access to it should
 * be externally synchronized.
 * <p/>
 * The file has 3 parts:
 * Metadata Space: 4k : Reserved metadata area. Used to store persistent config about the file.
 * Recovery Buffer Space: Page Size * 1000 : This is a redo log used to prevent partial page writes from making the file inconsistent.
 * Page Space: The pages in the page file.
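 * <p/>
 * A minimal usage sketch (the directory and name below are illustrative, not fixed by this class):
 * <pre>
 * PageFile pageFile = new PageFile(new File("data"), "example");
 * pageFile.load();
 * Transaction tx = pageFile.tx();
 * // ... allocate, load and store pages through the transaction ...
 * pageFile.flush();
 * pageFile.unload();
 * </pre>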
 */
public class PageFile {

    private static final String PAGEFILE_SUFFIX = ".data";
    private static final String RECOVERY_FILE_SUFFIX = ".redo";
    private static final String FREE_FILE_SUFFIX = ".free";

    // 4k default page size.
    public static final int DEFAULT_PAGE_SIZE = Integer.getInteger("defaultPageSize", 1024*4);
    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.getInteger("defaultWriteBatchSize", 1000);
    public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.getInteger("defaultPageCacheSize", 100);

    private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
    private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;

    // Each recovery record is headed by a (long pageId).
    private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);

    // A PageFile will use a couple of files in this directory.
    private final File directory;
    // And the file names in that directory will be based on this name.
    private final String name;

    // File handle used for reading pages..
    private RecoverableRandomAccessFile readFile;
    // File handle used for writing pages..
    private RecoverableRandomAccessFile writeFile;
    // File handle used for writing to the recovery file..
    private RecoverableRandomAccessFile recoveryFile;

    // The size of pages
    private int pageSize = DEFAULT_PAGE_SIZE;

    // The minimum amount of space allocated to the recovery file, in pages.
    private int recoveryFileMinPageCount = 1000;
    // The maximum size we let the recovery file grow to. It may temporarily exceed this maximum,
    // but the file will be resized back down to it as soon as possible.
    private int recoveryFileMaxPageCount = 10000;
    // The number of pages in the current recovery buffer.
    private int recoveryPageCount;

    private final AtomicBoolean loaded = new AtomicBoolean();
    // The number of pages we are aiming to write every time we
    // write to disk.
    int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;

    // The cache of recently used pages.
    private Map<Long, Page> pageCache;
    // Is read page caching enabled?
    private boolean enablePageCaching = true;
    // How many pages will we keep in the cache?
    private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE;

    // Should we first log the page write to the recovery buffer? Avoids partial
    // page write failures..
    private boolean enableRecoveryFile = true;
    // Will we sync writes to disk? Ensures that data will not be lost after a checkpoint().
    private boolean enableDiskSyncs = true;
    // Will writes be done in an async thread?
    private boolean enabledWriteThread = false;

    // These are used if enabledWriteThread==true
    private final AtomicBoolean stopWriter = new AtomicBoolean();
    private Thread writerThread;
    private CountDownLatch checkpointLatch;

    // Keeps track of writes that are being written to disk.
    private final TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();

    // Keeps track of free pages.
    private final AtomicLong nextFreePageId = new AtomicLong();
    private SequenceSet freeList = new SequenceSet();

    private final AtomicLong nextTxid = new AtomicLong();

    // Persistent settings stored in the page file.
    private MetaData metaData;

    private final ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();

    private boolean useLFRUEviction = false;
    private float LFUEvictionFactor = 0.2f;
    /**
     * Used to keep track of updated pages which have not yet been committed.
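     * <p/>
     * Lifecycle, as implemented below: setCurrent(...) stages the latest bytes for a page,
     * begin() moves them to the disk-bound slot so the page can be updated again while the
     * batch is being written, and done() clears the disk-bound slot once the write has landed.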
     */
    static class PageWrite {
        Page page;
        byte[] current;
        byte[] diskBound;
        long currentLocation = -1;
        long diskBoundLocation = -1;
        File tmpFile;
        int length;

        public PageWrite(Page page, byte[] data) {
            this.page = page;
            current = data;
        }

        public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
            this.page = page;
            this.currentLocation = currentLocation;
            this.tmpFile = tmpFile;
            this.length = length;
        }

        public void setCurrent(Page page, byte[] data) {
            this.page = page;
            current = data;
            currentLocation = -1;
            diskBoundLocation = -1;
        }

        public void setCurrentLocation(Page page, long location, int length) {
            this.page = page;
            this.currentLocation = location;
            this.length = length;
            this.current = null;
        }

        @Override
        public String toString() {
            return "[PageWrite:" + page.getPageId() + "-" + page.getType() + "]";
        }

        @SuppressWarnings("unchecked")
        public Page getPage() {
            return page;
        }

        public byte[] getDiskBound() throws IOException {
            if (diskBound == null && diskBoundLocation != -1) {
                diskBound = new byte[length];
                try (RandomAccessFile file = new RandomAccessFile(tmpFile, "r")) {
                    file.seek(diskBoundLocation);
                    file.readFully(diskBound);
                }
                diskBoundLocation = -1;
            }
            return diskBound;
        }

        void begin() {
            if (currentLocation != -1) {
                diskBoundLocation = currentLocation;
            } else {
                diskBound = current;
            }
            current = null;
            currentLocation = -1;
        }

        /**
         * @return true if there are no pending writes to do.
         */
        boolean done() {
            diskBoundLocation = -1;
            diskBound = null;
            return current == null || currentLocation == -1;
        }

        boolean isDone() {
            return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
        }
    }

    /**
     * The MetaData object holds the persistent data associated with a PageFile object.
     */
    public static class MetaData {

        String fileType;
        String fileTypeVersion;

        long metaDataTxId = -1;
        int pageSize;
        boolean cleanShutdown;
        long lastTxId;
        long freePages;

        public String getFileType() {
            return fileType;
        }

        public void setFileType(String fileType) {
            this.fileType = fileType;
        }

        public String getFileTypeVersion() {
            return fileTypeVersion;
        }

        public void setFileTypeVersion(String version) {
            this.fileTypeVersion = version;
        }

        public long getMetaDataTxId() {
            return metaDataTxId;
        }

        public void setMetaDataTxId(long metaDataTxId) {
            this.metaDataTxId = metaDataTxId;
        }

        public int getPageSize() {
            return pageSize;
        }

        public void setPageSize(int pageSize) {
            this.pageSize = pageSize;
        }

        public boolean isCleanShutdown() {
            return cleanShutdown;
        }

        public void setCleanShutdown(boolean cleanShutdown) {
            this.cleanShutdown = cleanShutdown;
        }

        public long getLastTxId() {
            return lastTxId;
        }

        public void setLastTxId(long lastTxId) {
            this.lastTxId = lastTxId;
        }

        public long getFreePages() {
            return freePages;
        }

        public void setFreePages(long value) {
            this.freePages = value;
        }
    }

    public Transaction tx() {
        assertLoaded();
        return new Transaction(this);
    }

    /**
     * Creates a PageFile in the specified directory whose data files are named by name.
     */
    public PageFile(File directory, String name) {
        this.directory = directory;
        this.name = name;
    }

    /**
     * Deletes the files used by the PageFile object.  This method can only be used when this object is not loaded.
     *
     * @throws IOException           if the files cannot be deleted.
     * @throws IllegalStateException if this PageFile is loaded
     */
    public void delete() throws IOException {
        if (loaded.get()) {
            throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
        }
        delete(getMainPageFile());
        delete(getFreeFile());
        delete(getRecoveryFile());
    }

    public void archive() throws IOException {
        if (loaded.get()) {
            throw new IllegalStateException("Cannot archive page file data when the page file is loaded");
        }
        long timestamp = System.currentTimeMillis();
        archive(getMainPageFile(), String.valueOf(timestamp));
        archive(getFreeFile(), String.valueOf(timestamp));
        archive(getRecoveryFile(), String.valueOf(timestamp));
    }

    /**
     * @param file
     * @throws IOException
     */
    private void delete(File file) throws IOException {
        if (file.exists() && !file.delete()) {
            throw new IOException("Could not delete: " + file.getPath());
        }
    }

    private void archive(File file, String suffix) throws IOException {
        if (file.exists()) {
            File archive = new File(file.getPath() + "-" + suffix);
            if (!file.renameTo(archive)) {
                throw new IOException("Could not archive: " + file.getPath() + " to " + archive.getPath());
            }
        }
    }

    /**
     * Loads the page file so that it can be accessed for read/write purposes.  This allocates OS resources.  If this is the
     * first time the page file is loaded, then this creates the page file in the file system.
     *
     * @throws IOException           if the page file cannot be loaded. This could be because the existing page file is corrupt,
     *                               is a bad version, or there was a disk error.
     * @throws IllegalStateException if the page file was already loaded.
     */
    public void load() throws IOException, IllegalStateException {
        if (loaded.compareAndSet(false, true)) {

            if (enablePageCaching) {
                if (isUseLFRUEviction()) {
                    pageCache = Collections.synchronizedMap(new LFUCache<Long, Page>(pageCacheSize, getLFUEvictionFactor()));
                } else {
                    pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
                }
            }

            File file = getMainPageFile();
            IOHelper.mkdirs(file.getParentFile());
            writeFile = new RecoverableRandomAccessFile(file, "rw");
            readFile = new RecoverableRandomAccessFile(file, "r");

            if (readFile.length() > 0) {
                // Load the page size setting because that can't change once the file is created.
                loadMetaData();
                pageSize = metaData.getPageSize();
            } else {
                // Store the page size setting because that can't change once the file is created.
                metaData = new MetaData();
                metaData.setFileType(PageFile.class.getName());
                metaData.setFileTypeVersion("1");
                metaData.setPageSize(getPageSize());
                metaData.setCleanShutdown(true);
                metaData.setFreePages(-1);
                metaData.setLastTxId(0);
                storeMetaData();
            }

            if (enableRecoveryFile) {
                recoveryFile = new RecoverableRandomAccessFile(getRecoveryFile(), "rw");
            }

            boolean needsFreePageRecovery = false;

            if (metaData.isCleanShutdown()) {
                nextTxid.set(metaData.getLastTxId() + 1);
                if (metaData.getFreePages() > 0) {
                    loadFreeList();
                }
            } else {
                LOG.debug(toString() + ", Recovering page file...");
                nextTxid.set(redoRecoveryUpdates());
                needsFreePageRecovery = true;
            }

            if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
                writeFile.setLength(PAGE_FILE_HEADER_SIZE);
            }
            nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize);

            if (needsFreePageRecovery) {
                // Scan all pages to find the free ones, after nextFreePageId is set.
                freeList = new SequenceSet();
                for (Iterator<Page> i = tx().iterator(true); i.hasNext(); ) {
                    Page page = i.next();
                    if (page.getType() == Page.PAGE_FREE_TYPE) {
                        freeList.add(page.getPageId());
                    }
                }
            }

            metaData.setCleanShutdown(false);
            storeMetaData();
            getFreeFile().delete();
            startWriter();
        } else {
            throw new IllegalStateException("Cannot load the page file when it is already loaded.");
        }
    }

    /**
     * Unloads a previously loaded PageFile.  This deallocates OS related resources like file handles.
     * Once unloaded, you can no longer use the page file to read or write Pages.
     *
     * @throws IOException           if a disk error occurred while closing down the page file.
     * @throws IllegalStateException if the PageFile is not loaded
     */
    public void unload() throws IOException {
        if (loaded.compareAndSet(true, false)) {
            flush();
            try {
                stopWriter();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }

            if (freeList.isEmpty()) {
                metaData.setFreePages(0);
            } else {
                storeFreeList();
                metaData.setFreePages(freeList.size());
            }

            metaData.setLastTxId(nextTxid.get() - 1);
            metaData.setCleanShutdown(true);
            storeMetaData();

            if (readFile != null) {
                readFile.close();
                readFile = null;
                writeFile.close();
                writeFile = null;
                if (enableRecoveryFile) {
                    recoveryFile.close();
                    recoveryFile = null;
                }
                freeList.clear();
                if (pageCache != null) {
                    pageCache = null;
                }
                synchronized (writes) {
                    writes.clear();
                }
            }
        } else {
            throw new IllegalStateException("Cannot unload the page file when it is not loaded");
        }
    }

    public boolean isLoaded() {
        return loaded.get();
    }

    /**
     * Flush and sync all write buffers to disk.
     *
     * @throws IOException if a disk error occurs.
     */
    public void flush() throws IOException {

        if (enabledWriteThread && stopWriter.get()) {
            throw new IOException("Page file already stopped: checkpointing is not allowed");
        }

        // Set up a latch that gets notified when all buffered writes hit the disk.
        CountDownLatch checkpointLatch;
        synchronized (writes) {
            if (writes.isEmpty()) {
                return;
            }
            if (enabledWriteThread) {
                if (this.checkpointLatch == null) {
                    this.checkpointLatch = new CountDownLatch(1);
                }
                checkpointLatch = this.checkpointLatch;
                writes.notify();
            } else {
                writeBatch();
                return;
            }
        }
        try {
            checkpointLatch.await();
        } catch (InterruptedException e) {
            InterruptedIOException ioe = new InterruptedIOException();
            ioe.initCause(e);
            throw ioe;
        }
    }

    @Override
    public String toString() {
        return "Page File: " + getMainPageFile();
    }

    ///////////////////////////////////////////////////////////////////
    // Private Implementation Methods
    ///////////////////////////////////////////////////////////////////
    private File getMainPageFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + PAGEFILE_SUFFIX);
    }

    public File getFreeFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + FREE_FILE_SUFFIX);
    }

    public File getRecoveryFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + RECOVERY_FILE_SUFFIX);
    }

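    // Worked example: with the default 4 KiB page size, page 3 starts at
    // PAGE_FILE_HEADER_SIZE + 3 * pageSize = 4096 + 3 * 4096 = 16384.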
    public long toOffset(long pageId) {
        return PAGE_FILE_HEADER_SIZE + (pageId * pageSize);
    }

    private void loadMetaData() throws IOException {

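        // The 4 KiB metadata header holds two copies of the same properties block,
        // one in each half; storeMetaData() writes both, so if one copy is a torn
        // write the other should still parse.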
        ByteArrayInputStream is;
        MetaData v1 = new MetaData();
        MetaData v2 = new MetaData();
        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(0);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v1, p);
        } catch (IOException e) {
            v1 = null;
        }

        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(PAGE_FILE_HEADER_SIZE / 2);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v2, p);
        } catch (IOException e) {
            v2 = null;
        }

        if (v1 == null && v2 == null) {
            throw new IOException("Could not load page file meta data");
        }

        if (v1 == null || v1.metaDataTxId < 0) {
            metaData = v2;
        } else if (v2 == null || v2.metaDataTxId < 0) {
            metaData = v1;
        } else if (v1.metaDataTxId == v2.metaDataTxId) {
            metaData = v1; // use the first since the 2nd could be a partial..
        } else {
            metaData = v2; // use the second because the first is probably a partial.
        }
    }

    private void storeMetaData() throws IOException {
        // Convert the metadata into a property format
        metaData.metaDataTxId++;
        Properties p = new Properties();
        IntrospectionSupport.getProperties(metaData, p, null);

        ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
        p.store(os, "");
        if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
            throw new IOException("Configuration is larger than: " + PAGE_FILE_HEADER_SIZE / 2);
        }
        // Fill the rest with spaces...
        byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
        Arrays.fill(filler, (byte) ' ');
        os.write(filler);
        os.flush();

        byte[] d = os.toByteArray();

        // So we don't lose it, write it two times...
        writeFile.seek(0);
        writeFile.write(d);
        writeFile.sync();
        writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
        writeFile.write(d);
        writeFile.sync();
    }

    private void storeFreeList() throws IOException {
        FileOutputStream os = new FileOutputStream(getFreeFile());
        DataOutputStream dos = new DataOutputStream(os);
        SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
        dos.close();
    }

    private void loadFreeList() throws IOException {
        freeList.clear();
        FileInputStream is = new FileInputStream(getFreeFile());
        DataInputStream dis = new DataInputStream(is);
        freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
        dis.close();
    }

    ///////////////////////////////////////////////////////////////////
    // Property Accessors
    ///////////////////////////////////////////////////////////////////

    /**
     * Is the recovery buffer used to double buffer page writes?  Enabled by default.
     *
     * @return true if the recovery buffer is enabled.
     */
    public boolean isEnableRecoveryFile() {
        return enableRecoveryFile;
    }

    /**
     * Sets whether the recovery buffer is used to double buffer page writes.  Enabled by default.  Disabling this
     * may potentially cause partial page writes which can lead to page file corruption.
     */
    public void setEnableRecoveryFile(boolean doubleBuffer) {
        assertNotLoaded();
        this.enableRecoveryFile = doubleBuffer;
    }

    /**
     * @return true if page writes are synced to disk.
     */
    public boolean isEnableDiskSyncs() {
        return enableDiskSyncs;
    }

    /**
     * Allows you to enable syncing writes to disk.
     */
    public void setEnableDiskSyncs(boolean syncWrites) {
        assertNotLoaded();
        this.enableDiskSyncs = syncWrites;
    }

    /**
     * @return the page size
     */
    public int getPageSize() {
        return this.pageSize;
    }

    /**
     * @return the amount of content data that a page can hold.
     */
    public int getPageContentSize() {
        return this.pageSize - Page.PAGE_HEADER_SIZE;
    }

    /**
     * Configures the page size used by the page file.  By default it is 4k.  Once a page file is created on disk,
     * subsequent loads of that file will use the original pageSize.  Once the PageFile is loaded, this setting
     * can no longer be changed.
     *
     * @param pageSize the pageSize to set
     * @throws IllegalStateException once the page file is loaded.
     */
    public void setPageSize(int pageSize) throws IllegalStateException {
        assertNotLoaded();
        this.pageSize = pageSize;
    }

    /**
     * @return true if read page caching is enabled
     */
    public boolean isEnablePageCaching() {
        return this.enablePageCaching;
    }

    /**
     * @param enablePageCaching allows you to enable read page caching
     */
    public void setEnablePageCaching(boolean enablePageCaching) {
        assertNotLoaded();
        this.enablePageCaching = enablePageCaching;
    }

    /**
     * @return the maximum number of pages that will get stored in the read page cache.
     */
    public int getPageCacheSize() {
        return this.pageCacheSize;
    }

    /**
     * @param pageCacheSize sets the maximum number of pages that will get stored in the read page cache.
     */
    public void setPageCacheSize(int pageCacheSize) {
        assertNotLoaded();
        this.pageCacheSize = pageCacheSize;
    }

    public boolean isEnabledWriteThread() {
        return enabledWriteThread;
    }

    public void setEnableWriteThread(boolean enableAsyncWrites) {
        assertNotLoaded();
        this.enabledWriteThread = enableAsyncWrites;
    }

    public long getDiskSize() throws IOException {
        return toOffset(nextFreePageId.get());
    }

    /**
     * @return the number of pages allocated in the PageFile
     */
    public long getPageCount() {
        return nextFreePageId.get();
    }

    public int getRecoveryFileMinPageCount() {
        return recoveryFileMinPageCount;
    }

    public long getFreePageCount() {
        assertLoaded();
        return freeList.rangeSize();
    }

    public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
        assertNotLoaded();
        this.recoveryFileMinPageCount = recoveryFileMinPageCount;
    }

    public int getRecoveryFileMaxPageCount() {
        return recoveryFileMaxPageCount;
    }

    public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
        assertNotLoaded();
        this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public float getLFUEvictionFactor() {
        return LFUEvictionFactor;
    }

    public void setLFUEvictionFactor(float LFUEvictionFactor) {
        this.LFUEvictionFactor = LFUEvictionFactor;
    }

    public boolean isUseLFRUEviction() {
        return useLFRUEviction;
    }

    public void setUseLFRUEviction(boolean useLFRUEviction) {
        this.useLFRUEviction = useLFRUEviction;
    }

    ///////////////////////////////////////////////////////////////////
    // Package Protected Methods exposed to Transaction
    ///////////////////////////////////////////////////////////////////

    /**
     * @throws IllegalStateException if the page file is not loaded.
     */
    void assertLoaded() throws IllegalStateException {
        if (!loaded.get()) {
            throw new IllegalStateException("PageFile is not loaded");
        }
    }

    void assertNotLoaded() throws IllegalStateException {
        if (loaded.get()) {
            throw new IllegalStateException("PageFile is loaded");
        }
    }

    /**
     * Allocates a block of free pages that you can write data to.
     *
     * @param count the number of sequential pages to allocate
     * @return the first page of the sequential set.
     * @throws IOException           if a disk error occurs.
     * @throws IllegalStateException if the PageFile is not loaded
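     * <p/>
     * A minimal sketch of how this is typically reached through the Transaction API
     * (the variable names here are illustrative):
     * <pre>
     * Transaction tx = pageFile.tx();
     * Page&lt;String&gt; first = tx.allocate(3); // three sequential pages; first has the lowest page id
     * </pre>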
     */
    <T> Page<T> allocate(int count) throws IOException {
        assertLoaded();
        if (count <= 0) {
            throw new IllegalArgumentException("The allocation count must be larger than zero");
        }

        Sequence seq = freeList.removeFirstSequence(count);

        // We may need to create new free pages...
        if (seq == null) {

            Page<T> first = null;
            int c = count;

            // Reserve the page ids and write tx ids only once...
            long pageId = nextFreePageId.getAndAdd(count);
            long writeTxnId = nextTxid.getAndAdd(count);

            while (c-- > 0) {
                Page<T> page = new Page<T>(pageId++);
                page.makeFree(writeTxnId++);

                if (first == null) {
                    first = page;
                }

                addToCache(page);
                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
                page.write(out);
                write(page, out.getData());

                // LOG.debug("allocate writing: "+page.getPageId());
            }

            return first;
        }

        Page<T> page = new Page<T>(seq.getFirst());
        page.makeFree(0);
        // LOG.debug("allocated: "+page.getPageId());
        return page;
    }

    long getNextWriteTransactionId() {
        return nextTxid.incrementAndGet();
    }

    synchronized void readPage(long pageId, byte[] data) throws IOException {
        readFile.seek(toOffset(pageId));
        readFile.readFully(data);
    }

    public void freePage(long pageId) {
        freeList.add(pageId);
        removeFromCache(pageId);
    }

    @SuppressWarnings("unchecked")
    private <T> void write(Page<T> page, byte[] data) throws IOException {
        final PageWrite write = new PageWrite(page, data);
        Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
            @Override
            public Long getKey() {
                return write.getPage().getPageId();
            }

            @Override
            public PageWrite getValue() {
                return write;
            }

            @Override
            public PageWrite setValue(PageWrite value) {
                return null;
            }
        };
        Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry};
        write(Arrays.asList(entries));
    }

    void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
        synchronized (writes) {
            if (enabledWriteThread) {
                while (writes.size() >= writeBatchSize && !stopWriter.get()) {
                    try {
                        writes.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new InterruptedIOException();
                    }
                }
            }

            boolean longTx = false;

            for (Map.Entry<Long, PageWrite> entry : updates) {
                Long key = entry.getKey();
                PageWrite value = entry.getValue();
                PageWrite write = writes.get(key);
                if (write == null) {
                    writes.put(key, value);
                } else {
                    if (value.currentLocation != -1) {
                        write.setCurrentLocation(value.page, value.currentLocation, value.length);
                        write.tmpFile = value.tmpFile;
                        longTx = true;
                    } else {
                        write.setCurrent(value.page, value.current);
                    }
                }
            }

            // Once we start approaching capacity, notify the writer to start writing.
            // Sync immediately for long txs.
            if (longTx || canStartWriteBatch()) {

                if (enabledWriteThread) {
                    writes.notify();
                } else {
                    writeBatch();
                }
            }
        }
    }

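    // Example with the defaults: writeBatchSize = 1000, so the async writer is
    // nudged once ~100 writes are queued (10% capacity), while the synchronous
    // path batches at 800 (80%); a pending checkpoint forces a batch either way.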
    private boolean canStartWriteBatch() {
        int capacityUsed = ((writes.size() * 100) / writeBatchSize);
        if (enabledWriteThread) {
            // The constant 10 here controls how soon write batches start going to disk.
            // It would be nice to figure out how to auto tune that value.  Make it too small and
            // we reduce throughput because we are locking the write mutex too often doing writes.
            return capacityUsed >= 10 || checkpointLatch != null;
        } else {
            return capacityUsed >= 80 || checkpointLatch != null;
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Cache Related operations
    ///////////////////////////////////////////////////////////////////
    @SuppressWarnings("unchecked")
    <T> Page<T> getFromCache(long pageId) {
        synchronized (writes) {
            PageWrite pageWrite = writes.get(pageId);
            if (pageWrite != null) {
                return pageWrite.page;
            }
        }

        Page<T> result = null;
        if (enablePageCaching) {
            result = pageCache.get(pageId);
        }
        return result;
    }

    void addToCache(Page page) {
        if (enablePageCaching) {
            pageCache.put(page.getPageId(), page);
        }
    }

    void removeFromCache(long pageId) {
        if (enablePageCaching) {
            pageCache.remove(pageId);
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Internal double write implementation follows...
    ///////////////////////////////////////////////////////////////////

    private void pollWrites() {
        try {
            while (!stopWriter.get()) {
                // Wait for a notification...
                synchronized (writes) {
                    writes.notifyAll();

                    // If there is not enough to write, wait for a notification...
                    while (writes.isEmpty() && checkpointLatch == null && !stopWriter.get()) {
                        writes.wait(100);
                    }

                    if (writes.isEmpty()) {
                        releaseCheckpointWaiter();
                    }
                }
                writeBatch();
            }
        } catch (Throwable e) {
            LOG.info("An exception was raised while performing poll writes", e);
        } finally {
            releaseCheckpointWaiter();
        }
    }

    private void writeBatch() throws IOException {

        CountDownLatch checkpointLatch;
        ArrayList<PageWrite> batch;
        synchronized (writes) {
            batch = new ArrayList<PageWrite>(writes.size());
            // Build a write batch from the current write cache.
            for (PageWrite write : writes.values()) {
                batch.add(write);
                // Move the current write to the diskBound write; this lets folks update the
                // page again without blocking for this write.
                write.begin();
                if (write.diskBound == null && write.diskBoundLocation == -1) {
                    batch.remove(write);
                }
            }

            // Grab on to the existing checkpoint latch because once we do this write we can
            // release the folks that were waiting for those writes to hit disk.
            checkpointLatch = this.checkpointLatch;
            this.checkpointLatch = null;
        }

        // First land the writes in the recovery file.
        if (enableRecoveryFile) {
            Checksum checksum = new Adler32();

            recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);

            for (PageWrite w : batch) {
                try {
                    checksum.update(w.getDiskBound(), 0, pageSize);
                } catch (Throwable t) {
                    throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
                }
                recoveryFile.writeLong(w.page.getPageId());
                recoveryFile.write(w.getDiskBound(), 0, pageSize);
            }

            // Can we shrink the recovery buffer?
            if (recoveryPageCount > recoveryFileMaxPageCount) {
                int t = Math.max(recoveryFileMinPageCount, batch.size());
                recoveryFile.setLength(recoveryFileSizeForPages(t));
            }

            // Record the page writes in the recovery buffer.
            recoveryFile.seek(0);
            // Store the next tx id...
            recoveryFile.writeLong(nextTxid.get());
            // Store the checksum for the write batch so that on recovery we
            // know if we have a consistent write batch on disk.
            recoveryFile.writeLong(checksum.getValue());
            // Write the # of pages that will follow.
            recoveryFile.writeInt(batch.size());

            if (enableDiskSyncs) {
                recoveryFile.sync();
            }
        }

        try {
            for (PageWrite w : batch) {
                writeFile.seek(toOffset(w.page.getPageId()));
                writeFile.write(w.getDiskBound(), 0, pageSize);
                w.done();
            }

            if (enableDiskSyncs) {
                writeFile.sync();
            }
        } finally {
            synchronized (writes) {
                for (PageWrite w : batch) {
                    // If there are no more pending writes, then remove it from
                    // the write cache.
                    if (w.isDone()) {
                        writes.remove(w.page.getPageId());
                        if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
                            if (!w.tmpFile.delete()) {
                                throw new IOException("Can't delete temporary KahaDB transaction file: " + w.tmpFile);
                            }
                            tmpFilesForRemoval.remove(w.tmpFile);
                        }
                    }
                }
            }

            if (checkpointLatch != null) {
                checkpointLatch.countDown();
            }
        }
    }

    public void removeTmpFile(File file) {
        tmpFilesForRemoval.add(file);
    }

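    // Sizing example: each recovery record is a long page id (8 bytes) plus one page,
    // so with the default 4 KiB pages, 1000 pages need 4096 + 1000 * (4096 + 8) bytes.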
    private long recoveryFileSizeForPages(int pageCount) {
        return RECOVERY_FILE_HEADER_SIZE + ((pageSize + 8) * pageCount);
    }

    private void releaseCheckpointWaiter() {
        if (checkpointLatch != null) {
            checkpointLatch.countDown();
            checkpointLatch = null;
        }
    }

    /**
     * Inspects the recovery buffer and re-applies any
     * partially applied page writes.
     *
     * @return the next transaction id that can be used.
     */
    private long redoRecoveryUpdates() throws IOException {
        if (!enableRecoveryFile) {
            return 0;
        }
        recoveryPageCount = 0;

        // Are we initializing the recovery file?
        if (recoveryFile.length() == 0) {
            // Write an empty header..
            recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
            // Preallocate the minimum size for better performance.
            recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
            return 0;
        }

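        // Recovery buffer layout, as written by writeBatch():
        //   header (padded to RECOVERY_FILE_HEADER_SIZE): [long nextTxId][long checksum][int pageCount]
        //   followed by pageCount records of: [long pageId][pageSize bytes of page data]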
        // How many recovery pages do we have in the recovery buffer?
        recoveryFile.seek(0);
        long nextTxId = recoveryFile.readLong();
        long expectedChecksum = recoveryFile.readLong();
        int pageCounter = recoveryFile.readInt();

        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
        Checksum checksum = new Adler32();
        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
        try {
            for (int i = 0; i < pageCounter; i++) {
                long pageId = recoveryFile.readLong();
                byte[] data = new byte[pageSize];
                if (recoveryFile.read(data, 0, pageSize) != pageSize) {
                    // Invalid recovery record: could not fully read the data. Probably due to a partial write to the recovery buffer.
                    return nextTxId;
                }
                checksum.update(data, 0, pageSize);
                batch.put(pageId, data);
            }
        } catch (Exception e) {
            // If an error occurred it was because the redo buffer was not fully written out correctly, so don't redo it
            // as the pages should still be consistent.
            LOG.debug("Redo buffer was not fully intact: ", e);
            return nextTxId;
        }

        recoveryPageCount = pageCounter;

        // If the checksum is not valid then the recovery buffer was only partially written to disk.
        if (checksum.getValue() != expectedChecksum) {
            return nextTxId;
        }

        // Re-apply all the writes in the recovery buffer.
        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
            writeFile.seek(toOffset(e.getKey()));
            writeFile.write(e.getValue());
        }

        // And sync it to disk.
        writeFile.sync();
        return nextTxId;
    }

    private void startWriter() {
        synchronized (writes) {
            if (enabledWriteThread) {
                stopWriter.set(false);
                writerThread = new Thread("KahaDB Page Writer") {
                    @Override
                    public void run() {
                        pollWrites();
                    }
                };
                writerThread.setPriority(Thread.MAX_PRIORITY);
                writerThread.setDaemon(true);
                writerThread.start();
            }
        }
    }

    private void stopWriter() throws InterruptedException {
        if (enabledWriteThread) {
            stopWriter.set(true);
            writerThread.join();
        }
    }

    public File getFile() {
        return getMainPageFile();
    }

    public File getDirectory() {
        return directory;
    }
}