001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Map.Entry;
030import java.util.Set;
031import java.util.concurrent.BlockingQueue;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.FutureTask;
034import java.util.concurrent.LinkedBlockingQueue;
035import java.util.concurrent.Semaphore;
036import java.util.concurrent.ThreadFactory;
037import java.util.concurrent.ThreadPoolExecutor;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.TimeoutException;
040import java.util.concurrent.atomic.AtomicBoolean;
041import java.util.concurrent.atomic.AtomicInteger;
042import java.util.concurrent.atomic.AtomicReference;
043
044import org.apache.activemq.broker.ConnectionContext;
045import org.apache.activemq.broker.region.BaseDestination;
046import org.apache.activemq.broker.scheduler.JobSchedulerStore;
047import org.apache.activemq.command.ActiveMQDestination;
048import org.apache.activemq.command.ActiveMQQueue;
049import org.apache.activemq.command.ActiveMQTempQueue;
050import org.apache.activemq.command.ActiveMQTempTopic;
051import org.apache.activemq.command.ActiveMQTopic;
052import org.apache.activemq.command.Message;
053import org.apache.activemq.command.MessageAck;
054import org.apache.activemq.command.MessageId;
055import org.apache.activemq.command.ProducerId;
056import org.apache.activemq.command.SubscriptionInfo;
057import org.apache.activemq.command.TransactionId;
058import org.apache.activemq.openwire.OpenWireFormat;
059import org.apache.activemq.protobuf.Buffer;
060import org.apache.activemq.store.AbstractMessageStore;
061import org.apache.activemq.store.IndexListener;
062import org.apache.activemq.store.ListenableFuture;
063import org.apache.activemq.store.MessageRecoveryListener;
064import org.apache.activemq.store.MessageStore;
065import org.apache.activemq.store.MessageStoreStatistics;
066import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
067import org.apache.activemq.store.NoLocalSubscriptionAware;
068import org.apache.activemq.store.PersistenceAdapter;
069import org.apache.activemq.store.TopicMessageStore;
070import org.apache.activemq.store.TransactionIdTransformer;
071import org.apache.activemq.store.TransactionStore;
072import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
073import org.apache.activemq.store.kahadb.data.KahaDestination;
074import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
075import org.apache.activemq.store.kahadb.data.KahaLocation;
076import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
077import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
078import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
079import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
080import org.apache.activemq.store.kahadb.disk.journal.Location;
081import org.apache.activemq.store.kahadb.disk.page.Transaction;
082import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
083import org.apache.activemq.usage.MemoryUsage;
084import org.apache.activemq.usage.SystemUsage;
085import org.apache.activemq.util.ServiceStopper;
086import org.apache.activemq.util.ThreadPoolUtils;
087import org.apache.activemq.wireformat.WireFormat;
088import org.slf4j.Logger;
089import org.slf4j.LoggerFactory;
090
091public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, NoLocalSubscriptionAware {
092    static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
093    private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH;
094
095    public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
096    public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
097            PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
098    public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
099    private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
100            PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
101
102    protected ExecutorService queueExecutor;
103    protected ExecutorService topicExecutor;
104    protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
105    protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
106    final WireFormat wireFormat = new OpenWireFormat();
107    private SystemUsage usageManager;
108    private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
109    private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
110    Semaphore globalQueueSemaphore;
111    Semaphore globalTopicSemaphore;
112    private boolean concurrentStoreAndDispatchQueues = true;
113    // when true, message order may be compromised when cache is exhausted if store is out
114    // or order w.r.t cache
115    private boolean concurrentStoreAndDispatchTopics = false;
116    private final boolean concurrentStoreAndDispatchTransactions = false;
117    private int maxAsyncJobs = MAX_ASYNC_JOBS;
118    private final KahaDBTransactionStore transactionStore;
119    private TransactionIdTransformer transactionIdTransformer;
120
121    public KahaDBStore() {
122        this.transactionStore = new KahaDBTransactionStore(this);
123        this.transactionIdTransformer = new TransactionIdTransformer() {
124            @Override
125            public TransactionId transform(TransactionId txid) {
126                return txid;
127            }
128        };
129    }
130
131    @Override
132    public String toString() {
133        return "KahaDB:[" + directory.getAbsolutePath() + "]";
134    }
135
136    @Override
137    public void setBrokerName(String brokerName) {
138    }
139
140    @Override
141    public void setUsageManager(SystemUsage usageManager) {
142        this.usageManager = usageManager;
143    }
144
145    public SystemUsage getUsageManager() {
146        return this.usageManager;
147    }
148
149    /**
150     * @return the concurrentStoreAndDispatch
151     */
152    public boolean isConcurrentStoreAndDispatchQueues() {
153        return this.concurrentStoreAndDispatchQueues;
154    }
155
156    /**
157     * @param concurrentStoreAndDispatch
158     *            the concurrentStoreAndDispatch to set
159     */
160    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
161        this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
162    }
163
164    /**
165     * @return the concurrentStoreAndDispatch
166     */
167    public boolean isConcurrentStoreAndDispatchTopics() {
168        return this.concurrentStoreAndDispatchTopics;
169    }
170
171    /**
172     * @param concurrentStoreAndDispatch
173     *            the concurrentStoreAndDispatch to set
174     */
175    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
176        this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
177    }
178
179    public boolean isConcurrentStoreAndDispatchTransactions() {
180        return this.concurrentStoreAndDispatchTransactions;
181    }
182
183    /**
184     * @return the maxAsyncJobs
185     */
186    public int getMaxAsyncJobs() {
187        return this.maxAsyncJobs;
188    }
189
190    /**
191     * @param maxAsyncJobs
192     *            the maxAsyncJobs to set
193     */
194    public void setMaxAsyncJobs(int maxAsyncJobs) {
195        this.maxAsyncJobs = maxAsyncJobs;
196    }
197
198
199    @Override
200    protected void configureMetadata() {
201        if (brokerService != null) {
202            metadata.openwireVersion = brokerService.getStoreOpenWireVersion();
203            wireFormat.setVersion(metadata.openwireVersion);
204
205            if (LOG.isDebugEnabled()) {
206                LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion);
207            }
208
209        }
210    }
211
212    @Override
213    public void doStart() throws Exception {
214        //configure the metadata before start, right now
215        //this is just the open wire version
216        configureMetadata();
217
218        super.doStart();
219
220        if (brokerService != null) {
221            // In case the recovered store used a different OpenWire version log a warning
222            // to assist in determining why journal reads fail.
223            if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) {
224                LOG.warn("Existing Store uses a different OpenWire version[{}] " +
225                         "than the version configured[{}] reverting to the version " +
226                         "used by this store, some newer broker features may not work" +
227                         "as expected.",
228                         metadata.openwireVersion, brokerService.getStoreOpenWireVersion());
229
230                // Update the broker service instance to the actual version in use.
231                wireFormat.setVersion(metadata.openwireVersion);
232                brokerService.setStoreOpenWireVersion(metadata.openwireVersion);
233            }
234        }
235
236        this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
237        this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
238        this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
239        this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
240        this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
241            asyncQueueJobQueue, new ThreadFactory() {
242                @Override
243                public Thread newThread(Runnable runnable) {
244                    Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
245                    thread.setDaemon(true);
246                    return thread;
247                }
248            });
249        this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
250            asyncTopicJobQueue, new ThreadFactory() {
251                @Override
252                public Thread newThread(Runnable runnable) {
253                    Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
254                    thread.setDaemon(true);
255                    return thread;
256                }
257            });
258    }
259
260    @Override
261    public void doStop(ServiceStopper stopper) throws Exception {
262        // drain down async jobs
263        LOG.info("Stopping async queue tasks");
264        if (this.globalQueueSemaphore != null) {
265            this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
266        }
267        synchronized (this.asyncQueueMaps) {
268            for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
269                synchronized (m) {
270                    for (StoreTask task : m.values()) {
271                        task.cancel();
272                    }
273                }
274            }
275            this.asyncQueueMaps.clear();
276        }
277        LOG.info("Stopping async topic tasks");
278        if (this.globalTopicSemaphore != null) {
279            this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
280        }
281        synchronized (this.asyncTopicMaps) {
282            for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
283                synchronized (m) {
284                    for (StoreTask task : m.values()) {
285                        task.cancel();
286                    }
287                }
288            }
289            this.asyncTopicMaps.clear();
290        }
291        if (this.globalQueueSemaphore != null) {
292            this.globalQueueSemaphore.drainPermits();
293        }
294        if (this.globalTopicSemaphore != null) {
295            this.globalTopicSemaphore.drainPermits();
296        }
297        if (this.queueExecutor != null) {
298            ThreadPoolUtils.shutdownNow(queueExecutor);
299            queueExecutor = null;
300        }
301        if (this.topicExecutor != null) {
302            ThreadPoolUtils.shutdownNow(topicExecutor);
303            topicExecutor = null;
304        }
305        LOG.info("Stopped KahaDB");
306        super.doStop(stopper);
307    }
308
309    private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
310        return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
311            @Override
312            public Location execute(Transaction tx) throws IOException {
313                StoredDestination sd = getStoredDestination(destination, tx);
314                Long sequence = sd.messageIdIndex.get(tx, key);
315                if (sequence == null) {
316                    return null;
317                }
318                return sd.orderIndex.get(tx, sequence).location;
319            }
320        });
321    }
322
323    protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
324        StoreQueueTask task = null;
325        synchronized (store.asyncTaskMap) {
326            task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
327        }
328        return task;
329    }
330
331    // with asyncTaskMap locked
332    protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
333        store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
334        this.queueExecutor.execute(task);
335    }
336
337    protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
338        StoreTopicTask task = null;
339        synchronized (store.asyncTaskMap) {
340            task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
341        }
342        return task;
343    }
344
345    protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
346        synchronized (store.asyncTaskMap) {
347            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
348        }
349        this.topicExecutor.execute(task);
350    }
351
352    @Override
353    public TransactionStore createTransactionStore() throws IOException {
354        return this.transactionStore;
355    }
356
357    public boolean getForceRecoverIndex() {
358        return this.forceRecoverIndex;
359    }
360
361    public void setForceRecoverIndex(boolean forceRecoverIndex) {
362        this.forceRecoverIndex = forceRecoverIndex;
363    }
364
365    public class KahaDBMessageStore extends AbstractMessageStore {
366        protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
367        protected KahaDestination dest;
368        private final int maxAsyncJobs;
369        private final Semaphore localDestinationSemaphore;
370
371        double doneTasks, canceledTasks = 0;
372
373        public KahaDBMessageStore(ActiveMQDestination destination) {
374            super(destination);
375            this.dest = convert(destination);
376            this.maxAsyncJobs = getMaxAsyncJobs();
377            this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
378        }
379
380        @Override
381        public ActiveMQDestination getDestination() {
382            return destination;
383        }
384
385        @Override
386        public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
387                throws IOException {
388            if (isConcurrentStoreAndDispatchQueues()) {
389                message.beforeMarshall(wireFormat);
390                StoreQueueTask result = new StoreQueueTask(this, context, message);
391                ListenableFuture<Object> future = result.getFuture();
392                message.getMessageId().setFutureOrSequenceLong(future);
393                message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch
394                result.aquireLocks();
395                synchronized (asyncTaskMap) {
396                    addQueueTask(this, result);
397                    if (indexListener != null) {
398                        indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
399                    }
400                }
401                return future;
402            } else {
403                return super.asyncAddQueueMessage(context, message);
404            }
405        }
406
407        @Override
408        public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
409            if (isConcurrentStoreAndDispatchQueues()) {
410                AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
411                StoreQueueTask task = null;
412                synchronized (asyncTaskMap) {
413                    task = (StoreQueueTask) asyncTaskMap.get(key);
414                }
415                if (task != null) {
416                    if (ack.isInTransaction() || !task.cancel()) {
417                        try {
418                            task.future.get();
419                        } catch (InterruptedException e) {
420                            throw new InterruptedIOException(e.toString());
421                        } catch (Exception ignored) {
422                            LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
423                        }
424                        removeMessage(context, ack);
425                    } else {
426                        indexLock.writeLock().lock();
427                        try {
428                            metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId());
429                        } finally {
430                            indexLock.writeLock().unlock();
431                        }
432                        synchronized (asyncTaskMap) {
433                            asyncTaskMap.remove(key);
434                        }
435                    }
436                } else {
437                    removeMessage(context, ack);
438                }
439            } else {
440                removeMessage(context, ack);
441            }
442        }
443
444        @Override
445        public void addMessage(final ConnectionContext context, final Message message) throws IOException {
446            final KahaAddMessageCommand command = new KahaAddMessageCommand();
447            command.setDestination(dest);
448            command.setMessageId(message.getMessageId().toProducerKey());
449            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId())));
450            command.setPriority(message.getPriority());
451            command.setPrioritySupported(isPrioritizedMessages());
452            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
453            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
454            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() {
455                // sync add? (for async, future present from getFutureOrSequenceLong)
456                Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
457
458                @Override
459                public void sequenceAssignedWithIndexLocked(final long sequence) {
460                    message.getMessageId().setFutureOrSequenceLong(sequence);
461                    if (indexListener != null) {
462                        if (possibleFuture == null) {
463                            trackPendingAdd(dest, sequence);
464                            indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
465                                @Override
466                                public void run() {
467                                    trackPendingAddComplete(dest, sequence);
468                                }
469                            }));
470                        }
471                    }
472                }
473            }, null);
474        }
475
476        @Override
477        public void updateMessage(Message message) throws IOException {
478            if (LOG.isTraceEnabled()) {
479                LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
480            }
481            KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand();
482            KahaAddMessageCommand command = new KahaAddMessageCommand();
483            command.setDestination(dest);
484            command.setMessageId(message.getMessageId().toProducerKey());
485            command.setPriority(message.getPriority());
486            command.setPrioritySupported(prioritizedMessages);
487            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
488            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
489            updateMessageCommand.setMessage(command);
490            store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null);
491        }
492
493        @Override
494        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
495            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
496            command.setDestination(dest);
497            command.setMessageId(ack.getLastMessageId().toProducerKey());
498            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())));
499
500            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
501            command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
502            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
503        }
504
505        @Override
506        public void removeAllMessages(ConnectionContext context) throws IOException {
507            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
508            command.setDestination(dest);
509            store(command, true, null, null);
510        }
511
512        @Override
513        public Message getMessage(MessageId identity) throws IOException {
514            final String key = identity.toProducerKey();
515
516            // Hopefully one day the page file supports concurrent read
517            // operations... but for now we must
518            // externally synchronize...
519            Location location;
520            indexLock.writeLock().lock();
521            try {
522                location = findMessageLocation(key, dest);
523            } finally {
524                indexLock.writeLock().unlock();
525            }
526            if (location == null) {
527                return null;
528            }
529
530            return loadMessage(location);
531        }
532
533        @Override
534        public boolean isEmpty() throws IOException {
535            indexLock.writeLock().lock();
536            try {
537                return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
538                    @Override
539                    public Boolean execute(Transaction tx) throws IOException {
540                        // Iterate through all index entries to get a count of
541                        // messages in the destination.
542                        StoredDestination sd = getStoredDestination(dest, tx);
543                        return sd.locationIndex.isEmpty(tx);
544                    }
545                });
546            } finally {
547                indexLock.writeLock().unlock();
548            }
549        }
550
551        @Override
552        public void recover(final MessageRecoveryListener listener) throws Exception {
553            // recovery may involve expiry which will modify
554            indexLock.writeLock().lock();
555            try {
556                pageFile.tx().execute(new Transaction.Closure<Exception>() {
557                    @Override
558                    public void execute(Transaction tx) throws Exception {
559                        StoredDestination sd = getStoredDestination(dest, tx);
560                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
561                        sd.orderIndex.resetCursorPosition();
562                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
563                                .hasNext(); ) {
564                            Entry<Long, MessageKeys> entry = iterator.next();
565                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
566                                continue;
567                            }
568                            Message msg = loadMessage(entry.getValue().location);
569                            listener.recoverMessage(msg);
570                        }
571                    }
572                });
573            } finally {
574                indexLock.writeLock().unlock();
575            }
576        }
577
578        @Override
579        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
580            indexLock.writeLock().lock();
581            try {
582                pageFile.tx().execute(new Transaction.Closure<Exception>() {
583                    @Override
584                    public void execute(Transaction tx) throws Exception {
585                        StoredDestination sd = getStoredDestination(dest, tx);
586                        Entry<Long, MessageKeys> entry = null;
587                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
588                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
589                            entry = iterator.next();
590                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
591                                continue;
592                            }
593                            Message msg = loadMessage(entry.getValue().location);
594                            msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
595                            listener.recoverMessage(msg);
596                            counter++;
597                            if (counter >= maxReturned) {
598                                break;
599                            }
600                        }
601                        sd.orderIndex.stoppedIterating();
602                    }
603                });
604            } finally {
605                indexLock.writeLock().unlock();
606            }
607        }
608
609        protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
610            int counter = 0;
611            String id;
612            for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) {
613                id = iterator.next();
614                iterator.remove();
615                Long sequence = sd.messageIdIndex.get(tx, id);
616                if (sequence != null) {
617                    if (sd.orderIndex.alreadyDispatched(sequence)) {
618                        listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location));
619                        counter++;
620                        if (counter >= maxReturned) {
621                            break;
622                        }
623                    } else {
624                        LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor);
625                    }
626                } else {
627                    LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd);
628                }
629            }
630            return counter;
631        }
632
633
634        @Override
635        public void resetBatching() {
636            if (pageFile.isLoaded()) {
637                indexLock.writeLock().lock();
638                try {
639                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
640                        @Override
641                        public void execute(Transaction tx) throws Exception {
642                            StoredDestination sd = getExistingStoredDestination(dest, tx);
643                            if (sd != null) {
644                                sd.orderIndex.resetCursorPosition();}
645                            }
646                        });
647                } catch (Exception e) {
648                    LOG.error("Failed to reset batching",e);
649                } finally {
650                    indexLock.writeLock().unlock();
651                }
652            }
653        }
654
655        @Override
656        public void setBatch(final MessageId identity) throws IOException {
657            indexLock.writeLock().lock();
658            try {
659                pageFile.tx().execute(new Transaction.Closure<IOException>() {
660                    @Override
661                    public void execute(Transaction tx) throws IOException {
662                        StoredDestination sd = getStoredDestination(dest, tx);
663                        Long location = (Long) identity.getFutureOrSequenceLong();
664                        Long pending = sd.orderIndex.minPendingAdd();
665                        if (pending != null) {
666                            location = Math.min(location, pending-1);
667                        }
668                        sd.orderIndex.setBatch(tx, location);
669                    }
670                });
671            } finally {
672                indexLock.writeLock().unlock();
673            }
674        }
675
676        @Override
677        public void setMemoryUsage(MemoryUsage memoryUsage) {
678        }
        /**
         * Starts the store by delegating straight to the superclass; no extra work is done here.
         *
         * @throws Exception whatever the superclass start raises
         */
        @Override
        public void start() throws Exception {
            super.start();
        }
        /**
         * Stops the store by delegating straight to the superclass; no extra work is done here.
         *
         * @throws Exception whatever the superclass stop raises
         */
        @Override
        public void stop() throws Exception {
            super.stop();
        }
