/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.blob;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;

import javax.jms.JMSException;

import org.apache.activemq.command.ActiveMQBlobMessage;

/**
 * A default implementation of {@link BlobUploadStrategy} which uses the
 * {@link URL} class to upload files or streams to a remote URL via HTTP
 * PUT, and to remove them again via HTTP DELETE.
 */
public class DefaultBlobUploadStrategy implements BlobUploadStrategy {
    /** Supplies the upload base URL and the transfer buffer size. */
    private final BlobTransferPolicy transferPolicy;

    /**
     * Creates a strategy that takes its upload URL and buffer size from the
     * given policy.
     *
     * @param transferPolicy the policy consulted on every upload and delete
     */
    public DefaultBlobUploadStrategy(BlobTransferPolicy transferPolicy) {
        this.transferPolicy = transferPolicy;
    }

    /**
     * Uploads the content of a file by delegating to
     * {@link #uploadStream(ActiveMQBlobMessage, InputStream)}.
     *
     * @param message the message whose id determines the target URL
     * @param file the file to upload
     * @return the URL the content was uploaded to
     * @throws JMSException if the upload URL cannot be derived from the message
     * @throws IOException if the file cannot be opened or the transfer fails
     */
    public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException {
        // uploadStream() closes the stream on both the success and the failure path.
        return uploadStream(message, new FileInputStream(file));
    }

    /**
     * Uploads the content of a stream to the URL created by
     * {@link #createUploadURL(ActiveMQBlobMessage)} using an HTTP PUT request.
     * The given stream is always closed before this method returns, whether
     * or not the upload succeeded.
     *
     * @param message the message whose id determines the target URL
     * @param fis the content to upload; closed by this method
     * @return the URL the content was uploaded to
     * @throws JMSException if the upload URL cannot be derived from the message
     * @throws IOException if the transfer fails or the server does not answer
     *         with a 2xx status code
     */
    public URL uploadStream(ActiveMQBlobMessage message, InputStream fis) throws JMSException, IOException {
        try {
            URL url = createUploadURL(message);

            HttpURLConnection connection = (HttpURLConnection)url.openConnection();
            connection.setRequestMethod("PUT");
            connection.setDoOutput(true);

            // use chunked mode or otherwise URLConnection loads everything into
            // memory
            // (chunked mode not supported before JRE 1.5)
            connection.setChunkedStreamingMode(transferPolicy.getBufferSize());

            OutputStream os = connection.getOutputStream();
            try {
                byte[] buf = new byte[transferPolicy.getBufferSize()];
                for (int c = fis.read(buf); c != -1; c = fis.read(buf)) {
                    os.write(buf, 0, c);
                    os.flush();
                }
            } finally {
                // Release the request body stream even if reading or writing failed.
                os.close();
            }

            int responseCode = connection.getResponseCode();
            if (!isSuccessfulCode(responseCode)) {
                throw new IOException("PUT was not successful: " + responseCode + " "
                                      + connection.getResponseMessage());
            }

            return url;
        } finally {
            // The caller hands over ownership of the stream, so close it on every path.
            fis.close();
        }
    }

    /**
     * Deletes the previously uploaded content for the given message by
     * sending an HTTP DELETE request to the URL created by
     * {@link #createUploadURL(ActiveMQBlobMessage)}.
     *
     * @param message the message whose id determines the target URL
     * @throws IOException if the request fails or the server does not answer
     *         with a 2xx status code
     * @throws JMSException if the URL cannot be derived from the message
     */
    public void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException {
        URL url = createUploadURL(message);

        HttpURLConnection connection = (HttpURLConnection)url.openConnection();
        connection.setRequestMethod("DELETE");
        try {
            connection.connect();

            // Evaluate the response while the connection is still open;
            // disconnect() only runs once we are done with it.
            int responseCode = connection.getResponseCode();
            if (!isSuccessfulCode(responseCode)) {
                throw new IOException("DELETE was not successful: " + responseCode + " "
                                      + connection.getResponseMessage());
            }
        } finally {
            connection.disconnect();
        }
    }

    /**
     * Tells whether an HTTP status code lies in the 2xx range.
     *
     * @param responseCode the HTTP status code to check
     * @return {@code true} if {@code 200 <= responseCode < 300}
     */
    private boolean isSuccessfulCode(int responseCode) {
        return responseCode >= 200 && responseCode < 300; // 2xx => successful
    }

    /**
     * Builds the target URL for a message by appending the string form of
     * its message id to the upload URL configured in the transfer policy.
     *
     * @param message the message to build the URL for
     * @return the URL used for uploading and deleting the message's content
     * @throws JMSException declared for subclasses; not thrown by the code
     *         in this implementation itself
     * @throws MalformedURLException if the concatenated string is not a valid URL
     */
    protected URL createUploadURL(ActiveMQBlobMessage message) throws JMSException, MalformedURLException {
        return new URL(transferPolicy.getUploadUrl() + message.getMessageId().toString());
    }
}