001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.memory;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Iterator;
022    import java.util.concurrent.ConcurrentHashMap;
023    
024    import javax.transaction.xa.XAException;
025    
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.command.Message;
028    import org.apache.activemq.command.MessageAck;
029    import org.apache.activemq.command.TransactionId;
030    import org.apache.activemq.command.XATransactionId;
031    import org.apache.activemq.store.MessageStore;
032    import org.apache.activemq.store.PersistenceAdapter;
033    import org.apache.activemq.store.ProxyMessageStore;
034    import org.apache.activemq.store.ProxyTopicMessageStore;
035    import org.apache.activemq.store.TopicMessageStore;
036    import org.apache.activemq.store.TransactionRecoveryListener;
037    import org.apache.activemq.store.TransactionStore;
038    
039    /**
040     * Provides a TransactionStore implementation that can create transaction aware
041     * MessageStore objects from non transaction aware MessageStore objects.
042     * 
043     * @version $Revision: 1.4 $
044     */
045    public class MemoryTransactionStore implements TransactionStore {
046    
047        ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
048        ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>();
049        final PersistenceAdapter persistenceAdapter;
050    
051        private boolean doingRecover;
052    
053        public class Tx {
054            private ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
055    
056            private ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
057    
058            public void add(AddMessageCommand msg) {
059                messages.add(msg);
060            }
061    
062            public void add(RemoveMessageCommand ack) {
063                acks.add(ack);
064            }
065    
066            public Message[] getMessages() {
067                Message rc[] = new Message[messages.size()];
068                int count = 0;
069                for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
070                    AddMessageCommand cmd = iter.next();
071                    rc[count++] = cmd.getMessage();
072                }
073                return rc;
074            }
075    
076            public MessageAck[] getAcks() {
077                MessageAck rc[] = new MessageAck[acks.size()];
078                int count = 0;
079                for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
080                    RemoveMessageCommand cmd = iter.next();
081                    rc[count++] = cmd.getMessageAck();
082                }
083                return rc;
084            }
085    
086            /**
087             * @throws IOException
088             */
089            public void commit() throws IOException {
090                ConnectionContext ctx = new ConnectionContext();
091                persistenceAdapter.beginTransaction(ctx);
092                try {
093                    
094                    // Do all the message adds.
095                    for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
096                        AddMessageCommand cmd = iter.next();
097                        cmd.run(ctx);
098                    }
099                    // And removes..
100                    for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
101                        RemoveMessageCommand cmd = iter.next();
102                        cmd.run(ctx);
103                    }
104                    
105                } catch ( IOException e ) {
106                    persistenceAdapter.rollbackTransaction(ctx);
107                    throw e;
108                }
109                persistenceAdapter.commitTransaction(ctx);
110            }
111        }
112        
113        public interface AddMessageCommand {
114            Message getMessage();
115    
116            void run(ConnectionContext context) throws IOException;
117        }
118    
119        public interface RemoveMessageCommand {
120            MessageAck getMessageAck();
121    
122            void run(ConnectionContext context) throws IOException;
123        }
124        
125        public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
126            this.persistenceAdapter=persistenceAdapter;
127        }
128    
129        public MessageStore proxy(MessageStore messageStore) {
130            return new ProxyMessageStore(messageStore) {
131                public void addMessage(ConnectionContext context, final Message send) throws IOException {
132                    MemoryTransactionStore.this.addMessage(getDelegate(), send);
133                }
134    
135                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
136                    MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
137                }
138            };
139        }
140    
141        public TopicMessageStore proxy(TopicMessageStore messageStore) {
142            return new ProxyTopicMessageStore(messageStore) {
143                public void addMessage(ConnectionContext context, final Message send) throws IOException {
144                    MemoryTransactionStore.this.addMessage(getDelegate(), send);
145                }
146    
147                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
148                    MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
149                }
150            };
151        }
152    
153        /**
154         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
155         */
156        public void prepare(TransactionId txid) {
157            Tx tx = inflightTransactions.remove(txid);
158            if (tx == null) {
159                return;
160            }
161            preparedTransactions.put(txid, tx);
162        }
163    
164        public Tx getTx(Object txid) {
165            Tx tx = inflightTransactions.get(txid);
166            if (tx == null) {
167                tx = new Tx();
168                inflightTransactions.put(txid, tx);
169            }
170            return tx;
171        }
172    
173        /**
174         * @throws XAException
175         * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
176         */
177        public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
178    
179            Tx tx;
180            if (wasPrepared) {
181                tx = preparedTransactions.remove(txid);
182            } else {
183                tx = inflightTransactions.remove(txid);
184            }
185    
186            if (tx == null) {
187                return;
188            }
189            tx.commit();
190    
191        }
192    
193        /**
194         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
195         */
196        public void rollback(TransactionId txid) {
197            preparedTransactions.remove(txid);
198            inflightTransactions.remove(txid);
199        }
200    
201        public void start() throws Exception {
202        }
203    
204        public void stop() throws Exception {
205        }
206    
207        public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
208            // All the inflight transactions get rolled back..
209            inflightTransactions.clear();
210            this.doingRecover = true;
211            try {
212                for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
213                    Object txid = iter.next();
214                    Tx tx = preparedTransactions.get(txid);
215                    listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
216                }
217            } finally {
218                this.doingRecover = false;
219            }
220        }
221    
222        /**
223         * @param message
224         * @throws IOException
225         */
226        void addMessage(final MessageStore destination, final Message message) throws IOException {
227    
228            if (doingRecover) {
229                return;
230            }
231    
232            if (message.getTransactionId() != null) {
233                Tx tx = getTx(message.getTransactionId());
234                tx.add(new AddMessageCommand() {
235                    public Message getMessage() {
236                        return message;
237                    }
238    
239                    public void run(ConnectionContext ctx) throws IOException {
240                        destination.addMessage(ctx, message);
241                    }
242    
243                });
244            } else {
245                destination.addMessage(null, message);
246            }
247        }
248        
249        /**
250         * @param ack
251         * @throws IOException
252         */
253        final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
254            if (doingRecover) {
255                return;
256            }
257    
258            if (ack.isInTransaction()) {
259                Tx tx = getTx(ack.getTransactionId());
260                tx.add(new RemoveMessageCommand() {
261                    public MessageAck getMessageAck() {
262                        return ack;
263                    }
264    
265                    public void run(ConnectionContext ctx) throws IOException {
266                        destination.removeMessage(ctx, ack);
267                    }
268                });
269            } else {
270                destination.removeMessage(null, ack);
271            }
272        }
273    
274        public void delete() {
275            inflightTransactions.clear();
276            preparedTransactions.clear();
277            doingRecover = false;
278        }
279    
280    }