687
688        protected void lockAsyncJobQueue() {
689            try {
690                if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) {
691                    throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore);
692                }
693            } catch (Exception e) {
694                LOG.error("Failed to lock async jobs for " + this.destination, e);
695            }
696        }
697
        /**
         * Releases {@code maxAsyncJobs} permits on the per-destination semaphore, i.e. the same
         * number of permits that {@code lockAsyncJobQueue()} attempts to acquire.
         */
        protected void unlockAsyncJobQueue() {
            this.localDestinationSemaphore.release(this.maxAsyncJobs);
        }
701
        /**
         * Acquires a single permit from the per-destination semaphore.
         * <p>
         * If the acquire is interrupted the exception is logged and the method returns
         * normally; in that case no permit was obtained and the interruption is not
         * propagated to the caller.
         */
        protected void acquireLocalAsyncLock() {
            try {
                this.localDestinationSemaphore.acquire();
            } catch (InterruptedException e) {
                // logged only: the caller cannot tell that the permit was not acquired
                LOG.error("Failed to aquire async lock for " + this.destination, e);
            }
        }
709
        /**
         * Returns a single permit to the per-destination semaphore; the counterpart of
         * {@code acquireLocalAsyncLock()}.
         */
        protected void releaseLocalAsyncLock() {
            this.localDestinationSemaphore.release();
        }
