001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.http; 018 019 import java.io.DataInputStream; 020 import java.io.IOException; 021 import java.io.InterruptedIOException; 022 import java.net.URI; 023 024 import org.apache.activemq.command.ShutdownInfo; 025 import org.apache.activemq.transport.FutureResponse; 026 import org.apache.activemq.transport.util.TextWireFormat; 027 import org.apache.activemq.util.ByteArrayInputStream; 028 import org.apache.activemq.util.IOExceptionSupport; 029 import org.apache.activemq.util.IdGenerator; 030 import org.apache.activemq.util.ServiceStopper; 031 import org.apache.commons.httpclient.HttpClient; 032 import org.apache.commons.httpclient.HttpMethod; 033 import org.apache.commons.httpclient.HttpStatus; 034 import org.apache.commons.httpclient.methods.GetMethod; 035 import org.apache.commons.httpclient.methods.HeadMethod; 036 import org.apache.commons.httpclient.methods.InputStreamRequestEntity; 037 import org.apache.commons.httpclient.methods.PostMethod; 038 import org.apache.commons.httpclient.params.HttpClientParams; 039 import org.apache.commons.logging.Log; 040 import org.apache.commons.logging.LogFactory; 041 042 /** 043 * A HTTP {@link org.apache.activemq.transport.TransportChannel} which uses the 044 * <a href="http://jakarta.apache.org/commons/httpclient/">commons-httpclient</a> 045 * library 046 * 047 * @version $Revision$ 048 */ 049 public class HttpClientTransport extends HttpTransportSupport { 050 051 public static final int MAX_CLIENT_TIMEOUT = 30000; 052 private static final Log LOG = LogFactory.getLog(HttpClientTransport.class); 053 private static final IdGenerator CLIENT_ID_GENERATOR = new IdGenerator(); 054 055 private HttpClient sendHttpClient; 056 private HttpClient receiveHttpClient; 057 058 private final String clientID = CLIENT_ID_GENERATOR.generateId(); 059 private boolean trace; 060 private GetMethod httpMethod; 061 private volatile int receiveCounter; 062 063 public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) { 064 super(wireFormat, remoteUrl); 065 } 066 067 public FutureResponse asyncRequest(Object command) throws IOException { 068 return null; 069 } 070 071 public void oneway(Object command) throws IOException { 072 073 if (isStopped()) { 074 throw new IOException("stopped."); 075 } 076 PostMethod httpMethod = new PostMethod(getRemoteUrl().toString()); 077 configureMethod(httpMethod); 078 String data = getTextWireFormat().marshalText(command); 079 byte[] bytes = data.getBytes("UTF-8"); 080 InputStreamRequestEntity entity = new InputStreamRequestEntity(new ByteArrayInputStream(bytes)); 081 httpMethod.setRequestEntity(entity); 082 083 try { 084 085 HttpClient client = getSendHttpClient(); 086 HttpClientParams params = new HttpClientParams(); 087 params.setSoTimeout(MAX_CLIENT_TIMEOUT); 088 client.setParams(params); 089 int answer = client.executeMethod(httpMethod); 090 if (answer != HttpStatus.SC_OK) { 091 throw new IOException("Failed to post command: " + command + " as response was: " + answer); 092 } 093 if (command instanceof ShutdownInfo) { 094 try { 095 stop(); 096 } catch (Exception e) { 097 LOG.warn("Error trying to stop HTTP client: "+ e, e); 098 } 099 } 100 } catch (IOException e) { 101 throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e); 102 } finally { 103 httpMethod.getResponseBody(); 104 httpMethod.releaseConnection(); 105 } 106 } 107 108 public Object request(Object command) throws IOException { 109 return null; 110 } 111 112 public void run() { 113 114 LOG.trace("HTTP GET consumer thread starting: " + this); 115 HttpClient httpClient = getReceiveHttpClient(); 116 URI remoteUrl = getRemoteUrl(); 117 118 while (!isStopped() && !isStopping()) { 119 120 httpMethod = new GetMethod(remoteUrl.toString()); 121 configureMethod(httpMethod); 122 123 try { 124 int answer = httpClient.executeMethod(httpMethod); 125 if (answer != HttpStatus.SC_OK) { 126 if (answer == HttpStatus.SC_REQUEST_TIMEOUT) { 127 LOG.debug("GET timed out"); 128 try { 129 Thread.sleep(1000); 130 } catch (InterruptedException e) { 131 onException(new InterruptedIOException()); 132 break; 133 } 134 } else { 135 onException(new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer)); 136 break; 137 } 138 } else { 139 receiveCounter++; 140 DataInputStream stream = new DataInputStream(httpMethod.getResponseBodyAsStream()); 141 Object command = (Object)getTextWireFormat().unmarshal(stream); 142 if (command == null) { 143 LOG.debug("Received null command from url: " + remoteUrl); 144 } else { 145 doConsume(command); 146 } 147 } 148 } catch (IOException e) { 149 onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " Reason: " + e.getMessage(), e)); 150 break; 151 } finally { 152 httpMethod.releaseConnection(); 153 } 154 } 155 } 156 157 // Properties 158 // ------------------------------------------------------------------------- 159 public HttpClient getSendHttpClient() { 160 if (sendHttpClient == null) { 161 sendHttpClient = createHttpClient(); 162 } 163 return sendHttpClient; 164 } 165 166 public void setSendHttpClient(HttpClient sendHttpClient) { 167 this.sendHttpClient = sendHttpClient; 168 } 169 170 public HttpClient getReceiveHttpClient() { 171 if (receiveHttpClient == null) { 172 receiveHttpClient = createHttpClient(); 173 } 174 return receiveHttpClient; 175 } 176 177 public void setReceiveHttpClient(HttpClient receiveHttpClient) { 178 this.receiveHttpClient = receiveHttpClient; 179 } 180 181 // Implementation methods 182 // ------------------------------------------------------------------------- 183 protected void doStart() throws Exception { 184 185 LOG.trace("HTTP GET consumer thread starting: " + this); 186 HttpClient httpClient = getReceiveHttpClient(); 187 URI remoteUrl = getRemoteUrl(); 188 189 HeadMethod httpMethod = new HeadMethod(remoteUrl.toString()); 190 configureMethod(httpMethod); 191 192 int answer = httpClient.executeMethod(httpMethod); 193 if (answer != HttpStatus.SC_OK) { 194 throw new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer); 195 } 196 197 super.doStart(); 198 } 199 200 protected void doStop(ServiceStopper stopper) throws Exception { 201 if (httpMethod != null) { 202 httpMethod.abort(); 203 } 204 } 205 206 protected HttpClient createHttpClient() { 207 HttpClient client = new HttpClient(); 208 if (getProxyHost() != null) { 209 client.getHostConfiguration().setProxy(getProxyHost(), getProxyPort()); 210 } 211 return client; 212 } 213 214 protected void configureMethod(HttpMethod method) { 215 method.setRequestHeader("clientID", clientID); 216 } 217 218 public boolean isTrace() { 219 return trace; 220 } 221 222 public void setTrace(boolean trace) { 223 this.trace = trace; 224 } 225 226 public int getReceiveCounter() { 227 return receiveCounter; 228 } 229 230 }