001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc; 018 019import java.io.IOException; 020import java.sql.SQLException; 021import java.util.Arrays; 022import java.util.Iterator; 023import java.util.LinkedHashMap; 024import java.util.Map; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.locks.ReentrantReadWriteLock; 027 028import org.apache.activemq.ActiveMQMessageAudit; 029import org.apache.activemq.broker.ConnectionContext; 030import org.apache.activemq.command.ActiveMQDestination; 031import org.apache.activemq.command.ActiveMQTopic; 032import org.apache.activemq.command.Message; 033import org.apache.activemq.command.MessageAck; 034import org.apache.activemq.command.MessageId; 035import org.apache.activemq.command.SubscriptionInfo; 036import org.apache.activemq.store.MessageRecoveryListener; 037import org.apache.activemq.store.TopicMessageStore; 038import org.apache.activemq.util.ByteSequence; 039import org.apache.activemq.util.IOExceptionSupport; 040import org.apache.activemq.wireformat.WireFormat; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * 046 */ 047public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore { 048 049 private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class); 050 private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>(); 051 052 public static final String PROPERTY_SEQUENCE_ID_CACHE_SIZE = "org.apache.activemq.store.jdbc.SEQUENCE_ID_CACHE_SIZE"; 053 private static final int SEQUENCE_ID_CACHE_SIZE = Integer.parseInt(System.getProperty( 054 PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10); 055 private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new ReentrantReadWriteLock(); 056 private Map<MessageId, long[]> sequenceIdCache = new LinkedHashMap<MessageId, long[]>() { 057 protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) { 058 return size() > SEQUENCE_ID_CACHE_SIZE; 059 } 060 }; 061 062 063 public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) throws IOException { 064 super(persistenceAdapter, adapter, wireFormat, topic, audit); 065 } 066 067 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException { 068 if (ack != null && ack.isUnmatchedAck()) { 069 if (LOG.isTraceEnabled()) { 070 LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks."); 071 } 072 return; 073 } 074 TransactionContext c = persistenceAdapter.getTransactionContext(context); 075 try { 076 long[] res = getCachedStoreSequenceId(c, destination, messageId); 077 if (this.isPrioritizedMessages()) { 078 adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName, res[0], res[1]); 079 } else { 080 adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]); 081 } 082 if (LOG.isTraceEnabled()) { 083 LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId); 084 } 085 } catch (SQLException e) { 086 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 087 throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e); 088 } finally { 089 c.close(); 090 } 091 } 092 093 private long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination destination, MessageId messageId) throws SQLException, IOException { 094 long[] val = null; 095 sequenceIdCacheSizeLock.readLock().lock(); 096 try { 097 val = sequenceIdCache.get(messageId); 098 } finally { 099 sequenceIdCacheSizeLock.readLock().unlock(); 100 } 101 if (val == null) { 102 val = adapter.getStoreSequenceId(transactionContext, destination, messageId); 103 } 104 return val; 105 } 106 107 /** 108 * @throws Exception 109 */ 110 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { 111 TransactionContext c = persistenceAdapter.getTransactionContext(); 112 try { 113 adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() { 114 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 115 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); 116 msg.getMessageId().setBrokerSequenceId(sequenceId); 117 return listener.recoverMessage(msg); 118 } 119 120 public boolean recoverMessageReference(String reference) throws Exception { 121 return listener.recoverMessageReference(new MessageId(reference)); 122 } 123 124 }); 125 } catch (SQLException e) { 126 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 127 throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e); 128 } finally { 129 c.close(); 130 } 131 } 132 133 private class LastRecovered implements Iterable<LastRecoveredEntry> { 134 LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10]; 135 LastRecovered() { 136 for (int i=0; i<perPriority.length; i++) { 137 perPriority[i] = new LastRecoveredEntry(i); 138 } 139 } 140 141 public void updateStored(long sequence, int priority) { 142 perPriority[priority].stored = sequence; 143 } 144 145 public LastRecoveredEntry defaultPriority() { 146 return perPriority[javax.jms.Message.DEFAULT_PRIORITY]; 147 } 148 149 public String toString() { 150 return Arrays.deepToString(perPriority); 151 } 152 153 public Iterator<LastRecoveredEntry> iterator() { 154 return new PriorityIterator(); 155 } 156 157 class PriorityIterator implements Iterator<LastRecoveredEntry> { 158 int current = 9; 159 public boolean hasNext() { 160 for (int i=current; i>=0; i--) { 161 if (perPriority[i].hasMessages()) { 162 current = i; 163 return true; 164 } 165 } 166 return false; 167 } 168 169 public LastRecoveredEntry next() { 170 return perPriority[current]; 171 } 172 173 public void remove() { 174 throw new RuntimeException("not implemented"); 175 } 176 } 177 } 178 179 private class LastRecoveredEntry { 180 final int priority; 181 long recovered = 0; 182 long stored = Integer.MAX_VALUE; 183 184 public LastRecoveredEntry(int priority) { 185 this.priority = priority; 186 } 187 188 public String toString() { 189 return priority + "-" + stored + ":" + recovered; 190 } 191 192 public void exhausted() { 193 stored = recovered; 194 } 195 196 public boolean hasMessages() { 197 return stored > recovered; 198 } 199 } 200 201 class LastRecoveredAwareListener implements JDBCMessageRecoveryListener { 202 final MessageRecoveryListener delegate; 203 final int maxMessages; 204 LastRecoveredEntry lastRecovered; 205 int recoveredCount; 206 int recoveredMarker; 207 208 public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) { 209 this.delegate = delegate; 210 this.maxMessages = maxMessages; 211 } 212 213 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 214 if (delegate.hasSpace() && recoveredCount < maxMessages) { 215 Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data)); 216 msg.getMessageId().setBrokerSequenceId(sequenceId); 217 lastRecovered.recovered = sequenceId; 218 if (delegate.recoverMessage(msg)) { 219 recoveredCount++; 220 return true; 221 } 222 } 223 return false; 224 } 225 226 public boolean recoverMessageReference(String reference) throws Exception { 227 return delegate.recoverMessageReference(new MessageId(reference)); 228 } 229 230 public void setLastRecovered(LastRecoveredEntry lastRecovered) { 231 this.lastRecovered = lastRecovered; 232 recoveredMarker = recoveredCount; 233 } 234 235 public boolean complete() { 236 return !delegate.hasSpace() || recoveredCount == maxMessages; 237 } 238 239 public boolean stalled() { 240 return recoveredMarker == recoveredCount; 241 } 242 } 243 244 public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) 245 throws Exception { 246 //Duration duration = new Duration("recoverNextMessages"); 247 TransactionContext c = persistenceAdapter.getTransactionContext(); 248 249 String key = getSubscriptionKey(clientId, subscriptionName); 250 if (!subscriberLastRecoveredMap.containsKey(key)) { 251 subscriberLastRecoveredMap.put(key, new LastRecovered()); 252 } 253 final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key); 254 LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned); 255 try { 256 if (LOG.isTraceEnabled()) { 257 LOG.trace(key + " existing last recovered: " + lastRecovered); 258 } 259 if (isPrioritizedMessages()) { 260 Iterator<LastRecoveredEntry> it = lastRecovered.iterator(); 261 for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) { 262 LastRecoveredEntry entry = it.next(); 263 recoveredAwareListener.setLastRecovered(entry); 264 //Duration microDuration = new Duration("recoverNextMessages:loop"); 265 adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName, 266 entry.recovered, entry.priority, maxReturned, recoveredAwareListener); 267 //microDuration.end(new String(entry + " recoveredCount:" + recoveredAwareListener.recoveredCount)); 268 if (recoveredAwareListener.stalled()) { 269 if (recoveredAwareListener.complete()) { 270 break; 271 } else { 272 entry.exhausted(); 273 } 274 } 275 } 276 } else { 277 LastRecoveredEntry last = lastRecovered.defaultPriority(); 278 recoveredAwareListener.setLastRecovered(last); 279 adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName, 280 last.recovered, 0, maxReturned, recoveredAwareListener); 281 } 282 if (LOG.isTraceEnabled()) { 283 LOG.trace(key + " last recovered: " + lastRecovered); 284 } 285 //duration.end(); 286 } catch (SQLException e) { 287 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 288 } finally { 289 c.close(); 290 } 291 } 292 293 public void resetBatching(String clientId, String subscriptionName) { 294 subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName)); 295 } 296 297 protected void onAdd(MessageId messageId, long sequenceId, byte priority) { 298 // update last recovered state 299 for (LastRecovered last : subscriberLastRecoveredMap.values()) { 300 last.updateStored(sequenceId, priority); 301 } 302 sequenceIdCacheSizeLock.writeLock().lock(); 303 try { 304 sequenceIdCache.put(messageId, new long[]{sequenceId, priority}); 305 } finally { 306 sequenceIdCacheSizeLock.writeLock().unlock(); 307 } 308 } 309 310 311 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 312 TransactionContext c = persistenceAdapter.getTransactionContext(); 313 try { 314 c = persistenceAdapter.getTransactionContext(); 315 adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive, isPrioritizedMessages()); 316 } catch (SQLException e) { 317 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 318 throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e); 319 } finally { 320 c.close(); 321 } 322 } 323 324 /** 325 * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String, 326 * String) 327 */ 328 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 329 TransactionContext c = persistenceAdapter.getTransactionContext(); 330 try { 331 return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName); 332 } catch (SQLException e) { 333 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 334 throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e); 335 } finally { 336 c.close(); 337 } 338 } 339 340 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 341 TransactionContext c = persistenceAdapter.getTransactionContext(); 342 try { 343 adapter.doDeleteSubscription(c, destination, clientId, subscriptionName); 344 } catch (SQLException e) { 345 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 346 throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e); 347 } finally { 348 c.close(); 349 resetBatching(clientId, subscriptionName); 350 } 351 } 352 353 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 354 TransactionContext c = persistenceAdapter.getTransactionContext(); 355 try { 356 return adapter.doGetAllSubscriptions(c, destination); 357 } catch (SQLException e) { 358 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 359 throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e); 360 } finally { 361 c.close(); 362 } 363 } 364 365 public int getMessageCount(String clientId, String subscriberName) throws IOException { 366 //Duration duration = new Duration("getMessageCount"); 367 int result = 0; 368 TransactionContext c = persistenceAdapter.getTransactionContext(); 369 try { 370 result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages()); 371 } catch (SQLException e) { 372 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 373 throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e); 374 } finally { 375 c.close(); 376 } 377 if (LOG.isTraceEnabled()) { 378 LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result); 379 } 380 //duration.end(); 381 return result; 382 } 383 384 protected String getSubscriptionKey(String clientId, String subscriberName) { 385 String result = clientId + ":"; 386 result += subscriberName != null ? subscriberName : "NOT_SET"; 387 return result; 388 } 389 390}