001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.command;
019    
020    import java.io.BufferedInputStream;
021    import java.io.DataInputStream;
022    import java.io.DataOutputStream;
023    import java.io.EOFException;
024    import java.io.IOException;
025    import java.io.InputStream;
026    import java.io.OutputStream;
027    import java.util.zip.DeflaterOutputStream;
028    import java.util.zip.InflaterInputStream;
029    
030    import javax.jms.JMSException;
031    import javax.jms.MessageEOFException;
032    import javax.jms.MessageFormatException;
033    import javax.jms.MessageNotReadableException;
034    import javax.jms.MessageNotWriteableException;
035    import javax.jms.StreamMessage;
036    
037    import org.apache.activemq.ActiveMQConnection;
038    import org.apache.activemq.util.ByteArrayInputStream;
039    import org.apache.activemq.util.ByteArrayOutputStream;
040    import org.apache.activemq.util.ByteSequence;
041    import org.apache.activemq.util.JMSExceptionSupport;
042    import org.apache.activemq.util.MarshallingSupport;
043    
044    /**
045     * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive
046     * types in the Java programming language. It is filled and read sequentially.
047     * It inherits from the <CODE>Message</CODE> interface and adds a stream
048     * message body. Its methods are based largely on those found in
049     * <CODE>java.io.DataInputStream</CODE> and
050     * <CODE>java.io.DataOutputStream</CODE>. <p/>
051     * <P>
052     * The primitive types can be read or written explicitly using methods for each
053     * type. They may also be read or written generically as objects. For instance,
054     * a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to
055     * <CODE>StreamMessage.writeObject(new
056     * Integer(6))</CODE>. Both forms are
057     * provided, because the explicit form is convenient for static programming, and
058     * the object form is needed when types are not known at compile time. <p/>
059     * <P>
060     * When the message is first created, and when <CODE>clearBody</CODE> is
061     * called, the body of the message is in write-only mode. After the first call
062     * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
063     * After a message has been sent, the client that sent it can retain and modify
064     * it without affecting the message that has been sent. The same message object
065     * can be sent multiple times. When a message has been received, the provider
066     * has called <CODE>reset</CODE> so that the message body is in read-only mode
067     * for the client. <p/>
068     * <P>
069     * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
070     * message body is cleared and the message body is in write-only mode. <p/>
071     * <P>
072     * If a client attempts to read a message in write-only mode, a
073     * <CODE>MessageNotReadableException</CODE> is thrown. <p/>
074     * <P>
075     * If a client attempts to write a message in read-only mode, a
076     * <CODE>MessageNotWriteableException</CODE> is thrown. <p/>
077     * <P>
078     * <CODE>StreamMessage</CODE> objects support the following conversion table.
079     * The marked cases must be supported. The unmarked cases must throw a
080     * <CODE>JMSException</CODE>. The <CODE>String</CODE>-to-primitive
081     * conversions may throw a runtime exception if the primitive's
082     * <CODE>valueOf()</CODE> method does not accept it as a valid
083     * <CODE>String</CODE> representation of the primitive. <p/>
084     * <P>
085     * A value written as the row type can be read as the column type. <p/>
086     * 
087     * <PRE>
088     *  | | boolean byte short char int long float double String byte[]
089     * |----------------------------------------------------------------------
090     * |boolean | X X |byte | X X X X X |short | X X X X |char | X X |int | X X X
091     * |long | X X |float | X X X |double | X X |String | X X X X X X X X |byte[] |
092     * X |----------------------------------------------------------------------
093     * 
094     * </PRE>
095     * 
096     * <p/>
097     * <P>
098     * Attempting to read a null value as a primitive type must be treated as
099     * calling the primitive's corresponding <code>valueOf(String)</code>
100     * conversion method with a null value. Since <code>char</code> does not
101     * support a <code>String</code> conversion, attempting to read a null value
102     * as a <code>char</code> must throw a <code>NullPointerException</code>.
103     * 
104     * @openwire:marshaller code="27"
105     * @see javax.jms.Session#createStreamMessage()
106     * @see javax.jms.BytesMessage
107     * @see javax.jms.MapMessage
108     * @see javax.jms.Message
109     * @see javax.jms.ObjectMessage
110     * @see javax.jms.TextMessage
111     */
112    public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage {
113    
114        public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_STREAM_MESSAGE;
115    
116        protected transient DataOutputStream dataOut;
117        protected transient ByteArrayOutputStream bytesOut;
118        protected transient DataInputStream dataIn;
119        protected transient int remainingBytes = -1;
120    
121        public Message copy() {
122            ActiveMQStreamMessage copy = new ActiveMQStreamMessage();
123            copy(copy);
124            return copy;
125        }
126    
127        private void copy(ActiveMQStreamMessage copy) {
128            storeContent();
129            super.copy(copy);
130            copy.dataOut = null;
131            copy.bytesOut = null;
132            copy.dataIn = null;
133        }
134    
135        public void onSend() throws JMSException {
136            super.onSend();
137            storeContent();
138        }
139    
140        private void storeContent() {
141            if (dataOut != null) {
142                try {
143                    dataOut.close();
144                    setContent(bytesOut.toByteSequence());
145                    bytesOut = null;
146                    dataOut = null;
147                } catch (IOException ioe) {
148                    throw new RuntimeException(ioe);
149                }
150            }
151        }
152    
153        public byte getDataStructureType() {
154            return DATA_STRUCTURE_TYPE;
155        }
156    
157        public String getJMSXMimeType() {
158            return "jms/stream-message";
159        }
160    
161        /**
162         * Clears out the message body. Clearing a message's body does not clear its
163         * header values or property entries. <p/>
164         * <P>
165         * If this message body was read-only, calling this method leaves the
166         * message body in the same state as an empty body in a newly created
167         * message.
168         * 
169         * @throws JMSException if the JMS provider fails to clear the message body
170         *                 due to some internal error.
171         */
172    
173        public void clearBody() throws JMSException {
174            super.clearBody();
175            this.dataOut = null;
176            this.dataIn = null;
177            this.bytesOut = null;
178            this.remainingBytes = -1;
179        }
180    
181        /**
182         * Reads a <code>boolean</code> from the stream message.
183         * 
184         * @return the <code>boolean</code> value read
185         * @throws JMSException if the JMS provider fails to read the message due to
186         *                 some internal error.
187         * @throws MessageEOFException if unexpected end of message stream has been
188         *                 reached.
189         * @throws MessageFormatException if this type conversion is invalid.
190         * @throws MessageNotReadableException if the message is in write-only mode.
191         */
192    
193        public boolean readBoolean() throws JMSException {
194            initializeReading();
195            try {
196    
197                this.dataIn.mark(10);
198                int type = this.dataIn.read();
199                if (type == -1) {
200                    throw new MessageEOFException("reached end of data");
201                }
202                if (type == MarshallingSupport.BOOLEAN_TYPE) {
203                    return this.dataIn.readBoolean();
204                }
205                if (type == MarshallingSupport.STRING_TYPE) {
206                    return Boolean.valueOf(this.dataIn.readUTF()).booleanValue();
207                }
208                if (type == MarshallingSupport.NULL) {
209                    this.dataIn.reset();
210                    throw new NullPointerException("Cannot convert NULL value to boolean.");
211                } else {
212                    this.dataIn.reset();
213                    throw new MessageFormatException(" not a boolean type");
214                }
215            } catch (EOFException e) {
216                throw JMSExceptionSupport.createMessageEOFException(e);
217            } catch (IOException e) {
218                throw JMSExceptionSupport.createMessageFormatException(e);
219            }
220        }
221    
222        /**
223         * Reads a <code>byte</code> value from the stream message.
224         * 
225         * @return the next byte from the stream message as a 8-bit
226         *         <code>byte</code>
227         * @throws JMSException if the JMS provider fails to read the message due to
228         *                 some internal error.
229         * @throws MessageEOFException if unexpected end of message stream has been
230         *                 reached.
231         * @throws MessageFormatException if this type conversion is invalid.
232         * @throws MessageNotReadableException if the message is in write-only mode.
233         */
234    
235        public byte readByte() throws JMSException {
236            initializeReading();
237            try {
238    
239                this.dataIn.mark(10);
240                int type = this.dataIn.read();
241                if (type == -1) {
242                    throw new MessageEOFException("reached end of data");
243                }
244                if (type == MarshallingSupport.BYTE_TYPE) {
245                    return this.dataIn.readByte();
246                }
247                if (type == MarshallingSupport.STRING_TYPE) {
248                    return Byte.valueOf(this.dataIn.readUTF()).byteValue();
249                }
250                if (type == MarshallingSupport.NULL) {
251                    this.dataIn.reset();
252                    throw new NullPointerException("Cannot convert NULL value to byte.");
253                } else {
254                    this.dataIn.reset();
255                    throw new MessageFormatException(" not a byte type");
256                }
257            } catch (NumberFormatException mfe) {
258                try {
259                    this.dataIn.reset();
260                } catch (IOException ioe) {
261                    throw JMSExceptionSupport.create(ioe);
262                }
263                throw mfe;
264    
265            } catch (EOFException e) {
266                throw JMSExceptionSupport.createMessageEOFException(e);
267            } catch (IOException e) {
268                throw JMSExceptionSupport.createMessageFormatException(e);
269            }
270        }
271    
272        /**
273         * Reads a 16-bit integer from the stream message.
274         * 
275         * @return a 16-bit integer from the stream message
276         * @throws JMSException if the JMS provider fails to read the message due to
277         *                 some internal error.
278         * @throws MessageEOFException if unexpected end of message stream has been
279         *                 reached.
280         * @throws MessageFormatException if this type conversion is invalid.
281         * @throws MessageNotReadableException if the message is in write-only mode.
282         */
283    
284        public short readShort() throws JMSException {
285            initializeReading();
286            try {
287    
288                this.dataIn.mark(17);
289                int type = this.dataIn.read();
290                if (type == -1) {
291                    throw new MessageEOFException("reached end of data");
292                }
293                if (type == MarshallingSupport.SHORT_TYPE) {
294                    return this.dataIn.readShort();
295                }
296                if (type == MarshallingSupport.BYTE_TYPE) {
297                    return this.dataIn.readByte();
298                }
299                if (type == MarshallingSupport.STRING_TYPE) {
300                    return Short.valueOf(this.dataIn.readUTF()).shortValue();
301                }
302                if (type == MarshallingSupport.NULL) {
303                    this.dataIn.reset();
304                    throw new NullPointerException("Cannot convert NULL value to short.");
305                } else {
306                    this.dataIn.reset();
307                    throw new MessageFormatException(" not a short type");
308                }
309            } catch (NumberFormatException mfe) {
310                try {
311                    this.dataIn.reset();
312                } catch (IOException ioe) {
313                    throw JMSExceptionSupport.create(ioe);
314                }
315                throw mfe;
316    
317            } catch (EOFException e) {
318                throw JMSExceptionSupport.createMessageEOFException(e);
319            } catch (IOException e) {
320                throw JMSExceptionSupport.createMessageFormatException(e);
321            }
322    
323        }
324    
325        /**
326         * Reads a Unicode character value from the stream message.
327         * 
328         * @return a Unicode character from the stream message
329         * @throws JMSException if the JMS provider fails to read the message due to
330         *                 some internal error.
331         * @throws MessageEOFException if unexpected end of message stream has been
332         *                 reached.
333         * @throws MessageFormatException if this type conversion is invalid
334         * @throws MessageNotReadableException if the message is in write-only mode.
335         */
336    
337        public char readChar() throws JMSException {
338            initializeReading();
339            try {
340    
341                this.dataIn.mark(17);
342                int type = this.dataIn.read();
343                if (type == -1) {
344                    throw new MessageEOFException("reached end of data");
345                }
346                if (type == MarshallingSupport.CHAR_TYPE) {
347                    return this.dataIn.readChar();
348                }
349                if (type == MarshallingSupport.NULL) {
350                    this.dataIn.reset();
351                    throw new NullPointerException("Cannot convert NULL value to char.");
352                } else {
353                    this.dataIn.reset();
354                    throw new MessageFormatException(" not a char type");
355                }
356            } catch (NumberFormatException mfe) {
357                try {
358                    this.dataIn.reset();
359                } catch (IOException ioe) {
360                    throw JMSExceptionSupport.create(ioe);
361                }
362                throw mfe;
363    
364            } catch (EOFException e) {
365                throw JMSExceptionSupport.createMessageEOFException(e);
366            } catch (IOException e) {
367                throw JMSExceptionSupport.createMessageFormatException(e);
368            }
369        }
370    
371        /**
372         * Reads a 32-bit integer from the stream message.
373         * 
374         * @return a 32-bit integer value from the stream message, interpreted as an
375         *         <code>int</code>
376         * @throws JMSException if the JMS provider fails to read the message due to
377         *                 some internal error.
378         * @throws MessageEOFException if unexpected end of message stream has been
379         *                 reached.
380         * @throws MessageFormatException if this type conversion is invalid.
381         * @throws MessageNotReadableException if the message is in write-only mode.
382         */
383    
384        public int readInt() throws JMSException {
385            initializeReading();
386            try {
387    
388                this.dataIn.mark(33);
389                int type = this.dataIn.read();
390                if (type == -1) {
391                    throw new MessageEOFException("reached end of data");
392                }
393                if (type == MarshallingSupport.INTEGER_TYPE) {
394                    return this.dataIn.readInt();
395                }
396                if (type == MarshallingSupport.SHORT_TYPE) {
397                    return this.dataIn.readShort();
398                }
399                if (type == MarshallingSupport.BYTE_TYPE) {
400                    return this.dataIn.readByte();
401                }
402                if (type == MarshallingSupport.STRING_TYPE) {
403                    return Integer.valueOf(this.dataIn.readUTF()).intValue();
404                }
405                if (type == MarshallingSupport.NULL) {
406                    this.dataIn.reset();
407                    throw new NullPointerException("Cannot convert NULL value to int.");
408                } else {
409                    this.dataIn.reset();
410                    throw new MessageFormatException(" not an int type");
411                }
412            } catch (NumberFormatException mfe) {
413                try {
414                    this.dataIn.reset();
415                } catch (IOException ioe) {
416                    throw JMSExceptionSupport.create(ioe);
417                }
418                throw mfe;
419    
420            } catch (EOFException e) {
421                throw JMSExceptionSupport.createMessageEOFException(e);
422            } catch (IOException e) {
423                throw JMSExceptionSupport.createMessageFormatException(e);
424            }
425        }
426    
427        /**
428         * Reads a 64-bit integer from the stream message.
429         * 
430         * @return a 64-bit integer value from the stream message, interpreted as a
431         *         <code>long</code>
432         * @throws JMSException if the JMS provider fails to read the message due to
433         *                 some internal error.
434         * @throws MessageEOFException if unexpected end of message stream has been
435         *                 reached.
436         * @throws MessageFormatException if this type conversion is invalid.
437         * @throws MessageNotReadableException if the message is in write-only mode.
438         */
439    
440        public long readLong() throws JMSException {
441            initializeReading();
442            try {
443    
444                this.dataIn.mark(65);
445                int type = this.dataIn.read();
446                if (type == -1) {
447                    throw new MessageEOFException("reached end of data");
448                }
449                if (type == MarshallingSupport.LONG_TYPE) {
450                    return this.dataIn.readLong();
451                }
452                if (type == MarshallingSupport.INTEGER_TYPE) {
453                    return this.dataIn.readInt();
454                }
455                if (type == MarshallingSupport.SHORT_TYPE) {
456                    return this.dataIn.readShort();
457                }
458                if (type == MarshallingSupport.BYTE_TYPE) {
459                    return this.dataIn.readByte();
460                }
461                if (type == MarshallingSupport.STRING_TYPE) {
462                    return Long.valueOf(this.dataIn.readUTF()).longValue();
463                }
464                if (type == MarshallingSupport.NULL) {
465                    this.dataIn.reset();
466                    throw new NullPointerException("Cannot convert NULL value to long.");
467                } else {
468                    this.dataIn.reset();
469                    throw new MessageFormatException(" not a long type");
470                }
471            } catch (NumberFormatException mfe) {
472                try {
473                    this.dataIn.reset();
474                } catch (IOException ioe) {
475                    throw JMSExceptionSupport.create(ioe);
476                }
477                throw mfe;
478    
479            } catch (EOFException e) {
480                throw JMSExceptionSupport.createMessageEOFException(e);
481            } catch (IOException e) {
482                throw JMSExceptionSupport.createMessageFormatException(e);
483            }
484        }
485    
486        /**
487         * Reads a <code>float</code> from the stream message.
488         * 
489         * @return a <code>float</code> value from the stream message
490         * @throws JMSException if the JMS provider fails to read the message due to
491         *                 some internal error.
492         * @throws MessageEOFException if unexpected end of message stream has been
493         *                 reached.
494         * @throws MessageFormatException if this type conversion is invalid.
495         * @throws MessageNotReadableException if the message is in write-only mode.
496         */
497    
498        public float readFloat() throws JMSException {
499            initializeReading();
500            try {
501                this.dataIn.mark(33);
502                int type = this.dataIn.read();
503                if (type == -1) {
504                    throw new MessageEOFException("reached end of data");
505                }
506                if (type == MarshallingSupport.FLOAT_TYPE) {
507                    return this.dataIn.readFloat();
508                }
509                if (type == MarshallingSupport.STRING_TYPE) {
510                    return Float.valueOf(this.dataIn.readUTF()).floatValue();
511                }
512                if (type == MarshallingSupport.NULL) {
513                    this.dataIn.reset();
514                    throw new NullPointerException("Cannot convert NULL value to float.");
515                } else {
516                    this.dataIn.reset();
517                    throw new MessageFormatException(" not a float type");
518                }
519            } catch (NumberFormatException mfe) {
520                try {
521                    this.dataIn.reset();
522                } catch (IOException ioe) {
523                    throw JMSExceptionSupport.create(ioe);
524                }
525                throw mfe;
526    
527            } catch (EOFException e) {
528                throw JMSExceptionSupport.createMessageEOFException(e);
529            } catch (IOException e) {
530                throw JMSExceptionSupport.createMessageFormatException(e);
531            }
532        }
533    
534        /**
535         * Reads a <code>double</code> from the stream message.
536         * 
537         * @return a <code>double</code> value from the stream message
538         * @throws JMSException if the JMS provider fails to read the message due to
539         *                 some internal error.
540         * @throws MessageEOFException if unexpected end of message stream has been
541         *                 reached.
542         * @throws MessageFormatException if this type conversion is invalid.
543         * @throws MessageNotReadableException if the message is in write-only mode.
544         */
545    
546        public double readDouble() throws JMSException {
547            initializeReading();
548            try {
549    
550                this.dataIn.mark(65);
551                int type = this.dataIn.read();
552                if (type == -1) {
553                    throw new MessageEOFException("reached end of data");
554                }
555                if (type == MarshallingSupport.DOUBLE_TYPE) {
556                    return this.dataIn.readDouble();
557                }
558                if (type == MarshallingSupport.FLOAT_TYPE) {
559                    return this.dataIn.readFloat();
560                }
561                if (type == MarshallingSupport.STRING_TYPE) {
562                    return Double.valueOf(this.dataIn.readUTF()).doubleValue();
563                }
564                if (type == MarshallingSupport.NULL) {
565                    this.dataIn.reset();
566                    throw new NullPointerException("Cannot convert NULL value to double.");
567                } else {
568                    this.dataIn.reset();
569                    throw new MessageFormatException(" not a double type");
570                }
571            } catch (NumberFormatException mfe) {
572                try {
573                    this.dataIn.reset();
574                } catch (IOException ioe) {
575                    throw JMSExceptionSupport.create(ioe);
576                }
577                throw mfe;
578    
579            } catch (EOFException e) {
580                throw JMSExceptionSupport.createMessageEOFException(e);
581            } catch (IOException e) {
582                throw JMSExceptionSupport.createMessageFormatException(e);
583            }
584        }
585    
586        /**
587         * Reads a <CODE>String</CODE> from the stream message.
588         * 
589         * @return a Unicode string from the stream message
590         * @throws JMSException if the JMS provider fails to read the message due to
591         *                 some internal error.
592         * @throws MessageEOFException if unexpected end of message stream has been
593         *                 reached.
594         * @throws MessageFormatException if this type conversion is invalid.
595         * @throws MessageNotReadableException if the message is in write-only mode.
596         */
597    
598        public String readString() throws JMSException {
599            initializeReading();
600            try {
601    
602                this.dataIn.mark(65);
603                int type = this.dataIn.read();
604                if (type == -1) {
605                    throw new MessageEOFException("reached end of data");
606                }
607                if (type == MarshallingSupport.NULL) {
608                    return null;
609                }
610                if (type == MarshallingSupport.BIG_STRING_TYPE) {
611                    return MarshallingSupport.readUTF8(dataIn);
612                }
613                if (type == MarshallingSupport.STRING_TYPE) {
614                    return this.dataIn.readUTF();
615                }
616                if (type == MarshallingSupport.LONG_TYPE) {
617                    return new Long(this.dataIn.readLong()).toString();
618                }
619                if (type == MarshallingSupport.INTEGER_TYPE) {
620                    return new Integer(this.dataIn.readInt()).toString();
621                }
622                if (type == MarshallingSupport.SHORT_TYPE) {
623                    return new Short(this.dataIn.readShort()).toString();
624                }
625                if (type == MarshallingSupport.BYTE_TYPE) {
626                    return new Byte(this.dataIn.readByte()).toString();
627                }
628                if (type == MarshallingSupport.FLOAT_TYPE) {
629                    return new Float(this.dataIn.readFloat()).toString();
630                }
631                if (type == MarshallingSupport.DOUBLE_TYPE) {
632                    return new Double(this.dataIn.readDouble()).toString();
633                }
634                if (type == MarshallingSupport.BOOLEAN_TYPE) {
635                    return (this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE).toString();
636                }
637                if (type == MarshallingSupport.CHAR_TYPE) {
638                    return new Character(this.dataIn.readChar()).toString();
639                } else {
640                    this.dataIn.reset();
641                    throw new MessageFormatException(" not a String type");
642                }
643            } catch (NumberFormatException mfe) {
644                try {
645                    this.dataIn.reset();
646                } catch (IOException ioe) {
647                    throw JMSExceptionSupport.create(ioe);
648                }
649                throw mfe;
650    
651            } catch (EOFException e) {
652                throw JMSExceptionSupport.createMessageEOFException(e);
653            } catch (IOException e) {
654                throw JMSExceptionSupport.createMessageFormatException(e);
655            }
656        }
657    
658        /**
659         * Reads a byte array field from the stream message into the specified
660         * <CODE>byte[]</CODE> object (the read buffer). <p/>
661         * <P>
662         * To read the field value, <CODE>readBytes</CODE> should be successively
663         * called until it returns a value less than the length of the read buffer.
664         * The value of the bytes in the buffer following the last byte read is
665         * undefined. <p/>
666         * <P>
667         * If <CODE>readBytes</CODE> returns a value equal to the length of the
668         * buffer, a subsequent <CODE>readBytes</CODE> call must be made. If there
669         * are no more bytes to be read, this call returns -1. <p/>
670         * <P>
671         * If the byte array field value is null, <CODE>readBytes</CODE> returns
672         * -1. <p/>
673         * <P>
674         * If the byte array field value is empty, <CODE>readBytes</CODE> returns
675         * 0. <p/>
676         * <P>
677         * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE>
678         * field value has been made, the full value of the field must be read
679         * before it is valid to read the next field. An attempt to read the next
680         * field before that has been done will throw a
681         * <CODE>MessageFormatException</CODE>. <p/>
682         * <P>
683         * To read the byte field value into a new <CODE>byte[]</CODE> object, use
684         * the <CODE>readObject</CODE> method.
685         * 
686         * @param value the buffer into which the data is read
687         * @return the total number of bytes read into the buffer, or -1 if there is
688         *         no more data because the end of the byte field has been reached
689         * @throws JMSException if the JMS provider fails to read the message due to
690         *                 some internal error.
691         * @throws MessageEOFException if unexpected end of message stream has been
692         *                 reached.
693         * @throws MessageFormatException if this type conversion is invalid.
694         * @throws MessageNotReadableException if the message is in write-only mode.
695         * @see #readObject()
696         */
697    
698        public int readBytes(byte[] value) throws JMSException {
699    
700            initializeReading();
701            try {
702                if (value == null) {
703                    throw new NullPointerException();
704                }
705    
706                if (remainingBytes == -1) {
707                    this.dataIn.mark(value.length + 1);
708                    int type = this.dataIn.read();
709                    if (type == -1) {
710                        throw new MessageEOFException("reached end of data");
711                    }
712                    if (type != MarshallingSupport.BYTE_ARRAY_TYPE) {
713                        throw new MessageFormatException("Not a byte array");
714                    }
715                    remainingBytes = this.dataIn.readInt();
716                } else if (remainingBytes == 0) {
717                    remainingBytes = -1;
718                    return -1;
719                }
720    
721                if (value.length <= remainingBytes) {
722                    // small buffer
723                    remainingBytes -= value.length;
724                    this.dataIn.readFully(value);
725                    return value.length;
726                } else {
727                    // big buffer
728                    int rc = this.dataIn.read(value, 0, remainingBytes);
729                    remainingBytes = 0;
730                    return rc;
731                }
732    
733            } catch (EOFException e) {
734                JMSException jmsEx = new MessageEOFException(e.getMessage());
735                jmsEx.setLinkedException(e);
736                throw jmsEx;
737            } catch (IOException e) {
738                JMSException jmsEx = new MessageFormatException(e.getMessage());
739                jmsEx.setLinkedException(e);
740                throw jmsEx;
741            }
742        }
743    
744        /**
745         * Reads an object from the stream message. <p/>
746         * <P>
747         * This method can be used to return, in objectified format, an object in
748         * the Java programming language ("Java object") that has been written to
749         * the stream with the equivalent <CODE>writeObject</CODE> method call, or
750         * its equivalent primitive <CODE>write<I>type</I></CODE> method. <p/>
751         * <P>
752         * Note that byte values are returned as <CODE>byte[]</CODE>, not
753         * <CODE>Byte[]</CODE>. <p/>
754         * <P>
755         * An attempt to call <CODE>readObject</CODE> to read a byte field value
756         * into a new <CODE>byte[]</CODE> object before the full value of the byte
757         * field has been read will throw a <CODE>MessageFormatException</CODE>.
758         * 
759         * @return a Java object from the stream message, in objectified format (for
760         *         example, if the object was written as an <CODE>int</CODE>, an
761         *         <CODE>Integer</CODE> is returned)
762         * @throws JMSException if the JMS provider fails to read the message due to
763         *                 some internal error.
764         * @throws MessageEOFException if unexpected end of message stream has been
765         *                 reached.
766         * @throws MessageFormatException if this type conversion is invalid.
767         * @throws MessageNotReadableException if the message is in write-only mode.
768         * @see #readBytes(byte[] value)
769         */
770    
771        public Object readObject() throws JMSException {
772            initializeReading();
773            try {
774                this.dataIn.mark(65);
775                int type = this.dataIn.read();
776                if (type == -1) {
777                    throw new MessageEOFException("reached end of data");
778                }
779                if (type == MarshallingSupport.NULL) {
780                    return null;
781                }
782                if (type == MarshallingSupport.BIG_STRING_TYPE) {
783                    return MarshallingSupport.readUTF8(dataIn);
784                }
785                if (type == MarshallingSupport.STRING_TYPE) {
786                    return this.dataIn.readUTF();
787                }
788                if (type == MarshallingSupport.LONG_TYPE) {
789                    return Long.valueOf(this.dataIn.readLong());
790                }
791                if (type == MarshallingSupport.INTEGER_TYPE) {
792                    return Integer.valueOf(this.dataIn.readInt());
793                }
794                if (type == MarshallingSupport.SHORT_TYPE) {
795                    return Short.valueOf(this.dataIn.readShort());
796                }
797                if (type == MarshallingSupport.BYTE_TYPE) {
798                    return Byte.valueOf(this.dataIn.readByte());
799                }
800                if (type == MarshallingSupport.FLOAT_TYPE) {
801                    return new Float(this.dataIn.readFloat());
802                }
803                if (type == MarshallingSupport.DOUBLE_TYPE) {
804                    return new Double(this.dataIn.readDouble());
805                }
806                if (type == MarshallingSupport.BOOLEAN_TYPE) {
807                    return this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
808                }
809                if (type == MarshallingSupport.CHAR_TYPE) {
810                    return Character.valueOf(this.dataIn.readChar());
811                }
812                if (type == MarshallingSupport.BYTE_ARRAY_TYPE) {
813                    int len = this.dataIn.readInt();
814                    byte[] value = new byte[len];
815                    this.dataIn.readFully(value);
816                    return value;
817                } else {
818                    this.dataIn.reset();
819                    throw new MessageFormatException("unknown type");
820                }
821            } catch (NumberFormatException mfe) {
822                try {
823                    this.dataIn.reset();
824                } catch (IOException ioe) {
825                    throw JMSExceptionSupport.create(ioe);
826                }
827                throw mfe;
828    
829            } catch (EOFException e) {
830                JMSException jmsEx = new MessageEOFException(e.getMessage());
831                jmsEx.setLinkedException(e);
832                throw jmsEx;
833            } catch (IOException e) {
834                JMSException jmsEx = new MessageFormatException(e.getMessage());
835                jmsEx.setLinkedException(e);
836                throw jmsEx;
837            }
838        }
839    
840        /**
841         * Writes a <code>boolean</code> to the stream message. The value
842         * <code>true</code> is written as the value <code>(byte)1</code>; the
843         * value <code>false</code> is written as the value <code>(byte)0</code>.
844         * 
845         * @param value the <code>boolean</code> value to be written
846         * @throws JMSException if the JMS provider fails to write the message due
847         *                 to some internal error.
848         * @throws MessageNotWriteableException if the message is in read-only mode.
849         */
850    
851        public void writeBoolean(boolean value) throws JMSException {
852            initializeWriting();
853            try {
854                MarshallingSupport.marshalBoolean(dataOut, value);
855            } catch (IOException ioe) {
856                throw JMSExceptionSupport.create(ioe);
857            }
858        }
859    
860        /**
861         * Writes a <code>byte</code> to the stream message.
862         * 
863         * @param value the <code>byte</code> value to be written
864         * @throws JMSException if the JMS provider fails to write the message due
865         *                 to some internal error.
866         * @throws MessageNotWriteableException if the message is in read-only mode.
867         */
868    
869        public void writeByte(byte value) throws JMSException {
870            initializeWriting();
871            try {
872                MarshallingSupport.marshalByte(dataOut, value);
873            } catch (IOException ioe) {
874                throw JMSExceptionSupport.create(ioe);
875            }
876        }
877    
878        /**
879         * Writes a <code>short</code> to the stream message.
880         * 
881         * @param value the <code>short</code> value to be written
882         * @throws JMSException if the JMS provider fails to write the message due
883         *                 to some internal error.
884         * @throws MessageNotWriteableException if the message is in read-only mode.
885         */
886    
887        public void writeShort(short value) throws JMSException {
888            initializeWriting();
889            try {
890                MarshallingSupport.marshalShort(dataOut, value);
891            } catch (IOException ioe) {
892                throw JMSExceptionSupport.create(ioe);
893            }
894        }
895    
896        /**
897         * Writes a <code>char</code> to the stream message.
898         * 
899         * @param value the <code>char</code> value to be written
900         * @throws JMSException if the JMS provider fails to write the message due
901         *                 to some internal error.
902         * @throws MessageNotWriteableException if the message is in read-only mode.
903         */
904    
905        public void writeChar(char value) throws JMSException {
906            initializeWriting();
907            try {
908                MarshallingSupport.marshalChar(dataOut, value);
909            } catch (IOException ioe) {
910                throw JMSExceptionSupport.create(ioe);
911            }
912        }
913    
914        /**
915         * Writes an <code>int</code> to the stream message.
916         * 
917         * @param value the <code>int</code> value to be written
918         * @throws JMSException if the JMS provider fails to write the message due
919         *                 to some internal error.
920         * @throws MessageNotWriteableException if the message is in read-only mode.
921         */
922    
923        public void writeInt(int value) throws JMSException {
924            initializeWriting();
925            try {
926                MarshallingSupport.marshalInt(dataOut, value);
927            } catch (IOException ioe) {
928                throw JMSExceptionSupport.create(ioe);
929            }
930        }
931    
932        /**
933         * Writes a <code>long</code> to the stream message.
934         * 
935         * @param value the <code>long</code> value to be written
936         * @throws JMSException if the JMS provider fails to write the message due
937         *                 to some internal error.
938         * @throws MessageNotWriteableException if the message is in read-only mode.
939         */
940    
941        public void writeLong(long value) throws JMSException {
942            initializeWriting();
943            try {
944                MarshallingSupport.marshalLong(dataOut, value);
945            } catch (IOException ioe) {
946                throw JMSExceptionSupport.create(ioe);
947            }
948        }
949    
950        /**
951         * Writes a <code>float</code> to the stream message.
952         * 
953         * @param value the <code>float</code> value to be written
954         * @throws JMSException if the JMS provider fails to write the message due
955         *                 to some internal error.
956         * @throws MessageNotWriteableException if the message is in read-only mode.
957         */
958    
959        public void writeFloat(float value) throws JMSException {
960            initializeWriting();
961            try {
962                MarshallingSupport.marshalFloat(dataOut, value);
963            } catch (IOException ioe) {
964                throw JMSExceptionSupport.create(ioe);
965            }
966        }
967    
968        /**
969         * Writes a <code>double</code> to the stream message.
970         * 
971         * @param value the <code>double</code> value to be written
972         * @throws JMSException if the JMS provider fails to write the message due
973         *                 to some internal error.
974         * @throws MessageNotWriteableException if the message is in read-only mode.
975         */
976    
977        public void writeDouble(double value) throws JMSException {
978            initializeWriting();
979            try {
980                MarshallingSupport.marshalDouble(dataOut, value);
981            } catch (IOException ioe) {
982                throw JMSExceptionSupport.create(ioe);
983            }
984        }
985    
986        /**
987         * Writes a <code>String</code> to the stream message.
988         * 
989         * @param value the <code>String</code> value to be written
990         * @throws JMSException if the JMS provider fails to write the message due
991         *                 to some internal error.
992         * @throws MessageNotWriteableException if the message is in read-only mode.
993         */
994    
995        public void writeString(String value) throws JMSException {
996            initializeWriting();
997            try {
998                if (value == null) {
999                    MarshallingSupport.marshalNull(dataOut);
1000                } else {
1001                    MarshallingSupport.marshalString(dataOut, value);
1002                }
1003            } catch (IOException ioe) {
1004                throw JMSExceptionSupport.create(ioe);
1005            }
1006        }
1007    
1008        /**
1009         * Writes a byte array field to the stream message. <p/>
1010         * <P>
1011         * The byte array <code>value</code> is written to the message as a byte
1012         * array field. Consecutively written byte array fields are treated as two
1013         * distinct fields when the fields are read.
1014         * 
1015         * @param value the byte array value to be written
1016         * @throws JMSException if the JMS provider fails to write the message due
1017         *                 to some internal error.
1018         * @throws MessageNotWriteableException if the message is in read-only mode.
1019         */
1020    
1021        public void writeBytes(byte[] value) throws JMSException {
1022            writeBytes(value, 0, value.length);
1023        }
1024    
1025        /**
1026         * Writes a portion of a byte array as a byte array field to the stream
1027         * message. <p/>
1028         * <P>
1029         * The a portion of the byte array <code>value</code> is written to the
1030         * message as a byte array field. Consecutively written byte array fields
1031         * are treated as two distinct fields when the fields are read.
1032         * 
1033         * @param value the byte array value to be written
1034         * @param offset the initial offset within the byte array
1035         * @param length the number of bytes to use
1036         * @throws JMSException if the JMS provider fails to write the message due
1037         *                 to some internal error.
1038         * @throws MessageNotWriteableException if the message is in read-only mode.
1039         */
1040    
1041        public void writeBytes(byte[] value, int offset, int length) throws JMSException {
1042            initializeWriting();
1043            try {
1044                MarshallingSupport.marshalByteArray(dataOut, value, offset, length);
1045            } catch (IOException ioe) {
1046                throw JMSExceptionSupport.create(ioe);
1047            }
1048        }
1049    
1050        /**
1051         * Writes an object to the stream message. <p/>
1052         * <P>
1053         * This method works only for the objectified primitive object types (<code>Integer</code>,
1054         * <code>Double</code>, <code>Long</code>&nbsp;...),
1055         * <code>String</code> objects, and byte arrays.
1056         * 
1057         * @param value the Java object to be written
1058         * @throws JMSException if the JMS provider fails to write the message due
1059         *                 to some internal error.
1060         * @throws MessageFormatException if the object is invalid.
1061         * @throws MessageNotWriteableException if the message is in read-only mode.
1062         */
1063    
1064        public void writeObject(Object value) throws JMSException {
1065            initializeWriting();
1066            if (value == null) {
1067                try {
1068                    MarshallingSupport.marshalNull(dataOut);
1069                } catch (IOException ioe) {
1070                    throw JMSExceptionSupport.create(ioe);
1071                }
1072            } else if (value instanceof String) {
1073                writeString(value.toString());
1074            } else if (value instanceof Character) {
1075                writeChar(((Character)value).charValue());
1076            } else if (value instanceof Boolean) {
1077                writeBoolean(((Boolean)value).booleanValue());
1078            } else if (value instanceof Byte) {
1079                writeByte(((Byte)value).byteValue());
1080            } else if (value instanceof Short) {
1081                writeShort(((Short)value).shortValue());
1082            } else if (value instanceof Integer) {
1083                writeInt(((Integer)value).intValue());
1084            } else if (value instanceof Float) {
1085                writeFloat(((Float)value).floatValue());
1086            } else if (value instanceof Double) {
1087                writeDouble(((Double)value).doubleValue());
1088            } else if (value instanceof byte[]) {
1089                writeBytes((byte[])value);
1090            }else if (value instanceof Long) {
1091                writeLong(((Long)value).longValue());
1092            }else {
1093                throw new MessageFormatException("Unsupported Object type: " + value.getClass());
1094            }
1095        }
1096    
1097        /**
1098         * Puts the message body in read-only mode and repositions the stream of
1099         * bytes to the beginning.
1100         * 
1101         * @throws JMSException if an internal error occurs
1102         */
1103    
1104        public void reset() throws JMSException {
1105            storeContent();
1106            this.bytesOut = null;
1107            this.dataIn = null;
1108            this.dataOut = null;
1109            this.remainingBytes = -1;
1110            setReadOnlyBody(true);
1111        }
1112    
1113        private void initializeWriting() throws MessageNotWriteableException {
1114            checkReadOnlyBody();
1115            if (this.dataOut == null) {
1116                this.bytesOut = new ByteArrayOutputStream();
1117                OutputStream os = bytesOut;
1118                ActiveMQConnection connection = getConnection();
1119                if (connection != null && connection.isUseCompression()) {
1120                    compressed = true;
1121                    os = new DeflaterOutputStream(os);
1122                }
1123                this.dataOut = new DataOutputStream(os);
1124            }
1125        }
1126    
1127        protected void checkWriteOnlyBody() throws MessageNotReadableException {
1128            if (!readOnlyBody) {
1129                throw new MessageNotReadableException("Message body is write-only");
1130            }
1131        }
1132    
1133        private void initializeReading() throws MessageNotReadableException {
1134            checkWriteOnlyBody();
1135            if (this.dataIn == null) {
1136                ByteSequence data = getContent();
1137                if (data == null) {
1138                    data = new ByteSequence(new byte[] {}, 0, 0);
1139                }
1140                InputStream is = new ByteArrayInputStream(data);
1141                if (isCompressed()) {
1142                    is = new InflaterInputStream(is);
1143                    is = new BufferedInputStream(is);
1144                }
1145                this.dataIn = new DataInputStream(is);
1146            }
1147        }
1148    
1149        public String toString() {
1150            return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
1151        }
1152    }