001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.util.ArrayList;
020import java.util.HashMap;
021import java.util.Iterator;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.locks.ReentrantReadWriteLock;
027
028import javax.jms.IllegalStateException;
029import javax.jms.JMSException;
030
031import org.apache.activemq.DestinationDoesNotExistException;
032import org.apache.activemq.advisory.AdvisorySupport;
033import org.apache.activemq.broker.ConnectionContext;
034import org.apache.activemq.broker.ConsumerBrokerExchange;
035import org.apache.activemq.broker.ProducerBrokerExchange;
036import org.apache.activemq.broker.region.policy.PolicyEntry;
037import org.apache.activemq.broker.region.virtual.CompositeDestinationFilter;
038import org.apache.activemq.command.ActiveMQDestination;
039import org.apache.activemq.command.ConsumerControl;
040import org.apache.activemq.command.ConsumerId;
041import org.apache.activemq.command.ConsumerInfo;
042import org.apache.activemq.command.Message;
043import org.apache.activemq.command.MessageAck;
044import org.apache.activemq.command.MessageDispatchNotification;
045import org.apache.activemq.command.MessagePull;
046import org.apache.activemq.command.ProducerInfo;
047import org.apache.activemq.command.RemoveSubscriptionInfo;
048import org.apache.activemq.command.Response;
049import org.apache.activemq.filter.DestinationFilter;
050import org.apache.activemq.filter.DestinationMap;
051import org.apache.activemq.security.SecurityContext;
052import org.apache.activemq.thread.TaskRunnerFactory;
053import org.apache.activemq.usage.SystemUsage;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
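/**
 * Abstract base class for {@link Region} implementations. It keeps the registry of
 * destinations and consumer subscriptions for the region and leaves the creation of
 * subscriptions to subclasses.
 */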
060public abstract class AbstractRegion implements Region {
061
062    private static final Logger LOG = LoggerFactory.getLogger(AbstractRegion.class);
063
064    protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
065    protected final DestinationMap destinationMap = new DestinationMap();
066    protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
067    protected final SystemUsage usageManager;
068    protected final DestinationFactory destinationFactory;
069    protected final DestinationStatistics destinationStatistics;
070    protected final RegionStatistics regionStatistics = new RegionStatistics();
071    protected final RegionBroker broker;
072    protected boolean autoCreateDestinations = true;
073    protected final TaskRunnerFactory taskRunnerFactory;
074    protected final ReentrantReadWriteLock destinationsLock = new ReentrantReadWriteLock();
075    protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
076    protected boolean started;
077
078    public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager,
079            TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
080        if (broker == null) {
081            throw new IllegalArgumentException("null broker");
082        }
083        this.broker = broker;
084        this.destinationStatistics = destinationStatistics;
085        this.usageManager = memoryManager;
086        this.taskRunnerFactory = taskRunnerFactory;
087        if (destinationFactory == null) {
088            throw new IllegalArgumentException("null destinationFactory");
089        }
090        this.destinationFactory = destinationFactory;
091    }
092
093    @Override
094    public final void start() throws Exception {
095        started = true;
096
097        Set<ActiveMQDestination> inactiveDests = getInactiveDestinations();
098        for (Iterator<ActiveMQDestination> iter = inactiveDests.iterator(); iter.hasNext();) {
099            ActiveMQDestination dest = iter.next();
100
101            ConnectionContext context = new ConnectionContext();
102            context.setBroker(broker.getBrokerService().getBroker());
103            context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
104            context.getBroker().addDestination(context, dest, false);
105        }
106        destinationsLock.readLock().lock();
107        try{
108            for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
109                Destination dest = i.next();
110                dest.start();
111            }
112        } finally {
113            destinationsLock.readLock().unlock();
114        }
115    }
116
117    @Override
118    public void stop() throws Exception {
119        started = false;
120        destinationsLock.readLock().lock();
121        try{
122            for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
123                Destination dest = i.next();
124                dest.stop();
125            }
126        } finally {
127            destinationsLock.readLock().unlock();
128        }
129
130        destinationsLock.writeLock().lock();
131        try {
132            destinations.clear();
133            regionStatistics.getAdvisoryDestinations().reset();
134            regionStatistics.getDestinations().reset();
135            regionStatistics.getAllDestinations().reset();
136        } finally {
137            destinationsLock.writeLock().unlock();
138        }
139    }
140
141    @Override
142    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
143            boolean createIfTemporary) throws Exception {
144
145        destinationsLock.writeLock().lock();
146        try {
147            Destination dest = destinations.get(destination);
148            if (dest == null) {
149                if (destination.isTemporary() == false || createIfTemporary) {
150                    // Limit the number of destinations that can be created if
151                    // maxDestinations has been set on a policy
152                    validateMaxDestinations(destination);
153
154                    LOG.debug("{} adding destination: {}", broker.getBrokerName(), destination);
155                    dest = createDestination(context, destination);
156                    // intercept if there is a valid interceptor defined
157                    DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
158                    if (destinationInterceptor != null) {
159                        dest = destinationInterceptor.intercept(dest);
160                    }
161                    dest.start();
162                    addSubscriptionsForDestination(context, dest);
163                    destinations.put(destination, dest);
164                    updateRegionDestCounts(destination, 1);
165                    destinationMap.put(destination, dest);
166                }
167                if (dest == null) {
168                    throw new DestinationDoesNotExistException(destination.getQualifiedName());
169                }
170            }
171            return dest;
172        } finally {
173            destinationsLock.writeLock().unlock();
174        }
175    }
176
177    public Map<ConsumerId, Subscription> getSubscriptions() {
178        return subscriptions;
179    }
180
181
182    /**
183     * Updates the counts in RegionStatistics based on whether or not the destination
184     * is an Advisory Destination or not
185     *
186     * @param destination the destination being used to determine which counters to update
187     * @param count the count to add to the counters
188     */
189    protected void updateRegionDestCounts(ActiveMQDestination destination, int count) {
190        if (destination != null) {
191            if (AdvisorySupport.isAdvisoryTopic(destination)) {
192                regionStatistics.getAdvisoryDestinations().add(count);
193            } else {
194                regionStatistics.getDestinations().add(count);
195            }
196            regionStatistics.getAllDestinations().add(count);
197        }
198    }
199
200    /**
201     * This method checks whether or not the destination can be created based on
202     * {@link PolicyEntry#getMaxDestinations}, if it has been set. Advisory
203     * topics are ignored.
204     *
205     * @param destination
206     * @throws Exception
207     */
208    protected void validateMaxDestinations(ActiveMQDestination destination)
209            throws Exception {
210        if (broker.getDestinationPolicy() != null) {
211            PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
212            // Make sure the destination is not an advisory topic
213            if (entry != null && entry.getMaxDestinations() >= 0
214                    && !AdvisorySupport.isAdvisoryTopic(destination)) {
215                // If there is an entry for this destination, look up the set of
216                // destinations associated with this policy
217                // If a destination isn't specified, then just count up
218                // non-advisory destinations (ie count all destinations)
219                int destinationSize = (int) (entry.getDestination() != null ?
220                        destinationMap.get(entry.getDestination()).size() : regionStatistics.getDestinations().getCount());
221                if (destinationSize >= entry.getMaxDestinations()) {
222                    if (entry.getDestination() != null) {
223                        throw new IllegalStateException(
224                                "The maxmimum number of destinations allowed ("+ entry.getMaxDestinations() +
225                                ") for the policy " + entry.getDestination() + " has already been reached.");
226                    // No destination has been set (default policy)
227                    } else {
228                        throw new IllegalStateException("The maxmimum number of destinations allowed ("
229                                        + entry.getMaxDestinations() + ") has already been reached.");
230                    }
231                }
232            }
233        }
234    }
235
236    protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
237        List<Subscription> rc = new ArrayList<Subscription>();
238        // Add all consumers that are interested in the destination.
239        for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
240            Subscription sub = iter.next();
241            if (sub.matches(dest.getActiveMQDestination())) {
242                try {
243                    ConnectionContext originalContext = sub.getContext() != null ? sub.getContext() : context;
244                    dest.addSubscription(originalContext, sub);
245                    rc.add(sub);
246                } catch (SecurityException e) {
247                    if (sub.isWildcard()) {
248                        LOG.debug("Subscription denied for " + sub + " to destination " +
249                            dest.getActiveMQDestination() +  ": " + e.getMessage());
250                    } else {
251                        throw e;
252                    }
253                }
254            }
255        }
256        return rc;
257
258    }
259
260    @Override
261    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
262            throws Exception {
263
264        // No timeout.. then try to shut down right way, fails if there are
265        // current subscribers.
266        if (timeout == 0) {
267            for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
268                Subscription sub = iter.next();
269                if (sub.matches(destination) ) {
270                    throw new JMSException("Destination still has an active subscription: " + destination);
271                }
272            }
273        }
274
275        if (timeout > 0) {
276            // TODO: implement a way to notify the subscribers that we want to
277            // take the down
278            // the destination and that they should un-subscribe.. Then wait up
279            // to timeout time before
280            // dropping the subscription.
281        }
282
283        LOG.debug("{} removing destination: {}", broker.getBrokerName(), destination);
284
285        destinationsLock.writeLock().lock();
286        try {
287            Destination dest = destinations.remove(destination);
288            if (dest != null) {
289                updateRegionDestCounts(destination, -1);
290
291                // timeout<0 or we timed out, we now force any remaining
292                // subscriptions to un-subscribe.
293                for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
294                    Subscription sub = iter.next();
295                    if (sub.matches(destination)) {
296                        dest.removeSubscription(context, sub, 0l);
297                    }
298                }
299                destinationMap.remove(destination, dest);
300                dispose(context, dest);
301                DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
302                if (destinationInterceptor != null) {
303                    destinationInterceptor.remove(dest);
304                }
305
306            } else {
307                LOG.debug("Cannot remove a destination that doesn't exist: {}", destination);
308            }
309        } finally {
310            destinationsLock.writeLock().unlock();
311        }
312    }
313
314    /**
315     * Provide an exact or wildcard lookup of destinations in the region
316     *
317     * @return a set of matching destination objects.
318     */
319    @Override
320    @SuppressWarnings("unchecked")
321    public Set<Destination> getDestinations(ActiveMQDestination destination) {
322        destinationsLock.readLock().lock();
323        try{
324            return destinationMap.get(destination);
325        } finally {
326            destinationsLock.readLock().unlock();
327        }
328    }
329
330    @Override
331    public Map<ActiveMQDestination, Destination> getDestinationMap() {
332        return destinations;
333    }
334
335    @Override
336    @SuppressWarnings("unchecked")
337    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
338        LOG.debug("{} adding consumer: {} for destination: {}", new Object[]{ broker.getBrokerName(), info.getConsumerId(), info.getDestination() });
339        ActiveMQDestination destination = info.getDestination();
340        if (destination != null && !destination.isPattern() && !destination.isComposite()) {
341            // lets auto-create the destination
342            lookup(context, destination,true);
343        }
344
345        Object addGuard;
346        synchronized (consumerChangeMutexMap) {
347            addGuard = consumerChangeMutexMap.get(info.getConsumerId());
348            if (addGuard == null) {
349                addGuard = new Object();
350                consumerChangeMutexMap.put(info.getConsumerId(), addGuard);
351            }
352        }
353        synchronized (addGuard) {
354            Subscription o = subscriptions.get(info.getConsumerId());
355            if (o != null) {
356                LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
357                return o;
358            }
359
360            // We may need to add some destinations that are in persistent store
361            // but not active
362            // in the broker.
363            //
364            // TODO: think about this a little more. This is good cause
365            // destinations are not loaded into
366            // memory until a client needs to use the queue, but a management
367            // agent viewing the
368            // broker will not see a destination that exists in persistent
369            // store. We may want to
370            // eagerly load all destinations into the broker but have an
371            // inactive state for the
372            // destination which has reduced memory usage.
373            //
374            DestinationFilter.parseFilter(info.getDestination());
375
376            Subscription sub = createSubscription(context, info);
377
378            // At this point we're done directly manipulating subscriptions,
379            // but we need to retain the synchronized block here. Consider
380            // otherwise what would happen if at this point a second
381            // thread added, then removed, as would be allowed with
382            // no mutex held. Remove is only essentially run once
383            // so everything after this point would be leaked.
384
385            // Add the subscription to all the matching queues.
386            // But copy the matches first - to prevent deadlocks
387            List<Destination> addList = new ArrayList<Destination>();
388            destinationsLock.readLock().lock();
389            try {
390                for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
391                    addList.add(dest);
392                }
393                // ensure sub visible to any new dest addSubscriptionsForDestination
394                subscriptions.put(info.getConsumerId(), sub);
395            } finally {
396                destinationsLock.readLock().unlock();
397            }
398
399            List<Destination> removeList = new ArrayList<Destination>();
400            for (Destination dest : addList) {
401                try {
402                    dest.addSubscription(context, sub);
403                    removeList.add(dest);
404                } catch (SecurityException e){
405                    if (sub.isWildcard()) {
406                        LOG.debug("Subscription denied for " + sub + " to destination " +
407                            dest.getActiveMQDestination() + ": " + e.getMessage());
408                    } else {
409                        // remove partial subscriptions
410                        for (Destination remove : removeList) {
411                            try {
412                                remove.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
413                            } catch (Exception ex) {
414                                LOG.error("Error unsubscribing " + sub + " from " + remove + ": " + ex.getMessage(), ex);
415                            }
416                        }
417                        subscriptions.remove(info.getConsumerId());
418                        removeList.clear();
419                        throw e;
420                    }
421                }
422            }
423            removeList.clear();
424
425            if (info.isBrowser()) {
426                ((QueueBrowserSubscription) sub).destinationsAdded();
427            }
428
429            return sub;
430        }
431    }
432
433    /**
434     * Get all the Destinations that are in storage
435     *
436     * @return Set of all stored destinations
437     */
438    @SuppressWarnings("rawtypes")
439    public Set getDurableDestinations() {
440        return destinationFactory.getDestinations();
441    }
442
443    /**
444     * @return all Destinations that don't have active consumers
445     */
446    protected Set<ActiveMQDestination> getInactiveDestinations() {
447        Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
448        destinationsLock.readLock().lock();
449        try {
450            inactiveDests.removeAll(destinations.keySet());
451        } finally {
452            destinationsLock.readLock().unlock();
453        }
454        return inactiveDests;
455    }
456
457    @Override
458    @SuppressWarnings("unchecked")
459    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
460        LOG.debug("{} removing consumer: {} for destination: {}", new Object[]{ broker.getBrokerName(), info.getConsumerId(), info.getDestination() });
461
462        Subscription sub = subscriptions.remove(info.getConsumerId());
463        // The sub could be removed elsewhere - see ConnectionSplitBroker
464        if (sub != null) {
465
466            // remove the subscription from all the matching queues.
467            List<Destination> removeList = new ArrayList<Destination>();
468            destinationsLock.readLock().lock();
469            try {
470                for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
471                    removeList.add(dest);
472                }
473            } finally {
474                destinationsLock.readLock().unlock();
475            }
476            for (Destination dest : removeList) {
477                dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
478            }
479
480            destroySubscription(sub);
481        }
482        synchronized (consumerChangeMutexMap) {
483            consumerChangeMutexMap.remove(info.getConsumerId());
484        }
485    }
486
487    protected void destroySubscription(Subscription sub) {
488        sub.destroy();
489    }
490
491    @Override
492    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
493        throw new JMSException("Invalid operation.");
494    }
495
496    @Override
497    public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
498        final ConnectionContext context = producerExchange.getConnectionContext();
499
500        if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) {
501            final Destination regionDestination = lookup(context, messageSend.getDestination(),false);
502            producerExchange.setRegionDestination(regionDestination);
503        }
504
505        producerExchange.getRegionDestination().send(producerExchange, messageSend);
506
507        if (producerExchange.getProducerState() != null && producerExchange.getProducerState().getInfo() != null){
508            producerExchange.getProducerState().getInfo().incrementSentCount();
509        }
510    }
511
512    @Override
513    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
514        Subscription sub = consumerExchange.getSubscription();
515        if (sub == null) {
516            sub = subscriptions.get(ack.getConsumerId());
517            if (sub == null) {
518                if (!consumerExchange.getConnectionContext().isInRecoveryMode()) {
519                    LOG.warn("Ack for non existent subscription, ack: {}", ack);
520                    throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
521                } else {
522                    LOG.debug("Ack for non existent subscription in recovery, ack: {}", ack);
523                    return;
524                }
525            }
526            consumerExchange.setSubscription(sub);
527        }
528        sub.acknowledge(consumerExchange.getConnectionContext(), ack);
529    }
530
531    @Override
532    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
533        Subscription sub = subscriptions.get(pull.getConsumerId());
534        if (sub == null) {
535            throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId());
536        }
537        return sub.pullMessage(context, pull);
538    }
539
540    protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception {
541        Destination dest = null;
542
543        destinationsLock.readLock().lock();
544        try {
545            dest = destinations.get(destination);
546        } finally {
547            destinationsLock.readLock().unlock();
548        }
549
550        if (dest == null) {
551            if (isAutoCreateDestinations()) {
552                // Try to auto create the destination... re-invoke broker
553                // from the
554                // top so that the proper security checks are performed.
555                context.getBroker().addDestination(context, destination, createTemporary);
556                dest = addDestination(context, destination, false);
557                // We should now have the dest created.
558                destinationsLock.readLock().lock();
559                try {
560                    dest = destinations.get(destination);
561                } finally {
562                    destinationsLock.readLock().unlock();
563                }
564            }
565
566            if (dest == null) {
567                throw new JMSException("The destination " + destination + " does not exist.");
568            }
569        }
570        return dest;
571    }
572
573    @Override
574    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
575        Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId());
576        if (sub != null) {
577            sub.processMessageDispatchNotification(messageDispatchNotification);
578        } else {
579            throw new JMSException("Slave broker out of sync with master - Subscription: "
580                    + messageDispatchNotification.getConsumerId() + " on "
581                    + messageDispatchNotification.getDestination() + " does not exist for dispatch of message: "
582                    + messageDispatchNotification.getMessageId());
583        }
584    }
585
586    /*
587     * For a Queue/TempQueue, dispatch order is imperative to match acks, so the
588     * dispatch is deferred till the notification to ensure that the
589     * subscription chosen by the master is used. AMQ-2102
590     */
591    protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification)
592            throws Exception {
593        Destination dest = null;
594        destinationsLock.readLock().lock();
595        try {
596            dest = destinations.get(messageDispatchNotification.getDestination());
597        } finally {
598            destinationsLock.readLock().unlock();
599        }
600
601        if (dest != null) {
602            dest.processDispatchNotification(messageDispatchNotification);
603        } else {
604            throw new JMSException("Slave broker out of sync with master - Destination: "
605                    + messageDispatchNotification.getDestination() + " does not exist for consumer "
606                    + messageDispatchNotification.getConsumerId() + " with message: "
607                    + messageDispatchNotification.getMessageId());
608        }
609    }
610
611    @Override
612    public void gc() {
613        for (Subscription sub : subscriptions.values()) {
614            sub.gc();
615        }
616
617        destinationsLock.readLock().lock();
618        try {
619            for (Destination dest : destinations.values()) {
620                dest.gc();
621            }
622        } finally {
623            destinationsLock.readLock().unlock();
624        }
625    }
626
627    protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception;
628
629    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination)
630            throws Exception {
631        return destinationFactory.createDestination(context, destination, destinationStatistics);
632    }
633
634    public boolean isAutoCreateDestinations() {
635        return autoCreateDestinations;
636    }
637
638    public void setAutoCreateDestinations(boolean autoCreateDestinations) {
639        this.autoCreateDestinations = autoCreateDestinations;
640    }
641
642    @Override
643    @SuppressWarnings("unchecked")
644    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
645        destinationsLock.readLock().lock();
646        try {
647            for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
648                dest.addProducer(context, info);
649            }
650        } finally {
651            destinationsLock.readLock().unlock();
652        }
653    }
654
655    /**
656     * Removes a Producer.
657     *
658     * @param context
659     *            the environment the operation is being executed under.
660     * @throws Exception
661     *             TODO
662     */
663    @Override
664    @SuppressWarnings("unchecked")
665    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
666        destinationsLock.readLock().lock();
667        try {
668            for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
669                dest.removeProducer(context, info);
670            }
671        } finally {
672            destinationsLock.readLock().unlock();
673        }
674    }
675
676    protected void dispose(ConnectionContext context, Destination dest) throws Exception {
677        dest.dispose(context);
678        dest.stop();
679        destinationFactory.removeDestination(dest);
680    }
681
682    @Override
683    public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
684        Subscription sub = subscriptions.get(control.getConsumerId());
685        if (sub != null && sub instanceof AbstractSubscription) {
686            ((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch());
687            if (broker.getDestinationPolicy() != null) {
688                PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(control.getDestination());
689                if (entry != null) {
690                    entry.configurePrefetch(sub);
691                }
692            }
693            LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}", new Object[]{ control.getPrefetch(), control.getConsumerId(), sub.getConsumerInfo().getPrefetchSize()});
694            try {
695                lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
696            } catch (Exception e) {
697                LOG.warn("failed to deliver post consumerControl dispatch-wakeup, to destination: {}", control.getDestination(), e);
698            }
699        }
700    }
701
702    @Override
703    public void reapplyInterceptor() {
704        destinationsLock.writeLock().lock();
705        try {
706            DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
707            Map<ActiveMQDestination, Destination> map = getDestinationMap();
708            for (ActiveMQDestination key : map.keySet()) {
709                Destination destination = map.get(key);
710                if (destination instanceof CompositeDestinationFilter) {
711                    destination = ((CompositeDestinationFilter) destination).next;
712                }
713                if (destinationInterceptor != null) {
714                    destination = destinationInterceptor.intercept(destination);
715                }
716                getDestinationMap().put(key, destination);
717                Destination prev = destinations.put(key, destination);
718                if (prev == null) {
719                    updateRegionDestCounts(key, 1);
720                }
721            }
722        } finally {
723            destinationsLock.writeLock().unlock();
724        }
725    }
726}