/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.plist;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.JournaledStore;
import org.apache.activemq.store.PList;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Page;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.util.*;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;

/**
 * A {@link PListStore} backed by a {@link Journal} for payloads and a {@link PageFile} holding a
 * B-tree index of named lists.
 * <p>
 * The store directory (and the index directory, when configured) is emptied when the store is
 * initialized, so previously stored lists do not survive a restart. The store also implements
 * {@link Runnable}: {@link #run()} is the periodic journal clean-up task that is registered with
 * the {@link Scheduler} during initialization.
 *
 * @org.apache.xbean.XBean
 */
public class PListStoreImpl extends ServiceSupport implements BrokerServiceAware, Runnable, PListStore, JournaledStore {
    static final Logger LOG = LoggerFactory.getLogger(PListStoreImpl.class);
    /** Pause, in milliseconds, between two attempts to acquire the store lock file. */
    private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;

    static final int CLOSED_STATE = 1;
    static final int OPEN_STATE = 2;

    private File directory;
    private File indexDirectory;
    PageFile pageFile;
    private Journal journal;
    private LockFile lockFile;
    private boolean failIfDatabaseIsLocked;
    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
    private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
    private boolean enableIndexWriteAsync = false;
    private boolean initialized = false;
    private boolean lazyInit = true;
    MetaData metaData = new MetaData(this);
    final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
    Map<String, PListImpl> persistentLists = new HashMap<String, PListImpl>();
    final Object indexLock = new Object();
    private Scheduler scheduler;
    // Passed as-is to Scheduler.executePeriodically; presumably milliseconds - TODO confirm.
    private long cleanupInterval = 30000;

    private int indexPageSize = PageFile.DEFAULT_PAGE_SIZE;
    private int indexCacheSize = PageFile.DEFAULT_PAGE_CACHE_SIZE;
    private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
    private boolean indexEnablePageCaching = true;

    /**
     * @return the monitor that guards index access; it is always acquired before the monitor of
     *         this store wherever both are taken in this class
     */
    public Object getIndexLock() {
        return indexLock;
    }

    /**
     * Adopts the scheduler of the given broker for the periodic clean-up task.
     *
     * @param brokerService the owning broker
     */
    @Override
    public void setBrokerService(BrokerService brokerService) {
        this.scheduler = brokerService.getScheduler();
    }

    /** @return the page size applied to the index page file */
    public int getIndexPageSize() {
        return indexPageSize;
    }

    /** @return the page cache size applied to the index page file */
    public int getIndexCacheSize() {
        return indexCacheSize;
    }

    /** @return the write batch size applied to the index page file */
    public int getIndexWriteBatchSize() {
        return indexWriteBatchSize;
    }

    /** @param indexPageSize the page size to apply to the index page file when it is created */
    public void setIndexPageSize(int indexPageSize) {
        this.indexPageSize = indexPageSize;
    }

    /** @param indexCacheSize the page cache size to apply to the index page file when it is created */
    public void setIndexCacheSize(int indexCacheSize) {
        this.indexCacheSize = indexCacheSize;
    }

