001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import org.apache.activemq.broker.region.Destination; 020import org.apache.activemq.broker.region.Region; 021import org.apache.activemq.command.Message; 022import org.apache.activemq.command.MessageId; 023import org.apache.activemq.state.ProducerState; 024import org.slf4j.Logger; 025import org.slf4j.LoggerFactory; 026 027import java.io.IOException; 028import java.util.concurrent.atomic.AtomicLong; 029 030/** 031 * Holds internal state in the broker for a MessageProducer 032 * 033 * 034 */ 035public class ProducerBrokerExchange { 036 037 private static final Logger LOG = LoggerFactory.getLogger(ProducerBrokerExchange.class); 038 private ConnectionContext connectionContext; 039 private Destination regionDestination; 040 private Region region; 041 private ProducerState producerState; 042 private boolean mutable = true; 043 private AtomicLong lastSendSequenceNumber = new AtomicLong(-1); 044 private boolean auditProducerSequenceIds; 045 private boolean isNetworkProducer; 046 private BrokerService brokerService; 047 048 public ProducerBrokerExchange() { 049 } 050 051 public ProducerBrokerExchange copy() { 052 ProducerBrokerExchange rc = new ProducerBrokerExchange(); 053 rc.connectionContext = connectionContext.copy(); 054 rc.regionDestination = regionDestination; 055 rc.region = region; 056 rc.producerState = producerState; 057 rc.mutable = mutable; 058 return rc; 059 } 060 061 062 /** 063 * @return the connectionContext 064 */ 065 public ConnectionContext getConnectionContext() { 066 return this.connectionContext; 067 } 068 069 /** 070 * @param connectionContext the connectionContext to set 071 */ 072 public void setConnectionContext(ConnectionContext connectionContext) { 073 this.connectionContext = connectionContext; 074 } 075 076 /** 077 * @return the mutable 078 */ 079 public boolean isMutable() { 080 return this.mutable; 081 } 082 083 /** 084 * @param mutable the mutable to set 085 */ 086 public void setMutable(boolean mutable) { 087 this.mutable = mutable; 088 } 089 090 /** 091 * @return the regionDestination 092 */ 093 public Destination getRegionDestination() { 094 return this.regionDestination; 095 } 096 097 /** 098 * @param regionDestination the regionDestination to set 099 */ 100 public void setRegionDestination(Destination regionDestination) { 101 this.regionDestination = regionDestination; 102 } 103 104 /** 105 * @return the region 106 */ 107 public Region getRegion() { 108 return this.region; 109 } 110 111 /** 112 * @param region the region to set 113 */ 114 public void setRegion(Region region) { 115 this.region = region; 116 } 117 118 /** 119 * @return the producerState 120 */ 121 public ProducerState getProducerState() { 122 return this.producerState; 123 } 124 125 /** 126 * @param producerState the producerState to set 127 */ 128 public void setProducerState(ProducerState producerState) { 129 this.producerState = producerState; 130 } 131 132 /** 133 * Enforce duplicate suppression using info from persistence adapter 134 * @param messageSend 135 * @return false if message should be ignored as a duplicate 136 */ 137 public boolean canDispatch(Message messageSend) { 138 boolean canDispatch = true; 139 if (auditProducerSequenceIds && messageSend.isPersistent()) { 140 final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId(); 141 if (isNetworkProducer) { 142 // messages are multiplexed on this producer so we need to query the persistenceAdapter 143 long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId()); 144 if (producerSequenceId <= lastStoredForMessageProducer) { 145 canDispatch = false; 146 if (LOG.isDebugEnabled()) { 147 LOG.debug("suppressing duplicate message send [" + (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()) + "] from network producer with producerSequenceId [" 148 + producerSequenceId + "] less than last stored: " + lastStoredForMessageProducer); 149 } 150 } 151 } else if (producerSequenceId <= lastSendSequenceNumber.get()) { 152 canDispatch = false; 153 if (LOG.isDebugEnabled()) { 154 LOG.debug("suppressing duplicate message send [" + (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()) + "] with producerSequenceId [" 155 + producerSequenceId + "] less than last stored: " + lastSendSequenceNumber); 156 } 157 } else { 158 // track current so we can suppress duplicates later in the stream 159 lastSendSequenceNumber.set(producerSequenceId); 160 } 161 } 162 return canDispatch; 163 } 164 165 private long getStoredSequenceIdForMessage(MessageId messageId) { 166 try { 167 return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId()); 168 } catch (IOException ignored) { 169 LOG.debug("Failed to determine last producer sequence id for: " +messageId, ignored); 170 } 171 return -1; 172 } 173 174 public void setLastStoredSequenceId(long l) { 175 auditProducerSequenceIds = true; 176 if (connectionContext.isNetworkConnection()) { 177 brokerService = connectionContext.getBroker().getBrokerService(); 178 isNetworkProducer = true; 179 } 180 lastSendSequenceNumber.set(l); 181 LOG.debug("last stored sequence id set: " + l); 182 } 183}