001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.policy;
018    
019    import java.util.concurrent.atomic.AtomicLong;
020    
021    import javax.jms.JMSException;
022    import javax.jms.Message;
023    import javax.jms.MessageListener;
024    
025    import org.apache.activemq.ActiveMQMessageTransformation;
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.broker.region.Destination;
028    import org.apache.activemq.broker.region.MessageReference;
029    import org.apache.activemq.broker.region.SubscriptionRecovery;
030    import org.apache.activemq.broker.region.Topic;
031    import org.apache.activemq.command.ActiveMQDestination;
032    import org.apache.activemq.command.ActiveMQMessage;
033    import org.apache.activemq.command.ConnectionId;
034    import org.apache.activemq.command.MessageId;
035    import org.apache.activemq.command.ProducerId;
036    import org.apache.activemq.command.SessionId;
037    import org.apache.activemq.util.IdGenerator;
038    import org.apache.commons.logging.Log;
039    import org.apache.commons.logging.LogFactory;
040    
041    /**
042     * This implementation of {@link SubscriptionRecoveryPolicy} will perform a user
043     * specific query mechanism to load any messages they may have missed.
044     * 
045     * @org.apache.xbean.XBean
046     * @version $Revision: 564271 $
047     */
048    public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
049    
050        private static final Log LOG = LogFactory.getLog(QueryBasedSubscriptionRecoveryPolicy.class);
051    
052        private MessageQuery query;
053        private AtomicLong messageSequence = new AtomicLong(0);
054        private IdGenerator idGenerator = new IdGenerator();
055        private ProducerId producerId = createProducerId();
056    
057        public SubscriptionRecoveryPolicy copy() {
058            QueryBasedSubscriptionRecoveryPolicy rc = new QueryBasedSubscriptionRecoveryPolicy();
059            rc.setQuery(query);
060            return rc;
061        }
062    
063        public boolean add(ConnectionContext context, MessageReference message) throws Exception {
064            return query.validateUpdate(message.getMessage());
065        }
066    
067        public void recover(final ConnectionContext context, final Topic topic, final SubscriptionRecovery sub) throws Exception {
068            if (query != null) {
069                ActiveMQDestination destination = sub.getActiveMQDestination();
070                query.execute(destination, new MessageListener() {
071    
072                    public void onMessage(Message message) {
073                        dispatchInitialMessage(message, topic, context, sub);
074                    }
075                });
076            }
077        }
078    
079        public void start() throws Exception {
080            if (query == null) {
081                throw new IllegalArgumentException("No query property configured");
082            }
083        }
084    
085        public void stop() throws Exception {
086        }
087    
088        public MessageQuery getQuery() {
089            return query;
090        }
091    
092        /**
093         * Sets the query strategy to load initial messages
094         */
095        public void setQuery(MessageQuery query) {
096            this.query = query;
097        }
098    
099        public org.apache.activemq.command.Message[] browse(ActiveMQDestination dest) throws Exception {
100            return new org.apache.activemq.command.Message[0];
101        }
102    
103        protected void dispatchInitialMessage(Message message, Destination regionDestination, ConnectionContext context, SubscriptionRecovery sub) {
104            try {
105                ActiveMQMessage activeMessage = ActiveMQMessageTransformation.transformMessage(message, null);
106                ActiveMQDestination destination = activeMessage.getDestination();
107                if (destination == null) {
108                    destination = sub.getActiveMQDestination();
109                    activeMessage.setDestination(destination);
110                }
111                activeMessage.setRegionDestination(regionDestination);
112                configure(activeMessage);
113                sub.addRecoveredMessage(context, activeMessage);
114            } catch (Throwable e) {
115                LOG.warn("Failed to dispatch initial message: " + message + " into subscription. Reason: " + e, e);
116            }
117        }
118    
119        protected void configure(ActiveMQMessage msg) throws JMSException {
120            long sequenceNumber = messageSequence.incrementAndGet();
121            msg.setMessageId(new MessageId(producerId, sequenceNumber));
122            msg.onSend();
123            msg.setProducerId(producerId);
124        }
125    
126        protected ProducerId createProducerId() {
127            String id = idGenerator.generateId();
128            ConnectionId connectionId = new ConnectionId(id);
129            SessionId sessionId = new SessionId(connectionId, 1);
130            return new ProducerId(sessionId, 1);
131        }
132    }