713
714        @Override
715        public String toString(){
716            return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
717        }
718
719        @Override
720        protected void recoverMessageStoreStatistics() throws IOException {
721            try {
722                MessageStoreStatistics recoveredStatistics;
723                lockAsyncJobQueue();
724                indexLock.writeLock().lock();
725                try {
726                    recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() {
727                        @Override
728                        public MessageStoreStatistics execute(Transaction tx) throws IOException {
729                            MessageStoreStatistics statistics = new MessageStoreStatistics();
730
731                            // Iterate through all index entries to get the size of each message
732                            StoredDestination sd = getStoredDestination(dest, tx);
733                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
734                                int locationSize = iterator.next().getKey().getSize();
735                                statistics.getMessageCount().increment();
736                                statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0);
737                            }
738                           return statistics;
739                        }
740                    });
741                    getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount());
742                    getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize());
743                } finally {
744                    indexLock.writeLock().unlock();
745                }
746            } finally {
747                unlockAsyncJobQueue();
748            }
749        }
750    }
751
752    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
753        private final AtomicInteger subscriptionCount = new AtomicInteger();
754        protected final MessageStoreSubscriptionStatistics messageStoreSubStats =
755                new MessageStoreSubscriptionStatistics(isEnableSubscriptionStatistics());
756
757        public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
758            super(destination);
759            this.subscriptionCount.set(getAllSubscriptions().length);
760            if (isConcurrentStoreAndDispatchTopics()) {
761                asyncTopicMaps.add(asyncTaskMap);
762            }
763        }
764
765        @Override
766        protected void recoverMessageStoreStatistics() throws IOException {
767            super.recoverMessageStoreStatistics();
768            this.recoverMessageStoreSubMetrics();
769        }
770
771        @Override
772        public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
773                throws IOException {
774            if (isConcurrentStoreAndDispatchTopics()) {
775                message.beforeMarshall(wireFormat);
776                StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
777                result.aquireLocks();
778                addTopicTask(this, result);
779                return result.getFuture();
780            } else {
781                return super.asyncAddTopicMessage(context, message);
782            }
783        }
784
785        @Override
786        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
787                                MessageId messageId, MessageAck ack) throws IOException {
788            String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
789            if (isConcurrentStoreAndDispatchTopics()) {
790                AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
791                StoreTopicTask task = null;
792                synchronized (asyncTaskMap) {
793                    task = (StoreTopicTask) asyncTaskMap.get(key);
794                }
795                if (task != null) {
796                    if (task.addSubscriptionKey(subscriptionKey)) {
797                        removeTopicTask(this, messageId);
798                        if (task.cancel()) {
799                            synchronized (asyncTaskMap) {
800                                asyncTaskMap.remove(key);
801                            }
802                        }
803                    }
804                } else {
805                    doAcknowledge(context, subscriptionKey, messageId, ack);
806                }
807            } else {
808                doAcknowledge(context, subscriptionKey, messageId, ack);
809            }
810        }
811
812        protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
813                throws IOException {
814            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
815            command.setDestination(dest);
816            command.setSubscriptionKey(subscriptionKey);
817            command.setMessageId(messageId.toProducerKey());
818            command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null);
819            if (ack != null && ack.isUnmatchedAck()) {
820                command.setAck(UNMATCHED);
821            } else {
822                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
823                command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
824            }
825            store(command, false, null, null);
826        }
827
828        @Override
829        public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
830            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
831                    .getSubscriptionName());
832            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
833            command.setDestination(dest);
834            command.setSubscriptionKey(subscriptionKey.toString());
835            command.setRetroactive(retroactive);
836            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
837            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
838            store(command, isEnableJournalDiskSyncs() && true, null, null);
839            this.subscriptionCount.incrementAndGet();
840        }
841
842        @Override
843        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
844            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
845            command.setDestination(dest);
846            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
847            store(command, isEnableJournalDiskSyncs() && true, null, null);
848            this.subscriptionCount.decrementAndGet();
849        }
850
851        @Override
852        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
853
854            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
855            indexLock.writeLock().lock();
856            try {
857                pageFile.tx().execute(new Transaction.Closure<IOException>() {
858                    @Override
859                    public void execute(Transaction tx) throws IOException {
860                        StoredDestination sd = getStoredDestination(dest, tx);
861                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
862                                .hasNext();) {
863                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
864                            SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
865                                    .getValue().getSubscriptionInfo().newInput()));
866                            subscriptions.add(info);
867
868                        }
869                    }
870                });
871            } finally {
872                indexLock.writeLock().unlock();
873            }
874
875            SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
876            subscriptions.toArray(rc);
877            return rc;
878        }
879
880        @Override
881        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
882            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
883            indexLock.writeLock().lock();
884            try {
885                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
886                    @Override
887                    public SubscriptionInfo execute(Transaction tx) throws IOException {
888                        StoredDestination sd = getStoredDestination(dest, tx);
889                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
890                        if (command == null) {
891                            return null;
892                        }
893                        return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
894                                .getSubscriptionInfo().newInput()));
895                    }
896                });
897            } finally {
898                indexLock.writeLock().unlock();
899            }
900        }
901
902        @Override
903        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
904            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
905
906            if (isEnableSubscriptionStatistics()) {
907                return (int)this.messageStoreSubStats.getMessageCount(subscriptionKey).getCount();
908            } else {
909
910                indexLock.writeLock().lock();
911                try {
912                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
913                        @Override
914                        public Integer execute(Transaction tx) throws IOException {
915                            StoredDestination sd = getStoredDestination(dest, tx);
916                            LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
917                            if (cursorPos == null) {
918                                // The subscription might not exist.
919                                return 0;
920                            }
921
922                            return (int) getStoredMessageCount(tx, sd, subscriptionKey);
923                        }
924                    });
925                } finally {
926                    indexLock.writeLock().unlock();
927                }
928            }
929        }
930
931
932        @Override
933        public long getMessageSize(String clientId, String subscriptionName) throws IOException {
934            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
935            if (isEnableSubscriptionStatistics()) {
936                return this.messageStoreSubStats.getMessageSize(subscriptionKey).getTotalSize();
937            } else {
938                indexLock.writeLock().lock();
939                try {
940                    return pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
941                        @Override
942                        public Long execute(Transaction tx) throws IOException {
943                            StoredDestination sd = getStoredDestination(dest, tx);
944                            LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
945                            if (cursorPos == null) {
946                                // The subscription might not exist.
947                                return 0l;
948                            }
949
950                            return getStoredMessageSize(tx, sd, subscriptionKey);
951                        }
952                    });
953                } finally {
954                    indexLock.writeLock().unlock();
955                }
956            }
957        }
958
959        protected void recoverMessageStoreSubMetrics() throws IOException {
960            if (isEnableSubscriptionStatistics()) {
961
962                final MessageStoreSubscriptionStatistics statistics = getMessageStoreSubStatistics();
963                indexLock.writeLock().lock();
964                try {
965                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
966                        @Override
967                        public void execute(Transaction tx) throws IOException {
968                            StoredDestination sd = getStoredDestination(dest, tx);
969                            for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions
970                                    .iterator(tx); iterator.hasNext();) {
971                                Entry<String, KahaSubscriptionCommand> entry = iterator.next();
972
973                                String subscriptionKey = entry.getKey();
974                                LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
975                                if (cursorPos != null) {
976                                    long size = getStoredMessageSize(tx, sd, subscriptionKey);
977                                    statistics.getMessageCount(subscriptionKey)
978                                            .setCount(getStoredMessageCount(tx, sd, subscriptionKey));
979                                    statistics.getMessageSize(subscriptionKey).addSize(size > 0 ? size : 0);
980                                }
981                            }
982                        }
983                    });
984                } finally {
985                    indexLock.writeLock().unlock();
986                }
987            }
988        }
989
990        @Override
991        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
992                throws Exception {
993            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
994            @SuppressWarnings("unused")
995            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
996            indexLock.writeLock().lock();
997            try {
998                pageFile.tx().execute(new Transaction.Closure<Exception>() {
999                    @Override
1000                    public void execute(Transaction tx) throws Exception {
1001                        StoredDestination sd = getStoredDestination(dest, tx);
1002                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
1003                        sd.orderIndex.setBatch(tx, cursorPos);
1004                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
1005                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
1006                                .hasNext();) {
1007                            Entry<Long, MessageKeys> entry = iterator.next();
1008                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
1009                                continue;
1010                            }
1011                            listener.recoverMessage(loadMessage(entry.getValue().location));
1012                        }
1013                        sd.orderIndex.resetCursorPosition();
1014                    }
1015                });
1016            } finally {
1017                indexLock.writeLock().unlock();
1018            }
1019        }
1020
1021        @Override
1022        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
1023                final MessageRecoveryListener listener) throws Exception {
1024            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
1025            @SuppressWarnings("unused")
1026            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
1027            indexLock.writeLock().lock();
1028            try {
1029                pageFile.tx().execute(new Transaction.Closure<Exception>() {
1030                    @Override
1031                    public void execute(Transaction tx) throws Exception {
1032                        StoredDestination sd = getStoredDestination(dest, tx);
1033                        sd.orderIndex.resetCursorPosition();
1034                        MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
1035                        if (moc == null) {
1036                            LastAck pos = getLastAck(tx, sd, subscriptionKey);
1037                            if (pos == null) {
1038                                // sub deleted
1039                                return;
1040                            }
1041                            sd.orderIndex.setBatch(tx, pos);
1042                            moc = sd.orderIndex.cursor;
1043                        } else {
1044                            sd.orderIndex.cursor.sync(moc);
1045                        }
1046
1047                        Entry<Long, MessageKeys> entry = null;
1048                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
1049                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
1050                                .hasNext();) {
1051                            entry = iterator.next();
1052                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
1053                                continue;
1054                            }
1055                            if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
1056                                counter++;
1057                            }
1058                            if (counter >= maxReturned || listener.hasSpace() == false) {
1059                                break;
1060                            }
1061                        }
1062                        sd.orderIndex.stoppedIterating();
1063                        if (entry != null) {
1064                            MessageOrderCursor copy = sd.orderIndex.cursor.copy();
1065                            sd.subscriptionCursors.put(subscriptionKey, copy);
1066                        }
1067                    }
1068                });
1069            } finally {
1070                indexLock.writeLock().unlock();
1071            }
1072        }
1073
1074        @Override
1075        public void resetBatching(String clientId, String subscriptionName) {
1076            try {
1077                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
1078                indexLock.writeLock().lock();
1079                try {
1080                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1081                        @Override
1082                        public void execute(Transaction tx) throws IOException {
1083                            StoredDestination sd = getStoredDestination(dest, tx);
1084                            sd.subscriptionCursors.remove(subscriptionKey);
1085                        }
1086                    });
1087                }finally {
1088                    indexLock.writeLock().unlock();
1089                }
1090            } catch (IOException e) {
1091                throw new RuntimeException(e);
1092            }
1093        }
1094
1095        @Override
1096        public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
1097            return messageStoreSubStats;
1098        }
1099    }
1100
1101    String subscriptionKey(String clientId, String subscriptionName) {
1102        return clientId + ":" + subscriptionName;
1103    }
1104
1105    @Override
1106    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
1107        String key = key(convert(destination));
1108        MessageStore store = storeCache.get(key(convert(destination)));
1109        if (store == null) {
1110            final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination));
1111            store = storeCache.putIfAbsent(key, queueStore);
1112            if (store == null) {
1113                store = queueStore;
1114            }
1115        }
1116
1117        return store;
1118    }
1119
1120    @Override
1121    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
1122        String key = key(convert(destination));
1123        MessageStore store = storeCache.get(key(convert(destination)));
1124        if (store == null) {
1125            final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
1126            store = storeCache.putIfAbsent(key, topicStore);
1127            if (store == null) {
1128                store = topicStore;
1129            }
1130        }
1131
1132        return (TopicMessageStore) store;
1133    }
1134
1135    /**
1136     * Cleanup method to remove any state associated with the given destination.
1137     * This method does not stop the message store (it might not be cached).
1138     *
1139     * @param destination
1140     *            Destination to forget
1141     */
1142    @Override
1143    public void removeQueueMessageStore(ActiveMQQueue destination) {
1144    }
1145
1146    /**
1147     * Cleanup method to remove any state associated with the given destination
1148     * This method does not stop the message store (it might not be cached).
1149     *
1150     * @param destination
1151     *            Destination to forget
1152     */
1153    @Override
1154    public void removeTopicMessageStore(ActiveMQTopic destination) {
1155    }
1156
    /**
     * Sets the {@code deleteAllMessages} flag to {@code true}. Nothing is deleted by this
     * call itself; how and when the flag is acted upon is decided elsewhere.
     *
     * @throws IOException declared by the interface; not thrown by this body
     */
    @Override
    public void deleteAllMessages() throws IOException {
        deleteAllMessages = true;
    }
