001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.ra; 018 019import java.util.ArrayList; 020import java.util.List; 021 022import javax.jms.Connection; 023import javax.jms.ConnectionConsumer; 024import javax.jms.ConnectionMetaData; 025import javax.jms.Destination; 026import javax.jms.ExceptionListener; 027import javax.jms.IllegalStateException; 028import javax.jms.JMSException; 029import javax.jms.Queue; 030import javax.jms.QueueConnection; 031import javax.jms.QueueSession; 032import javax.jms.ServerSessionPool; 033import javax.jms.Session; 034import javax.jms.Topic; 035import javax.jms.TopicConnection; 036import javax.jms.TopicSession; 037import org.apache.activemq.ActiveMQQueueSession; 038import org.apache.activemq.ActiveMQSession; 039import org.apache.activemq.ActiveMQTopicSession; 040 041/** 042 * Acts as a pass through proxy for a JMS Connection object. It intercepts 043 * events that are of interest of the ActiveMQManagedConnection. 044 * 045 * 046 */ 047public class ManagedConnectionProxy implements Connection, QueueConnection, TopicConnection, ExceptionListener { 048 049 private ActiveMQManagedConnection managedConnection; 050 private final List<ManagedSessionProxy> sessions = new ArrayList<ManagedSessionProxy>(); 051 private ExceptionListener exceptionListener; 052 053 public ManagedConnectionProxy(ActiveMQManagedConnection managedConnection) { 054 this.managedConnection = managedConnection; 055 } 056 057 /** 058 * Used to let the ActiveMQManagedConnection that this connection handel is 059 * not needed by the app. 060 * 061 * @throws JMSException 062 */ 063 public void close() throws JMSException { 064 if (managedConnection != null) { 065 managedConnection.proxyClosedEvent(this); 066 } 067 } 068 069 /** 070 * Called by the ActiveMQManagedConnection to invalidate this proxy. 071 */ 072 public void cleanup() { 073 exceptionListener = null; 074 managedConnection = null; 075 synchronized (sessions) { 076 for (ManagedSessionProxy p : sessions) { 077 try { 078 //TODO is this dangerous? should we copy the list before iterating? 079 p.cleanup(); 080 } catch (JMSException ignore) { 081 } 082 } 083 sessions.clear(); 084 } 085 } 086 087 /** 088 * @return "physical" underlying activemq connection, if proxy is associated with a managed connection 089 * @throws javax.jms.JMSException if managed connection is null 090 */ 091 private Connection getConnection() throws JMSException { 092 if (managedConnection == null) { 093 throw new IllegalStateException("The Connection is closed"); 094 } 095 return managedConnection.getPhysicalConnection(); 096 } 097 098 /** 099 * @param transacted Whether session is transacted 100 * @param acknowledgeMode session acknowledge mode 101 * @return session proxy 102 * @throws JMSException on error 103 */ 104 public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { 105 return createSessionProxy(transacted, acknowledgeMode); 106 } 107 108 /** 109 * @param transacted Whether session is transacted 110 * @param acknowledgeMode session acknowledge mode 111 * @return session proxy 112 * @throws JMSException on error 113 */ 114 private ManagedSessionProxy createSessionProxy(boolean transacted, int acknowledgeMode) throws JMSException { 115 if (!transacted && acknowledgeMode == Session.SESSION_TRANSACTED) { 116 acknowledgeMode = Session.AUTO_ACKNOWLEDGE; 117 } 118 ActiveMQSession session = (ActiveMQSession) getConnection().createSession(transacted, acknowledgeMode); 119 ManagedTransactionContext txContext = new ManagedTransactionContext(managedConnection.getTransactionContext()); 120 session.setTransactionContext(txContext); 121 ManagedSessionProxy p = new ManagedSessionProxy(session, this); 122 p.setUseSharedTxContext(managedConnection.isInManagedTx()); 123 synchronized (sessions) { 124 sessions.add(p); 125 } 126 return p; 127 } 128 129 protected void sessionClosed(ManagedSessionProxy session) { 130 synchronized (sessions) { 131 sessions.remove(session); 132 } 133 } 134 135 public void setUseSharedTxContext(boolean enable) throws JMSException { 136 synchronized (sessions) { 137 for (ManagedSessionProxy p : sessions) { 138 p.setUseSharedTxContext(enable); 139 } 140 } 141 } 142 143 /** 144 * @param transacted Whether session is transacted 145 * @param acknowledgeMode session acknowledge mode 146 * @return session proxy 147 * @throws JMSException on error 148 */ 149 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { 150 return new ActiveMQQueueSession(createSessionProxy(transacted, acknowledgeMode)); 151 } 152 153 /** 154 * @param transacted Whether session is transacted 155 * @param acknowledgeMode session acknowledge mode 156 * @return session proxy 157 * @throws JMSException on error 158 */ 159 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { 160 return new ActiveMQTopicSession(createSessionProxy(transacted, acknowledgeMode)); 161 } 162 163 /** 164 * @return client id from delegate 165 * @throws JMSException 166 */ 167 public String getClientID() throws JMSException { 168 return getConnection().getClientID(); 169 } 170 171 /** 172 * @return exception listener from delegate 173 * @throws JMSException 174 */ 175 public ExceptionListener getExceptionListener() throws JMSException { 176 return getConnection().getExceptionListener(); 177 } 178 179 /** 180 * @return connection metadata from delegate 181 * @throws JMSException 182 */ 183 public ConnectionMetaData getMetaData() throws JMSException { 184 return getConnection().getMetaData(); 185 } 186 187 /** 188 * Sets client id on delegate 189 * @param clientID new clientId 190 * @throws JMSException 191 */ 192 public void setClientID(String clientID) throws JMSException { 193 getConnection().setClientID(clientID); 194 } 195 196 /** 197 * sets exception listener on delegate 198 * @param listener new listener 199 * @throws JMSException 200 */ 201 public void setExceptionListener(ExceptionListener listener) throws JMSException { 202 getConnection(); 203 exceptionListener = listener; 204 } 205 206 /** 207 * @throws JMSException 208 */ 209 public void start() throws JMSException { 210 getConnection().start(); 211 } 212 213 /** 214 * @throws JMSException 215 */ 216 public void stop() throws JMSException { 217 getConnection().stop(); 218 } 219 220 /** 221 * @param queue 222 * @param messageSelector 223 * @param sessionPool 224 * @param maxMessages 225 * @return 226 * @throws JMSException 227 */ 228 public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 229 throw new JMSException("Not Supported."); 230 } 231 232 /** 233 * @param topic 234 * @param messageSelector 235 * @param sessionPool 236 * @param maxMessages 237 * @return 238 * @throws JMSException 239 */ 240 public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 241 throw new JMSException("Not Supported."); 242 } 243 244 /** 245 * @param destination 246 * @param messageSelector 247 * @param sessionPool 248 * @param maxMessages 249 * @return 250 * @throws JMSException 251 */ 252 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 253 throw new JMSException("Not Supported."); 254 } 255 256 /** 257 * @param topic 258 * @param subscriptionName 259 * @param messageSelector 260 * @param sessionPool 261 * @param maxMessages 262 * @return 263 * @throws JMSException 264 */ 265 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 266 throw new JMSException("Not Supported."); 267 } 268 269 /** 270 * @return Returns the managedConnection. 271 */ 272 public ActiveMQManagedConnection getManagedConnection() { 273 return managedConnection; 274 } 275 276 public void onException(JMSException e) { 277 if (exceptionListener != null && managedConnection != null) { 278 try { 279 exceptionListener.onException(e); 280 } catch (Throwable ignore) { 281 // We can never trust user code so ignore any exceptions. 282 } 283 } 284 } 285 286}