001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb.disk.index; 018 019import java.io.IOException; 020import java.lang.ref.WeakReference; 021import java.util.Iterator; 022import java.util.Map; 023import java.util.Map.Entry; 024import java.util.concurrent.atomic.AtomicBoolean; 025import java.util.concurrent.atomic.AtomicLong; 026 027import org.apache.activemq.store.kahadb.disk.index.ListNode.ListIterator; 028import org.apache.activemq.store.kahadb.disk.page.Page; 029import org.apache.activemq.store.kahadb.disk.page.PageFile; 030import org.apache.activemq.store.kahadb.disk.page.Transaction; 031import org.apache.activemq.store.kahadb.disk.util.Marshaller; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035public class ListIndex<Key,Value> implements Index<Key,Value> { 036 037 private static final Logger LOG = LoggerFactory.getLogger(ListIndex.class); 038 public final static long NOT_SET = -1; 039 protected PageFile pageFile; 040 protected long headPageId; 041 protected long tailPageId; 042 private AtomicLong size = new AtomicLong(0); 043 044 protected AtomicBoolean loaded = new AtomicBoolean(); 045 046 private ListNode.NodeMarshaller<Key, Value> marshaller; 047 private Marshaller<Key> keyMarshaller; 048 private Marshaller<Value> valueMarshaller; 049 050 public ListIndex() { 051 } 052 053 public ListIndex(PageFile pageFile, long headPageId) { 054 this.pageFile = pageFile; 055 setHeadPageId(headPageId); 056 } 057 058 @SuppressWarnings("rawtypes") 059 public ListIndex(PageFile pageFile, Page page) { 060 this(pageFile, page.getPageId()); 061 } 062 063 @Override 064 synchronized public void load(Transaction tx) throws IOException { 065 if (loaded.compareAndSet(false, true)) { 066 LOG.debug("loading"); 067 if( keyMarshaller == null ) { 068 throw new IllegalArgumentException("The key marshaller must be set before loading the ListIndex"); 069 } 070 if( valueMarshaller == null ) { 071 throw new IllegalArgumentException("The value marshaller must be set before loading the ListIndex"); 072 } 073 074 marshaller = new ListNode.NodeMarshaller<Key, Value>(keyMarshaller, valueMarshaller); 075 final Page<ListNode<Key,Value>> p = tx.load(getHeadPageId(), null); 076 if( p.getType() == Page.PAGE_FREE_TYPE ) { 077 // Need to initialize it.. 078 ListNode<Key, Value> root = createNode(p); 079 storeNode(tx, root, true); 080 setHeadPageId(p.getPageId()); 081 setTailPageId(getHeadPageId()); 082 } else { 083 ListNode<Key, Value> node = loadNode(tx, getHeadPageId()); 084 setTailPageId(getHeadPageId()); 085 size.addAndGet(node.size(tx)); 086 onLoad(node, tx); 087 while (node.getNext() != NOT_SET ) { 088 node = loadNode(tx, node.getNext()); 089 size.addAndGet(node.size(tx)); 090 onLoad(node, tx); 091 setTailPageId(node.getPageId()); 092 } 093 } 094 } 095 } 096 097 protected void onLoad(ListNode<Key, Value> node, Transaction tx) { 098 099 } 100 101 @Override 102 synchronized public void unload(Transaction tx) { 103 if (loaded.compareAndSet(true, false)) { 104 } 105 } 106 107 protected ListNode<Key,Value> getHead(Transaction tx) throws IOException { 108 return loadNode(tx, getHeadPageId()); 109 } 110 111 protected ListNode<Key,Value> getTail(Transaction tx) throws IOException { 112 return loadNode(tx, getTailPageId()); 113 } 114 115 @Override 116 synchronized public boolean containsKey(Transaction tx, Key key) throws IOException { 117 assertLoaded(); 118 119 if (size.get() == 0) { 120 return false; 121 } 122 123 for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx); iterator.hasNext(); ) { 124 Map.Entry<Key,Value> candidate = iterator.next(); 125 if (key.equals(candidate.getKey())) { 126 return true; 127 } 128 } 129 return false; 130 } 131 132 private ListNode<Key, Value> lastGetNodeCache = null; 133 private Map.Entry<Key, Value> lastGetEntryCache = null; 134 private WeakReference<Transaction> lastCacheTxSrc = new WeakReference<Transaction>(null); 135 136 @Override 137 @SuppressWarnings({ "rawtypes", "unchecked" }) 138 synchronized public Value get(Transaction tx, Key key) throws IOException { 139 assertLoaded(); 140 for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx); iterator.hasNext(); ) { 141 Map.Entry<Key,Value> candidate = iterator.next(); 142 if (key.equals(candidate.getKey())) { 143 this.lastGetNodeCache = ((ListIterator) iterator).getCurrent(); 144 this.lastGetEntryCache = candidate; 145 this.lastCacheTxSrc = new WeakReference<Transaction>(tx); 146 return candidate.getValue(); 147 } 148 } 149 return null; 150 } 151 152 /** 153 * Update the value of the item with the given key in the list if ot exists, otherwise 154 * it appends the value to the end of the list. 155 * 156 * @return the old value contained in the list if one exists or null. 157 */ 158 @Override 159 @SuppressWarnings({ "rawtypes" }) 160 synchronized public Value put(Transaction tx, Key key, Value value) throws IOException { 161 162 Value oldValue = null; 163 164 if (lastGetNodeCache != null && tx.equals(lastCacheTxSrc.get())) { 165 166 if(lastGetEntryCache.getKey().equals(key)) { 167 oldValue = lastGetEntryCache.setValue(value); 168 lastGetEntryCache.setValue(value); 169 lastGetNodeCache.storeUpdate(tx); 170 flushCache(); 171 return oldValue; 172 } 173 174 // This searches from the last location of a call to get for the element to replace 175 // all the way to the end of the ListIndex. 176 Iterator<Map.Entry<Key, Value>> iterator = lastGetNodeCache.iterator(tx); 177 while (iterator.hasNext()) { 178 Map.Entry<Key, Value> entry = iterator.next(); 179 if (entry.getKey().equals(key)) { 180 oldValue = entry.setValue(value); 181 ((ListIterator) iterator).getCurrent().storeUpdate(tx); 182 flushCache(); 183 return oldValue; 184 } 185 } 186 } else { 187 flushCache(); 188 } 189 190 // Not found because the cache wasn't set or its not at the end of the list so we 191 // start from the beginning and go to the cached location or the end, then we do 192 // an add if its not found. 193 Iterator<Map.Entry<Key, Value>> iterator = iterator(tx); 194 while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() != lastGetNodeCache) { 195 Map.Entry<Key, Value> entry = iterator.next(); 196 if (entry.getKey().equals(key)) { 197 oldValue = entry.setValue(value); 198 ((ListIterator) iterator).getCurrent().storeUpdate(tx); 199 flushCache(); 200 return oldValue; 201 } 202 } 203 204 // Not found so add it last. 205 flushCache(); 206 207 return add(tx, key, value); 208 } 209 210 synchronized public Value add(Transaction tx, Key key, Value value) throws IOException { 211 assertLoaded(); 212 getTail(tx).put(tx, key, value); 213 size.incrementAndGet(); 214 flushCache(); 215 return null; 216 } 217 218 synchronized public Value addFirst(Transaction tx, Key key, Value value) throws IOException { 219 assertLoaded(); 220 getHead(tx).addFirst(tx, key, value); 221 size.incrementAndGet(); 222 flushCache(); 223 return null; 224 } 225 226 @Override 227 @SuppressWarnings("rawtypes") 228 synchronized public Value remove(Transaction tx, Key key) throws IOException { 229 assertLoaded(); 230 231 if (size.get() == 0) { 232 return null; 233 } 234 235 if (lastGetNodeCache != null && tx.equals(lastCacheTxSrc.get())) { 236 237 // This searches from the last location of a call to get for the element to remove 238 // all the way to the end of the ListIndex. 239 Iterator<Map.Entry<Key, Value>> iterator = lastGetNodeCache.iterator(tx); 240 while (iterator.hasNext()) { 241 Map.Entry<Key, Value> entry = iterator.next(); 242 if (entry.getKey().equals(key)) { 243 iterator.remove(); 244 flushCache(); 245 return entry.getValue(); 246 } 247 } 248 } else { 249 flushCache(); 250 } 251 252 // Not found because the cache wasn't set or its not at the end of the list so we 253 // start from the beginning and go to the cached location or the end to find the 254 // element to remove. 255 Iterator<Map.Entry<Key, Value>> iterator = iterator(tx); 256 while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() != lastGetNodeCache) { 257 Map.Entry<Key, Value> entry = iterator.next(); 258 if (entry.getKey().equals(key)) { 259 iterator.remove(); 260 flushCache(); 261 return entry.getValue(); 262 } 263 } 264 265 return null; 266 } 267 268 public void onRemove(Entry<Key, Value> removed) { 269 size.decrementAndGet(); 270 flushCache(); 271 } 272 273 @Override 274 public boolean isTransient() { 275 return false; 276 } 277 278 @Override 279 synchronized public void clear(Transaction tx) throws IOException { 280 for (Iterator<ListNode<Key,Value>> iterator = listNodeIterator(tx); iterator.hasNext(); ) { 281 ListNode<Key,Value>candidate = iterator.next(); 282 candidate.clear(tx); 283 // break up the transaction 284 tx.commit(); 285 } 286 flushCache(); 287 size.set(0); 288 } 289 290 synchronized public Iterator<ListNode<Key, Value>> listNodeIterator(Transaction tx) throws IOException { 291 return getHead(tx).listNodeIterator(tx); 292 } 293 294 synchronized public boolean isEmpty(final Transaction tx) throws IOException { 295 return getHead(tx).isEmpty(tx); 296 } 297 298 @Override 299 synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx) throws IOException { 300 return getHead(tx).iterator(tx); 301 } 302 303 synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, long initialPosition) throws IOException { 304 return getHead(tx).iterator(tx, initialPosition); 305 } 306 307 synchronized public Map.Entry<Key,Value> getFirst(Transaction tx) throws IOException { 308 return getHead(tx).getFirst(tx); 309 } 310 311 synchronized public Map.Entry<Key,Value> getLast(Transaction tx) throws IOException { 312 return getTail(tx).getLast(tx); 313 } 314 315 private void assertLoaded() throws IllegalStateException { 316 if( !loaded.get() ) { 317 throw new IllegalStateException("TheListIndex is not loaded"); 318 } 319 } 320 321 ListNode<Key,Value> loadNode(Transaction tx, long pageId) throws IOException { 322 Page<ListNode<Key,Value>> page = tx.load(pageId, marshaller); 323 ListNode<Key, Value> node = page.get(); 324 node.setPage(page); 325 node.setContainingList(this); 326 return node; 327 } 328 329 ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> page) throws IOException { 330 ListNode<Key,Value> node = new ListNode<Key,Value>(); 331 node.setPage(page); 332 page.set(node); 333 node.setContainingList(this); 334 return node; 335 } 336 337 public ListNode<Key,Value> createNode(Transaction tx) throws IOException { 338 return createNode(tx.<ListNode<Key,Value>>load(tx.<ListNode<Key,Value>>allocate().getPageId(), null)); 339 } 340 341 public void storeNode(Transaction tx, ListNode<Key,Value> node, boolean overflow) throws IOException { 342 tx.store(node.getPage(), marshaller, overflow); 343 flushCache(); 344 } 345 346 public PageFile getPageFile() { 347 return pageFile; 348 } 349 350 public void setPageFile(PageFile pageFile) { 351 this.pageFile = pageFile; 352 } 353 354 public long getHeadPageId() { 355 return headPageId; 356 } 357 358 public void setHeadPageId(long headPageId) { 359 this.headPageId = headPageId; 360 } 361 362 public Marshaller<Key> getKeyMarshaller() { 363 return keyMarshaller; 364 } 365 @Override 366 public void setKeyMarshaller(Marshaller<Key> keyMarshaller) { 367 this.keyMarshaller = keyMarshaller; 368 } 369 370 public Marshaller<Value> getValueMarshaller() { 371 return valueMarshaller; 372 } 373 @Override 374 public void setValueMarshaller(Marshaller<Value> valueMarshaller) { 375 this.valueMarshaller = valueMarshaller; 376 } 377 378 public void setTailPageId(long tailPageId) { 379 this.tailPageId = tailPageId; 380 } 381 382 public long getTailPageId() { 383 return tailPageId; 384 } 385 386 public long size() { 387 return size.get(); 388 } 389 390 private void flushCache() { 391 this.lastGetEntryCache = null; 392 this.lastGetNodeCache = null; 393 this.lastCacheTxSrc.clear(); 394 } 395}