001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.util.ArrayList; 020import java.util.HashMap; 021import java.util.Iterator; 022import java.util.List; 023import java.util.Map; 024import java.util.Set; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.locks.ReentrantReadWriteLock; 027 028import javax.jms.IllegalStateException; 029import javax.jms.JMSException; 030 031import org.apache.activemq.DestinationDoesNotExistException; 032import org.apache.activemq.advisory.AdvisorySupport; 033import org.apache.activemq.broker.ConnectionContext; 034import org.apache.activemq.broker.ConsumerBrokerExchange; 035import org.apache.activemq.broker.ProducerBrokerExchange; 036import org.apache.activemq.broker.region.policy.PolicyEntry; 037import org.apache.activemq.broker.region.virtual.CompositeDestinationFilter; 038import org.apache.activemq.command.ActiveMQDestination; 039import org.apache.activemq.command.ConsumerControl; 040import org.apache.activemq.command.ConsumerId; 041import org.apache.activemq.command.ConsumerInfo; 042import org.apache.activemq.command.Message; 043import org.apache.activemq.command.MessageAck; 044import org.apache.activemq.command.MessageDispatchNotification; 045import org.apache.activemq.command.MessagePull; 046import org.apache.activemq.command.ProducerInfo; 047import org.apache.activemq.command.RemoveSubscriptionInfo; 048import org.apache.activemq.command.Response; 049import org.apache.activemq.filter.DestinationFilter; 050import org.apache.activemq.filter.DestinationMap; 051import org.apache.activemq.security.SecurityContext; 052import org.apache.activemq.thread.TaskRunnerFactory; 053import org.apache.activemq.usage.SystemUsage; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057/** 058 * 059 */ 060public abstract class AbstractRegion implements Region { 061 062 private static final Logger LOG = LoggerFactory.getLogger(AbstractRegion.class); 063 064 protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); 065 protected final DestinationMap destinationMap = new DestinationMap(); 066 protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>(); 067 protected final SystemUsage usageManager; 068 protected final DestinationFactory destinationFactory; 069 protected final DestinationStatistics destinationStatistics; 070 protected final RegionStatistics regionStatistics = new RegionStatistics(); 071 protected final RegionBroker broker; 072 protected boolean autoCreateDestinations = true; 073 protected final TaskRunnerFactory taskRunnerFactory; 074 protected final ReentrantReadWriteLock destinationsLock = new ReentrantReadWriteLock(); 075 protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>(); 076 protected boolean started; 077 078 public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, 079 TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 080 if (broker == null) { 081 throw new IllegalArgumentException("null broker"); 082 } 083 this.broker = broker; 084 this.destinationStatistics = destinationStatistics; 085 this.usageManager = memoryManager; 086 this.taskRunnerFactory = taskRunnerFactory; 087 if (destinationFactory == null) { 088 throw new IllegalArgumentException("null destinationFactory"); 089 } 090 this.destinationFactory = destinationFactory; 091 } 092 093 @Override 094 public final void start() throws Exception { 095 started = true; 096 097 Set<ActiveMQDestination> inactiveDests = getInactiveDestinations(); 098 for (Iterator<ActiveMQDestination> iter = inactiveDests.iterator(); iter.hasNext();) { 099 ActiveMQDestination dest = iter.next(); 100 101 ConnectionContext context = new ConnectionContext(); 102 context.setBroker(broker.getBrokerService().getBroker()); 103 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 104 context.getBroker().addDestination(context, dest, false); 105 } 106 destinationsLock.readLock().lock(); 107 try{ 108 for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) { 109 Destination dest = i.next(); 110 dest.start(); 111 } 112 } finally { 113 destinationsLock.readLock().unlock(); 114 } 115 } 116 117 @Override 118 public void stop() throws Exception { 119 started = false; 120 destinationsLock.readLock().lock(); 121 try{ 122 for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) { 123 Destination dest = i.next(); 124 dest.stop(); 125 } 126 } finally { 127 destinationsLock.readLock().unlock(); 128 } 129 130 destinationsLock.writeLock().lock(); 131 try { 132 destinations.clear(); 133 regionStatistics.getAdvisoryDestinations().reset(); 134 regionStatistics.getDestinations().reset(); 135 regionStatistics.getAllDestinations().reset(); 136 } finally { 137 destinationsLock.writeLock().unlock(); 138 } 139 } 140 141 @Override 142 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, 143 boolean createIfTemporary) throws Exception { 144 145 destinationsLock.writeLock().lock(); 146 try { 147 Destination dest = destinations.get(destination); 148 if (dest == null) { 149 if (destination.isTemporary() == false || createIfTemporary) { 150 // Limit the number of destinations that can be created if 151 // maxDestinations has been set on a policy 152 validateMaxDestinations(destination); 153 154 LOG.debug("{} adding destination: {}", broker.getBrokerName(), destination); 155 dest = createDestination(context, destination); 156 // intercept if there is a valid interceptor defined 157 DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); 158 if (destinationInterceptor != null) { 159 dest = destinationInterceptor.intercept(dest); 160 } 161 dest.start(); 162 addSubscriptionsForDestination(context, dest); 163 destinations.put(destination, dest); 164 updateRegionDestCounts(destination, 1); 165 destinationMap.put(destination, dest); 166 } 167 if (dest == null) { 168 throw new DestinationDoesNotExistException(destination.getQualifiedName()); 169 } 170 } 171 return dest; 172 } finally { 173 destinationsLock.writeLock().unlock(); 174 } 175 } 176 177 public Map<ConsumerId, Subscription> getSubscriptions() { 178 return subscriptions; 179 } 180 181 182 /** 183 * Updates the counts in RegionStatistics based on whether or not the destination 184 * is an Advisory Destination or not 185 * 186 * @param destination the destination being used to determine which counters to update 187 * @param count the count to add to the counters 188 */ 189 protected void updateRegionDestCounts(ActiveMQDestination destination, int count) { 190 if (destination != null) { 191 if (AdvisorySupport.isAdvisoryTopic(destination)) { 192 regionStatistics.getAdvisoryDestinations().add(count); 193 } else { 194 regionStatistics.getDestinations().add(count); 195 } 196 regionStatistics.getAllDestinations().add(count); 197 } 198 } 199 200 /** 201 * This method checks whether or not the destination can be created based on 202 * {@link PolicyEntry#getMaxDestinations}, if it has been set. Advisory 203 * topics are ignored. 204 * 205 * @param destination 206 * @throws Exception 207 */ 208 protected void validateMaxDestinations(ActiveMQDestination destination) 209 throws Exception { 210 if (broker.getDestinationPolicy() != null) { 211 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 212 // Make sure the destination is not an advisory topic 213 if (entry != null && entry.getMaxDestinations() >= 0 214 && !AdvisorySupport.isAdvisoryTopic(destination)) { 215 // If there is an entry for this destination, look up the set of 216 // destinations associated with this policy 217 // If a destination isn't specified, then just count up 218 // non-advisory destinations (ie count all destinations) 219 int destinationSize = (int) (entry.getDestination() != null ? 220 destinationMap.get(entry.getDestination()).size() : regionStatistics.getDestinations().getCount()); 221 if (destinationSize >= entry.getMaxDestinations()) { 222 if (entry.getDestination() != null) { 223 throw new IllegalStateException( 224 "The maxmimum number of destinations allowed ("+ entry.getMaxDestinations() + 225 ") for the policy " + entry.getDestination() + " has already been reached."); 226 // No destination has been set (default policy) 227 } else { 228 throw new IllegalStateException("The maxmimum number of destinations allowed (" 229 + entry.getMaxDestinations() + ") has already been reached."); 230 } 231 } 232 } 233 } 234 } 235 236 protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception { 237 List<Subscription> rc = new ArrayList<Subscription>(); 238 // Add all consumers that are interested in the destination. 239 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { 240 Subscription sub = iter.next(); 241 if (sub.matches(dest.getActiveMQDestination())) { 242 try { 243 ConnectionContext originalContext = sub.getContext() != null ? sub.getContext() : context; 244 dest.addSubscription(originalContext, sub); 245 rc.add(sub); 246 } catch (SecurityException e) { 247 if (sub.isWildcard()) { 248 LOG.debug("Subscription denied for " + sub + " to destination " + 249 dest.getActiveMQDestination() + ": " + e.getMessage()); 250 } else { 251 throw e; 252 } 253 } 254 } 255 } 256 return rc; 257 258 } 259 260 @Override 261 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) 262 throws Exception { 263 264 // No timeout.. then try to shut down right way, fails if there are 265 // current subscribers. 266 if (timeout == 0) { 267 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { 268 Subscription sub = iter.next(); 269 if (sub.matches(destination) ) { 270 throw new JMSException("Destination still has an active subscription: " + destination); 271 } 272 } 273 } 274 275 if (timeout > 0) { 276 // TODO: implement a way to notify the subscribers that we want to 277 // take the down 278 // the destination and that they should un-subscribe.. Then wait up 279 // to timeout time before 280 // dropping the subscription. 281 } 282 283 LOG.debug("{} removing destination: {}", broker.getBrokerName(), destination); 284 285 destinationsLock.writeLock().lock(); 286 try { 287 Destination dest = destinations.remove(destination); 288 if (dest != null) { 289 updateRegionDestCounts(destination, -1); 290 291 // timeout<0 or we timed out, we now force any remaining 292 // subscriptions to un-subscribe. 293 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { 294 Subscription sub = iter.next(); 295 if (sub.matches(destination)) { 296 dest.removeSubscription(context, sub, 0l); 297 } 298 } 299 destinationMap.remove(destination, dest); 300 dispose(context, dest); 301 DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); 302 if (destinationInterceptor != null) { 303 destinationInterceptor.remove(dest); 304 } 305 306 } else { 307 LOG.debug("Cannot remove a destination that doesn't exist: {}", destination); 308 } 309 } finally { 310 destinationsLock.writeLock().unlock(); 311 } 312 } 313 314 /** 315 * Provide an exact or wildcard lookup of destinations in the region 316 * 317 * @return a set of matching destination objects. 318 */ 319 @Override 320 @SuppressWarnings("unchecked") 321 public Set<Destination> getDestinations(ActiveMQDestination destination) { 322 destinationsLock.readLock().lock(); 323 try{ 324 return destinationMap.get(destination); 325 } finally { 326 destinationsLock.readLock().unlock(); 327 } 328 } 329 330 @Override 331 public Map<ActiveMQDestination, Destination> getDestinationMap() { 332 return destinations; 333 } 334 335 @Override 336 @SuppressWarnings("unchecked") 337 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 338 LOG.debug("{} adding consumer: {} for destination: {}", new Object[]{ broker.getBrokerName(), info.getConsumerId(), info.getDestination() }); 339 ActiveMQDestination destination = info.getDestination(); 340 if (destination != null && !destination.isPattern() && !destination.isComposite()) { 341 // lets auto-create the destination 342 lookup(context, destination,true); 343 } 344 345 Object addGuard; 346 synchronized (consumerChangeMutexMap) { 347 addGuard = consumerChangeMutexMap.get(info.getConsumerId()); 348 if (addGuard == null) { 349 addGuard = new Object(); 350 consumerChangeMutexMap.put(info.getConsumerId(), addGuard); 351 } 352 } 353 synchronized (addGuard) { 354 Subscription o = subscriptions.get(info.getConsumerId()); 355 if (o != null) { 356 LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this."); 357 return o; 358 } 359 360 // We may need to add some destinations that are in persistent store 361 // but not active 362 // in the broker. 363 // 364 // TODO: think about this a little more. This is good cause 365 // destinations are not loaded into 366 // memory until a client needs to use the queue, but a management 367 // agent viewing the 368 // broker will not see a destination that exists in persistent 369 // store. We may want to 370 // eagerly load all destinations into the broker but have an 371 // inactive state for the 372 // destination which has reduced memory usage. 373 // 374 DestinationFilter.parseFilter(info.getDestination()); 375 376 Subscription sub = createSubscription(context, info); 377 378 // At this point we're done directly manipulating subscriptions, 379 // but we need to retain the synchronized block here. Consider 380 // otherwise what would happen if at this point a second 381 // thread added, then removed, as would be allowed with 382 // no mutex held. Remove is only essentially run once 383 // so everything after this point would be leaked. 384 385 // Add the subscription to all the matching queues. 386 // But copy the matches first - to prevent deadlocks 387 List<Destination> addList = new ArrayList<Destination>(); 388 destinationsLock.readLock().lock(); 389 try { 390 for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) { 391 addList.add(dest); 392 } 393 // ensure sub visible to any new dest addSubscriptionsForDestination 394 subscriptions.put(info.getConsumerId(), sub); 395 } finally { 396 destinationsLock.readLock().unlock(); 397 } 398 399 List<Destination> removeList = new ArrayList<Destination>(); 400 for (Destination dest : addList) { 401 try { 402 dest.addSubscription(context, sub); 403 removeList.add(dest); 404 } catch (SecurityException e){ 405 if (sub.isWildcard()) { 406 LOG.debug("Subscription denied for " + sub + " to destination " + 407 dest.getActiveMQDestination() + ": " + e.getMessage()); 408 } else { 409 // remove partial subscriptions 410 for (Destination remove : removeList) { 411 try { 412 remove.removeSubscription(context, sub, info.getLastDeliveredSequenceId()); 413 } catch (Exception ex) { 414 LOG.error("Error unsubscribing " + sub + " from " + remove + ": " + ex.getMessage(), ex); 415 } 416 } 417 subscriptions.remove(info.getConsumerId()); 418 removeList.clear(); 419 throw e; 420 } 421 } 422 } 423 removeList.clear(); 424 425 if (info.isBrowser()) { 426 ((QueueBrowserSubscription) sub).destinationsAdded(); 427 } 428 429 return sub; 430 } 431 } 432 433 /** 434 * Get all the Destinations that are in storage 435 * 436 * @return Set of all stored destinations 437 */ 438 @SuppressWarnings("rawtypes") 439 public Set getDurableDestinations() { 440 return destinationFactory.getDestinations(); 441 } 442 443 /** 444 * @return all Destinations that don't have active consumers 445 */ 446 protected Set<ActiveMQDestination> getInactiveDestinations() { 447 Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations(); 448 destinationsLock.readLock().lock(); 449 try { 450 inactiveDests.removeAll(destinations.keySet()); 451 } finally { 452 destinationsLock.readLock().unlock(); 453 } 454 return inactiveDests; 455 } 456 457 @Override 458 @SuppressWarnings("unchecked") 459 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 460 LOG.debug("{} removing consumer: {} for destination: {}", new Object[]{ broker.getBrokerName(), info.getConsumerId(), info.getDestination() }); 461 462 Subscription sub = subscriptions.remove(info.getConsumerId()); 463 // The sub could be removed elsewhere - see ConnectionSplitBroker 464 if (sub != null) { 465 466 // remove the subscription from all the matching queues. 467 List<Destination> removeList = new ArrayList<Destination>(); 468 destinationsLock.readLock().lock(); 469 try { 470 for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) { 471 removeList.add(dest); 472 } 473 } finally { 474 destinationsLock.readLock().unlock(); 475 } 476 for (Destination dest : removeList) { 477 dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId()); 478 } 479 480 destroySubscription(sub); 481 } 482 synchronized (consumerChangeMutexMap) { 483 consumerChangeMutexMap.remove(info.getConsumerId()); 484 } 485 } 486 487 protected void destroySubscription(Subscription sub) { 488 sub.destroy(); 489 } 490 491 @Override 492 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 493 throw new JMSException("Invalid operation."); 494 } 495 496 @Override 497 public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { 498 final ConnectionContext context = producerExchange.getConnectionContext(); 499 500 if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) { 501 final Destination regionDestination = lookup(context, messageSend.getDestination(),false); 502 producerExchange.setRegionDestination(regionDestination); 503 } 504 505 producerExchange.getRegionDestination().send(producerExchange, messageSend); 506 507 if (producerExchange.getProducerState() != null && producerExchange.getProducerState().getInfo() != null){ 508 producerExchange.getProducerState().getInfo().incrementSentCount(); 509 } 510 } 511 512 @Override 513 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 514 Subscription sub = consumerExchange.getSubscription(); 515 if (sub == null) { 516 sub = subscriptions.get(ack.getConsumerId()); 517 if (sub == null) { 518 if (!consumerExchange.getConnectionContext().isInRecoveryMode()) { 519 LOG.warn("Ack for non existent subscription, ack: {}", ack); 520 throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId()); 521 } else { 522 LOG.debug("Ack for non existent subscription in recovery, ack: {}", ack); 523 return; 524 } 525 } 526 consumerExchange.setSubscription(sub); 527 } 528 sub.acknowledge(consumerExchange.getConnectionContext(), ack); 529 } 530 531 @Override 532 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 533 Subscription sub = subscriptions.get(pull.getConsumerId()); 534 if (sub == null) { 535 throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId()); 536 } 537 return sub.pullMessage(context, pull); 538 } 539 540 protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception { 541 Destination dest = null; 542 543 destinationsLock.readLock().lock(); 544 try { 545 dest = destinations.get(destination); 546 } finally { 547 destinationsLock.readLock().unlock(); 548 } 549 550 if (dest == null) { 551 if (isAutoCreateDestinations()) { 552 // Try to auto create the destination... re-invoke broker 553 // from the 554 // top so that the proper security checks are performed. 555 context.getBroker().addDestination(context, destination, createTemporary); 556 dest = addDestination(context, destination, false); 557 // We should now have the dest created. 558 destinationsLock.readLock().lock(); 559 try { 560 dest = destinations.get(destination); 561 } finally { 562 destinationsLock.readLock().unlock(); 563 } 564 } 565 566 if (dest == null) { 567 throw new JMSException("The destination " + destination + " does not exist."); 568 } 569 } 570 return dest; 571 } 572 573 @Override 574 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 575 Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId()); 576 if (sub != null) { 577 sub.processMessageDispatchNotification(messageDispatchNotification); 578 } else { 579 throw new JMSException("Slave broker out of sync with master - Subscription: " 580 + messageDispatchNotification.getConsumerId() + " on " 581 + messageDispatchNotification.getDestination() + " does not exist for dispatch of message: " 582 + messageDispatchNotification.getMessageId()); 583 } 584 } 585 586 /* 587 * For a Queue/TempQueue, dispatch order is imperative to match acks, so the 588 * dispatch is deferred till the notification to ensure that the 589 * subscription chosen by the master is used. AMQ-2102 590 */ 591 protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification) 592 throws Exception { 593 Destination dest = null; 594 destinationsLock.readLock().lock(); 595 try { 596 dest = destinations.get(messageDispatchNotification.getDestination()); 597 } finally { 598 destinationsLock.readLock().unlock(); 599 } 600 601 if (dest != null) { 602 dest.processDispatchNotification(messageDispatchNotification); 603 } else { 604 throw new JMSException("Slave broker out of sync with master - Destination: " 605 + messageDispatchNotification.getDestination() + " does not exist for consumer " 606 + messageDispatchNotification.getConsumerId() + " with message: " 607 + messageDispatchNotification.getMessageId()); 608 } 609 } 610 611 @Override 612 public void gc() { 613 for (Subscription sub : subscriptions.values()) { 614 sub.gc(); 615 } 616 617 destinationsLock.readLock().lock(); 618 try { 619 for (Destination dest : destinations.values()) { 620 dest.gc(); 621 } 622 } finally { 623 destinationsLock.readLock().unlock(); 624 } 625 } 626 627 protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception; 628 629 protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) 630 throws Exception { 631 return destinationFactory.createDestination(context, destination, destinationStatistics); 632 } 633 634 public boolean isAutoCreateDestinations() { 635 return autoCreateDestinations; 636 } 637 638 public void setAutoCreateDestinations(boolean autoCreateDestinations) { 639 this.autoCreateDestinations = autoCreateDestinations; 640 } 641 642 @Override 643 @SuppressWarnings("unchecked") 644 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 645 destinationsLock.readLock().lock(); 646 try { 647 for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) { 648 dest.addProducer(context, info); 649 } 650 } finally { 651 destinationsLock.readLock().unlock(); 652 } 653 } 654 655 /** 656 * Removes a Producer. 657 * 658 * @param context 659 * the environment the operation is being executed under. 660 * @throws Exception 661 * TODO 662 */ 663 @Override 664 @SuppressWarnings("unchecked") 665 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 666 destinationsLock.readLock().lock(); 667 try { 668 for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) { 669 dest.removeProducer(context, info); 670 } 671 } finally { 672 destinationsLock.readLock().unlock(); 673 } 674 } 675 676 protected void dispose(ConnectionContext context, Destination dest) throws Exception { 677 dest.dispose(context); 678 dest.stop(); 679 destinationFactory.removeDestination(dest); 680 } 681 682 @Override 683 public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { 684 Subscription sub = subscriptions.get(control.getConsumerId()); 685 if (sub != null && sub instanceof AbstractSubscription) { 686 ((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch()); 687 if (broker.getDestinationPolicy() != null) { 688 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(control.getDestination()); 689 if (entry != null) { 690 entry.configurePrefetch(sub); 691 } 692 } 693 LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}", new Object[]{ control.getPrefetch(), control.getConsumerId(), sub.getConsumerInfo().getPrefetchSize()}); 694 try { 695 lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup(); 696 } catch (Exception e) { 697 LOG.warn("failed to deliver post consumerControl dispatch-wakeup, to destination: {}", control.getDestination(), e); 698 } 699 } 700 } 701 702 @Override 703 public void reapplyInterceptor() { 704 destinationsLock.writeLock().lock(); 705 try { 706 DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); 707 Map<ActiveMQDestination, Destination> map = getDestinationMap(); 708 for (ActiveMQDestination key : map.keySet()) { 709 Destination destination = map.get(key); 710 if (destination instanceof CompositeDestinationFilter) { 711 destination = ((CompositeDestinationFilter) destination).next; 712 } 713 if (destinationInterceptor != null) { 714 destination = destinationInterceptor.intercept(destination); 715 } 716 getDestinationMap().put(key, destination); 717 Destination prev = destinations.put(key, destination); 718 if (prev == null) { 719 updateRegionDestCounts(key, 1); 720 } 721 } 722 } finally { 723 destinationsLock.writeLock().unlock(); 724 } 725 } 726}