1161
1162    @Override
1163    public Set<ActiveMQDestination> getDestinations() {
1164        try {
1165            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
1166            indexLock.writeLock().lock();
1167            try {
1168                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1169                    @Override
1170                    public void execute(Transaction tx) throws IOException {
1171                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
1172                                .hasNext();) {
1173                            Entry<String, StoredDestination> entry = iterator.next();
1174                            //Removing isEmpty topic check - see AMQ-5875
1175                            rc.add(convert(entry.getKey()));
1176                        }
1177                    }
1178                });
1179            }finally {
1180                indexLock.writeLock().unlock();
1181            }
1182            return rc;
1183        } catch (IOException e) {
1184            throw new RuntimeException(e);
1185        }
1186    }
1187
    /**
     * Always reports 0; this implementation does not look anything up.
     *
     * @return the constant {@code 0}
     * @throws IOException declared by the interface; not thrown by this body
     */
    @Override
    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0;
    }
1192
1193    @Override
1194    public long getLastProducerSequenceId(ProducerId id) {
1195        indexLock.writeLock().lock();
1196        try {
1197            return metadata.producerSequenceIdTracker.getLastSeqId(id);
1198        } finally {
1199            indexLock.writeLock().unlock();
1200        }
1201    }
1202
1203    @Override
1204    public long size() {
1205        try {
1206            return journalSize.get() + getPageFile().getDiskSize();
1207        } catch (IOException e) {
1208            throw new RuntimeException(e);
1209        }
1210    }
1211
    /**
     * Not supported: always fails.
     *
     * @param context ignored
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void beginTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
    /**
     * Not supported: always fails.
     *
     * @param context ignored
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void commitTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
    /**
     * Not supported: always fails.
     *
     * @param context ignored
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
1224
    /**
     * Runs a checkpoint by delegating to the superclass {@code checkpointCleanup}.
     *
     * @param sync passed through unchanged to {@code checkpointCleanup}
     * @throws IOException if the superclass checkpoint fails
     */
    @Override
    public void checkpoint(boolean sync) throws IOException {
        super.checkpointCleanup(sync);
    }
