001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.store.amq;
019
020import java.io.IOException;
021import java.util.Iterator;
022import java.util.LinkedHashMap;
023import java.util.Map;
024import javax.transaction.xa.XAException;
025import org.apache.activemq.command.JournalTopicAck;
026import org.apache.activemq.command.JournalTransaction;
027import org.apache.activemq.command.Message;
028import org.apache.activemq.command.MessageAck;
029import org.apache.activemq.command.TransactionId;
030import org.apache.activemq.command.XATransactionId;
031import org.apache.activemq.kaha.impl.async.Location;
032import org.apache.activemq.store.TransactionRecoveryListener;
033import org.apache.activemq.store.TransactionStore;
034
035/**
036 */
037public class AMQTransactionStore implements TransactionStore {
038
039    protected Map<TransactionId, AMQTx> inflightTransactions = new LinkedHashMap<TransactionId, AMQTx>();
040    Map<TransactionId, AMQTx> preparedTransactions = new LinkedHashMap<TransactionId, AMQTx>();
041
042    private final AMQPersistenceAdapter peristenceAdapter;
043    private boolean doingRecover;
044
045    public AMQTransactionStore(AMQPersistenceAdapter adapter) {
046        this.peristenceAdapter = adapter;
047    }
048
049    /**
050     * @throws IOException
051     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
052     */
053    public void prepare(TransactionId txid) throws IOException {
054        AMQTx tx = null;
055        synchronized (inflightTransactions) {
056            tx = inflightTransactions.remove(txid);
057        }
058        if (tx == null) {
059            return;
060        }
061        peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), true);
062        synchronized (preparedTransactions) {
063            preparedTransactions.put(txid, tx);
064        }
065    }
066
067    /**
068     * @throws IOException
069     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
070     */
071    public void replayPrepare(TransactionId txid) throws IOException {
072        AMQTx tx = null;
073        synchronized (inflightTransactions) {
074            tx = inflightTransactions.remove(txid);
075        }
076        if (tx == null) {
077            return;
078        }
079        synchronized (preparedTransactions) {
080            preparedTransactions.put(txid, tx);
081        }
082    }
083
084    public AMQTx getTx(TransactionId txid, Location location) {
085        AMQTx tx = null;
086        synchronized (inflightTransactions) {
087            tx = inflightTransactions.get(txid);
088            if (tx == null) {
089                tx = new AMQTx(location);
090                inflightTransactions.put(txid, tx);
091            }
092        }
093        return tx;
094    }
095
096    /**
097     * @throws XAException
098     * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
099     */
100    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
101        if (preCommit != null) {
102            preCommit.run();
103        }
104        AMQTx tx;
105        if (wasPrepared) {
106            synchronized (preparedTransactions) {
107                tx = preparedTransactions.remove(txid);
108            }
109        } else {
110            synchronized (inflightTransactions) {
111                tx = inflightTransactions.remove(txid);
112            }
113        }
114        if (tx == null) {
115            if (postCommit != null) {
116                postCommit.run();
117            }
118            return;
119        }
120        if (txid.isXATransaction()) {
121            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, wasPrepared), true,true);
122        } else {
123            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared), true,true);
124        }
125        if (postCommit != null) {
126            postCommit.run();
127        }
128    }
129
130    /**
131     * @throws XAException
132     * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
133     */
134    public AMQTx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
135        if (wasPrepared) {
136            synchronized (preparedTransactions) {
137                return preparedTransactions.remove(txid);
138            }
139        } else {
140            synchronized (inflightTransactions) {
141                return inflightTransactions.remove(txid);
142            }
143        }
144    }
145
146    /**
147     * @throws IOException
148     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
149     */
150    public void rollback(TransactionId txid) throws IOException {
151        AMQTx tx = null;
152        synchronized (inflightTransactions) {
153            tx = inflightTransactions.remove(txid);
154        }
155        if (tx != null) {
156            synchronized (preparedTransactions) {
157                tx = preparedTransactions.remove(txid);
158            }
159        }
160        if (tx != null) {
161            if (txid.isXATransaction()) {
162                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, false), true,true);
163            } else {
164                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, txid, false), true,true);
165            }
166        }
167    }
168
169    /**
170     * @throws IOException
171     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
172     */
173    public void replayRollback(TransactionId txid) throws IOException {
174        boolean inflight = false;
175        synchronized (inflightTransactions) {
176            inflight = inflightTransactions.remove(txid) != null;
177        }
178        if (inflight) {
179            synchronized (preparedTransactions) {
180                preparedTransactions.remove(txid);
181            }
182        }
183    }
184
185    public void start() throws Exception {
186    }
187
188    public void stop() throws Exception {
189    }
190
191    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
192        // All the in-flight transactions get rolled back..
193        synchronized (inflightTransactions) {
194            inflightTransactions.clear();
195        }
196        this.doingRecover = true;
197        try {
198            Map<TransactionId, AMQTx> txs = null;
199            synchronized (preparedTransactions) {
200                txs = new LinkedHashMap<TransactionId, AMQTx>(preparedTransactions);
201            }
202            for (Iterator<TransactionId> iter = txs.keySet().iterator(); iter.hasNext();) {
203                Object txid = iter.next();
204                AMQTx tx = txs.get(txid);
205                listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
206            }
207        } finally {
208            this.doingRecover = false;
209        }
210    }
211
212    /**
213     * @param message
214     * @throws IOException
215     */
216    void addMessage(AMQMessageStore store, Message message, Location location) throws IOException {
217        AMQTx tx = getTx(message.getTransactionId(), location);
218        tx.add(store, message, location);
219    }
220
221    /**
222     * @param ack
223     * @throws IOException
224     */
225    public void removeMessage(AMQMessageStore store, MessageAck ack, Location location) throws IOException {
226        AMQTx tx = getTx(ack.getTransactionId(), location);
227        tx.add(store, ack);
228    }
229
230    public void acknowledge(AMQTopicMessageStore store, JournalTopicAck ack, Location location) {
231        AMQTx tx = getTx(ack.getTransactionId(), location);
232        tx.add(store, ack);
233    }
234
235    public Location checkpoint() throws IOException {
236        // Nothing really to checkpoint.. since, we don't
237        // checkpoint tx operations in to long term store until they are
238        // committed.
239        // But we keep track of the first location of an operation
240        // that was associated with an active tx. The journal can not
241        // roll over active tx records.
242        Location minimumLocationInUse = null;
243        synchronized (inflightTransactions) {
244            for (Iterator<AMQTx> iter = inflightTransactions.values().iterator(); iter.hasNext();) {
245                AMQTx tx = iter.next();
246                Location location = tx.getLocation();
247                if (minimumLocationInUse == null || location.compareTo(minimumLocationInUse) < 0) {
248                    minimumLocationInUse = location;
249                }
250            }
251        }
252        synchronized (preparedTransactions) {
253            for (Iterator<AMQTx> iter = preparedTransactions.values().iterator(); iter.hasNext();) {
254                AMQTx tx = iter.next();
255                Location location = tx.getLocation();
256                if (minimumLocationInUse == null || location.compareTo(minimumLocationInUse) < 0) {
257                    minimumLocationInUse = location;
258                }
259            }
260            return minimumLocationInUse;
261        }
262    }
263
264    public boolean isDoingRecover() {
265        return doingRecover;
266    }
267
268    /**
269     * @return the preparedTransactions
270     */
271    public Map<TransactionId, AMQTx> getPreparedTransactions() {
272        return this.preparedTransactions;
273    }
274
275    /**
276     * @param preparedTransactions the preparedTransactions to set
277     */
278    public void setPreparedTransactions(Map<TransactionId, AMQTx> preparedTransactions) {
279        if (preparedTransactions != null) {
280            this.preparedTransactions.clear();
281            this.preparedTransactions.putAll(preparedTransactions);
282        }
283    }
284}