001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.group; 018 019import java.util.HashMap; 020import java.util.Iterator; 021import java.util.Map; 022 023import org.apache.activemq.broker.region.Destination; 024import org.apache.activemq.broker.region.Subscription; 025import org.apache.activemq.command.ConsumerId; 026import org.apache.activemq.memory.LRUMap; 027 028/** 029 * A simple implementation which tracks every individual GroupID value in a LRUCache 030 * 031 * 032 */ 033public class CachedMessageGroupMap implements MessageGroupMap { 034 private final LRUMap<String, ConsumerId> cache; 035 private final int maximumCacheSize; 036 Destination destination; 037 038 CachedMessageGroupMap(int size){ 039 cache = new LRUMap<String, ConsumerId>(size) { 040 @Override 041 public boolean removeEldestEntry(final Map.Entry eldest) { 042 boolean remove = super.removeEldestEntry(eldest); 043 if (remove) { 044 if (destination != null) { 045 for (Subscription s : destination.getConsumers()) { 046 if (s.getConsumerInfo().getConsumerId().equals(eldest.getValue())) { 047 s.getConsumerInfo().decrementAssignedGroupCount(destination.getActiveMQDestination()); 048 break; 049 } 050 } 051 } 052 } 053 return remove; 054 } 055 }; 056 maximumCacheSize = size; 057 } 058 public synchronized void put(String groupId, ConsumerId consumerId) { 059 cache.put(groupId, consumerId); 060 } 061 062 public synchronized ConsumerId get(String groupId) { 063 return cache.get(groupId); 064 } 065 066 public synchronized ConsumerId removeGroup(String groupId) { 067 return cache.remove(groupId); 068 } 069 070 public synchronized MessageGroupSet removeConsumer(ConsumerId consumerId) { 071 SimpleMessageGroupSet ownedGroups = new SimpleMessageGroupSet(); 072 Map<String,ConsumerId> map = new HashMap<String, ConsumerId>(); 073 map.putAll(cache); 074 for (Iterator<String> iter = map.keySet().iterator(); iter.hasNext();) { 075 String group = iter.next(); 076 ConsumerId owner = map.get(group); 077 if (owner.equals(consumerId)) { 078 ownedGroups.add(group); 079 } 080 } 081 for (String group:ownedGroups.getUnderlyingSet()){ 082 cache.remove(group); 083 } 084 return ownedGroups; 085 } 086 087 088 @Override 089 public synchronized void removeAll(){ 090 cache.clear(); 091 if (destination != null) { 092 for (Subscription s : destination.getConsumers()) { 093 s.getConsumerInfo().clearAssignedGroupCount(destination.getActiveMQDestination()); 094 } 095 } 096 } 097 098 @Override 099 public synchronized Map<String, String> getGroups() { 100 Map<String,String> result = new HashMap<String,String>(); 101 for (Map.Entry<String,ConsumerId>entry: cache.entrySet()){ 102 result.put(entry.getKey(),entry.getValue().toString()); 103 } 104 return result; 105 } 106 107 @Override 108 public String getType() { 109 return "cached"; 110 } 111 112 public int getMaximumCacheSize(){ 113 return maximumCacheSize; 114 } 115 116 public String toString() { 117 return "message groups: " + cache.size(); 118 } 119 120 public void setDestination(Destination destination) { 121 this.destination = destination; 122 } 123}