001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.EOFException; 020import java.io.IOException; 021import java.net.SocketException; 022import java.net.URI; 023import java.util.Collection; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.Iterator; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.Properties; 031import java.util.Set; 032import java.util.concurrent.ConcurrentHashMap; 033import java.util.concurrent.CopyOnWriteArrayList; 034import java.util.concurrent.CountDownLatch; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicBoolean; 037import java.util.concurrent.atomic.AtomicInteger; 038import java.util.concurrent.atomic.AtomicReference; 039import java.util.concurrent.locks.ReentrantReadWriteLock; 040 041import javax.transaction.xa.XAResource; 042 043import org.apache.activemq.advisory.AdvisoryBroker; 044import org.apache.activemq.advisory.AdvisorySupport; 045import org.apache.activemq.broker.region.ConnectionStatistics; 046import org.apache.activemq.broker.region.DurableTopicSubscription; 047import 
org.apache.activemq.broker.region.RegionBroker; 048import org.apache.activemq.broker.region.Subscription; 049import org.apache.activemq.broker.region.TopicRegion; 050import org.apache.activemq.command.ActiveMQDestination; 051import org.apache.activemq.command.BrokerInfo; 052import org.apache.activemq.command.BrokerSubscriptionInfo; 053import org.apache.activemq.command.Command; 054import org.apache.activemq.command.CommandTypes; 055import org.apache.activemq.command.ConnectionControl; 056import org.apache.activemq.command.ConnectionError; 057import org.apache.activemq.command.ConnectionId; 058import org.apache.activemq.command.ConnectionInfo; 059import org.apache.activemq.command.ConsumerControl; 060import org.apache.activemq.command.ConsumerId; 061import org.apache.activemq.command.ConsumerInfo; 062import org.apache.activemq.command.ControlCommand; 063import org.apache.activemq.command.DataArrayResponse; 064import org.apache.activemq.command.DestinationInfo; 065import org.apache.activemq.command.ExceptionResponse; 066import org.apache.activemq.command.FlushCommand; 067import org.apache.activemq.command.IntegerResponse; 068import org.apache.activemq.command.KeepAliveInfo; 069import org.apache.activemq.command.Message; 070import org.apache.activemq.command.MessageAck; 071import org.apache.activemq.command.MessageDispatch; 072import org.apache.activemq.command.MessageDispatchNotification; 073import org.apache.activemq.command.MessagePull; 074import org.apache.activemq.command.ProducerAck; 075import org.apache.activemq.command.ProducerId; 076import org.apache.activemq.command.ProducerInfo; 077import org.apache.activemq.command.RemoveInfo; 078import org.apache.activemq.command.RemoveSubscriptionInfo; 079import org.apache.activemq.command.Response; 080import org.apache.activemq.command.SessionId; 081import org.apache.activemq.command.SessionInfo; 082import org.apache.activemq.command.ShutdownInfo; 083import org.apache.activemq.command.TransactionId; 084import 
org.apache.activemq.command.TransactionInfo; 085import org.apache.activemq.command.WireFormatInfo; 086import org.apache.activemq.network.DemandForwardingBridge; 087import org.apache.activemq.network.MBeanNetworkListener; 088import org.apache.activemq.network.NetworkBridgeConfiguration; 089import org.apache.activemq.network.NetworkBridgeFactory; 090import org.apache.activemq.network.NetworkConnector; 091import org.apache.activemq.security.MessageAuthorizationPolicy; 092import org.apache.activemq.state.CommandVisitor; 093import org.apache.activemq.state.ConnectionState; 094import org.apache.activemq.state.ConsumerState; 095import org.apache.activemq.state.ProducerState; 096import org.apache.activemq.state.SessionState; 097import org.apache.activemq.state.TransactionState; 098import org.apache.activemq.thread.Task; 099import org.apache.activemq.thread.TaskRunner; 100import org.apache.activemq.thread.TaskRunnerFactory; 101import org.apache.activemq.transaction.Transaction; 102import org.apache.activemq.transport.DefaultTransportListener; 103import org.apache.activemq.transport.ResponseCorrelator; 104import org.apache.activemq.transport.TransmitCallback; 105import org.apache.activemq.transport.Transport; 106import org.apache.activemq.transport.TransportDisposedIOException; 107import org.apache.activemq.util.IntrospectionSupport; 108import org.apache.activemq.util.MarshallingSupport; 109import org.apache.activemq.util.NetworkBridgeUtils; 110import org.apache.activemq.util.SubscriptionKey; 111import org.slf4j.Logger; 112import org.slf4j.LoggerFactory; 113import org.slf4j.MDC; 114 115public class TransportConnection implements Connection, Task, CommandVisitor { 116 private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class); 117 private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport"); 118 private static final Logger SERVICELOG = 
LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service"); 119 // Keeps track of the broker and connector that created this connection. 120 protected final Broker broker; 121 protected final BrokerService brokerService; 122 protected final TransportConnector connector; 123 // Keeps track of the state of the connections. 124 // protected final ConcurrentHashMap localConnectionStates=new 125 // ConcurrentHashMap(); 126 protected final Map<ConnectionId, ConnectionState> brokerConnectionStates; 127 // The broker and wireformat info that was exchanged. 128 protected BrokerInfo brokerInfo; 129 protected final List<Command> dispatchQueue = new LinkedList<>(); 130 protected TaskRunner taskRunner; 131 protected final AtomicReference<Throwable> transportException = new AtomicReference<>(); 132 protected AtomicBoolean dispatchStopped = new AtomicBoolean(false); 133 private final Transport transport; 134 private MessageAuthorizationPolicy messageAuthorizationPolicy; 135 private WireFormatInfo wireFormatInfo; 136 // Used to do async dispatch.. this should perhaps be pushed down into the 137 // transport layer.. 
private boolean inServiceException;
private final ConnectionStatistics statistics = new ConnectionStatistics();
private boolean manageable;
private boolean slow;
private boolean markedCandidate;
private boolean blockedCandidate;
private boolean blocked;
private boolean connected;
private boolean active;
private final AtomicBoolean starting = new AtomicBoolean();
private final AtomicBoolean pendingStop = new AtomicBoolean();
private long timeStamp;
private final AtomicBoolean stopping = new AtomicBoolean(false);
private final CountDownLatch stopped = new CountDownLatch(1);
private final AtomicBoolean asyncException = new AtomicBoolean(false);
private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<>();
private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<>();
private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
private ConnectionContext context;
private boolean networkConnection;
private boolean faultTolerantConnection;
private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
private DemandForwardingBridge duplexBridge;
private final TaskRunnerFactory taskRunnerFactory;
private final TaskRunnerFactory stopTaskRunnerFactory;
private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
private String duplexNetworkConnectorId;

/**
 * Creates a connection on top of the given transport and installs a
 * transport listener that feeds inbound commands into
 * {@code service(Command)} and transport failures into
 * {@code serviceTransportException(IOException)}. Both callbacks run while
 * holding the read side of {@code serviceLock}.
 *
 * @param connector the connector that created this connection; when non-null
 *            its statistics become the parent of this connection's statistics
 *            and its message authorization policy is adopted
 * @param transport the transport to listen on; if it implements
 *            {@code BrokerServiceAware} it is handed the broker service
 * @param broker the broker; its {@code RegionBroker} adaptor supplies the
 *            shared connection-state map
 * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
 *            else commands are sent async.
 * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection.
 */
