001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.util; 018 019import java.io.IOException; 020import java.sql.SQLException; 021import java.util.concurrent.TimeUnit; 022import java.util.concurrent.atomic.AtomicBoolean; 023 024import org.apache.activemq.broker.BrokerService; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027 028/** 029 * @org.apache.xbean.XBean 030 */ 031 public class DefaultIOExceptionHandler implements IOExceptionHandler { 032 033 private static final Logger LOG = LoggerFactory 034 .getLogger(DefaultIOExceptionHandler.class); 035 private BrokerService broker; 036 private boolean ignoreAllErrors = false; 037 private boolean ignoreNoSpaceErrors = true; 038 private boolean ignoreSQLExceptions = true; 039 private boolean stopStartConnectors = false; 040 private String noSpaceMessage = "space"; 041 private String sqlExceptionMessage = ""; // match all 042 private long resumeCheckSleepPeriod = 5*1000; 043 private AtomicBoolean stopStartInProgress = new AtomicBoolean(false); 044 045 public void handle(IOException exception) { 046 if (ignoreAllErrors) { 047 LOG.info("Ignoring IO exception, " + exception, exception); 048 return; 049 } 050 051 if (ignoreNoSpaceErrors) { 052 Throwable cause = exception; 053 while (cause != null && cause instanceof IOException) { 054 String message = cause.getMessage(); 055 if (message != null && message.contains(noSpaceMessage)) { 056 LOG.info("Ignoring no space left exception, " + exception, exception); 057 return; 058 } 059 cause = cause.getCause(); 060 } 061 } 062 063 if (ignoreSQLExceptions) { 064 Throwable cause = exception; 065 while (cause != null) { 066 String message = cause.getMessage(); 067 if (cause instanceof SQLException && message.contains(sqlExceptionMessage)) { 068 LOG.info("Ignoring SQLException, " + exception, cause); 069 return; 070 } 071 cause = cause.getCause(); 072 } 073 } 074 075 if (stopStartConnectors) { 076 if (!stopStartInProgress.compareAndSet(false, true)) { 077 // we are already working on it 078 return; 079 } 080 LOG.info("Initiating stop/restart of broker transport due to IO exception, " + exception, exception); 081 082 new Thread("stop transport connectors on IO exception") { 083 public void run() { 084 try { 085 ServiceStopper stopper = new ServiceStopper(); 086 broker.stopAllConnectors(stopper); 087 } catch (Exception e) { 088 LOG.warn("Failure occurred while stopping broker connectors", e); 089 } 090 } 091 }.start(); 092 093 // resume again 094 new Thread("restart transport connectors post IO exception") { 095 public void run() { 096 try { 097 while (isPersistenceAdapterDown()) { 098 LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports"); 099 TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod); 100 } 101 broker.startAllConnectors(); 102 } catch (Exception e) { 103 LOG.warn("Failure occurred while restarting broker connectors", e); 104 } finally { 105 stopStartInProgress.compareAndSet(true, false); 106 } 107 } 108 109 private boolean isPersistenceAdapterDown() { 110 boolean checkpointSuccess = false; 111 try { 112 broker.getPersistenceAdapter().checkpoint(true); 113 checkpointSuccess = true; 114 } catch (Throwable ignored) {} 115 return !checkpointSuccess; 116 } 117 }.start(); 118 119 return; 120 } 121 122 LOG.info("Stopping the broker due to IO exception, " + exception, exception); 123 new Thread("Stopping the broker due to IO exception") { 124 public void run() { 125 try { 126 broker.stop(); 127 } catch (Exception e) { 128 LOG.warn("Failure occurred while stopping broker", e); 129 } 130 } 131 }.start(); 132 } 133 134 public void setBrokerService(BrokerService broker) { 135 this.broker = broker; 136 } 137 138 public boolean isIgnoreAllErrors() { 139 return ignoreAllErrors; 140 } 141 142 public void setIgnoreAllErrors(boolean ignoreAllErrors) { 143 this.ignoreAllErrors = ignoreAllErrors; 144 } 145 146 public boolean isIgnoreNoSpaceErrors() { 147 return ignoreNoSpaceErrors; 148 } 149 150 public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) { 151 this.ignoreNoSpaceErrors = ignoreNoSpaceErrors; 152 } 153 154 public String getNoSpaceMessage() { 155 return noSpaceMessage; 156 } 157 158 public void setNoSpaceMessage(String noSpaceMessage) { 159 this.noSpaceMessage = noSpaceMessage; 160 } 161 162 public boolean isIgnoreSQLExceptions() { 163 return ignoreSQLExceptions; 164 } 165 166 public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) { 167 this.ignoreSQLExceptions = ignoreSQLExceptions; 168 } 169 170 public String getSqlExceptionMessage() { 171 return sqlExceptionMessage; 172 } 173 174 public void setSqlExceptionMessage(String sqlExceptionMessage) { 175 this.sqlExceptionMessage = sqlExceptionMessage; 176 } 177 178 public boolean isStopStartConnectors() { 179 return stopStartConnectors; 180 } 181 182 public void setStopStartConnectors(boolean stopStartConnectors) { 183 this.stopStartConnectors = stopStartConnectors; 184 } 185 186 public long getResumeCheckSleepPeriod() { 187 return resumeCheckSleepPeriod; 188 } 189 190 public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) { 191 this.resumeCheckSleepPeriod = resumeCheckSleepPeriod; 192 } 193}