001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.DataInputStream;
020    import java.io.IOException;
021    import java.util.ArrayList;
022    import java.util.HashSet;
023    import java.util.Iterator;
024    import java.util.Map;
025    import java.util.Set;
026    import java.util.Map.Entry;
027    
028    import org.apache.activemq.broker.ConnectionContext;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ActiveMQQueue;
031    import org.apache.activemq.command.ActiveMQTempQueue;
032    import org.apache.activemq.command.ActiveMQTempTopic;
033    import org.apache.activemq.command.ActiveMQTopic;
034    import org.apache.activemq.command.Message;
035    import org.apache.activemq.command.MessageAck;
036    import org.apache.activemq.command.MessageId;
037    import org.apache.activemq.command.SubscriptionInfo;
038    import org.apache.activemq.command.TransactionId;
039    import org.apache.activemq.command.XATransactionId;
040    import org.apache.activemq.openwire.OpenWireFormat;
041    import org.apache.activemq.protobuf.Buffer;
042    import org.apache.activemq.store.AbstractMessageStore;
043    import org.apache.activemq.store.MessageRecoveryListener;
044    import org.apache.activemq.store.MessageStore;
045    import org.apache.activemq.store.PersistenceAdapter;
046    import org.apache.activemq.store.TopicMessageStore;
047    import org.apache.activemq.store.TransactionRecoveryListener;
048    import org.apache.activemq.store.TransactionStore;
049    import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
050    import org.apache.activemq.store.kahadb.data.KahaDestination;
051    import org.apache.activemq.store.kahadb.data.KahaLocation;
052    import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
053    import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
054    import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
055    import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
056    import org.apache.activemq.usage.MemoryUsage;
057    import org.apache.activemq.usage.SystemUsage;
058    import org.apache.activemq.util.ByteSequence;
059    import org.apache.activemq.wireformat.WireFormat;
060    import org.apache.kahadb.journal.Location;
061    import org.apache.kahadb.page.Transaction;
062    
063    public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter {
064    
065        private WireFormat wireFormat = new OpenWireFormat();
066    
067        public void setBrokerName(String brokerName) {
068        }
069        public void setUsageManager(SystemUsage usageManager) {
070        }
071    
072        public TransactionStore createTransactionStore() throws IOException {
073            return new TransactionStore(){
074                
075                public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
076                    processCommit(txid);
077                }
078                public void prepare(TransactionId txid) throws IOException {
079                    processPrepare(txid);
080                }
081                public void rollback(TransactionId txid) throws IOException {
082                    processRollback(txid);
083                }
084                public void recover(TransactionRecoveryListener listener) throws IOException {
085                    for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
086                        XATransactionId xid = (XATransactionId)entry.getKey();
087                        ArrayList<Message> messageList = new ArrayList<Message>();
088                        ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
089                        
090                        for (Operation op : entry.getValue()) {
091                            if( op.getClass() == AddOpperation.class ) {
092                                AddOpperation addOp = (AddOpperation)op;
093                                Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) );
094                                messageList.add(msg);
095                            } else {
096                                RemoveOpperation rmOp = (RemoveOpperation)op;
097                                MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) );
098                                ackList.add(ack);
099                            }
100                        }
101                        
102                        Message[] addedMessages = new Message[messageList.size()];
103                        MessageAck[] acks = new MessageAck[ackList.size()];
104                        messageList.toArray(addedMessages);
105                        ackList.toArray(acks);
106                        listener.recover(xid, addedMessages, acks);
107                    }
108                }
109                public void start() throws Exception {
110                }
111                public void stop() throws Exception {
112                }
113            };
114        }
115    
116        public class KahaDBMessageStore extends AbstractMessageStore {
117            protected KahaDestination dest;
118    
119            public KahaDBMessageStore(ActiveMQDestination destination) {
120                super(destination);
121                this.dest = convert( destination );
122            }
123    
124            public ActiveMQDestination getDestination() {
125                return destination;
126            }
127    
128            public void addMessage(ConnectionContext context, Message message) throws IOException {
129                KahaAddMessageCommand command = new KahaAddMessageCommand();
130                command.setDestination(dest);
131                command.setMessageId(message.getMessageId().toString());
132                processAdd(command, message.getTransactionId(), wireFormat.marshal(message));
133            }
134            
135            public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
136                KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
137                command.setDestination(dest);
138                command.setMessageId(ack.getLastMessageId().toString());
139                processRemove(command, ack.getTransactionId());
140            }
141    
142            public void removeAllMessages(ConnectionContext context) throws IOException {
143                KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
144                command.setDestination(dest);
145                process(command);
146            }
147    
148            public Message getMessage(MessageId identity) throws IOException {
149                final String key = identity.toString();
150                
151                // Hopefully one day the page file supports concurrent read operations... but for now we must
152                // externally synchronize...
153                ByteSequence data;
154                synchronized(indexMutex) {
155                    data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){
156                        public ByteSequence execute(Transaction tx) throws IOException {
157                            StoredDestination sd = getStoredDestination(dest, tx);
158                            Long sequence = sd.messageIdIndex.get(tx, key);
159                            if( sequence ==null ) {
160                                return null;
161                            }
162                            return sd.orderIndex.get(tx, sequence).data;
163                        }
164                    });
165                }
166                if( data == null ) {
167                    return null;
168                }
169                
170                Message msg = (Message)wireFormat.unmarshal( data );
171                            return msg;
172            }
173            
174            public int getMessageCount() throws IOException {
175                synchronized(indexMutex) {
176                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
177                        public Integer execute(Transaction tx) throws IOException {
178                            // Iterate through all index entries to get a count of messages in the destination.
179                            StoredDestination sd = getStoredDestination(dest, tx);
180                            int rc=0;
181                            for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) {
182                                iterator.next();
183                                rc++;
184                            }
185                            return rc;
186                        }
187                    });
188                }
189            }
190    
191            public void recover(final MessageRecoveryListener listener) throws Exception {
192                synchronized(indexMutex) {
193                    pageFile.tx().execute(new Transaction.Closure<Exception>(){
194                        public void execute(Transaction tx) throws Exception {
195                            StoredDestination sd = getStoredDestination(dest, tx);
196                            for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
197                                Entry<Long, MessageRecord> entry = iterator.next();
198                                listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data) );
199                            }
200                        }
201                    });
202                }
203            }
204    
205            long cursorPos=0;
206            
207            public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
208                synchronized(indexMutex) {
209                    pageFile.tx().execute(new Transaction.Closure<Exception>(){
210                        public void execute(Transaction tx) throws Exception {
211                            StoredDestination sd = getStoredDestination(dest, tx);
212                            Entry<Long, MessageRecord> entry=null;
213                            int counter = 0;
214                            for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
215                                entry = iterator.next();
216                                listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
217                                counter++;
218                                if( counter >= maxReturned ) {
219                                    break;
220                                }
221                            }
222                            if( entry!=null ) {
223                                cursorPos = entry.getKey()+1;
224                            }
225                        }
226                    });
227                }
228            }
229    
230            public void resetBatching() {
231                cursorPos=0;
232            }
233    
234            
235            @Override
236            public void setBatch(MessageId identity) throws IOException {
237                final String key = identity.toString();
238                
239                // Hopefully one day the page file supports concurrent read operations... but for now we must
240                // externally synchronize...
241                Long location;
242                synchronized(indexMutex) {
243                    location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){
244                        public Long execute(Transaction tx) throws IOException {
245                            StoredDestination sd = getStoredDestination(dest, tx);
246                            return sd.messageIdIndex.get(tx, key);
247                        }
248                    });
249                }
250                if( location!=null ) {
251                    cursorPos=location+1;
252                }
253                
254            }
255    
256            public void setMemoryUsage(MemoryUsage memoeyUSage) {
257            }
258            public void start() throws Exception {
259            }
260            public void stop() throws Exception {
261            }
262            
263        }
264            
265        class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
266            public KahaDBTopicMessageStore(ActiveMQTopic destination) {
267                super(destination);
268            }
269            
270            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
271                KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
272                command.setDestination(dest);
273                command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
274                command.setMessageId(messageId.toString());
275                // We are not passed a transaction info.. so we can't participate in a transaction.
276                // Looks like a design issue with the TopicMessageStore interface.  Also we can't recover the original ack
277                // to pass back to the XA recover method.
278                // command.setTransactionInfo();
279                processRemove(command, null);
280            }
281    
282            public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
283                String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
284                KahaSubscriptionCommand command = new KahaSubscriptionCommand();
285                command.setDestination(dest);
286                command.setSubscriptionKey(subscriptionKey);
287                command.setRetroactive(retroactive);
288                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
289                command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
290                process(command);
291            }
292    
293            public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
294                KahaSubscriptionCommand command = new KahaSubscriptionCommand();
295                command.setDestination(dest);
296                command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
297                process(command);
298            }
299    
300            public SubscriptionInfo[] getAllSubscriptions() throws IOException {
301                
302                final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
303                synchronized(indexMutex) {
304                    pageFile.tx().execute(new Transaction.Closure<IOException>(){
305                        public void execute(Transaction tx) throws IOException {
306                            StoredDestination sd = getStoredDestination(dest, tx);
307                            for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
308                                Entry<String, KahaSubscriptionCommand> entry = iterator.next();
309                                SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) );
310                                subscriptions.add(info);
311    
312                            }
313                        }
314                    });
315                }
316                
317                SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()];
318                subscriptions.toArray(rc);
319                return rc;
320            }
321    
322            public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
323                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
324                synchronized(indexMutex) {
325                    return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){
326                        public SubscriptionInfo execute(Transaction tx) throws IOException {
327                            StoredDestination sd = getStoredDestination(dest, tx);
328                            KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
329                            if( command ==null ) {
330                                return null;
331                            }
332                            return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) );
333                        }
334                    });
335                }
336            }
337           
338            public int getMessageCount(String clientId, String subscriptionName) throws IOException {
339                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
340                synchronized(indexMutex) {
341                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
342                        public Integer execute(Transaction tx) throws IOException {
343                            StoredDestination sd = getStoredDestination(dest, tx);
344                            Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
345                            if ( cursorPos==null ) {
346                                // The subscription might not exist.
347                                return 0;
348                            }
349                            cursorPos += 1;
350                            
351                            int counter = 0;
352                            for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
353                                iterator.next();
354                                counter++;
355                            }
356                            return counter;
357                        }
358                    });
359                }        
360            }
361    
362            public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
363                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
364                synchronized(indexMutex) {
365                    pageFile.tx().execute(new Transaction.Closure<Exception>(){
366                        public void execute(Transaction tx) throws Exception {
367                            StoredDestination sd = getStoredDestination(dest, tx);
368                            Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
369                            cursorPos += 1;
370                            
371                            for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
372                                Entry<Long, MessageRecord> entry = iterator.next();
373                                listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
374                            }
375                        }
376                    });
377                }
378            }
379    
380            public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
381                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
382                synchronized(indexMutex) {
383                    pageFile.tx().execute(new Transaction.Closure<Exception>(){
384                        public void execute(Transaction tx) throws Exception {
385                            StoredDestination sd = getStoredDestination(dest, tx);
386                            Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
387                            if( cursorPos == null ) {
388                                cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
389                                cursorPos += 1;
390                            }
391                            
392                            Entry<Long, MessageRecord> entry=null;
393                            int counter = 0;
394                            for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
395                                entry = iterator.next();
396                                listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
397                                counter++;
398                                if( counter >= maxReturned ) {
399                                    break;
400                                }
401                            }
402                            if( entry!=null ) {
403                                sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1);
404                            }
405                        }
406                    });
407                }
408            }
409    
410            public void resetBatching(String clientId, String subscriptionName) {
411                try {
412                    final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
413                    synchronized(indexMutex) {
414                        pageFile.tx().execute(new Transaction.Closure<IOException>(){
415                            public void execute(Transaction tx) throws IOException {
416                                StoredDestination sd = getStoredDestination(dest, tx);
417                                sd.subscriptionCursors.remove(subscriptionKey);
418                            }
419                        });
420                    }
421                } catch (IOException e) {
422                    throw new RuntimeException(e);
423                }
424            }
425        }
426    
427        String subscriptionKey(String clientId, String subscriptionName){
428            return clientId+":"+subscriptionName;
429        }
430        
431        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
432            return new KahaDBMessageStore(destination);
433        }
434    
435        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
436            return new KahaDBTopicMessageStore(destination);
437        }
438    
439        /**
440         * Cleanup method to remove any state associated with the given destination.
441         * This method does not stop the message store (it might not be cached).
442         *
443         * @param destination Destination to forget
444         */
445        public void removeQueueMessageStore(ActiveMQQueue destination) {
446        }
447    
448        /**
449         * Cleanup method to remove any state associated with the given destination
450         * This method does not stop the message store (it might not be cached).
451         *
452         * @param destination Destination to forget
453         */
454        public void removeTopicMessageStore(ActiveMQTopic destination) {
455        }
456    
457        public void deleteAllMessages() throws IOException {
458        }
459        
460        
461        public Set<ActiveMQDestination> getDestinations() {
462            try {
463                final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
464                synchronized(indexMutex) {
465                    pageFile.tx().execute(new Transaction.Closure<IOException>(){
466                        public void execute(Transaction tx) throws IOException {
467                            for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) {
468                                Entry<String, StoredDestination> entry = iterator.next();
469                                rc.add(convert(entry.getKey()));
470                            }
471                        }
472                    });
473                }
474                return rc;
475            } catch (IOException e) {
476                throw new RuntimeException(e);
477            }
478        }
479        
480        public long getLastMessageBrokerSequenceId() throws IOException {
481            return 0;
482        }
483        
484        public long size() {
485            if ( !started.get() ) {
486                return 0;
487            }
488            try {
489                return pageFile.getDiskSize();
490            } catch (IOException e) {
491                throw new RuntimeException(e);
492            }
493        }
494    
495        public void beginTransaction(ConnectionContext context) throws IOException {
496            throw new IOException("Not yet implemented.");
497        }
498        public void commitTransaction(ConnectionContext context) throws IOException {
499            throw new IOException("Not yet implemented.");
500        }
501        public void rollbackTransaction(ConnectionContext context) throws IOException {
502            throw new IOException("Not yet implemented.");
503        }
504        
505        public void checkpoint(boolean sync) throws IOException {
506        }    
507    
508        ///////////////////////////////////////////////////////////////////
509        // Internal conversion methods.
510        ///////////////////////////////////////////////////////////////////
511        
512    
513        
514        KahaLocation convert(Location location) {
515            KahaLocation rc = new KahaLocation();
516            rc.setLogId(location.getDataFileId());
517            rc.setOffset(location.getOffset());
518            return rc;
519        }
520        
521        KahaDestination convert(ActiveMQDestination dest) {
522            KahaDestination rc = new KahaDestination();
523            rc.setName(dest.getPhysicalName());
524            switch( dest.getDestinationType() ) {
525            case ActiveMQDestination.QUEUE_TYPE:
526                rc.setType(DestinationType.QUEUE);
527                return rc;
528            case ActiveMQDestination.TOPIC_TYPE:
529                rc.setType(DestinationType.TOPIC);
530                return rc;
531            case ActiveMQDestination.TEMP_QUEUE_TYPE:
532                rc.setType(DestinationType.TEMP_QUEUE);
533                return rc;
534            case ActiveMQDestination.TEMP_TOPIC_TYPE:
535                rc.setType(DestinationType.TEMP_TOPIC);
536                return rc;
537            default:
538                return null;
539            }
540        }
541    
542        ActiveMQDestination convert(String dest) {
543            int p = dest.indexOf(":");
544            if( p<0 ) {
545                throw new IllegalArgumentException("Not in the valid destination format");
546            }
547            int type = Integer.parseInt(dest.substring(0, p));
548            String name = dest.substring(p+1);
549            
550            switch( KahaDestination.DestinationType.valueOf(type) ) {
551            case QUEUE:
552                return new ActiveMQQueue(name);
553            case TOPIC:
554                return new ActiveMQTopic(name);
555            case TEMP_QUEUE:
556                return new ActiveMQTempQueue(name);
557            case TEMP_TOPIC:
558                return new ActiveMQTempTopic(name);
559            default:    
560                throw new IllegalArgumentException("Not in the valid destination format");
561            }
562        }
563            
564    }