001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName; 020 021import java.io.File; 022import java.io.IOException; 023import java.util.Set; 024import java.util.concurrent.Callable; 025 026import javax.management.ObjectName; 027 028import org.apache.activemq.broker.BrokerService; 029import org.apache.activemq.broker.ConnectionContext; 030import org.apache.activemq.broker.LockableServiceSupport; 031import org.apache.activemq.broker.Locker; 032import org.apache.activemq.broker.jmx.AnnotatedMBean; 033import org.apache.activemq.broker.jmx.PersistenceAdapterView; 034import org.apache.activemq.broker.scheduler.JobSchedulerStore; 035import org.apache.activemq.command.ActiveMQDestination; 036import org.apache.activemq.command.ActiveMQQueue; 037import org.apache.activemq.command.ActiveMQTopic; 038import org.apache.activemq.command.LocalTransactionId; 039import org.apache.activemq.command.ProducerId; 040import org.apache.activemq.command.TransactionId; 041import org.apache.activemq.command.XATransactionId; 042import org.apache.activemq.protobuf.Buffer; 043import org.apache.activemq.store.JournaledStore; 044import org.apache.activemq.store.MessageStore; 045import org.apache.activemq.store.NoLocalSubscriptionAware; 046import org.apache.activemq.store.PersistenceAdapter; 047import org.apache.activemq.store.SharedFileLocker; 048import org.apache.activemq.store.TopicMessageStore; 049import org.apache.activemq.store.TransactionIdTransformer; 050import org.apache.activemq.store.TransactionIdTransformerAware; 051import org.apache.activemq.store.TransactionStore; 052import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId; 053import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 054import org.apache.activemq.store.kahadb.data.KahaXATransactionId; 055import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; 056import org.apache.activemq.usage.SystemUsage; 057import org.apache.activemq.util.ServiceStopper; 058 059/** 060 * An implementation of {@link PersistenceAdapter} designed for use with 061 * KahaDB - Embedded Lightweight Non-Relational Database 062 * 063 * @org.apache.xbean.XBean element="kahaDB" 064 * 065 */ 066public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, 067 JournaledStore, TransactionIdTransformerAware, NoLocalSubscriptionAware { 068 069 private final KahaDBStore letter = new KahaDBStore(); 070 071 /** 072 * @param context 073 * @throws IOException 074 * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext) 075 */ 076 @Override 077 public void beginTransaction(ConnectionContext context) throws IOException { 078 this.letter.beginTransaction(context); 079 } 080 081 /** 082 * @param sync 083 * @throws IOException 084 * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean) 085 */ 086 @Override 087 public void checkpoint(boolean sync) throws IOException { 088 this.letter.checkpoint(sync); 089 } 090 091 /** 092 * @param context 093 * @throws IOException 094 * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext) 095 */ 096 @Override 097 public void commitTransaction(ConnectionContext context) throws IOException { 098 this.letter.commitTransaction(context); 099 } 100 101 /** 102 * @param destination 103 * @return MessageStore 104 * @throws IOException 105 * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) 106 */ 107 @Override 108 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 109 return this.letter.createQueueMessageStore(destination); 110 } 111 112 /** 113 * @param destination 114 * @return TopicMessageStore 115 * @throws IOException 116 * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic) 117 */ 118 @Override 119 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 120 return this.letter.createTopicMessageStore(destination); 121 } 122 123 /** 124 * @return TransactionStore 125 * @throws IOException 126 * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore() 127 */ 128 @Override 129 public TransactionStore createTransactionStore() throws IOException { 130 return this.letter.createTransactionStore(); 131 } 132 133 /** 134 * @throws IOException 135 * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages() 136 */ 137 @Override 138 public void deleteAllMessages() throws IOException { 139 this.letter.deleteAllMessages(); 140 } 141 142 /** 143 * @return destinations 144 * @see org.apache.activemq.store.PersistenceAdapter#getDestinations() 145 */ 146 @Override 147 public Set<ActiveMQDestination> getDestinations() { 148 return this.letter.getDestinations(); 149 } 150 151 /** 152 * @return lastMessageBrokerSequenceId 153 * @throws IOException 154 * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId() 155 */ 156 @Override 157 public long getLastMessageBrokerSequenceId() throws IOException { 158 return this.letter.getLastMessageBrokerSequenceId(); 159 } 160 161 @Override 162 public long getLastProducerSequenceId(ProducerId id) throws IOException { 163 return this.letter.getLastProducerSequenceId(id); 164 } 165 166 /** 167 * @param destination 168 * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) 169 */ 170 @Override 171 public void removeQueueMessageStore(ActiveMQQueue destination) { 172 this.letter.removeQueueMessageStore(destination); 173 } 174 175 /** 176 * @param destination 177 * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic) 178 */ 179 @Override 180 public void removeTopicMessageStore(ActiveMQTopic destination) { 181 this.letter.removeTopicMessageStore(destination); 182 } 183 184 /** 185 * @param context 186 * @throws IOException 187 * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext) 188 */ 189 @Override 190 public void rollbackTransaction(ConnectionContext context) throws IOException { 191 this.letter.rollbackTransaction(context); 192 } 193 194 /** 195 * @param brokerName 196 * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String) 197 */ 198 @Override 199 public void setBrokerName(String brokerName) { 200 this.letter.setBrokerName(brokerName); 201 } 202 203 /** 204 * @param usageManager 205 * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage) 206 */ 207 @Override 208 public void setUsageManager(SystemUsage usageManager) { 209 this.letter.setUsageManager(usageManager); 210 } 211 212 /** 213 * @return the size of the store 214 * @see org.apache.activemq.store.PersistenceAdapter#size() 215 */ 216 @Override 217 public long size() { 218 return this.letter.size(); 219 } 220 221 /** 222 * @throws Exception 223 * @see org.apache.activemq.Service#start() 224 */ 225 @Override 226 public void doStart() throws Exception { 227 this.letter.start(); 228 229 if (brokerService != null && brokerService.isUseJmx()) { 230 PersistenceAdapterView view = new PersistenceAdapterView(this); 231 view.setInflightTransactionViewCallable(new Callable<String>() { 232 @Override 233 public String call() throws Exception { 234 return letter.getTransactions(); 235 } 236 }); 237 view.setDataViewCallable(new Callable<String>() { 238 @Override 239 public String call() throws Exception { 240 return letter.getJournal().getFileMap().keySet().toString(); 241 } 242 }); 243 AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view, 244 createPersistenceAdapterName(brokerService.getBrokerObjectName().toString(), toString())); 245 } 246 } 247 248 /** 249 * @throws Exception 250 * @see org.apache.activemq.Service#stop() 251 */ 252 @Override 253 public void doStop(ServiceStopper stopper) throws Exception { 254 this.letter.stop(); 255 256 if (brokerService != null && brokerService.isUseJmx()) { 257 ObjectName brokerObjectName = brokerService.getBrokerObjectName(); 258 brokerService.getManagementContext().unregisterMBean(createPersistenceAdapterName(brokerObjectName.toString(), toString())); 259 } 260 } 261 262 /** 263 * Get the journalMaxFileLength 264 * 265 * @return the journalMaxFileLength 266 */ 267 @Override 268 public int getJournalMaxFileLength() { 269 return this.letter.getJournalMaxFileLength(); 270 } 271 272 /** 273 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can 274 * be used 275 * 276 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" 277 */ 278 public void setJournalMaxFileLength(int journalMaxFileLength) { 279 this.letter.setJournalMaxFileLength(journalMaxFileLength); 280 } 281 282 /** 283 * Set the max number of producers (LRU cache) to track for duplicate sends 284 */ 285 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { 286 this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack); 287 } 288 289 public int getMaxFailoverProducersToTrack() { 290 return this.letter.getMaxFailoverProducersToTrack(); 291 } 292 293 /** 294 * set the audit window depth for duplicate suppression (should exceed the max transaction 295 * batch) 296 */ 297 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { 298 this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth); 299 } 300 301 public int getFailoverProducersAuditDepth() { 302 return this.letter.getFailoverProducersAuditDepth(); 303 } 304 305 /** 306 * Get the checkpointInterval 307 * 308 * @return the checkpointInterval 309 */ 310 public long getCheckpointInterval() { 311 return this.letter.getCheckpointInterval(); 312 } 313 314 /** 315 * Set the checkpointInterval 316 * 317 * @param checkpointInterval 318 * the checkpointInterval to set 319 */ 320 public void setCheckpointInterval(long checkpointInterval) { 321 this.letter.setCheckpointInterval(checkpointInterval); 322 } 323 324 /** 325 * Get the cleanupInterval 326 * 327 * @return the cleanupInterval 328 */ 329 public long getCleanupInterval() { 330 return this.letter.getCleanupInterval(); 331 } 332 333 /** 334 * Set the cleanupInterval 335 * 336 * @param cleanupInterval 337 * the cleanupInterval to set 338 */ 339 public void setCleanupInterval(long cleanupInterval) { 340 this.letter.setCleanupInterval(cleanupInterval); 341 } 342 343 /** 344 * Get the indexWriteBatchSize 345 * 346 * @return the indexWriteBatchSize 347 */ 348 public int getIndexWriteBatchSize() { 349 return this.letter.getIndexWriteBatchSize(); 350 } 351 352 /** 353 * Set the indexWriteBatchSize 354 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 355 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 356 * @param indexWriteBatchSize 357 * the indexWriteBatchSize to set 358 */ 359 public void setIndexWriteBatchSize(int indexWriteBatchSize) { 360 this.letter.setIndexWriteBatchSize(indexWriteBatchSize); 361 } 362 363 /** 364 * Get the journalMaxWriteBatchSize 365 * 366 * @return the journalMaxWriteBatchSize 367 */ 368 public int getJournalMaxWriteBatchSize() { 369 return this.letter.getJournalMaxWriteBatchSize(); 370 } 371 372 /** 373 * Set the journalMaxWriteBatchSize 374 * * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 375 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 376 * @param journalMaxWriteBatchSize 377 * the journalMaxWriteBatchSize to set 378 */ 379 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 380 this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize); 381 } 382 383 /** 384 * Get the enableIndexWriteAsync 385 * 386 * @return the enableIndexWriteAsync 387 */ 388 public boolean isEnableIndexWriteAsync() { 389 return this.letter.isEnableIndexWriteAsync(); 390 } 391 392 /** 393 * Set the enableIndexWriteAsync 394 * 395 * @param enableIndexWriteAsync 396 * the enableIndexWriteAsync to set 397 */ 398 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 399 this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync); 400 } 401 402 /** 403 * Get the directory 404 * 405 * @return the directory 406 */ 407 @Override 408 public File getDirectory() { 409 return this.letter.getDirectory(); 410 } 411 412 /** 413 * @param dir 414 * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File) 415 */ 416 @Override 417 public void setDirectory(File dir) { 418 this.letter.setDirectory(dir); 419 } 420 421 /** 422 * @return the currently configured location of the KahaDB index files. 423 */ 424 public File getIndexDirectory() { 425 return this.letter.getIndexDirectory(); 426 } 427 428 /** 429 * Sets the directory where KahaDB index files should be written. 430 * 431 * @param indexDirectory 432 * the directory where the KahaDB store index files should be written. 433 */ 434 public void setIndexDirectory(File indexDirectory) { 435 this.letter.setIndexDirectory(indexDirectory); 436 } 437 438 /** 439 * Get the enableJournalDiskSyncs 440 * @deprecated use {@link #setEnableJournalDiskSyncs} instead 441 * @return the enableJournalDiskSyncs 442 */ 443 public boolean isEnableJournalDiskSyncs() { 444 return this.letter.isEnableJournalDiskSyncs(); 445 } 446 447 /** 448 * Set the enableJournalDiskSyncs 449 * 450 * @deprecated use {@link #setEnableJournalDiskSyncs} instead 451 * @param enableJournalDiskSyncs 452 * the enableJournalDiskSyncs to set 453 */ 454 public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) { 455 this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs); 456 } 457 458 /** 459 * @return 460 */ 461 public String getJournalDiskSyncStrategy() { 462 return letter.getJournalDiskSyncStrategy(); 463 } 464 465 public JournalDiskSyncStrategy getJournalDiskSyncStrategyEnum() { 466 return letter.getJournalDiskSyncStrategyEnum(); 467 } 468 469 /** 470 * @param journalDiskSyncStrategy 471 */ 472 public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) { 473 letter.setJournalDiskSyncStrategy(journalDiskSyncStrategy); 474 } 475 476 /** 477 * @return 478 */ 479 public long getJournalDiskSyncInterval() { 480 return letter.getJournalDiskSyncInterval(); 481 } 482 483 /** 484 * @param journalDiskSyncInterval 485 */ 486 public void setJournalDiskSyncInterval(long journalDiskSyncInterval) { 487 letter.setJournalDiskSyncInterval(journalDiskSyncInterval); 488 } 489 490 /** 491 * Get the indexCacheSize 492 * 493 * @return the indexCacheSize 494 */ 495 public int getIndexCacheSize() { 496 return this.letter.getIndexCacheSize(); 497 } 498 499 /** 500 * Set the indexCacheSize 501 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 502 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 503 * @param indexCacheSize 504 * the indexCacheSize to set 505 */ 506 public void setIndexCacheSize(int indexCacheSize) { 507 this.letter.setIndexCacheSize(indexCacheSize); 508 } 509 510 /** 511 * Get the ignoreMissingJournalfiles 512 * 513 * @return the ignoreMissingJournalfiles 514 */ 515 public boolean isIgnoreMissingJournalfiles() { 516 return this.letter.isIgnoreMissingJournalfiles(); 517 } 518 519 /** 520 * Set the ignoreMissingJournalfiles 521 * 522 * @param ignoreMissingJournalfiles 523 * the ignoreMissingJournalfiles to set 524 */ 525 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { 526 this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles); 527 } 528 529 public boolean isChecksumJournalFiles() { 530 return letter.isChecksumJournalFiles(); 531 } 532 533 public boolean isCheckForCorruptJournalFiles() { 534 return letter.isCheckForCorruptJournalFiles(); 535 } 536 537 public void setChecksumJournalFiles(boolean checksumJournalFiles) { 538 letter.setChecksumJournalFiles(checksumJournalFiles); 539 } 540 541 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { 542 letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles); 543 } 544 545 @Override 546 public void setBrokerService(BrokerService brokerService) { 547 super.setBrokerService(brokerService); 548 letter.setBrokerService(brokerService); 549 } 550 551 public String getPreallocationScope() { 552 return letter.getPreallocationScope(); 553 } 554 555 public void setPreallocationScope(String preallocationScope) { 556 this.letter.setPreallocationScope(preallocationScope); 557 } 558 559 public String getPreallocationStrategy() { 560 return letter.getPreallocationStrategy(); 561 } 562 563 public void setPreallocationStrategy(String preallocationStrategy) { 564 this.letter.setPreallocationStrategy(preallocationStrategy); 565 } 566 567 public boolean isArchiveDataLogs() { 568 return letter.isArchiveDataLogs(); 569 } 570 571 public void setArchiveDataLogs(boolean archiveDataLogs) { 572 letter.setArchiveDataLogs(archiveDataLogs); 573 } 574 575 public File getDirectoryArchive() { 576 return letter.getDirectoryArchive(); 577 } 578 579 public void setDirectoryArchive(File directoryArchive) { 580 letter.setDirectoryArchive(directoryArchive); 581 } 582 583 public boolean isConcurrentStoreAndDispatchQueues() { 584 return letter.isConcurrentStoreAndDispatchQueues(); 585 } 586 587 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { 588 letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch); 589 } 590 591 public boolean isConcurrentStoreAndDispatchTopics() { 592 return letter.isConcurrentStoreAndDispatchTopics(); 593 } 594 595 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { 596 letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch); 597 } 598 599 public int getMaxAsyncJobs() { 600 return letter.getMaxAsyncJobs(); 601 } 602 /** 603 * @param maxAsyncJobs 604 * the maxAsyncJobs to set 605 */ 606 public void setMaxAsyncJobs(int maxAsyncJobs) { 607 letter.setMaxAsyncJobs(maxAsyncJobs); 608 } 609 610 /** 611 * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead 612 * 613 * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set 614 */ 615 @Deprecated 616 public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException { 617 getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay); 618 } 619 620 public boolean getForceRecoverIndex() { 621 return letter.getForceRecoverIndex(); 622 } 623 624 public void setForceRecoverIndex(boolean forceRecoverIndex) { 625 letter.setForceRecoverIndex(forceRecoverIndex); 626 } 627 628 public boolean isArchiveCorruptedIndex() { 629 return letter.isArchiveCorruptedIndex(); 630 } 631 632 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { 633 letter.setArchiveCorruptedIndex(archiveCorruptedIndex); 634 } 635 636 public float getIndexLFUEvictionFactor() { 637 return letter.getIndexLFUEvictionFactor(); 638 } 639 640 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) { 641 letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor); 642 } 643 644 public boolean isUseIndexLFRUEviction() { 645 return letter.isUseIndexLFRUEviction(); 646 } 647 648 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) { 649 letter.setUseIndexLFRUEviction(useIndexLFRUEviction); 650 } 651 652 public void setEnableIndexDiskSyncs(boolean diskSyncs) { 653 letter.setEnableIndexDiskSyncs(diskSyncs); 654 } 655 656 public boolean isEnableIndexDiskSyncs() { 657 return letter.isEnableIndexDiskSyncs(); 658 } 659 660 public void setEnableIndexRecoveryFile(boolean enable) { 661 letter.setEnableIndexRecoveryFile(enable); 662 } 663 664 public boolean isEnableIndexRecoveryFile() { 665 return letter.isEnableIndexRecoveryFile(); 666 } 667 668 public void setEnableIndexPageCaching(boolean enable) { 669 letter.setEnableIndexPageCaching(enable); 670 } 671 672 public boolean isEnableIndexPageCaching() { 673 return letter.isEnableIndexPageCaching(); 674 } 675 676 public int getCompactAcksAfterNoGC() { 677 return letter.getCompactAcksAfterNoGC(); 678 } 679 680 /** 681 * Sets the number of GC cycles where no journal logs were removed before an attempt to 682 * move forward all the acks in the last log that contains them and is otherwise unreferenced. 683 * <p> 684 * A value of -1 will disable this feature. 685 * 686 * @param compactAcksAfterNoGC 687 * Number of empty GC cycles before we rewrite old ACKS. 688 */ 689 public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) { 690 this.letter.setCompactAcksAfterNoGC(compactAcksAfterNoGC); 691 } 692 693 public boolean isCompactAcksIgnoresStoreGrowth() { 694 return this.letter.isCompactAcksIgnoresStoreGrowth(); 695 } 696 697 /** 698 * Configure if Ack compaction will occur regardless of continued growth of the 699 * journal logs meaning that the store has not run out of space yet. Because the 700 * compaction operation can be costly this value is defaulted to off and the Ack 701 * compaction is only done when it seems that the store cannot grow and larger. 702 * 703 * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set 704 */ 705 public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) { 706 this.letter.setCompactAcksIgnoresStoreGrowth(compactAcksIgnoresStoreGrowth); 707 } 708 709 /** 710 * Returns whether Ack compaction is enabled 711 * 712 * @return enableAckCompaction 713 */ 714 public boolean isEnableAckCompaction() { 715 return letter.isEnableAckCompaction(); 716 } 717 718 /** 719 * Configure if the Ack compaction task should be enabled to run 720 * 721 * @param enableAckCompaction 722 */ 723 public void setEnableAckCompaction(boolean enableAckCompaction) { 724 letter.setEnableAckCompaction(enableAckCompaction); 725 } 726 727 /** 728 * Whether non-blocking subscription statistics have been enabled 729 * 730 * @return 731 */ 732 public boolean isEnableSubscriptionStatistics() { 733 return letter.isEnableSubscriptionStatistics(); 734 } 735 736 /** 737 * Enable caching statistics for each subscription to allow non-blocking 738 * retrieval of metrics. This could incur some overhead to compute if there are a lot 739 * of subscriptions. 740 * 741 * @param enableSubscriptionStatistics 742 */ 743 public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics) { 744 letter.setEnableSubscriptionStatistics(enableSubscriptionStatistics); 745 } 746 747 public KahaDBStore getStore() { 748 return letter; 749 } 750 751 public KahaTransactionInfo createTransactionInfo(TransactionId txid) { 752 if (txid == null) { 753 return null; 754 } 755 KahaTransactionInfo rc = new KahaTransactionInfo(); 756 757 if (txid.isLocalTransaction()) { 758 LocalTransactionId t = (LocalTransactionId) txid; 759 KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId(); 760 kahaTxId.setConnectionId(t.getConnectionId().getValue()); 761 kahaTxId.setTransactionId(t.getValue()); 762 rc.setLocalTransactionId(kahaTxId); 763 } else { 764 XATransactionId t = (XATransactionId) txid; 765 KahaXATransactionId kahaTxId = new KahaXATransactionId(); 766 kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier())); 767 kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId())); 768 kahaTxId.setFormatId(t.getFormatId()); 769 rc.setXaTransactionId(kahaTxId); 770 } 771 return rc; 772 } 773 774 @Override 775 public Locker createDefaultLocker() throws IOException { 776 SharedFileLocker locker = new SharedFileLocker(); 777 locker.configure(this); 778 return locker; 779 } 780 781 @Override 782 public void init() throws Exception {} 783 784 @Override 785 public String toString() { 786 String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET"; 787 return "KahaDBPersistenceAdapter[" + path + "]"; 788 } 789 790 @Override 791 public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) { 792 getStore().setTransactionIdTransformer(transactionIdTransformer); 793 } 794 795 @Override 796 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 797 return this.letter.createJobSchedulerStore(); 798 } 799 800 /* (non-Javadoc) 801 * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal() 802 */ 803 @Override 804 public boolean isPersistNoLocal() { 805 return this.letter.isPersistNoLocal(); 806 } 807}