001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadaptor; 018 019 import java.io.IOException; 020 import org.apache.activemq.broker.ConnectionContext; 021 import org.apache.activemq.command.ActiveMQDestination; 022 import org.apache.activemq.command.Message; 023 import org.apache.activemq.command.MessageAck; 024 import org.apache.activemq.command.MessageId; 025 import org.apache.activemq.kaha.MapContainer; 026 import org.apache.activemq.kaha.StoreEntry; 027 import org.apache.activemq.store.MessageRecoveryListener; 028 import org.apache.activemq.store.MessageStore; 029 import org.apache.activemq.store.AbstractMessageStore; 030 import org.apache.activemq.usage.MemoryUsage; 031 import org.apache.activemq.usage.SystemUsage; 032 033 /** 034 * An implementation of {@link org.apache.activemq.store.MessageStore} which 035 * uses a JPS Container 036 * 037 * @version $Revision: 1.7 $ 038 */ 039 public class KahaMessageStore extends AbstractMessageStore { 040 041 protected final MapContainer<MessageId, Message> messageContainer; 042 protected StoreEntry batchEntry; 043 044 public KahaMessageStore(MapContainer<MessageId, Message> container, ActiveMQDestination destination) 045 throws IOException { 046 super(destination); 047 this.messageContainer = container; 048 } 049 050 protected MessageId getMessageId(Object object) { 051 return ((Message)object).getMessageId(); 052 } 053 054 public Object getId() { 055 return messageContainer.getId(); 056 } 057 058 public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { 059 messageContainer.put(message.getMessageId(), message); 060 // TODO: we should do the following but it is not need if the message is 061 // being added within a persistence 062 // transaction 063 // but since I can't tell if one is running right now.. I'll leave this 064 // out for now. 065 // if( message.isResponseRequired() ) { 066 // messageContainer.force(); 067 // } 068 } 069 070 public synchronized Message getMessage(MessageId identity) throws IOException { 071 Message result = messageContainer.get(identity); 072 return result; 073 } 074 075 protected boolean recoverMessage(MessageRecoveryListener listener, Message msg) throws Exception { 076 listener.recoverMessage(msg); 077 return listener.hasSpace(); 078 } 079 080 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 081 removeMessage(ack.getLastMessageId()); 082 } 083 084 public synchronized void removeMessage(MessageId msgId) throws IOException { 085 StoreEntry entry = messageContainer.getEntry(msgId); 086 if (entry != null) { 087 messageContainer.remove(entry); 088 if (messageContainer.isEmpty() || (batchEntry != null && batchEntry.equals(entry))) { 089 resetBatching(); 090 } 091 } 092 } 093 094 public synchronized void recover(MessageRecoveryListener listener) throws Exception { 095 for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer 096 .getNext(entry)) { 097 Message msg = (Message)messageContainer.getValue(entry); 098 if (!recoverMessage(listener, msg)) { 099 break; 100 } 101 } 102 } 103 104 public synchronized void removeAllMessages(ConnectionContext context) throws IOException { 105 messageContainer.clear(); 106 } 107 108 public synchronized void delete() { 109 messageContainer.clear(); 110 } 111 112 /** 113 * @return the number of messages held by this destination 114 * @see org.apache.activemq.store.MessageStore#getMessageCount() 115 */ 116 public int getMessageCount() { 117 return messageContainer.size(); 118 } 119 120 /** 121 * @param id 122 * @return null 123 * @throws Exception 124 * @see org.apache.activemq.store.MessageStore#getPreviousMessageIdToDeliver(org.apache.activemq.command.MessageId) 125 */ 126 public MessageId getPreviousMessageIdToDeliver(MessageId id) throws Exception { 127 return null; 128 } 129 130 /** 131 * @param lastMessageId 132 * @param maxReturned 133 * @param listener 134 * @throws Exception 135 * @see org.apache.activemq.store.MessageStore#recoverNextMessages(org.apache.activemq.command.MessageId, 136 * int, org.apache.activemq.store.MessageRecoveryListener) 137 */ 138 public synchronized void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) 139 throws Exception { 140 StoreEntry entry = batchEntry; 141 if (entry == null) { 142 entry = messageContainer.getFirst(); 143 } else { 144 entry = messageContainer.refresh(entry); 145 entry = messageContainer.getNext(entry); 146 if (entry == null) { 147 batchEntry = null; 148 } 149 } 150 if (entry != null) { 151 int count = 0; 152 do { 153 Message msg = messageContainer.getValue(entry); 154 if (msg != null) { 155 recoverMessage(listener, msg); 156 count++; 157 } 158 batchEntry = entry; 159 entry = messageContainer.getNext(entry); 160 } while (entry != null && count < maxReturned && listener.hasSpace()); 161 } 162 } 163 164 /** 165 * @param nextToDispatch 166 * @see org.apache.activemq.store.MessageStore#resetBatching(org.apache.activemq.command.MessageId) 167 */ 168 public synchronized void resetBatching() { 169 batchEntry = null; 170 } 171 172 /** 173 * @return true if the store supports cursors 174 */ 175 public boolean isSupportForCursors() { 176 return true; 177 } 178 179 @Override 180 public void setBatch(MessageId messageId) { 181 batchEntry = messageContainer.getEntry(messageId); 182 } 183 184 }