001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.ra;
018
019import java.net.URI;
020import java.util.HashMap;
021
022import javax.jms.Connection;
023import javax.jms.JMSException;
024import javax.jms.XAConnection;
025import javax.jms.XASession;
026import javax.resource.NotSupportedException;
027import javax.resource.ResourceException;
028import javax.resource.spi.ActivationSpec;
029import javax.resource.spi.BootstrapContext;
030import javax.resource.spi.ResourceAdapterInternalException;
031import javax.resource.spi.endpoint.MessageEndpointFactory;
032import javax.transaction.xa.XAResource;
033
034import org.apache.activemq.ActiveMQConnection;
035import org.apache.activemq.ActiveMQConnectionFactory;
036import org.apache.activemq.RedeliveryPolicy;
037import org.apache.activemq.broker.BrokerFactory;
038import org.apache.activemq.broker.BrokerService;
039import org.apache.activemq.util.ServiceSupport;
040
041/**
042 * Knows how to connect to one ActiveMQ server. It can then activate endpoints
043 * and deliver messages to those end points using the connection configure in
044 * the resource adapter. <p/>Must override equals and hashCode (JCA spec 16.4)
045 * 
046 * @org.apache.xbean.XBean element="resourceAdapter" rootElement="true"
047 *                         description="The JCA Resource Adaptor for ActiveMQ"
048 * 
049 */
050public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implements MessageResourceAdapter {
051
052    private final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers = new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>();
053
054    private BootstrapContext bootstrapContext;
055    private String brokerXmlConfig;
056    private BrokerService broker;
057    private Thread brokerStartThread;
058    private ActiveMQConnectionFactory connectionFactory;
059    
060    /**
061     * 
062     */
063    public ActiveMQResourceAdapter() {
064        super();
065    }
066
067    /**
068     * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext)
069     */
070    public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
071        this.bootstrapContext = bootstrapContext;
072        if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) {
073            brokerStartThread = new Thread("Starting ActiveMQ Broker") {
074                @Override
075                public void run () {
076                    try {
077                        // ensure RAR resources are available to xbean (needed for weblogic)
078                        log.debug("original thread context classLoader: " + Thread.currentThread().getContextClassLoader());
079                        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
080                        log.debug("current (from getClass()) thread context classLoader: " + Thread.currentThread().getContextClassLoader());
081                        
082                        synchronized( ActiveMQResourceAdapter.this ) {
083                            broker = BrokerFactory.createBroker(new URI(brokerXmlConfig));
084                        }
085                        broker.start();
086                    } catch (Throwable e) {
087                        log.warn("Could not start up embeded ActiveMQ Broker '"+brokerXmlConfig+"': "+e.getMessage());
088                        log.debug("Reason for: "+e.getMessage(), e);
089                    }
090                }
091            };
092            brokerStartThread.setDaemon(true);
093            brokerStartThread.start();
094            
095            // Wait up to 5 seconds for the broker to start up in the async thread.. otherwise keep going without it..
096            try {
097                brokerStartThread.join(1000*5);
098            } catch (InterruptedException e) {
099                Thread.currentThread().interrupt();
100            }                
101        }
102    }
103
104    /**
105     * @see org.apache.activemq.ra.MessageResourceAdapter#makeConnection()
106     */
107    public ActiveMQConnection makeConnection() throws JMSException {
108        if( connectionFactory == null ) {
109            return makeConnection(getInfo());
110        } else {
111            return makeConnection(getInfo(), connectionFactory);
112        }
113    }
114
115    /**
116     * @param activationSpec
117     */
118    public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException {
119        ActiveMQConnectionFactory cf = getConnectionFactory();
120        if (cf == null) {
121            cf = createConnectionFactory(getInfo());
122        }
123        String userName = defaultValue(activationSpec.getUserName(), getInfo().getUserName());
124        String password = defaultValue(activationSpec.getPassword(), getInfo().getPassword());
125        String clientId = activationSpec.getClientId();
126        if (clientId != null) {
127            cf.setClientID(clientId);
128        } else {
129            if (activationSpec.isDurableSubscription()) {
130                log.warn("No clientID specified for durable subscription: " + activationSpec);
131            }
132        }
133        ActiveMQConnection physicalConnection = (ActiveMQConnection) cf.createConnection(userName, password);
134
135        // have we configured a redelivery policy
136        RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy();
137        if (redeliveryPolicy != null) {
138            physicalConnection.setRedeliveryPolicy(redeliveryPolicy);
139        }
140        return physicalConnection;
141    }
142
143    /**
144     * @see javax.resource.spi.ResourceAdapter#stop()
145     */
146    public void stop() {
147        while (endpointWorkers.size() > 0) {
148            ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next();
149            endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
150        }
151        
152        synchronized( this ) {
153            if (broker != null) {
154                if( brokerStartThread.isAlive() ) {
155                    brokerStartThread.interrupt();
156                }
157                ServiceSupport.dispose(broker);
158                broker = null;
159            }
160        }
161        
162        this.bootstrapContext = null;
163    }
164
165    /**
166     * @see org.apache.activemq.ra.MessageResourceAdapter#getBootstrapContext()
167     */
168    public BootstrapContext getBootstrapContext() {
169        return bootstrapContext;
170    }
171
172    /**
173     * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory,
174     *      javax.resource.spi.ActivationSpec)
175     */
176    public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) throws ResourceException {
177
178        // spec section 5.3.3
179        if (!equals(activationSpec.getResourceAdapter())) {
180            throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " != " + this + ")");
181        }
182
183        if (!(activationSpec instanceof MessageActivationSpec)) {
184            throw new NotSupportedException("That type of ActicationSpec not supported: " + activationSpec.getClass());
185        }
186
187        ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
188        // This is weird.. the same endpoint activated twice.. must be a
189        // container error.
190        if (endpointWorkers.containsKey(key)) {
191            throw new IllegalStateException("Endpoint previously activated");
192        }
193
194        ActiveMQEndpointWorker worker = new ActiveMQEndpointWorker(this, key);
195
196        endpointWorkers.put(key, worker);
197        worker.start();
198    }
199
200    /**
201     * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory,
202     *      javax.resource.spi.ActivationSpec)
203     */
204    public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) {
205
206        if (activationSpec instanceof MessageActivationSpec) {
207            ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
208            ActiveMQEndpointWorker worker = endpointWorkers.remove(key);
209            if (worker == null) {
210                // This is weird.. that endpoint was not activated.. oh well..
211                // this method
212                // does not throw exceptions so just return.
213                return;
214            }
215            try {
216                worker.stop();
217            } catch (InterruptedException e) {
218                // We interrupted.. we won't throw an exception but will stop
219                // waiting for the worker
220                // to stop.. we tried our best. Keep trying to interrupt the
221                // thread.
222                Thread.currentThread().interrupt();
223            }
224
225        }
226
227    }
228
229    /**
230     * We only connect to one resource manager per ResourceAdapter instance, so
231     * any ActivationSpec will return the same XAResource.
232     * 
233     * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[])
234     */
235    public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
236        Connection connection = null;
237        try {
238            connection = makeConnection();
239            if (connection instanceof XAConnection) {
240                XASession session = ((XAConnection)connection).createXASession();
241                XAResource xaResource = session.getXAResource();
242                return new XAResource[] {
243                    xaResource
244                };
245            }
246            return new XAResource[] {};
247        } catch (JMSException e) {
248            throw new ResourceException(e);
249        } finally {
250            try {
251                connection.close();
252            } catch (Throwable ignore) {
253                //
254            }
255        }
256    }
257
258    // ///////////////////////////////////////////////////////////////////////
259    //
260    // Java Bean getters and setters for this ResourceAdapter class.
261    //
262    // ///////////////////////////////////////////////////////////////////////
263
264    /**
265     * @see org.apache.activemq.ra.MessageResourceAdapter#getBrokerXmlConfig()
266     */
267    public String getBrokerXmlConfig() {
268        return brokerXmlConfig;
269    }
270
271    /**
272     * Sets the <a href="http://activemq.org/Xml+Configuration">XML
273     * configuration file </a> used to configure the ActiveMQ broker via Spring
274     * if using embedded mode.
275     * 
276     * @param brokerXmlConfig is the filename which is assumed to be on the
277     *                classpath unless a URL is specified. So a value of
278     *                <code>foo/bar.xml</code> would be assumed to be on the
279     *                classpath whereas <code>file:dir/file.xml</code> would
280     *                use the file system. Any valid URL string is supported.
281     */
282    public void setBrokerXmlConfig(String brokerXmlConfig) {
283        this.brokerXmlConfig = brokerXmlConfig;
284    }
285
286    /**
287     * @see java.lang.Object#equals(java.lang.Object)
288     */
289    @Override
290    public boolean equals(Object o) {
291        if (this == o) {
292            return true;
293        }
294        if (!(o instanceof MessageResourceAdapter)) {
295            return false;
296        }
297
298        final MessageResourceAdapter activeMQResourceAdapter = (MessageResourceAdapter)o;
299
300        if (!getInfo().equals(activeMQResourceAdapter.getInfo())) {
301            return false;
302        }
303        if (notEqual(brokerXmlConfig, activeMQResourceAdapter.getBrokerXmlConfig())) {
304            return false;
305        }
306
307        return true;
308    }
309
310    /**
311     * @see java.lang.Object#hashCode()
312     */
313    @Override
314    public int hashCode() {
315        int result;
316        result = getInfo().hashCode();
317        if (brokerXmlConfig != null) {
318            result ^= brokerXmlConfig.hashCode();
319        }
320        return result;
321    }
322
323    public ActiveMQConnectionFactory getConnectionFactory() {
324        return connectionFactory;
325    }
326
327    public void setConnectionFactory(ActiveMQConnectionFactory aConnectionFactory) {
328        this.connectionFactory = aConnectionFactory;
329    }
330
331
332    }