001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.util.Iterator; 020import java.util.Set; 021 022import javax.jms.JMSException; 023 024import org.apache.activemq.broker.ConnectionContext; 025import org.apache.activemq.broker.region.policy.PolicyEntry; 026import org.apache.activemq.command.ActiveMQDestination; 027import org.apache.activemq.command.ConsumerInfo; 028import org.apache.activemq.command.MessageDispatchNotification; 029import org.apache.activemq.thread.TaskRunnerFactory; 030import org.apache.activemq.usage.SystemUsage; 031 032/** 033 * 034 * 035 */ 036public class QueueRegion extends AbstractRegion { 037 038 public QueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, 039 SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, 040 DestinationFactory destinationFactory) { 041 super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 042 } 043 044 public String toString() { 045 return "QueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() 046 + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%"; 047 } 048 049 protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) 050 throws JMSException { 051 ActiveMQDestination destination = info.getDestination(); 052 PolicyEntry entry = null; 053 if (destination != null && broker.getDestinationPolicy() != null) { 054 entry = broker.getDestinationPolicy().getEntryFor(destination); 055 056 } 057 if (info.isBrowser()) { 058 QueueBrowserSubscription sub = new QueueBrowserSubscription(broker,usageManager, context, info); 059 if (entry != null) { 060 entry.configure(broker, usageManager, sub); 061 } 062 return sub; 063 } else { 064 QueueSubscription sub = new QueueSubscription(broker, usageManager,context, info); 065 if (entry != null) { 066 entry.configure(broker, usageManager, sub); 067 } 068 return sub; 069 } 070 } 071 072 protected Set<ActiveMQDestination> getInactiveDestinations() { 073 Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations(); 074 for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) { 075 ActiveMQDestination dest = iter.next(); 076 if (!dest.isQueue()) { 077 iter.remove(); 078 } 079 } 080 return inactiveDestinations; 081 } 082 083 /* 084 * For a Queue, dispatch order is imperative to match acks, so the dispatch is deferred till 085 * the notification to ensure that the subscription chosen by the master is used. 086 * 087 * (non-Javadoc) 088 * @see org.apache.activemq.broker.region.AbstractRegion#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification) 089 */ 090 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 091 processDispatchNotificationViaDestination(messageDispatchNotification); 092 } 093}