/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadaptor;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;

/**
 * A {@link TopicMessageStore} built on Kaha containers.
 * <p>
 * Three pieces of state are kept:
 * <ul>
 * <li>the inherited <code>messageContainer</code>, which holds the message
 * bodies;</li>
 * <li>{@link #ackContainer}, which holds one {@link TopicSubAck} per stored
 * message carrying the number of subscriptions that still reference it;</li>
 * <li>{@link #subscriberMessages}, which maps a subscription key to the
 * {@link TopicSubContainer} of {@link ConsumerMessageRef}s pending for that
 * subscription.</li>
 * </ul>
 * A message is removed from the store once the count in its
 * {@link TopicSubAck} drops to zero or below.
 */
public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore {

    /**
     * Container name used for the per-subscription reference containers. Kept
     * in one place so that creation and deletion always address the same
     * container.
     */
    private static final String SUBSCRIPTION_CONTAINER_NAME = "topic-subs";

    protected ListContainer<TopicSubAck> ackContainer;
    protected Map<Object, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<Object, TopicSubContainer>();
    private Map<String, SubscriptionInfo> subscriberContainer;
    private Store store;

    /**
     * Creates the store and opens a reference container for every
     * subscription key already present in <code>subsContainer</code>.
     *
     * @param store the Kaha store used to obtain and delete the
     *                per-subscription containers
     * @param messageContainer container of message bodies, handed to the
     *                superclass
     * @param ackContainer container of per-message acknowledgement counters
     * @param subsContainer container of subscription metadata keyed by
     *                subscription key
     * @param destination the destination, handed to the superclass
     * @throws IOException if a per-subscription container cannot be opened
     */
    public KahaTopicMessageStore(Store store, MapContainer<MessageId, Message> messageContainer,
                                 ListContainer<TopicSubAck> ackContainer, MapContainer<String, SubscriptionInfo> subsContainer,
                                 ActiveMQDestination destination) throws IOException {
        super(messageContainer, destination);
        this.store = store;
        this.ackContainer = ackContainer;
        subscriberContainer = subsContainer;
        // load all the Ack containers
        for (String key : subscriberContainer.keySet()) {
            addSubscriberMessageContainer(key);
        }
    }

    /**
     * Stores the message and queues a reference to it for every known
     * subscription. When there are no subscriptions the message is not
     * stored at all.
     *
     * @param context the connection context (not used here)
     * @param message the message to store
     * @throws IOException declared by the overridden method
     */
    @Override
    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
        int subscriberCount = subscriberMessages.size();
        if (subscriberCount > 0) {
            MessageId id = message.getMessageId();
            StoreEntry messageEntry = messageContainer.place(id, message);
            // one counter per message, initialised to the number of
            // subscriptions that will receive a reference
            TopicSubAck tsa = new TopicSubAck();
            tsa.setCount(subscriberCount);
            tsa.setMessageEntry(messageEntry);
            StoreEntry ackEntry = ackContainer.placeLast(tsa);
            for (TopicSubContainer container : subscriberMessages.values()) {
                ConsumerMessageRef ref = new ConsumerMessageRef();
                ref.setAckEntry(ackEntry);
                ref.setMessageEntry(messageEntry);
                ref.setMessageId(id);
                container.add(ref);
            }
        }
    }

    /**
     * Acknowledges a message for one subscription: removes the
     * subscription's reference, decrements the message's counter and, when
     * the counter reaches zero or below, removes both the counter and the
     * message itself. Unknown subscriptions and unknown message ids are
     * ignored.
     *
     * @param context the connection context (not used here)
     * @param clientId client id of the subscription
     * @param subscriptionName name of the subscription
     * @param messageId id of the acknowledged message
     * @param ack the acknowledgement command (not used here)
     * @throws IOException declared by the interface
     */
    public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
                                         MessageId messageId, MessageAck ack) throws IOException {
        String subscriberId = getSubscriptionKey(clientId, subscriptionName);
        TopicSubContainer container = subscriberMessages.get(subscriberId);
        if (container != null) {
            ConsumerMessageRef ref = container.remove(messageId);
            if (container.isEmpty()) {
                container.reset();
            }
            if (ref != null) {
                TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
                if (tsa != null) {
                    if (tsa.decrementCount() <= 0) {
                        // last reference gone: refresh each entry before
                        // removing it, then drop counter and message
                        StoreEntry entry = ref.getAckEntry();
                        entry = ackContainer.refresh(entry);
                        ackContainer.remove(entry);
                        entry = tsa.getMessageEntry();
                        entry = messageContainer.refresh(entry);
                        messageContainer.remove(entry);
                    } else {
                        ackContainer.update(ref.getAckEntry(), tsa);
                    }
                }
            }
        }
    }

    /**
     * Looks up the stored metadata of a subscription.
     *
     * @param clientId client id of the subscription
     * @param subscriptionName name of the subscription
     * @return whatever the subscription container holds under the
     *         subscription key
     * @throws IOException declared by the interface
     */
    public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
        return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
    }

    /**
     * Registers a subscription and opens its reference container. The
     * method name (including its spelling) is dictated by the implemented
     * interface.
     *
     * @param info the subscription to add
     * @param retroactive currently ignored; the retroactive handling is
     *                commented out below
     * @throws IOException if the reference container cannot be opened
     */
    public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
        String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
        // if already exists - won't add it again as it causes data files
        // to hang around
        if (!subscriberContainer.containsKey(key)) {
            subscriberContainer.put(key, info);
        }
        // add the subscriber
        addSubscriberMessageContainer(key);
        /*
         * if(retroactive){ for(StoreEntry
         * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
         * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
         * ConsumerMessageRef ref=new ConsumerMessageRef();
         * ref.setAckEntry(entry); ref.setMessageEntry(tsa.getMessageEntry());
         * container.add(ref); } }
         */
    }

    /**
     * Removes a subscription together with its pending references.
     *
     * @param clientId client id of the subscription
     * @param subscriptionName name of the subscription
     * @throws IOException if the underlying containers cannot be updated
     */
    public synchronized void deleteSubscription(String clientId, String subscriptionName) throws IOException {
        String key = getSubscriptionKey(clientId, subscriptionName);
        removeSubscriberMessageContainer(key);
    }

    /**
     * Replays every pending message of a subscription to the listener, in
     * the iteration order of the subscription's container, stopping early
     * when <code>recoverMessage</code> returns <code>false</code>.
     * References whose message can no longer be loaded are skipped.
     *
     * @param clientId client id of the subscription
     * @param subscriptionName name of the subscription
     * @param listener receiver of the recovered messages
     * @throws Exception declared by the interface
     */
    public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
        throws Exception {
        String key = getSubscriptionKey(clientId, subscriptionName);
        TopicSubContainer container = subscriberMessages.get(key);
        if (container != null) {
            for (Iterator i = container.iterator(); i.hasNext();) {
                ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
                Message msg = messageContainer.get(ref.getMessageEntry());
                if (msg != null) {
                    if (!recoverMessage(listener, msg)) {
                        break;
                    }
                }
            }
        }
    }

    /**
     * Replays the next batch of pending messages of a subscription,
     * continuing after the entry remembered from the previous batch (or
     * from the container's first entry when none is remembered). At least
     * one entry is processed when one is available; iteration then continues
     * while fewer than <code>maxReturned</code> messages were delivered and
     * the listener reports free space.
     *
     * @param clientId client id of the subscription
     * @param subscriptionName name of the subscription
     * @param maxReturned upper bound on delivered messages, checked after
     *                each entry
     * @param listener receiver of the recovered messages
     * @throws Exception declared by the interface
     */
    public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
                                                 MessageRecoveryListener listener) throws Exception {
        String key = getSubscriptionKey(clientId, subscriptionName);
        TopicSubContainer container = subscriberMessages.get(key);
        if (container != null) {
            int count = 0;
            StoreEntry entry = container.getBatchEntry();
            if (entry == null) {
                entry = container.getEntry();
            } else {
                // resume after the last entry handed out by the previous batch
                entry = container.refreshEntry(entry);
                if (entry != null) {
                    entry = container.getNextEntry(entry);
                }
            }
            if (entry != null) {
                do {
                    ConsumerMessageRef consumerRef = container.get(entry);
                    Message msg = messageContainer.getValue(consumerRef.getMessageEntry());
                    if (msg != null) {
                        recoverMessage(listener, msg);
                        count++;
                        container.setBatchEntry(msg.getMessageId().toString(), entry);
                    } else {
                        // message body is gone: forget the batch position
                        container.reset();
                    }

                    entry = container.getNextEntry(entry);
                } while (entry != null && count < maxReturned && listener.hasSpace());
            }
        }
    }

    /**
     * Deletes the store's data: delegates to the superclass and then clears
     * the acknowledgement and subscription containers.
     */
    @Override
    public synchronized void delete() {
        super.delete();
        ackContainer.clear();
        subscriberContainer.clear();
    }

    /**
     * @return the metadata of all subscriptions currently held in the
     *         subscription container
     * @throws IOException declared by the interface
     */
    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        return subscriberContainer.values()
            .toArray(new SubscriptionInfo[subscriberContainer.size()]);
    }

    /**
     * Builds the key under which a subscription is stored:
     * <code>clientId + ":" + subscriberName</code>, with the literal
     * <code>NOT_SET</code> standing in for a <code>null</code> subscriber
     * name.
     *
     * @param clientId client id of the subscription
     * @param subscriberName name of the subscription, may be
     *                <code>null</code>
     * @return the subscription key
     */
    protected String getSubscriptionKey(String clientId, String subscriberName) {
        return clientId + ":" + (subscriberName != null ? subscriberName : "NOT_SET");
    }

    /**
     * Obtains the map container that holds a subscription's pending
     * references, configures its marshallers and registers a
     * {@link TopicSubContainer} wrapping it under the given key, replacing
     * any previously registered wrapper for that key.
     *
     * @param key the subscription key
     * @return the underlying map container
     * @throws IOException if the container cannot be obtained
     */
    protected MapContainer addSubscriberMessageContainer(Object key) throws IOException {
        MapContainer container = store.getMapContainer(key, SUBSCRIPTION_CONTAINER_NAME);
        container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
        Marshaller marshaller = new ConsumerMessageRefMarshaller();
        container.setValueMarshaller(marshaller);
        TopicSubContainer tsc = new TopicSubContainer(container);
        subscriberMessages.put(key, tsc);
        return container;
    }

    /**
     * Removes a subscription: drops its metadata, releases every message
     * reference it still holds (removing messages whose counter reaches zero
     * or below) and deletes its reference container from the store.
     *
     * @param key the subscription key
     * @throws IOException if the underlying containers cannot be updated
     */
    protected synchronized void removeSubscriberMessageContainer(Object key)
        throws IOException {
        subscriberContainer.remove(key);
        TopicSubContainer container = subscriberMessages.remove(key);
        if (container != null) {
            for (Iterator i = container.iterator(); i.hasNext();) {
                ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
                if (ref != null) {
                    TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
                    if (tsa != null) {
                        if (tsa.decrementCount() <= 0) {
                            ackContainer.remove(ref.getAckEntry());
                            messageContainer.remove(tsa.getMessageEntry());
                        } else {
                            ackContainer.update(ref.getAckEntry(), tsa);
                        }
                    }
                }
            }
            container.clear();
        }
        // The container was obtained with getMapContainer(), so it has to be
        // deleted as a map container; deleting a list container with the
        // same id would leave the map container behind in the store.
        store.deleteMapContainer(key, SUBSCRIPTION_CONTAINER_NAME);
    }

    /**
     * @param clientId client id of the subscription
     * @param subscriberName name of the subscription
     * @return the number of references pending for the subscription, or 0
     *         when the subscription is unknown
     * @throws IOException declared by the interface
     */
    public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
        String key = getSubscriptionKey(clientId, subscriberName);
        TopicSubContainer container = subscriberMessages.get(key);
        return container != null ? container.size() : 0;
    }

    /**
     * Clears the message container, the acknowledgement container and every
     * subscription's pending references. The subscriptions themselves are
     * kept.
     *
     * @param context the connection context (not used here)
     * @throws IOException declared by the interface
     * @see org.apache.activemq.store.MessageStore#removeAllMessages(org.apache.activemq.broker.ConnectionContext)
     */
    public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
        messageContainer.clear();
        ackContainer.clear();
        for (TopicSubContainer container : subscriberMessages.values()) {
            container.clear();
        }
    }

    /**
     * Resets the batch state of a subscription's container so that the next
     * call to {@link #recoverNextMessages} does not resume from a remembered
     * entry. Unknown subscriptions are ignored.
     *
     * @param clientId client id of the subscription
     * @param subscriptionName name of the subscription
     */
    public synchronized void resetBatching(String clientId, String subscriptionName) {
        String key = getSubscriptionKey(clientId, subscriptionName);
        TopicSubContainer topicSubContainer = subscriberMessages.get(key);
        if (topicSubContainer != null) {
            topicSubContainer.reset();
        }
    }
}