/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.kaha.impl.async;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.util.ByteSequence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An AsyncDataManager that works in read only mode against multiple data directories.
 * Useful for reading back archived data files.
 * <p>
 * On {@link #start()} every configured directory is scanned for files whose
 * name begins with {@code filePrefix}; each match is wrapped in a
 * {@link ReadOnlyDataFile} and the resulting files are linked together in
 * sorted order.
 */
public class ReadOnlyAsyncDataManager extends AsyncDataManager {

    private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyAsyncDataManager.class);

    /** Directories that are scanned for data files when the manager starts. */
    private final ArrayList<File> dirs;

    /**
     * Creates a manager over the given directories. The list is stored as-is
     * (not copied) and is only read when {@link #start()} is called.
     *
     * @param dirs the directories to scan for data files
     */
    public ReadOnlyAsyncDataManager(final ArrayList<File> dirs) {
        this.dirs = dirs;
    }

    /**
     * Scans the configured directories and registers every data file found.
     * Calling this method on an already started manager is a no-op.
     * <p>
     * A directory that cannot be listed (it does not exist, is not a
     * directory, or an I/O error occurs) is logged and skipped rather than
     * failing with a {@link NullPointerException}. Files whose name does not
     * end in an integer after {@code filePrefix} are ignored.
     *
     * @throws IOException declared by the overridden signature
     */
    @SuppressWarnings("unchecked")
    public synchronized void start() throws IOException {
        if (started) {
            return;
        }

        started = true;

        accessorPool = new DataFileAccessorPool(this);

        ArrayList<File> files = new ArrayList<File>();
        for (File directory : dirs) {
            final File d = directory;
            File[] f = d.listFiles(new FilenameFilter() {
                public boolean accept(File dir, String n) {
                    return dir.equals(d) && n.startsWith(filePrefix);
                }
            });
            // File.listFiles returns null (not an empty array) when the path
            // is not a directory or cannot be read.
            if (f == null) {
                LOG.warn("Data directory {} could not be listed; skipping it", d);
                continue;
            }
            Collections.addAll(files, f);
        }

        for (File file : files) {
            try {
                String n = file.getName();
                String numStr = n.substring(filePrefix.length(), n.length());
                int num = Integer.parseInt(numStr);
                DataFile dataFile = new ReadOnlyDataFile(file, num, preferedFileLength);
                // NOTE(review): files from different directories that share a
                // numeric suffix presumably replace each other in fileMap while
                // both lengths are still added to storeSize - confirm intended.
                fileMap.put(dataFile.getDataFileId(), dataFile);
                storeSize.addAndGet(dataFile.getLength());
            } catch (NumberFormatException ignored) {
                // Ignore files that do not match the pattern.
            }
        }

        // Sort the list so that we can link the DataFiles together in the
        // right order.
        List<DataFile> dataFiles = new ArrayList<DataFile>(fileMap.values());
        Collections.sort(dataFiles);
        currentWriteFile = null;
        for (DataFile df : dataFiles) {
            if (currentWriteFile != null) {
                currentWriteFile.linkAfter(df);
            }
            currentWriteFile = df;
            fileByFileMap.put(df.getFile(), df);
        }

        // currentWriteFile now refers to the last file in sorted order (or is
        // null when nothing was found). Nothing further is done with it here:
        // this manager performs no partial-write check on start.
    }

    /**
     * Closes the accessor pool and forgets all registered data files.
     * Calling this method on a manager that is not started is a no-op.
     *
     * @throws IOException declared by the overridden signature
     */
    public synchronized void close() throws IOException {
        if (!started) {
            return;
        }
        accessorPool.close();
        // NOTE(review): storeSize is not reduced here, so a close()/start()
        // cycle adds the file lengths a second time - confirm intended.
        fileMap.clear();
        fileByFileMap.clear();
        started = false;
    }

    /**
     * Returns the first record location in the oldest linked data file.
     *
     * @return {@code null} when no data file has been loaded; otherwise the
     *         result of {@code getNextLocation} for a zero-offset, zero-size
     *         location in the head file of the linked list
     * @throws IllegalStateException declared by the signature
     * @throws IOException declared by the signature
     */
    public Location getFirstLocation() throws IllegalStateException, IOException {
        if (currentWriteFile == null) {
            return null;
        }

        DataFile first = (DataFile) currentWriteFile.getHeadNode();
        Location cur = new Location();
        cur.setDataFileId(first.getDataFileId());
        cur.setOffset(0);
        cur.setSize(0);
        return getNextLocation(cur);
    }

    /**
     * Always fails: a read-only manager never deletes its data.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always (a {@link RuntimeException},
     *         so existing handlers for the previous exception type still match)
     */
    @Override
    public synchronized boolean delete() throws IOException {
        throw new UnsupportedOperationException("Cannot delete a ReadOnlyAsyncDataManager");
    }
}