/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.xmpp;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A wire format which uses XMPP format of messages.
 * <p>
 * The stream-level {@link #marshal(Object, DataOutput)} and
 * {@link #unmarshal(DataInput)} methods are still unimplemented stubs, so the
 * byte-sequence variants that delegate to them currently write nothing and
 * return <code>null</code> respectively.
 *
 * @version $Revision: 742888 $
 */
public class XmppWireFormat implements WireFormat {

    /** Wire format version; defaults to 1 until changed via {@link #setVersion(int)}. */
    private int version = 1;

    /**
     * Creates a new wire format instance configured like this one.
     *
     * @return a fresh {@link XmppWireFormat} carrying the same version as this instance
     */
    public WireFormat copy() {
        XmppWireFormat answer = new XmppWireFormat();
        // Carry the configured version over so the copy is not silently reset to the default.
        answer.setVersion(version);
        return answer;
    }

    // NOTE(review): the two commented-out regions below look like an earlier,
    // Packet-based implementation kept as a reference for the unimplemented
    // marshal/unmarshal methods -- confirm before deleting.

    /*
    public Packet readPacket(DataInput in) throws IOException {
        return null;
    }

    public Packet readPacket(int firstByte, DataInput in) throws IOException {
        return null;
    }

    public Packet writePacket(Packet packet, DataOutput out) throws IOException, JMSException {
        switch (packet.getPacketType()) {
            case Packet.ACTIVEMQ_MESSAGE:
                writeMessage((ActiveMQMessage) packet, "", out);
                break;

            case Packet.ACTIVEMQ_TEXT_MESSAGE:
                writeTextMessage((ActiveMQTextMessage) packet, out);
                break;

            case Packet.ACTIVEMQ_BYTES_MESSAGE:
                writeBytesMessage((ActiveMQBytesMessage) packet, out);
                break;

            case Packet.ACTIVEMQ_OBJECT_MESSAGE:
                writeObjectMessage((ActiveMQObjectMessage) packet, out);
                break;

            case Packet.ACTIVEMQ_MAP_MESSAGE:
            case Packet.ACTIVEMQ_STREAM_MESSAGE:


            case Packet.ACTIVEMQ_BROKER_INFO:
            case Packet.ACTIVEMQ_CONNECTION_INFO:
            case Packet.ACTIVEMQ_MSG_ACK:
            case Packet.CONSUMER_INFO:
            case Packet.DURABLE_UNSUBSCRIBE:
            case Packet.INT_RESPONSE_RECEIPT_INFO:
            case Packet.PRODUCER_INFO:
            case Packet.RECEIPT_INFO:
            case Packet.RESPONSE_RECEIPT_INFO:
            case Packet.SESSION_INFO:
            case Packet.TRANSACTION_INFO:
            case Packet.XA_TRANSACTION_INFO:
            default:
                log.warn("Ignoring message type: " + packet.getPacketType() + " packet: " + packet);
        }
        return null;
    }
    */

//    /**
//     * Can this wireformat process packets of this version
//     * @param version the version number to test
//     * @return true if can accept the version
//     */
//    public boolean canProcessWireFormatVersion(int version){
//        return true;
//    }
//
//    /**
//     * @return the current version of this wire format
//     */
//    public int getCurrentWireFormatVersion(){
//        return 1;
//    }
//
//    // Implementation methods
//    //-------------------------------------------------------------------------
//    protected void writeObjectMessage(ActiveMQObjectMessage message, DataOutput out) throws JMSException, IOException {
//        Serializable object = message.getObject();
//        String text = (object != null) ? object.toString() : "";
//        writeMessage(message, text, out);
//    }
//
//    protected void writeTextMessage(ActiveMQTextMessage message, DataOutput out) throws JMSException, IOException {
//        writeMessage(message, message.getText(), out);
//    }
//
//    protected void writeBytesMessage(ActiveMQBytesMessage message, DataOutput out) throws IOException {
//        ByteArray data = message.getBodyAsBytes();
//        String text = encodeBinary(data.getBuf(),data.getOffset(),data.getLength());
//        writeMessage(message, text, out);
//    }
//
//    protected void writeMessage(ActiveMQMessage message, String body, DataOutput out) throws IOException {
//        String type = getXmppType(message);
//
//        StringBuffer buffer = new StringBuffer("<");
//        buffer.append(type);
//        buffer.append(" to='");
//        buffer.append(message.getJMSDestination().toString());
//        buffer.append("' from='");
//        buffer.append(message.getJMSReplyTo().toString());
//        String messageID = message.getJMSMessageID();
//        if (messageID != null) {
//            buffer.append("' id='");
//            buffer.append(messageID);
//        }
//
//        HashMap properties = message.getProperties();
//        if (properties != null) {
//            for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) {
//                Map.Entry entry = (Map.Entry) iter.next();
//                Object key = entry.getKey();
//                Object value = entry.getValue();
//                if (value != null) {
//                    buffer.append("' ");
//                    buffer.append(key.toString());
//                    buffer.append("='");
//                    buffer.append(value.toString());
//                }
//            }
//        }
//
//        buffer.append("'>");
//
//        String id = message.getJMSCorrelationID();
//        if (id != null) {
//            buffer.append("<thread>");
//            buffer.append(id);
//            buffer.append("</thread>");
//        }
//        buffer.append(body);
//        buffer.append("</");
//        buffer.append(type);
//        buffer.append(">");
//
//        out.write(buffer.toString().getBytes());
//    }
//
//    protected String encodeBinary(byte[] data,int offset,int length) {
//        // TODO
//        throw new RuntimeException("Not implemented yet!");
//    }
//
//    protected String getXmppType(ActiveMQMessage message) {
//        String type = message.getJMSType();
//        if (type == null) {
//            type = "message";
//        }
//        return type;
//    }

    /**
     * Marshals a command into an in-memory buffer by delegating to
     * {@link #marshal(Object, DataOutput)} and returns the buffered bytes.
     *
     * @param command the object to marshal
     * @return the bytes written by {@link #marshal(Object, DataOutput)}
     * @throws IOException if writing to or closing the buffer fails
     */
    public ByteSequence marshal(Object command) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        marshal(command, dos);
        // Close (and thereby flush) the data stream before taking the snapshot of the buffer.
        dos.close();
        return baos.toByteSequence();
    }

    /**
     * Unmarshals an object from a byte sequence by wrapping it in a stream and
     * delegating to {@link #unmarshal(DataInput)}.
     *
     * @param packet the bytes to read from
     * @return whatever {@link #unmarshal(DataInput)} returns (currently always <code>null</code>)
     * @throws IOException if reading fails
     */
    public Object unmarshal(ByteSequence packet) throws IOException {
        ByteArrayInputStream stream = new ByteArrayInputStream(packet);
        DataInputStream dis = new DataInputStream(stream);
        return unmarshal(dis);
    }

    /**
     * Writes the given object to the output. Not implemented yet: the method
     * currently writes nothing.
     *
     * @param object the object to marshal
     * @param dataOutput the destination to write to
     * @throws IOException declared for the eventual implementation; never thrown by the stub
     */
    public void marshal(Object object, DataOutput dataOutput) throws IOException {
        // TODO
    }

    /**
     * Reads an object from the input. Not implemented yet: the method
     * currently reads nothing and returns <code>null</code>.
     *
     * @param dataInput the source to read from
     * @return always <code>null</code> until implemented
     * @throws IOException declared for the eventual implementation; never thrown by the stub
     */
    public Object unmarshal(DataInput dataInput) throws IOException {
        return null; // TODO
    }

    /**
     * @return the version currently configured on this wire format
     */
    public int getVersion() {
        return version;
    }

    /**
     * @param version the version to configure on this wire format
     */
    public void setVersion(int version) {
        this.version = version;
    }

    /**
     * @return always <code>false</code> for now
     */
    public boolean inReceive() {
        // TODO Implement for inactivity monitor
        return false;
    }
}