001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transaction;
018
019import java.io.IOException;
020import javax.transaction.xa.XAException;
021import org.apache.activemq.broker.ConnectionContext;
022import org.apache.activemq.command.LocalTransactionId;
023import org.apache.activemq.command.TransactionId;
024import org.apache.activemq.store.TransactionStore;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028/**
029 * 
030 */
031public class LocalTransaction extends Transaction {
032
033    private static final Logger LOG = LoggerFactory.getLogger(LocalTransaction.class);
034
035    private final TransactionStore transactionStore;
036    private final LocalTransactionId xid;
037    private final ConnectionContext context;
038
039    public LocalTransaction(TransactionStore transactionStore, LocalTransactionId xid, ConnectionContext context) {
040        this.transactionStore = transactionStore;
041        this.xid = xid;
042        this.context = context;
043    }
044
045    @Override
046    public void commit(boolean onePhase) throws XAException, IOException {
047        if (LOG.isDebugEnabled()) {
048            LOG.debug("commit: "  + xid
049                    + " syncCount: " + size());
050        }
051        
052        // Get ready for commit.
053        try {
054            prePrepare();
055        } catch (XAException e) {
056            throw e;
057        } catch (Throwable e) {
058            LOG.warn("COMMIT FAILED: ", e);
059            rollback();
060            // Let them know we rolled back.
061            XAException xae = new XAException("COMMIT FAILED: Transaction rolled back.");
062            xae.errorCode = XAException.XA_RBOTHER;
063            xae.initCause(e);
064            throw xae;
065        }
066
067        setState(Transaction.FINISHED_STATE);
068        context.getTransactions().remove(xid);
069        // Sync on transaction store to avoid out of order messages in the cursor
070        // https://issues.apache.org/activemq/browse/AMQ-2594
071        try {
072            transactionStore.commit(getTransactionId(), false,preCommitTask, postCommitTask);
073            this.waitPostCommitDone(postCommitTask);
074        } catch (Throwable t) {
075            LOG.warn("Store COMMIT FAILED: ", t);
076            rollback();
077            XAException xae = new XAException("STORE COMMIT FAILED: Transaction rolled back.");
078            xae.errorCode = XAException.XA_RBOTHER;
079            xae.initCause(t);
080            throw xae;
081        }
082    }
083
084    @Override
085    public void rollback() throws XAException, IOException {
086
087        if (LOG.isDebugEnabled()) {
088            LOG.debug("rollback: "  + xid
089                    + " syncCount: " + size());
090        }
091        setState(Transaction.FINISHED_STATE);
092        context.getTransactions().remove(xid);
093        // Sync on transaction store to avoid out of order messages in the cursor
094        // https://issues.apache.org/activemq/browse/AMQ-2594
095        synchronized (transactionStore) {
096           transactionStore.rollback(getTransactionId());
097
098            try {
099                fireAfterRollback();
100            } catch (Throwable e) {
101                LOG.warn("POST ROLLBACK FAILED: ", e);
102                XAException xae = new XAException("POST ROLLBACK FAILED");
103                xae.errorCode = XAException.XAER_RMERR;
104                xae.initCause(e);
105                throw xae;
106            }
107        }
108    }
109
110    @Override
111    public int prepare() throws XAException {
112        XAException xae = new XAException("Prepare not implemented on Local Transactions.");
113        xae.errorCode = XAException.XAER_RMERR;
114        throw xae;
115    }
116
117    @Override
118    public TransactionId getTransactionId() {
119        return xid;
120    }
121    
122    @Override
123    public Logger getLog() {
124        return LOG;
125    }
126}