/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.command;

import java.util.ArrayList;
import java.util.List;

import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.state.CommandVisitor;

/**
 * Command describing a consumer: its id, destination, prefetch settings,
 * selector, subscription name and related flags.
 * <p>
 * Thread-safety: only the list of network consumer ids is guarded (by this
 * instance's monitor). All other properties are plain, unsynchronized fields.
 *
 * @openwire:marshaller code="5"
 */
public class ConsumerInfo extends BaseCommand {

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_INFO;

    public static final byte HIGH_PRIORITY = 10;
    public static final byte NORMAL_PRIORITY = 0;
    public static final byte NETWORK_CONSUMER_PRIORITY = -5;
    public static final byte LOW_PRIORITY = -10;

    protected ConsumerId consumerId;
    protected ActiveMQDestination destination;
    protected int prefetchSize;
    protected int maximumPendingMessageLimit;
    protected boolean browser;
    protected boolean dispatchAsync;
    protected String selector;
    protected String subscriptionName;
    protected boolean noLocal;
    protected boolean exclusive;
    protected boolean retroactive;
    protected byte priority;
    protected BrokerId[] brokerPath;
    protected boolean optimizedAcknowledge;

    // used by the broker
    protected transient int currentPrefetchSize;

    // if true, the consumer will not send range acks.
    protected boolean noRangeAcks;

    protected BooleanExpression additionalPredicate;

    // this subscription originated from a network connection
    protected transient boolean networkSubscription;

    // the original consumerId(s); every access must hold this instance's monitor
    protected transient List<ConsumerId> networkConsumerIds;

    // not marshalled, populated from RemoveInfo, the last message delivered, used
    // to suppress redelivery on prefetched messages after close
    private transient long lastDeliveredSequenceId;

    /**
     * Creates an empty instance; all properties keep their Java default values.
     */
    public ConsumerInfo() {
    }

    /**
     * Creates an instance identified by the given consumer id.
     *
     * @param consumerId the id to assign to this consumer
     */
    public ConsumerInfo(ConsumerId consumerId) {
        this.consumerId = consumerId;
    }

    /**
     * Creates an instance whose id is built from the session's id and the
     * given consumer value.
     *
     * @param sessionInfo the session whose session id is used for the new consumer id
     * @param consumerId the value passed to the {@link ConsumerId} constructor
     */
    public ConsumerInfo(SessionInfo sessionInfo, long consumerId) {
        this.consumerId = new ConsumerId(sessionInfo.getSessionId(), consumerId);
    }

    /**
     * Creates a new {@code ConsumerInfo} and populates it via
     * {@link #copy(ConsumerInfo)}.
     *
     * @return the newly created copy
     */
    public ConsumerInfo copy() {
        ConsumerInfo info = new ConsumerInfo();
        copy(info);
        return info;
    }

    /**
     * Copies this command's state into {@code info}.
     * <p>
     * References (ids, destination, broker path) are shared, not cloned. The
     * network consumer ids of this instance are appended to whatever the
     * target already holds.
     * <p>
     * NOTE(review): {@code optimizedAcknowledge}, {@code noRangeAcks},
     * {@code additionalPredicate}, {@code currentPrefetchSize} and
     * {@code lastDeliveredSequenceId} are not copied here; confirm whether
     * that omission is intentional before relying on it.
     *
     * @param info the instance that receives the values
     */
    public void copy(ConsumerInfo info) {
        super.copy(info);
        info.consumerId = consumerId;
        info.destination = destination;
        info.prefetchSize = prefetchSize;
        info.maximumPendingMessageLimit = maximumPendingMessageLimit;
        info.browser = browser;
        info.dispatchAsync = dispatchAsync;
        info.selector = selector;
        info.subscriptionName = subscriptionName;
        info.noLocal = noLocal;
        info.exclusive = exclusive;
        info.retroactive = retroactive;
        info.priority = priority;
        info.brokerPath = brokerPath;
        info.networkSubscription = networkSubscription;

        // Snapshot under our own lock so a concurrent add/remove cannot null
        // out or mutate the list while it is being read.
        final List<ConsumerId> snapshot;
        synchronized (this) {
            snapshot = networkConsumerIds == null
                ? null
                : new ArrayList<ConsumerId>(networkConsumerIds);
        }
        if (snapshot != null) {
            // The target's list is guarded by the target's monitor. The two
            // locks are never held at the same time, so no lock ordering issue.
            synchronized (info) {
                if (info.networkConsumerIds == null) {
                    info.networkConsumerIds = new ArrayList<ConsumerId>();
                }
                info.networkConsumerIds.addAll(snapshot);
            }
        }
    }

