001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq.command; 019 020 import java.io.DataInputStream; 021 import java.io.DataOutputStream; 022 import java.io.IOException; 023 import java.io.InputStream; 024 import java.io.ObjectOutputStream; 025 import java.io.OutputStream; 026 import java.io.Serializable; 027 import java.util.zip.DeflaterOutputStream; 028 import java.util.zip.InflaterInputStream; 029 030 import javax.jms.JMSException; 031 import javax.jms.ObjectMessage; 032 033 import org.apache.activemq.ActiveMQConnection; 034 import org.apache.activemq.util.ByteArrayInputStream; 035 import org.apache.activemq.util.ByteArrayOutputStream; 036 import org.apache.activemq.util.ByteSequence; 037 import org.apache.activemq.util.ClassLoadingAwareObjectInputStream; 038 import org.apache.activemq.util.JMSExceptionSupport; 039 040 /** 041 * An <CODE>ObjectMessage</CODE> object is used to send a message that 042 * contains a serializable object in the Java programming language ("Java 043 * object"). It inherits from the <CODE>Message</CODE> interface and adds a 044 * body containing a single reference to an object. Only 045 * <CODE>Serializable</CODE> Java objects can be used. <p/> 046 * <P> 047 * If a collection of Java objects must be sent, one of the 048 * <CODE>Collection</CODE> classes provided since JDK 1.2 can be used. <p/> 049 * <P> 050 * When a client receives an <CODE>ObjectMessage</CODE>, it is in read-only 051 * mode. If a client attempts to write to the message at this point, a 052 * <CODE>MessageNotWriteableException</CODE> is thrown. If 053 * <CODE>clearBody</CODE> is called, the message can now be both read from and 054 * written to. 055 * 056 * @openwire:marshaller code="26" 057 * @see javax.jms.Session#createObjectMessage() 058 * @see javax.jms.Session#createObjectMessage(Serializable) 059 * @see javax.jms.BytesMessage 060 * @see javax.jms.MapMessage 061 * @see javax.jms.Message 062 * @see javax.jms.StreamMessage 063 * @see javax.jms.TextMessage 064 */ 065 public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage { 066 067 // TODO: verify classloader 068 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_OBJECT_MESSAGE; 069 static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader(); 070 071 protected transient Serializable object; 072 073 public Message copy() { 074 ActiveMQObjectMessage copy = new ActiveMQObjectMessage(); 075 copy(copy); 076 return copy; 077 } 078 079 private void copy(ActiveMQObjectMessage copy) { 080 storeContent(); 081 super.copy(copy); 082 copy.object = null; 083 } 084 085 public void storeContent() { 086 ByteSequence bodyAsBytes = getContent(); 087 if (bodyAsBytes == null && object != null) { 088 try { 089 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 090 OutputStream os = bytesOut; 091 ActiveMQConnection connection = getConnection(); 092 if (connection != null && connection.isUseCompression()) { 093 compressed = true; 094 os = new DeflaterOutputStream(os); 095 } 096 DataOutputStream dataOut = new DataOutputStream(os); 097 ObjectOutputStream objOut = new ObjectOutputStream(dataOut); 098 objOut.writeObject(object); 099 objOut.flush(); 100 objOut.reset(); 101 objOut.close(); 102 setContent(bytesOut.toByteSequence()); 103 } catch (IOException ioe) { 104 throw new RuntimeException(ioe.getMessage(), ioe); 105 } 106 } 107 } 108 109 public byte getDataStructureType() { 110 return DATA_STRUCTURE_TYPE; 111 } 112 113 public String getJMSXMimeType() { 114 return "jms/object-message"; 115 } 116 117 /** 118 * Clears out the message body. Clearing a message's body does not clear its 119 * header values or property entries. <p/> 120 * <P> 121 * If this message body was read-only, calling this method leaves the 122 * message body in the same state as an empty body in a newly created 123 * message. 124 * 125 * @throws JMSException if the JMS provider fails to clear the message body 126 * due to some internal error. 127 */ 128 129 public void clearBody() throws JMSException { 130 super.clearBody(); 131 this.object = null; 132 } 133 134 /** 135 * Sets the serializable object containing this message's data. It is 136 * important to note that an <CODE>ObjectMessage</CODE> contains a 137 * snapshot of the object at the time <CODE>setObject()</CODE> is called; 138 * subsequent modifications of the object will have no effect on the 139 * <CODE>ObjectMessage</CODE> body. 140 * 141 * @param newObject the message's data 142 * @throws JMSException if the JMS provider fails to set the object due to 143 * some internal error. 144 * @throws javax.jms.MessageFormatException if object serialization fails. 145 * @throws javax.jms.MessageNotWriteableException if the message is in 146 * read-only mode. 147 */ 148 149 public void setObject(Serializable newObject) throws JMSException { 150 checkReadOnlyBody(); 151 this.object = newObject; 152 setContent(null); 153 ActiveMQConnection connection = getConnection(); 154 if (connection == null || !connection.isObjectMessageSerializationDefered()) { 155 storeContent(); 156 } 157 } 158 159 /** 160 * Gets the serializable object containing this message's data. The default 161 * value is null. 162 * 163 * @return the serializable object containing this message's data 164 * @throws JMSException 165 */ 166 public Serializable getObject() throws JMSException { 167 if (object == null && getContent() != null) { 168 try { 169 ByteSequence content = getContent(); 170 InputStream is = new ByteArrayInputStream(content); 171 if (isCompressed()) { 172 is = new InflaterInputStream(is); 173 } 174 DataInputStream dataIn = new DataInputStream(is); 175 ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn); 176 try { 177 object = (Serializable)objIn.readObject(); 178 } catch (ClassNotFoundException ce) { 179 throw JMSExceptionSupport.create("Failed to build body from content. Serializable class not available to broker. Reason: " + ce, ce); 180 } finally { 181 dataIn.close(); 182 } 183 } catch (IOException e) { 184 throw JMSExceptionSupport.create("Failed to build body from bytes. Reason: " + e, e); 185 } 186 } 187 return this.object; 188 } 189 190 public void onMessageRolledBack() { 191 super.onMessageRolledBack(); 192 193 // lets force the object to be deserialized again - as we could have 194 // changed the object 195 object = null; 196 } 197 198 public String toString() { 199 try { 200 getObject(); 201 } catch (JMSException e) { 202 } 203 return super.toString(); 204 } 205 }