001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import java.util.Iterator; 020import java.util.LinkedList; 021import java.util.ListIterator; 022import java.util.concurrent.CancellationException; 023import java.util.concurrent.Future; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.TimeoutException; 026 027import org.apache.activemq.broker.region.Destination; 028import org.apache.activemq.broker.region.MessageReference; 029import org.apache.activemq.broker.region.Subscription; 030import org.apache.activemq.command.Message; 031import org.apache.activemq.command.MessageId; 032import org.apache.activemq.store.MessageRecoveryListener; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * Store based cursor 038 * 039 */ 040public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener { 041 private static final Logger LOG = LoggerFactory.getLogger(AbstractStoreCursor.class); 042 protected final Destination regionDestination; 043 protected final PendingList batchList; 044 private Iterator<MessageReference> iterator = null; 045 protected boolean batchResetNeeded = false; 046 protected int size; 047 private final LinkedList<MessageId> pendingCachedIds = new LinkedList<>(); 048 private static int SYNC_ADD = 0; 049 private static int ASYNC_ADD = 1; 050 final MessageId[] lastCachedIds = new MessageId[2]; 051 protected boolean hadSpace = false; 052 053 054 055 protected AbstractStoreCursor(Destination destination) { 056 super((destination != null ? destination.isPrioritizedMessages():false)); 057 this.regionDestination=destination; 058 if (this.prioritizedMessages) { 059 this.batchList= new PrioritizedPendingList(); 060 } else { 061 this.batchList = new OrderedPendingList(); 062 } 063 } 064 065 066 @Override 067 public final synchronized void start() throws Exception{ 068 if (!isStarted()) { 069 super.start(); 070 resetBatch(); 071 resetSize(); 072 setCacheEnabled(size==0&&useCache); 073 } 074 } 075 076 protected void resetSize() { 077 this.size = getStoreSize(); 078 } 079 080 @Override 081 public void rebase() { 082 resetSize(); 083 } 084 085 @Override 086 public final synchronized void stop() throws Exception { 087 resetBatch(); 088 super.stop(); 089 gc(); 090 } 091 092 093 @Override 094 public final boolean recoverMessage(Message message) throws Exception { 095 return recoverMessage(message,false); 096 } 097 098 public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception { 099 boolean recovered = false; 100 message.setRegionDestination(regionDestination); 101 if (recordUniqueId(message.getMessageId())) { 102 if (!cached) { 103 if( message.getMemoryUsage()==null ) { 104 message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); 105 } 106 } 107 message.incrementReferenceCount(); 108 batchList.addMessageLast(message); 109 clearIterator(true); 110 recovered = true; 111 } else if (!cached) { 112 // a duplicate from the store (!cached) - needs to be removed/acked - otherwise it will get re dispatched on restart 113 if (message.isRecievedByDFBridge()) { 114 // expected for messages pending acks with kahadb.concurrentStoreAndDispatchQueues=true 115 if (LOG.isTraceEnabled()) { 116 LOG.trace("{} store replayed pending message due to concurrentStoreAndDispatchQueues {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong()); 117 } 118 } else { 119 LOG.warn("{} - cursor got duplicate from store {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong()); 120 duplicate(message); 121 } 122 } else { 123 LOG.warn("{} - cursor got duplicate send {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong()); 124 if (gotToTheStore(message)) { 125 duplicate(message); 126 } 127 } 128 return recovered; 129 } 130 131 public static boolean gotToTheStore(Message message) throws Exception { 132 if (message.isRecievedByDFBridge()) { 133 // concurrent store and dispatch - wait to see if the message gets to the store to see 134 // if the index suppressed it (original still present), or whether it was stored and needs to be removed 135 Object possibleFuture = message.getMessageId().getFutureOrSequenceLong(); 136 if (possibleFuture instanceof Future) { 137 ((Future) possibleFuture).get(); 138 } 139 // need to access again after wait on future 140 Object sequence = message.getMessageId().getFutureOrSequenceLong(); 141 return (sequence != null && sequence instanceof Long && Long.compare((Long) sequence, -1l) != 0); 142 } 143 return true; 144 } 145 146 // track for processing outside of store index lock so we can dlq 147 final LinkedList<Message> duplicatesFromStore = new LinkedList<Message>(); 148 private void duplicate(Message message) { 149 duplicatesFromStore.add(message); 150 } 151 152 void dealWithDuplicates() { 153 for (Message message : duplicatesFromStore) { 154 regionDestination.duplicateFromStore(message, getSubscription()); 155 } 156 duplicatesFromStore.clear(); 157 } 158 159 @Override 160 public final synchronized void reset() { 161 if (batchList.isEmpty()) { 162 try { 163 fillBatch(); 164 } catch (Exception e) { 165 LOG.error("{} - Failed to fill batch", this, e); 166 throw new RuntimeException(e); 167 } 168 } 169 clearIterator(true); 170 size(); 171 } 172 173 174 @Override 175 public synchronized void release() { 176 clearIterator(false); 177 } 178 179 private synchronized void clearIterator(boolean ensureIterator) { 180 boolean haveIterator = this.iterator != null; 181 this.iterator=null; 182 if(haveIterator&&ensureIterator) { 183 ensureIterator(); 184 } 185 } 186 187 private synchronized void ensureIterator() { 188 if(this.iterator==null) { 189 this.iterator=this.batchList.iterator(); 190 } 191 } 192 193 194 public final void finished() { 195 } 196 197 198 @Override 199 public final synchronized boolean hasNext() { 200 if (batchList.isEmpty()) { 201 try { 202 fillBatch(); 203 } catch (Exception e) { 204 LOG.error("{} - Failed to fill batch", this, e); 205 throw new RuntimeException(e); 206 } 207 } 208 ensureIterator(); 209 return this.iterator.hasNext(); 210 } 211 212 213 @Override 214 public final synchronized MessageReference next() { 215 MessageReference result = null; 216 if (!this.batchList.isEmpty()&&this.iterator.hasNext()) { 217 result = this.iterator.next(); 218 } 219 last = result; 220 if (result != null) { 221 result.incrementReferenceCount(); 222 } 223 return result; 224 } 225 226 @Override 227 public synchronized boolean tryAddMessageLast(MessageReference node, long wait) throws Exception { 228 boolean disableCache = false; 229 if (hasSpace()) { 230 if (!isCacheEnabled() && size==0 && isStarted() && useCache) { 231 if (LOG.isTraceEnabled()) { 232 LOG.trace("{} - enabling cache for empty store {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong()); 233 } 234 setCacheEnabled(true); 235 } 236 if (isCacheEnabled()) { 237 if (recoverMessage(node.getMessage(),true)) { 238 trackLastCached(node); 239 } else { 240 dealWithDuplicates(); 241 return false; 242 } 243 } 244 } else { 245 disableCache = true; 246 } 247 248 if (disableCache && isCacheEnabled()) { 249 if (LOG.isTraceEnabled()) { 250 LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong()); 251 } 252 syncWithStore(node.getMessage()); 253 setCacheEnabled(false); 254 } 255 size++; 256 return true; 257 } 258 259 private void syncWithStore(Message currentAdd) throws Exception { 260 pruneLastCached(); 261 if (lastCachedIds[SYNC_ADD] == null) { 262 // possibly only async adds, lets wait on the potential last add and reset from there 263 for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) { 264 MessageId lastPending = it.previous(); 265 Object futureOrLong = lastPending.getFutureOrSequenceLong(); 266 if (futureOrLong instanceof Future) { 267 Future future = (Future) futureOrLong; 268 if (future.isCancelled()) { 269 continue; 270 } 271 try { 272 future.get(5, TimeUnit.SECONDS); 273 setLastCachedId(ASYNC_ADD, lastPending); 274 } catch (CancellationException ok) { 275 continue; 276 } catch (TimeoutException potentialDeadlock) { 277 LOG.debug("{} timed out waiting for async add", this, potentialDeadlock); 278 } catch (Exception worstCaseWeReplay) { 279 LOG.debug("{} exception waiting for async add", this, worstCaseWeReplay); 280 } 281 } else { 282 setLastCachedId(ASYNC_ADD, lastPending); 283 } 284 break; 285 } 286 if (lastCachedIds[ASYNC_ADD] != null) { 287 // ensure we don't skip current possibly sync add b/c we waited on the future 288 if (isAsync(currentAdd) || Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) > 0) { 289 setBatch(lastCachedIds[ASYNC_ADD]); 290 } 291 } 292 } else { 293 setBatch(lastCachedIds[SYNC_ADD]); 294 } 295 // cleanup 296 lastCachedIds[SYNC_ADD] = lastCachedIds[ASYNC_ADD] = null; 297 pendingCachedIds.clear(); 298 } 299 300 private void trackLastCached(MessageReference node) { 301 if (isAsync(node.getMessage())) { 302 pruneLastCached(); 303 pendingCachedIds.add(node.getMessageId()); 304 } else { 305 setLastCachedId(SYNC_ADD, node.getMessageId()); 306 } 307 } 308 309 private static final boolean isAsync(Message message) { 310 return message.isRecievedByDFBridge() || message.getMessageId().getFutureOrSequenceLong() instanceof Future; 311 } 312 313 private void pruneLastCached() { 314 for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) { 315 MessageId candidate = it.next(); 316 final Object futureOrLong = candidate.getFutureOrSequenceLong(); 317 if (futureOrLong instanceof Future) { 318 Future future = (Future) futureOrLong; 319 if (future.isCancelled()) { 320 it.remove(); 321 } else { 322 // we don't want to wait for work to complete 323 break; 324 } 325 } else { 326 // complete 327 setLastCachedId(ASYNC_ADD, candidate); 328 329 // keep lock step with sync adds while order is preserved 330 if (lastCachedIds[SYNC_ADD] != null) { 331 long next = 1 + (Long)lastCachedIds[SYNC_ADD].getFutureOrSequenceLong(); 332 if (Long.compare((Long)futureOrLong, next) == 0) { 333 setLastCachedId(SYNC_ADD, candidate); 334 } 335 } 336 it.remove(); 337 } 338 } 339 } 340 341 private void setLastCachedId(final int index, MessageId candidate) { 342 MessageId lastCacheId = lastCachedIds[index]; 343 if (lastCacheId == null) { 344 lastCachedIds[index] = candidate; 345 } else { 346 Object lastCacheFutureOrSequenceLong = lastCacheId.getFutureOrSequenceLong(); 347 Object candidateOrSequenceLong = candidate.getFutureOrSequenceLong(); 348 if (lastCacheFutureOrSequenceLong == null) { // possibly null for topics 349 lastCachedIds[index] = candidate; 350 } else if (candidateOrSequenceLong != null && 351 Long.compare(((Long) candidateOrSequenceLong), ((Long) lastCacheFutureOrSequenceLong)) > 0) { 352 lastCachedIds[index] = candidate; 353 } 354 } 355 } 356 357 protected void setBatch(MessageId messageId) throws Exception { 358 } 359 360 361 @Override 362 public synchronized void addMessageFirst(MessageReference node) throws Exception { 363 setCacheEnabled(false); 364 size++; 365 } 366 367 368 @Override 369 public final synchronized void remove() { 370 size--; 371 if (iterator!=null) { 372 iterator.remove(); 373 } 374 if (last != null) { 375 last.decrementReferenceCount(); 376 } 377 } 378 379 380 @Override 381 public final synchronized void remove(MessageReference node) { 382 if (batchList.remove(node) != null) { 383 size--; 384 setCacheEnabled(false); 385 } 386 } 387 388 389 @Override 390 public final synchronized void clear() { 391 gc(); 392 } 393 394 395 @Override 396 public synchronized void gc() { 397 for (MessageReference msg : batchList) { 398 rollback(msg.getMessageId()); 399 msg.decrementReferenceCount(); 400 } 401 batchList.clear(); 402 clearIterator(false); 403 batchResetNeeded = true; 404 setCacheEnabled(false); 405 } 406 407 @Override 408 protected final synchronized void fillBatch() { 409 if (LOG.isTraceEnabled()) { 410 LOG.trace("{} fillBatch", this); 411 } 412 if (batchResetNeeded) { 413 resetSize(); 414 setMaxBatchSize(Math.min(regionDestination.getMaxPageSize(), size)); 415 resetBatch(); 416 this.batchResetNeeded = false; 417 } 418 if (this.batchList.isEmpty() && this.size >0) { 419 try { 420 doFillBatch(); 421 } catch (Exception e) { 422 LOG.error("{} - Failed to fill batch", this, e); 423 throw new RuntimeException(e); 424 } 425 } 426 } 427 428 429 @Override 430 public final synchronized boolean isEmpty() { 431 // negative means more messages added to store through queue.send since last reset 432 return size == 0; 433 } 434 435 436 @Override 437 public final synchronized boolean hasMessagesBufferedToDeliver() { 438 return !batchList.isEmpty(); 439 } 440 441 442 @Override 443 public final synchronized int size() { 444 if (size < 0) { 445 this.size = getStoreSize(); 446 } 447 return size; 448 } 449 450 @Override 451 public final synchronized long messageSize() { 452 return getStoreMessageSize(); 453 } 454 455 @Override 456 public String toString() { 457 return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded 458 + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled() 459 + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size() 460 + ",lastSyncCachedId:" + lastCachedIds[SYNC_ADD] + ",lastSyncCachedId-seq:" + (lastCachedIds[SYNC_ADD] != null ? lastCachedIds[SYNC_ADD].getFutureOrSequenceLong() : "null") 461 + ",lastAsyncCachedId:" + lastCachedIds[ASYNC_ADD] + ",lastAsyncCachedId-seq:" + (lastCachedIds[ASYNC_ADD] != null ? lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong() : "null"); 462 } 463 464 protected abstract void doFillBatch() throws Exception; 465 466 protected abstract void resetBatch(); 467 468 protected abstract int getStoreSize(); 469 470 protected abstract long getStoreMessageSize(); 471 472 protected abstract boolean isStoreEmpty(); 473 474 public Subscription getSubscription() { 475 return null; 476 } 477}