public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
        TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) {
    this.connector = connector;
    this.broker = broker;
    this.brokerService = broker.getBrokerService();

    RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
    brokerConnectionStates = rb.getConnectionStates();
    if (connector != null) {
        this.statistics.setParent(connector.getStatistics());
        this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
    }
    this.taskRunnerFactory = taskRunnerFactory;
    this.stopTaskRunnerFactory = stopTaskRunnerFactory;
    this.transport = transport;
    if (this.transport instanceof BrokerServiceAware) {
        ((BrokerServiceAware) this.transport).setBrokerService(brokerService);
    }
    this.transport.setTransportListener(new DefaultTransportListener() {
        @Override
        public void onCommand(Object o) {
            serviceLock.readLock().lock();
            try {
                if (!(o instanceof Command)) {
                    // String concatenation is null-safe; calling o.toString() here would
                    // replace the protocol-violation report with a bare
                    // NullPointerException when the transport delivers null.
                    throw new RuntimeException("Protocol violation - Command corrupted: " + o);
                }
                Command command = (Command) o;
                if (!brokerService.isStopping()) {
                    Response response = service(command);
                    // Re-check: the broker may have started stopping while the
                    // command was being serviced.
                    if (response != null && !brokerService.isStopping()) {
                        dispatchSync(response);
                    }
                } else {
                    throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
                }
            } finally {
                serviceLock.readLock().unlock();
            }
        }

        @Override
        public void onException(IOException exception) {
            serviceLock.readLock().lock();
            try {
                serviceTransportException(exception);
            } finally {
                serviceLock.readLock().unlock();
            }
        }
    });
    connected = true;
}

/**
 * Returns the number of messages to be dispatched to this connection
 *
 * @return size of dispatch queue
 */
