001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.tcp; 018 019import java.io.IOException; 020import java.net.InetAddress; 021import java.net.InetSocketAddress; 022import java.net.ServerSocket; 023import java.net.Socket; 024import java.net.SocketException; 025import java.net.SocketTimeoutException; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.net.UnknownHostException; 029import java.util.HashMap; 030import java.util.concurrent.BlockingQueue; 031import java.util.concurrent.LinkedBlockingQueue; 032import java.util.concurrent.TimeUnit; 033 034import javax.net.ServerSocketFactory; 035 036import org.apache.activemq.Service; 037import org.apache.activemq.ThreadPriorities; 038import org.apache.activemq.command.BrokerInfo; 039import org.apache.activemq.openwire.OpenWireFormatFactory; 040import org.apache.activemq.transport.Transport; 041import org.apache.activemq.transport.TransportLoggerFactory; 042import org.apache.activemq.transport.TransportServer; 043import org.apache.activemq.transport.TransportServerThreadSupport; 044import org.apache.activemq.util.IOExceptionSupport; 045import org.apache.activemq.util.InetAddressUtil; 046import org.apache.activemq.util.IntrospectionSupport; 047import org.apache.activemq.util.ServiceListener; 048import org.apache.activemq.util.ServiceStopper; 049import org.apache.activemq.util.ServiceSupport; 050import org.apache.activemq.wireformat.WireFormat; 051import org.apache.activemq.wireformat.WireFormatFactory; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055/** 056 * A TCP based implementation of {@link TransportServer} 057 * 058 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications) 059 * 060 */ 061 062public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{ 063 064 private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class); 065 protected ServerSocket serverSocket; 066 protected int backlog = 5000; 067 protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); 068 protected final TcpTransportFactory transportFactory; 069 protected long maxInactivityDuration = 30000; 070 protected long maxInactivityDurationInitalDelay = 10000; 071 protected int minmumWireFormatVersion; 072 protected boolean useQueueForAccept=true; 073 074 /** 075 * trace=true -> the Transport stack where this TcpTransport 076 * object will be, will have a TransportLogger layer 077 * trace=false -> the Transport stack where this TcpTransport 078 * object will be, will NOT have a TransportLogger layer, and therefore 079 * will never be able to print logging messages. 080 * This parameter is most probably set in Connection or TransportConnector URIs. 081 */ 082 protected boolean trace = false; 083 084 protected int soTimeout = 0; 085 protected int socketBufferSize = 64 * 1024; 086 protected int connectionTimeout = 30000; 087 088 /** 089 * Name of the LogWriter implementation to use. 090 * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory. 091 * This parameter is most probably set in Connection or TransportConnector URIs. 092 */ 093 protected String logWriterName = TransportLoggerFactory.defaultLogWriterName; 094 /** 095 * Specifies if the TransportLogger will be manageable by JMX or not. 096 * Also, as long as there is at least 1 TransportLogger which is manageable, 097 * a TransportLoggerControl MBean will me created. 098 */ 099 protected boolean dynamicManagement = false; 100 /** 101 * startLogging=true -> the TransportLogger object of the Transport stack 102 * will initially write messages to the log. 103 * startLogging=false -> the TransportLogger object of the Transport stack 104 * will initially NOT write messages to the log. 105 * This parameter only has an effect if trace == true. 106 * This parameter is most probably set in Connection or TransportConnector URIs. 107 */ 108 protected boolean startLogging = true; 109 protected final ServerSocketFactory serverSocketFactory; 110 protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>(); 111 protected Thread socketHandlerThread; 112 /** 113 * The maximum number of sockets allowed for this server 114 */ 115 protected int maximumConnections = Integer.MAX_VALUE; 116 protected int currentTransportCount=0; 117 118 public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { 119 super(location); 120 this.transportFactory = transportFactory; 121 this.serverSocketFactory = serverSocketFactory; 122 123 } 124 125 public void bind() throws IOException { 126 URI bind = getBindLocation(); 127 128 String host = bind.getHost(); 129 host = (host == null || host.length() == 0) ? "localhost" : host; 130 InetAddress addr = InetAddress.getByName(host); 131 132 try { 133 134 this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr); 135 configureServerSocket(this.serverSocket); 136 137 } catch (IOException e) { 138 throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e); 139 } 140 try { 141 setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind 142 .getFragment())); 143 } catch (URISyntaxException e) { 144 145 // it could be that the host name contains invalid characters such 146 // as _ on unix platforms 147 // so lets try use the IP address instead 148 try { 149 setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment())); 150 } catch (URISyntaxException e2) { 151 throw IOExceptionSupport.create(e2); 152 } 153 } 154 } 155 156 private void configureServerSocket(ServerSocket socket) throws SocketException { 157 socket.setSoTimeout(2000); 158 if (transportOptions != null) { 159 IntrospectionSupport.setProperties(socket, transportOptions); 160 } 161 } 162 163 /** 164 * @return Returns the wireFormatFactory. 165 */ 166 public WireFormatFactory getWireFormatFactory() { 167 return wireFormatFactory; 168 } 169 170 /** 171 * @param wireFormatFactory The wireFormatFactory to set. 172 */ 173 public void setWireFormatFactory(WireFormatFactory wireFormatFactory) { 174 this.wireFormatFactory = wireFormatFactory; 175 } 176 177 /** 178 * Associates a broker info with the transport server so that the transport 179 * can do discovery advertisements of the broker. 180 * 181 * @param brokerInfo 182 */ 183 public void setBrokerInfo(BrokerInfo brokerInfo) { 184 } 185 186 public long getMaxInactivityDuration() { 187 return maxInactivityDuration; 188 } 189 190 public void setMaxInactivityDuration(long maxInactivityDuration) { 191 this.maxInactivityDuration = maxInactivityDuration; 192 } 193 194 public long getMaxInactivityDurationInitalDelay() { 195 return this.maxInactivityDurationInitalDelay; 196 } 197 198 public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) { 199 this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay; 200 } 201 202 public int getMinmumWireFormatVersion() { 203 return minmumWireFormatVersion; 204 } 205 206 public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { 207 this.minmumWireFormatVersion = minmumWireFormatVersion; 208 } 209 210 public boolean isTrace() { 211 return trace; 212 } 213 214 public void setTrace(boolean trace) { 215 this.trace = trace; 216 } 217 218 public String getLogWriterName() { 219 return logWriterName; 220 } 221 222 public void setLogWriterName(String logFormat) { 223 this.logWriterName = logFormat; 224 } 225 226 public boolean isDynamicManagement() { 227 return dynamicManagement; 228 } 229 230 public void setDynamicManagement(boolean useJmx) { 231 this.dynamicManagement = useJmx; 232 } 233 234 public boolean isStartLogging() { 235 return startLogging; 236 } 237 238 239 public void setStartLogging(boolean startLogging) { 240 this.startLogging = startLogging; 241 } 242 243 /** 244 * @return the backlog 245 */ 246 public int getBacklog() { 247 return backlog; 248 } 249 250 /** 251 * @param backlog the backlog to set 252 */ 253 public void setBacklog(int backlog) { 254 this.backlog = backlog; 255 } 256 257 /** 258 * @return the useQueueForAccept 259 */ 260 public boolean isUseQueueForAccept() { 261 return useQueueForAccept; 262 } 263 264 /** 265 * @param useQueueForAccept the useQueueForAccept to set 266 */ 267 public void setUseQueueForAccept(boolean useQueueForAccept) { 268 this.useQueueForAccept = useQueueForAccept; 269 } 270 271 272 /** 273 * pull Sockets from the ServerSocket 274 */ 275 public void run() { 276 while (!isStopped()) { 277 Socket socket = null; 278 try { 279 socket = serverSocket.accept(); 280 if (socket != null) { 281 if (isStopped() || getAcceptListener() == null) { 282 socket.close(); 283 } else { 284 if (useQueueForAccept) { 285 socketQueue.put(socket); 286 }else { 287 handleSocket(socket); 288 } 289 } 290 } 291 } catch (SocketTimeoutException ste) { 292 // expect this to happen 293 } catch (Exception e) { 294 if (!isStopping()) { 295 onAcceptError(e); 296 } else if (!isStopped()) { 297 LOG.warn("run()", e); 298 onAcceptError(e); 299 } 300 } 301 } 302 } 303 304 /** 305 * Allow derived classes to override the Transport implementation that this 306 * transport server creates. 307 * 308 * @param socket 309 * @param format 310 * @return 311 * @throws IOException 312 */ 313 protected Transport createTransport(Socket socket, WireFormat format) throws IOException { 314 return new TcpTransport(format, socket); 315 } 316 317 /** 318 * @return pretty print of this 319 */ 320 public String toString() { 321 return "" + getBindLocation(); 322 } 323 324 /** 325 * @param socket 326 * @param inetAddress 327 * @return real hostName 328 * @throws UnknownHostException 329 */ 330 protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException { 331 String result = null; 332 if (socket.isBound()) { 333 if (socket.getInetAddress().isAnyLocalAddress()) { 334 // make it more human readable and useful, an alternative to 0.0.0.0 335 result = InetAddressUtil.getLocalHostName(); 336 } else { 337 result = socket.getInetAddress().getCanonicalHostName(); 338 } 339 } else { 340 result = bindAddress.getCanonicalHostName(); 341 } 342 return result; 343 } 344 345 protected void doStart() throws Exception { 346 if(useQueueForAccept) { 347 Runnable run = new Runnable() { 348 public void run() { 349 try { 350 while (!isStopped() && !isStopping()) { 351 Socket sock = socketQueue.poll(1, TimeUnit.SECONDS); 352 if (sock != null) { 353 handleSocket(sock); 354 } 355 } 356 357 } catch (InterruptedException e) { 358 LOG.info("socketQueue interuppted - stopping"); 359 if (!isStopping()) { 360 onAcceptError(e); 361 } 362 } 363 364 } 365 366 }; 367 socketHandlerThread = new Thread(null, run, 368 "ActiveMQ Transport Server Thread Handler: " + toString(), 369 getStackSize()); 370 socketHandlerThread.setDaemon(true); 371 socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1); 372 socketHandlerThread.start(); 373 } 374 super.doStart(); 375 376 } 377 378 protected void doStop(ServiceStopper stopper) throws Exception { 379 super.doStop(stopper); 380 if (serverSocket != null) { 381 serverSocket.close(); 382 } 383 } 384 385 public InetSocketAddress getSocketAddress() { 386 return (InetSocketAddress)serverSocket.getLocalSocketAddress(); 387 } 388 389 protected final void handleSocket(Socket socket) { 390 try { 391 if (this.currentTransportCount >= this.maximumConnections) { 392 throw new ExceededMaximumConnectionsException("Exceeded the maximum " + 393 "number of allowed client connections. See the 'maximumConnections' " + 394 "property on the TCP transport configuration URI in the ActiveMQ " + 395 "configuration file (e.g., activemq.xml)"); 396 397 } else { 398 HashMap<String, Object> options = new HashMap<String, Object>(); 399 options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration)); 400 options.put("maxInactivityDurationInitalDelay", 401 Long.valueOf(maxInactivityDurationInitalDelay)); 402 options.put("minmumWireFormatVersion", 403 Integer.valueOf(minmumWireFormatVersion)); 404 options.put("trace", Boolean.valueOf(trace)); 405 options.put("soTimeout", Integer.valueOf(soTimeout)); 406 options.put("socketBufferSize", Integer.valueOf(socketBufferSize)); 407 options.put("connectionTimeout", Integer.valueOf(connectionTimeout)); 408 options.put("logWriterName", logWriterName); 409 options.put("dynamicManagement", Boolean.valueOf(dynamicManagement)); 410 options.put("startLogging", Boolean.valueOf(startLogging)); 411 options.putAll(transportOptions); 412 413 WireFormat format = wireFormatFactory.createWireFormat(); 414 Transport transport = createTransport(socket, format); 415 416 if (transport instanceof ServiceSupport) { 417 ((ServiceSupport) transport).addServiceListener(this); 418 } 419 420 Transport configuredTransport = 421 transportFactory.serverConfigure( transport, format, options); 422 423 getAcceptListener().onAccept(configuredTransport); 424 } 425 } catch (SocketTimeoutException ste) { 426 // expect this to happen 427 } catch (Exception e) { 428 if (!isStopping()) { 429 onAcceptError(e); 430 } else if (!isStopped()) { 431 LOG.warn("run()", e); 432 onAcceptError(e); 433 } 434 } 435 436 } 437 438 public int getSoTimeout() { 439 return soTimeout; 440 } 441 442 public void setSoTimeout(int soTimeout) { 443 this.soTimeout = soTimeout; 444 } 445 446 public int getSocketBufferSize() { 447 return socketBufferSize; 448 } 449 450 public void setSocketBufferSize(int socketBufferSize) { 451 this.socketBufferSize = socketBufferSize; 452 } 453 454 public int getConnectionTimeout() { 455 return connectionTimeout; 456 } 457 458 public void setConnectionTimeout(int connectionTimeout) { 459 this.connectionTimeout = connectionTimeout; 460 } 461 462 /** 463 * @return the maximumConnections 464 */ 465 public int getMaximumConnections() { 466 return maximumConnections; 467 } 468 469 /** 470 * @param maximumConnections the maximumConnections to set 471 */ 472 public void setMaximumConnections(int maximumConnections) { 473 this.maximumConnections = maximumConnections; 474 } 475 476 477 public void started(Service service) { 478 this.currentTransportCount++; 479 } 480 481 public void stopped(Service service) { 482 this.currentTransportCount--; 483 } 484}