001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transaction;
018
019import java.io.IOException;
020import javax.transaction.xa.XAException;
021import javax.transaction.xa.XAResource;
022import org.apache.activemq.TransactionContext;
023import org.apache.activemq.broker.TransactionBroker;
024import org.apache.activemq.command.ConnectionId;
025import org.apache.activemq.command.TransactionId;
026import org.apache.activemq.command.XATransactionId;
027import org.apache.activemq.store.TransactionStore;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 * 
033 */
034public class XATransaction extends Transaction {
035
036    private static final Logger LOG = LoggerFactory.getLogger(XATransaction.class);
037
038    private final TransactionStore transactionStore;
039    private final XATransactionId xid;
040    private final TransactionBroker broker;
041    private final ConnectionId connectionId;
042
043    public XATransaction(TransactionStore transactionStore, XATransactionId xid, TransactionBroker broker, ConnectionId connectionId) {
044        this.transactionStore = transactionStore;
045        this.xid = xid;
046        this.broker = broker;
047        this.connectionId = connectionId;
048        if (LOG.isDebugEnabled()) {
049            LOG.debug("XA Transaction new/begin : " + xid);
050        }
051    }
052
053    @Override
054    public void commit(boolean onePhase) throws XAException, IOException {
055        if (LOG.isDebugEnabled()) {
056            LOG.debug("XA Transaction commit onePhase:" + onePhase + ", xid: " + xid);
057        }
058
059        switch (getState()) {
060        case START_STATE:
061            // 1 phase commit, no work done.
062            checkForPreparedState(onePhase);
063            setStateFinished();
064            break;
065        case IN_USE_STATE:
066            // 1 phase commit, work done.
067            checkForPreparedState(onePhase);
068            doPrePrepare();
069            setStateFinished();
070            storeCommit(getTransactionId(), false, preCommitTask, postCommitTask);
071            break;
072        case PREPARED_STATE:
073            // 2 phase commit, work done.
074            // We would record commit here.
075            setStateFinished();
076            storeCommit(getTransactionId(), true, preCommitTask, postCommitTask);
077            break;
078        default:
079            illegalStateTransition("commit");
080        }
081    }
082
083    private void storeCommit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit)
084            throws XAException, IOException {
085        try {
086            transactionStore.commit(getTransactionId(), wasPrepared, preCommitTask, postCommitTask);
087            waitPostCommitDone(postCommitTask);
088        } catch (XAException xae) {
089            throw xae;
090        } catch (Throwable t) {
091            LOG.warn("Store COMMIT FAILED: ", t);
092            rollback();
093            XAException xae = newXAException("STORE COMMIT FAILED: Transaction rolled back", XAException.XA_RBOTHER);
094            xae.initCause(t);
095            throw xae;
096        }
097    }
098
099    private void illegalStateTransition(String callName) throws XAException {
100        XAException xae = newXAException("Cannot call " + callName + " now.", XAException.XAER_PROTO);
101        throw xae;
102    }
103
104    private void checkForPreparedState(boolean onePhase) throws XAException {
105        if (!onePhase) {
106            XAException xae = newXAException("Cannot do 2 phase commit if the transaction has not been prepared", XAException.XAER_PROTO);
107            throw xae;
108        }
109    }
110
111    private void doPrePrepare() throws XAException, IOException {
112        try {
113            prePrepare();
114        } catch (XAException e) {
115            throw e;
116        } catch (Throwable e) {
117            LOG.warn("PRE-PREPARE FAILED: ", e);
118            rollback();
119            XAException xae = newXAException("PRE-PREPARE FAILED: Transaction rolled back", XAException.XA_RBOTHER);
120            xae.initCause(e);
121            throw xae;
122        }
123    }
124
125    @Override
126    public void rollback() throws XAException, IOException {
127
128        if (LOG.isDebugEnabled()) {
129            LOG.debug("XA Transaction rollback: " + xid);
130        }
131
132        switch (getState()) {
133        case START_STATE:
134            // 1 phase rollback no work done.
135            setStateFinished();
136            break;
137        case IN_USE_STATE:
138            // 1 phase rollback work done.
139            setStateFinished();
140            transactionStore.rollback(getTransactionId());
141            doPostRollback();
142            break;
143        case PREPARED_STATE:
144            // 2 phase rollback work done.
145            setStateFinished();
146            transactionStore.rollback(getTransactionId());
147            doPostRollback();
148            break;
149        case FINISHED_STATE:
150            // failure to commit
151            transactionStore.rollback(getTransactionId());
152            doPostRollback();
153            break;
154        default:
155            throw newXAException("Invalid state: " + getState(), XAException.XA_RBPROTO);
156        }
157
158    }
159
160    private void doPostRollback() throws XAException {
161        try {
162            fireAfterRollback();
163        } catch (Throwable e) {
164            // I guess this could happen. Post commit task failed
165            // to execute properly.
166            LOG.warn("POST ROLLBACK FAILED: ", e);
167            XAException xae = newXAException("POST ROLLBACK FAILED", XAException.XAER_RMERR);
168            xae.initCause(e);
169            throw xae;
170        }
171    }
172
173    @Override
174    public int prepare() throws XAException, IOException {
175        if (LOG.isDebugEnabled()) {
176            LOG.debug("XA Transaction prepare: " + xid);
177        }
178
179        switch (getState()) {
180        case START_STATE:
181            // No work done.. no commit/rollback needed.
182            setStateFinished();
183            return XAResource.XA_RDONLY;
184        case IN_USE_STATE:
185            // We would record prepare here.
186            doPrePrepare();
187            setState(Transaction.PREPARED_STATE);
188            transactionStore.prepare(getTransactionId());
189            return XAResource.XA_OK;
190        default:
191            illegalStateTransition("prepare");
192            return XAResource.XA_RDONLY;
193        }
194    }
195
196    private void setStateFinished() {
197        setState(Transaction.FINISHED_STATE);
198        broker.removeTransaction(xid);
199    }
200
201    public ConnectionId getConnectionId() {
202        return connectionId;
203    }
204
205    @Override
206    public TransactionId getTransactionId() {
207        return xid;
208    }
209    
210    @Override
211    public Logger getLog() {
212        return LOG;
213    }
214
215    public XATransactionId getXid() {
216        return xid;
217    }
218}