001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.util; 018 019import java.util.Set; 020 021import javax.annotation.PostConstruct; 022 023import org.apache.activemq.broker.BrokerPluginSupport; 024import org.apache.activemq.broker.Connection; 025import org.apache.activemq.broker.ConnectionContext; 026import org.apache.activemq.broker.ConsumerBrokerExchange; 027import org.apache.activemq.broker.ProducerBrokerExchange; 028import org.apache.activemq.broker.region.Destination; 029import org.apache.activemq.broker.region.MessageReference; 030import org.apache.activemq.broker.region.Subscription; 031import org.apache.activemq.command.*; 032import org.apache.activemq.usage.Usage; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * A simple Broker intercepter which allows you to enable/disable logging. 038 * 039 * @org.apache.xbean.XBean 040 */ 041public class LoggingBrokerPlugin extends BrokerPluginSupport { 042 043 private static final Logger LOG = LoggerFactory.getLogger(LoggingBrokerPlugin.class); 044 045 private boolean logAll = false; 046 private boolean logConnectionEvents = true; 047 private boolean logSessionEvents = true; 048 private boolean logTransactionEvents = false; 049 private boolean logConsumerEvents = false; 050 private boolean logProducerEvents = false; 051 private boolean logInternalEvents = false; 052 private boolean perDestinationLogger = false; 053 054 /** 055 * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions 056 * 057 * delegates to afterPropertiesSet, done to prevent backwards incompatible signature change 058 */ 059 @PostConstruct 060 private void postConstruct() { 061 try { 062 afterPropertiesSet(); 063 } catch (Exception ex) { 064 throw new RuntimeException(ex); 065 } 066 } 067 068 /** 069 * @throws Exception 070 * @org.apache.xbean.InitMethod 071 */ 072 public void afterPropertiesSet() throws Exception { 073 LOG.info("Created LoggingBrokerPlugin: {}", this.toString()); 074 } 075 076 public boolean isLogAll() { 077 return logAll; 078 } 079 080 /** 081 * Logger all Events that go through the Plugin 082 */ 083 public void setLogAll(boolean logAll) { 084 this.logAll = logAll; 085 } 086 087 088 public boolean isLogConnectionEvents() { 089 return logConnectionEvents; 090 } 091 092 /** 093 * Logger Events that are related to connections 094 */ 095 public void setLogConnectionEvents(boolean logConnectionEvents) { 096 this.logConnectionEvents = logConnectionEvents; 097 } 098 099 public boolean isLogSessionEvents() { 100 return logSessionEvents; 101 } 102 103 /** 104 * Logger Events that are related to sessions 105 */ 106 public void setLogSessionEvents(boolean logSessionEvents) { 107 this.logSessionEvents = logSessionEvents; 108 } 109 110 public boolean isLogTransactionEvents() { 111 return logTransactionEvents; 112 } 113 114 /** 115 * Logger Events that are related to transaction processing 116 */ 117 public void setLogTransactionEvents(boolean logTransactionEvents) { 118 this.logTransactionEvents = logTransactionEvents; 119 } 120 121 public boolean isLogConsumerEvents() { 122 return logConsumerEvents; 123 } 124 125 /** 126 * Logger Events that are related to Consumers 127 */ 128 public void setLogConsumerEvents(boolean logConsumerEvents) { 129 this.logConsumerEvents = logConsumerEvents; 130 } 131 132 public boolean isLogProducerEvents() { 133 return logProducerEvents; 134 } 135 136 /** 137 * Logger Events that are related to Producers 138 */ 139 public void setLogProducerEvents(boolean logProducerEvents) { 140 this.logProducerEvents = logProducerEvents; 141 } 142 143 public boolean isLogInternalEvents() { 144 return logInternalEvents; 145 } 146 147 /** 148 * Logger Events that are normally internal to the broker 149 */ 150 public void setLogInternalEvents(boolean logInternalEvents) { 151 this.logInternalEvents = logInternalEvents; 152 } 153 154 @Override 155 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 156 if (isLogAll() || isLogConsumerEvents()) { 157 LOG.info("Acknowledging message for client ID: {}{}", consumerExchange.getConnectionContext().getClientId(), (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : "")); 158 if (ack.getMessageCount() > 1) { 159 LOG.trace("Message count: {}, First Message Id: {}, Last Message Id: {}", new Object[]{ ack.getMessageCount(), ack.getFirstMessageId(), ack.getLastMessageId() }); 160 } 161 } 162 super.acknowledge(consumerExchange, ack); 163 } 164 165 @Override 166 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 167 if (isLogAll() || isLogConsumerEvents()) { 168 LOG.info("Message Pull from: {} on {}", context.getClientId(), pull.getDestination().getPhysicalName()); 169 } 170 return super.messagePull(context, pull); 171 } 172 173 @Override 174 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 175 if (isLogAll() || isLogConnectionEvents()) { 176 LOG.info("Adding Connection: {}", info); 177 } 178 super.addConnection(context, info); 179 } 180 181 @Override 182 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 183 if (isLogAll() || isLogConsumerEvents()) { 184 LOG.info("Adding Consumer: {}", info); 185 } 186 return super.addConsumer(context, info); 187 } 188 189 @Override 190 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 191 if (isLogAll() || isLogProducerEvents()) { 192 LOG.info("Adding Producer: {}", info); 193 } 194 super.addProducer(context, info); 195 } 196 197 @Override 198 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 199 if (isLogAll() || isLogTransactionEvents()) { 200 LOG.info("Committing transaction: {}", xid.getTransactionKey()); 201 } 202 super.commitTransaction(context, xid, onePhase); 203 } 204 205 @Override 206 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 207 if (isLogAll() || isLogConsumerEvents()) { 208 LOG.info("Removing subscription: {}", info); 209 } 210 super.removeSubscription(context, info); 211 } 212 213 @Override 214 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { 215 216 TransactionId[] result = super.getPreparedTransactions(context); 217 if ((isLogAll() || isLogTransactionEvents()) && result != null) { 218 StringBuffer tids = new StringBuffer(); 219 for (TransactionId tid : result) { 220 if (tids.length() > 0) { 221 tids.append(", "); 222 } 223 tids.append(tid.getTransactionKey()); 224 } 225 LOG.info("Prepared transactions: {}", tids); 226 } 227 return result; 228 } 229 230 @Override 231 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 232 if (isLogAll() || isLogTransactionEvents()) { 233 LOG.info("Preparing transaction: {}", xid.getTransactionKey()); 234 } 235 return super.prepareTransaction(context, xid); 236 } 237 238 @Override 239 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 240 if (isLogAll() || isLogConnectionEvents()) { 241 LOG.info("Removing Connection: {}", info); 242 } 243 super.removeConnection(context, info, error); 244 } 245 246 @Override 247 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 248 if (isLogAll() || isLogConsumerEvents()) { 249 LOG.info("Removing Consumer: {}", info); 250 } 251 super.removeConsumer(context, info); 252 } 253 254 @Override 255 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 256 if (isLogAll() || isLogProducerEvents()) { 257 LOG.info("Removing Producer: {}", info); 258 } 259 super.removeProducer(context, info); 260 } 261 262 @Override 263 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 264 if (isLogAll() || isLogTransactionEvents()) { 265 LOG.info("Rolling back Transaction: {}", xid.getTransactionKey()); 266 } 267 super.rollbackTransaction(context, xid); 268 } 269 270 @Override 271 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { 272 if (isLogAll() || isLogProducerEvents()) { 273 logSend(messageSend.copy()); 274 } 275 super.send(producerExchange, messageSend); 276 } 277 278 private void logSend(Message copy) { 279 Logger perDestinationsLogger = LOG; 280 if (isPerDestinationLogger()) { 281 ActiveMQDestination destination = copy.getDestination(); 282 perDestinationsLogger = LoggerFactory.getLogger(LOG.getName() + 283 "." + destination.getDestinationTypeAsString() + "." + destination.getPhysicalName()); 284 } 285 perDestinationsLogger.info("Sending message: {}", copy); 286 } 287 288 @Override 289 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 290 if (isLogAll() || isLogTransactionEvents()) { 291 LOG.info("Beginning transaction: {}", xid.getTransactionKey()); 292 } 293 super.beginTransaction(context, xid); 294 } 295 296 @Override 297 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { 298 if (isLogAll() || isLogTransactionEvents()) { 299 LOG.info("Forgetting transaction: {}", transactionId.getTransactionKey()); 300 } 301 super.forgetTransaction(context, transactionId); 302 } 303 304 @Override 305 public Connection[] getClients() throws Exception { 306 Connection[] result = super.getClients(); 307 308 if (isLogAll() || isLogInternalEvents()) { 309 if (result == null) { 310 LOG.info("Get Clients returned empty list."); 311 } else { 312 StringBuffer cids = new StringBuffer(); 313 for (Connection c : result) { 314 cids.append(cids.length() > 0 ? ", " : ""); 315 cids.append(c.getConnectionId()); 316 } 317 LOG.info("Connected clients: {}", cids); 318 } 319 } 320 return super.getClients(); 321 } 322 323 @Override 324 public org.apache.activemq.broker.region.Destination addDestination(ConnectionContext context, 325 ActiveMQDestination destination, boolean create) throws Exception { 326 if (isLogAll() || isLogInternalEvents()) { 327 LOG.info("Adding destination: {}:{}", destination.getDestinationTypeAsString(), destination.getPhysicalName()); 328 } 329 return super.addDestination(context, destination, create); 330 } 331 332 @Override 333 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) 334 throws Exception { 335 if (isLogAll() || isLogInternalEvents()) { 336 LOG.info("Removing destination: {}:{}", destination.getDestinationTypeAsString(), destination.getPhysicalName()); 337 } 338 super.removeDestination(context, destination, timeout); 339 } 340 341 @Override 342 public ActiveMQDestination[] getDestinations() throws Exception { 343 ActiveMQDestination[] result = super.getDestinations(); 344 if (isLogAll() || isLogInternalEvents()) { 345 if (result == null) { 346 LOG.info("Get Destinations returned empty list."); 347 } else { 348 StringBuffer destinations = new StringBuffer(); 349 for (ActiveMQDestination dest : result) { 350 destinations.append(destinations.length() > 0 ? ", " : ""); 351 destinations.append(dest.getPhysicalName()); 352 } 353 LOG.info("Get Destinations: {}", destinations); 354 } 355 } 356 return result; 357 } 358 359 @Override 360 public void start() throws Exception { 361 if (isLogAll() || isLogInternalEvents()) { 362 LOG.info("Starting {}", getBrokerName()); 363 } 364 super.start(); 365 } 366 367 @Override 368 public void stop() throws Exception { 369 if (isLogAll() || isLogInternalEvents()) { 370 LOG.info("Stopping {}", getBrokerName()); 371 } 372 super.stop(); 373 } 374 375 @Override 376 public void addSession(ConnectionContext context, SessionInfo info) throws Exception { 377 if (isLogAll() || isLogSessionEvents()) { 378 LOG.info("Adding Session: {}", info); 379 } 380 super.addSession(context, info); 381 } 382 383 @Override 384 public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { 385 if (isLogAll() || isLogSessionEvents()) { 386 LOG.info("Removing Session: {}", info); 387 } 388 super.removeSession(context, info); 389 } 390 391 @Override 392 public void addBroker(Connection connection, BrokerInfo info) { 393 if (isLogAll() || isLogInternalEvents()) { 394 LOG.info("Adding Broker {}", info.getBrokerName()); 395 } 396 super.addBroker(connection, info); 397 } 398 399 @Override 400 public void removeBroker(Connection connection, BrokerInfo info) { 401 if (isLogAll() || isLogInternalEvents()) { 402 LOG.info("Removing Broker {}", info.getBrokerName()); 403 } 404 super.removeBroker(connection, info); 405 } 406 407 @Override 408 public BrokerInfo[] getPeerBrokerInfos() { 409 BrokerInfo[] result = super.getPeerBrokerInfos(); 410 if (isLogAll() || isLogInternalEvents()) { 411 if (result == null) { 412 LOG.info("Get Peer Broker Infos returned empty list."); 413 } else { 414 StringBuffer peers = new StringBuffer(); 415 for (BrokerInfo bi : result) { 416 peers.append(peers.length() > 0 ? ", " : ""); 417 peers.append(bi.getBrokerName()); 418 } 419 LOG.info("Get Peer Broker Infos: {}", peers); 420 } 421 } 422 return result; 423 } 424 425 @Override 426 public void preProcessDispatch(MessageDispatch messageDispatch) { 427 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { 428 LOG.info("preProcessDispatch: {}", messageDispatch); 429 } 430 super.preProcessDispatch(messageDispatch); 431 } 432 433 @Override 434 public void postProcessDispatch(MessageDispatch messageDispatch) { 435 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { 436 LOG.info("postProcessDispatch: {}", messageDispatch); 437 } 438 super.postProcessDispatch(messageDispatch); 439 } 440 441 @Override 442 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 443 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { 444 LOG.info("ProcessDispatchNotification: {}", messageDispatchNotification); 445 } 446 super.processDispatchNotification(messageDispatchNotification); 447 } 448 449 @Override 450 public Set<ActiveMQDestination> getDurableDestinations() { 451 Set<ActiveMQDestination> result = super.getDurableDestinations(); 452 if (isLogAll() || isLogInternalEvents()) { 453 if (result == null) { 454 LOG.info("Get Durable Destinations returned empty list."); 455 } else { 456 StringBuffer destinations = new StringBuffer(); 457 for (ActiveMQDestination dest : result) { 458 destinations.append(destinations.length() > 0 ? ", " : ""); 459 destinations.append(dest.getPhysicalName()); 460 } 461 LOG.info("Get Durable Destinations: {}", destinations); 462 } 463 } 464 return result; 465 } 466 467 @Override 468 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 469 if (isLogAll() || isLogInternalEvents()) { 470 LOG.info("Adding destination info: {}", info); 471 } 472 super.addDestinationInfo(context, info); 473 } 474 475 @Override 476 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 477 if (isLogAll() || isLogInternalEvents()) { 478 LOG.info("Removing destination info: {}", info); 479 } 480 super.removeDestinationInfo(context, info); 481 } 482 483 @Override 484 public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) { 485 if (isLogAll() || isLogInternalEvents()) { 486 String msg = "Unable to display message."; 487 488 msg = message.getMessage().toString(); 489 490 LOG.info("Message has expired: {}", msg); 491 } 492 super.messageExpired(context, message, subscription); 493 } 494 495 @Override 496 public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, 497 Subscription subscription, Throwable poisonCause) { 498 if (isLogAll() || isLogInternalEvents()) { 499 String msg = "Unable to display message."; 500 501 msg = messageReference.getMessage().toString(); 502 503 LOG.info("Sending to DLQ: {}", msg); 504 } 505 return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause); 506 } 507 508 @Override 509 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo,ActiveMQDestination destination) { 510 if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { 511 LOG.info("Fast Producer: {}", producerInfo); 512 } 513 super.fastProducer(context, producerInfo, destination); 514 } 515 516 @Override 517 public void isFull(ConnectionContext context, Destination destination, Usage usage) { 518 if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { 519 LOG.info("Destination is full: {}", destination.getName()); 520 } 521 super.isFull(context, destination, usage); 522 } 523 524 @Override 525 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 526 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { 527 String msg = "Unable to display message."; 528 529 msg = messageReference.getMessage().toString(); 530 531 LOG.info("Message consumed: {}", msg); 532 } 533 super.messageConsumed(context, messageReference); 534 } 535 536 @Override 537 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 538 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { 539 String msg = "Unable to display message."; 540 541 msg = messageReference.getMessage().toString(); 542 543 LOG.info("Message delivered: {}", msg); 544 } 545 super.messageDelivered(context, messageReference); 546 } 547 548 @Override 549 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { 550 if (isLogAll() || isLogInternalEvents()) { 551 String msg = "Unable to display message."; 552 553 msg = messageReference.getMessage().toString(); 554 555 LOG.info("Message discarded: {}", msg); 556 } 557 super.messageDiscarded(context, sub, messageReference); 558 } 559 560 @Override 561 public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) { 562 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { 563 LOG.info("Detected slow consumer on {}", destination.getName()); 564 StringBuffer buf = new StringBuffer("Connection("); 565 buf.append(subs.getConsumerInfo().getConsumerId().getConnectionId()); 566 buf.append(") Session("); 567 buf.append(subs.getConsumerInfo().getConsumerId().getSessionId()); 568 buf.append(")"); 569 LOG.info(buf.toString()); 570 } 571 super.slowConsumer(context, destination, subs); 572 } 573 574 @Override 575 public void nowMasterBroker() { 576 if (isLogAll() || isLogInternalEvents()) { 577 LOG.info("Is now the master broker: {}", getBrokerName()); 578 } 579 super.nowMasterBroker(); 580 } 581 582 @Override 583 public String toString() { 584 StringBuffer buf = new StringBuffer(); 585 buf.append("LoggingBrokerPlugin("); 586 buf.append("logAll="); 587 buf.append(isLogAll()); 588 buf.append(", logConnectionEvents="); 589 buf.append(isLogConnectionEvents()); 590 buf.append(", logSessionEvents="); 591 buf.append(isLogSessionEvents()); 592 buf.append(", logConsumerEvents="); 593 buf.append(isLogConsumerEvents()); 594 buf.append(", logProducerEvents="); 595 buf.append(isLogProducerEvents()); 596 buf.append(", logTransactionEvents="); 597 buf.append(isLogTransactionEvents()); 598 buf.append(", logInternalEvents="); 599 buf.append(isLogInternalEvents()); 600 buf.append(")"); 601 return buf.toString(); 602 } 603 604 public void setPerDestinationLogger(boolean perDestinationLogger) { 605 this.perDestinationLogger = perDestinationLogger; 606 } 607 608 public boolean isPerDestinationLogger() { 609 return perDestinationLogger; 610 } 611}