1229
1230    // /////////////////////////////////////////////////////////////////
1231    // Internal helper methods.
1232    // /////////////////////////////////////////////////////////////////
1233
1234    /**
1235     * @param location
1236     * @return
1237     * @throws IOException
1238     */
1239    Message loadMessage(Location location) throws IOException {
1240        try {
1241            JournalCommand<?> command = load(location);
1242            KahaAddMessageCommand addMessage = null;
1243            switch (command.type()) {
1244                case KAHA_UPDATE_MESSAGE_COMMAND:
1245                    addMessage = ((KahaUpdateMessageCommand) command).getMessage();
1246                    break;
1247                default:
1248                    addMessage = (KahaAddMessageCommand) command;
1249            }
1250            Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
1251            return msg;
1252        } catch (IOException ioe) {
1253            LOG.error("Failed to load message at: {}", location , ioe);
1254            brokerService.handleIOException(ioe);
1255            throw ioe;
1256        }
1257    }
1258
1259    // /////////////////////////////////////////////////////////////////
1260    // Internal conversion methods.
1261    // /////////////////////////////////////////////////////////////////
1262
1263    KahaLocation convert(Location location) {
1264        KahaLocation rc = new KahaLocation();
1265        rc.setLogId(location.getDataFileId());
1266        rc.setOffset(location.getOffset());
1267        return rc;
1268    }
1269
1270    KahaDestination convert(ActiveMQDestination dest) {
1271        KahaDestination rc = new KahaDestination();
1272        rc.setName(dest.getPhysicalName());
1273        switch (dest.getDestinationType()) {
1274        case ActiveMQDestination.QUEUE_TYPE:
1275            rc.setType(DestinationType.QUEUE);
1276            return rc;
1277        case ActiveMQDestination.TOPIC_TYPE:
1278            rc.setType(DestinationType.TOPIC);
1279            return rc;
1280        case ActiveMQDestination.TEMP_QUEUE_TYPE:
1281            rc.setType(DestinationType.TEMP_QUEUE);
1282            return rc;
1283        case ActiveMQDestination.TEMP_TOPIC_TYPE:
1284            rc.setType(DestinationType.TEMP_TOPIC);
1285            return rc;
1286        default:
1287            return null;
1288        }
1289    }
1290
1291    ActiveMQDestination convert(String dest) {
1292        int p = dest.indexOf(":");
1293        if (p < 0) {
1294            throw new IllegalArgumentException("Not in the valid destination format");
1295        }
1296        int type = Integer.parseInt(dest.substring(0, p));
1297        String name = dest.substring(p + 1);
1298        return convert(type, name);
1299    }
1300
    /**
     * Translates a {@code KahaDestination} into a broker destination by passing the numeric
     * value of its type and its name to {@code convert(int, String)}.
     *
     * @param commandDestination destination taken from a journal command
     * @return the matching broker destination
     */
    private ActiveMQDestination convert(KahaDestination commandDestination) {
        return convert(commandDestination.getType().getNumber(), commandDestination.getName());
    }
