001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.http; 018 019 import java.io.IOException; 020 import java.io.InputStream; 021 import java.io.OutputStreamWriter; 022 import java.io.Writer; 023 import java.net.HttpURLConnection; 024 import java.net.MalformedURLException; 025 import java.net.URI; 026 import java.net.URL; 027 028 import org.apache.activemq.command.Command; 029 import org.apache.activemq.command.ConnectionInfo; 030 import org.apache.activemq.transport.util.TextWireFormat; 031 import org.apache.activemq.util.ByteArrayOutputStream; 032 import org.apache.activemq.util.ByteSequence; 033 import org.apache.activemq.util.Callback; 034 import org.apache.activemq.util.IOExceptionSupport; 035 import org.apache.activemq.util.ServiceStopper; 036 import org.apache.commons.logging.Log; 037 import org.apache.commons.logging.LogFactory; 038 039 /** 040 * @version $Revision$ 041 */ 042 public class HttpTransport extends HttpTransportSupport { 043 044 private static final Log LOG = LogFactory.getLog(HttpTransport.class); 045 046 private HttpURLConnection sendConnection; 047 private HttpURLConnection receiveConnection; 048 private URL url; 049 private String clientID; 050 private volatile int receiveCounter; 051 052 // private String sessionID; 053 054 public HttpTransport(TextWireFormat wireFormat, URI remoteUrl) throws MalformedURLException { 055 super(wireFormat, remoteUrl); 056 url = new URL(remoteUrl.toString()); 057 } 058 059 public void oneway(Object o) throws IOException { 060 final Command command = (Command)o; 061 try { 062 if (command.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE) { 063 boolean startGetThread = clientID == null; 064 clientID = ((ConnectionInfo)command).getClientId(); 065 if (startGetThread && isStarted()) { 066 try { 067 super.doStart(); 068 } catch (Exception e) { 069 throw IOExceptionSupport.create(e); 070 } 071 } 072 } 073 074 HttpURLConnection connection = getSendConnection(); 075 String text = getTextWireFormat().marshalText(command); 076 Writer writer = new OutputStreamWriter(connection.getOutputStream()); 077 writer.write(text); 078 writer.flush(); 079 int answer = connection.getResponseCode(); 080 if (answer != HttpURLConnection.HTTP_OK) { 081 throw new IOException("Failed to post command: " + command + " as response was: " + answer); 082 } 083 // checkSession(connection); 084 } catch (IOException e) { 085 throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e); 086 } 087 } 088 089 public void run() { 090 LOG.trace("HTTP GET consumer thread starting for transport: " + this); 091 URI remoteUrl = getRemoteUrl(); 092 while (!isStopped()) { 093 try { 094 HttpURLConnection connection = getReceiveConnection(); 095 int answer = connection.getResponseCode(); 096 if (answer != HttpURLConnection.HTTP_OK) { 097 if (answer == HttpURLConnection.HTTP_CLIENT_TIMEOUT) { 098 LOG.trace("GET timed out"); 099 } else { 100 LOG.warn("Failed to perform GET on: " + remoteUrl + " as response was: " + answer); 101 } 102 } else { 103 // checkSession(connection); 104 105 // Create a String for the UTF content 106 receiveCounter++; 107 InputStream is = connection.getInputStream(); 108 ByteArrayOutputStream baos = new ByteArrayOutputStream(connection.getContentLength() > 0 ? connection.getContentLength() : 1024); 109 int c = 0; 110 while ((c = is.read()) >= 0) { 111 baos.write(c); 112 } 113 ByteSequence sequence = baos.toByteSequence(); 114 String data = new String(sequence.data, sequence.offset, sequence.length, "UTF-8"); 115 116 Command command = (Command)getTextWireFormat().unmarshalText(data); 117 118 if (command == null) { 119 LOG.warn("Received null packet from url: " + remoteUrl); 120 } else { 121 doConsume(command); 122 } 123 } 124 } catch (Throwable e) { 125 if (!isStopped()) { 126 LOG.error("Failed to perform GET on: " + remoteUrl + " due to: " + e, e); 127 } else { 128 LOG.trace("Caught error after closed: " + e, e); 129 } 130 } finally { 131 safeClose(receiveConnection); 132 receiveConnection = null; 133 } 134 } 135 } 136 137 // Implementation methods 138 // ------------------------------------------------------------------------- 139 protected HttpURLConnection createSendConnection() throws IOException { 140 HttpURLConnection conn = (HttpURLConnection)getRemoteURL().openConnection(); 141 conn.setDoOutput(true); 142 conn.setRequestMethod("POST"); 143 configureConnection(conn); 144 conn.connect(); 145 return conn; 146 } 147 148 protected HttpURLConnection createReceiveConnection() throws IOException { 149 HttpURLConnection conn = (HttpURLConnection)getRemoteURL().openConnection(); 150 conn.setDoOutput(false); 151 conn.setDoInput(true); 152 conn.setRequestMethod("GET"); 153 configureConnection(conn); 154 conn.connect(); 155 return conn; 156 } 157 158 // protected void checkSession(HttpURLConnection connection) 159 // { 160 // String set_cookie=connection.getHeaderField("Set-Cookie"); 161 // if (set_cookie!=null && set_cookie.startsWith("JSESSIONID=")) 162 // { 163 // String[] bits=set_cookie.split("[=;]"); 164 // sessionID=bits[1]; 165 // } 166 // } 167 168 protected void configureConnection(HttpURLConnection connection) { 169 // if (sessionID !=null) { 170 // connection.addRequestProperty("Cookie", "JSESSIONID="+sessionID); 171 // } 172 // else 173 if (clientID != null) { 174 connection.setRequestProperty("clientID", clientID); 175 } 176 } 177 178 protected URL getRemoteURL() { 179 return url; 180 } 181 182 protected HttpURLConnection getSendConnection() throws IOException { 183 setSendConnection(createSendConnection()); 184 return sendConnection; 185 } 186 187 protected HttpURLConnection getReceiveConnection() throws IOException { 188 setReceiveConnection(createReceiveConnection()); 189 return receiveConnection; 190 } 191 192 protected void setSendConnection(HttpURLConnection conn) { 193 safeClose(sendConnection); 194 sendConnection = conn; 195 } 196 197 protected void setReceiveConnection(HttpURLConnection conn) { 198 safeClose(receiveConnection); 199 receiveConnection = conn; 200 } 201 202 protected void doStart() throws Exception { 203 // Don't start the background thread until the clientId has been 204 // established. 205 if (clientID != null) { 206 super.doStart(); 207 } 208 } 209 210 protected void doStop(ServiceStopper stopper) throws Exception { 211 stopper.run(new Callback() { 212 public void execute() throws Exception { 213 safeClose(sendConnection); 214 } 215 }); 216 sendConnection = null; 217 stopper.run(new Callback() { 218 public void execute() { 219 safeClose(receiveConnection); 220 } 221 }); 222 } 223 224 /** 225 * @param connection TODO 226 */ 227 private void safeClose(HttpURLConnection connection) { 228 if (connection != null) { 229 connection.disconnect(); 230 } 231 } 232 233 public int getReceiveCounter() { 234 return receiveCounter; 235 } 236 237 }