001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region.cursors; 018 019 import java.io.IOException; 020 import java.util.Iterator; 021 import java.util.LinkedList; 022 import java.util.concurrent.atomic.AtomicBoolean; 023 import java.util.concurrent.atomic.AtomicLong; 024 025 import org.apache.activemq.broker.Broker; 026 import org.apache.activemq.broker.ConnectionContext; 027 import org.apache.activemq.broker.region.Destination; 028 import org.apache.activemq.broker.region.MessageReference; 029 import org.apache.activemq.broker.region.QueueMessageReference; 030 import org.apache.activemq.command.Message; 031 import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 032 import org.apache.activemq.kaha.CommandMarshaller; 033 import org.apache.activemq.kaha.ListContainer; 034 import org.apache.activemq.kaha.Store; 035 import org.apache.activemq.openwire.OpenWireFormat; 036 import org.apache.activemq.usage.SystemUsage; 037 import org.apache.activemq.usage.Usage; 038 import org.apache.activemq.usage.UsageListener; 039 import org.apache.commons.logging.Log; 040 import org.apache.commons.logging.LogFactory; 041 042 /** 043 * persist pending messages pending message (messages awaiting dispatch to a 044 * consumer) cursor 045 * 046 * @version $Revision: 911759 $ 047 */ 048 public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener { 049 private static final Log LOG = LogFactory.getLog(FilePendingMessageCursor.class); 050 private static final AtomicLong NAME_COUNT = new AtomicLong(); 051 protected Broker broker; 052 private Store store; 053 private String name; 054 private LinkedList<MessageReference> memoryList = new LinkedList<MessageReference>(); 055 private ListContainer<MessageReference> diskList; 056 private Iterator<MessageReference> iter; 057 private Destination regionDestination; 058 private boolean iterating; 059 private boolean flushRequired; 060 private AtomicBoolean started = new AtomicBoolean(); 061 /** 062 * @param name 063 * @param store 064 */ 065 public FilePendingMessageCursor(Broker broker,String name) { 066 this.useCache=false; 067 this.broker = broker; 068 //the store can be null if the BrokerService has persistence 069 //turned off 070 this.store= broker.getTempDataStore(); 071 this.name = NAME_COUNT.incrementAndGet() + "_" + name; 072 } 073 074 public void start() throws Exception { 075 if (started.compareAndSet(false, true)) { 076 super.start(); 077 if (systemUsage != null) { 078 systemUsage.getMemoryUsage().addUsageListener(this); 079 } 080 } 081 } 082 083 public void stop() throws Exception { 084 if (started.compareAndSet(true, false)) { 085 super.stop(); 086 if (systemUsage != null) { 087 systemUsage.getMemoryUsage().removeUsageListener(this); 088 } 089 } 090 } 091 092 /** 093 * @return true if there are no pending messages 094 */ 095 public synchronized boolean isEmpty() { 096 if(memoryList.isEmpty() && isDiskListEmpty()){ 097 return true; 098 } 099 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { 100 MessageReference node = iterator.next(); 101 if (node== QueueMessageReference.NULL_MESSAGE){ 102 continue; 103 } 104 if (!node.isDropped()) { 105 return false; 106 } 107 // We can remove dropped references. 108 iterator.remove(); 109 } 110 return isDiskListEmpty(); 111 } 112 113 114 115 /** 116 * reset the cursor 117 */ 118 public synchronized void reset() { 119 iterating = true; 120 last = null; 121 iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator(); 122 } 123 124 public synchronized void release() { 125 iterating = false; 126 if (flushRequired) { 127 flushRequired = false; 128 flushToDisk(); 129 } 130 } 131 132 public synchronized void destroy() throws Exception { 133 stop(); 134 for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) { 135 Message node = (Message)i.next(); 136 node.decrementReferenceCount(); 137 } 138 memoryList.clear(); 139 destroyDiskList(); 140 } 141 142 private void destroyDiskList() throws Exception { 143 if (!isDiskListEmpty()) { 144 Iterator<MessageReference> iterator = diskList.iterator(); 145 while (iterator.hasNext()) { 146 iterator.next(); 147 iterator.remove(); 148 } 149 diskList.clear(); 150 } 151 store.deleteListContainer(name, "TopicSubscription"); 152 } 153 154 public synchronized LinkedList<MessageReference> pageInList(int maxItems) { 155 LinkedList<MessageReference> result = new LinkedList<MessageReference>(); 156 int count = 0; 157 for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) { 158 MessageReference ref = i.next(); 159 ref.incrementReferenceCount(); 160 result.add(ref); 161 count++; 162 } 163 if (count < maxItems && !isDiskListEmpty()) { 164 for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext() && count < maxItems;) { 165 Message message = (Message)i.next(); 166 message.setRegionDestination(regionDestination); 167 message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); 168 message.incrementReferenceCount(); 169 result.add(message); 170 count++; 171 } 172 } 173 return result; 174 } 175 176 /** 177 * add message to await dispatch 178 * 179 * @param node 180 */ 181 public synchronized void addMessageLast(MessageReference node) { 182 if (!node.isExpired()) { 183 try { 184 regionDestination = node.getMessage().getRegionDestination(); 185 if (isDiskListEmpty()) { 186 if (hasSpace() || this.store==null) { 187 memoryList.add(node); 188 node.incrementReferenceCount(); 189 return; 190 } 191 } 192 if (!hasSpace()) { 193 if (isDiskListEmpty()) { 194 expireOldMessages(); 195 if (hasSpace()) { 196 memoryList.add(node); 197 node.incrementReferenceCount(); 198 return; 199 } else { 200 flushToDisk(); 201 } 202 } 203 } 204 systemUsage.getTempUsage().waitForSpace(); 205 getDiskList().add(node); 206 207 } catch (Exception e) { 208 LOG.error("Caught an Exception adding a message: " + node 209 + " first to FilePendingMessageCursor ", e); 210 throw new RuntimeException(e); 211 } 212 } else { 213 discard(node); 214 } 215 } 216 217 /** 218 * add message to await dispatch 219 * 220 * @param node 221 */ 222 public synchronized void addMessageFirst(MessageReference node) { 223 if (!node.isExpired()) { 224 try { 225 regionDestination = node.getMessage().getRegionDestination(); 226 if (isDiskListEmpty()) { 227 if (hasSpace()) { 228 memoryList.addFirst(node); 229 node.incrementReferenceCount(); 230 return; 231 } 232 } 233 if (!hasSpace()) { 234 if (isDiskListEmpty()) { 235 expireOldMessages(); 236 if (hasSpace()) { 237 memoryList.addFirst(node); 238 node.incrementReferenceCount(); 239 return; 240 } else { 241 flushToDisk(); 242 } 243 } 244 } 245 systemUsage.getTempUsage().waitForSpace(); 246 node.decrementReferenceCount(); 247 getDiskList().addFirst(node); 248 249 } catch (Exception e) { 250 LOG.error("Caught an Exception adding a message: " + node 251 + " first to FilePendingMessageCursor ", e); 252 throw new RuntimeException(e); 253 } 254 } else { 255 discard(node); 256 } 257 } 258 259 /** 260 * @return true if there pending messages to dispatch 261 */ 262 public synchronized boolean hasNext() { 263 return iter.hasNext(); 264 } 265 266 /** 267 * @return the next pending message 268 */ 269 public synchronized MessageReference next() { 270 Message message = (Message)iter.next(); 271 last = message; 272 if (!isDiskListEmpty()) { 273 // got from disk 274 message.setRegionDestination(regionDestination); 275 message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); 276 } 277 message.incrementReferenceCount(); 278 return message; 279 } 280 281 /** 282 * remove the message at the cursor position 283 */ 284 public synchronized void remove() { 285 iter.remove(); 286 if (last != null) { 287 last.decrementReferenceCount(); 288 } 289 } 290 291 /** 292 * @param node 293 * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference) 294 */ 295 public synchronized void remove(MessageReference node) { 296 if (memoryList.remove(node)) { 297 node.decrementReferenceCount(); 298 } 299 if (!isDiskListEmpty()) { 300 getDiskList().remove(node); 301 } 302 } 303 304 /** 305 * @return the number of pending messages 306 */ 307 public synchronized int size() { 308 return memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size()); 309 } 310 311 /** 312 * clear all pending messages 313 */ 314 public synchronized void clear() { 315 memoryList.clear(); 316 if (!isDiskListEmpty()) { 317 getDiskList().clear(); 318 } 319 last=null; 320 } 321 322 public synchronized boolean isFull() { 323 324 return super.isFull() 325 || (systemUsage != null && systemUsage.getTempUsage().isFull()); 326 327 } 328 329 public boolean hasMessagesBufferedToDeliver() { 330 return !isEmpty(); 331 } 332 333 public void setSystemUsage(SystemUsage usageManager) { 334 super.setSystemUsage(usageManager); 335 } 336 337 public void onUsageChanged(Usage usage, int oldPercentUsage, 338 int newPercentUsage) { 339 if (newPercentUsage >= getMemoryUsageHighWaterMark()) { 340 synchronized (this) { 341 flushRequired = true; 342 if (!iterating) { 343 expireOldMessages(); 344 if (!hasSpace()) { 345 flushToDisk(); 346 flushRequired = false; 347 } 348 } 349 } 350 } 351 } 352 353 public boolean isTransient() { 354 return true; 355 } 356 357 protected boolean isSpaceInMemoryList() { 358 return hasSpace() && isDiskListEmpty(); 359 } 360 361 protected synchronized void expireOldMessages() { 362 if (!memoryList.isEmpty()) { 363 LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList); 364 this.memoryList = new LinkedList<MessageReference>(); 365 while (!tmpList.isEmpty()) { 366 MessageReference node = tmpList.removeFirst(); 367 if (node.isExpired()) { 368 discard(node); 369 }else { 370 memoryList.add(node); 371 } 372 } 373 } 374 375 } 376 377 protected synchronized void flushToDisk() { 378 379 if (!memoryList.isEmpty()) { 380 while (!memoryList.isEmpty()) { 381 MessageReference node = memoryList.removeFirst(); 382 node.decrementReferenceCount(); 383 getDiskList().addLast(node); 384 } 385 memoryList.clear(); 386 } 387 } 388 389 protected boolean isDiskListEmpty() { 390 return diskList == null || diskList.isEmpty(); 391 } 392 393 protected ListContainer<MessageReference> getDiskList() { 394 if (diskList == null) { 395 try { 396 diskList = store.getListContainer(name, "TopicSubscription", true); 397 diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat())); 398 } catch (IOException e) { 399 LOG.error("Caught an IO Exception getting the DiskList " + name, e); 400 throw new RuntimeException(e); 401 } 402 } 403 return diskList; 404 } 405 406 protected void discard(MessageReference message) { 407 message.decrementReferenceCount(); 408 if (LOG.isDebugEnabled()) { 409 LOG.debug("Discarding message " + message); 410 } 411 broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(new NonCachedMessageEvaluationContext()), message); 412 } 413 }