001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.LinkedHashMap;
022    import java.util.LinkedList;
023    import java.util.Map;
024    import java.util.Map.Entry;
025    
026    import javax.jms.JMSException;
027    
028    import org.apache.activemq.command.ActiveMQDestination;
029    import org.apache.activemq.command.ActiveMQMessage;
030    import org.apache.activemq.command.ConsumerInfo;
031    import org.apache.activemq.command.MessageAck;
032    import org.apache.activemq.command.MessageDispatch;
033    import org.apache.activemq.command.MessageId;
034    import org.apache.activemq.command.TransactionId;
035    
036    /**
037     * Keeps track of the STOMP subscription so that acking is correctly done.
038     *
039     * @author <a href="http://hiramchirino.com">chirino</a>
040     */
041    public class StompSubscription {
042    
043        public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
044        public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
045        public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
046    
047        private final ProtocolConverter protocolConverter;
048        private final String subscriptionId;
049        private final ConsumerInfo consumerInfo;
050    
051        private final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
052        private final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
053    
054        private String ackMode = AUTO_ACK;
055        private ActiveMQDestination destination;
056        private String transformation;
057    
058    
059        public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
060            this.protocolConverter = stompTransport;
061            this.subscriptionId = subscriptionId;
062            this.consumerInfo = consumerInfo;
063            this.transformation = transformation;
064        }
065    
066        void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
067            ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
068            if (ackMode == CLIENT_ACK) {
069                synchronized (this) {
070                    dispatchedMessage.put(message.getMessageId(), md);
071                }
072            } else if (ackMode == INDIVIDUAL_ACK) {
073                synchronized (this) {
074                    dispatchedMessage.put(message.getMessageId(), md);
075                }
076            } else if (ackMode == AUTO_ACK) {
077                MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
078                protocolConverter.getTransportFilter().sendToActiveMQ(ack);
079            }
080    
081            boolean ignoreTransformation = false;
082    
083            if (transformation != null) {
084                    message.setReadOnlyProperties(false);
085                    message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
086            } else {
087                    if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
088                            ignoreTransformation = true;
089                    }
090            }
091    
092            StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
093    
094            command.setAction(Stomp.Responses.MESSAGE);
095            if (subscriptionId != null) {
096                command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
097            }
098    
099            protocolConverter.getTransportFilter().sendToStomp(command);
100        }
101    
102        synchronized void onStompAbort(TransactionId transactionId) {
103            unconsumedMessage.clear();
104        }
105    
106        synchronized void onStompCommit(TransactionId transactionId) {
107            for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
108                Map.Entry entry = (Entry)iter.next();
109                MessageId id = (MessageId)entry.getKey();
110                MessageDispatch msg = (MessageDispatch)entry.getValue();
111                if (unconsumedMessage.contains(msg)) {
112                    iter.remove();
113                }
114            }
115            unconsumedMessage.clear();
116        }
117    
118        synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
119    
120            MessageId msgId = new MessageId(messageId);
121    
122            if (!dispatchedMessage.containsKey(msgId)) {
123                return null;
124            }
125    
126            MessageAck ack = new MessageAck();
127            ack.setDestination(consumerInfo.getDestination());
128            ack.setConsumerId(consumerInfo.getConsumerId());
129    
130            if (ackMode == CLIENT_ACK) {
131                    ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
132                int count = 0;
133                for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
134    
135                    Map.Entry entry = (Entry)iter.next();
136                    MessageId id = (MessageId)entry.getKey();
137                    MessageDispatch msg = (MessageDispatch)entry.getValue();
138    
139                    if (ack.getFirstMessageId() == null) {
140                        ack.setFirstMessageId(id);
141                    }
142    
143                    if (transactionId != null) {
144                            if (!unconsumedMessage.contains(msg)) {
145                                    unconsumedMessage.add(msg);
146                            }
147                    } else {
148                            iter.remove();
149                    }
150    
151    
152                    count++;
153    
154                    if (id.equals(msgId)) {
155                        ack.setLastMessageId(id);
156                        break;
157                    }
158    
159                }
160                ack.setMessageCount(count);
161                if (transactionId != null) {
162                    ack.setTransactionId(transactionId);
163                }
164            }
165            else if (ackMode == INDIVIDUAL_ACK) {
166                ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
167                ack.setMessageID(msgId);
168                if (transactionId != null) {
169                    unconsumedMessage.add(dispatchedMessage.get(msgId));
170                    ack.setTransactionId(transactionId);
171                }
172                dispatchedMessage.remove(msgId);
173            }
174            return ack;
175        }
176    
177        public String getAckMode() {
178            return ackMode;
179        }
180    
181        public void setAckMode(String ackMode) {
182            this.ackMode = ackMode;
183        }
184    
185        public String getSubscriptionId() {
186            return subscriptionId;
187        }
188    
189        public void setDestination(ActiveMQDestination destination) {
190            this.destination = destination;
191        }
192    
193        public ActiveMQDestination getDestination() {
194            return destination;
195        }
196    
197        public ConsumerInfo getConsumerInfo() {
198            return consumerInfo;
199        }
200    
201    }