    /**
     * @return true if a subscription name has been set
     */
    public boolean isDurable() {
        return subscriptionName != null;
    }

    /**
     * @return {@link #DATA_STRUCTURE_TYPE}
     */
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * Is used to uniquely identify the consumer to the broker.
     *
     * @openwire:property version=1 cache=true
     */
    public ConsumerId getConsumerId() {
        return consumerId;
    }

    public void setConsumerId(ConsumerId consumerId) {
        this.consumerId = consumerId;
    }

    /**
     * Is this consumer a queue browser?
     *
     * @openwire:property version=1
     */
    public boolean isBrowser() {
        return browser;
    }

    public void setBrowser(boolean browser) {
        this.browser = browser;
    }

    /**
     * The destination that the consumer is interested in receiving messages
     * from. This destination could be a composite destination.
     *
     * @openwire:property version=1 cache=true
     */
    public ActiveMQDestination getDestination() {
        return destination;
    }

    public void setDestination(ActiveMQDestination destination) {
        this.destination = destination;
    }

    /**
     * How many messages a broker will send to the client without receiving an
     * ack before it stops dispatching messages to the client.
     *
     * @openwire:property version=1
     */
    public int getPrefetchSize() {
        return prefetchSize;
    }

    /**
     * Sets the prefetch size and resets the current prefetch size to the same
     * value.
     *
     * @param prefetchSize the new prefetch size
     */
    public void setPrefetchSize(int prefetchSize) {
        this.prefetchSize = prefetchSize;
        this.currentPrefetchSize = prefetchSize;
    }

    /**
     * How many messages a broker will keep around, above the prefetch limit,
     * for non-durable topics before starting to discard older messages.
     *
     * @openwire:property version=1
     */
    public int getMaximumPendingMessageLimit() {
        return maximumPendingMessageLimit;
    }

