001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.fanout;
018
019import java.io.IOException;
020import java.io.InterruptedIOException;
021import java.net.URI;
022import java.util.ArrayList;
023import java.util.Iterator;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.atomic.AtomicInteger;
026
027import org.apache.activemq.command.Command;
028import org.apache.activemq.command.ConsumerInfo;
029import org.apache.activemq.command.Message;
030import org.apache.activemq.command.RemoveInfo;
031import org.apache.activemq.command.Response;
032import org.apache.activemq.state.ConnectionStateTracker;
033import org.apache.activemq.thread.DefaultThreadPools;
034import org.apache.activemq.thread.Task;
035import org.apache.activemq.thread.TaskRunner;
036import org.apache.activemq.transport.CompositeTransport;
037import org.apache.activemq.transport.DefaultTransportListener;
038import org.apache.activemq.transport.FutureResponse;
039import org.apache.activemq.transport.ResponseCallback;
040import org.apache.activemq.transport.Transport;
041import org.apache.activemq.transport.TransportFactory;
042import org.apache.activemq.transport.TransportListener;
043import org.apache.activemq.util.IOExceptionSupport;
044import org.apache.activemq.util.ServiceStopper;
045import org.apache.activemq.util.ServiceSupport;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * A Transport that fans out a connection to multiple brokers.
051 * 
052 * 
053 */
054public class FanoutTransport implements CompositeTransport {
055
056    private static final Logger LOG = LoggerFactory.getLogger(FanoutTransport.class);
057
058    private TransportListener transportListener;
059    private boolean disposed;
060    private boolean connected;
061
062    private final Object reconnectMutex = new Object();
063    private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
064    private final ConcurrentHashMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>();
065
066    private final TaskRunner reconnectTask;
067    private boolean started;
068
069    private final ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>();
070    private int connectedCount;
071
072    private int minAckCount = 2;
073
074    private long initialReconnectDelay = 10;
075    private long maxReconnectDelay = 1000 * 30;
076    private long backOffMultiplier = 2;
077    private final boolean useExponentialBackOff = true;
078    private int maxReconnectAttempts;
079    private Exception connectionFailure;
080    private FanoutTransportHandler primary;
081    private boolean fanOutQueues = false;
082
083    static class RequestCounter {
084
085        final Command command;
086        final AtomicInteger ackCount;
087
088        RequestCounter(Command command, int count) {
089            this.command = command;
090            this.ackCount = new AtomicInteger(count);
091        }
092
093        @Override
094        public String toString() {
095            return command.getCommandId() + "=" + ackCount.get();
096        }
097    }
098
099    class FanoutTransportHandler extends DefaultTransportListener {
100
101        private final URI uri;
102        private Transport transport;
103
104        private int connectFailures;
105        private long reconnectDelay = initialReconnectDelay;
106        private long reconnectDate;
107
108        public FanoutTransportHandler(URI uri) {
109            this.uri = uri;
110        }
111
112        @Override
113        public void onCommand(Object o) {
114            Command command = (Command)o;
115            if (command.isResponse()) {
116                Integer id = new Integer(((Response)command).getCorrelationId());
117                RequestCounter rc = requestMap.get(id);
118                if (rc != null) {
119                    if (rc.ackCount.decrementAndGet() <= 0) {
120                        requestMap.remove(id);
121                        transportListenerOnCommand(command);
122                    }
123                } else {
124                    transportListenerOnCommand(command);
125                }
126            } else {
127                transportListenerOnCommand(command);
128            }
129        }
130
131        @Override
132        public void onException(IOException error) {
133            try {
134                synchronized (reconnectMutex) {
135                    if (transport == null || !transport.isConnected()) {
136                        return;
137                    }
138
139                    LOG.debug("Transport failed, starting up reconnect task", error);
140
141                    ServiceSupport.dispose(transport);
142                    transport = null;
143                    connectedCount--;
144                    if (primary == this) {
145                        primary = null;
146                    }
147                    reconnectTask.wakeup();
148                }
149            } catch (InterruptedException e) {
150                Thread.currentThread().interrupt();
151                if (transportListener != null) {
152                    transportListener.onException(new InterruptedIOException());
153                }
154            }
155        }
156    }
157
158    public FanoutTransport() throws InterruptedIOException {
159        // Setup a task that is used to reconnect the a connection async.
160        reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
161            public boolean iterate() {
162                return doConnect();
163            }
164        }, "ActiveMQ Fanout Worker: " + System.identityHashCode(this));
165    }
166
167    /**
168     * @return
169     */
170    private boolean doConnect() {
171        long closestReconnectDate = 0;
172        synchronized (reconnectMutex) {
173
174            if (disposed || connectionFailure != null) {
175                reconnectMutex.notifyAll();
176            }
177
178            if (transports.size() == connectedCount || disposed || connectionFailure != null) {
179                return false;
180            } else {
181
182                if (transports.isEmpty()) {
183                    // connectionFailure = new IOException("No uris available to
184                    // connect to.");
185                } else {
186
187                    // Try to connect them up.
188                    Iterator<FanoutTransportHandler> iter = transports.iterator();
189                    for (int i = 0; iter.hasNext() && !disposed; i++) {
190
191                        long now = System.currentTimeMillis();
192
193                        FanoutTransportHandler fanoutHandler = iter.next();
194                        if (fanoutHandler.transport != null) {
195                            continue;
196                        }
197
198                        // Are we waiting a little to try to reconnect this one?
199                        if (fanoutHandler.reconnectDate != 0 && fanoutHandler.reconnectDate > now) {
200                            if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
201                                closestReconnectDate = fanoutHandler.reconnectDate;
202                            }
203                            continue;
204                        }
205
206                        URI uri = fanoutHandler.uri;
207                        try {
208                            LOG.debug("Stopped: " + this);
209                            LOG.debug("Attempting connect to: " + uri);
210                            Transport t = TransportFactory.compositeConnect(uri);
211                            fanoutHandler.transport = t;
212                            t.setTransportListener(fanoutHandler);
213                            if (started) {
214                                restoreTransport(fanoutHandler);
215                            }
216                            LOG.debug("Connection established");
217                            fanoutHandler.reconnectDelay = initialReconnectDelay;
218                            fanoutHandler.connectFailures = 0;
219                            if (primary == null) {
220                                primary = fanoutHandler;
221                            }
222                            connectedCount++;
223                        } catch (Exception e) {
224                            LOG.debug("Connect fail to: " + uri + ", reason: " + e);
225
226                            if( fanoutHandler.transport !=null ) {
227                                ServiceSupport.dispose(fanoutHandler.transport);
228                                fanoutHandler.transport=null;
229                            }
230                            
231                            if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
232                                LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)");
233                                connectionFailure = e;
234                                reconnectMutex.notifyAll();
235                                return false;
236                            } else {
237
238                                if (useExponentialBackOff) {
239                                    // Exponential increment of reconnect delay.
240                                    fanoutHandler.reconnectDelay *= backOffMultiplier;
241                                    if (fanoutHandler.reconnectDelay > maxReconnectDelay) {
242                                        fanoutHandler.reconnectDelay = maxReconnectDelay;
243                                    }
244                                }
245
246                                fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay;
247
248                                if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
249                                    closestReconnectDate = fanoutHandler.reconnectDate;
250                                }
251                            }
252                        }
253                    }
254                    if (transports.size() == connectedCount || disposed) {
255                        reconnectMutex.notifyAll();
256                        return false;
257                    }
258
259                }
260            }
261
262        }
263
264        try {
265            long reconnectDelay = closestReconnectDate - System.currentTimeMillis();
266            if (reconnectDelay > 0) {
267                LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
268                Thread.sleep(reconnectDelay);
269            }
270        } catch (InterruptedException e1) {
271            Thread.currentThread().interrupt();
272        }
273        return true;
274    }
275
276    public void start() throws Exception {
277        synchronized (reconnectMutex) {
278            LOG.debug("Started.");
279            if (started) {
280                return;
281            }
282            started = true;
283            for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
284                FanoutTransportHandler th = iter.next();
285                if (th.transport != null) {
286                    restoreTransport(th);
287                }
288            }
289            connected=true;
290        }
291    }
292
293    public void stop() throws Exception {
294        synchronized (reconnectMutex) {
295            ServiceStopper ss = new ServiceStopper();
296
297            if (!started) {
298                return;
299            }
300            started = false;
301            disposed = true;
302            connected=false;
303
304            for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
305                FanoutTransportHandler th = iter.next();
306                if (th.transport != null) {
307                    ss.stop(th.transport);
308                }
309            }
310
311            LOG.debug("Stopped: " + this);
312            ss.throwFirstException();
313        }
314        reconnectTask.shutdown();
315    }
316
317        public int getMinAckCount() {
318                return minAckCount;
319        }
320
321        public void setMinAckCount(int minAckCount) {
322                this.minAckCount = minAckCount;
323        }    
324    
325    public long getInitialReconnectDelay() {
326        return initialReconnectDelay;
327    }
328
329    public void setInitialReconnectDelay(long initialReconnectDelay) {
330        this.initialReconnectDelay = initialReconnectDelay;
331    }
332
333    public long getMaxReconnectDelay() {
334        return maxReconnectDelay;
335    }
336
337    public void setMaxReconnectDelay(long maxReconnectDelay) {
338        this.maxReconnectDelay = maxReconnectDelay;
339    }
340
341    public long getReconnectDelayExponent() {
342        return backOffMultiplier;
343    }
344
345    public void setReconnectDelayExponent(long reconnectDelayExponent) {
346        this.backOffMultiplier = reconnectDelayExponent;
347    }
348
349    public int getMaxReconnectAttempts() {
350        return maxReconnectAttempts;
351    }
352
353    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
354        this.maxReconnectAttempts = maxReconnectAttempts;
355    }
356
357    public void oneway(Object o) throws IOException {
358        final Command command = (Command)o;
359        try {
360            synchronized (reconnectMutex) {
361
362                // Wait for transport to be connected.
363                while (connectedCount < minAckCount && !disposed && connectionFailure == null) {
364                    LOG.debug("Waiting for at least " + minAckCount + " transports to be connected.");
365                    reconnectMutex.wait(1000);
366                }
367
368                // Still not fully connected.
369                if (connectedCount < minAckCount) {
370
371                    Exception error;
372
373                    // Throw the right kind of error..
374                    if (disposed) {
375                        error = new IOException("Transport disposed.");
376                    } else if (connectionFailure != null) {
377                        error = connectionFailure;
378                    } else {
379                        error = new IOException("Unexpected failure.");
380                    }
381
382                    if (error instanceof IOException) {
383                        throw (IOException)error;
384                    }
385                    throw IOExceptionSupport.create(error);
386                }
387
388                // If it was a request and it was not being tracked by
389                // the state tracker,
390                // then hold it in the requestMap so that we can replay
391                // it later.
392                boolean fanout = isFanoutCommand(command);
393                if (stateTracker.track(command) == null && command.isResponseRequired()) {
394                    int size = fanout ? minAckCount : 1;
395                    requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
396                }
397                
398                // Send the message.
399                if (fanout) {
400                    for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
401                        FanoutTransportHandler th = iter.next();
402                        if (th.transport != null) {
403                            try {
404                                th.transport.oneway(command);
405                            } catch (IOException e) {
406                                LOG.debug("Send attempt: failed.");
407                                th.onException(e);
408                            }
409                        }
410                    }
411                } else {
412                    try {
413                        primary.transport.oneway(command);
414                    } catch (IOException e) {
415                        LOG.debug("Send attempt: failed.");
416                        primary.onException(e);
417                    }
418                }
419
420            }
421        } catch (InterruptedException e) {
422            // Some one may be trying to stop our thread.
423            Thread.currentThread().interrupt();
424            throw new InterruptedIOException();
425        }
426    }
427
428    /**
429     * @param command
430     * @return
431     */
432    private boolean isFanoutCommand(Command command) {
433        if (command.isMessage()) {
434            if( fanOutQueues ) {
435                return true;
436            }
437            return ((Message)command).getDestination().isTopic();
438        }
439        if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE ||
440                command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
441            return false;
442        }
443        return true;
444    }
445
446    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
447        throw new AssertionError("Unsupported Method");
448    }
449
450    public Object request(Object command) throws IOException {
451        throw new AssertionError("Unsupported Method");
452    }
453
454    public Object request(Object command, int timeout) throws IOException {
455        throw new AssertionError("Unsupported Method");
456    }
457
458    public void reconnect() {
459        LOG.debug("Waking up reconnect task");
460        try {
461            reconnectTask.wakeup();
462        } catch (InterruptedException e) {
463            Thread.currentThread().interrupt();
464        }
465    }
466
467    public TransportListener getTransportListener() {
468        return transportListener;
469    }
470
471    public void setTransportListener(TransportListener commandListener) {
472        this.transportListener = commandListener;
473    }
474
475    public <T> T narrow(Class<T> target) {
476
477        if (target.isAssignableFrom(getClass())) {
478            return target.cast(this);
479        }
480
481        synchronized (reconnectMutex) {
482            for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
483                FanoutTransportHandler th = iter.next();
484                if (th.transport != null) {
485                    T rc = th.transport.narrow(target);
486                    if (rc != null) {
487                        return rc;
488                    }
489                }
490            }
491        }
492
493        return null;
494
495    }
496
497    protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException {
498        th.transport.start();
499        stateTracker.setRestoreConsumers(th.transport == primary);
500        stateTracker.restore(th.transport);
501        for (Iterator<RequestCounter> iter2 = requestMap.values().iterator(); iter2.hasNext();) {
502            RequestCounter rc = iter2.next();
503            th.transport.oneway(rc.command);
504        }
505    }
506
507    public void add(boolean reblance,URI uris[]) {
508
509        synchronized (reconnectMutex) {
510            for (int i = 0; i < uris.length; i++) {
511                URI uri = uris[i];
512
513                boolean match = false;
514                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
515                    FanoutTransportHandler th = iter.next();
516                    if (th.uri.equals(uri)) {
517                        match = true;
518                        break;
519                    }
520                }
521                if (!match) {
522                    FanoutTransportHandler th = new FanoutTransportHandler(uri);
523                    transports.add(th);
524                    reconnect();
525                }
526            }
527        }
528
529    }
530
531    public void remove(boolean rebalance,URI uris[]) {
532
533        synchronized (reconnectMutex) {
534            for (int i = 0; i < uris.length; i++) {
535                URI uri = uris[i];
536
537                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
538                    FanoutTransportHandler th = iter.next();
539                    if (th.uri.equals(uri)) {
540                        if (th.transport != null) {
541                            ServiceSupport.dispose(th.transport);
542                            connectedCount--;
543                        }
544                        iter.remove();
545                        break;
546                    }
547                }
548            }
549        }
550
551    }
552    
553    public void reconnect(URI uri) throws IOException {
554                add(true,new URI[]{uri});
555                
556        }
557    
558    public boolean isReconnectSupported() {
559        return true;
560    }
561
562    public boolean isUpdateURIsSupported() {
563        return true;
564    }
565    public void updateURIs(boolean reblance,URI[] uris) throws IOException {
566        add(reblance,uris);
567    }
568
569
570    public String getRemoteAddress() {
571        if (primary != null) {
572            if (primary.transport != null) {
573                return primary.transport.getRemoteAddress();
574            }
575        }
576        return null;
577    }
578
579    protected void transportListenerOnCommand(Command command) {
580        if (transportListener != null) {
581            transportListener.onCommand(command);
582        }
583    }
584
585    public boolean isFaultTolerant() {
586        return true;
587    }
588
589    public boolean isFanOutQueues() {
590        return fanOutQueues;
591    }
592
593    public void setFanOutQueues(boolean fanOutQueues) {
594        this.fanOutQueues = fanOutQueues;
595    }
596
597        public boolean isDisposed() {
598                return disposed;
599        }
600        
601
602    public boolean isConnected() {
603        return connected;
604    }
605
606    public int getReceiveCounter() {
607        int rc = 0;
608        synchronized (reconnectMutex) {
609            for (FanoutTransportHandler th : transports) {
610                if (th.transport != null) {
611                    rc += th.transport.getReceiveCounter();
612                }
613            }
614        }
615        return rc;
616    }
617}