@Override
public int
getDispatchQueueSize() { 232 synchronized (dispatchQueue) { 233 return dispatchQueue.size(); 234 } 235 } 236 237 public void serviceTransportException(IOException e) { 238 if (!stopping.get() && !pendingStop.get()) { 239 transportException.set(e); 240 if (TRANSPORTLOG.isDebugEnabled()) { 241 TRANSPORTLOG.debug(this + " failed: " + e, e); 242 } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) { 243 TRANSPORTLOG.warn(this + " failed: " + e); 244 } 245 stopAsync(e); 246 } 247 } 248 249 private boolean expected(IOException e) { 250 return isStomp() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException); 251 } 252 253 private boolean isStomp() { 254 URI uri = connector.getUri(); 255 return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1; 256 } 257 258 /** 259 * Calls the serviceException method in an async thread. Since handling a 260 * service exception closes a socket, we should not tie up broker threads 261 * since client sockets may hang or cause deadlocks. 262 */ 263 @Override 264 public void serviceExceptionAsync(final IOException e) { 265 if (asyncException.compareAndSet(false, true)) { 266 new Thread("Async Exception Handler") { 267 @Override 268 public void run() { 269 serviceException(e); 270 } 271 }.start(); 272 } 273 } 274 275 /** 276 * Closes a clients connection due to a detected error. Errors are ignored 277 * if: the client is closing or broker is closing. Otherwise, the connection 278 * error transmitted to the client before stopping it's transport. 279 */ 280 @Override 281 public void serviceException(Throwable e) { 282 // are we a transport exception such as not being able to dispatch 283 // synchronously to a transport 284 if (e instanceof IOException) { 285 serviceTransportException((IOException) e); 286 } else if (e.getClass() == BrokerStoppedException.class) { 287 // Handle the case where the broker is stopped 288 // But the client is still connected. 
289 if (!stopping.get()) { 290 SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection."); 291 ConnectionError ce = new ConnectionError(); 292 ce.setException(e); 293 dispatchSync(ce); 294 // Record the error that caused the transport to stop 295 transportException.set(e); 296 // Wait a little bit to try to get the output buffer to flush 297 // the exception notification to the client. 298 try { 299 Thread.sleep(500); 300 } catch (InterruptedException ie) { 301 Thread.currentThread().interrupt(); 302 } 303 // Worst case is we just kill the connection before the 304 // notification gets to him. 305 stopAsync(); 306 } 307 } else if (!stopping.get() && !inServiceException) { 308 inServiceException = true; 309 try { 310 if (SERVICELOG.isDebugEnabled()) { 311 SERVICELOG.debug("Async error occurred: " + e, e); 312 } else { 313 SERVICELOG.warn("Async error occurred: " + e); 314 } 315 ConnectionError ce = new ConnectionError(); 316 ce.setException(e); 317 if (pendingStop.get()) { 318 dispatchSync(ce); 319 } else { 320 dispatchAsync(ce); 321 } 322 } finally { 323 inServiceException = false; 324 } 325 } 326 } 327 328 @Override 329 public Response service(Command command) { 330 MDC.put("activemq.connector", connector.getUri().toString()); 331 Response response = null; 332 boolean responseRequired = command.isResponseRequired(); 333 int commandId = command.getCommandId(); 334 try { 335 if (!pendingStop.get()) { 336 response = command.visit(this); 337 } else { 338 response = new ExceptionResponse(transportException.get()); 339 } 340 } catch (Throwable e) { 341 if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) { 342 SERVICELOG.debug("Error occured while processing " + (responseRequired ? 
"sync" : "async") 343 + " command: " + command + ", exception: " + e, e); 344 } 345 346 if (e instanceof SuppressReplyException || (e.getCause() instanceof SuppressReplyException)) { 347 LOG.info("Suppressing reply to: " + command + " on: " + e + ", cause: " + e.getCause()); 348 responseRequired = false; 349 } 350 351 if (responseRequired) { 352 if (e instanceof SecurityException || e.getCause() instanceof SecurityException) { 353 SERVICELOG.warn("Security Error occurred on connection to: {}, {}", 354 transport.getRemoteAddress(), e.getMessage()); 355 } 356 response = new ExceptionResponse(e); 357 } else { 358 forceRollbackOnlyOnFailedAsyncTransactionOp(e, command); 359 serviceException(e); 360 } 361 } 362 if (responseRequired) { 363 if (response == null) { 364 response = new Response(); 365 } 366 response.setCorrelationId(commandId); 367 } 368 // The context may have been flagged so that the response is not 369 // sent. 370 if (context != null) { 371 if (context.isDontSendReponse()) { 372 context.setDontSendReponse(false); 373 response = null; 374 } 375 context = null; 376 } 377 MDC.remove("activemq.connector"); 378 return response; 379 } 380 381 private void forceRollbackOnlyOnFailedAsyncTransactionOp(Throwable e, Command command) { 382 if (brokerService.isRollbackOnlyOnAsyncException() && !(e instanceof IOException) && isInTransaction(command)) { 383 Transaction transaction = getActiveTransaction(command); 384 if (transaction != null && !transaction.isRollbackOnly()) { 385 LOG.debug("on async exception, force rollback of transaction for: " + command, e); 386 transaction.setRollbackOnly(e); 387 } 388 } 389 } 390 391 private Transaction getActiveTransaction(Command command) { 392 Transaction transaction = null; 393 try { 394 if (command instanceof Message) { 395 Message messageSend = (Message) command; 396 ProducerId producerId = messageSend.getProducerId(); 397 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); 398 transaction = 
producerExchange.getConnectionContext().getTransactions().get(messageSend.getTransactionId()); 399 } else if (command instanceof MessageAck) { 400 MessageAck messageAck = (MessageAck) command; 401 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(messageAck.getConsumerId()); 402 if (consumerExchange != null) { 403 transaction = consumerExchange.getConnectionContext().getTransactions().get(messageAck.getTransactionId()); 404 } 405 } 406 } catch(Exception ignored){ 407 LOG.trace("failed to find active transaction for command: " + command, ignored); 408 } 409 return transaction; 410 } 411 412 private boolean isInTransaction(Command command) { 413 return command instanceof Message && ((Message)command).isInTransaction() 414 || command instanceof MessageAck && ((MessageAck)command).isInTransaction(); 415 } 416 417 @Override 418 public Response processKeepAlive(KeepAliveInfo info) throws Exception { 419 return null; 420 } 421 422 @Override 423 public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception { 424 broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info); 425 return null; 426 } 427 428 @Override 429 public Response processWireFormat(WireFormatInfo info) throws Exception { 430 wireFormatInfo = info; 431 protocolVersion.set(info.getVersion()); 432 return null; 433 } 434 435 @Override 436 public Response processShutdown(ShutdownInfo info) throws Exception { 437 stopAsync(); 438 return null; 439 } 440 441 @Override 442 public Response processFlush(FlushCommand command) throws Exception { 443 return null; 444 } 445 446 @Override 447 public Response processBeginTransaction(TransactionInfo info) throws Exception { 448 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 449 context = null; 450 if (cs != null) { 451 context = cs.getContext(); 452 } 453 if (cs == null) { 454 throw new NullPointerException("Context is null"); 455 } 456 // Avoid replaying dup commands 
457 if (cs.getTransactionState(info.getTransactionId()) == null) { 458 cs.addTransactionState(info.getTransactionId()); 459 broker.beginTransaction(context, info.getTransactionId()); 460 } 461 return null; 462 } 463 464 @Override 465 public int getActiveTransactionCount() { 466 int rc = 0; 467 for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) { 468 Collection<TransactionState> transactions = cs.getTransactionStates(); 469 for (TransactionState transaction : transactions) { 470 rc++; 471 } 472 } 473 return rc; 474 } 475 476 @Override 477 public Long getOldestActiveTransactionDuration() { 478 TransactionState oldestTX = null; 479 for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) { 480 Collection<TransactionState> transactions = cs.getTransactionStates(); 481 for (TransactionState transaction : transactions) { 482 if( oldestTX ==null || oldestTX.getCreatedAt() < transaction.getCreatedAt() ) { 483 oldestTX = transaction; 484 } 485 } 486 } 487 if( oldestTX == null ) { 488 return null; 489 } 490 return System.currentTimeMillis() - oldestTX.getCreatedAt(); 491 } 492 493 @Override 494 public Response processEndTransaction(TransactionInfo info) throws Exception { 495 // No need to do anything. This packet is just sent by the client 496 // make sure he is synced with the server as commit command could 497 // come from a different connection. 
498 return null; 499 } 500 501 @Override 502 public Response processPrepareTransaction(TransactionInfo info) throws Exception { 503 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 504 context = null; 505 if (cs != null) { 506 context = cs.getContext(); 507 } 508 if (cs == null) { 509 throw new NullPointerException("Context is null"); 510 } 511 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 512 if (transactionState == null) { 513 throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: " 514 + info.getTransactionId()); 515 } 516 // Avoid dups. 517 if (!transactionState.isPrepared()) { 518 transactionState.setPrepared(true); 519 int result = broker.prepareTransaction(context, info.getTransactionId()); 520 transactionState.setPreparedResult(result); 521 if (result == XAResource.XA_RDONLY) { 522 // we are done, no further rollback or commit from TM 523 cs.removeTransactionState(info.getTransactionId()); 524 } 525 IntegerResponse response = new IntegerResponse(result); 526 return response; 527 } else { 528 IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult()); 529 return response; 530 } 531 } 532 533 @Override 534 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { 535 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 536 context = cs.getContext(); 537 cs.removeTransactionState(info.getTransactionId()); 538 broker.commitTransaction(context, info.getTransactionId(), true); 539 return null; 540 } 541 542 @Override 543 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { 544 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 545 context = cs.getContext(); 546 cs.removeTransactionState(info.getTransactionId()); 547 broker.commitTransaction(context, info.getTransactionId(), false); 548 
return null; 549 } 550 551 @Override 552 public Response processRollbackTransaction(TransactionInfo info) throws Exception { 553 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 554 context = cs.getContext(); 555 cs.removeTransactionState(info.getTransactionId()); 556 broker.rollbackTransaction(context, info.getTransactionId()); 557 return null; 558 } 559 560 @Override 561 public Response processForgetTransaction(TransactionInfo info) throws Exception { 562 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 563 context = cs.getContext(); 564 broker.forgetTransaction(context, info.getTransactionId()); 565 return null; 566 } 567 568 @Override 569 public Response processRecoverTransactions(TransactionInfo info) throws Exception { 570 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 571 context = cs.getContext(); 572 TransactionId[] preparedTransactions = broker.getPreparedTransactions(context); 573 return new DataArrayResponse(preparedTransactions); 574 } 575 576 @Override 577 public Response processMessage(Message messageSend) throws Exception { 578 ProducerId producerId = messageSend.getProducerId(); 579 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); 580 if (producerExchange.canDispatch(messageSend)) { 581 broker.send(producerExchange, messageSend); 582 } 583 return null; 584 } 585 586 @Override 587 public Response processMessageAck(MessageAck ack) throws Exception { 588 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId()); 589 if (consumerExchange != null) { 590 broker.acknowledge(consumerExchange, ack); 591 } else if (ack.isInTransaction()) { 592 LOG.warn("no matching consumer, ignoring ack {}", consumerExchange, ack); 593 } 594 return null; 595 } 596 597 @Override 598 public Response processMessagePull(MessagePull pull) throws Exception { 599 return 
broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull); 600 } 601 602 @Override 603 public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception { 604 broker.processDispatchNotification(notification); 605 return null; 606 } 607 608 @Override 609 public Response processAddDestination(DestinationInfo info) throws Exception { 610 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 611 broker.addDestinationInfo(cs.getContext(), info); 612 if (info.getDestination().isTemporary()) { 613 cs.addTempDestination(info); 614 } 615 return null; 616 } 617 618 @Override 619 public Response processRemoveDestination(DestinationInfo info) throws Exception { 620 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 621 broker.removeDestinationInfo(cs.getContext(), info); 622 if (info.getDestination().isTemporary()) { 623 cs.removeTempDestination(info.getDestination()); 624 } 625 return null; 626 } 627 628 @Override 629 public Response processAddProducer(ProducerInfo info) throws Exception { 630 SessionId sessionId = info.getProducerId().getParentId(); 631 ConnectionId connectionId = sessionId.getParentId(); 632 TransportConnectionState cs = lookupConnectionState(connectionId); 633 if (cs == null) { 634 throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " 635 + connectionId); 636 } 637 SessionState ss = cs.getSessionState(sessionId); 638 if (ss == null) { 639 throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " 640 + sessionId); 641 } 642 // Avoid replaying dup commands 643 if (!ss.getProducerIds().contains(info.getProducerId())) { 644 ActiveMQDestination destination = info.getDestination(); 645 // Do not check for null here as it would cause the count of max producers to exclude 646 // anonymous producers. 
The isAdvisoryTopic method checks for null so it is safe to 647 // call it from here with a null Destination value. 648 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 649 if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){ 650 throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection()); 651 } 652 } 653 broker.addProducer(cs.getContext(), info); 654 try { 655 ss.addProducer(info); 656 } catch (IllegalStateException e) { 657 broker.removeProducer(cs.getContext(), info); 658 } 659 660 } 661 return null; 662 } 663 664 @Override 665 public Response processRemoveProducer(ProducerId id) throws Exception { 666 SessionId sessionId = id.getParentId(); 667 ConnectionId connectionId = sessionId.getParentId(); 668 TransportConnectionState cs = lookupConnectionState(connectionId); 669 SessionState ss = cs.getSessionState(sessionId); 670 if (ss == null) { 671 throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " 672 + sessionId); 673 } 674 ProducerState ps = ss.removeProducer(id); 675 if (ps == null) { 676 throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id); 677 } 678 removeProducerBrokerExchange(id); 679 broker.removeProducer(cs.getContext(), ps.getInfo()); 680 return null; 681 } 682 683 @Override 684 public Response processAddConsumer(ConsumerInfo info) throws Exception { 685 SessionId sessionId = info.getConsumerId().getParentId(); 686 ConnectionId connectionId = sessionId.getParentId(); 687 TransportConnectionState cs = lookupConnectionState(connectionId); 688 if (cs == null) { 689 throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " 690 + connectionId); 691 } 692 SessionState ss = cs.getSessionState(sessionId); 693 if (ss == null) { 694 throw new 
IllegalStateException(broker.getBrokerName() 695 + " Cannot add a consumer to a session that had not been registered: " + sessionId); 696 } 697 // Avoid replaying dup commands 698 if (!ss.getConsumerIds().contains(info.getConsumerId())) { 699 ActiveMQDestination destination = info.getDestination(); 700 if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { 701 if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){ 702 throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection()); 703 } 704 } 705 706 broker.addConsumer(cs.getContext(), info); 707 try { 708 ss.addConsumer(info); 709 addConsumerBrokerExchange(cs, info.getConsumerId()); 710 } catch (IllegalStateException e) { 711 broker.removeConsumer(cs.getContext(), info); 712 } 713 714 } 715 return null; 716 } 717 718 @Override 719 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception { 720 SessionId sessionId = id.getParentId(); 721 ConnectionId connectionId = sessionId.getParentId(); 722 TransportConnectionState cs = lookupConnectionState(connectionId); 723 if (cs == null) { 724 throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: " 725 + connectionId); 726 } 727 SessionState ss = cs.getSessionState(sessionId); 728 if (ss == null) { 729 throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " 730 + sessionId); 731 } 732 ConsumerState consumerState = ss.removeConsumer(id); 733 if (consumerState == null) { 734 throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id); 735 } 736 ConsumerInfo info = consumerState.getInfo(); 737 info.setLastDeliveredSequenceId(lastDeliveredSequenceId); 738 broker.removeConsumer(cs.getContext(), consumerState.getInfo()); 739 
removeConsumerBrokerExchange(id);
        return null;
    }

    /**
     * Adds a session to the connection state that owns it and to the broker.
     * Nothing happens when the parent connection is not registered or the
     * session id is already known.
     *
     * @param info the session to add
     * @return always {@code null}
     */
    @Override
    public Response processAddSession(SessionInfo info) throws Exception {
        ConnectionId connectionId = info.getSessionId().getParentId();
        TransportConnectionState cs = lookupConnectionState(connectionId);
        // Avoid replaying dup commands
        if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
            broker.addSession(cs.getContext(), info);
            try {
                cs.addSession(info);
            } catch (IllegalStateException e) {
                // The state refused the session: undo the broker side registration.
                LOG.warn("Failed to add session: {}", info.getSessionId(), e);
                broker.removeSession(cs.getContext(), info);
            }
        }
        return null;
    }

    /**
     * Removes a session, first removing every consumer and producer that is
     * registered on it.
     *
     * @param id the session to remove
     * @param lastDeliveredSequenceId passed through to each consumer removal
     * @return always {@code null}
     * @throws IllegalStateException if the parent connection or the session is not registered
     */
    @Override
    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
        ConnectionId connectionId = id.getParentId();
        TransportConnectionState cs = lookupConnectionState(connectionId);
        if (cs == null) {
            throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
        }
        SessionState session = cs.getSessionState(id);
        if (session == null) {
            throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
        }
        // Don't let new consumers or producers get added while we are closing
        // this down.
        session.shutdown();
        // Cascade the connection stop to the consumers and producers.
        for (ConsumerId consumerId : session.getConsumerIds()) {
            try {
                processRemoveConsumer(consumerId, lastDeliveredSequenceId);
            } catch (Throwable e) {
                // Best effort: keep removing the remaining consumers.
                LOG.warn("Failed to remove consumer: {}", consumerId, e);
            }
        }
        for (ProducerId producerId : session.getProducerIds()) {
            try {
                processRemoveProducer(producerId);
            } catch (Throwable e) {
                // Best effort: keep removing the remaining producers.
                LOG.warn("Failed to remove producer: {}", producerId, e);
            }
        }
        cs.removeSession(id);
        broker.removeSession(cs.getContext(), session.getInfo());
        return null;
    }

    /**
     * Registers a logical connection on this transport: obtains (or creates)
     * the shared {@link TransportConnectionState}, takes it over from any other
     * transport connection that still holds it, builds the
     * {@link ConnectionContext} and adds the connection to the broker.
     *
     * @param info the connection to add
     * @return always {@code null}
     * @throws Exception whatever {@code broker.addConnection} throws; in that case the
     *         state is unregistered again and a delayed stop of this transport is scheduled
     */
    @Override
    public Response processAddConnection(ConnectionInfo info) throws Exception {
        // Older clients should have been defaulting this field to true.. but
        // they were not.
        if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
            info.setClientMaster(true);
        }
        TransportConnectionState state;
        // Make sure 2 concurrent connections by the same ID only generate 1
        // TransportConnectionState object.
        synchronized (brokerConnectionStates) {
            state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
            if (state == null) {
                state = new TransportConnectionState(info, this);
                brokerConnectionStates.put(info.getConnectionId(), state);
            }
            state.incrementReference();
        }
        // If there are 2 concurrent connections for the same connection id,
        // then last one in wins, we need to sync here
        // to figure out the winner.
        synchronized (state.getConnectionMutex()) {
            if (state.getConnection() != this) {
                LOG.debug("Killing previous stale connection: {}", state.getConnection().getRemoteAddress());
                state.getConnection().stop();
                LOG.debug("Connection {} taking over previous connection: {}", getRemoteAddress(), state.getConnection().getRemoteAddress());
                state.setConnection(this);
                state.reset(info);
            }
        }
        registerConnectionState(info.getConnectionId(), state);
        LOG.debug("Setting up new connection id: {}, address: {}, info: {}", new Object[]{ info.getConnectionId(), getRemoteAddress(), info });
        this.faultTolerantConnection = info.isFaultTolerant();
        // Setup the context.
        String clientId = info.getClientId();
        context = new ConnectionContext();
        context.setBroker(broker);
        context.setClientId(clientId);
        context.setClientMaster(info.isClientMaster());
        context.setConnection(this);
        context.setConnectionId(info.getConnectionId());
        context.setConnector(connector);
        context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
        context.setNetworkConnection(networkConnection);
        context.setFaultTolerant(faultTolerantConnection);
        context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
        context.setUserName(info.getUserName());
        context.setWireFormatInfo(wireFormatInfo);
        context.setReconnect(info.isFailoverReconnect());
        this.manageable = info.isManageable();
        context.setConnectionState(state);
        state.setContext(context);
        state.setConnection(this);
        if (info.getClientIp() == null) {
            info.setClientIp(getRemoteAddress());
        }

        try {
            broker.addConnection(context, info);
        } catch (Exception e) {
            // Roll back both registrations made above before reporting the failure.
            synchronized (brokerConnectionStates) {
                brokerConnectionStates.remove(info.getConnectionId());
            }
            unregisterConnectionState(info.getConnectionId());
            LOG.warn("Failed to add Connection id={}, clientId={} due to {}", info.getConnectionId(), clientId, e);
            //AMQ-6561 - stop for all exceptions on addConnection
            // close this down - in case the peer of this transport doesn't play nice
            delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
            throw e;
        }
        if (info.isManageable()) {
            // send ConnectionCommand
            ConnectionControl command = this.connector.getConnectionControl();
            command.setFaultTolerant(broker.isFaultTolerantConfiguration());
            if (info.isFailoverReconnect()) {
                command.setRebalanceConnection(false);
            }
            dispatchAsync(command);
        }
        return null;
    }

    /**
     * Removes a logical connection: shuts its state down, removes its sessions
     * and temporary destinations, removes it from the broker and finally
     * releases this transport's reference to the shared state. Failures of the
     * individual steps are logged and do not abort the remaining steps.
     *
     * @param id the connection to remove; an unknown id is a no-op
     * @param lastDeliveredSequenceId passed through to each session removal
     * @return always {@code null}
     */
    @Override
    public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
            throws InterruptedException {
        LOG.debug("remove connection id: {}", id);
        TransportConnectionState cs = lookupConnectionState(id);
        if (cs != null) {
            // Don't allow things to be added to the connection state while we
            // are shutting down.
            cs.shutdown();
            // Cascade the connection stop to the sessions.
            for (SessionId sessionId : cs.getSessionIds()) {
                try {
                    processRemoveSession(sessionId, lastDeliveredSequenceId);
                } catch (Throwable e) {
                    SERVICELOG.warn("Failed to remove session {}", sessionId, e);
                }
            }
            // Cascade the connection stop to temp destinations.
            for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
                DestinationInfo di = iter.next();
                try {
                    broker.removeDestination(cs.getContext(), di.getDestination(), 0);
                } catch (Throwable e) {
                    SERVICELOG.warn("Failed to remove tmp destination {}", di.getDestination(), e);
                }
                // The entry is dropped whether or not the broker removal succeeded.
                iter.remove();
            }
            try {
                broker.removeConnection(cs.getContext(), cs.getInfo(), transportException.get());
            } catch (Throwable e) {
                SERVICELOG.warn("Failed to remove connection {}", cs.getInfo(), e);
            }
            TransportConnectionState state = unregisterConnectionState(id);
            if (state != null) {
                synchronized (brokerConnectionStates) {
                    // If we are the last reference, we should remove the state
                    // from the broker.
                    if (state.decrementReference() == 0) {
                        brokerConnectionStates.remove(id);
                    }
                }
            }
        }
        return null;
    }

    /**
     * Ignores the command.
     *
     * @return always {@code null}
     */
    @Override
    public Response processProducerAck(ProducerAck ack) throws Exception {
        // A broker should not get ProducerAck messages.
return null;
    }

    /**
     * @return the connector this connection belongs to
     */
    @Override
    public Connector getConnector() {
        return connector;
    }

    /**
     * Dispatches the command on the calling thread. An {@link IOException} is
     * not propagated; it is handed to {@code serviceExceptionAsync} instead.
     *
     * @param message the command to send
     */
    @Override
    public void dispatchSync(Command message) {
        try {
            processDispatch(message);
        } catch (IOException e) {
            serviceExceptionAsync(e);
        }
    }

    /**
     * Queues the command for the dispatcher task, or dispatches it on the
     * calling thread when there is no task runner. Once the connection is
     * stopping nothing is sent; a {@link MessageDispatch} is post-processed
     * and its transmit callback, if any, is told about the failure.
     *
     * @param message the command to send
     */
    @Override
    public void dispatchAsync(Command message) {
        if (!stopping.get()) {
            if (taskRunner == null) {
                dispatchSync(message);
            } else {
                synchronized (dispatchQueue) {
                    dispatchQueue.add(message);
                }
                try {
                    taskRunner.wakeup();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        } else {
            if (message.isMessageDispatch()) {
                MessageDispatch md = (MessageDispatch) message;
                TransmitCallback sub = md.getTransmitCallback();
                broker.postProcessDispatch(md);
                if (sub != null) {
                    sub.onFailure();
                }
            }
        }
    }

    /**
     * Sends one command over the transport unless the connection is stopping.
     * For a {@link MessageDispatch} the broker pre/post dispatch hooks are run
     * and the transmit callback is notified of success or failure.
     *
     * @param command the command to send
     * @throws IOException if sending a {@link MessageDispatch} fails
     */
    protected void processDispatch(Command command) throws IOException {
        MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
        try {
            if (!stopping.get()) {
                if (messageDispatch != null) {
                    try {
                        broker.preProcessDispatch(messageDispatch);
                    } catch (RuntimeException convertToIO) {
                        throw new IOException(convertToIO);
                    }
                }
                dispatch(command);
            }
        } catch (IOException e) {
            // NOTE(review): the exception is only rethrown for message dispatches;
            // for any other command it is dropped here - confirm this is intended.
            if (messageDispatch != null) {
                TransmitCallback sub = messageDispatch.getTransmitCallback();
                broker.postProcessDispatch(messageDispatch);
                if (sub != null) {
                    sub.onFailure();
                }
                // Cleared so that the finally block does not report success as well.
                messageDispatch = null;
                throw e;
            }
        } finally {
            if (messageDispatch != null) {
                TransmitCallback sub = messageDispatch.getTransmitCallback();
                broker.postProcessDispatch(messageDispatch);
                if (sub != null) {
                    sub.onSuccess();
                }
            }
        }
    }

    /**
     * One step of the dispatcher task: sends the command at the head of the
     * dispatch queue. When a stop is pending or in progress, dispatching is
     * marked as stopped (sending a {@link ShutdownInfo} first if no transport
     * exception was recorded) and the dispatch-stopped latch is released.
     *
     * @return {@code true} if a command was dispatched, {@code false} otherwise
     */
    @Override
    public boolean iterate() {
        try {
            if (pendingStop.get() || stopping.get()) {
                if (dispatchStopped.compareAndSet(false, true)) {
                    if (transportException.get() == null) {
                        try {
                            dispatch(new ShutdownInfo());
                        } catch (Throwable ignore) {
                        }
                    }
                    dispatchStoppedLatch.countDown();
                }
                return false;
            }
            if (!dispatchStopped.get()) {
                Command command = null;
                synchronized (dispatchQueue) {
                    if (dispatchQueue.isEmpty()) {
                        return false;
                    }
                    command = dispatchQueue.remove(0);
                }
                processDispatch(command);
                return true;
            }
            return false;
        } catch (IOException e) {
            if (dispatchStopped.compareAndSet(false, true)) {
                dispatchStoppedLatch.countDown();
            }
            serviceExceptionAsync(e);
            return false;
        }
    }

    /**
     * Returns the statistics for this connection
     */
    @Override
    public ConnectionStatistics getStatistics() {
        return statistics;
    }

    /**
     * @return the message authorization policy set on this connection
     */
    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
        return messageAuthorizationPolicy;
    }

    public void
setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 1052 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 1053 } 1054 1055 @Override 1056 public boolean isManageable() { 1057 return manageable; 1058 } 1059 1060 @Override 1061 public void start() throws Exception { 1062 try { 1063 synchronized (this) { 1064 starting.set(true); 1065 if (taskRunnerFactory != null) { 1066 taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " 1067 + getRemoteAddress()); 1068 } else { 1069 taskRunner = null; 1070 } 1071 transport.start(); 1072 active = true; 1073 BrokerInfo info = connector.getBrokerInfo().copy(); 1074 if (connector.isUpdateClusterClients()) { 1075 info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos()); 1076 } else { 1077 info.setPeerBrokerInfos(null); 1078 } 1079 dispatchAsync(info); 1080 1081 connector.onStarted(this); 1082 } 1083 } catch (Exception e) { 1084 // Force clean up on an error starting up. 1085 pendingStop.set(true); 1086 throw e; 1087 } finally { 1088 // stop() can be called from within the above block, 1089 // but we want to be sure start() completes before 1090 // stop() runs, so queue the stop until right now: 1091 setStarting(false); 1092 if (isPendingStop()) { 1093 LOG.debug("Calling the delayed stop() after start() {}", this); 1094 stop(); 1095 } 1096 } 1097 } 1098 1099 @Override 1100 public void stop() throws Exception { 1101 // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory) 1102 // as their lifecycle is handled elsewhere 1103 1104 stopAsync(); 1105 while (!stopped.await(5, TimeUnit.SECONDS)) { 1106 LOG.info("The connection to '{}' is taking a long time to shutdown.", transport.getRemoteAddress()); 1107 } 1108 } 1109 1110 public void delayedStop(final int waitTime, final String reason, Throwable cause) { 1111 if (waitTime > 0) { 1112 synchronized (this) { 1113 pendingStop.set(true); 1114 transportException.set(cause); 1115 } 
1116 try { 1117 stopTaskRunnerFactory.execute(new Runnable() { 1118 @Override 1119 public void run() { 1120 try { 1121 Thread.sleep(waitTime); 1122 stopAsync(); 1123 LOG.info("Stopping {} because {}", transport.getRemoteAddress(), reason); 1124 } catch (InterruptedException e) { 1125 } 1126 } 1127 }); 1128 } catch (Throwable t) { 1129 LOG.warn("Cannot create stopAsync. This exception will be ignored.", t); 1130 } 1131 } 1132 } 1133 1134 public void stopAsync(Throwable cause) { 1135 transportException.set(cause); 1136 stopAsync(); 1137 } 1138 1139 public void stopAsync() { 1140 // If we're in the middle of starting then go no further... for now. 1141 synchronized (this) { 1142 pendingStop.set(true); 1143 if (starting.get()) { 1144 LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes.."); 1145 return; 1146 } 1147 } 1148 if (stopping.compareAndSet(false, true)) { 1149 // Let all the connection contexts know we are shutting down 1150 // so that in progress operations can notice and unblock. 1151 List<TransportConnectionState> connectionStates = listConnectionStates(); 1152 for (TransportConnectionState cs : connectionStates) { 1153 ConnectionContext connectionContext = cs.getContext(); 1154 if (connectionContext != null) { 1155 connectionContext.getStopping().set(true); 1156 } 1157 } 1158 try { 1159 stopTaskRunnerFactory.execute(new Runnable() { 1160 @Override 1161 public void run() { 1162 serviceLock.writeLock().lock(); 1163 try { 1164 doStop(); 1165 } catch (Throwable e) { 1166 LOG.debug("Error occurred while shutting down a connection {}", this, e); 1167 } finally { 1168 stopped.countDown(); 1169 serviceLock.writeLock().unlock(); 1170 } 1171 } 1172 }); 1173 } catch (Throwable t) { 1174 LOG.warn("Cannot create async transport stopper thread. This exception is ignored. 
Not waiting for stop to complete", t); 1175 stopped.countDown(); 1176 } 1177 } 1178 } 1179 1180 @Override 1181 public String toString() { 1182 return "Transport Connection to: " + transport.getRemoteAddress(); 1183 } 1184 1185 protected void doStop() throws Exception { 1186 LOG.debug("Stopping connection: {}", transport.getRemoteAddress()); 1187 connector.onStopped(this); 1188 try { 1189 synchronized (this) { 1190 if (duplexBridge != null) { 1191 duplexBridge.stop(); 1192 } 1193 } 1194 } catch (Exception ignore) { 1195 LOG.trace("Exception caught stopping. This exception is ignored.", ignore); 1196 } 1197 try { 1198 transport.stop(); 1199 LOG.debug("Stopped transport: {}", transport.getRemoteAddress()); 1200 } catch (Exception e) { 1201 LOG.debug("Could not stop transport to {}. This exception is ignored.", transport.getRemoteAddress(), e); 1202 } 1203 if (taskRunner != null) { 1204 taskRunner.shutdown(1); 1205 taskRunner = null; 1206 } 1207 active = false; 1208 // Run the MessageDispatch callbacks so that message references get 1209 // cleaned up. 1210 synchronized (dispatchQueue) { 1211 for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) { 1212 Command command = iter.next(); 1213 if (command.isMessageDispatch()) { 1214 MessageDispatch md = (MessageDispatch) command; 1215 TransmitCallback sub = md.getTransmitCallback(); 1216 broker.postProcessDispatch(md); 1217 if (sub != null) { 1218 sub.onFailure(); 1219 } 1220 } 1221 } 1222 dispatchQueue.clear(); 1223 } 1224 // 1225 // Remove all logical connection associated with this connection 1226 // from the broker. 
1227 if (!broker.isStopped()) { 1228 List<TransportConnectionState> connectionStates = listConnectionStates(); 1229 connectionStates = listConnectionStates(); 1230 for (TransportConnectionState cs : connectionStates) { 1231 cs.getContext().getStopping().set(true); 1232 try { 1233 LOG.debug("Cleaning up connection resources: {}", getRemoteAddress()); 1234 processRemoveConnection(cs.getInfo().getConnectionId(), RemoveInfo.LAST_DELIVERED_UNKNOWN); 1235 } catch (Throwable ignore) { 1236 LOG.debug("Exception caught removing connection {}. This exception is ignored.", cs.getInfo().getConnectionId(), ignore); 1237 } 1238 } 1239 } 1240 LOG.debug("Connection Stopped: {}", getRemoteAddress()); 1241 } 1242 1243 /** 1244 * @return Returns the blockedCandidate. 1245 */ 1246 public boolean isBlockedCandidate() { 1247 return blockedCandidate; 1248 } 1249 1250 /** 1251 * @param blockedCandidate The blockedCandidate to set. 1252 */ 1253 public void setBlockedCandidate(boolean blockedCandidate) { 1254 this.blockedCandidate = blockedCandidate; 1255 } 1256 1257 /** 1258 * @return Returns the markedCandidate. 1259 */ 1260 public boolean isMarkedCandidate() { 1261 return markedCandidate; 1262 } 1263 1264 /** 1265 * @param markedCandidate The markedCandidate to set. 1266 */ 1267 public void setMarkedCandidate(boolean markedCandidate) { 1268 this.markedCandidate = markedCandidate; 1269 if (!markedCandidate) { 1270 timeStamp = 0; 1271 blockedCandidate = false; 1272 } 1273 } 1274 1275 /** 1276 * @param slow The slow to set. 
*/
    public void setSlow(boolean slow) {
        this.slow = slow;
    }

    /**
     * @return true if the Connection is slow
     */
    @Override
    public boolean isSlow() {
        return slow;
    }

    /**
     * @return true if the Connection is potentially blocked (this is the
     *         value of the markedCandidate flag)
     */
    public boolean isMarkedBlockedCandidate() {
        return markedCandidate;
    }

    /**
     * Mark the Connection, so we can deem if it's collectable on the next sweep.
     * The time stamp is only taken if none is currently set.
     */
    public void doMark() {
        if (timeStamp == 0) {
            timeStamp = System.currentTimeMillis();
        }
    }

    /**
     * @return if after being marked, the Connection is still writing
     */
    @Override
    public boolean isBlocked() {
        return blocked;
    }

    /**
     * @return true if the Connection is connected
     */
    @Override
    public boolean isConnected() {
        return connected;
    }

    /**
     * @param blocked The blocked to set.
     */
    public void setBlocked(boolean blocked) {
        this.blocked = blocked;
    }

    /**
     * @param connected The connected to set.
     */
    public void setConnected(boolean connected) {
        this.connected = connected;
    }

    /**
     * @return true if the Connection is active
     */
    @Override
    public boolean isActive() {
        return active;
    }

    /**
     * @param active The active to set.
     */
    public void setActive(boolean active) {
        this.active = active;
    }

    /**
     * @return true if the Connection is starting
     */
    public boolean isStarting() {
        return starting.get();
    }

    /**
     * @return true once this connection has been flagged as a network connection
     */
    @Override
    public synchronized boolean isNetworkConnection() {
        return networkConnection;
    }

    /**
     * @return the fault tolerant flag of this connection
     */
    @Override
    public boolean isFaultTolerantConnection() {
        return this.faultTolerantConnection;
    }

    protected void setStarting(boolean starting) {
        this.starting.set(starting);
    }

    /**
     * @return true if the Connection needs to stop
     */
    public boolean isPendingStop() {
        return pendingStop.get();
    }

    protected void setPendingStop(boolean pendingStop) {
        this.pendingStop.set(pendingStop);
    }

    /**
     * Builds a {@link NetworkBridgeConfiguration} from the network properties
     * string carried by the given broker info.
     *
     * @param info the broker info received from the peer
     * @return a new configuration populated from those properties
     * @throws IOException if the properties string cannot be parsed
     */
    private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info) throws IOException {
        Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
        Map<String, String> props = createMap(properties);
        NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
        IntrospectionSupport.setProperties(config, props, "");
        return config;
    }

    /**
     * Handles the {@link BrokerInfo} sent by a peer broker. For a plain
     * network connection it optionally sends the durable subscription info;
     * for a duplex network connection it additionally stops any other
     * connection registered under the same duplex connector id and starts the
     * responder end of the bridge on this transport. Unless a branch returned
     * early, the info is stored and this connection and its contexts are
     * flagged as network connections.
     *
     * @param info the broker info received from the peer
     * @return always {@code null}
     */
    @Override
    public Response processBrokerInfo(BrokerInfo info) {
        if (info.isSlaveBroker()) {
            LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName());
        } else if (info.isNetworkConnection() && !info.isDuplexConnection()) {
            try {
                NetworkBridgeConfiguration config = getNetworkConfiguration(info);
                if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
                    LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
                    dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
                }
            } catch (Exception e) {
                LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e);
                return null;
            }
        } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
            // so this TransportConnection is the rear end of a network bridge
            // We have been requested to create a two way pipe ...
            try {
                NetworkBridgeConfiguration config = getNetworkConfiguration(info);
                config.setBrokerName(broker.getBrokerName());

                if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
                    LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
                    dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
                }

                // check for existing duplex connection hanging about

                // We first look if existing network connection already exists for the same broker Id and network connector name
                // It's possible in case of brief network fault to have this transport connector side of the connection always active
                // and the duplex network connector side wanting to open a new one
                // In this case, the old connection must be broken
                String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
                CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
                synchronized (connections) {
                    for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) {
                        TransportConnection c = iter.next();
                        if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
                            LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId);
                            c.stopAsync();
                            // better to wait for a bit rather than get connection id already in use and failure to start new bridge
                            c.getStopped().await(1, TimeUnit.SECONDS);
                        }
                    }
                    setDuplexNetworkConnectorId(duplexNetworkConnectorId);
                }
                Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker);
                Transport remoteBridgeTransport = transport;
                if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
                    // the vm transport case is already wrapped
                    remoteBridgeTransport = new ResponseCorrelator(remoteBridgeTransport);
                }
                String duplexName = localTransport.toString();
                if (duplexName.contains("#")) {
                    duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
                }
                MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName));
                listener.setCreatedByDuplex(true);
                duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
                duplexBridge.setBrokerService(brokerService);
                //Need to set durableDestinations to properly restart subs when dynamicOnly=false
                duplexBridge.setDurableDestinations(NetworkConnector.getDurableTopicDestinations(
                        broker.getDurableDestinations()));

                // now turn duplex off this side
                info.setDuplexConnection(false);
                duplexBridge.setCreatedByDuplex(true);
                duplexBridge.duplexStart(this, brokerInfo, info);
                LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId);
                return null;
            } catch (TransportDisposedIOException e) {
                // The local variable of the same name is out of scope here, so this
                // (and the catch below) logs the duplexNetworkConnectorId field.
                LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId);
                return null;
            } catch (Exception e) {
                LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e);
                return null;
            }
        }
        // We only expect to get one broker info command per connection
        if (this.brokerInfo != null) {
            LOG.warn("Unexpected extra broker info command received: {}", info);
        }
        this.brokerInfo = info;
        networkConnection = true;
        List<TransportConnectionState> connectionStates =
