001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadaptor; 018 019 import java.io.File; 020 import java.io.IOException; 021 import java.util.HashMap; 022 import java.util.HashSet; 023 import java.util.Iterator; 024 import java.util.Map; 025 import java.util.Set; 026 import java.util.concurrent.atomic.AtomicBoolean; 027 import java.util.concurrent.atomic.AtomicInteger; 028 import java.util.concurrent.atomic.AtomicLong; 029 030 import org.apache.activemq.broker.ConnectionContext; 031 import org.apache.activemq.command.ActiveMQDestination; 032 import org.apache.activemq.command.ActiveMQQueue; 033 import org.apache.activemq.command.ActiveMQTopic; 034 import org.apache.activemq.command.MessageId; 035 import org.apache.activemq.command.SubscriptionInfo; 036 import org.apache.activemq.command.TransactionId; 037 import org.apache.activemq.kaha.CommandMarshaller; 038 import org.apache.activemq.kaha.ListContainer; 039 import org.apache.activemq.kaha.MapContainer; 040 import org.apache.activemq.kaha.MessageIdMarshaller; 041 import org.apache.activemq.kaha.Store; 042 import org.apache.activemq.kaha.StoreFactory; 043 import org.apache.activemq.kaha.impl.index.hash.HashIndex; 044 import org.apache.activemq.store.MessageStore; 045 import org.apache.activemq.store.ReferenceStore; 046 import org.apache.activemq.store.ReferenceStoreAdapter; 047 import org.apache.activemq.store.TopicMessageStore; 048 import org.apache.activemq.store.TopicReferenceStore; 049 import org.apache.activemq.store.amq.AMQTx; 050 import org.apache.activemq.util.IOHelper; 051 import org.apache.commons.logging.Log; 052 import org.apache.commons.logging.LogFactory; 053 054 public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter { 055 056 057 058 private static final Log LOG = LogFactory.getLog(KahaReferenceStoreAdapter.class); 059 private static final String STORE_STATE = "store-state"; 060 private static final String QUEUE_DATA = "queue-data"; 061 private static final String INDEX_VERSION_NAME = "INDEX_VERSION"; 062 private static final Integer INDEX_VERSION = new Integer(7); 063 private static final String RECORD_REFERENCES = "record-references"; 064 private static final String TRANSACTIONS = "transactions-state"; 065 private MapContainer stateMap; 066 private MapContainer<TransactionId, AMQTx> preparedTransactions; 067 private Map<Integer, AtomicInteger> recordReferences = new HashMap<Integer, AtomicInteger>(); 068 private ListContainer<SubscriptionInfo> durableSubscribers; 069 private boolean storeValid; 070 private Store stateStore; 071 private boolean persistentIndex = true; 072 private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE; 073 private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE; 074 private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE; 075 private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY; 076 private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR; 077 078 079 public KahaReferenceStoreAdapter(AtomicLong size){ 080 super(size); 081 } 082 083 public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 084 throw new RuntimeException("Use createQueueReferenceStore instead"); 085 } 086 087 public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) 088 throws IOException { 089 throw new RuntimeException("Use createTopicReferenceStore instead"); 090 } 091 092 @Override 093 public synchronized void start() throws Exception { 094 super.start(); 095 Store store = getStateStore(); 096 boolean empty = store.getMapContainerIds().isEmpty(); 097 stateMap = store.getMapContainer("state", STORE_STATE); 098 stateMap.load(); 099 storeValid=true; 100 if (!empty) { 101 AtomicBoolean status = (AtomicBoolean)stateMap.get(STORE_STATE); 102 if (status != null) { 103 storeValid = status.get(); 104 } 105 106 if (storeValid) { 107 //check what version the indexes are at 108 Integer indexVersion = (Integer) stateMap.get(INDEX_VERSION_NAME); 109 if (indexVersion==null || indexVersion.intValue() < INDEX_VERSION.intValue()) { 110 storeValid = false; 111 LOG.warn("Indexes at an older version - need to regenerate"); 112 } 113 } 114 if (storeValid) { 115 if (stateMap.containsKey(RECORD_REFERENCES)) { 116 recordReferences = (Map<Integer, AtomicInteger>)stateMap.get(RECORD_REFERENCES); 117 } 118 } 119 } 120 stateMap.put(STORE_STATE, new AtomicBoolean()); 121 stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION); 122 durableSubscribers = store.getListContainer("durableSubscribers"); 123 durableSubscribers.setMarshaller(new CommandMarshaller()); 124 preparedTransactions = store.getMapContainer("transactions", TRANSACTIONS, false); 125 // need to set the Marshallers here 126 preparedTransactions.setKeyMarshaller(Store.COMMAND_MARSHALLER); 127 preparedTransactions.setValueMarshaller(new AMQTxMarshaller(wireFormat)); 128 } 129 130 @Override 131 public synchronized void stop() throws Exception { 132 stateMap.put(RECORD_REFERENCES, recordReferences); 133 stateMap.put(STORE_STATE, new AtomicBoolean(true)); 134 stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION); 135 if (this.stateStore != null) { 136 this.stateStore.close(); 137 this.stateStore = null; 138 this.stateMap = null; 139 } 140 super.stop(); 141 } 142 143 public void commitTransaction(ConnectionContext context) throws IOException { 144 //we don;t need to force on a commit - as the reference store 145 //is rebuilt on a non clean shutdown 146 } 147 148 public boolean isStoreValid() { 149 return storeValid; 150 } 151 152 public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException { 153 ReferenceStore rc = (ReferenceStore)queues.get(destination); 154 if (rc == null) { 155 rc = new KahaReferenceStore(this, getMapReferenceContainer(destination, QUEUE_DATA), 156 destination); 157 messageStores.put(destination, rc); 158 // if(transactionStore!=null){ 159 // rc=transactionStore.proxy(rc); 160 // } 161 queues.put(destination, rc); 162 } 163 return rc; 164 } 165 166 public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException { 167 TopicReferenceStore rc = (TopicReferenceStore)topics.get(destination); 168 if (rc == null) { 169 Store store = getStore(); 170 MapContainer messageContainer = getMapReferenceContainer(destination.getPhysicalName(), "topic-data"); 171 MapContainer subsContainer = getSubsMapContainer(destination.getPhysicalName() + "-Subscriptions", "blob"); 172 ListContainer<TopicSubAck> ackContainer = store.getListContainer(destination.getPhysicalName(), "topic-acks"); 173 ackContainer.setMarshaller(new TopicSubAckMarshaller()); 174 rc = new KahaTopicReferenceStore(store, this, messageContainer, ackContainer, subsContainer, 175 destination); 176 messageStores.put(destination, rc); 177 // if(transactionStore!=null){ 178 // rc=transactionStore.proxy(rc); 179 // } 180 topics.put(destination, rc); 181 } 182 return rc; 183 } 184 185 public void removeReferenceStore(KahaReferenceStore referenceStore) { 186 ActiveMQDestination destination = referenceStore.getDestination(); 187 if (destination.isQueue()) { 188 queues.remove(destination); 189 try { 190 getStore().deleteMapContainer(destination, QUEUE_DATA); 191 } catch (IOException e) { 192 LOG.error("Failed to delete " + QUEUE_DATA + " map container for destination: " + destination, e); 193 } 194 } else { 195 topics.remove(destination); 196 } 197 messageStores.remove(destination); 198 } 199 /* 200 public void buildReferenceFileIdsInUse() throws IOException { 201 recordReferences = new HashMap<Integer, AtomicInteger>(); 202 Set<ActiveMQDestination> destinations = getDestinations(); 203 for (ActiveMQDestination destination : destinations) { 204 if (destination.isQueue()) { 205 KahaReferenceStore store = (KahaReferenceStore)createQueueReferenceStore((ActiveMQQueue)destination); 206 store.addReferenceFileIdsInUse(); 207 } else { 208 KahaTopicReferenceStore store = (KahaTopicReferenceStore)createTopicReferenceStore((ActiveMQTopic)destination); 209 store.addReferenceFileIdsInUse(); 210 } 211 } 212 } 213 */ 214 215 protected MapContainer<MessageId, ReferenceRecord> getMapReferenceContainer(Object id, 216 String containerName) 217 throws IOException { 218 Store store = getStore(); 219 MapContainer<MessageId, ReferenceRecord> container = store.getMapContainer(id, containerName,persistentIndex); 220 container.setIndexBinSize(getIndexBinSize()); 221 container.setIndexKeySize(getIndexKeySize()); 222 container.setIndexPageSize(getIndexPageSize()); 223 container.setIndexMaxBinSize(getIndexMaxBinSize()); 224 container.setIndexLoadFactor(getIndexLoadFactor()); 225 container.setKeyMarshaller(new MessageIdMarshaller()); 226 container.setValueMarshaller(new ReferenceRecordMarshaller()); 227 container.load(); 228 return container; 229 } 230 231 synchronized void addInterestInRecordFile(int recordNumber) { 232 Integer key = Integer.valueOf(recordNumber); 233 AtomicInteger rr = recordReferences.get(key); 234 if (rr == null) { 235 rr = new AtomicInteger(); 236 recordReferences.put(key, rr); 237 } 238 rr.incrementAndGet(); 239 } 240 241 synchronized void removeInterestInRecordFile(int recordNumber) { 242 Integer key = Integer.valueOf(recordNumber); 243 AtomicInteger rr = recordReferences.get(key); 244 if (rr != null && rr.decrementAndGet() <= 0) { 245 recordReferences.remove(key); 246 } 247 } 248 249 /** 250 * @return 251 * @throws IOException 252 * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse() 253 */ 254 public synchronized Set<Integer> getReferenceFileIdsInUse() throws IOException { 255 return new HashSet<Integer>(recordReferences.keySet()); 256 } 257 258 /** 259 * 260 * @throws IOException 261 * @see org.apache.activemq.store.ReferenceStoreAdapter#clearMessages() 262 */ 263 public void clearMessages() throws IOException { 264 //don't delete messages as it will clear state - call base 265 //class method to clear out the data instead 266 super.deleteAllMessages(); 267 } 268 269 /** 270 * 271 * @throws IOException 272 * @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState() 273 */ 274 275 public void recoverState() throws IOException { 276 Set<SubscriptionInfo> set = new HashSet<SubscriptionInfo>(this.durableSubscribers); 277 for (SubscriptionInfo info:set) { 278 LOG.info("Recovering subscriber state for durable subscriber: " + info); 279 TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination()); 280 ts.addSubsciption(info, false); 281 } 282 } 283 284 public void recoverSubscription(SubscriptionInfo info) throws IOException { 285 TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination()); 286 LOG.info("Recovering subscriber state for durable subscriber: " + info); 287 ts.addSubsciption(info, false); 288 } 289 290 291 public Map<TransactionId, AMQTx> retrievePreparedState() throws IOException { 292 Map<TransactionId, AMQTx> result = new HashMap<TransactionId, AMQTx>(); 293 preparedTransactions.load(); 294 for (Iterator<TransactionId> i = preparedTransactions.keySet().iterator(); i.hasNext();) { 295 TransactionId key = i.next(); 296 AMQTx value = preparedTransactions.get(key); 297 result.put(key, value); 298 } 299 return result; 300 } 301 302 public void savePreparedState(Map<TransactionId, AMQTx> map) throws IOException { 303 preparedTransactions.clear(); 304 for (Iterator<Map.Entry<TransactionId, AMQTx>> iter = map.entrySet().iterator(); iter.hasNext();) { 305 Map.Entry<TransactionId, AMQTx> entry = iter.next(); 306 preparedTransactions.put(entry.getKey(), entry.getValue()); 307 } 308 } 309 310 @Override 311 public synchronized void setDirectory(File directory) { 312 File file = new File(directory, "data"); 313 super.setDirectory(file); 314 this.stateStore = createStateStore(directory); 315 } 316 317 protected synchronized Store getStateStore() throws IOException { 318 if (this.stateStore == null) { 319 File stateDirectory = new File(getDirectory(), "kr-state"); 320 IOHelper.mkdirs(stateDirectory); 321 this.stateStore = createStateStore(getDirectory()); 322 } 323 return this.stateStore; 324 } 325 326 public void deleteAllMessages() throws IOException { 327 super.deleteAllMessages(); 328 if (stateStore != null) { 329 if (stateStore.isInitialized()) { 330 stateStore.clear(); 331 } else { 332 stateStore.delete(); 333 } 334 } else { 335 File stateDirectory = new File(getDirectory(), "kr-state"); 336 StoreFactory.delete(stateDirectory); 337 } 338 } 339 340 public boolean isPersistentIndex() { 341 return persistentIndex; 342 } 343 344 public void setPersistentIndex(boolean persistentIndex) { 345 this.persistentIndex = persistentIndex; 346 } 347 348 private Store createStateStore(File directory) { 349 File stateDirectory = new File(directory, "state"); 350 try { 351 IOHelper.mkdirs(stateDirectory); 352 return StoreFactory.open(stateDirectory, "rw"); 353 } catch (IOException e) { 354 LOG.error("Failed to create the state store", e); 355 } 356 return null; 357 } 358 359 protected void addSubscriberState(SubscriptionInfo info) throws IOException { 360 durableSubscribers.add(info); 361 } 362 363 protected void removeSubscriberState(SubscriptionInfo info) { 364 durableSubscribers.remove(info); 365 } 366 367 public int getIndexBinSize() { 368 return indexBinSize; 369 } 370 371 public void setIndexBinSize(int indexBinSize) { 372 this.indexBinSize = indexBinSize; 373 } 374 375 public int getIndexKeySize() { 376 return indexKeySize; 377 } 378 379 public void setIndexKeySize(int indexKeySize) { 380 this.indexKeySize = indexKeySize; 381 } 382 383 public int getIndexPageSize() { 384 return indexPageSize; 385 } 386 387 public void setIndexPageSize(int indexPageSize) { 388 this.indexPageSize = indexPageSize; 389 } 390 391 public int getIndexMaxBinSize() { 392 return indexMaxBinSize; 393 } 394 395 public void setIndexMaxBinSize(int maxBinSize) { 396 this.indexMaxBinSize = maxBinSize; 397 } 398 399 /** 400 * @return the loadFactor 401 */ 402 public int getIndexLoadFactor() { 403 return indexLoadFactor; 404 } 405 406 /** 407 * @param loadFactor the loadFactor to set 408 */ 409 public void setIndexLoadFactor(int loadFactor) { 410 this.indexLoadFactor = loadFactor; 411 } 412 }