    public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
        this.maximumPendingMessageLimit = maximumPendingMessageLimit;
    }

    /**
     * Should the broker dispatch a message to the consumer async? If it does
     * so async, then it uses a more SEDA style of processing, while if it is
     * not done async, then the broker uses an STP style of processing. STP is
     * more appropriate in high bandwidth situations or when being used by an
     * in-vm transport.
     *
     * @openwire:property version=1
     */
    public boolean isDispatchAsync() {
        return dispatchAsync;
    }

    public void setDispatchAsync(boolean dispatchAsync) {
        this.dispatchAsync = dispatchAsync;
    }

    /**
     * The JMS selector used to filter out messages that this consumer is
     * interested in.
     *
     * @openwire:property version=1
     */
    public String getSelector() {
        return selector;
    }

    public void setSelector(String selector) {
        this.selector = selector;
    }

    /**
     * Used to identify the name of a durable subscription.
     *
     * @openwire:property version=1
     */
    public String getSubscriptionName() {
        return subscriptionName;
    }

    public void setSubscriptionName(String durableSubscriptionId) {
        this.subscriptionName = durableSubscriptionId;
    }

    /**
     * Misspelled alias of {@link #getSubscriptionName()}.
     *
     * @deprecated use {@link #getSubscriptionName()} instead
     * @return the subscription name
     */
    @Deprecated
    public String getSubcriptionName() {
        return subscriptionName;
    }

    /**
     * Misspelled alias of {@link #setSubscriptionName(String)}.
     *
     * @deprecated use {@link #setSubscriptionName(String)} instead
     * @param durableSubscriptionId the subscription name to set
     */
    @Deprecated
    public void setSubcriptionName(String durableSubscriptionId) {
        this.subscriptionName = durableSubscriptionId;
    }

    /**
     * Set noLocal to true to avoid receiving messages that were published
     * locally on the same connection.
     *
     * @openwire:property version=1
     */
    public boolean isNoLocal() {
        return noLocal;
    }

    public void setNoLocal(boolean noLocal) {
        this.noLocal = noLocal;
    }

    /**
     * An exclusive consumer locks out other consumers from being able to
     * receive messages from the destination. If there are multiple exclusive
     * consumers for a destination, the first one created will be the exclusive
     * consumer of the destination.
     *
     * @openwire:property version=1
     */
    public boolean isExclusive() {
        return exclusive;
    }

    public void setExclusive(boolean exclusive) {
        this.exclusive = exclusive;
    }

    /**
     * A retroactive consumer only has meaning for Topics. It allows a consumer
     * to retroactively see messages sent prior to the consumer being created.
     * If the consumer is not durable, it will be delivered the last message
     * published to the topic. If the consumer is durable then it will receive
     * all persistent messages that are still stored in persistent storage for
     * that topic.
     *
     * @openwire:property version=1
     */
    public boolean isRetroactive() {
        return retroactive;
    }

    public void setRetroactive(boolean retroactive) {
        this.retroactive = retroactive;
    }

    /**
     * Builds a {@link RemoveInfo} for this consumer's id, carrying over this
     * command's response-required flag.
     *
     * @return the remove command
     */
    public RemoveInfo createRemoveCommand() {
        RemoveInfo command = new RemoveInfo(getConsumerId());
        command.setResponseRequired(isResponseRequired());
        return command;
    }

    /**
     * The broker will avoid dispatching to a lower priority consumer if there
     * are other higher priority consumers available to dispatch to. This
     * allows the broker to have an affinity to higher priority consumers.
     * Default priority is 0.
     *
     * @openwire:property version=1
     */
    public byte getPriority() {
        return priority;
    }

    public void setPriority(byte priority) {
        this.priority = priority;
    }

    /**
     * The route of brokers the command has moved through.
     *
     * @openwire:property version=1 cache=true
     */
    public BrokerId[] getBrokerPath() {
        return brokerPath;
    }

    public void setBrokerPath(BrokerId[] brokerPath) {
        this.brokerPath = brokerPath;
    }

    /**
     * A transient additional predicate that can be used to inject additional
     * predicates into the selector on the fly. Handy if, say, a Security
     * Broker interceptor wants to filter out messages based on the security
     * level of the consumer.
     *
     * @openwire:property version=1
     */
    public BooleanExpression getAdditionalPredicate() {
        return additionalPredicate;
    }

    public void setAdditionalPredicate(BooleanExpression additionalPredicate) {
        this.additionalPredicate = additionalPredicate;
    }

    /**
     * Dispatches this command to {@link CommandVisitor#processAddConsumer}.
     *
     * @param visitor the visitor to invoke
     * @return whatever the visitor returns
     * @throws Exception if the visitor throws
     */
    public Response visit(CommandVisitor visitor) throws Exception {
        return visitor.processAddConsumer(this);
    }

    /**
     * @openwire:property version=1
     * @return Returns the networkSubscription.
     */
    public boolean isNetworkSubscription() {
        return networkSubscription;
    }

    /**
     * @param networkSubscription The networkSubscription to set.
     */
    public void setNetworkSubscription(boolean networkSubscription) {
        this.networkSubscription = networkSubscription;
    }

    /**
     * @openwire:property version=1
     * @return Returns the optimizedAcknowledge.
     */
    public boolean isOptimizedAcknowledge() {
        return optimizedAcknowledge;
    }

    /**
     * @param optimizedAcknowledge The optimizedAcknowledge to set.
     */
    public void setOptimizedAcknowledge(boolean optimizedAcknowledge) {
        this.optimizedAcknowledge = optimizedAcknowledge;
    }

    /**
     * @return Returns the currentPrefetchSize.
     */
    public int getCurrentPrefetchSize() {
        return currentPrefetchSize;
    }

    /**
     * @param currentPrefetchSize The currentPrefetchSize to set.
     */
    public void setCurrentPrefetchSize(int currentPrefetchSize) {
        this.currentPrefetchSize = currentPrefetchSize;
    }

    /**
     * The broker may be able to optimize its processing or provide better QOS
     * if it knows the consumer will not be sending ranged acks.
     *
     * @return true if the consumer will not send range acks.
     * @openwire:property version=1
     */
    public boolean isNoRangeAcks() {
        return noRangeAcks;
    }

    public void setNoRangeAcks(boolean noRangeAcks) {
        this.noRangeAcks = noRangeAcks;
    }

    /**
     * Appends an id to the network consumer id list, creating the list on
     * first use.
     *
     * @param networkConsumerId the id to append
     */
    public synchronized void addNetworkConsumerId(ConsumerId networkConsumerId) {
        if (networkConsumerIds == null) {
            networkConsumerIds = new ArrayList<ConsumerId>();
        }
        networkConsumerIds.add(networkConsumerId);
    }

    /**
     * Removes an id from the network consumer id list. The list itself is
     * discarded once it becomes empty.
     *
     * @param networkConsumerId the id to remove
     */
    public synchronized void removeNetworkConsumerId(ConsumerId networkConsumerId) {
        if (networkConsumerIds != null) {
            networkConsumerIds.remove(networkConsumerId);
            if (networkConsumerIds.isEmpty()) {
                networkConsumerIds = null;
            }
        }
    }

    /**
     * @return true if no network consumer ids are currently recorded
     */
    public synchronized boolean isNetworkConsumersEmpty() {
        return networkConsumerIds == null || networkConsumerIds.isEmpty();
    }

    /**
     * @return a new list holding the currently recorded network consumer ids;
     *         never null, empty when none are recorded
     */
    public synchronized List<ConsumerId> getNetworkConsumerIds() {
        List<ConsumerId> result = new ArrayList<ConsumerId>();
        if (networkConsumerIds != null) {
            result.addAll(networkConsumerIds);
        }
        return result;
    }

    /**
     * Tracks the original subscription id that causes a subscription to
     * percolate through a network when networkTTL &gt; 1. Tracking the original
     * subscription allows duplicate suppression.
     * <p>
     * Synchronized like the other accessors of the id list, so that a
     * concurrent {@link #removeNetworkConsumerId(ConsumerId)} cannot discard
     * the list between the null check and the array copy.
     *
     * @return array of the current subscription path, or null when no ids are
     *         recorded
     * @openwire:property version=4
     */
    public synchronized ConsumerId[] getNetworkConsumerPath() {
        ConsumerId[] result = null;
        if (networkConsumerIds != null) {
            result = networkConsumerIds.toArray(new ConsumerId[0]);
        }
        return result;
    }

    /**
     * Adds every element of {@code consumerPath} to the network consumer id
     * list. Existing entries are kept: despite the name, this appends rather
     * than replaces. A null argument is ignored.
     *
     * @param consumerPath the ids to append, may be null
     */
    public void setNetworkConsumerPath(ConsumerId[] consumerPath) {
        if (consumerPath != null) {
            for (int i = 0; i < consumerPath.length; i++) {
                addNetworkConsumerId(consumerPath[i]);
            }
        }
    }

    /**
     * @param lastDeliveredSequenceId the last delivered sequence id to record
     */
    public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) {
        this.lastDeliveredSequenceId = lastDeliveredSequenceId;
    }

    /**
     * @return the recorded last delivered sequence id
     */
    public long getLastDeliveredSequenceId() {
        return lastDeliveredSequenceId;
    }

}