001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.DataInput; 020import java.io.DataOutput; 021import java.io.File; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025import java.util.ArrayList; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.LinkedHashMap; 030import java.util.TreeMap; 031import java.util.Map.Entry; 032import java.util.concurrent.atomic.AtomicBoolean; 033 034import org.apache.activemq.command.SubscriptionInfo; 035import org.apache.activemq.command.TransactionId; 036import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 037import org.apache.activemq.store.kahadb.data.KahaDestination; 038import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 039import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 040import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 041import org.apache.activemq.util.ByteSequence; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044import org.apache.kahadb.index.BTreeIndex; 045import org.apache.kahadb.page.PageFile; 046import org.apache.kahadb.page.Transaction; 047import org.apache.kahadb.util.LongMarshaller; 048import org.apache.kahadb.util.Marshaller; 049import org.apache.kahadb.util.StringMarshaller; 050import org.apache.kahadb.util.VariableMarshaller; 051 052public class TempMessageDatabase { 053 054 private static final Logger LOG = LoggerFactory.getLogger(TempMessageDatabase.class); 055 056 public static final int CLOSED_STATE = 1; 057 public static final int OPEN_STATE = 2; 058 059 protected BTreeIndex<String, StoredDestination> destinations; 060 protected PageFile pageFile; 061 062 protected File directory; 063 064 boolean enableIndexWriteAsync = true; 065 int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 066 067 protected AtomicBoolean started = new AtomicBoolean(); 068 protected AtomicBoolean opened = new AtomicBoolean(); 069 070 public TempMessageDatabase() { 071 } 072 073 public void start() throws Exception { 074 if (started.compareAndSet(false, true)) { 075 load(); 076 } 077 } 078 079 public void stop() throws Exception { 080 if (started.compareAndSet(true, false)) { 081 unload(); 082 } 083 } 084 085 private void loadPageFile() throws IOException { 086 synchronized (indexMutex) { 087 final PageFile pageFile = getPageFile(); 088 pageFile.load(); 089 pageFile.tx().execute(new Transaction.Closure<IOException>() { 090 public void execute(Transaction tx) throws IOException { 091 destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId()); 092 destinations.setKeyMarshaller(StringMarshaller.INSTANCE); 093 destinations.setValueMarshaller(new StoredDestinationMarshaller()); 094 destinations.load(tx); 095 } 096 }); 097 pageFile.flush(); 098 storedDestinations.clear(); 099 } 100 } 101 102 /** 103 * @throws IOException 104 */ 105 public void open() throws IOException { 106 if( opened.compareAndSet(false, true) ) { 107 loadPageFile(); 108 } 109 } 110 111 public void load() throws IOException { 112 synchronized (indexMutex) { 113 open(); 114 pageFile.unload(); 115 pageFile.delete(); 116 loadPageFile(); 117 } 118 } 119 120 121 public void close() throws IOException, InterruptedException { 122 if( opened.compareAndSet(true, false)) { 123 synchronized (indexMutex) { 124 pageFile.unload(); 125 } 126 } 127 } 128 129 public void unload() throws IOException, InterruptedException { 130 synchronized (indexMutex) { 131 if( pageFile.isLoaded() ) { 132 close(); 133 } 134 } 135 } 136 137 public void processAdd(final KahaAddMessageCommand command, TransactionId txid, final ByteSequence data) throws IOException { 138 if (txid!=null) { 139 synchronized (indexMutex) { 140 ArrayList<Operation> inflightTx = getInflightTx(txid); 141 inflightTx.add(new AddOpperation(command, data)); 142 } 143 } else { 144 synchronized (indexMutex) { 145 pageFile.tx().execute(new Transaction.Closure<IOException>() { 146 public void execute(Transaction tx) throws IOException { 147 upadateIndex(tx, command, data); 148 } 149 }); 150 } 151 } 152 } 153 154 public void processRemove(final KahaRemoveMessageCommand command, TransactionId txid) throws IOException { 155 if (txid!=null) { 156 synchronized (indexMutex) { 157 ArrayList<Operation> inflightTx = getInflightTx(txid); 158 inflightTx.add(new RemoveOpperation(command)); 159 } 160 } else { 161 synchronized (indexMutex) { 162 pageFile.tx().execute(new Transaction.Closure<IOException>() { 163 public void execute(Transaction tx) throws IOException { 164 updateIndex(tx, command); 165 } 166 }); 167 } 168 } 169 170 } 171 172 public void process(final KahaRemoveDestinationCommand command) throws IOException { 173 synchronized (indexMutex) { 174 pageFile.tx().execute(new Transaction.Closure<IOException>() { 175 public void execute(Transaction tx) throws IOException { 176 updateIndex(tx, command); 177 } 178 }); 179 } 180 } 181 182 public void process(final KahaSubscriptionCommand command) throws IOException { 183 synchronized (indexMutex) { 184 pageFile.tx().execute(new Transaction.Closure<IOException>() { 185 public void execute(Transaction tx) throws IOException { 186 updateIndex(tx, command); 187 } 188 }); 189 } 190 } 191 192 public void processCommit(TransactionId key) throws IOException { 193 synchronized (indexMutex) { 194 ArrayList<Operation> inflightTx = inflightTransactions.remove(key); 195 if (inflightTx == null) { 196 inflightTx = preparedTransactions.remove(key); 197 } 198 if (inflightTx == null) { 199 return; 200 } 201 202 final ArrayList<Operation> messagingTx = inflightTx; 203 pageFile.tx().execute(new Transaction.Closure<IOException>() { 204 public void execute(Transaction tx) throws IOException { 205 for (Operation op : messagingTx) { 206 op.execute(tx); 207 } 208 } 209 }); 210 } 211 } 212 213 public void processPrepare(TransactionId key) { 214 synchronized (indexMutex) { 215 ArrayList<Operation> tx = inflightTransactions.remove(key); 216 if (tx != null) { 217 preparedTransactions.put(key, tx); 218 } 219 } 220 } 221 222 public void processRollback(TransactionId key) { 223 synchronized (indexMutex) { 224 ArrayList<Operation> tx = inflightTransactions.remove(key); 225 if (tx == null) { 226 preparedTransactions.remove(key); 227 } 228 } 229 } 230 231 // ///////////////////////////////////////////////////////////////// 232 // These methods do the actual index updates. 233 // ///////////////////////////////////////////////////////////////// 234 235 protected final Object indexMutex = new Object(); 236 private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>(); 237 238 private void upadateIndex(Transaction tx, KahaAddMessageCommand command, ByteSequence data) throws IOException { 239 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 240 241 // Skip adding the message to the index if this is a topic and there are 242 // no subscriptions. 243 if (sd.subscriptions != null && sd.ackPositions.isEmpty()) { 244 return; 245 } 246 247 // Add the message. 248 long id = sd.nextMessageId++; 249 Long previous = sd.messageIdIndex.put(tx, command.getMessageId(), id); 250 if( previous == null ) { 251 sd.orderIndex.put(tx, id, new MessageRecord(command.getMessageId(), data)); 252 } else { 253 // restore the previous value.. Looks like this was a redo of a previously 254 // added message. We don't want to assing it a new id as the other indexes would 255 // be wrong.. 256 sd.messageIdIndex.put(tx, command.getMessageId(), previous); 257 } 258 } 259 260 private void updateIndex(Transaction tx, KahaRemoveMessageCommand command) throws IOException { 261 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 262 if (!command.hasSubscriptionKey()) { 263 264 // In the queue case we just remove the message from the index.. 265 Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId()); 266 if (sequenceId != null) { 267 sd.orderIndex.remove(tx, sequenceId); 268 } 269 } else { 270 // In the topic case we need remove the message once it's been acked 271 // by all the subs 272 Long sequence = sd.messageIdIndex.get(tx, command.getMessageId()); 273 274 // Make sure it's a valid message id... 275 if (sequence != null) { 276 String subscriptionKey = command.getSubscriptionKey(); 277 Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence); 278 279 // The following method handles deleting un-referenced messages. 280 removeAckByteSequence(tx, sd, subscriptionKey, prev); 281 282 // Add it to the new location set. 283 addAckByteSequence(sd, sequence, subscriptionKey); 284 } 285 286 } 287 } 288 289 private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command) throws IOException { 290 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 291 sd.orderIndex.clear(tx); 292 sd.orderIndex.unload(tx); 293 tx.free(sd.orderIndex.getPageId()); 294 295 sd.messageIdIndex.clear(tx); 296 sd.messageIdIndex.unload(tx); 297 tx.free(sd.messageIdIndex.getPageId()); 298 299 if (sd.subscriptions != null) { 300 sd.subscriptions.clear(tx); 301 sd.subscriptions.unload(tx); 302 tx.free(sd.subscriptions.getPageId()); 303 304 sd.subscriptionAcks.clear(tx); 305 sd.subscriptionAcks.unload(tx); 306 tx.free(sd.subscriptionAcks.getPageId()); 307 } 308 309 String key = key(command.getDestination()); 310 storedDestinations.remove(key); 311 destinations.remove(tx, key); 312 } 313 314 private void updateIndex(Transaction tx, KahaSubscriptionCommand command) throws IOException { 315 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 316 317 // If set then we are creating it.. otherwise we are destroying the sub 318 if (command.hasSubscriptionInfo()) { 319 String subscriptionKey = command.getSubscriptionKey(); 320 sd.subscriptions.put(tx, subscriptionKey, command); 321 long ackByteSequence=-1; 322 if (!command.getRetroactive()) { 323 ackByteSequence = sd.nextMessageId-1; 324 } 325 326 sd.subscriptionAcks.put(tx, subscriptionKey, ackByteSequence); 327 addAckByteSequence(sd, ackByteSequence, subscriptionKey); 328 } else { 329 // delete the sub... 330 String subscriptionKey = command.getSubscriptionKey(); 331 sd.subscriptions.remove(tx, subscriptionKey); 332 Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey); 333 if( prev!=null ) { 334 removeAckByteSequence(tx, sd, subscriptionKey, prev); 335 } 336 } 337 338 } 339 340 public HashSet<Integer> getJournalFilesBeingReplicated() { 341 return journalFilesBeingReplicated; 342 } 343 344 // ///////////////////////////////////////////////////////////////// 345 // StoredDestination related implementation methods. 346 // ///////////////////////////////////////////////////////////////// 347 348 349 private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>(); 350 351 class StoredSubscription { 352 SubscriptionInfo subscriptionInfo; 353 String lastAckId; 354 ByteSequence lastAckByteSequence; 355 ByteSequence cursor; 356 } 357 358 static class MessageRecord { 359 final String messageId; 360 final ByteSequence data; 361 362 public MessageRecord(String messageId, ByteSequence location) { 363 this.messageId=messageId; 364 this.data=location; 365 } 366 367 @Override 368 public String toString() { 369 return "["+messageId+","+data+"]"; 370 } 371 } 372 373 static protected class MessageKeysMarshaller extends VariableMarshaller<MessageRecord> { 374 static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller(); 375 376 public MessageRecord readPayload(DataInput dataIn) throws IOException { 377 return new MessageRecord(dataIn.readUTF(), ByteSequenceMarshaller.INSTANCE.readPayload(dataIn)); 378 } 379 380 public void writePayload(MessageRecord object, DataOutput dataOut) throws IOException { 381 dataOut.writeUTF(object.messageId); 382 ByteSequenceMarshaller.INSTANCE.writePayload(object.data, dataOut); 383 } 384 } 385 386 static class StoredDestination { 387 long nextMessageId; 388 BTreeIndex<Long, MessageRecord> orderIndex; 389 BTreeIndex<String, Long> messageIdIndex; 390 391 // These bits are only set for Topics 392 BTreeIndex<String, KahaSubscriptionCommand> subscriptions; 393 BTreeIndex<String, Long> subscriptionAcks; 394 HashMap<String, Long> subscriptionCursors; 395 TreeMap<Long, HashSet<String>> ackPositions; 396 } 397 398 protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> { 399 public Class<StoredDestination> getType() { 400 return StoredDestination.class; 401 } 402 403 public StoredDestination readPayload(DataInput dataIn) throws IOException { 404 StoredDestination value = new StoredDestination(); 405 value.orderIndex = new BTreeIndex<Long, MessageRecord>(pageFile, dataIn.readLong()); 406 value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong()); 407 408 if (dataIn.readBoolean()) { 409 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong()); 410 value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong()); 411 } 412 return value; 413 } 414 415 public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException { 416 dataOut.writeLong(value.orderIndex.getPageId()); 417 dataOut.writeLong(value.messageIdIndex.getPageId()); 418 if (value.subscriptions != null) { 419 dataOut.writeBoolean(true); 420 dataOut.writeLong(value.subscriptions.getPageId()); 421 dataOut.writeLong(value.subscriptionAcks.getPageId()); 422 } else { 423 dataOut.writeBoolean(false); 424 } 425 } 426 } 427 428 static class ByteSequenceMarshaller extends VariableMarshaller<ByteSequence> { 429 final static ByteSequenceMarshaller INSTANCE = new ByteSequenceMarshaller(); 430 431 public ByteSequence readPayload(DataInput dataIn) throws IOException { 432 byte data[] = new byte[dataIn.readInt()]; 433 dataIn.readFully(data); 434 return new ByteSequence(data); 435 } 436 437 public void writePayload(ByteSequence object, DataOutput dataOut) throws IOException { 438 dataOut.writeInt(object.getLength()); 439 dataOut.write(object.getData(), object.getOffset(), object.getLength()); 440 } 441 } 442 443 static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> { 444 final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller(); 445 446 public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException { 447 KahaSubscriptionCommand rc = new KahaSubscriptionCommand(); 448 rc.mergeFramed((InputStream)dataIn); 449 return rc; 450 } 451 452 public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException { 453 object.writeFramed((OutputStream)dataOut); 454 } 455 } 456 457 protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 458 String key = key(destination); 459 StoredDestination rc = storedDestinations.get(key); 460 if (rc == null) { 461 boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC; 462 rc = loadStoredDestination(tx, key, topic); 463 // Cache it. We may want to remove/unload destinations from the 464 // cache that are not used for a while 465 // to reduce memory usage. 466 storedDestinations.put(key, rc); 467 } 468 return rc; 469 } 470 471 /** 472 * @param tx 473 * @param key 474 * @param topic 475 * @return 476 * @throws IOException 477 */ 478 private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException { 479 // Try to load the existing indexes.. 480 StoredDestination rc = destinations.get(tx, key); 481 if (rc == null) { 482 // Brand new destination.. allocate indexes for it. 483 rc = new StoredDestination(); 484 rc.orderIndex = new BTreeIndex<Long, MessageRecord>(pageFile, tx.allocate()); 485 rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate()); 486 487 if (topic) { 488 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate()); 489 rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate()); 490 } 491 destinations.put(tx, key, rc); 492 } 493 494 // Configure the marshalers and load. 495 rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 496 rc.orderIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 497 rc.orderIndex.load(tx); 498 499 // Figure out the next key using the last entry in the destination. 500 Entry<Long, MessageRecord> lastEntry = rc.orderIndex.getLast(tx); 501 if( lastEntry!=null ) { 502 rc.nextMessageId = lastEntry.getKey()+1; 503 } 504 505 rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE); 506 rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE); 507 rc.messageIdIndex.load(tx); 508 509 // If it was a topic... 510 if (topic) { 511 512 rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE); 513 rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE); 514 rc.subscriptions.load(tx); 515 516 rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE); 517 rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE); 518 rc.subscriptionAcks.load(tx); 519 520 rc.ackPositions = new TreeMap<Long, HashSet<String>>(); 521 rc.subscriptionCursors = new HashMap<String, Long>(); 522 523 for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) { 524 Entry<String, Long> entry = iterator.next(); 525 addAckByteSequence(rc, entry.getValue(), entry.getKey()); 526 } 527 528 } 529 return rc; 530 } 531 532 /** 533 * @param sd 534 * @param messageSequence 535 * @param subscriptionKey 536 */ 537 private void addAckByteSequence(StoredDestination sd, Long messageSequence, String subscriptionKey) { 538 HashSet<String> hs = sd.ackPositions.get(messageSequence); 539 if (hs == null) { 540 hs = new HashSet<String>(); 541 sd.ackPositions.put(messageSequence, hs); 542 } 543 hs.add(subscriptionKey); 544 } 545 546 /** 547 * @param tx 548 * @param sd 549 * @param subscriptionKey 550 * @param sequenceId 551 * @throws IOException 552 */ 553 private void removeAckByteSequence(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException { 554 // Remove the sub from the previous location set.. 555 if (sequenceId != null) { 556 HashSet<String> hs = sd.ackPositions.get(sequenceId); 557 if (hs != null) { 558 hs.remove(subscriptionKey); 559 if (hs.isEmpty()) { 560 HashSet<String> firstSet = sd.ackPositions.values().iterator().next(); 561 sd.ackPositions.remove(sequenceId); 562 563 // Did we just empty out the first set in the 564 // ordered list of ack locations? Then it's time to 565 // delete some messages. 566 if (hs == firstSet) { 567 568 // Find all the entries that need to get deleted. 569 ArrayList<Entry<Long, MessageRecord>> deletes = new ArrayList<Entry<Long, MessageRecord>>(); 570 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { 571 Entry<Long, MessageRecord> entry = iterator.next(); 572 if (entry.getKey().compareTo(sequenceId) <= 0) { 573 // We don't do the actually delete while we are 574 // iterating the BTree since 575 // iterating would fail. 576 deletes.add(entry); 577 } 578 } 579 580 // Do the actual deletes. 581 for (Entry<Long, MessageRecord> entry : deletes) { 582 sd.messageIdIndex.remove(tx,entry.getValue().messageId); 583 sd.orderIndex.remove(tx,entry.getKey()); 584 } 585 } 586 } 587 } 588 } 589 } 590 591 private String key(KahaDestination destination) { 592 return destination.getType().getNumber() + ":" + destination.getName(); 593 } 594 595 // ///////////////////////////////////////////////////////////////// 596 // Transaction related implementation methods. 597 // ///////////////////////////////////////////////////////////////// 598 protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>(); 599 protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>(); 600 601 private ArrayList<Operation> getInflightTx(TransactionId key) { 602 ArrayList<Operation> tx = inflightTransactions.get(key); 603 if (tx == null) { 604 tx = new ArrayList<Operation>(); 605 inflightTransactions.put(key, tx); 606 } 607 return tx; 608 } 609 610 abstract class Operation { 611 abstract public void execute(Transaction tx) throws IOException; 612 } 613 614 class AddOpperation extends Operation { 615 final KahaAddMessageCommand command; 616 private final ByteSequence data; 617 618 public AddOpperation(KahaAddMessageCommand command, ByteSequence location) { 619 this.command = command; 620 this.data = location; 621 } 622 623 public void execute(Transaction tx) throws IOException { 624 upadateIndex(tx, command, data); 625 } 626 627 public KahaAddMessageCommand getCommand() { 628 return command; 629 } 630 } 631 632 class RemoveOpperation extends Operation { 633 final KahaRemoveMessageCommand command; 634 635 public RemoveOpperation(KahaRemoveMessageCommand command) { 636 this.command = command; 637 } 638 639 public void execute(Transaction tx) throws IOException { 640 updateIndex(tx, command); 641 } 642 643 public KahaRemoveMessageCommand getCommand() { 644 return command; 645 } 646 } 647 648 // ///////////////////////////////////////////////////////////////// 649 // Initialization related implementation methods. 650 // ///////////////////////////////////////////////////////////////// 651 652 private PageFile createPageFile() { 653 PageFile index = new PageFile(directory, "temp-db"); 654 index.setEnableWriteThread(isEnableIndexWriteAsync()); 655 index.setWriteBatchSize(getIndexWriteBatchSize()); 656 index.setEnableDiskSyncs(false); 657 index.setEnableRecoveryFile(false); 658 return index; 659 } 660 661 public File getDirectory() { 662 return directory; 663 } 664 665 public void setDirectory(File directory) { 666 this.directory = directory; 667 } 668 669 public void setIndexWriteBatchSize(int setIndexWriteBatchSize) { 670 this.setIndexWriteBatchSize = setIndexWriteBatchSize; 671 } 672 673 public int getIndexWriteBatchSize() { 674 return setIndexWriteBatchSize; 675 } 676 677 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 678 this.enableIndexWriteAsync = enableIndexWriteAsync; 679 } 680 681 boolean isEnableIndexWriteAsync() { 682 return enableIndexWriteAsync; 683 } 684 685 public PageFile getPageFile() { 686 if (pageFile == null) { 687 pageFile = createPageFile(); 688 } 689 return pageFile; 690 } 691 692}