001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.Serializable;
020
021import javax.jms.JMSException;
022import javax.jms.Message;
023
024import org.apache.activemq.broker.region.MessageReference;
025import org.apache.activemq.command.MessageId;
026import org.apache.activemq.command.ProducerId;
027import org.apache.activemq.util.BitArrayBin;
028import org.apache.activemq.util.IdGenerator;
029import org.apache.activemq.util.LRUCache;
030
031/**
032 * Provides basic audit functions for Messages without sync
033 * 
034 * 
035 */
036public class ActiveMQMessageAuditNoSync implements Serializable {
037
038    private static final long serialVersionUID = 1L;
039
040    public static final int DEFAULT_WINDOW_SIZE = 2048;
041    public static final int MAXIMUM_PRODUCER_COUNT = 64;
042    private int auditDepth;
043    private int maximumNumberOfProducersToTrack;
044    private LRUCache<Object, BitArrayBin> map;
045
046    /**
047     * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
048     * 64
049     */
050    public ActiveMQMessageAuditNoSync() {
051        this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
052    }
053
054    /**
055     * Construct a MessageAudit
056     * 
057     * @param auditDepth range of ids to track
058     * @param maximumNumberOfProducersToTrack number of producers expected in
059     *                the system
060     */
061    public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) {
062        this.auditDepth = auditDepth;
063        this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
064        this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
065    }
066    
067    /**
068     * @return the auditDepth
069     */
070    public int getAuditDepth() {
071        return auditDepth;
072    }
073
074    /**
075     * @param auditDepth the auditDepth to set
076     */
077    public void setAuditDepth(int auditDepth) {
078        this.auditDepth = auditDepth;
079    }
080
081    /**
082     * @return the maximumNumberOfProducersToTrack
083     */
084    public int getMaximumNumberOfProducersToTrack() {
085        return maximumNumberOfProducersToTrack;
086    }
087
088    /**
089     * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
090     */
091    public void setMaximumNumberOfProducersToTrack(
092            int maximumNumberOfProducersToTrack) {
093        this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
094        this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
095    }
096
097    /**
098     * Checks if this message has been seen before
099     * 
100     * @param message
101     * @return true if the message is a duplicate
102     * @throws JMSException
103     */
104    public boolean isDuplicate(Message message) throws JMSException {
105        return isDuplicate(message.getJMSMessageID());
106    }
107
108    /**
109     * checks whether this messageId has been seen before and adds this
110     * messageId to the list
111     * 
112     * @param id
113     * @return true if the message is a duplicate
114     */
115    public boolean isDuplicate(String id) {
116        boolean answer = false;
117        String seed = IdGenerator.getSeedFromId(id);
118        if (seed != null) {
119            BitArrayBin bab = map.get(seed);
120            if (bab == null) {
121                bab = new BitArrayBin(auditDepth);
122                map.put(seed, bab);
123            }
124            long index = IdGenerator.getSequenceFromId(id);
125            if (index >= 0) {
126                answer = bab.setBit(index, true);
127            }
128        }
129        return answer;
130    }
131
132    /**
133     * Checks if this message has been seen before
134     * 
135     * @param message
136     * @return true if the message is a duplicate
137     */
138    public boolean isDuplicate(final MessageReference message) {
139        MessageId id = message.getMessageId();
140        return isDuplicate(id);
141    }
142    
143    /**
144     * Checks if this messageId has been seen before
145     * 
146     * @param id
147     * @return true if the message is a duplicate
148     */
149    public boolean isDuplicate(final MessageId id) {
150        boolean answer = false;
151        
152        if (id != null) {
153            ProducerId pid = id.getProducerId();
154            if (pid != null) {
155                BitArrayBin bab = map.get(pid);
156                if (bab == null) {
157                    bab = new BitArrayBin(auditDepth);
158                    map.put(pid, bab);
159                }
160                answer = bab.setBit(id.getProducerSequenceId(), true);
161            }
162        }
163        return answer;
164    }
165
166    /**
167     * mark this message as being received
168     * 
169     * @param message
170     */
171    public void rollback(final MessageReference message) {
172        MessageId id = message.getMessageId();
173        rollback(id);
174    }
175    
176    /**
177     * mark this message as being received
178     * 
179     * @param id
180     */
181    public void rollback(final  MessageId id) {
182        if (id != null) {
183            ProducerId pid = id.getProducerId();
184            if (pid != null) {
185                BitArrayBin bab = map.get(pid);
186                if (bab != null) {
187                    bab.setBit(id.getProducerSequenceId(), false);
188                }
189            }
190        }
191    }
192
193    public void rollback(final String id) {
194        String seed = IdGenerator.getSeedFromId(id);
195        if (seed != null) {
196            BitArrayBin bab = map.get(seed);
197            if (bab != null) {
198                long index = IdGenerator.getSequenceFromId(id);
199                bab.setBit(index, false);
200            }
201        }
202    }
203    
204    /**
205     * Check the message is in order
206     * @param msg
207     * @return
208     * @throws JMSException
209     */
210    public boolean isInOrder(Message msg) throws JMSException {
211        return isInOrder(msg.getJMSMessageID());
212    }
213    
214    /**
215     * Check the message id is in order
216     * @param id
217     * @return
218     */
219    public boolean isInOrder(final String id) {
220        boolean answer = true;
221        
222        if (id != null) {
223            String seed = IdGenerator.getSeedFromId(id);
224            if (seed != null) {
225                BitArrayBin bab = map.get(seed);
226                if (bab != null) {
227                    long index = IdGenerator.getSequenceFromId(id);
228                    answer = bab.isInOrder(index);
229                }
230               
231            }
232        }
233        return answer;
234    }
235    
236    /**
237     * Check the MessageId is in order
238     * @param message 
239     * @return
240     */
241    public boolean isInOrder(final MessageReference message) {
242        return isInOrder(message.getMessageId());
243    }
244    
245    /**
246     * Check the MessageId is in order
247     * @param id
248     * @return
249     */
250    public boolean isInOrder(final MessageId id) {
251        boolean answer = false;
252
253        if (id != null) {
254            ProducerId pid = id.getProducerId();
255            if (pid != null) {
256                BitArrayBin bab = map.get(pid);
257                if (bab == null) {
258                    bab = new BitArrayBin(auditDepth);
259                    map.put(pid, bab);
260                }
261                answer = bab.isInOrder(id.getProducerSequenceId());
262
263            }
264        }
265        return answer;
266    }
267
268    public long getLastSeqId(ProducerId id) {
269        long result = -1;
270        BitArrayBin bab = map.get(id.toString());
271        if (bab != null) {
272            result = bab.getLastSetIndex();
273        }
274        return result;
275    }
276
277    public void clear() {
278        map.clear();
279    }
280}