001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq.transport.stomp; 019 020 import java.io.ByteArrayOutputStream; 021 import java.io.DataInputStream; 022 import java.io.IOException; 023 import java.io.InputStream; 024 import java.io.OutputStream; 025 import java.net.Socket; 026 import java.net.UnknownHostException; 027 import java.util.HashMap; 028 029 public class StompConnection { 030 031 public static final long RECEIVE_TIMEOUT = 10000; 032 033 private Socket stompSocket; 034 private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream(); 035 036 public void open(String host, int port) throws IOException, UnknownHostException { 037 open(new Socket(host, port)); 038 } 039 040 public void open(Socket socket) { 041 stompSocket = socket; 042 } 043 044 public void close() throws IOException { 045 if (stompSocket != null) { 046 stompSocket.close(); 047 stompSocket = null; 048 } 049 } 050 051 public void sendFrame(String data) throws Exception { 052 byte[] bytes = data.getBytes("UTF-8"); 053 OutputStream outputStream = stompSocket.getOutputStream(); 054 outputStream.write(bytes); 055 outputStream.write(0); 056 outputStream.flush(); 057 } 058 059 public StompFrame receive() throws Exception { 060 return receive(RECEIVE_TIMEOUT); 061 } 062 063 public StompFrame receive(long timeOut) throws Exception { 064 stompSocket.setSoTimeout((int)timeOut); 065 InputStream is = stompSocket.getInputStream(); 066 StompWireFormat wf = new StompWireFormat(); 067 DataInputStream dis = new DataInputStream(is); 068 return (StompFrame)wf.unmarshal(dis); 069 } 070 071 public String receiveFrame() throws Exception { 072 return receiveFrame(RECEIVE_TIMEOUT); 073 } 074 075 public String receiveFrame(long timeOut) throws Exception { 076 stompSocket.setSoTimeout((int)timeOut); 077 InputStream is = stompSocket.getInputStream(); 078 int c = 0; 079 for (;;) { 080 c = is.read(); 081 if (c < 0) { 082 throw new IOException("socket closed."); 083 } else if (c == 0) { 084 c = is.read(); 085 if (c != '\n') { 086 throw new IOException("Expecting stomp frame to terminate with \0\n"); 087 } 088 byte[] ba = inputBuffer.toByteArray(); 089 inputBuffer.reset(); 090 return new String(ba, "UTF-8"); 091 } else { 092 inputBuffer.write(c); 093 } 094 } 095 } 096 097 public Socket getStompSocket() { 098 return stompSocket; 099 } 100 101 public void setStompSocket(Socket stompSocket) { 102 this.stompSocket = stompSocket; 103 } 104 105 public void connect(String username, String password) throws Exception { 106 connect(username, password, null); 107 } 108 109 public void connect(String username, String password, String client) throws Exception { 110 HashMap<String, String> headers = new HashMap(); 111 headers.put("login", username); 112 headers.put("passcode", password); 113 if (client != null) { 114 headers.put("client-id", client); 115 } 116 StompFrame frame = new StompFrame("CONNECT", headers); 117 sendFrame(frame.toString()); 118 119 StompFrame connect = receive(); 120 if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) { 121 throw new Exception ("Not connected: " + connect.getBody()); 122 } 123 } 124 125 public void disconnect() throws Exception { 126 StompFrame frame = new StompFrame("DISCONNECT"); 127 sendFrame(frame.toString()); 128 } 129 130 public void send(String destination, String message) throws Exception { 131 send(destination, message, null, null); 132 } 133 134 public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception { 135 if (headers == null) { 136 headers = new HashMap<String, String>(); 137 } 138 headers.put("destination", destination); 139 if (transaction != null) { 140 headers.put("transaction", transaction); 141 } 142 StompFrame frame = new StompFrame("SEND", headers, message.getBytes()); 143 sendFrame(frame.toString()); 144 } 145 146 public void subscribe(String destination) throws Exception { 147 subscribe(destination, null, null); 148 } 149 150 public void subscribe(String destination, String ack) throws Exception { 151 subscribe(destination, ack, new HashMap<String, String>()); 152 } 153 154 public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception { 155 if (headers == null) { 156 headers = new HashMap<String, String>(); 157 } 158 headers.put("destination", destination); 159 if (ack != null) { 160 headers.put("ack", ack); 161 } 162 StompFrame frame = new StompFrame("SUBSCRIBE", headers); 163 sendFrame(frame.toString()); 164 } 165 166 public void unsubscribe(String destination) throws Exception { 167 unsubscribe(destination, null); 168 } 169 170 public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception { 171 if (headers == null) { 172 headers = new HashMap<String, String>(); 173 } 174 headers.put("destination", destination); 175 StompFrame frame = new StompFrame("UNSUBSCRIBE", headers); 176 sendFrame(frame.toString()); 177 } 178 179 public void begin(String transaction) throws Exception { 180 HashMap<String, String> headers = new HashMap<String, String>(); 181 headers.put("transaction", transaction); 182 StompFrame frame = new StompFrame("BEGIN", headers); 183 sendFrame(frame.toString()); 184 } 185 186 public void abort(String transaction) throws Exception { 187 HashMap<String, String> headers = new HashMap<String, String>(); 188 headers.put("transaction", transaction); 189 StompFrame frame = new StompFrame("ABORT", headers); 190 sendFrame(frame.toString()); 191 } 192 193 public void commit(String transaction) throws Exception { 194 HashMap<String, String> headers = new HashMap<String, String>(); 195 headers.put("transaction", transaction); 196 StompFrame frame = new StompFrame("COMMIT", headers); 197 sendFrame(frame.toString()); 198 } 199 200 public void ack(StompFrame frame) throws Exception { 201 ack(frame.getHeaders().get("message-id"), null); 202 } 203 204 public void ack(StompFrame frame, String transaction) throws Exception { 205 ack(frame.getHeaders().get("message-id"), transaction); 206 } 207 208 public void ack(String messageId) throws Exception { 209 ack(messageId, null); 210 } 211 212 public void ack(String messageId, String transaction) throws Exception { 213 HashMap<String, String> headers = new HashMap<String, String>(); 214 headers.put("message-id", messageId); 215 if (transaction != null) 216 headers.put("transaction", transaction); 217 StompFrame frame = new StompFrame("ACK", headers); 218 sendFrame(frame.toString()); 219 } 220 221 protected String appendHeaders(HashMap<String, Object> headers) { 222 StringBuffer result = new StringBuffer(); 223 for (String key : headers.keySet()) { 224 result.append(key + ":" + headers.get(key) + "\n"); 225 } 226 result.append("\n"); 227 return result.toString(); 228 } 229 230 }