001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.transport.stomp;
019    
020    import java.io.ByteArrayOutputStream;
021    import java.io.DataInputStream;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.OutputStream;
025    import java.net.Socket;
026    import java.net.UnknownHostException;
027    import java.util.HashMap;
028    
029    public class StompConnection {
030    
031        public static final long RECEIVE_TIMEOUT = 10000;
032    
033        private Socket stompSocket;
034        private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
035        
036        public void open(String host, int port) throws IOException, UnknownHostException {
037            open(new Socket(host, port));
038        }
039        
040        public void open(Socket socket) {
041            stompSocket = socket;
042        }
043    
044        public void close() throws IOException {
045            if (stompSocket != null) {
046                stompSocket.close();
047                stompSocket = null;
048            }
049        }
050    
051        public void sendFrame(String data) throws Exception {
052            byte[] bytes = data.getBytes("UTF-8");
053            OutputStream outputStream = stompSocket.getOutputStream();
054            outputStream.write(bytes);
055            outputStream.write(0);
056            outputStream.flush();
057        }
058        
059        public StompFrame receive() throws Exception {
060            return receive(RECEIVE_TIMEOUT);
061        }    
062        
063        public StompFrame receive(long timeOut) throws Exception {
064            stompSocket.setSoTimeout((int)timeOut);
065            InputStream is = stompSocket.getInputStream();
066            StompWireFormat wf = new StompWireFormat();
067            DataInputStream dis = new DataInputStream(is);
068            return (StompFrame)wf.unmarshal(dis);
069        }
070    
071        public String receiveFrame() throws Exception {
072            return receiveFrame(RECEIVE_TIMEOUT);
073        }
074    
075        public String receiveFrame(long timeOut) throws Exception {
076            stompSocket.setSoTimeout((int)timeOut);
077            InputStream is = stompSocket.getInputStream();
078            int c = 0;
079            for (;;) {
080                c = is.read();
081                if (c < 0) {
082                    throw new IOException("socket closed.");
083                } else if (c == 0) {
084                    c = is.read();
085                    if (c != '\n') {
086                        throw new IOException("Expecting stomp frame to terminate with \0\n");
087                    }
088                    byte[] ba = inputBuffer.toByteArray();
089                    inputBuffer.reset();
090                    return new String(ba, "UTF-8");
091                } else {
092                    inputBuffer.write(c);
093                }
094            }
095        }
096    
097            public Socket getStompSocket() {
098                    return stompSocket;
099            }
100    
101            public void setStompSocket(Socket stompSocket) {
102                    this.stompSocket = stompSocket;
103            }
104            
105        public void connect(String username, String password) throws Exception {
106            connect(username, password, null);
107        }
108            
109        public void connect(String username, String password, String client) throws Exception {
110            HashMap<String, String> headers = new HashMap();
111            headers.put("login", username);
112            headers.put("passcode", password);
113            if (client != null) {
114                    headers.put("client-id", client);
115            }
116            StompFrame frame = new StompFrame("CONNECT", headers);
117            sendFrame(frame.toString());
118            
119            StompFrame connect = receive();
120            if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
121                    throw new Exception ("Not connected: " + connect.getBody());
122            }
123        }
124        
125        public void disconnect() throws Exception {
126            StompFrame frame = new StompFrame("DISCONNECT");
127            sendFrame(frame.toString());            
128        }
129        
130        public void send(String destination, String message) throws Exception {
131            send(destination, message, null, null);
132        }
133        
134        public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception {
135            if (headers == null) {
136                    headers = new HashMap<String, String>();
137            }
138            headers.put("destination", destination);
139            if (transaction != null) {
140                    headers.put("transaction", transaction);
141            }
142            StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
143            sendFrame(frame.toString());            
144        }
145        
146        public void subscribe(String destination) throws Exception {
147            subscribe(destination, null, null);
148        }
149        
150        public void subscribe(String destination, String ack) throws Exception {
151            subscribe(destination, ack, new HashMap<String, String>());
152        }
153        
154        public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
155                    if (headers == null) {
156                            headers = new HashMap<String, String>();
157                    }
158                    headers.put("destination", destination);
159            if (ack != null) {
160                    headers.put("ack", ack);
161            }
162            StompFrame frame = new StompFrame("SUBSCRIBE", headers);
163            sendFrame(frame.toString());            
164        }
165        
166        public void unsubscribe(String destination) throws Exception {
167            unsubscribe(destination, null);
168        }
169        
170        public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
171                    if (headers == null) {
172                            headers = new HashMap<String, String>();
173                    }
174                    headers.put("destination", destination);
175            StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
176            sendFrame(frame.toString());            
177        }    
178        
179        public void begin(String transaction) throws Exception {
180            HashMap<String, String> headers = new HashMap<String, String>();
181            headers.put("transaction", transaction);
182            StompFrame frame = new StompFrame("BEGIN", headers);
183            sendFrame(frame.toString());
184        }
185        
186        public void abort(String transaction) throws Exception {
187            HashMap<String, String> headers = new HashMap<String, String>();
188            headers.put("transaction", transaction);
189            StompFrame frame = new StompFrame("ABORT", headers);
190            sendFrame(frame.toString());
191        }
192        
193        public void commit(String transaction) throws Exception {
194            HashMap<String, String> headers = new HashMap<String, String>();
195            headers.put("transaction", transaction);
196            StompFrame frame = new StompFrame("COMMIT", headers);
197            sendFrame(frame.toString());
198        }
199        
200        public void ack(StompFrame frame) throws Exception {
201            ack(frame.getHeaders().get("message-id"), null);
202        }    
203        
204        public void ack(StompFrame frame, String transaction) throws Exception {
205            ack(frame.getHeaders().get("message-id"), transaction);
206        }
207        
208        public void ack(String messageId) throws Exception {
209            ack(messageId, null);
210        }
211        
212        public void ack(String messageId, String transaction) throws Exception {
213            HashMap<String, String> headers = new HashMap<String, String>();
214            headers.put("message-id", messageId);
215            if (transaction != null)
216                    headers.put("transaction", transaction);
217            StompFrame frame = new StompFrame("ACK", headers);
218            sendFrame(frame.toString());    
219        }
220        
221        protected String appendHeaders(HashMap<String, Object> headers) {
222            StringBuffer result = new StringBuffer();
223            for (String key : headers.keySet()) {
224                    result.append(key + ":" + headers.get(key) + "\n");
225            }
226            result.append("\n");
227            return result.toString();
228        }
229    
230    }