001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadaptor; 018 019 import java.io.File; 020 import java.io.IOException; 021 import java.util.HashSet; 022 import java.util.Iterator; 023 import java.util.Set; 024 import java.util.concurrent.ConcurrentHashMap; 025 import java.util.concurrent.atomic.AtomicLong; 026 027 import org.apache.activemq.broker.BrokerService; 028 import org.apache.activemq.broker.BrokerServiceAware; 029 import org.apache.activemq.broker.ConnectionContext; 030 import org.apache.activemq.command.ActiveMQDestination; 031 import org.apache.activemq.command.ActiveMQQueue; 032 import org.apache.activemq.command.ActiveMQTopic; 033 import org.apache.activemq.command.Message; 034 import org.apache.activemq.command.MessageId; 035 import org.apache.activemq.kaha.CommandMarshaller; 036 import org.apache.activemq.kaha.ContainerId; 037 import org.apache.activemq.kaha.ListContainer; 038 import org.apache.activemq.kaha.MapContainer; 039 import org.apache.activemq.kaha.Marshaller; 040 import org.apache.activemq.kaha.MessageIdMarshaller; 041 import org.apache.activemq.kaha.MessageMarshaller; 042 import org.apache.activemq.kaha.Store; 043 import org.apache.activemq.kaha.StoreFactory; 044 import org.apache.activemq.kaha.impl.StoreLockedExcpetion; 045 import org.apache.activemq.openwire.OpenWireFormat; 046 import org.apache.activemq.store.MessageStore; 047 import org.apache.activemq.store.PersistenceAdapter; 048 import org.apache.activemq.store.TopicMessageStore; 049 import org.apache.activemq.store.TransactionStore; 050 import org.apache.activemq.usage.SystemUsage; 051 import org.apache.activemq.util.IOHelper; 052 import org.apache.commons.logging.Log; 053 import org.apache.commons.logging.LogFactory; 054 055 /** 056 * @org.apache.xbean.XBean 057 * @version $Revision: 1.4 $ 058 */ 059 public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware { 060 061 private static final int STORE_LOCKED_WAIT_DELAY = 10 * 1000; 062 private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class); 063 private static final String PREPARED_TRANSACTIONS_NAME = "PreparedTransactions"; 064 065 protected OpenWireFormat wireFormat = new OpenWireFormat(); 066 protected KahaTransactionStore transactionStore; 067 protected ConcurrentHashMap<ActiveMQTopic, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, TopicMessageStore>(); 068 protected ConcurrentHashMap<ActiveMQQueue, MessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, MessageStore>(); 069 protected ConcurrentHashMap<ActiveMQDestination, MessageStore> messageStores = new ConcurrentHashMap<ActiveMQDestination, MessageStore>(); 070 071 private long maxDataFileLength = 32 * 1024 * 1024; 072 private File directory; 073 private String brokerName; 074 private Store theStore; 075 private boolean initialized; 076 private final AtomicLong storeSize; 077 private boolean persistentIndex = true; 078 private BrokerService brokerService; 079 080 081 public KahaPersistenceAdapter(AtomicLong size) { 082 this.storeSize=size; 083 } 084 085 public KahaPersistenceAdapter() { 086 this(new AtomicLong()); 087 } 088 089 public Set<ActiveMQDestination> getDestinations() { 090 Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 091 try { 092 Store store = getStore(); 093 for (Iterator i = store.getMapContainerIds().iterator(); i.hasNext();) { 094 ContainerId id = (ContainerId)i.next(); 095 Object obj = id.getKey(); 096 if (obj instanceof ActiveMQDestination) { 097 rc.add((ActiveMQDestination)obj); 098 } 099 } 100 } catch (IOException e) { 101 LOG.error("Failed to get destinations ", e); 102 } 103 return rc; 104 } 105 106 public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 107 MessageStore rc = queues.get(destination); 108 if (rc == null) { 109 rc = new KahaMessageStore(getMapContainer(destination, "queue-data"), destination); 110 messageStores.put(destination, rc); 111 if (transactionStore != null) { 112 rc = transactionStore.proxy(rc); 113 } 114 queues.put(destination, rc); 115 } 116 return rc; 117 } 118 119 public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) 120 throws IOException { 121 TopicMessageStore rc = topics.get(destination); 122 if (rc == null) { 123 Store store = getStore(); 124 MapContainer messageContainer = getMapContainer(destination, "topic-data"); 125 MapContainer subsContainer = getSubsMapContainer(destination.toString() + "-Subscriptions", 126 "topic-subs"); 127 ListContainer<TopicSubAck> ackContainer = store.getListContainer(destination.toString(), 128 "topic-acks"); 129 ackContainer.setMarshaller(new TopicSubAckMarshaller()); 130 rc = new KahaTopicMessageStore(store, messageContainer, ackContainer, subsContainer, destination); 131 messageStores.put(destination, rc); 132 if (transactionStore != null) { 133 rc = transactionStore.proxy(rc); 134 } 135 topics.put(destination, rc); 136 } 137 return rc; 138 } 139 140 /** 141 * Cleanup method to remove any state associated with the given destination 142 * 143 * @param destination Destination to forget 144 */ 145 public void removeQueueMessageStore(ActiveMQQueue destination) { 146 queues.remove(destination); 147 try{ 148 if(theStore!=null){ 149 theStore.deleteMapContainer(destination,"queue-data"); 150 } 151 }catch(IOException e ){ 152 LOG.error("Failed to remove store map container for queue:"+destination, e); 153 } 154 } 155 156 /** 157 * Cleanup method to remove any state associated with the given destination 158 * 159 * @param destination Destination to forget 160 */ 161 public void removeTopicMessageStore(ActiveMQTopic destination) { 162 topics.remove(destination); 163 } 164 165 protected MessageStore retrieveMessageStore(Object id) { 166 MessageStore result = messageStores.get(id); 167 return result; 168 } 169 170 public TransactionStore createTransactionStore() throws IOException { 171 if (transactionStore == null) { 172 while (true) { 173 try { 174 Store store = getStore(); 175 MapContainer container = store 176 .getMapContainer(PREPARED_TRANSACTIONS_NAME, "transactions"); 177 container.setKeyMarshaller(new CommandMarshaller(wireFormat)); 178 container.setValueMarshaller(new TransactionMarshaller(wireFormat)); 179 container.load(); 180 transactionStore = new KahaTransactionStore(this, container); 181 transactionStore.setBrokerService(brokerService); 182 break; 183 } catch (StoreLockedExcpetion e) { 184 LOG.info("Store is locked... waiting " + (STORE_LOCKED_WAIT_DELAY / 1000) 185 + " seconds for the Store to be unlocked."); 186 try { 187 Thread.sleep(STORE_LOCKED_WAIT_DELAY); 188 } catch (InterruptedException e1) { 189 } 190 } 191 } 192 } 193 return transactionStore; 194 } 195 196 public void beginTransaction(ConnectionContext context) { 197 } 198 199 public void commitTransaction(ConnectionContext context) throws IOException { 200 if (theStore != null) { 201 theStore.force(); 202 } 203 } 204 205 public void rollbackTransaction(ConnectionContext context) { 206 } 207 208 public void start() throws Exception { 209 initialize(); 210 } 211 212 public void stop() throws Exception { 213 if (theStore != null) { 214 theStore.close(); 215 } 216 } 217 218 public long getLastMessageBrokerSequenceId() throws IOException { 219 return 0; 220 } 221 222 public void deleteAllMessages() throws IOException { 223 if (theStore != null) { 224 if (theStore.isInitialized()) { 225 theStore.clear(); 226 } else { 227 theStore.delete(); 228 } 229 } else { 230 StoreFactory.delete(getStoreDirectory()); 231 } 232 } 233 234 protected MapContainer<MessageId, Message> getMapContainer(Object id, String containerName) 235 throws IOException { 236 Store store = getStore(); 237 MapContainer<MessageId, Message> container = store.getMapContainer(id, containerName); 238 container.setKeyMarshaller(new MessageIdMarshaller()); 239 container.setValueMarshaller(new MessageMarshaller(wireFormat)); 240 container.load(); 241 return container; 242 } 243 244 protected MapContainer getSubsMapContainer(Object id, String containerName) 245 throws IOException { 246 Store store = getStore(); 247 MapContainer container = store.getMapContainer(id, containerName); 248 container.setKeyMarshaller(Store.STRING_MARSHALLER); 249 container.setValueMarshaller(createMessageMarshaller()); 250 container.load(); 251 return container; 252 } 253 254 protected Marshaller<Object> createMessageMarshaller() { 255 return new CommandMarshaller(wireFormat); 256 } 257 258 protected ListContainer<TopicSubAck> getListContainer(Object id, String containerName) throws IOException { 259 Store store = getStore(); 260 ListContainer<TopicSubAck> container = store.getListContainer(id, containerName); 261 container.setMarshaller(createMessageMarshaller()); 262 container.load(); 263 return container; 264 } 265 266 /** 267 * @param usageManager The UsageManager that is controlling the broker's 268 * memory usage. 269 */ 270 public void setUsageManager(SystemUsage usageManager) { 271 } 272 273 /** 274 * @return the maxDataFileLength 275 */ 276 public long getMaxDataFileLength() { 277 return maxDataFileLength; 278 } 279 280 public boolean isPersistentIndex() { 281 return persistentIndex; 282 } 283 284 public void setPersistentIndex(boolean persistentIndex) { 285 this.persistentIndex = persistentIndex; 286 } 287 288 /** 289 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 290 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 291 */ 292 public void setMaxDataFileLength(long maxDataFileLength) { 293 this.maxDataFileLength = maxDataFileLength; 294 } 295 296 protected final synchronized Store getStore() throws IOException { 297 if (theStore == null) { 298 theStore = createStore(); 299 } 300 return theStore; 301 } 302 303 protected final Store createStore() throws IOException { 304 Store result = StoreFactory.open(getStoreDirectory(), "rw",storeSize); 305 result.setMaxDataFileLength(maxDataFileLength); 306 result.setPersistentIndex(isPersistentIndex()); 307 result.setDefaultContainerName("container-roots"); 308 return result; 309 } 310 311 private String getStoreName() { 312 initialize(); 313 return directory.getAbsolutePath(); 314 } 315 316 private File getStoreDirectory() { 317 initialize(); 318 return directory; 319 } 320 321 public String toString() { 322 return "KahaPersistenceAdapter(" + getStoreName() + ")"; 323 } 324 325 public void setBrokerName(String brokerName) { 326 this.brokerName = brokerName; 327 } 328 329 public String getBrokerName() { 330 return brokerName; 331 } 332 333 public File getDirectory() { 334 return this.directory; 335 } 336 337 public void setDirectory(File directory) { 338 this.directory = directory; 339 } 340 341 public void checkpoint(boolean sync) throws IOException { 342 if (sync) { 343 getStore().force(); 344 } 345 } 346 347 public long size(){ 348 return storeSize.get(); 349 } 350 351 private void initialize() { 352 if (!initialized) { 353 initialized = true; 354 if (this.directory == null) { 355 File file = new File(IOHelper.getDefaultDataDirectory()); 356 file = new File(file, IOHelper.toFileSystemSafeName(brokerName) + "-kahastore"); 357 setDirectory(file); 358 } 359 try { 360 IOHelper.mkdirs(this.directory); 361 } catch (IOException e) { 362 throw new RuntimeException(e); 363 } 364 wireFormat.setCacheEnabled(false); 365 wireFormat.setTightEncodingEnabled(true); 366 } 367 } 368 369 public void setBrokerService(BrokerService brokerService) { 370 this.brokerService = brokerService; 371 } 372 373 374 }