001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.console.command; 018 019import java.net.URI; 020import java.util.ArrayList; 021import java.util.Iterator; 022import java.util.List; 023import java.util.StringTokenizer; 024 025import javax.jms.Destination; 026import javax.jms.Message; 027import javax.management.MBeanServerConnection; 028import javax.management.MBeanServerInvocationHandler; 029import javax.management.ObjectInstance; 030import javax.management.ObjectName; 031import javax.management.openmbean.CompositeData; 032import javax.management.remote.JMXConnector; 033 034import org.apache.activemq.broker.jmx.QueueViewMBean; 035import org.apache.activemq.command.ActiveMQQueue; 036import org.apache.activemq.console.util.AmqMessagesUtil; 037import org.apache.activemq.console.util.JmxMBeansUtil; 038 039public class PurgeCommand extends AbstractJmxCommand { 040 041 protected String[] helpFile = new String[] { 042 "Task Usage: Main purge [browse-options] <destinations>", 043 "Description: Delete selected destination's messages that matches the message selector.", 044 "", 045 "Purge Options:", 046 " --msgsel <msgsel1,msglsel2> Add to the search list messages matched by the query similar to", 047 " the messages selector format.", 048 " --jmxurl <url> Set the JMX URL to connect to.", 049 " --pid <pid> Set the pid to connect to (only on Sun JVM).", 050 " --jmxuser <user> Set the JMX user used for authenticating.", 051 " --jmxpassword <password> Set the JMX password used for authenticating.", 052 " --jmxlocal Use the local JMX server instead of a remote one.", 053 " --version Display the version information.", 054 " -h,-?,--help Display the browse broker help information.", 055 "", 056 "Examples:", 057 " Main purge FOO.BAR", 058 " - Delete all the messages in queue FOO.BAR", 059 060 " Main purge --msgsel \"JMSMessageID='*:10',JMSPriority>5\" FOO.*", 061 " - Delete all the messages in the destinations that matches FOO.* and has a JMSMessageID in", 062 " the header field that matches the wildcard *:10, and has a JMSPriority field > 5 in the", 063 " queue FOO.BAR.", 064 " SLQ92 syntax is also supported.", 065 " * To use wildcard queries, the field must be a string and the query enclosed in ''", 066 " Use double quotes \"\" around the entire message selector string.", 067 "" 068 }; 069 070 private final List<String> queryAddObjects = new ArrayList<String>(10); 071 private final List<String> querySubObjects = new ArrayList<String>(10); 072 073 /** 074 * Execute the purge command, which allows you to purge the messages in a 075 * given JMS destination 076 * 077 * @param tokens - command arguments 078 * @throws Exception 079 */ 080 protected void runTask(List<String> tokens) throws Exception { 081 try { 082 // If there is no queue name specified, let's select all 083 if (tokens.isEmpty()) { 084 tokens.add("*"); 085 } 086 087 // Iterate through the queue names 088 for (Iterator<String> i = tokens.iterator(); i.hasNext();) { 089 List queueList = JmxMBeansUtil.queryMBeans(createJmxConnection(), "Type=Queue,Destination=" + i.next() + ",*"); 090 091 for (Iterator j = queueList.iterator(); j.hasNext();) { 092 ObjectName queueName = ((ObjectInstance)j.next()).getObjectName(); 093 if (queryAddObjects.isEmpty()) { 094 purgeQueue(queueName); 095 } else { 096 097 QueueViewMBean proxy = (QueueViewMBean) MBeanServerInvocationHandler. 098 newProxyInstance(createJmxConnection(), 099 queueName, 100 QueueViewMBean.class, 101 true); 102 int removed = 0; 103 104 // AMQ-3404: We support two syntaxes for the message 105 // selector query: 106 // 1) AMQ specific: 107 // "JMSPriority>2,MyHeader='Foo'" 108 // 109 // 2) SQL-92 syntax: 110 // "(JMSPriority>2) AND (MyHeader='Foo')" 111 // 112 // If syntax style 1) is used, the comma separated 113 // criterias are broken into List<String> elements. 114 // We then need to construct the SQL-92 query out of 115 // this list. 116 117 String sqlQuery = null; 118 if (queryAddObjects.size() > 1) { 119 sqlQuery = convertToSQL92(queryAddObjects); 120 } else { 121 sqlQuery = queryAddObjects.get(0); 122 } 123 removed = proxy.removeMatchingMessages(sqlQuery); 124 context.printInfo("Removed: " + removed 125 + " messages for message selector " + sqlQuery.toString()); 126 } 127 } 128 } 129 } catch (Exception e) { 130 context.printException(new RuntimeException("Failed to execute purge task. Reason: " + e)); 131 throw new Exception(e); 132 } 133 } 134 135 136 /** 137 * Purge all the messages in the queue 138 * 139 * @param queue - ObjectName of the queue to purge 140 * @throws Exception 141 */ 142 public void purgeQueue(ObjectName queue) throws Exception { 143 context.printInfo("Purging all messages in queue: " + queue.getKeyProperty("Destination")); 144 createJmxConnection().invoke(queue, "purge", new Object[] {}, new String[] {}); 145 } 146 147 /** 148 * Handle the --msgsel, --xmsgsel. 149 * 150 * @param token - option token to handle 151 * @param tokens - succeeding command arguments 152 * @throws Exception 153 */ 154 protected void handleOption(String token, List<String> tokens) throws Exception { 155 // If token is an additive message selector option 156 if (token.startsWith("--msgsel")) { 157 158 // If no message selector is specified, or next token is a new 159 // option 160 if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) { 161 context.printException(new IllegalArgumentException("Message selector not specified")); 162 return; 163 } 164 165 StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER); 166 while (queryTokens.hasMoreTokens()) { 167 queryAddObjects.add(queryTokens.nextToken()); 168 } 169 } else if (token.startsWith("--xmsgsel")) { 170 // If token is a substractive message selector option 171 172 // If no message selector is specified, or next token is a new 173 // option 174 if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) { 175 context.printException(new IllegalArgumentException("Message selector not specified")); 176 return; 177 } 178 179 StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER); 180 while (queryTokens.hasMoreTokens()) { 181 querySubObjects.add(queryTokens.nextToken()); 182 } 183 184 } else { 185 // Let super class handle unknown option 186 super.handleOption(token, tokens); 187 } 188 } 189 190 /** 191 * Converts the message selector as provided on command line 192 * argument to activem-admin into an SQL-92 conform string. 193 * E.g. 194 * "JMSMessageID='*:10',JMSPriority>5" 195 * gets converted into 196 * "(JMSMessageID='%:10') AND (JMSPriority>5)" 197 * 198 * @param tokens - List of message selector query parameters 199 * @return SQL-92 string of that query. 200 */ 201 public String convertToSQL92(List<String> tokens) { 202 String selector = ""; 203 204 // Convert to message selector 205 for (Iterator i = tokens.iterator(); i.hasNext(); ) { 206 selector = selector + "(" + i.next().toString() + ") AND "; 207 } 208 209 // Remove last AND and replace '*' with '%' 210 if (!selector.equals("")) { 211 selector = selector.substring(0, selector.length() - 5); 212 selector = selector.replace('*', '%'); 213 } 214 return selector; 215 } 216 217 218 /** 219 * Print the help messages for the browse command 220 */ 221 protected void printHelp() { 222 context.printHelp(helpFile); 223 } 224 225}