001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.command;
018    
019    import java.io.DataInputStream;
020    import java.io.DataOutputStream;
021    import java.io.IOException;
022    import java.util.Collections;
023    import java.util.HashMap;
024    import java.util.Map;
025    import javax.jms.JMSException;
026    
027    import org.apache.activemq.ActiveMQConnection;
028    import org.apache.activemq.advisory.AdvisorySupport;
029    import org.apache.activemq.broker.region.Destination;
030    import org.apache.activemq.broker.region.MessageReference;
031    import org.apache.activemq.usage.MemoryUsage;
032    import org.apache.activemq.util.ByteArrayInputStream;
033    import org.apache.activemq.util.ByteArrayOutputStream;
034    import org.apache.activemq.util.ByteSequence;
035    import org.apache.activemq.util.MarshallingSupport;
036    import org.apache.activemq.wireformat.WireFormat;
037    
038    /**
039     * Represents an ActiveMQ message
040     * 
041     * @openwire:marshaller
042     * @version $Revision$
043     */
044    public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
045    
046        /**
047         * The default minimum amount of memory a message is assumed to use
048         */
049        public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
050    
051        protected MessageId messageId;
052        protected ActiveMQDestination originalDestination;
053        protected TransactionId originalTransactionId;
054    
055        protected ProducerId producerId;
056        protected ActiveMQDestination destination;
057        protected TransactionId transactionId;
058    
059        protected long expiration;
060        protected long timestamp;
061        protected long arrival;
062        protected long brokerInTime;
063        protected long brokerOutTime;
064        protected String correlationId;
065        protected ActiveMQDestination replyTo;
066        protected boolean persistent;
067        protected String type;
068        protected byte priority;
069        protected String groupID;
070        protected int groupSequence;
071        protected ConsumerId targetConsumerId;
072        protected boolean compressed;
073        protected String userID;
074    
075        protected ByteSequence content;
076        protected ByteSequence marshalledProperties;
077        protected DataStructure dataStructure;
078        protected int redeliveryCounter;
079    
080        protected int size;
081        protected Map<String, Object> properties;
082        protected boolean readOnlyProperties;
083        protected boolean readOnlyBody;
084        protected transient boolean recievedByDFBridge;
085        protected boolean droppable;
086    
087        private transient short referenceCount;
088        private transient ActiveMQConnection connection;
089        private transient org.apache.activemq.broker.region.Destination regionDestination;
090        private transient MemoryUsage memoryUsage;
091    
092        private BrokerId[] brokerPath;
093        private BrokerId[] cluster;
094    
095        public abstract Message copy();
096        public abstract void clearBody() throws JMSException;
097    
098        protected void copy(Message copy) {
099            super.copy(copy);
100            copy.producerId = producerId;
101            copy.transactionId = transactionId;
102            copy.destination = destination;
103            copy.messageId = messageId != null ? messageId.copy() : null;
104            copy.originalDestination = originalDestination;
105            copy.originalTransactionId = originalTransactionId;
106            copy.expiration = expiration;
107            copy.timestamp = timestamp;
108            copy.correlationId = correlationId;
109            copy.replyTo = replyTo;
110            copy.persistent = persistent;
111            copy.redeliveryCounter = redeliveryCounter;
112            copy.type = type;
113            copy.priority = priority;
114            copy.size = size;
115            copy.groupID = groupID;
116            copy.userID = userID;
117            copy.groupSequence = groupSequence;
118    
119            if (properties != null) {
120                copy.properties = new HashMap<String, Object>(properties);
121            } else {
122                copy.properties = properties;
123            }
124    
125            copy.content = content;
126            copy.marshalledProperties = marshalledProperties;
127            copy.dataStructure = dataStructure;
128            copy.readOnlyProperties = readOnlyProperties;
129            copy.readOnlyBody = readOnlyBody;
130            copy.compressed = compressed;
131            copy.recievedByDFBridge = recievedByDFBridge;
132    
133            copy.arrival = arrival;
134            copy.connection = connection;
135            copy.regionDestination = regionDestination;
136            copy.brokerInTime = brokerInTime;
137            copy.brokerOutTime = brokerOutTime;
138            copy.memoryUsage=this.memoryUsage;
139            copy.brokerPath = brokerPath;
140    
141            // lets not copy the following fields
142            // copy.targetConsumerId = targetConsumerId;
143            // copy.referenceCount = referenceCount;
144        }
145    
146        public Object getProperty(String name) throws IOException {
147            if (properties == null) {
148                if (marshalledProperties == null) {
149                    return null;
150                }
151                properties = unmarsallProperties(marshalledProperties);
152            }
153            return properties.get(name);
154        }
155    
156        @SuppressWarnings("unchecked")
157        public Map<String, Object> getProperties() throws IOException {
158            if (properties == null) {
159                if (marshalledProperties == null) {
160                    return Collections.EMPTY_MAP;
161                }
162                properties = unmarsallProperties(marshalledProperties);
163            }
164            return Collections.unmodifiableMap(properties);
165        }
166    
167        public void clearProperties() {
168            marshalledProperties = null;
169            properties = null;
170        }
171    
172        public void setProperty(String name, Object value) throws IOException {
173            lazyCreateProperties();
174            properties.put(name, value);
175        }
176    
177        protected void lazyCreateProperties() throws IOException {
178            if (properties == null) {
179                if (marshalledProperties == null) {
180                    properties = new HashMap<String, Object>();
181                } else {
182                    properties = unmarsallProperties(marshalledProperties);
183                    marshalledProperties = null;
184                }
185            }
186        }
187    
188        private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
189            return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
190        }
191    
192        public void beforeMarshall(WireFormat wireFormat) throws IOException {
193            // Need to marshal the properties.
194            if (marshalledProperties == null && properties != null) {
195                ByteArrayOutputStream baos = new ByteArrayOutputStream();
196                DataOutputStream os = new DataOutputStream(baos);
197                MarshallingSupport.marshalPrimitiveMap(properties, os);
198                os.close();
199                marshalledProperties = baos.toByteSequence();
200            }
201        }
202    
203        public void afterMarshall(WireFormat wireFormat) throws IOException {
204        }
205    
206        public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
207        }
208    
209        public void afterUnmarshall(WireFormat wireFormat) throws IOException {
210        }
211    
212        // /////////////////////////////////////////////////////////////////
213        //
214        // Simple Field accessors
215        //
216        // /////////////////////////////////////////////////////////////////
217    
218        /**
219         * @openwire:property version=1 cache=true
220         */
221        public ProducerId getProducerId() {
222            return producerId;
223        }
224    
225        public void setProducerId(ProducerId producerId) {
226            this.producerId = producerId;
227        }
228    
229        /**
230         * @openwire:property version=1 cache=true
231         */
232        public ActiveMQDestination getDestination() {
233            return destination;
234        }
235    
236        public void setDestination(ActiveMQDestination destination) {
237            this.destination = destination;
238        }
239    
240        /**
241         * @openwire:property version=1 cache=true
242         */
243        public TransactionId getTransactionId() {
244            return transactionId;
245        }
246    
247        public void setTransactionId(TransactionId transactionId) {
248            this.transactionId = transactionId;
249        }
250    
251        public boolean isInTransaction() {
252            return transactionId != null;
253        }
254    
255        /**
256         * @openwire:property version=1 cache=true
257         */
258        public ActiveMQDestination getOriginalDestination() {
259            return originalDestination;
260        }
261    
262        public void setOriginalDestination(ActiveMQDestination destination) {
263            this.originalDestination = destination;
264        }
265    
266        /**
267         * @openwire:property version=1
268         */
269        public MessageId getMessageId() {
270            return messageId;
271        }
272    
273        public void setMessageId(MessageId messageId) {
274            this.messageId = messageId;
275        }
276    
277        /**
278         * @openwire:property version=1 cache=true
279         */
280        public TransactionId getOriginalTransactionId() {
281            return originalTransactionId;
282        }
283    
284        public void setOriginalTransactionId(TransactionId transactionId) {
285            this.originalTransactionId = transactionId;
286        }
287    
288        /**
289         * @openwire:property version=1
290         */
291        public String getGroupID() {
292            return groupID;
293        }
294    
295        public void setGroupID(String groupID) {
296            this.groupID = groupID;
297        }
298    
299        /**
300         * @openwire:property version=1
301         */
302        public int getGroupSequence() {
303            return groupSequence;
304        }
305    
306        public void setGroupSequence(int groupSequence) {
307            this.groupSequence = groupSequence;
308        }
309    
310        /**
311         * @openwire:property version=1
312         */
313        public String getCorrelationId() {
314            return correlationId;
315        }
316    
317        public void setCorrelationId(String correlationId) {
318            this.correlationId = correlationId;
319        }
320    
321        /**
322         * @openwire:property version=1
323         */
324        public boolean isPersistent() {
325            return persistent;
326        }
327    
328        public void setPersistent(boolean deliveryMode) {
329            this.persistent = deliveryMode;
330        }
331    
332        /**
333         * @openwire:property version=1
334         */
335        public long getExpiration() {
336            return expiration;
337        }
338    
339        public void setExpiration(long expiration) {
340            this.expiration = expiration;
341        }
342    
343        /**
344         * @openwire:property version=1
345         */
346        public byte getPriority() {
347            return priority;
348        }
349    
350        public void setPriority(byte priority) {
351            this.priority = priority;
352        }
353    
354        /**
355         * @openwire:property version=1
356         */
357        public ActiveMQDestination getReplyTo() {
358            return replyTo;
359        }
360    
361        public void setReplyTo(ActiveMQDestination replyTo) {
362            this.replyTo = replyTo;
363        }
364    
365        /**
366         * @openwire:property version=1
367         */
368        public long getTimestamp() {
369            return timestamp;
370        }
371    
372        public void setTimestamp(long timestamp) {
373            this.timestamp = timestamp;
374        }
375    
376        /**
377         * @openwire:property version=1
378         */
379        public String getType() {
380            return type;
381        }
382    
383        public void setType(String type) {
384            this.type = type;
385        }
386    
387        /**
388         * @openwire:property version=1
389         */
390        public ByteSequence getContent() {
391            return content;
392        }
393    
394        public void setContent(ByteSequence content) {
395            this.content = content;
396        }
397    
398        /**
399         * @openwire:property version=1
400         */
401        public ByteSequence getMarshalledProperties() {
402            return marshalledProperties;
403        }
404    
405        public void setMarshalledProperties(ByteSequence marshalledProperties) {
406            this.marshalledProperties = marshalledProperties;
407        }
408    
409        /**
410         * @openwire:property version=1
411         */
412        public DataStructure getDataStructure() {
413            return dataStructure;
414        }
415    
416        public void setDataStructure(DataStructure data) {
417            this.dataStructure = data;
418        }
419    
420        /**
421         * Can be used to route the message to a specific consumer. Should be null
422         * to allow the broker use normal JMS routing semantics. If the target
423         * consumer id is an active consumer on the broker, the message is dropped.
424         * Used by the AdvisoryBroker to replay advisory messages to a specific
425         * consumer.
426         * 
427         * @openwire:property version=1 cache=true
428         */
429        public ConsumerId getTargetConsumerId() {
430            return targetConsumerId;
431        }
432    
433        public void setTargetConsumerId(ConsumerId targetConsumerId) {
434            this.targetConsumerId = targetConsumerId;
435        }
436    
437        public boolean isExpired() {
438            long expireTime = getExpiration();
439            return expireTime > 0 && System.currentTimeMillis() > expireTime;
440        }
441    
442        public boolean isAdvisory() {
443            return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
444        }
445    
446        /**
447         * @openwire:property version=1
448         */
449        public boolean isCompressed() {
450            return compressed;
451        }
452    
453        public void setCompressed(boolean compressed) {
454            this.compressed = compressed;
455        }
456    
457        public boolean isRedelivered() {
458            return redeliveryCounter > 0;
459        }
460    
461        public void setRedelivered(boolean redelivered) {
462            if (redelivered) {
463                if (!isRedelivered()) {
464                    setRedeliveryCounter(1);
465                }
466            } else {
467                if (isRedelivered()) {
468                    setRedeliveryCounter(0);
469                }
470            }
471        }
472    
473        public void incrementRedeliveryCounter() {
474            redeliveryCounter++;
475        }
476    
477        /**
478         * @openwire:property version=1
479         */
480        public int getRedeliveryCounter() {
481            return redeliveryCounter;
482        }
483    
484        public void setRedeliveryCounter(int deliveryCounter) {
485            this.redeliveryCounter = deliveryCounter;
486        }
487    
488        /**
489         * The route of brokers the command has moved through.
490         * 
491         * @openwire:property version=1 cache=true
492         */
493        public BrokerId[] getBrokerPath() {
494            return brokerPath;
495        }
496    
497        public void setBrokerPath(BrokerId[] brokerPath) {
498            this.brokerPath = brokerPath;
499        }
500    
501        public boolean isReadOnlyProperties() {
502            return readOnlyProperties;
503        }
504    
505        public void setReadOnlyProperties(boolean readOnlyProperties) {
506            this.readOnlyProperties = readOnlyProperties;
507        }
508    
509        public boolean isReadOnlyBody() {
510            return readOnlyBody;
511        }
512    
513        public void setReadOnlyBody(boolean readOnlyBody) {
514            this.readOnlyBody = readOnlyBody;
515        }
516    
517        public ActiveMQConnection getConnection() {
518            return this.connection;
519        }
520    
521        public void setConnection(ActiveMQConnection connection) {
522            this.connection = connection;
523        }
524    
525        /**
526         * Used to schedule the arrival time of a message to a broker. The broker
527         * will not dispatch a message to a consumer until it's arrival time has
528         * elapsed.
529         * 
530         * @openwire:property version=1
531         */
532        public long getArrival() {
533            return arrival;
534        }
535    
536        public void setArrival(long arrival) {
537            this.arrival = arrival;
538        }
539    
540        /**
541         * Only set by the broker and defines the userID of the producer connection
542         * who sent this message. This is an optional field, it needs to be enabled
543         * on the broker to have this field populated.
544         * 
545         * @openwire:property version=1
546         */
547        public String getUserID() {
548            return userID;
549        }
550    
551        public void setUserID(String jmsxUserID) {
552            this.userID = jmsxUserID;
553        }
554    
555        public int getReferenceCount() {
556            return referenceCount;
557        }
558    
559        public Message getMessageHardRef() {
560            return this;
561        }
562    
563        public Message getMessage() throws IOException {
564            return this;
565        }
566    
567        public org.apache.activemq.broker.region.Destination getRegionDestination() {
568            return regionDestination;
569        }
570    
571        public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) {
572            this.regionDestination = destination;
573            if(this.memoryUsage==null) {
574                this.memoryUsage=regionDestination.getMemoryUsage();
575            }
576        }
577        
578        public MemoryUsage getMemoryUsage() {
579            return this.memoryUsage;
580        }
581        
582        public void setMemoryUsage(MemoryUsage usage) {
583            this.memoryUsage=usage;
584        }
585    
586        public boolean isMarshallAware() {
587            return true;
588        }
589    
590        public int incrementReferenceCount() {
591            int rc;
592            int size;
593            synchronized (this) {
594                rc = ++referenceCount;
595                size = getSize();
596            }
597    
598            if (rc == 1 && getMemoryUsage() != null) {
599                getMemoryUsage().increaseUsage(size);
600            }
601    
602            //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
603            return rc;
604        }
605    
606        public int decrementReferenceCount() {
607            int rc;
608            int size;
609            synchronized (this) {
610                rc = --referenceCount;
611                size = getSize();
612            }
613    
614            if (rc == 0 && getMemoryUsage() != null) {
615                getMemoryUsage().decreaseUsage(size);
616            }
617            //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
618    
619            return rc;
620        }
621    
622        public int getSize() {
623            int minimumMessageSize = getMinimumMessageSize();
624            if (size < minimumMessageSize || size == 0) {
625                size = minimumMessageSize;
626                if (marshalledProperties != null) {
627                    size += marshalledProperties.getLength();
628                }
629                if (content != null) {
630                    size += content.getLength();
631                }
632            }
633            return size;
634        }
635        
636        protected int getMinimumMessageSize() {
637            int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
638            //let destination override
639            Destination dest = regionDestination;
640            if (dest != null) {
641                result=dest.getMinimumMessageSize();
642            }
643            return result;
644        }
645    
646        /**
647         * @openwire:property version=1
648         * @return Returns the recievedByDFBridge.
649         */
650        public boolean isRecievedByDFBridge() {
651            return recievedByDFBridge;
652        }
653    
654        /**
655         * @param recievedByDFBridge The recievedByDFBridge to set.
656         */
657        public void setRecievedByDFBridge(boolean recievedByDFBridge) {
658            this.recievedByDFBridge = recievedByDFBridge;
659        }
660    
661        public void onMessageRolledBack() {
662            incrementRedeliveryCounter();
663        }
664    
665        /**
666         * @openwire:property version=2 cache=true
667         */
668        public boolean isDroppable() {
669            return droppable;
670        }
671    
672        public void setDroppable(boolean droppable) {
673            this.droppable = droppable;
674        }
675    
676        /**
677         * If a message is stored in multiple nodes on a cluster, all the cluster
678         * members will be listed here. Otherwise, it will be null.
679         * 
680         * @openwire:property version=3 cache=true
681         */
682        public BrokerId[] getCluster() {
683            return cluster;
684        }
685    
686        public void setCluster(BrokerId[] cluster) {
687            this.cluster = cluster;
688        }
689    
690        public boolean isMessage() {
691            return true;
692        }
693    
694        /**
695         * @openwire:property version=3
696         */
697        public long getBrokerInTime() {
698            return this.brokerInTime;
699        }
700    
701        public void setBrokerInTime(long brokerInTime) {
702            this.brokerInTime = brokerInTime;
703        }
704    
705        /**
706         * @openwire:property version=3
707         */
708        public long getBrokerOutTime() {
709            return this.brokerOutTime;
710        }
711    
712        public void setBrokerOutTime(long brokerOutTime) {
713            this.brokerOutTime = brokerOutTime;
714        }
715        
716        public boolean isDropped() {
717            return false;
718        }
719        
720        public String toString() {
721            return toString(null);
722        }
723        
724        public String toString(Map<String, Object>overrideFields) {
725            try {
726                getProperties();
727            } catch (IOException e) {
728            }
729            return super.toString(overrideFields);
730        }    
731    }