listConnectionStates(); 1477 for (TransportConnectionState cs : connectionStates) { 1478 cs.getContext().setNetworkConnection(true); 1479 } 1480 return null; 1481 } 1482 1483 @SuppressWarnings({"unchecked", "rawtypes"}) 1484 private HashMap<String, String> createMap(Properties properties) { 1485 return new HashMap(properties); 1486 } 1487 1488 protected void dispatch(Command command) throws IOException { 1489 try { 1490 setMarkedCandidate(true); 1491 transport.oneway(command); 1492 } finally { 1493 setMarkedCandidate(false); 1494 } 1495 } 1496 1497 @Override 1498 public String getRemoteAddress() { 1499 return transport.getRemoteAddress(); 1500 } 1501 1502 public Transport getTransport() { 1503 return transport; 1504 } 1505 1506 @Override 1507 public String getConnectionId() { 1508 List<TransportConnectionState> connectionStates = listConnectionStates(); 1509 for (TransportConnectionState cs : connectionStates) { 1510 if (cs.getInfo().getClientId() != null) { 1511 return cs.getInfo().getClientId(); 1512 } 1513 return cs.getInfo().getConnectionId().toString(); 1514 } 1515 return null; 1516 } 1517 1518 @Override 1519 public void updateClient(ConnectionControl control) { 1520 if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null 1521 && this.wireFormatInfo.getVersion() >= 6) { 1522 dispatchAsync(control); 1523 } 1524 } 1525 1526 public ProducerBrokerExchange getProducerBrokerExchangeIfExists(ProducerInfo producerInfo){ 1527 ProducerBrokerExchange result = null; 1528 if (producerInfo != null && producerInfo.getProducerId() != null){ 1529 synchronized (producerExchanges){ 1530 result = producerExchanges.get(producerInfo.getProducerId()); 1531 } 1532 } 1533 return result; 1534 } 1535 1536 private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException { 1537 ProducerBrokerExchange result = producerExchanges.get(id); 1538 if (result == null) { 1539 synchronized (producerExchanges) { 1540 result = new 
ProducerBrokerExchange(); 1541 TransportConnectionState state = lookupConnectionState(id); 1542 context = state.getContext(); 1543 result.setConnectionContext(context); 1544 if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) { 1545 result.setLastStoredSequenceId(brokerService.getPersistenceAdapter().getLastProducerSequenceId(id)); 1546 } 1547 SessionState ss = state.getSessionState(id.getParentId()); 1548 if (ss != null) { 1549 result.setProducerState(ss.getProducerState(id)); 1550 ProducerState producerState = ss.getProducerState(id); 1551 if (producerState != null && producerState.getInfo() != null) { 1552 ProducerInfo info = producerState.getInfo(); 1553 result.setMutable(info.getDestination() == null || info.getDestination().isComposite()); 1554 } 1555 } 1556 producerExchanges.put(id, result); 1557 } 1558 } else { 1559 context = result.getConnectionContext(); 1560 } 1561 return result; 1562 } 1563 1564 private void removeProducerBrokerExchange(ProducerId id) { 1565 synchronized (producerExchanges) { 1566 producerExchanges.remove(id); 1567 } 1568 } 1569 1570 private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) { 1571 ConsumerBrokerExchange result = consumerExchanges.get(id); 1572 return result; 1573 } 1574 1575 private ConsumerBrokerExchange addConsumerBrokerExchange(TransportConnectionState connectionState, ConsumerId id) { 1576 ConsumerBrokerExchange result = consumerExchanges.get(id); 1577 if (result == null) { 1578 synchronized (consumerExchanges) { 1579 result = new ConsumerBrokerExchange(); 1580 context = connectionState.getContext(); 1581 result.setConnectionContext(context); 1582 SessionState ss = connectionState.getSessionState(id.getParentId()); 1583 if (ss != null) { 1584 ConsumerState cs = ss.getConsumerState(id); 1585 if (cs != null) { 1586 ConsumerInfo info = cs.getInfo(); 1587 if (info != null) { 1588 if (info.getDestination() != null && info.getDestination().isPattern()) { 1589 
result.setWildcard(true); 1590 } 1591 } 1592 } 1593 } 1594 consumerExchanges.put(id, result); 1595 } 1596 } 1597 return result; 1598 } 1599 1600 private void removeConsumerBrokerExchange(ConsumerId id) { 1601 synchronized (consumerExchanges) { 1602 consumerExchanges.remove(id); 1603 } 1604 } 1605 1606 public int getProtocolVersion() { 1607 return protocolVersion.get(); 1608 } 1609 1610 @Override 1611 public Response processControlCommand(ControlCommand command) throws Exception { 1612 return null; 1613 } 1614 1615 @Override 1616 public Response processMessageDispatch(MessageDispatch dispatch) throws Exception { 1617 return null; 1618 } 1619 1620 @Override 1621 public Response processConnectionControl(ConnectionControl control) throws Exception { 1622 if (control != null) { 1623 faultTolerantConnection = control.isFaultTolerant(); 1624 } 1625 return null; 1626 } 1627 1628 @Override 1629 public Response processConnectionError(ConnectionError error) throws Exception { 1630 return null; 1631 } 1632 1633 @Override 1634 public Response processConsumerControl(ConsumerControl control) throws Exception { 1635 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId()); 1636 broker.processConsumerControl(consumerExchange, control); 1637 return null; 1638 } 1639 1640 protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId, 1641 TransportConnectionState state) { 1642 TransportConnectionState cs = null; 1643 if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) { 1644 // swap implementations 1645 TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister(); 1646 newRegister.intialize(connectionStateRegister); 1647 connectionStateRegister = newRegister; 1648 } 1649 cs = connectionStateRegister.registerConnectionState(connectionId, state); 1650 return cs; 1651 } 1652 1653 protected synchronized TransportConnectionState 
unregisterConnectionState(ConnectionId connectionId) { 1654 return connectionStateRegister.unregisterConnectionState(connectionId); 1655 } 1656 1657 protected synchronized List<TransportConnectionState> listConnectionStates() { 1658 return connectionStateRegister.listConnectionStates(); 1659 } 1660 1661 protected synchronized TransportConnectionState lookupConnectionState(String connectionId) { 1662 return connectionStateRegister.lookupConnectionState(connectionId); 1663 } 1664 1665 protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) { 1666 return connectionStateRegister.lookupConnectionState(id); 1667 } 1668 1669 protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) { 1670 return connectionStateRegister.lookupConnectionState(id); 1671 } 1672 1673 protected synchronized TransportConnectionState lookupConnectionState(SessionId id) { 1674 return connectionStateRegister.lookupConnectionState(id); 1675 } 1676 1677 // public only for testing 1678 public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) { 1679 return connectionStateRegister.lookupConnectionState(connectionId); 1680 } 1681 1682 protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) { 1683 this.duplexNetworkConnectorId = duplexNetworkConnectorId; 1684 } 1685 1686 protected synchronized String getDuplexNetworkConnectorId() { 1687 return this.duplexNetworkConnectorId; 1688 } 1689 1690 public boolean isStopping() { 1691 return stopping.get(); 1692 } 1693 1694 protected CountDownLatch getStopped() { 1695 return stopped; 1696 } 1697 1698 private int getProducerCount(ConnectionId connectionId) { 1699 int result = 0; 1700 TransportConnectionState cs = lookupConnectionState(connectionId); 1701 if (cs != null) { 1702 for (SessionId sessionId : cs.getSessionIds()) { 1703 SessionState sessionState = cs.getSessionState(sessionId); 1704 if (sessionState != null) { 1705 result += 
sessionState.getProducerIds().size(); 1706 } 1707 } 1708 } 1709 return result; 1710 } 1711 1712 private int getConsumerCount(ConnectionId connectionId) { 1713 int result = 0; 1714 TransportConnectionState cs = lookupConnectionState(connectionId); 1715 if (cs != null) { 1716 for (SessionId sessionId : cs.getSessionIds()) { 1717 SessionState sessionState = cs.getSessionState(sessionId); 1718 if (sessionState != null) { 1719 result += sessionState.getConsumerIds().size(); 1720 } 1721 } 1722 } 1723 return result; 1724 } 1725 1726 public WireFormatInfo getRemoteWireFormatInfo() { 1727 return wireFormatInfo; 1728 } 1729 1730 /* (non-Javadoc) 1731 * @see org.apache.activemq.state.CommandVisitor#processBrokerSubscriptionInfo(org.apache.activemq.command.BrokerSubscriptionInfo) 1732 */ 1733 @Override 1734 public Response processBrokerSubscriptionInfo(BrokerSubscriptionInfo info) throws Exception { 1735 return null; 1736 } 1737}