    /** @param indexWriteBatchSize the write batch size to apply to the index page file when it is created */
    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
        this.indexWriteBatchSize = indexWriteBatchSize;
    }

    /** @return whether page caching is enabled on the index page file */
    public boolean getIndexEnablePageCaching() {
        return indexEnablePageCaching;
    }

    /** @param indexEnablePageCaching whether to enable page caching on the index page file */
    public void setIndexEnablePageCaching(boolean indexEnablePageCaching) {
        this.indexEnablePageCaching = indexEnablePageCaching;
    }

    /**
     * Root record of the index: it is stored on page 0 of the page file and holds the B-tree
     * that maps list names to their {@link PListImpl}.
     */
    protected class MetaData {
        private final PListStoreImpl store;
        Page<MetaData> page;
        BTreeIndex<String, PListImpl> lists;

        protected MetaData(PListStoreImpl store) {
            this.store = store;
        }

        /** Allocates a fresh page and creates the name-to-list B-tree rooted on it. */
        void createIndexes(Transaction tx) throws IOException {
            this.lists = new BTreeIndex<String, PListImpl>(pageFile, tx.allocate().getPageId());
        }

        /** Installs the key/value marshallers on the B-tree and loads it. */
        void load(Transaction tx) throws IOException {
            this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
            this.lists.setValueMarshaller(new PListMarshaller(this.store));
            this.lists.load(tx);
        }

        /** Loads every list found in the B-tree and copies it into {@code lists}, keyed by name. */
        void loadLists(Transaction tx, Map<String, PListImpl> lists) throws IOException {
            Iterator<Entry<String, PListImpl>> entries = this.lists.iterator(tx);
            while (entries.hasNext()) {
                Entry<String, PListImpl> entry = entries.next();
                entry.getValue().load(tx);
                lists.put(entry.getKey(), entry.getValue());
            }
        }

        /** Reads the B-tree root page id and re-creates the (not yet loaded) index on it. */
        public void read(DataInput is) throws IOException {
            this.lists = new BTreeIndex<String, PListImpl>(pageFile, is.readLong());
            this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
            this.lists.setValueMarshaller(new PListMarshaller(this.store));
        }

        /** Writes the B-tree root page id. */
        public void write(DataOutput os) throws IOException {
            os.writeLong(this.lists.getPageId());
        }
    }

    /** Marshals {@link MetaData} by delegating to its own read/write methods. */
    class MetaDataMarshaller extends VariableMarshaller<MetaData> {
        private final PListStoreImpl store;

        MetaDataMarshaller(PListStoreImpl store) {
            this.store = store;
        }

        @Override
        public MetaData readPayload(DataInput dataIn) throws IOException {
            MetaData rc = new MetaData(this.store);
            rc.read(dataIn);
            return rc;
        }

        @Override
        public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
            object.write(dataOut);
        }
    }

    /** Marshals {@link PListImpl} values of the index by delegating to their own read/write methods. */
    class PListMarshaller extends VariableMarshaller<PListImpl> {
        private final PListStoreImpl store;

        PListMarshaller(PListStoreImpl store) {
            this.store = store;
        }

        @Override
        public PListImpl readPayload(DataInput dataIn) throws IOException {
            PListImpl result = new PListImpl(this.store);
            result.read(dataIn);
            return result;
        }

        @Override
        public void writePayload(PListImpl list, DataOutput dataOut) throws IOException {
            list.write(dataOut);
        }
    }

    /** @return the journal, or {@code null} if the store has never been initialized */
    public Journal getJournal() {
        return this.journal;
    }

    @Override
    public File getDirectory() {
        return directory;
    }

    @Override
    public void setDirectory(File directory) {
        this.directory = directory;
    }

    /** @return the explicitly configured index directory, falling back to the store directory */
    public File getIndexDirectory() {
        return indexDirectory != null ? indexDirectory : directory;
    }

    /** @param indexDirectory directory for the index page file; {@code null} means "use the store directory" */
    public void setIndexDirectory(File indexDirectory) {
        this.indexDirectory = indexDirectory;
    }

    /**
     * @return the combined disk size reported by the journal and the page file, or {@code 0}
     *         while the store is not initialized
     * @throws RuntimeException wrapping the {@link IOException} raised while querying the sizes
     */
    @Override
    public long size() {
        synchronized (this) {
            if (!initialized) {
                return 0;
            }
        }
        try {
            return journal.getDiskSize() + pageFile.getDiskSize();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the list registered under {@code name}, creating and indexing it first if needed.
     * Triggers initialization of the store when it has not happened yet.
     *
     * @param name the list name
     * @return the loaded list, never {@code null}
     * @throws IllegalStateException if the store has not been started
     * @throws Exception if initialization or an index transaction fails
     */
    @Override
    public PListImpl getPList(final String name) throws Exception {
        if (!isStarted()) {
            throw new IllegalStateException("Not started");
        }
        intialize();
        synchronized (indexLock) {
            synchronized (this) {
                PListImpl result = this.persistentLists.get(name);
                if (result == null) {
                    final PListImpl pl = new PListImpl(this);
                    pl.setName(name);
                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            pl.setHeadPageId(tx.allocate().getPageId());
                            pl.load(tx);
                            metaData.lists.put(tx, name, pl);
                        }
                    });
                    result = pl;
                    this.persistentLists.put(name, pl);
                }
                // Runs for both existing and freshly created lists; a new list is therefore
                // loaded twice. NOTE(review): presumably PListImpl.load is idempotent - confirm.
                final PListImpl toLoad = result;
                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        toLoad.load(tx);
                    }
                });

                return result;
            }
        }
    }

    /**
     * Unregisters the list called {@code name}, removes it from the index and destroys it.
     *
     * @param name the list name
     * @return {@code true} if a list was registered under that name, {@code false} otherwise
     * @throws Exception if the index transaction fails
     */
    @Override
    public boolean removePList(final String name) throws Exception {
        boolean result = false;
        synchronized (indexLock) {
            synchronized (this) {
                final PList pl = this.persistentLists.remove(name);
                result = pl != null;
                if (result) {
                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            metaData.lists.remove(tx, name);
                            pl.destroy();
                        }
                    });
                }
            }
        }
        return result;
    }

    /**
     * Performs the one-time set-up of the store: empties the directories, takes the lock file,
     * starts the journal, loads the index and schedules the periodic clean-up.
     * Does nothing unless the service is started, and nothing once initialization succeeded.
     *
     * @throws Exception if any step of the set-up fails
     */
    protected synchronized void intialize() throws Exception {
        if (isStarted()) {
            if (!this.initialized) {
                if (this.directory == null) {
                    this.directory = getDefaultDirectory();
                }
                IOHelper.mkdirs(this.directory);
                IOHelper.deleteChildren(this.directory);
                if (this.indexDirectory != null) {
                    IOHelper.mkdirs(this.indexDirectory);
                    IOHelper.deleteChildren(this.indexDirectory);
                }
                lock();
                this.journal = new Journal();
                this.journal.setDirectory(directory);
                this.journal.setMaxFileLength(getJournalMaxFileLength());
                this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
                this.journal.start();
                this.pageFile = new PageFile(getIndexDirectory(), "tmpDB");
                this.pageFile.setEnablePageCaching(getIndexEnablePageCaching());
                this.pageFile.setPageSize(getIndexPageSize());
                this.pageFile.setWriteBatchSize(getIndexWriteBatchSize());
                this.pageFile.setPageCacheSize(getIndexCacheSize());
                this.pageFile.load();

                this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        if (pageFile.getPageCount() == 0) {
                            // Empty page file: the metadata record must land on page 0.
                            Page<MetaData> page = tx.allocate();
                            assert page.getPageId() == 0;
                            page.set(metaData);
                            metaData.page = page;
                            metaData.createIndexes(tx);
                            tx.store(metaData.page, metaDataMarshaller, true);

                        } else {
                            Page<MetaData> page = tx.load(0, metaDataMarshaller);
                            metaData = page.get();
                            metaData.page = page;
                        }
                        metaData.load(tx);
                        metaData.loadLists(tx, persistentLists);
                    }
                });
                this.pageFile.flush();

                if (cleanupInterval > 0) {
                    if (scheduler == null) {
                        // No broker scheduler was injected: run a private one, named after this
                        // class so that doStop can recognise it.
                        scheduler = new Scheduler(PListStoreImpl.class.getSimpleName());
                        scheduler.start();
                    }
                    scheduler.executePeriodically(this, cleanupInterval);
                }
                this.initialized = true;
                LOG.info("{} initialized", this);
            }
        }
    }

    /**
     * @return the {@code delayedDB} directory below the default data directory
     */
    protected File getDefaultDirectory() {
        // File.separator joins path components; File.pathSeparator (':' or ';') separates
        // entries of a path *list* and would yield a sibling named "<data>:delayedDB".
        return new File(IOHelper.getDefaultDataDirectory() + File.separator + "delayedDB");
    }

    /**
     * Deletes {@code dir} if it exists.
     *
     * @param dir the directory to delete; may be {@code null}
     */
    protected void cleanupDirectory(final File dir) {
        if (dir != null && dir.exists()) {
            IOHelper.delete(dir);
        }
    }

    /**
     * Initializes the store right away unless lazy initialization is enabled; in lazy mode only
     * the directories left over from a previous run are deleted.
     */
    @Override
    protected synchronized void doStart() throws Exception {
        if (!lazyInit) {
            intialize();
        } else {
            if (this.directory == null) {
                this.directory = getDefaultDirectory();
            }
            //Go ahead and clean up previous data on start up
            cleanupDirectory(this.directory);
            cleanupDirectory(this.indexDirectory);
        }
        LOG.info("{} started", this);
    }

    /**
     * Stops the scheduler when it carries the name this class gives to its private scheduler,
     * unloads all lists and the page file, closes the journal and releases the lock file.
     */
    @Override
    protected synchronized void doStop(ServiceStopper stopper) throws Exception {
        if (scheduler != null) {
            if (PListStoreImpl.class.getSimpleName().equals(scheduler.getName())) {
                scheduler.stop();
                scheduler = null;
            }
        }
        for (PListImpl pl : this.persistentLists.values()) {
            pl.unload(null);
        }
        if (this.pageFile != null) {
            this.pageFile.unload();
        }
        if (this.journal != null) {
            journal.close();
        }
        if (this.lockFile != null) {
            this.lockFile.unlock();
        }
        this.lockFile = null;
        this.initialized = false;
        LOG.info("{} stopped", this);

    }

    /**
     * Periodic clean-up: collects the journal data files older than the one currently appended
     * to, lets every list claim the files it still needs, and asks the journal to remove the
     * remainder. Returns early when the service is stopping.
     */
    @Override
    public void run() {
        try {
            if (isStopping()) {
                return;
            }
            final int lastJournalFileId = journal.getLastAppendLocation().getDataFileId();
            final Set<Integer> candidates = journal.getFileMap().keySet();
            LOG.trace("Full gc candidate set:{}", candidates);
            if (candidates.size() > 1) {
                // prune current write
                for (Iterator<Integer> iterator = candidates.iterator(); iterator.hasNext();) {
                    if (iterator.next() >= lastJournalFileId) {
                        iterator.remove();
                    }
                }
                // Snapshot the lists under the locks, then work on the copy without holding them.
                List<PListImpl> plists = null;
                synchronized (indexLock) {
                    synchronized (this) {
                        plists = new ArrayList<PListImpl>(persistentLists.values());
                    }
                }
                for (PListImpl list : plists) {
                    // Presumably drops from 'candidates' the files this list still references.
                    list.claimFileLocations(candidates);
                    if (isStopping()) {
                        return;
                    }
                    LOG.trace("Remaining gc candidate set after refs from: {}:{}", list.getName(), candidates);
                }
                LOG.trace("GC Candidate set:{}", candidates);
                this.journal.removeDataFiles(candidates);
            }
        } catch (IOException e) {
            LOG.error("Exception on periodic cleanup: " + e, e);
        }
    }

    /** Reads the payload stored at {@code location} from the journal. */
    ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
        return this.journal.read(location);
    }

    /** Appends {@code payload} to the journal and returns where it was written. */
    Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
        return this.journal.write(payload, sync);
    }

    /**
     * Acquires the {@code lock} file in the store directory unless it is already held.
     * With {@code failIfDatabaseIsLocked} a single attempt is made; otherwise the attempt is
     * repeated every {@link #DATABASE_LOCKED_WAIT_DELAY} milliseconds until it succeeds.
     *
     * @throws IOException if the single attempt fails, or if the thread is interrupted while
     *         waiting for the next attempt (the interrupt flag is restored)
     */
    private void lock() throws IOException {
        if (lockFile != null) {
            return;
        }
        File lockFileName = new File(directory, "lock");
        // Publish the LockFile in the field only once it is really held: a failed attempt must
        // not make the next call believe the lock was already acquired.
        LockFile candidate = new LockFile(lockFileName, true);
        if (failIfDatabaseIsLocked) {
            candidate.lock();
        } else {
            while (true) {
                try {
                    candidate.lock();
                    break;
                } catch (IOException e) {
                    LOG.info("Database " + lockFileName + " is locked... waiting "
                            + (DATABASE_LOCKED_WAIT_DELAY / 1000)
                            + " seconds for the database to be unlocked. Reason: " + e);
                    try {
                        Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
                    } catch (InterruptedException e1) {
                        // Honour the interrupt instead of retrying forever.
                        Thread.currentThread().interrupt();
                        throw new IOException("Interrupted while waiting for lock on " + lockFileName, e1);
                    }
                }
            }
        }
        lockFile = candidate;
    }

    /** @return the index page file; fails with a {@link NullPointerException} before initialization */
    PageFile getPageFile() {
        // NOTE(review): the result is ignored; the call only fails fast when pageFile is null.
        this.pageFile.isLoaded();
        return this.pageFile;
    }

    /** @return whether a held lock file makes initialization fail instead of wait */
    public boolean isFailIfDatabaseIsLocked() {
        return failIfDatabaseIsLocked;
    }

    /** @param failIfDatabaseIsLocked {@code true} to fail instead of waiting when the lock file is held */
    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
    }

    @Override
    public int getJournalMaxFileLength() {
        return journalMaxFileLength;
    }

    /** @param journalMaxFileLength the maximum file length handed to the journal when it is created */
    public void setJournalMaxFileLength(int journalMaxFileLength) {
        this.journalMaxFileLength = journalMaxFileLength;
    }

    /** @return the write batch size handed to the journal when it is created */
    public int getJournalMaxWriteBatchSize() {
        return journalMaxWriteBatchSize;
    }

    /** @param journalMaxWriteBatchSize the write batch size handed to the journal when it is created */
    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
    }

    /** @return the configured flag; it is not read anywhere else in this class */
    public boolean isEnableIndexWriteAsync() {
        return enableIndexWriteAsync;
    }

    /** @param enableIndexWriteAsync the flag value to store */
    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
        this.enableIndexWriteAsync = enableIndexWriteAsync;
    }

    /** @return the period of the clean-up task; a value of {@code 0} or less disables it */
    public long getCleanupInterval() {
        return cleanupInterval;
    }

    /** @param cleanupInterval the period of the clean-up task; {@code 0} or less disables it */
    public void setCleanupInterval(long cleanupInterval) {
        this.cleanupInterval = cleanupInterval;
    }

    /** @return whether initialization is deferred from start-up to the first {@link #getPList} call */
    public boolean isLazyInit() {
        return lazyInit;
    }

    /** @param lazyInit {@code true} to defer initialization until the first list is requested */
    public void setLazyInit(boolean lazyInit) {
        this.lazyInit = lazyInit;
    }

    @Override
    public String toString() {
        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
        if (indexDirectory != null) {
            path += "|" + indexDirectory.getAbsolutePath();
        }
        return "PListStore:[" + path + "]";
    }
}