001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.failover; 018 019import java.io.BufferedReader; 020import java.io.FileReader; 021import java.io.IOException; 022import java.io.InputStreamReader; 023import java.io.InterruptedIOException; 024import java.net.InetAddress; 025import java.net.MalformedURLException; 026import java.net.URI; 027import java.net.URL; 028import java.util.ArrayList; 029import java.util.Collections; 030import java.util.HashSet; 031import java.util.Iterator; 032import java.util.LinkedHashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.StringTokenizer; 036import java.util.concurrent.CopyOnWriteArrayList; 037import java.util.concurrent.atomic.AtomicReference; 038 039import org.apache.activemq.broker.SslContext; 040import org.apache.activemq.command.Command; 041import org.apache.activemq.command.ConnectionControl; 042import org.apache.activemq.command.ConnectionId; 043import org.apache.activemq.command.RemoveInfo; 044import org.apache.activemq.command.Response; 045import org.apache.activemq.state.ConnectionStateTracker; 046import org.apache.activemq.state.Tracked; 047import org.apache.activemq.thread.DefaultThreadPools; 048import org.apache.activemq.thread.Task; 049import org.apache.activemq.thread.TaskRunner; 050import org.apache.activemq.transport.CompositeTransport; 051import org.apache.activemq.transport.DefaultTransportListener; 052import org.apache.activemq.transport.FutureResponse; 053import org.apache.activemq.transport.ResponseCallback; 054import org.apache.activemq.transport.Transport; 055import org.apache.activemq.transport.TransportFactory; 056import org.apache.activemq.transport.TransportListener; 057import org.apache.activemq.util.IOExceptionSupport; 058import org.apache.activemq.util.ServiceSupport; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062/** 063 * A Transport that is made reliable by being able to fail over to another 064 * transport when a transport failure is detected. 065 */ 066public class FailoverTransport implements CompositeTransport { 067 068 private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class); 069 private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10; 070 private static final int INFINITE = -1; 071 private TransportListener transportListener; 072 private boolean disposed; 073 private boolean connected; 074 private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>(); 075 private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>(); 076 077 private final Object reconnectMutex = new Object(); 078 private final Object backupMutex = new Object(); 079 private final Object sleepMutex = new Object(); 080 private final Object listenerMutex = new Object(); 081 private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); 082 private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>(); 083 084 private URI connectedTransportURI; 085 private URI failedConnectTransportURI; 086 private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>(); 087 private final TaskRunner reconnectTask; 088 private boolean started; 089 private boolean initialized; 090 private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; 091 private long maxReconnectDelay = 1000 * 30; 092 private double backOffMultiplier = 2d; 093 private long timeout = INFINITE; 094 private boolean useExponentialBackOff = true; 095 private boolean randomize = true; 096 private int maxReconnectAttempts = INFINITE; 097 private int startupMaxReconnectAttempts = INFINITE; 098 private int connectFailures; 099 private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; 100 private Exception connectionFailure; 101 private boolean firstConnection = true; 102 // optionally always have a backup created 103 private boolean backup = false; 104 private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>(); 105 private int backupPoolSize = 1; 106 private boolean trackMessages = false; 107 private boolean trackTransactionProducers = true; 108 private int maxCacheSize = 128 * 1024; 109 private final TransportListener disposedListener = new DefaultTransportListener() { 110 }; 111 private final TransportListener myTransportListener = createTransportListener(); 112 private boolean updateURIsSupported = true; 113 private boolean reconnectSupported = true; 114 // remember for reconnect thread 115 private SslContext brokerSslContext; 116 private String updateURIsURL = null; 117 private boolean rebalanceUpdateURIs = true; 118 private boolean doRebalance = false; 119 private boolean connectedToPriority = false; 120 121 private boolean priorityBackup = false; 122 private ArrayList<URI> priorityList = new ArrayList<URI>(); 123 private boolean priorityBackupAvailable = false; 124 125 public FailoverTransport() throws InterruptedIOException { 126 brokerSslContext = SslContext.getCurrentSslContext(); 127 stateTracker.setTrackTransactions(true); 128 // Setup a task that is used to reconnect the a connection async. 129 reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() { 130 public boolean iterate() { 131 boolean result = false; 132 if (!started) { 133 return result; 134 } 135 boolean buildBackup = true; 136 synchronized (backupMutex) { 137 if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) { 138 result = doReconnect(); 139 buildBackup = false; 140 connectedToPriority = isPriority(connectedTransportURI); 141 } 142 } 143 if (buildBackup) { 144 buildBackups(); 145 if (priorityBackup && !connectedToPriority) { 146 try { 147 doDelay(); 148 if (reconnectTask == null) { 149 return true; 150 } 151 reconnectTask.wakeup(); 152 } catch (InterruptedException e) { 153 LOG.debug("Reconnect task has been interrupted.", e); 154 } 155 } 156 } else { 157 // build backups on the next iteration 158 buildBackup = true; 159 try { 160 if (reconnectTask == null) { 161 return true; 162 } 163 reconnectTask.wakeup(); 164 } catch (InterruptedException e) { 165 LOG.debug("Reconnect task has been interrupted.", e); 166 } 167 } 168 return result; 169 } 170 171 }, "ActiveMQ Failover Worker: " + System.identityHashCode(this)); 172 } 173 174 TransportListener createTransportListener() { 175 return new TransportListener() { 176 public void onCommand(Object o) { 177 Command command = (Command) o; 178 if (command == null) { 179 return; 180 } 181 if (command.isResponse()) { 182 Object object = null; 183 synchronized (requestMap) { 184 object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId())); 185 } 186 if (object != null && object.getClass() == Tracked.class) { 187 ((Tracked) object).onResponses(command); 188 } 189 } 190 if (!initialized) { 191 initialized = true; 192 } 193 194 if (command.isConnectionControl()) { 195 handleConnectionControl((ConnectionControl) command); 196 } 197 if (transportListener != null) { 198 transportListener.onCommand(command); 199 } 200 } 201 202 public void onException(IOException error) { 203 try { 204 handleTransportFailure(error); 205 } catch (InterruptedException e) { 206 Thread.currentThread().interrupt(); 207 transportListener.onException(new InterruptedIOException()); 208 } 209 } 210 211 public void transportInterupted() { 212 if (transportListener != null) { 213 transportListener.transportInterupted(); 214 } 215 } 216 217 public void transportResumed() { 218 if (transportListener != null) { 219 transportListener.transportResumed(); 220 } 221 } 222 }; 223 } 224 225 public final void disposeTransport(Transport transport) { 226 transport.setTransportListener(disposedListener); 227 ServiceSupport.dispose(transport); 228 } 229 230 public final void handleTransportFailure(IOException e) throws InterruptedException { 231 if (LOG.isTraceEnabled()) { 232 LOG.trace(this + " handleTransportFailure: " + e); 233 } 234 Transport transport = connectedTransport.getAndSet(null); 235 if (transport == null) { 236 // sync with possible in progress reconnect 237 synchronized (reconnectMutex) { 238 transport = connectedTransport.getAndSet(null); 239 } 240 } 241 if (transport != null) { 242 243 disposeTransport(transport); 244 245 boolean reconnectOk = false; 246 synchronized (reconnectMutex) { 247 if (canReconnect()) { 248 reconnectOk = true; 249 } 250 LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed, reason: " + e 251 + (reconnectOk ? "," : ", not") +" attempting to automatically reconnect"); 252 253 initialized = false; 254 failedConnectTransportURI = connectedTransportURI; 255 connectedTransportURI = null; 256 connected = false; 257 258 // notify before any reconnect attempt so ack state can be whacked 259 if (transportListener != null) { 260 transportListener.transportInterupted(); 261 } 262 263 if (reconnectOk) { 264 updated.remove(failedConnectTransportURI); 265 reconnectTask.wakeup(); 266 } else { 267 propagateFailureToExceptionListener(e); 268 } 269 } 270 } 271 } 272 273 private boolean canReconnect() { 274 return started && 0 != calculateReconnectAttemptLimit(); 275 } 276 277 public final void handleConnectionControl(ConnectionControl control) { 278 String reconnectStr = control.getReconnectTo(); 279 if (reconnectStr != null) { 280 reconnectStr = reconnectStr.trim(); 281 if (reconnectStr.length() > 0) { 282 try { 283 URI uri = new URI(reconnectStr); 284 if (isReconnectSupported()) { 285 reconnect(uri); 286 LOG.info("Reconnected to: " + uri); 287 } 288 } catch (Exception e) { 289 LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e); 290 } 291 } 292 } 293 processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers()); 294 } 295 296 private final void processNewTransports(boolean rebalance, String newTransports) { 297 if (newTransports != null) { 298 newTransports = newTransports.trim(); 299 if (newTransports.length() > 0 && isUpdateURIsSupported()) { 300 List<URI> list = new ArrayList<URI>(); 301 StringTokenizer tokenizer = new StringTokenizer(newTransports, ","); 302 while (tokenizer.hasMoreTokens()) { 303 String str = tokenizer.nextToken(); 304 try { 305 URI uri = new URI(str); 306 list.add(uri); 307 } catch (Exception e) { 308 LOG.error("Failed to parse broker address: " + str, e); 309 } 310 } 311 if (list.isEmpty() == false) { 312 try { 313 updateURIs(rebalance, list.toArray(new URI[list.size()])); 314 } catch (IOException e) { 315 LOG.error("Failed to update transport URI's from: " + newTransports, e); 316 } 317 } 318 } 319 } 320 } 321 322 public void start() throws Exception { 323 synchronized (reconnectMutex) { 324 if (LOG.isDebugEnabled()) { 325 LOG.debug("Started " + this); 326 } 327 if (started) { 328 return; 329 } 330 started = true; 331 stateTracker.setMaxCacheSize(getMaxCacheSize()); 332 stateTracker.setTrackMessages(isTrackMessages()); 333 stateTracker.setTrackTransactionProducers(isTrackTransactionProducers()); 334 if (connectedTransport.get() != null) { 335 stateTracker.restore(connectedTransport.get()); 336 } else { 337 reconnect(false); 338 } 339 } 340 } 341 342 public void stop() throws Exception { 343 Transport transportToStop = null; 344 synchronized (reconnectMutex) { 345 if (LOG.isDebugEnabled()) { 346 LOG.debug("Stopped " + this); 347 } 348 if (!started) { 349 return; 350 } 351 started = false; 352 disposed = true; 353 connected = false; 354 for (BackupTransport t : backups) { 355 t.setDisposed(true); 356 } 357 backups.clear(); 358 359 if (connectedTransport.get() != null) { 360 transportToStop = connectedTransport.getAndSet(null); 361 } 362 reconnectMutex.notifyAll(); 363 } 364 synchronized (sleepMutex) { 365 sleepMutex.notifyAll(); 366 } 367 reconnectTask.shutdown(); 368 if (transportToStop != null) { 369 transportToStop.stop(); 370 } 371 } 372 373 public long getInitialReconnectDelay() { 374 return initialReconnectDelay; 375 } 376 377 public void setInitialReconnectDelay(long initialReconnectDelay) { 378 this.initialReconnectDelay = initialReconnectDelay; 379 } 380 381 public long getMaxReconnectDelay() { 382 return maxReconnectDelay; 383 } 384 385 public void setMaxReconnectDelay(long maxReconnectDelay) { 386 this.maxReconnectDelay = maxReconnectDelay; 387 } 388 389 public long getReconnectDelay() { 390 return reconnectDelay; 391 } 392 393 public void setReconnectDelay(long reconnectDelay) { 394 this.reconnectDelay = reconnectDelay; 395 } 396 397 public double getReconnectDelayExponent() { 398 return backOffMultiplier; 399 } 400 401 public void setReconnectDelayExponent(double reconnectDelayExponent) { 402 this.backOffMultiplier = reconnectDelayExponent; 403 } 404 405 public Transport getConnectedTransport() { 406 return connectedTransport.get(); 407 } 408 409 public URI getConnectedTransportURI() { 410 return connectedTransportURI; 411 } 412 413 public int getMaxReconnectAttempts() { 414 return maxReconnectAttempts; 415 } 416 417 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 418 this.maxReconnectAttempts = maxReconnectAttempts; 419 } 420 421 public int getStartupMaxReconnectAttempts() { 422 return this.startupMaxReconnectAttempts; 423 } 424 425 public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) { 426 this.startupMaxReconnectAttempts = startupMaxReconnectAttempts; 427 } 428 429 public long getTimeout() { 430 return timeout; 431 } 432 433 public void setTimeout(long timeout) { 434 this.timeout = timeout; 435 } 436 437 /** 438 * @return Returns the randomize. 439 */ 440 public boolean isRandomize() { 441 return randomize; 442 } 443 444 /** 445 * @param randomize The randomize to set. 446 */ 447 public void setRandomize(boolean randomize) { 448 this.randomize = randomize; 449 } 450 451 public boolean isBackup() { 452 return backup; 453 } 454 455 public void setBackup(boolean backup) { 456 this.backup = backup; 457 } 458 459 public int getBackupPoolSize() { 460 return backupPoolSize; 461 } 462 463 public void setBackupPoolSize(int backupPoolSize) { 464 this.backupPoolSize = backupPoolSize; 465 } 466 467 public int getCurrentBackups() { 468 return this.backups.size(); 469 } 470 471 public boolean isTrackMessages() { 472 return trackMessages; 473 } 474 475 public void setTrackMessages(boolean trackMessages) { 476 this.trackMessages = trackMessages; 477 } 478 479 public boolean isTrackTransactionProducers() { 480 return this.trackTransactionProducers; 481 } 482 483 public void setTrackTransactionProducers(boolean trackTransactionProducers) { 484 this.trackTransactionProducers = trackTransactionProducers; 485 } 486 487 public int getMaxCacheSize() { 488 return maxCacheSize; 489 } 490 491 public void setMaxCacheSize(int maxCacheSize) { 492 this.maxCacheSize = maxCacheSize; 493 } 494 495 public boolean isPriorityBackup() { 496 return priorityBackup; 497 } 498 499 public void setPriorityBackup(boolean priorityBackup) { 500 this.priorityBackup = priorityBackup; 501 } 502 503 public void setPriorityURIs(String priorityURIs) { 504 StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ","); 505 while (tokenizer.hasMoreTokens()) { 506 String str = tokenizer.nextToken(); 507 try { 508 URI uri = new URI(str); 509 priorityList.add(uri); 510 } catch (Exception e) { 511 LOG.error("Failed to parse broker address: " + str, e); 512 } 513 } 514 } 515 516 public void oneway(Object o) throws IOException { 517 518 Command command = (Command) o; 519 Exception error = null; 520 try { 521 522 synchronized (reconnectMutex) { 523 524 if (command != null && connectedTransport.get() == null) { 525 if (command.isShutdownInfo()) { 526 // Skipping send of ShutdownInfo command when not connected. 527 return; 528 } else if (command instanceof RemoveInfo || command.isMessageAck()) { 529 // Simulate response to RemoveInfo command or MessageAck (as it will be stale) 530 stateTracker.track(command); 531 if (command.isResponseRequired()) { 532 Response response = new Response(); 533 response.setCorrelationId(command.getCommandId()); 534 myTransportListener.onCommand(response); 535 } 536 return; 537 } 538 } 539 540 // Keep trying until the message is sent. 541 for (int i = 0; !disposed; i++) { 542 try { 543 544 // Wait for transport to be connected. 545 Transport transport = connectedTransport.get(); 546 long start = System.currentTimeMillis(); 547 boolean timedout = false; 548 while (transport == null && !disposed && connectionFailure == null 549 && !Thread.currentThread().isInterrupted()) { 550 if (LOG.isTraceEnabled()) { 551 LOG.trace("Waiting for transport to reconnect..: " + command); 552 } 553 long end = System.currentTimeMillis(); 554 if (timeout > 0 && (end - start > timeout)) { 555 timedout = true; 556 if (LOG.isInfoEnabled()) { 557 LOG.info("Failover timed out after " + (end - start) + "ms"); 558 } 559 break; 560 } 561 try { 562 reconnectMutex.wait(100); 563 } catch (InterruptedException e) { 564 Thread.currentThread().interrupt(); 565 if (LOG.isDebugEnabled()) { 566 LOG.debug("Interupted: " + e, e); 567 } 568 } 569 transport = connectedTransport.get(); 570 } 571 572 if (transport == null) { 573 // Previous loop may have exited due to use being 574 // disposed. 575 if (disposed) { 576 error = new IOException("Transport disposed."); 577 } else if (connectionFailure != null) { 578 error = connectionFailure; 579 } else if (timedout == true) { 580 error = new IOException("Failover timeout of " + timeout + " ms reached."); 581 } else { 582 error = new IOException("Unexpected failure."); 583 } 584 break; 585 } 586 587 // If it was a request and it was not being tracked by 588 // the state tracker, 589 // then hold it in the requestMap so that we can replay 590 // it later. 591 Tracked tracked = stateTracker.track(command); 592 synchronized (requestMap) { 593 if (tracked != null && tracked.isWaitingForResponse()) { 594 requestMap.put(Integer.valueOf(command.getCommandId()), tracked); 595 } else if (tracked == null && command.isResponseRequired()) { 596 requestMap.put(Integer.valueOf(command.getCommandId()), command); 597 } 598 } 599 600 // Send the message. 601 try { 602 transport.oneway(command); 603 stateTracker.trackBack(command); 604 } catch (IOException e) { 605 606 // If the command was not tracked.. we will retry in 607 // this method 608 if (tracked == null) { 609 610 // since we will retry in this method.. take it 611 // out of the request 612 // map so that it is not sent 2 times on 613 // recovery 614 if (command.isResponseRequired()) { 615 requestMap.remove(Integer.valueOf(command.getCommandId())); 616 } 617 618 // Rethrow the exception so it will handled by 619 // the outer catch 620 throw e; 621 } else { 622 // Handle the error but allow the method to return since the 623 // tracked commands are replayed on reconnect. 624 if (LOG.isDebugEnabled()) { 625 LOG.debug("Send oneway attempt: " + i + " failed for command:" + command); 626 } 627 handleTransportFailure(e); 628 } 629 } 630 631 return; 632 633 } catch (IOException e) { 634 if (LOG.isDebugEnabled()) { 635 LOG.debug("Send oneway attempt: " + i + " failed for command:" + command); 636 } 637 handleTransportFailure(e); 638 } 639 } 640 } 641 } catch (InterruptedException e) { 642 // Some one may be trying to stop our thread. 643 Thread.currentThread().interrupt(); 644 throw new InterruptedIOException(); 645 } 646 647 if (!disposed) { 648 if (error != null) { 649 if (error instanceof IOException) { 650 throw (IOException) error; 651 } 652 throw IOExceptionSupport.create(error); 653 } 654 } 655 } 656 657 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 658 throw new AssertionError("Unsupported Method"); 659 } 660 661 public Object request(Object command) throws IOException { 662 throw new AssertionError("Unsupported Method"); 663 } 664 665 public Object request(Object command, int timeout) throws IOException { 666 throw new AssertionError("Unsupported Method"); 667 } 668 669 public void add(boolean rebalance, URI u[]) { 670 boolean newURI = false; 671 for (URI uri : u) { 672 if (!contains(uri)) { 673 uris.add(uri); 674 newURI = true; 675 } 676 } 677 if (newURI) { 678 reconnect(rebalance); 679 } 680 } 681 682 public void remove(boolean rebalance, URI u[]) { 683 for (URI uri : u) { 684 uris.remove(uri); 685 } 686 // rebalance is automatic if any connected to removed/stopped broker 687 } 688 689 public void add(boolean rebalance, String u) { 690 try { 691 URI newURI = new URI(u); 692 if (contains(newURI) == false) { 693 uris.add(newURI); 694 reconnect(rebalance); 695 } 696 697 } catch (Exception e) { 698 LOG.error("Failed to parse URI: " + u); 699 } 700 } 701 702 public void reconnect(boolean rebalance) { 703 synchronized (reconnectMutex) { 704 if (started) { 705 if (rebalance) { 706 doRebalance = true; 707 } 708 LOG.debug("Waking up reconnect task"); 709 try { 710 reconnectTask.wakeup(); 711 } catch (InterruptedException e) { 712 Thread.currentThread().interrupt(); 713 } 714 } else { 715 LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport."); 716 } 717 } 718 } 719 720 private List<URI> getConnectList() { 721 if (!updated.isEmpty()) { 722 if (failedConnectTransportURI != null) { 723 boolean removed = updated.remove(failedConnectTransportURI); 724 if (removed) { 725 updated.add(failedConnectTransportURI); 726 } 727 } 728 return updated; 729 } 730 ArrayList<URI> l = new ArrayList<URI>(uris); 731 boolean removed = false; 732 if (failedConnectTransportURI != null) { 733 removed = l.remove(failedConnectTransportURI); 734 } 735 if (randomize) { 736 // Randomly, reorder the list by random swapping 737 for (int i = 0; i < l.size(); i++) { 738 int p = (int) (Math.random() * 100 % l.size()); 739 URI t = l.get(p); 740 l.set(p, l.get(i)); 741 l.set(i, t); 742 } 743 } 744 if (removed) { 745 l.add(failedConnectTransportURI); 746 } 747 if (LOG.isDebugEnabled()) { 748 LOG.debug("urlList connectionList:" + l + ", from: " + uris); 749 } 750 return l; 751 } 752 753 public TransportListener getTransportListener() { 754 return transportListener; 755 } 756 757 public void setTransportListener(TransportListener commandListener) { 758 synchronized (listenerMutex) { 759 this.transportListener = commandListener; 760 listenerMutex.notifyAll(); 761 } 762 } 763 764 public <T> T narrow(Class<T> target) { 765 766 if (target.isAssignableFrom(getClass())) { 767 return target.cast(this); 768 } 769 Transport transport = connectedTransport.get(); 770 if (transport != null) { 771 return transport.narrow(target); 772 } 773 return null; 774 775 } 776 777 protected void restoreTransport(Transport t) throws Exception, IOException { 778 t.start(); 779 // send information to the broker - informing it we are an ft client 780 ConnectionControl cc = new ConnectionControl(); 781 cc.setFaultTolerant(true); 782 t.oneway(cc); 783 stateTracker.restore(t); 784 Map<Integer, Command> tmpMap = null; 785 synchronized (requestMap) { 786 tmpMap = new LinkedHashMap<Integer, Command>(requestMap); 787 } 788 for (Command command : tmpMap.values()) { 789 if (LOG.isTraceEnabled()) { 790 LOG.trace("restore requestMap, replay: " + command); 791 } 792 t.oneway(command); 793 } 794 } 795 796 public boolean isUseExponentialBackOff() { 797 return useExponentialBackOff; 798 } 799 800 public void setUseExponentialBackOff(boolean useExponentialBackOff) { 801 this.useExponentialBackOff = useExponentialBackOff; 802 } 803 804 @Override 805 public String toString() { 806 return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString(); 807 } 808 809 public String getRemoteAddress() { 810 Transport transport = connectedTransport.get(); 811 if (transport != null) { 812 return transport.getRemoteAddress(); 813 } 814 return null; 815 } 816 817 public boolean isFaultTolerant() { 818 return true; 819 } 820 821 private void doUpdateURIsFromDisk() { 822 // If updateURIsURL is specified, read the file and add any new 823 // transport URI's to this FailOverTransport. 824 // Note: Could track file timestamp to avoid unnecessary reading. 825 String fileURL = getUpdateURIsURL(); 826 if (fileURL != null) { 827 BufferedReader in = null; 828 String newUris = null; 829 StringBuffer buffer = new StringBuffer(); 830 831 try { 832 in = new BufferedReader(getURLStream(fileURL)); 833 while (true) { 834 String line = in.readLine(); 835 if (line == null) { 836 break; 837 } 838 buffer.append(line); 839 } 840 newUris = buffer.toString(); 841 } catch (IOException ioe) { 842 LOG.error("Failed to read updateURIsURL: " + fileURL, ioe); 843 } finally { 844 if (in != null) { 845 try { 846 in.close(); 847 } catch (IOException ioe) { 848 // ignore 849 } 850 } 851 } 852 853 processNewTransports(isRebalanceUpdateURIs(), newUris); 854 } 855 } 856 857 final boolean doReconnect() { 858 Exception failure = null; 859 synchronized (reconnectMutex) { 860 861 // First ensure we are up to date. 862 doUpdateURIsFromDisk(); 863 864 if (disposed || connectionFailure != null) { 865 reconnectMutex.notifyAll(); 866 } 867 if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) { 868 return false; 869 } else { 870 List<URI> connectList = getConnectList(); 871 if (connectList.isEmpty()) { 872 failure = new IOException("No uris available to connect to."); 873 } else { 874 if (doRebalance) { 875 if (connectList.get(0).equals(connectedTransportURI)) { 876 // already connected to first in the list, no need to rebalance 877 doRebalance = false; 878 return false; 879 } else { 880 if (LOG.isDebugEnabled()) { 881 LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList); 882 } 883 try { 884 Transport transport = this.connectedTransport.getAndSet(null); 885 if (transport != null) { 886 disposeTransport(transport); 887 } 888 } catch (Exception e) { 889 if (LOG.isDebugEnabled()) { 890 LOG.debug("Caught an exception stopping existing transport for rebalance", e); 891 } 892 } 893 } 894 doRebalance = false; 895 } 896 897 resetReconnectDelay(); 898 899 Transport transport = null; 900 URI uri = null; 901 902 // If we have a backup already waiting lets try it. 903 synchronized (backupMutex) { 904 if ((priorityBackup || backup) && !backups.isEmpty()) { 905 ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups); 906 if (randomize) { 907 Collections.shuffle(l); 908 } 909 BackupTransport bt = l.remove(0); 910 backups.remove(bt); 911 transport = bt.getTransport(); 912 uri = bt.getUri(); 913 if (priorityBackup && priorityBackupAvailable) { 914 Transport old = this.connectedTransport.getAndSet(null); 915 if (transport != null) { 916 disposeTransport(old); 917 } 918 priorityBackupAvailable = false; 919 } 920 } 921 } 922 923 // Sleep for the reconnectDelay if there's no backup and we aren't trying 924 // for the first time, or we were disposed for some reason. 925 if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) { 926 synchronized (sleepMutex) { 927 if (LOG.isDebugEnabled()) { 928 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); 929 } 930 try { 931 sleepMutex.wait(reconnectDelay); 932 } catch (InterruptedException e) { 933 Thread.currentThread().interrupt(); 934 } 935 } 936 } 937 938 Iterator<URI> iter = connectList.iterator(); 939 while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) { 940 941 try { 942 SslContext.setCurrentSslContext(brokerSslContext); 943 944 // We could be starting with a backup and if so we wait to grab a 945 // URI from the pool until next time around. 946 if (transport == null) { 947 uri = iter.next(); 948 transport = TransportFactory.compositeConnect(uri); 949 } 950 951 if (LOG.isDebugEnabled()) { 952 LOG.debug("Attempting " + connectFailures + "th connect to: " + uri); 953 } 954 transport.setTransportListener(myTransportListener); 955 transport.start(); 956 957 if (started && !firstConnection) { 958 restoreTransport(transport); 959 } 960 961 if (LOG.isDebugEnabled()) { 962 LOG.debug("Connection established"); 963 } 964 reconnectDelay = initialReconnectDelay; 965 connectedTransportURI = uri; 966 connectedTransport.set(transport); 967 reconnectMutex.notifyAll(); 968 connectFailures = 0; 969 970 // Make sure on initial startup, that the transportListener 971 // has been initialized for this instance. 972 synchronized (listenerMutex) { 973 if (transportListener == null) { 974 try { 975 // if it isn't set after 2secs - it probably never will be 976 listenerMutex.wait(2000); 977 } catch (InterruptedException ex) { 978 } 979 } 980 } 981 982 if (transportListener != null) { 983 transportListener.transportResumed(); 984 } else { 985 if (LOG.isDebugEnabled()) { 986 LOG.debug("transport resumed by transport listener not set"); 987 } 988 } 989 990 if (firstConnection) { 991 firstConnection = false; 992 LOG.info("Successfully connected to " + uri); 993 } else { 994 LOG.info("Successfully reconnected to " + uri); 995 } 996 997 connected = true; 998 return false; 999 } catch (Exception e) { 1000 failure = e; 1001 if (LOG.isDebugEnabled()) { 1002 LOG.debug("Connect fail to: " + uri + ", reason: " + e); 1003 } 1004 if (transport != null) { 1005 try { 1006 transport.stop(); 1007 transport = null; 1008 } catch (Exception ee) { 1009 if (LOG.isDebugEnabled()) { 1010 LOG.debug("Stop of failed transport: " + transport + 1011 " failed with reason: " + ee); 1012 } 1013 } 1014 } 1015 } finally { 1016 SslContext.setCurrentSslContext(null); 1017 } 1018 } 1019 } 1020 } 1021 1022 int reconnectLimit = calculateReconnectAttemptLimit(); 1023 1024 connectFailures++; 1025 if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) { 1026 LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)"); 1027 connectionFailure = failure; 1028 1029 // Make sure on initial startup, that the transportListener has been 1030 // initialized for this instance. 1031 synchronized (listenerMutex) { 1032 if (transportListener == null) { 1033 try { 1034 listenerMutex.wait(2000); 1035 } catch (InterruptedException ex) { 1036 } 1037 } 1038 } 1039 1040 propagateFailureToExceptionListener(connectionFailure); 1041 return false; 1042 } 1043 } 1044 1045 if (!disposed) { 1046 doDelay(); 1047 } 1048 1049 return !disposed; 1050 } 1051 1052 private void doDelay() { 1053 if (reconnectDelay > 0) { 1054 synchronized (sleepMutex) { 1055 if (LOG.isDebugEnabled()) { 1056 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection"); 1057 } 1058 try { 1059 sleepMutex.wait(reconnectDelay); 1060 } catch (InterruptedException e) { 1061 Thread.currentThread().interrupt(); 1062 } 1063 } 1064 } 1065 1066 if (useExponentialBackOff) { 1067 // Exponential increment of reconnect delay. 1068 reconnectDelay *= backOffMultiplier; 1069 if (reconnectDelay > maxReconnectDelay) { 1070 reconnectDelay = maxReconnectDelay; 1071 } 1072 } 1073 } 1074 1075 private void resetReconnectDelay() { 1076 if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) { 1077 reconnectDelay = initialReconnectDelay; 1078 } 1079 } 1080 1081 /* 1082 * called with reconnectMutex held 1083 */ 1084 private void propagateFailureToExceptionListener(Exception exception) { 1085 if (transportListener != null) { 1086 if (exception instanceof IOException) { 1087 transportListener.onException((IOException)exception); 1088 } else { 1089 transportListener.onException(IOExceptionSupport.create(exception)); 1090 } 1091 } 1092 reconnectMutex.notifyAll(); 1093 } 1094 1095 private int calculateReconnectAttemptLimit() { 1096 int maxReconnectValue = this.maxReconnectAttempts; 1097 if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) { 1098 maxReconnectValue = this.startupMaxReconnectAttempts; 1099 } 1100 return maxReconnectValue; 1101 } 1102 1103 final boolean buildBackups() { 1104 synchronized (backupMutex) { 1105 if (!disposed && (backup || priorityBackup) && backups.size() < backupPoolSize) { 1106 ArrayList<URI> backupList = new ArrayList<URI>(priorityList); 1107 List<URI> connectList = getConnectList(); 1108 for (URI uri: connectList) { 1109 if (!backupList.contains(uri)) { 1110 backupList.add(uri); 1111 } 1112 } 1113 // removed disposed backups 1114 List<BackupTransport> disposedList = new ArrayList<BackupTransport>(); 1115 for (BackupTransport bt : backups) { 1116 if (bt.isDisposed()) { 1117 disposedList.add(bt); 1118 } 1119 } 1120 backups.removeAll(disposedList); 1121 disposedList.clear(); 1122 for (Iterator<URI> iter = backupList.iterator(); iter.hasNext() && backups.size() < backupPoolSize; ) { 1123 URI uri = iter.next(); 1124 if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) { 1125 try { 1126 SslContext.setCurrentSslContext(brokerSslContext); 1127 BackupTransport bt = new BackupTransport(this); 1128 bt.setUri(uri); 1129 if (!backups.contains(bt)) { 1130 Transport t = TransportFactory.compositeConnect(uri); 1131 t.setTransportListener(bt); 1132 t.start(); 1133 bt.setTransport(t); 1134 backups.add(bt); 1135 if (priorityBackup && isPriority(uri)) { 1136 priorityBackupAvailable = true; 1137 } 1138 } 1139 } catch (Exception e) { 1140 LOG.debug("Failed to build backup ", e); 1141 } finally { 1142 SslContext.setCurrentSslContext(null); 1143 } 1144 } 1145 } 1146 } 1147 } 1148 return false; 1149 } 1150 1151 protected boolean isPriority(URI uri) { 1152 if (!priorityList.isEmpty()) { 1153 return priorityList.contains(uri); 1154 } 1155 return uris.indexOf(uri) == 0; 1156 } 1157 1158 public boolean isDisposed() { 1159 return disposed; 1160 } 1161 1162 public boolean isConnected() { 1163 return connected; 1164 } 1165 1166 public void reconnect(URI uri) throws IOException { 1167 add(true, new URI[]{uri}); 1168 } 1169 1170 public boolean isReconnectSupported() { 1171 return this.reconnectSupported; 1172 } 1173 1174 public void setReconnectSupported(boolean value) { 1175 this.reconnectSupported = value; 1176 } 1177 1178 public boolean isUpdateURIsSupported() { 1179 return this.updateURIsSupported; 1180 } 1181 1182 public void setUpdateURIsSupported(boolean value) { 1183 this.updateURIsSupported = value; 1184 } 1185 1186 public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException { 1187 if (isUpdateURIsSupported()) { 1188 HashSet<URI> copy = new HashSet<URI>(this.updated); 1189 updated.clear(); 1190 if (updatedURIs != null && updatedURIs.length > 0) { 1191 for (URI uri : updatedURIs) { 1192 if (uri != null && !updated.contains(uri)) { 1193 updated.add(uri); 1194 } 1195 } 1196 if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet<URI>(updated))) { 1197 buildBackups(); 1198 synchronized (reconnectMutex) { 1199 reconnect(rebalance); 1200 } 1201 } 1202 } 1203 } 1204 } 1205 1206 /** 1207 * @return the updateURIsURL 1208 */ 1209 public String getUpdateURIsURL() { 1210 return this.updateURIsURL; 1211 } 1212 1213 /** 1214 * @param updateURIsURL the updateURIsURL to set 1215 */ 1216 public void setUpdateURIsURL(String updateURIsURL) { 1217 this.updateURIsURL = updateURIsURL; 1218 } 1219 1220 /** 1221 * @return the rebalanceUpdateURIs 1222 */ 1223 public boolean isRebalanceUpdateURIs() { 1224 return this.rebalanceUpdateURIs; 1225 } 1226 1227 /** 1228 * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set 1229 */ 1230 public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) { 1231 this.rebalanceUpdateURIs = rebalanceUpdateURIs; 1232 } 1233 1234 public int getReceiveCounter() { 1235 Transport transport = connectedTransport.get(); 1236 if (transport == null) { 1237 return 0; 1238 } 1239 return transport.getReceiveCounter(); 1240 } 1241 1242 public int getConnectFailures() { 1243 return connectFailures; 1244 } 1245 1246 public void connectionInterruptProcessingComplete(ConnectionId connectionId) { 1247 synchronized (reconnectMutex) { 1248 stateTracker.connectionInterruptProcessingComplete(this, connectionId); 1249 } 1250 } 1251 1252 public ConnectionStateTracker getStateTracker() { 1253 return stateTracker; 1254 } 1255 1256 private boolean contains(URI newURI) { 1257 boolean result = false; 1258 for (URI uri : uris) { 1259 if (newURI.getPort() == uri.getPort()) { 1260 InetAddress newAddr = null; 1261 InetAddress addr = null; 1262 try { 1263 newAddr = InetAddress.getByName(newURI.getHost()); 1264 addr = InetAddress.getByName(uri.getHost()); 1265 } catch(IOException e) { 1266 1267 if (newAddr == null) { 1268 LOG.error("Failed to Lookup INetAddress for URI[ " + newURI + " ] : " + e); 1269 } else { 1270 LOG.error("Failed to Lookup INetAddress for URI[ " + uri + " ] : " + e); 1271 } 1272 1273 if (newURI.getHost().equalsIgnoreCase(uri.getHost())) { 1274 result = true; 1275 break; 1276 } else { 1277 continue; 1278 } 1279 } 1280 1281 if (addr.equals(newAddr)) { 1282 result = true; 1283 break; 1284 } 1285 } 1286 } 1287 1288 return result; 1289 } 1290 1291 private InputStreamReader getURLStream(String path) throws IOException { 1292 InputStreamReader result = null; 1293 URL url = null; 1294 try { 1295 url = new URL(path); 1296 result = new InputStreamReader(url.openStream()); 1297 } catch (MalformedURLException e) { 1298 // ignore - it could be a path to a a local file 1299 } 1300 if (result == null) { 1301 result = new FileReader(path); 1302 } 1303 return result; 1304 } 1305}