1304
1305    private ActiveMQDestination convert(int type, String name) {
1306        switch (KahaDestination.DestinationType.valueOf(type)) {
1307        case QUEUE:
1308            return new ActiveMQQueue(name);
1309        case TOPIC:
1310            return new ActiveMQTopic(name);
1311        case TEMP_QUEUE:
1312            return new ActiveMQTempQueue(name);
1313        case TEMP_TOPIC:
1314            return new ActiveMQTempTopic(name);
1315        default:
1316            throw new IllegalArgumentException("Not in the valid destination format");
1317        }
1318    }
1319
1320    public TransactionIdTransformer getTransactionIdTransformer() {
1321        return transactionIdTransformer;
1322    }
1323
1324    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
1325        this.transactionIdTransformer = transactionIdTransformer;
1326    }
1327
1328    static class AsyncJobKey {
1329        MessageId id;
1330        ActiveMQDestination destination;
1331
1332        AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1333            this.id = id;
1334            this.destination = destination;
1335        }
1336
1337        @Override
1338        public boolean equals(Object obj) {
1339            if (obj == this) {
1340                return true;
1341            }
1342            return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1343                    && destination.equals(((AsyncJobKey) obj).destination);
1344        }
1345
1346        @Override
1347        public int hashCode() {
1348            return id.hashCode() + destination.hashCode();
1349        }
1350
1351        @Override
1352        public String toString() {
1353            return destination.getPhysicalName() + "-" + id;
1354        }
1355    }
1356
    /**
     * Operations common to the asynchronous store tasks of this store: a task can be
     * cancelled, and it acquires and releases the locks it needs.
     */
    public interface StoreTask {
        /**
         * Attempts to cancel the task.
         *
         * @return the outcome reported by the implementation
         */
        public boolean cancel();

        /** Acquires whatever locks the task needs (the spelling is part of the API). */
        public void aquireLocks();

        /** Releases the locks taken by {@link #aquireLocks()}. */
        public void releaseLocks();
    }
