001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.Map;
022    import java.util.Map.Entry;
023    import java.util.concurrent.ConcurrentHashMap;
024    
025    import javax.transaction.xa.XAException;
026    
027    import org.apache.activemq.broker.BrokerService;
028    import org.apache.activemq.broker.BrokerServiceAware;
029    import org.apache.activemq.broker.ConnectionContext;
030    import org.apache.activemq.command.Message;
031    import org.apache.activemq.command.MessageAck;
032    import org.apache.activemq.command.TransactionId;
033    import org.apache.activemq.command.XATransactionId;
034    import org.apache.activemq.kaha.RuntimeStoreException;
035    import org.apache.activemq.store.MessageStore;
036    import org.apache.activemq.store.ProxyMessageStore;
037    import org.apache.activemq.store.ProxyTopicMessageStore;
038    import org.apache.activemq.store.TopicMessageStore;
039    import org.apache.activemq.store.TransactionRecoveryListener;
040    import org.apache.activemq.store.TransactionStore;
041    import org.apache.activemq.store.journal.JournalPersistenceAdapter;
042    import org.apache.commons.logging.Log;
043    import org.apache.commons.logging.LogFactory;
044    
045    /**
046     * Provides a TransactionStore implementation that can create transaction aware
047     * MessageStore objects from non transaction aware MessageStore objects.
048     * 
049     * @version $Revision: 1.4 $
050     */
051    public class KahaTransactionStore implements TransactionStore, BrokerServiceAware {     
052        private static final Log LOG = LogFactory.getLog(KahaTransactionStore.class);
053            
054        private Map transactions = new ConcurrentHashMap();
055        private Map prepared;
056        private KahaPersistenceAdapter adaptor;
057        
058        private BrokerService brokerService;
059    
060        KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) {
061            this.adaptor = adaptor;
062            this.prepared = preparedMap;
063        }
064    
065        public MessageStore proxy(MessageStore messageStore) {
066            return new ProxyMessageStore(messageStore) {
067                public void addMessage(ConnectionContext context, final Message send) throws IOException {
068                    KahaTransactionStore.this.addMessage(getDelegate(), send);
069                }
070    
071                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
072                    KahaTransactionStore.this.removeMessage(getDelegate(), ack);
073                }
074            };
075        }
076    
077        public TopicMessageStore proxy(TopicMessageStore messageStore) {
078            return new ProxyTopicMessageStore(messageStore) {
079                public void addMessage(ConnectionContext context, final Message send) throws IOException {
080                    KahaTransactionStore.this.addMessage(getDelegate(), send);
081                }
082    
083                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
084                    KahaTransactionStore.this.removeMessage(getDelegate(), ack);
085                }
086            };
087        }
088    
089        /**
090         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
091         */
092        public void prepare(TransactionId txid) {
093            KahaTransaction tx = getTx(txid);
094            if (tx != null) {
095                tx.prepare();
096                prepared.put(txid, tx);
097            }
098        }
099    
100        /**
101         * @throws XAException
102         * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
103         */
104        public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
105            KahaTransaction tx = getTx(txid);
106            if (tx != null) {
107                tx.commit(this);
108                removeTx(txid);
109            }
110        }
111    
112        /**
113         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
114         */
115        public void rollback(TransactionId txid) {
116            KahaTransaction tx = getTx(txid);
117            if (tx != null) {
118                tx.rollback();
119                removeTx(txid);
120            }
121        }
122    
123        public void start() throws Exception {
124        }
125    
126        public void stop() throws Exception {
127        }
128    
129        public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
130            for (Iterator i = prepared.entrySet().iterator(); i.hasNext();) {
131                Map.Entry entry = (Entry)i.next();
132                XATransactionId xid = (XATransactionId)entry.getKey();
133                KahaTransaction kt = (KahaTransaction)entry.getValue();
134                listener.recover(xid, kt.getMessages(), kt.getAcks());
135            }
136        }
137    
138        /**
139         * @param message
140         * @throws IOException
141         */
142        void addMessage(final MessageStore destination, final Message message) throws IOException {
143            try {
144                    if (message.isInTransaction()) {
145                            KahaTransaction tx = getOrCreateTx(message.getTransactionId());
146                            tx.add((KahaMessageStore)destination, message);
147                    } else {
148                            destination.addMessage(null, message);
149                    }
150            } catch (RuntimeStoreException rse) {
151                if (rse.getCause() instanceof IOException) {
152                    brokerService.handleIOException((IOException)rse.getCause());
153                }
154                throw rse;
155            }
156        }
157    
158        /**
159         * @param ack
160         * @throws IOException
161         */
162        final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
163            try {
164                    if (ack.isInTransaction()) {
165                            KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
166                            tx.add((KahaMessageStore)destination, ack);
167                    } else {
168                            destination.removeMessage(null, ack);
169                    }
170            } catch (RuntimeStoreException rse) {
171                if (rse.getCause() instanceof IOException) {
172                    brokerService.handleIOException((IOException)rse.getCause());
173                }
174                throw rse;
175            }
176        }
177    
178        protected synchronized KahaTransaction getTx(TransactionId key) {
179            KahaTransaction result = (KahaTransaction)transactions.get(key);
180            if (result == null) {
181                result = (KahaTransaction)prepared.get(key);
182            }
183            return result;
184        }
185    
186        protected synchronized KahaTransaction getOrCreateTx(TransactionId key) {
187            KahaTransaction result = (KahaTransaction)transactions.get(key);
188            if (result == null) {
189                result = new KahaTransaction();
190                transactions.put(key, result);
191            }
192            return result;
193        }
194    
195        protected synchronized void removeTx(TransactionId key) {
196            transactions.remove(key);
197            prepared.remove(key);
198        }
199    
200        public void delete() {
201            transactions.clear();
202            prepared.clear();
203        }
204    
205        protected MessageStore getStoreById(Object id) {
206            return adaptor.retrieveMessageStore(id);
207        }
208    
209            public void setBrokerService(BrokerService brokerService) {
210                    this.brokerService = brokerService;
211            }
212    }