001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.vm; 018 019import java.io.IOException; 020import java.net.URI; 021import java.net.URISyntaxException; 022import java.util.HashMap; 023import java.util.Map; 024import java.util.concurrent.ConcurrentHashMap; 025 026import org.apache.activemq.broker.BrokerFactory; 027import org.apache.activemq.broker.BrokerFactoryHandler; 028import org.apache.activemq.broker.BrokerRegistry; 029import org.apache.activemq.broker.BrokerService; 030import org.apache.activemq.broker.TransportConnector; 031import org.apache.activemq.transport.MarshallingTransportFilter; 032import org.apache.activemq.transport.Transport; 033import org.apache.activemq.transport.TransportFactory; 034import org.apache.activemq.transport.TransportServer; 035import org.apache.activemq.util.IOExceptionSupport; 036import org.apache.activemq.util.IntrospectionSupport; 037import org.apache.activemq.util.ServiceSupport; 038import org.apache.activemq.util.URISupport; 039import org.apache.activemq.util.URISupport.CompositeData; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042import org.slf4j.MDC; 043 044public class VMTransportFactory extends TransportFactory { 045 046 public static final ConcurrentHashMap<String, BrokerService> BROKERS = new ConcurrentHashMap<String, BrokerService>(); 047 public static final ConcurrentHashMap<String, TransportConnector> CONNECTORS = new ConcurrentHashMap<String, TransportConnector>(); 048 public static final ConcurrentHashMap<String, VMTransportServer> SERVERS = new ConcurrentHashMap<String, VMTransportServer>(); 049 private static final Logger LOG = LoggerFactory.getLogger(VMTransportFactory.class); 050 051 BrokerFactoryHandler brokerFactoryHandler; 052 053 public Transport doConnect(URI location) throws Exception { 054 return VMTransportServer.configure(doCompositeConnect(location)); 055 } 056 057 public Transport doCompositeConnect(URI location) throws Exception { 058 URI brokerURI; 059 String host; 060 Map<String, String> options; 061 boolean create = true; 062 int waitForStart = -1; 063 CompositeData data = URISupport.parseComposite(location); 064 if (data.getComponents().length == 1 && "broker".equals(data.getComponents()[0].getScheme())) { 065 brokerURI = data.getComponents()[0]; 066 CompositeData brokerData = URISupport.parseComposite(brokerURI); 067 host = (String)brokerData.getParameters().get("brokerName"); 068 if (host == null) { 069 host = "localhost"; 070 } 071 if (brokerData.getPath() != null) { 072 host = brokerData.getPath(); 073 } 074 options = data.getParameters(); 075 location = new URI("vm://" + host); 076 } else { 077 // If using the less complex vm://localhost?broker.persistent=true 078 // form 079 try { 080 host = extractHost(location); 081 options = URISupport.parseParameters(location); 082 String config = (String)options.remove("brokerConfig"); 083 if (config != null) { 084 brokerURI = new URI(config); 085 } else { 086 Map brokerOptions = IntrospectionSupport.extractProperties(options, "broker."); 087 brokerURI = new URI("broker://()/" + host + "?" 088 + URISupport.createQueryString(brokerOptions)); 089 } 090 if ("false".equals(options.remove("create"))) { 091 create = false; 092 } 093 String waitForStartString = options.remove("waitForStart"); 094 if (waitForStartString != null) { 095 waitForStart = Integer.parseInt(waitForStartString); 096 } 097 } catch (URISyntaxException e1) { 098 throw IOExceptionSupport.create(e1); 099 } 100 location = new URI("vm://" + host); 101 } 102 if (host == null) { 103 host = "localhost"; 104 } 105 VMTransportServer server = SERVERS.get(host); 106 // validate the broker is still active 107 if (!validateBroker(host) || server == null) { 108 BrokerService broker = null; 109 // Synchronize on the registry so that multiple concurrent threads 110 // doing this do not think that the broker has not been created and 111 // cause multiple brokers to be started. 112 synchronized (BrokerRegistry.getInstance().getRegistryMutext()) { 113 broker = lookupBroker(BrokerRegistry.getInstance(), host, waitForStart); 114 if (broker == null) { 115 if (!create) { 116 throw new IOException("Broker named '" + host + "' does not exist."); 117 } 118 try { 119 if (brokerFactoryHandler != null) { 120 broker = brokerFactoryHandler.createBroker(brokerURI); 121 } else { 122 broker = BrokerFactory.createBroker(brokerURI); 123 } 124 broker.start(); 125 MDC.put("activemq.broker", broker.getBrokerName()); 126 } catch (URISyntaxException e) { 127 throw IOExceptionSupport.create(e); 128 } 129 BROKERS.put(host, broker); 130 BrokerRegistry.getInstance().getRegistryMutext().notifyAll(); 131 } 132 133 server = SERVERS.get(host); 134 if (server == null) { 135 server = (VMTransportServer)bind(location, true); 136 TransportConnector connector = new TransportConnector(server); 137 connector.setBrokerService(broker); 138 connector.setUri(location); 139 connector.setTaskRunnerFactory(broker.getTaskRunnerFactory()); 140 connector.start(); 141 CONNECTORS.put(host, connector); 142 } 143 144 } 145 } 146 147 VMTransport vmtransport = server.connect(); 148 IntrospectionSupport.setProperties(vmtransport.peer, new HashMap<String,String>(options)); 149 IntrospectionSupport.setProperties(vmtransport, options); 150 Transport transport = vmtransport; 151 if (vmtransport.isMarshal()) { 152 Map<String, String> optionsCopy = new HashMap<String, String>(options); 153 transport = new MarshallingTransportFilter(transport, createWireFormat(options), 154 createWireFormat(optionsCopy)); 155 } 156 if (!options.isEmpty()) { 157 throw new IllegalArgumentException("Invalid connect parameters: " + options); 158 } 159 return transport; 160 } 161 162 private static String extractHost(URI location) { 163 String host = location.getHost(); 164 if (host == null || host.length() == 0) { 165 host = location.getAuthority(); 166 if (host == null || host.length() == 0) { 167 host = "localhost"; 168 } 169 } 170 return host; 171 } 172 173/** 174 * @param registry 175 * @param brokerName 176 * @param waitForStart - time in milliseconds to wait for a broker to appear 177 * @return 178 */ 179 private BrokerService lookupBroker(final BrokerRegistry registry, final String brokerName, int waitForStart) { 180 BrokerService broker = null; 181 synchronized(registry.getRegistryMutext()) { 182 broker = registry.lookup(brokerName); 183 if (broker == null && waitForStart > 0) { 184 final long expiry = System.currentTimeMillis() + waitForStart; 185 while (broker == null && expiry > System.currentTimeMillis()) { 186 long timeout = Math.max(0, expiry - System.currentTimeMillis()); 187 try { 188 LOG.debug("waiting for broker named: " + brokerName + " to start"); 189 registry.getRegistryMutext().wait(timeout); 190 } catch (InterruptedException ignored) { 191 } 192 broker = registry.lookup(brokerName); 193 } 194 } 195 } 196 return broker; 197 } 198 199 public TransportServer doBind(URI location) throws IOException { 200 return bind(location, false); 201 } 202 203 /** 204 * @param location 205 * @return the TransportServer 206 * @throws IOException 207 */ 208 private TransportServer bind(URI location, boolean dispose) throws IOException { 209 String host = extractHost(location); 210 LOG.debug("binding to broker: " + host); 211 VMTransportServer server = new VMTransportServer(location, dispose); 212 Object currentBoundValue = SERVERS.get(host); 213 if (currentBoundValue != null) { 214 throw new IOException("VMTransportServer already bound at: " + location); 215 } 216 SERVERS.put(host, server); 217 return server; 218 } 219 220 public static void stopped(VMTransportServer server) { 221 String host = extractHost(server.getBindURI()); 222 stopped(host); 223 } 224 225 public static void stopped(String host) { 226 SERVERS.remove(host); 227 TransportConnector connector = CONNECTORS.remove(host); 228 if (connector != null) { 229 LOG.debug("Shutting down VM connectors for broker: " + host); 230 ServiceSupport.dispose(connector); 231 BrokerService broker = BROKERS.remove(host); 232 if (broker != null) { 233 ServiceSupport.dispose(broker); 234 } 235 MDC.remove("activemq.broker"); 236 } 237 } 238 239 public BrokerFactoryHandler getBrokerFactoryHandler() { 240 return brokerFactoryHandler; 241 } 242 243 public void setBrokerFactoryHandler(BrokerFactoryHandler brokerFactoryHandler) { 244 this.brokerFactoryHandler = brokerFactoryHandler; 245 } 246 247 private boolean validateBroker(String host) { 248 boolean result = true; 249 if (BROKERS.containsKey(host) || SERVERS.containsKey(host) || CONNECTORS.containsKey(host)) { 250 // check the broker is still in the BrokerRegistry 251 TransportConnector connector = CONNECTORS.get(host); 252 if (BrokerRegistry.getInstance().lookup(host) == null 253 || (connector != null && connector.getBroker().isStopped())) { 254 result = false; 255 // clean-up 256 BROKERS.remove(host); 257 SERVERS.remove(host); 258 if (connector != null) { 259 CONNECTORS.remove(host); 260 if (connector != null) { 261 ServiceSupport.dispose(connector); 262 } 263 } 264 } 265 } 266 return result; 267 } 268}