1364
1365    class StoreQueueTask implements Runnable, StoreTask {
1366        protected final Message message;
1367        protected final ConnectionContext context;
1368        protected final KahaDBMessageStore store;
1369        protected final InnerFutureTask future;
1370        protected final AtomicBoolean done = new AtomicBoolean();
1371        protected final AtomicBoolean locked = new AtomicBoolean();
1372
        /**
         * Creates a task that will store the given message and binds a new
         * {@code InnerFutureTask} to it.
         *
         * @param store store the message is added to when the task runs
         * @param context connection context passed along to the store
         * @param message message to store
         */
        public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
            this.store = store;
            this.context = context;
            this.message = message;
            this.future = new InnerFutureTask(this);
        }
1379
1380        public ListenableFuture<Object> getFuture() {
1381            return this.future;
1382        }
1383
1384        @Override
1385        public boolean cancel() {
1386            if (this.done.compareAndSet(false, true)) {
1387                return this.future.cancel(false);
1388            }
1389            return false;
1390        }
1391
1392        @Override
1393        public void aquireLocks() {
1394            if (this.locked.compareAndSet(false, true)) {
1395                try {
1396                    globalQueueSemaphore.acquire();
1397                    store.acquireLocalAsyncLock();
1398                    message.incrementReferenceCount();
1399                } catch (InterruptedException e) {
1400                    LOG.warn("Failed to aquire lock", e);
1401                }
1402            }
1403
1404        }
1405
1406        @Override
1407        public void releaseLocks() {
1408            if (this.locked.compareAndSet(true, false)) {
1409                store.releaseLocalAsyncLock();
1410                globalQueueSemaphore.release();
1411                message.decrementReferenceCount();
1412            }
1413        }
1414
1415        @Override
1416        public void run() {
1417            this.store.doneTasks++;
1418            try {
1419                if (this.done.compareAndSet(false, true)) {
1420                    this.store.addMessage(context, message);
1421                    removeQueueTask(this.store, this.message.getMessageId());
1422                    this.future.complete();
1423                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1424                    System.err.println(this.store.dest.getName() + " cancelled: "
1425                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1426                    this.store.canceledTasks = this.store.doneTasks = 0;
1427                }
1428            } catch (Exception e) {
1429                this.future.setException(e);
1430            }
1431        }
1432
1433        protected Message getMessage() {
1434            return this.message;
1435        }
1436
1437        private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object>  {
1438
1439            private final AtomicReference<Runnable> listenerRef = new AtomicReference<>();
1440
1441            public InnerFutureTask(Runnable runnable) {
1442                super(runnable, null);
1443            }
1444
1445            public void setException(final Exception e) {
1446                super.setException(e);
1447            }
1448
1449            public void complete() {
1450                super.set(null);
1451            }
1452
1453            @Override
1454            public void done() {
1455                fireListener();
1456            }
1457
1458            @Override
1459            public void addListener(Runnable listener) {
1460                this.listenerRef.set(listener);
1461                if (isDone()) {
1462                    fireListener();
1463                }
1464            }
1465
1466            private void fireListener() {
1467                Runnable listener = listenerRef.getAndSet(null);
1468                if (listener != null) {
1469                    try {
1470                        listener.run();
1471                    } catch (Exception ignored) {
1472                        LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored);
1473                    }
1474                }
1475            }
1476        }
1477    }
1478
1479    class StoreTopicTask extends StoreQueueTask {
1480        private final int subscriptionCount;
1481        private final List<String> subscriptionKeys = new ArrayList<String>(1);
1482        private final KahaDBTopicMessageStore topicStore;
1483        public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1484                int subscriptionCount) {
1485            super(store, context, message);
1486            this.topicStore = store;
1487            this.subscriptionCount = subscriptionCount;
1488
1489        }
1490
1491        @Override
1492        public void aquireLocks() {
1493            if (this.locked.compareAndSet(false, true)) {
1494                try {
1495                    globalTopicSemaphore.acquire();
1496                    store.acquireLocalAsyncLock();
1497                    message.incrementReferenceCount();
1498                } catch (InterruptedException e) {
1499                    LOG.warn("Failed to aquire lock", e);
1500                }
1501            }
1502        }
1503
1504        @Override
1505        public void releaseLocks() {
1506            if (this.locked.compareAndSet(true, false)) {
1507                message.decrementReferenceCount();
1508                store.releaseLocalAsyncLock();
1509                globalTopicSemaphore.release();
1510            }
1511        }
1512
1513        /**
1514         * add a key
1515         *
1516         * @param key
1517         * @return true if all acknowledgements received
1518         */
1519        public boolean addSubscriptionKey(String key) {
1520            synchronized (this.subscriptionKeys) {
1521                this.subscriptionKeys.add(key);
1522            }
1523            return this.subscriptionKeys.size() >= this.subscriptionCount;
1524        }
1525
1526        @Override
1527        public void run() {
1528            this.store.doneTasks++;
1529            try {
1530                if (this.done.compareAndSet(false, true)) {
1531                    this.topicStore.addMessage(context, message);
1532                    // apply any acks we have
1533                    synchronized (this.subscriptionKeys) {
1534                        for (String key : this.subscriptionKeys) {
1535                            this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1536
1537                        }
1538                    }
1539                    removeTopicTask(this.topicStore, this.message.getMessageId());
1540                    this.future.complete();
1541                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1542                    System.err.println(this.store.dest.getName() + " cancelled: "
1543                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1544                    this.store.canceledTasks = this.store.doneTasks = 0;
1545                }
1546            } catch (Exception e) {
1547                this.future.setException(e);
1548            }
1549        }
1550    }
1551
1552    public class StoreTaskExecutor extends ThreadPoolExecutor {
1553
1554        public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
1555            super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
1556        }
1557
1558        @Override
1559        protected void afterExecute(Runnable runnable, Throwable throwable) {
1560            super.afterExecute(runnable, throwable);
1561
1562            if (runnable instanceof StoreTask) {
1563               ((StoreTask)runnable).releaseLocks();
1564            }
1565        }
1566    }
1567
1568    @Override
1569    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
1570        return new JobSchedulerStoreImpl();
1571    }
1572
1573    /* (non-Javadoc)
1574     * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal()
1575     */
1576    @Override
1577    public boolean isPersistNoLocal() {
1578        // Prior to v11 the broker did not store the noLocal value for durable subs.
1579        return brokerService.getStoreOpenWireVersion() >= 11;
1580    }
1581}