001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import javax.jms.JMSException; 020 import javax.jms.Message; 021 022 import org.apache.activemq.broker.region.MessageReference; 023 import org.apache.activemq.command.MessageId; 024 import org.apache.activemq.command.ProducerId; 025 import org.apache.activemq.util.BitArrayBin; 026 import org.apache.activemq.util.IdGenerator; 027 import org.apache.activemq.util.LRUCache; 028 029 /** 030 * Provides basic audit functions for Messages 031 * 032 * @version $Revision: 1.1.1.1 $ 033 */ 034 public class ActiveMQMessageAudit { 035 036 public static final int DEFAULT_WINDOW_SIZE = 2048; 037 public static final int MAXIMUM_PRODUCER_COUNT = 64; 038 private int auditDepth; 039 private int maximumNumberOfProducersToTrack; 040 private LRUCache<Object, BitArrayBin> map; 041 042 /** 043 * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack = 044 * 64 045 */ 046 public ActiveMQMessageAudit() { 047 this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT); 048 } 049 050 /** 051 * Construct a MessageAudit 052 * 053 * @param auditDepth range of ids to track 054 * @param maximumNumberOfProducersToTrack number of producers expected in 055 * the system 056 */ 057 public ActiveMQMessageAudit(int auditDepth, final int maximumNumberOfProducersToTrack) { 058 this.auditDepth = auditDepth; 059 this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack; 060 this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true); 061 } 062 063 /** 064 * @return the auditDepth 065 */ 066 public int getAuditDepth() { 067 return auditDepth; 068 } 069 070 /** 071 * @param auditDepth the auditDepth to set 072 */ 073 public void setAuditDepth(int auditDepth) { 074 this.auditDepth = auditDepth; 075 } 076 077 /** 078 * @return the maximumNumberOfProducersToTrack 079 */ 080 public int getMaximumNumberOfProducersToTrack() { 081 return maximumNumberOfProducersToTrack; 082 } 083 084 /** 085 * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set 086 */ 087 public void setMaximumNumberOfProducersToTrack( 088 int maximumNumberOfProducersToTrack) { 089 this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack; 090 this.map.setMaxCacheSize(maximumNumberOfProducersToTrack); 091 } 092 093 /** 094 * Checks if this message has been seen before 095 * 096 * @param message 097 * @return true if the message is a duplicate 098 * @throws JMSException 099 */ 100 public boolean isDuplicate(Message message) throws JMSException { 101 return isDuplicate(message.getJMSMessageID()); 102 } 103 104 /** 105 * checks whether this messageId has been seen before and adds this 106 * messageId to the list 107 * 108 * @param id 109 * @return true if the message is a duplicate 110 */ 111 public synchronized boolean isDuplicate(String id) { 112 boolean answer = false; 113 String seed = IdGenerator.getSeedFromId(id); 114 if (seed != null) { 115 BitArrayBin bab = map.get(seed); 116 if (bab == null) { 117 bab = new BitArrayBin(auditDepth); 118 map.put(seed, bab); 119 } 120 long index = IdGenerator.getSequenceFromId(id); 121 if (index >= 0) { 122 answer = bab.setBit(index, true); 123 } 124 } 125 return answer; 126 } 127 128 /** 129 * Checks if this message has been seen before 130 * 131 * @param message 132 * @return true if the message is a duplicate 133 */ 134 public boolean isDuplicate(final MessageReference message) { 135 MessageId id = message.getMessageId(); 136 return isDuplicate(id); 137 } 138 139 /** 140 * Checks if this messageId has been seen before 141 * 142 * @param id 143 * @return true if the message is a duplicate 144 */ 145 public synchronized boolean isDuplicate(final MessageId id) { 146 boolean answer = false; 147 148 if (id != null) { 149 ProducerId pid = id.getProducerId(); 150 if (pid != null) { 151 BitArrayBin bab = map.get(pid); 152 if (bab == null) { 153 bab = new BitArrayBin(auditDepth); 154 map.put(pid, bab); 155 } 156 answer = bab.setBit(id.getProducerSequenceId(), true); 157 } 158 } 159 return answer; 160 } 161 162 /** 163 * mark this message as being received 164 * 165 * @param message 166 */ 167 public void rollback(final MessageReference message) { 168 MessageId id = message.getMessageId(); 169 rollback(id); 170 } 171 172 /** 173 * mark this message as being received 174 * 175 * @param id 176 */ 177 public synchronized void rollback(final MessageId id) { 178 if (id != null) { 179 ProducerId pid = id.getProducerId(); 180 if (pid != null) { 181 BitArrayBin bab = map.get(pid); 182 if (bab != null) { 183 bab.setBit(id.getProducerSequenceId(), false); 184 } 185 } 186 } 187 } 188 189 /** 190 * Check the message is in order 191 * @param msg 192 * @return 193 * @throws JMSException 194 */ 195 public boolean isInOrder(Message msg) throws JMSException { 196 return isInOrder(msg.getJMSMessageID()); 197 } 198 199 /** 200 * Check the message id is in order 201 * @param id 202 * @return 203 */ 204 public synchronized boolean isInOrder(final String id) { 205 boolean answer = true; 206 207 if (id != null) { 208 String seed = IdGenerator.getSeedFromId(id); 209 if (seed != null) { 210 BitArrayBin bab = map.get(seed); 211 if (bab != null) { 212 long index = IdGenerator.getSequenceFromId(id); 213 answer = bab.isInOrder(index); 214 } 215 216 } 217 } 218 return answer; 219 } 220 221 /** 222 * Check the MessageId is in order 223 * @param message 224 * @return 225 */ 226 public synchronized boolean isInOrder(final MessageReference message) { 227 return isInOrder(message.getMessageId()); 228 } 229 230 /** 231 * Check the MessageId is in order 232 * @param id 233 * @return 234 */ 235 public synchronized boolean isInOrder(final MessageId id) { 236 boolean answer = false; 237 238 if (id != null) { 239 ProducerId pid = id.getProducerId(); 240 if (pid != null) { 241 BitArrayBin bab = map.get(pid); 242 if (bab == null) { 243 bab = new BitArrayBin(auditDepth); 244 map.put(pid, bab); 245 } 246 answer = bab.isInOrder(id.getProducerSequenceId()); 247 248 } 249 } 250 return answer; 251 } 252 }