001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.io.IOException; 020import java.security.GeneralSecurityException; 021import java.security.cert.X509Certificate; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Properties; 028import java.util.Set; 029import java.util.concurrent.ConcurrentHashMap; 030import java.util.concurrent.ConcurrentMap; 031import java.util.concurrent.CountDownLatch; 032import java.util.concurrent.ExecutionException; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.Executors; 035import java.util.concurrent.Future; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.TimeoutException; 038import java.util.concurrent.atomic.AtomicBoolean; 039 040import javax.management.ObjectName; 041 042import org.apache.activemq.DestinationDoesNotExistException; 043import org.apache.activemq.Service; 044import org.apache.activemq.advisory.AdvisoryBroker; 045import org.apache.activemq.advisory.AdvisorySupport; 046import org.apache.activemq.broker.BrokerService; 047import org.apache.activemq.broker.BrokerServiceAware; 048import org.apache.activemq.broker.ConnectionContext; 049import org.apache.activemq.broker.TransportConnection; 050import org.apache.activemq.broker.region.AbstractRegion; 051import org.apache.activemq.broker.region.DurableTopicSubscription; 052import org.apache.activemq.broker.region.Region; 053import org.apache.activemq.broker.region.RegionBroker; 054import org.apache.activemq.broker.region.Subscription; 055import org.apache.activemq.broker.region.policy.PolicyEntry; 056import org.apache.activemq.command.ActiveMQDestination; 057import org.apache.activemq.command.ActiveMQMessage; 058import org.apache.activemq.command.ActiveMQTempDestination; 059import org.apache.activemq.command.ActiveMQTopic; 060import org.apache.activemq.command.BrokerId; 061import org.apache.activemq.command.BrokerInfo; 062import org.apache.activemq.command.BrokerSubscriptionInfo; 063import org.apache.activemq.command.Command; 064import org.apache.activemq.command.CommandTypes; 065import org.apache.activemq.command.ConnectionError; 066import org.apache.activemq.command.ConnectionId; 067import org.apache.activemq.command.ConnectionInfo; 068import org.apache.activemq.command.ConsumerId; 069import org.apache.activemq.command.ConsumerInfo; 070import org.apache.activemq.command.DataStructure; 071import org.apache.activemq.command.DestinationInfo; 072import org.apache.activemq.command.ExceptionResponse; 073import org.apache.activemq.command.KeepAliveInfo; 074import org.apache.activemq.command.Message; 075import org.apache.activemq.command.MessageAck; 076import org.apache.activemq.command.MessageDispatch; 077import org.apache.activemq.command.MessageId; 078import org.apache.activemq.command.NetworkBridgeFilter; 079import org.apache.activemq.command.ProducerInfo; 080import org.apache.activemq.command.RemoveInfo; 081import org.apache.activemq.command.RemoveSubscriptionInfo; 082import org.apache.activemq.command.Response; 083import org.apache.activemq.command.SessionInfo; 084import org.apache.activemq.command.ShutdownInfo; 085import org.apache.activemq.command.SubscriptionInfo; 086import org.apache.activemq.command.WireFormatInfo; 087import org.apache.activemq.filter.DestinationFilter; 088import org.apache.activemq.filter.MessageEvaluationContext; 089import org.apache.activemq.security.SecurityContext; 090import org.apache.activemq.transport.DefaultTransportListener; 091import org.apache.activemq.transport.FutureResponse; 092import org.apache.activemq.transport.ResponseCallback; 093import org.apache.activemq.transport.Transport; 094import org.apache.activemq.transport.TransportDisposedIOException; 095import org.apache.activemq.transport.TransportFilter; 096import org.apache.activemq.transport.tcp.SslTransport; 097import org.apache.activemq.transport.tcp.TcpTransport; 098import org.apache.activemq.util.IdGenerator; 099import org.apache.activemq.util.IntrospectionSupport; 100import org.apache.activemq.util.LongSequenceGenerator; 101import org.apache.activemq.util.MarshallingSupport; 102import org.apache.activemq.util.NetworkBridgeUtils; 103import org.apache.activemq.util.ServiceStopper; 104import org.apache.activemq.util.ServiceSupport; 105import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter; 106import org.slf4j.Logger; 107import org.slf4j.LoggerFactory; 108 109/** 110 * A useful base class for implementing demand forwarding bridges. 111 */ 112public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { 113 private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class); 114 protected static final String DURABLE_SUB_PREFIX = "NC-DS_"; 115 protected final Transport localBroker; 116 protected final Transport remoteBroker; 117 protected IdGenerator idGenerator = new IdGenerator(); 118 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 119 protected ConnectionInfo localConnectionInfo; 120 protected ConnectionInfo remoteConnectionInfo; 121 protected SessionInfo localSessionInfo; 122 protected ProducerInfo producerInfo; 123 protected String remoteBrokerName = "Unknown"; 124 protected String localClientId; 125 protected ConsumerInfo demandConsumerInfo; 126 protected int demandConsumerDispatched; 127 protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false); 128 protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false); 129 protected final AtomicBoolean bridgeFailed = new AtomicBoolean(); 130 protected final AtomicBoolean disposed = new AtomicBoolean(); 131 protected BrokerId localBrokerId; 132 protected ActiveMQDestination[] excludedDestinations; 133 protected ActiveMQDestination[] dynamicallyIncludedDestinations; 134 protected ActiveMQDestination[] staticallyIncludedDestinations; 135 protected ActiveMQDestination[] durableDestinations; 136 protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<>(); 137 protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<>(); 138 protected final Set<ConsumerId> forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<ConsumerId, Boolean>()); 139 protected final BrokerId localBrokerPath[] = new BrokerId[]{null}; 140 protected final CountDownLatch startedLatch = new CountDownLatch(2); 141 protected final CountDownLatch localStartedLatch = new CountDownLatch(1); 142 protected final CountDownLatch staticDestinationsLatch = new CountDownLatch(1); 143 protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false); 144 protected NetworkBridgeConfiguration configuration; 145 protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory(); 146 147 protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null}; 148 protected BrokerId remoteBrokerId; 149 150 protected final NetworkBridgeStatistics networkBridgeStatistics = new NetworkBridgeStatistics(); 151 152 private NetworkBridgeListener networkBridgeListener; 153 private boolean createdByDuplex; 154 private BrokerInfo localBrokerInfo; 155 private BrokerInfo remoteBrokerInfo; 156 157 private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed); 158 private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed); 159 160 private final AtomicBoolean started = new AtomicBoolean(); 161 private TransportConnection duplexInitiatingConnection; 162 private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean(); 163 protected BrokerService brokerService = null; 164 private ObjectName mbeanObjectName; 165 private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor(); 166 //Use a new executor for processing BrokerSubscriptionInfo so we don't block other threads 167 private final ExecutorService syncExecutor = Executors.newSingleThreadExecutor(); 168 private Transport duplexInboundLocalBroker = null; 169 private ProducerInfo duplexInboundLocalProducerInfo; 170 171 public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) { 172 this.configuration = configuration; 173 this.localBroker = localBroker; 174 this.remoteBroker = remoteBroker; 175 } 176 177 public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception { 178 this.localBrokerInfo = localBrokerInfo; 179 this.remoteBrokerInfo = remoteBrokerInfo; 180 this.duplexInitiatingConnection = connection; 181 start(); 182 serviceRemoteCommand(remoteBrokerInfo); 183 } 184 185 @Override 186 public void start() throws Exception { 187 if (started.compareAndSet(false, true)) { 188 189 if (brokerService == null) { 190 throw new IllegalArgumentException("BrokerService is null on " + this); 191 } 192 193 networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics()); 194 195 if (isDuplex()) { 196 duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker()); 197 duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() { 198 199 @Override 200 public void onCommand(Object o) { 201 Command command = (Command) o; 202 serviceLocalCommand(command); 203 } 204 205 @Override 206 public void onException(IOException error) { 207 serviceLocalException(error); 208 } 209 }); 210 duplexInboundLocalBroker.start(); 211 } 212 213 localBroker.setTransportListener(new DefaultTransportListener() { 214 215 @Override 216 public void onCommand(Object o) { 217 Command command = (Command) o; 218 serviceLocalCommand(command); 219 } 220 221 @Override 222 public void onException(IOException error) { 223 if (!futureLocalBrokerInfo.isDone()) { 224 futureLocalBrokerInfo.cancel(true); 225 return; 226 } 227 serviceLocalException(error); 228 } 229 }); 230 231 remoteBroker.setTransportListener(new DefaultTransportListener() { 232 233 @Override 234 public void onCommand(Object o) { 235 Command command = (Command) o; 236 serviceRemoteCommand(command); 237 } 238 239 @Override 240 public void onException(IOException error) { 241 if (!futureRemoteBrokerInfo.isDone()) { 242 futureRemoteBrokerInfo.cancel(true); 243 return; 244 } 245 serviceRemoteException(error); 246 } 247 }); 248 249 remoteBroker.start(); 250 localBroker.start(); 251 252 if (!disposed.get()) { 253 try { 254 triggerStartAsyncNetworkBridgeCreation(); 255 } catch (IOException e) { 256 LOG.warn("Caught exception from remote start", e); 257 } 258 } else { 259 LOG.warn("Bridge was disposed before the start() method was fully executed."); 260 throw new TransportDisposedIOException(); 261 } 262 } 263 } 264 265 @Override 266 public void stop() throws Exception { 267 if (started.compareAndSet(true, false)) { 268 if (disposed.compareAndSet(false, true)) { 269 LOG.debug(" stopping {} bridge to {}", configuration.getBrokerName(), remoteBrokerName); 270 271 futureRemoteBrokerInfo.cancel(true); 272 futureLocalBrokerInfo.cancel(true); 273 274 NetworkBridgeListener l = this.networkBridgeListener; 275 if (l != null) { 276 l.onStop(this); 277 } 278 try { 279 // local start complete 280 if (startedLatch.getCount() < 2) { 281 LOG.trace("{} unregister bridge ({}) to {}", new Object[]{ 282 configuration.getBrokerName(), this, remoteBrokerName 283 }); 284 brokerService.getBroker().removeBroker(null, remoteBrokerInfo); 285 brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); 286 } 287 288 remoteBridgeStarted.set(false); 289 final CountDownLatch sendShutdown = new CountDownLatch(1); 290 291 brokerService.getTaskRunnerFactory().execute(new Runnable() { 292 @Override 293 public void run() { 294 try { 295 serialExecutor.shutdown(); 296 if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) { 297 List<Runnable> pendingTasks = serialExecutor.shutdownNow(); 298 LOG.info("pending tasks on stop {}", pendingTasks); 299 } 300 //Shutdown the syncExecutor, call countDown to make sure a thread can 301 //terminate if it is waiting 302 staticDestinationsLatch.countDown(); 303 syncExecutor.shutdown(); 304 if (!syncExecutor.awaitTermination(5, TimeUnit.SECONDS)) { 305 List<Runnable> pendingTasks = syncExecutor.shutdownNow(); 306 LOG.info("pending tasks on stop {}", pendingTasks); 307 } 308 localBroker.oneway(new ShutdownInfo()); 309 remoteBroker.oneway(new ShutdownInfo()); 310 } catch (Throwable e) { 311 LOG.debug("Caught exception sending shutdown", e); 312 } finally { 313 sendShutdown.countDown(); 314 } 315 316 } 317 }, "ActiveMQ ForwardingBridge StopTask"); 318 319 if (!sendShutdown.await(10, TimeUnit.SECONDS)) { 320 LOG.info("Network Could not shutdown in a timely manner"); 321 } 322 } finally { 323 ServiceStopper ss = new ServiceStopper(); 324 ss.stop(remoteBroker); 325 ss.stop(localBroker); 326 ss.stop(duplexInboundLocalBroker); 327 // Release the started Latch since another thread could be 328 // stuck waiting for it to start up. 329 startedLatch.countDown(); 330 startedLatch.countDown(); 331 localStartedLatch.countDown(); 332 staticDestinationsLatch.countDown(); 333 334 ss.throwFirstException(); 335 } 336 } 337 338 LOG.info("{} bridge to {} stopped", configuration.getBrokerName(), remoteBrokerName); 339 } 340 } 341 342 protected void triggerStartAsyncNetworkBridgeCreation() throws IOException { 343 brokerService.getTaskRunnerFactory().execute(new Runnable() { 344 @Override 345 public void run() { 346 final String originalName = Thread.currentThread().getName(); 347 Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " + 348 "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker); 349 350 try { 351 // First we collect the info data from both the local and remote ends 352 collectBrokerInfos(); 353 354 // Once we have all required broker info we can attempt to start 355 // the local and then remote sides of the bridge. 356 doStartLocalAndRemoteBridges(); 357 } finally { 358 Thread.currentThread().setName(originalName); 359 } 360 } 361 }); 362 } 363 364 private void collectBrokerInfos() { 365 int timeout = 30000; 366 TcpTransport tcpTransport = remoteBroker.narrow(TcpTransport.class); 367 if (tcpTransport != null) { 368 timeout = tcpTransport.getConnectionTimeout(); 369 } 370 371 // First wait for the remote to feed us its BrokerInfo, then we can check on 372 // the LocalBrokerInfo and decide is this is a loop. 373 try { 374 remoteBrokerInfo = futureRemoteBrokerInfo.get(timeout, TimeUnit.MILLISECONDS); 375 if (remoteBrokerInfo == null) { 376 serviceLocalException(new Throwable("remoteBrokerInfo is null")); 377 return; 378 } 379 } catch (Exception e) { 380 serviceRemoteException(e); 381 return; 382 } 383 384 try { 385 localBrokerInfo = futureLocalBrokerInfo.get(timeout, TimeUnit.MILLISECONDS); 386 if (localBrokerInfo == null) { 387 serviceLocalException(new Throwable("localBrokerInfo is null")); 388 return; 389 } 390 391 // Before we try and build the bridge lets check if we are in a loop 392 // and if so just stop now before registering anything. 393 remoteBrokerId = remoteBrokerInfo.getBrokerId(); 394 if (localBrokerId.equals(remoteBrokerId)) { 395 LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{ 396 configuration.getBrokerName(), remoteBrokerName, remoteBrokerId 397 }); 398 ServiceSupport.dispose(localBroker); 399 ServiceSupport.dispose(remoteBroker); 400 // the bridge is left in a bit of limbo, but it won't get retried 401 // in this state. 402 return; 403 } 404 405 // Fill in the remote broker's information now. 406 remoteBrokerPath[0] = remoteBrokerId; 407 remoteBrokerName = remoteBrokerInfo.getBrokerName(); 408 if (configuration.isUseBrokerNamesAsIdSeed()) { 409 idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName); 410 } 411 } catch (Throwable e) { 412 serviceLocalException(e); 413 } 414 } 415 416 private void doStartLocalAndRemoteBridges() { 417 418 if (disposed.get()) { 419 return; 420 } 421 422 if (isCreatedByDuplex()) { 423 // apply remote (propagated) configuration to local duplex bridge before start 424 Properties props = null; 425 try { 426 props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties()); 427 IntrospectionSupport.getProperties(configuration, props, null); 428 if (configuration.getExcludedDestinations() != null) { 429 excludedDestinations = configuration.getExcludedDestinations().toArray( 430 new ActiveMQDestination[configuration.getExcludedDestinations().size()]); 431 } 432 if (configuration.getStaticallyIncludedDestinations() != null) { 433 staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray( 434 new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]); 435 } 436 if (configuration.getDynamicallyIncludedDestinations() != null) { 437 dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray( 438 new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]); 439 } 440 } catch (Throwable t) { 441 LOG.error("Error mapping remote configuration: {}", props, t); 442 } 443 } 444 445 try { 446 startLocalBridge(); 447 } catch (Throwable e) { 448 serviceLocalException(e); 449 return; 450 } 451 452 try { 453 startRemoteBridge(); 454 } catch (Throwable e) { 455 serviceRemoteException(e); 456 return; 457 } 458 459 try { 460 if (safeWaitUntilStarted()) { 461 setupStaticDestinations(); 462 staticDestinationsLatch.countDown(); 463 } 464 } catch (Throwable e) { 465 serviceLocalException(e); 466 } 467 } 468 469 private void startLocalBridge() throws Throwable { 470 if (!bridgeFailed.get() && localBridgeStarted.compareAndSet(false, true)) { 471 synchronized (this) { 472 LOG.trace("{} starting local Bridge, localBroker={}", configuration.getBrokerName(), localBroker); 473 if (!disposed.get()) { 474 475 if (idGenerator == null) { 476 throw new IllegalStateException("Id Generator cannot be null"); 477 } 478 479 localConnectionInfo = new ConnectionInfo(); 480 localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 481 localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName(); 482 localConnectionInfo.setClientId(localClientId); 483 localConnectionInfo.setUserName(configuration.getUserName()); 484 localConnectionInfo.setPassword(configuration.getPassword()); 485 Transport originalTransport = remoteBroker; 486 while (originalTransport instanceof TransportFilter) { 487 originalTransport = ((TransportFilter) originalTransport).getNext(); 488 } 489 if (originalTransport instanceof SslTransport) { 490 X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates(); 491 localConnectionInfo.setTransportContext(peerCerts); 492 } 493 // sync requests that may fail 494 Object resp = localBroker.request(localConnectionInfo); 495 if (resp instanceof ExceptionResponse) { 496 throw ((ExceptionResponse) resp).getException(); 497 } 498 localSessionInfo = new SessionInfo(localConnectionInfo, 1); 499 localBroker.oneway(localSessionInfo); 500 501 if (configuration.isDuplex()) { 502 // separate in-bound channel for forwards so we don't 503 // contend with out-bound dispatch on same connection 504 remoteBrokerInfo.setNetworkConnection(true); 505 duplexInboundLocalBroker.oneway(remoteBrokerInfo); 506 507 ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo(); 508 duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 509 duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_" 510 + configuration.getBrokerName()); 511 duplexLocalConnectionInfo.setUserName(configuration.getUserName()); 512 duplexLocalConnectionInfo.setPassword(configuration.getPassword()); 513 514 if (originalTransport instanceof SslTransport) { 515 X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates(); 516 duplexLocalConnectionInfo.setTransportContext(peerCerts); 517 } 518 // sync requests that may fail 519 resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo); 520 if (resp instanceof ExceptionResponse) { 521 throw ((ExceptionResponse) resp).getException(); 522 } 523 SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1); 524 duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1); 525 duplexInboundLocalBroker.oneway(duplexInboundSession); 526 duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo); 527 } 528 brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString()); 529 NetworkBridgeListener l = this.networkBridgeListener; 530 if (l != null) { 531 l.onStart(this); 532 } 533 534 // Let the local broker know the remote broker's ID. 535 localBroker.oneway(remoteBrokerInfo); 536 // new peer broker (a consumer can work with remote broker also) 537 brokerService.getBroker().addBroker(null, remoteBrokerInfo); 538 539 LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{ 540 localBroker, remoteBroker, remoteBrokerName 541 }); 542 LOG.trace("{} register bridge ({}) to {}", new Object[]{ 543 configuration.getBrokerName(), this, remoteBrokerName 544 }); 545 } else { 546 LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed."); 547 } 548 startedLatch.countDown(); 549 localStartedLatch.countDown(); 550 } 551 } 552 } 553 554 protected void startRemoteBridge() throws Exception { 555 if (!bridgeFailed.get() && remoteBridgeStarted.compareAndSet(false, true)) { 556 LOG.trace("{} starting remote Bridge, remoteBroker={}", configuration.getBrokerName(), remoteBroker); 557 synchronized (this) { 558 if (!isCreatedByDuplex()) { 559 BrokerInfo brokerInfo = new BrokerInfo(); 560 brokerInfo.setBrokerName(configuration.getBrokerName()); 561 brokerInfo.setBrokerURL(configuration.getBrokerURL()); 562 brokerInfo.setNetworkConnection(true); 563 brokerInfo.setDuplexConnection(configuration.isDuplex()); 564 // set our properties 565 Properties props = new Properties(); 566 IntrospectionSupport.getProperties(configuration, props, null); 567 568 String dynamicallyIncludedDestinationsKey = "dynamicallyIncludedDestinations"; 569 String staticallyIncludedDestinationsKey = "staticallyIncludedDestinations"; 570 571 if (!configuration.getDynamicallyIncludedDestinations().isEmpty()) { 572 props.put(dynamicallyIncludedDestinationsKey, 573 StringToListOfActiveMQDestinationConverter. 574 convertFromActiveMQDestination(configuration.getDynamicallyIncludedDestinations(), true)); 575 } 576 if (!configuration.getStaticallyIncludedDestinations().isEmpty()) { 577 props.put(staticallyIncludedDestinationsKey, 578 StringToListOfActiveMQDestinationConverter. 579 convertFromActiveMQDestination(configuration.getStaticallyIncludedDestinations(), true)); 580 } 581 582 props.remove("networkTTL"); 583 String str = MarshallingSupport.propertiesToString(props); 584 brokerInfo.setNetworkProperties(str); 585 brokerInfo.setBrokerId(this.localBrokerId); 586 remoteBroker.oneway(brokerInfo); 587 if (configuration.isSyncDurableSubs() && 588 remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { 589 remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService, 590 configuration)); 591 } 592 } 593 if (remoteConnectionInfo != null) { 594 remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); 595 } 596 remoteConnectionInfo = new ConnectionInfo(); 597 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 598 remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound"); 599 remoteConnectionInfo.setUserName(configuration.getUserName()); 600 remoteConnectionInfo.setPassword(configuration.getPassword()); 601 remoteBroker.oneway(remoteConnectionInfo); 602 603 SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1); 604 remoteBroker.oneway(remoteSessionInfo); 605 producerInfo = new ProducerInfo(remoteSessionInfo, 1); 606 producerInfo.setResponseRequired(false); 607 remoteBroker.oneway(producerInfo); 608 // Listen to consumer advisory messages on the remote broker to determine demand. 609 if (!configuration.isStaticBridge()) { 610 demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1); 611 // always dispatch advisory message asynchronously so that 612 // we never block the producer broker if we are slow 613 demandConsumerInfo.setDispatchAsync(true); 614 String advisoryTopic = configuration.getDestinationFilter(); 615 if (configuration.isBridgeTempDestinations()) { 616 advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC; 617 } 618 demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic)); 619 configureConsumerPrefetch(demandConsumerInfo); 620 remoteBroker.oneway(demandConsumerInfo); 621 } 622 startedLatch.countDown(); 623 } 624 } 625 } 626 627 @Override 628 public void serviceRemoteException(Throwable error) { 629 if (!disposed.get()) { 630 if (error instanceof SecurityException || error instanceof GeneralSecurityException) { 631 LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{ 632 localBroker, remoteBroker, error 633 }); 634 } else { 635 LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{ 636 localBroker, remoteBroker, error 637 }); 638 } 639 LOG.debug("The remote Exception was: {}", error, error); 640 brokerService.getTaskRunnerFactory().execute(new Runnable() { 641 @Override 642 public void run() { 643 ServiceSupport.dispose(getControllingService()); 644 } 645 }); 646 fireBridgeFailed(error); 647 } 648 } 649 650 protected void serviceRemoteCommand(Command command) { 651 if (!disposed.get()) { 652 try { 653 if (command.isMessageDispatch()) { 654 safeWaitUntilStarted(); 655 MessageDispatch md = (MessageDispatch) command; 656 serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure()); 657 ackAdvisory(md.getMessage()); 658 } else if (command.isBrokerInfo()) { 659 futureRemoteBrokerInfo.set((BrokerInfo) command); 660 } else if (command instanceof BrokerSubscriptionInfo) { 661 final BrokerSubscriptionInfo brokerSubscriptionInfo = (BrokerSubscriptionInfo) command; 662 663 //Start in a new thread so we don't block the transport waiting for staticDestinations 664 syncExecutor.execute(new Runnable() { 665 666 @Override 667 public void run() { 668 try { 669 staticDestinationsLatch.await(); 670 //Make sure after the countDown of staticDestinationsLatch we aren't stopping 671 if (!disposed.get()) { 672 BrokerSubscriptionInfo subInfo = brokerSubscriptionInfo; 673 LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}", 674 brokerService.getBrokerName(), subInfo.getBrokerName()); 675 676 if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions() 677 && !configuration.isDynamicOnly()) { 678 if (started.get()) { 679 if (subInfo.getSubscriptionInfos() != null) { 680 for (ConsumerInfo info : subInfo.getSubscriptionInfos()) { 681 //re-add any process any non-NC consumers that match the 682 //dynamicallyIncludedDestinations list 683 if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) && 684 NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) { 685 serviceRemoteConsumerAdvisory(info); 686 } 687 } 688 } 689 690 //After re-added, clean up any empty durables 691 for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { 692 DemandSubscription ds = i.next(); 693 if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) { 694 cleanupDurableSub(ds, i); 695 } 696 } 697 } 698 } 699 } 700 } catch (Exception e) { 701 LOG.warn("Error processing BrokerSubscriptionInfo: {}", e.getMessage(), e); 702 LOG.debug(e.getMessage(), e); 703 } 704 } 705 }); 706 707 } else if (command.getClass() == ConnectionError.class) { 708 ConnectionError ce = (ConnectionError) command; 709 serviceRemoteException(ce.getException()); 710 } else { 711 if (isDuplex()) { 712 LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType()); 713 if (command.isMessage()) { 714 final ActiveMQMessage message = (ActiveMQMessage) command; 715 if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) { 716 serviceRemoteConsumerAdvisory(message.getDataStructure()); 717 ackAdvisory(message); 718 } else { 719 if (!isPermissableDestination(message.getDestination(), true)) { 720 return; 721 } 722 // message being forwarded - we need to 723 // propagate the response to our local send 724 if (canDuplexDispatch(message)) { 725 message.setProducerId(duplexInboundLocalProducerInfo.getProducerId()); 726 if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { 727 duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() { 728 final int correlationId = message.getCommandId(); 729 730 @Override 731 public void onCompletion(FutureResponse resp) { 732 try { 733 Response reply = resp.getResult(); 734 reply.setCorrelationId(correlationId); 735 remoteBroker.oneway(reply); 736 //increment counter when messages are received in duplex mode 737 networkBridgeStatistics.getReceivedCount().increment(); 738 } catch (IOException error) { 739 LOG.error("Exception: {} on duplex forward of: {}", error, message); 740 serviceRemoteException(error); 741 } 742 } 743 }); 744 } else { 745 duplexInboundLocalBroker.oneway(message); 746 networkBridgeStatistics.getReceivedCount().increment(); 747 } 748 serviceInboundMessage(message); 749 } else { 750 if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { 751 Response reply = new Response(); 752 reply.setCorrelationId(message.getCommandId()); 753 remoteBroker.oneway(reply); 754 } 755 } 756 } 757 } else { 758 switch (command.getDataStructureType()) { 759 case ConnectionInfo.DATA_STRUCTURE_TYPE: 760 if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) { 761 // end of initiating connection setup - propogate to initial connection to get mbean by clientid 762 duplexInitiatingConnection.processAddConnection((ConnectionInfo) command); 763 } else { 764 localBroker.oneway(command); 765 } 766 break; 767 case SessionInfo.DATA_STRUCTURE_TYPE: 768 localBroker.oneway(command); 769 break; 770 case ProducerInfo.DATA_STRUCTURE_TYPE: 771 // using duplexInboundLocalProducerInfo 772 break; 773 case MessageAck.DATA_STRUCTURE_TYPE: 774 MessageAck ack = (MessageAck) command; 775 DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId()); 776 if (localSub != null) { 777 ack.setConsumerId(localSub.getLocalInfo().getConsumerId()); 778 localBroker.oneway(ack); 779 } else { 780 LOG.warn("Matching local subscription not found for ack: {}", ack); 781 } 782 break; 783 case ConsumerInfo.DATA_STRUCTURE_TYPE: 784 localStartedLatch.await(); 785 if (started.get()) { 786 addConsumerInfo((ConsumerInfo) command); 787 } else { 788 // received a subscription whilst stopping 789 LOG.warn("Stopping - ignoring ConsumerInfo: {}", command); 790 } 791 break; 792 case ShutdownInfo.DATA_STRUCTURE_TYPE: 793 // initiator is shutting down, controlled case 794 // abortive close dealt with by inactivity monitor 795 LOG.info("Stopping network bridge on shutdown of remote broker"); 796 serviceRemoteException(new IOException(command.toString())); 797 break; 798 default: 799 LOG.debug("Ignoring remote command: {}", command); 800 } 801 } 802 } else { 803 switch (command.getDataStructureType()) { 804 case KeepAliveInfo.DATA_STRUCTURE_TYPE: 805 case WireFormatInfo.DATA_STRUCTURE_TYPE: 806 case ShutdownInfo.DATA_STRUCTURE_TYPE: 807 break; 808 default: 809 LOG.warn("Unexpected remote command: {}", command); 810 } 811 } 812 } 813 } catch (Throwable e) { 814 LOG.debug("Exception processing remote command: {}", command, e); 815 serviceRemoteException(e); 816 } 817 } 818 } 819 820 private void ackAdvisory(Message message) throws IOException { 821 demandConsumerDispatched++; 822 if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * 823 (configuration.getAdvisoryAckPercentage() / 100f))) { 824 MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched); 825 ack.setConsumerId(demandConsumerInfo.getConsumerId()); 826 remoteBroker.oneway(ack); 827 demandConsumerDispatched = 0; 828 } 829 } 830 831 private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException { 832 final int networkTTL = configuration.getConsumerTTL(); 833 if (data.getClass() == ConsumerInfo.class) { 834 // Create a new local subscription 835 ConsumerInfo info = (ConsumerInfo) data; 836 BrokerId[] path = info.getBrokerPath(); 837 838 if (info.isBrowser()) { 839 LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName); 840 return; 841 } 842 843 if (path != null && networkTTL > -1 && path.length >= networkTTL) { 844 LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{ 845 configuration.getBrokerName(), remoteBrokerName, networkTTL, info 846 }); 847 return; 848 } 849 850 if (contains(path, localBrokerPath[0])) { 851 // Ignore this consumer as it's a consumer we locally sent to the broker. 852 LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{ 853 configuration.getBrokerName(), remoteBrokerName, info 854 }); 855 return; 856 } 857 858 if (!isPermissableDestination(info.getDestination())) { 859 // ignore if not in the permitted or in the excluded list 860 LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{ 861 configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info 862 }); 863 return; 864 } 865 866 // in a cyclic network there can be multiple bridges per broker that can propagate 867 // a network subscription so there is a need to synchronize on a shared entity 868 synchronized (brokerService.getVmConnectorURI()) { 869 addConsumerInfo(info); 870 } 871 } else if (data.getClass() == DestinationInfo.class) { 872 // It's a destination info - we want to pass up information about temporary destinations 873 final DestinationInfo destInfo = (DestinationInfo) data; 874 BrokerId[] path = destInfo.getBrokerPath(); 875 if (path != null && networkTTL > -1 && path.length >= networkTTL) { 876 LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{ 877 configuration.getBrokerName(), destInfo, networkTTL 878 }); 879 return; 880 } 881 if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) { 882 LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo); 883 return; 884 } 885 destInfo.setConnectionId(localConnectionInfo.getConnectionId()); 886 if (destInfo.getDestination() instanceof ActiveMQTempDestination) { 887 // re-set connection id so comes from here 888 ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination(); 889 tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId()); 890 } 891 destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath())); 892 LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{ 893 configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo 894 }); 895 if (destInfo.isRemoveOperation()) { 896 // Serialize with removeSub operations such that all removeSub advisories 897 // are generated 898 serialExecutor.execute(new Runnable() { 899 @Override 900 public void run() { 901 try { 902 localBroker.oneway(destInfo); 903 } catch (IOException e) { 904 LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e); 905 } 906 } 907 }); 908 } else { 909 localBroker.oneway(destInfo); 910 } 911 } else if (data.getClass() == RemoveInfo.class) { 912 ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); 913 removeDemandSubscription(id); 914 915 if (forcedDurableRemoteId.remove(id)) { 916 for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { 917 DemandSubscription ds = i.next(); 918 boolean removed = ds.removeForcedDurableConsumer(id); 919 if (removed) { 920 cleanupDurableSub(ds, i); 921 } 922 } 923 } 924 925 } else if (data.getClass() == RemoveSubscriptionInfo.class) { 926 RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data); 927 SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()); 928 for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { 929 DemandSubscription ds = i.next(); 930 boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo); 931 if (removed) { 932 cleanupDurableSub(ds, i); 933 } 934 } 935 } 936 } 937 938 private void cleanupDurableSub(final DemandSubscription ds, 939 Iterator<DemandSubscription> i) throws IOException { 940 if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty() 941 && ds.getForcedDurableConsumersSize() == 0) { 942 // deactivate subscriber 943 RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId()); 944 localBroker.oneway(removeInfo); 945 946 // remove subscriber 947 RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo(); 948 sending.setClientId(localClientId); 949 sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName()); 950 sending.setConnectionId(this.localConnectionInfo.getConnectionId()); 951 localBroker.oneway(sending); 952 953 //remove subscriber from map 954 i.remove(); 955 } 956 } 957 958 @Override 959 public void serviceLocalException(Throwable error) { 960 serviceLocalException(null, error); 961 } 962 963 public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) { 964 LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error); 965 if (!disposed.get()) { 966 if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) { 967 // not a reason to terminate the bridge - temps can disappear with 968 // pending sends as the demand sub may outlive the remote dest 969 if (messageDispatch != null) { 970 LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error); 971 try { 972 MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1); 973 poisonAck.setPoisonCause(error); 974 localBroker.oneway(poisonAck); 975 } catch (IOException ioe) { 976 LOG.error("Failed to posion ack message following forward failure: ", ioe); 977 } 978 fireFailedForwardAdvisory(messageDispatch, error); 979 } else { 980 LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error); 981 } 982 return; 983 } 984 985 LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error}); 986 LOG.debug("The local Exception was: {}", error, error); 987 988 brokerService.getTaskRunnerFactory().execute(new Runnable() { 989 @Override 990 public void run() { 991 ServiceSupport.dispose(getControllingService()); 992 } 993 }); 994 fireBridgeFailed(error); 995 } 996 } 997 998 private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) { 999 if (configuration.isAdvisoryForFailedForward()) { 1000 AdvisoryBroker advisoryBroker = null; 1001 try { 1002 advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class); 1003 1004 if (advisoryBroker != null) { 1005 ConnectionContext context = new ConnectionContext(); 1006 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 1007 context.setBroker(brokerService.getBroker()); 1008 1009 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 1010 advisoryMessage.setStringProperty("cause", error.getLocalizedMessage()); 1011 advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null, 1012 advisoryMessage); 1013 1014 } 1015 } catch (Exception e) { 1016 LOG.warn("failed to fire forward failure advisory, cause: {}", e); 1017 LOG.debug("detail", e); 1018 } 1019 } 1020 } 1021 1022 protected Service getControllingService() { 1023 return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this; 1024 } 1025 1026 protected void addSubscription(DemandSubscription sub) throws IOException { 1027 if (sub != null) { 1028 if (isDuplex()) { 1029 // async vm transport, need to wait for completion 1030 localBroker.request(sub.getLocalInfo()); 1031 } else { 1032 localBroker.oneway(sub.getLocalInfo()); 1033 } 1034 } 1035 } 1036 1037 protected void removeSubscription(final DemandSubscription sub) throws IOException { 1038 if (sub != null) { 1039 LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()}); 1040 1041 // ensure not available for conduit subs pending removal 1042 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); 1043 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); 1044 1045 // continue removal in separate thread to free up this thread for outstanding responses 1046 // Serialize with removeDestination operations so that removeSubs are serialized with 1047 // removeDestinations such that all removeSub advisories are generated 1048 serialExecutor.execute(new Runnable() { 1049 @Override 1050 public void run() { 1051 sub.waitForCompletion(); 1052 try { 1053 localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); 1054 } catch (IOException e) { 1055 LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e); 1056 } 1057 } 1058 }); 1059 } 1060 } 1061 1062 protected Message configureMessage(MessageDispatch md) throws IOException { 1063 Message message = md.getMessage().copy(); 1064 // Update the packet to show where it came from. 1065 message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath)); 1066 message.setProducerId(producerInfo.getProducerId()); 1067 message.setDestination(md.getDestination()); 1068 message.setMemoryUsage(null); 1069 if (message.getOriginalTransactionId() == null) { 1070 message.setOriginalTransactionId(message.getTransactionId()); 1071 } 1072 message.setTransactionId(null); 1073 if (configuration.isUseCompression()) { 1074 message.compress(); 1075 } 1076 return message; 1077 } 1078 1079 protected void serviceLocalCommand(Command command) { 1080 if (!disposed.get()) { 1081 try { 1082 if (command.isMessageDispatch()) { 1083 safeWaitUntilStarted(); 1084 networkBridgeStatistics.getEnqueues().increment(); 1085 final MessageDispatch md = (MessageDispatch) command; 1086 final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); 1087 if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) { 1088 1089 if (suppressMessageDispatch(md, sub)) { 1090 LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{ 1091 configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage() 1092 }); 1093 // still ack as it may be durable 1094 try { 1095 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 1096 } finally { 1097 sub.decrementOutstandingResponses(); 1098 } 1099 return; 1100 } 1101 1102 Message message = configureMessage(md); 1103 LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{ 1104 configuration.getBrokerName(), remoteBrokerName, md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), (LOG.isTraceEnabled() ? message : message.getMessageId()) 1105 }); 1106 if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) { 1107 try { 1108 // never request b/c they are eventually acked async 1109 remoteBroker.oneway(message); 1110 } finally { 1111 sub.decrementOutstandingResponses(); 1112 } 1113 return; 1114 } 1115 if (isPermissableDestination(md.getDestination())) { 1116 if (message.isPersistent() || configuration.isAlwaysSyncSend()) { 1117 1118 // The message was not sent using async send, so we should only 1119 // ack the local broker when we get confirmation that the remote 1120 // broker has received the message. 1121 remoteBroker.asyncRequest(message, new ResponseCallback() { 1122 @Override 1123 public void onCompletion(FutureResponse future) { 1124 try { 1125 Response response = future.getResult(); 1126 if (response.isException()) { 1127 ExceptionResponse er = (ExceptionResponse) response; 1128 serviceLocalException(md, er.getException()); 1129 } else { 1130 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 1131 networkBridgeStatistics.getDequeues().increment(); 1132 } 1133 } catch (IOException e) { 1134 serviceLocalException(md, e); 1135 } finally { 1136 sub.decrementOutstandingResponses(); 1137 } 1138 } 1139 }); 1140 1141 } else { 1142 // If the message was originally sent using async send, we will 1143 // preserve that QOS by bridging it using an async send (small chance 1144 // of message loss). 1145 try { 1146 remoteBroker.oneway(message); 1147 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 1148 networkBridgeStatistics.getDequeues().increment(); 1149 } finally { 1150 sub.decrementOutstandingResponses(); 1151 } 1152 } 1153 serviceOutbound(message); 1154 } 1155 } else { 1156 LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage()); 1157 } 1158 } else if (command.isBrokerInfo()) { 1159 futureLocalBrokerInfo.set((BrokerInfo) command); 1160 } else if (command.isShutdownInfo()) { 1161 LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName()); 1162 stop(); 1163 } else if (command.getClass() == ConnectionError.class) { 1164 ConnectionError ce = (ConnectionError) command; 1165 serviceLocalException(ce.getException()); 1166 } else { 1167 switch (command.getDataStructureType()) { 1168 case WireFormatInfo.DATA_STRUCTURE_TYPE: 1169 break; 1170 case BrokerSubscriptionInfo.DATA_STRUCTURE_TYPE: 1171 break; 1172 default: 1173 LOG.warn("Unexpected local command: {}", command); 1174 } 1175 } 1176 } catch (Throwable e) { 1177 LOG.warn("Caught an exception processing local command", e); 1178 serviceLocalException(e); 1179 } 1180 } 1181 } 1182 1183 private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception { 1184 boolean suppress = false; 1185 // for durable subs, suppression via filter leaves dangling acks so we 1186 // need to check here and allow the ack irrespective 1187 if (sub.getLocalInfo().isDurable()) { 1188 MessageEvaluationContext messageEvalContext = new MessageEvaluationContext(); 1189 messageEvalContext.setMessageReference(md.getMessage()); 1190 messageEvalContext.setDestination(md.getDestination()); 1191 suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext); 1192 //AMQ-6465 - Need to decrement the reference count after checking matches() as 1193 //the call above will increment the reference count by 1 1194 messageEvalContext.getMessageReference().decrementReferenceCount(); 1195 } 1196 return suppress; 1197 } 1198 1199 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { 1200 if (brokerPath != null) { 1201 for (BrokerId id : brokerPath) { 1202 if (brokerId.equals(id)) { 1203 return true; 1204 } 1205 } 1206 } 1207 return false; 1208 } 1209 1210 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) { 1211 if (brokerPath == null || brokerPath.length == 0) { 1212 return pathsToAppend; 1213 } 1214 BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length]; 1215 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); 1216 System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length); 1217 return rc; 1218 } 1219 1220 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) { 1221 if (brokerPath == null || brokerPath.length == 0) { 1222 return new BrokerId[]{idToAppend}; 1223 } 1224 BrokerId rc[] = new BrokerId[brokerPath.length + 1]; 1225 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); 1226 rc[brokerPath.length] = idToAppend; 1227 return rc; 1228 } 1229 1230 protected boolean isPermissableDestination(ActiveMQDestination destination) { 1231 return isPermissableDestination(destination, false); 1232 } 1233 1234 protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) { 1235 // Are we not bridging temporary destinations? 1236 if (destination.isTemporary()) { 1237 if (allowTemporary) { 1238 return true; 1239 } else { 1240 return configuration.isBridgeTempDestinations(); 1241 } 1242 } 1243 1244 ActiveMQDestination[] dests = excludedDestinations; 1245 if (dests != null && dests.length > 0) { 1246 for (ActiveMQDestination dest : dests) { 1247 DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest); 1248 if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1249 return false; 1250 } 1251 } 1252 } 1253 1254 dests = staticallyIncludedDestinations; 1255 if (dests != null && dests.length > 0) { 1256 for (ActiveMQDestination dest : dests) { 1257 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); 1258 if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1259 return true; 1260 } 1261 } 1262 } 1263 1264 dests = dynamicallyIncludedDestinations; 1265 if (dests != null && dests.length > 0) { 1266 for (ActiveMQDestination dest : dests) { 1267 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); 1268 if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1269 return true; 1270 } 1271 } 1272 1273 return false; 1274 } 1275 1276 return true; 1277 } 1278 1279 /** 1280 * Subscriptions for these destinations are always created 1281 */ 1282 protected void setupStaticDestinations() { 1283 ActiveMQDestination[] dests = staticallyIncludedDestinations; 1284 if (dests != null) { 1285 for (ActiveMQDestination dest : dests) { 1286 if (isPermissableDestination(dest)) { 1287 DemandSubscription sub = createDemandSubscription(dest, null); 1288 sub.setStaticallyIncluded(true); 1289 try { 1290 addSubscription(sub); 1291 } catch (IOException e) { 1292 LOG.error("Failed to add static destination {}", dest, e); 1293 } 1294 LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest); 1295 } else { 1296 LOG.info("{}, static destination excluded: {}", configuration.getBrokerName(), dest); 1297 } 1298 } 1299 } 1300 } 1301 1302 protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException { 1303 ConsumerInfo info = consumerInfo.copy(); 1304 addRemoteBrokerToBrokerPath(info); 1305 DemandSubscription sub = createDemandSubscription(info); 1306 if (sub != null) { 1307 if (duplicateSuppressionIsRequired(sub)) { 1308 undoMapRegistration(sub); 1309 } else { 1310 if (consumerInfo.isDurable()) { 1311 sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName())); 1312 } 1313 addSubscription(sub); 1314 LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub); 1315 } 1316 } 1317 } 1318 1319 private void undoMapRegistration(DemandSubscription sub) { 1320 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); 1321 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); 1322 } 1323 1324 /* 1325 * check our existing subs networkConsumerIds against the list of network 1326 * ids in this subscription A match means a duplicate which we suppress for 1327 * topics and maybe for queues 1328 */ 1329 private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) { 1330 final ConsumerInfo consumerInfo = candidate.getRemoteInfo(); 1331 boolean suppress = false; 1332 1333 if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic() 1334 && !configuration.isSuppressDuplicateTopicSubscriptions()) { 1335 return suppress; 1336 } 1337 1338 List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds(); 1339 Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination()); 1340 for (Subscription sub : currentSubs) { 1341 List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds(); 1342 if (!networkConsumers.isEmpty()) { 1343 if (matchFound(candidateConsumers, networkConsumers)) { 1344 if (isInActiveDurableSub(sub)) { 1345 suppress = false; 1346 } else { 1347 suppress = hasLowerPriority(sub, candidate.getLocalInfo()); 1348 } 1349 break; 1350 } 1351 } 1352 } 1353 return suppress; 1354 } 1355 1356 private boolean isInActiveDurableSub(Subscription sub) { 1357 return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive()); 1358 } 1359 1360 private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) { 1361 boolean suppress = false; 1362 1363 if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) { 1364 LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{ 1365 configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds() 1366 }); 1367 suppress = true; 1368 } else { 1369 // remove the existing lower priority duplicate and allow this candidate 1370 try { 1371 removeDuplicateSubscription(existingSub); 1372 1373 LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{ 1374 configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds() 1375 }); 1376 } catch (IOException e) { 1377 LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e); 1378 } 1379 } 1380 return suppress; 1381 } 1382 1383 private void removeDuplicateSubscription(Subscription existingSub) throws IOException { 1384 for (NetworkConnector connector : brokerService.getNetworkConnectors()) { 1385 if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) { 1386 break; 1387 } 1388 } 1389 } 1390 1391 private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) { 1392 boolean found = false; 1393 for (ConsumerId aliasConsumer : networkConsumers) { 1394 if (candidateConsumers.contains(aliasConsumer)) { 1395 found = true; 1396 break; 1397 } 1398 } 1399 return found; 1400 } 1401 1402 protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) { 1403 RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker(); 1404 Region region; 1405 Collection<Subscription> subs; 1406 1407 region = null; 1408 switch (dest.getDestinationType()) { 1409 case ActiveMQDestination.QUEUE_TYPE: 1410 region = region_broker.getQueueRegion(); 1411 break; 1412 case ActiveMQDestination.TOPIC_TYPE: 1413 region = region_broker.getTopicRegion(); 1414 break; 1415 case ActiveMQDestination.TEMP_QUEUE_TYPE: 1416 region = region_broker.getTempQueueRegion(); 1417 break; 1418 case ActiveMQDestination.TEMP_TOPIC_TYPE: 1419 region = region_broker.getTempTopicRegion(); 1420 break; 1421 } 1422 1423 if (region instanceof AbstractRegion) { 1424 subs = ((AbstractRegion) region).getSubscriptions().values(); 1425 } else { 1426 subs = null; 1427 } 1428 1429 return subs; 1430 } 1431 1432 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { 1433 // add our original id to ourselves 1434 info.addNetworkConsumerId(info.getConsumerId()); 1435 return doCreateDemandSubscription(info); 1436 } 1437 1438 protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException { 1439 DemandSubscription result = new DemandSubscription(info); 1440 result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); 1441 if (info.getDestination().isTemporary()) { 1442 // reset the local connection Id 1443 ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination(); 1444 dest.setConnectionId(localConnectionInfo.getConnectionId().toString()); 1445 } 1446 1447 if (configuration.isDecreaseNetworkConsumerPriority()) { 1448 byte priority = (byte) configuration.getConsumerPriorityBase(); 1449 if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) { 1450 // The longer the path to the consumer, the less it's consumer priority. 1451 priority -= info.getBrokerPath().length + 1; 1452 } 1453 result.getLocalInfo().setPriority(priority); 1454 LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info}); 1455 } 1456 configureDemandSubscription(info, result); 1457 return result; 1458 } 1459 1460 final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination, final String subscriptionName) { 1461 ConsumerInfo info = new ConsumerInfo(); 1462 info.setNetworkSubscription(true); 1463 info.setDestination(destination); 1464 1465 if (subscriptionName != null) { 1466 info.setSubscriptionName(subscriptionName); 1467 } 1468 1469 // Indicate that this subscription is being made on behalf of the remote broker. 1470 info.setBrokerPath(new BrokerId[]{remoteBrokerId}); 1471 1472 // the remote info held by the DemandSubscription holds the original 1473 // consumerId, the local info get's overwritten 1474 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); 1475 DemandSubscription result = null; 1476 try { 1477 result = createDemandSubscription(info); 1478 } catch (IOException e) { 1479 LOG.error("Failed to create DemandSubscription ", e); 1480 } 1481 return result; 1482 } 1483 1484 protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException { 1485 if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) || 1486 AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) { 1487 sub.getLocalInfo().setDispatchAsync(true); 1488 } else { 1489 sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync()); 1490 } 1491 configureConsumerPrefetch(sub.getLocalInfo()); 1492 subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub); 1493 subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub); 1494 1495 sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info)); 1496 if (!info.isDurable()) { 1497 // This works for now since we use a VM connection to the local broker. 1498 // may need to change if we ever subscribe to a remote broker. 1499 sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter()); 1500 } else { 1501 sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); 1502 } 1503 } 1504 1505 protected void removeDemandSubscription(ConsumerId id) throws IOException { 1506 DemandSubscription sub = subscriptionMapByRemoteId.remove(id); 1507 LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{ 1508 configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub 1509 }); 1510 if (sub != null) { 1511 removeSubscription(sub); 1512 LOG.debug("{} removed sub on {} from {}: {}", new Object[]{ 1513 configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo() 1514 }); 1515 } 1516 } 1517 1518 protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) { 1519 boolean removeDone = false; 1520 DemandSubscription sub = subscriptionMapByLocalId.get(consumerId); 1521 if (sub != null) { 1522 try { 1523 removeDemandSubscription(sub.getRemoteInfo().getConsumerId()); 1524 removeDone = true; 1525 } catch (IOException e) { 1526 LOG.debug("removeDemandSubscriptionByLocalId failed for localId: {}", consumerId, e); 1527 } 1528 } 1529 return removeDone; 1530 } 1531 1532 /** 1533 * Performs a timed wait on the started latch and then checks for disposed 1534 * before performing another wait each time the the started wait times out. 1535 */ 1536 protected boolean safeWaitUntilStarted() throws InterruptedException { 1537 while (!disposed.get()) { 1538 if (startedLatch.await(1, TimeUnit.SECONDS)) { 1539 break; 1540 } 1541 } 1542 return !disposed.get(); 1543 } 1544 1545 protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException { 1546 NetworkBridgeFilterFactory filterFactory = defaultFilterFactory; 1547 if (brokerService != null && brokerService.getDestinationPolicy() != null) { 1548 PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination()); 1549 if (entry != null && entry.getNetworkBridgeFilterFactory() != null) { 1550 filterFactory = entry.getNetworkBridgeFilterFactory(); 1551 } 1552 } 1553 return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL()); 1554 } 1555 1556 protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException { 1557 info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath())); 1558 } 1559 1560 protected BrokerId[] getRemoteBrokerPath() { 1561 return remoteBrokerPath; 1562 } 1563 1564 @Override 1565 public void setNetworkBridgeListener(NetworkBridgeListener listener) { 1566 this.networkBridgeListener = listener; 1567 } 1568 1569 private void fireBridgeFailed(Throwable reason) { 1570 LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason); 1571 NetworkBridgeListener l = this.networkBridgeListener; 1572 if (l != null && this.bridgeFailed.compareAndSet(false, true)) { 1573 l.bridgeFailed(); 1574 } 1575 } 1576 1577 /** 1578 * @return Returns the dynamicallyIncludedDestinations. 1579 */ 1580 public ActiveMQDestination[] getDynamicallyIncludedDestinations() { 1581 return dynamicallyIncludedDestinations; 1582 } 1583 1584 /** 1585 * @param dynamicallyIncludedDestinations 1586 * The dynamicallyIncludedDestinations to set. 1587 */ 1588 public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) { 1589 this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations; 1590 } 1591 1592 /** 1593 * @return Returns the excludedDestinations. 1594 */ 1595 public ActiveMQDestination[] getExcludedDestinations() { 1596 return excludedDestinations; 1597 } 1598 1599 /** 1600 * @param excludedDestinations The excludedDestinations to set. 1601 */ 1602 public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) { 1603 this.excludedDestinations = excludedDestinations; 1604 } 1605 1606 /** 1607 * @return Returns the staticallyIncludedDestinations. 1608 */ 1609 public ActiveMQDestination[] getStaticallyIncludedDestinations() { 1610 return staticallyIncludedDestinations; 1611 } 1612 1613 /** 1614 * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set. 1615 */ 1616 public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) { 1617 this.staticallyIncludedDestinations = staticallyIncludedDestinations; 1618 } 1619 1620 /** 1621 * @return Returns the durableDestinations. 1622 */ 1623 public ActiveMQDestination[] getDurableDestinations() { 1624 return durableDestinations; 1625 } 1626 1627 /** 1628 * @param durableDestinations The durableDestinations to set. 1629 */ 1630 public void setDurableDestinations(ActiveMQDestination[] durableDestinations) { 1631 this.durableDestinations = durableDestinations; 1632 } 1633 1634 /** 1635 * @return Returns the localBroker. 1636 */ 1637 public Transport getLocalBroker() { 1638 return localBroker; 1639 } 1640 1641 /** 1642 * @return Returns the remoteBroker. 1643 */ 1644 public Transport getRemoteBroker() { 1645 return remoteBroker; 1646 } 1647 1648 /** 1649 * @return the createdByDuplex 1650 */ 1651 public boolean isCreatedByDuplex() { 1652 return this.createdByDuplex; 1653 } 1654 1655 /** 1656 * @param createdByDuplex the createdByDuplex to set 1657 */ 1658 public void setCreatedByDuplex(boolean createdByDuplex) { 1659 this.createdByDuplex = createdByDuplex; 1660 } 1661 1662 @Override 1663 public String getRemoteAddress() { 1664 return remoteBroker.getRemoteAddress(); 1665 } 1666 1667 @Override 1668 public String getLocalAddress() { 1669 return localBroker.getRemoteAddress(); 1670 } 1671 1672 @Override 1673 public String getRemoteBrokerName() { 1674 return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName(); 1675 } 1676 1677 @Override 1678 public String getRemoteBrokerId() { 1679 return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString(); 1680 } 1681 1682 @Override 1683 public String getLocalBrokerName() { 1684 return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName(); 1685 } 1686 1687 @Override 1688 public long getDequeueCounter() { 1689 return networkBridgeStatistics.getDequeues().getCount(); 1690 } 1691 1692 @Override 1693 public long getEnqueueCounter() { 1694 return networkBridgeStatistics.getEnqueues().getCount(); 1695 } 1696 1697 @Override 1698 public NetworkBridgeStatistics getNetworkBridgeStatistics() { 1699 return networkBridgeStatistics; 1700 } 1701 1702 protected boolean isDuplex() { 1703 return configuration.isDuplex() || createdByDuplex; 1704 } 1705 1706 public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() { 1707 return subscriptionMapByRemoteId; 1708 } 1709 1710 @Override 1711 public void setBrokerService(BrokerService brokerService) { 1712 this.brokerService = brokerService; 1713 this.localBrokerId = brokerService.getRegionBroker().getBrokerId(); 1714 localBrokerPath[0] = localBrokerId; 1715 } 1716 1717 @Override 1718 public void setMbeanObjectName(ObjectName objectName) { 1719 this.mbeanObjectName = objectName; 1720 } 1721 1722 @Override 1723 public ObjectName getMbeanObjectName() { 1724 return mbeanObjectName; 1725 } 1726 1727 @Override 1728 public void resetStats() { 1729 networkBridgeStatistics.reset(); 1730 } 1731 1732 /* 1733 * Used to allow for async tasks to await receipt of the BrokerInfo from the local and 1734 * remote sides of the network bridge. 1735 */ 1736 private static class FutureBrokerInfo implements Future<BrokerInfo> { 1737 1738 private final CountDownLatch slot = new CountDownLatch(1); 1739 private final AtomicBoolean disposed; 1740 private volatile BrokerInfo info = null; 1741 1742 public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) { 1743 this.info = info; 1744 this.disposed = disposed; 1745 } 1746 1747 @Override 1748 public boolean cancel(boolean mayInterruptIfRunning) { 1749 slot.countDown(); 1750 return true; 1751 } 1752 1753 @Override 1754 public boolean isCancelled() { 1755 return slot.getCount() == 0 && info == null; 1756 } 1757 1758 @Override 1759 public boolean isDone() { 1760 return info != null; 1761 } 1762 1763 @Override 1764 public BrokerInfo get() throws InterruptedException, ExecutionException { 1765 try { 1766 if (info == null) { 1767 while (!disposed.get()) { 1768 if (slot.await(1, TimeUnit.SECONDS)) { 1769 break; 1770 } 1771 } 1772 } 1773 return info; 1774 } catch (InterruptedException e) { 1775 Thread.currentThread().interrupt(); 1776 LOG.debug("Operation interrupted: {}", e, e); 1777 throw new InterruptedException("Interrupted."); 1778 } 1779 } 1780 1781 @Override 1782 public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 1783 try { 1784 if (info == null) { 1785 long deadline = System.currentTimeMillis() + unit.toMillis(timeout); 1786 1787 while (!disposed.get() || System.currentTimeMillis() < deadline) { 1788 if (slot.await(1, TimeUnit.MILLISECONDS)) { 1789 break; 1790 } 1791 } 1792 if (info == null) { 1793 throw new TimeoutException(); 1794 } 1795 } 1796 return info; 1797 } catch (InterruptedException e) { 1798 throw new InterruptedException("Interrupted."); 1799 } 1800 } 1801 1802 public void set(BrokerInfo info) { 1803 this.info = info; 1804 this.slot.countDown(); 1805 } 1806 } 1807 1808 protected void serviceOutbound(Message message) { 1809 NetworkBridgeListener l = this.networkBridgeListener; 1810 if (l != null) { 1811 l.onOutboundMessage(this, message); 1812 } 1813 } 1814 1815 protected void serviceInboundMessage(Message message) { 1816 NetworkBridgeListener l = this.networkBridgeListener; 1817 if (l != null) { 1818 l.onInboundMessage(this, message); 1819 } 1820 } 1821 1822 protected boolean canDuplexDispatch(Message message) { 1823 boolean result = true; 1824 if (configuration.isCheckDuplicateMessagesOnDuplex()){ 1825 final long producerSequenceId = message.getMessageId().getProducerSequenceId(); 1826 // messages are multiplexed on this producer so we need to query the persistenceAdapter 1827 long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId()); 1828 if (producerSequenceId <= lastStoredForMessageProducer) { 1829 result = false; 1830 LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{ 1831 (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer 1832 }); 1833 } 1834 } 1835 return result; 1836 } 1837 1838 protected long getStoredSequenceIdForMessage(MessageId messageId) { 1839 try { 1840 return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId()); 1841 } catch (IOException ignored) { 1842 LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored); 1843 } 1844 return -1; 1845 } 1846 1847 protected void configureConsumerPrefetch(ConsumerInfo consumerInfo) { 1848 //If a consumer on an advisory topic and advisoryPrefetchSize has been explicitly 1849 //set then use it, else default to the prefetchSize setting 1850 if (AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination()) && 1851 configuration.getAdvisoryPrefetchSize() > 0) { 1852 consumerInfo.setPrefetchSize(configuration.getAdvisoryPrefetchSize()); 1853 } else { 1854 consumerInfo.setPrefetchSize(configuration.getPrefetchSize()); 1855 } 1856 } 1857 1858}