001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.blob;
018    
019    import java.io.IOException;
020    import java.io.InputStream;
021    import java.io.FilterInputStream;
022    import java.net.ConnectException;
023    import java.net.URL;
024    
025    import javax.jms.JMSException;
026    
027    import org.apache.activemq.command.ActiveMQBlobMessage;
028    import org.apache.commons.net.ftp.FTPClient;
029    
030    /**
031     * A FTP implementation for {@link BlobDownloadStrategy}.
032     */
033    public class FTPBlobDownloadStrategy implements BlobDownloadStrategy {
034        private String ftpUser;
035        private String ftpPass;
036    
037        public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException {
038            URL url = message.getURL();
039    
040            setUserInformation(url.getUserInfo());
041            String connectUrl = url.getHost();
042            int port = url.getPort() < 1 ? 21 : url.getPort();
043    
044            final FTPClient ftp = new FTPClient();
045            try {
046                ftp.connect(connectUrl, port);
047            } catch(ConnectException e) {
048                throw new JMSException("Problem connecting the FTP-server");
049            }
050    
051            if(!ftp.login(ftpUser, ftpPass)) {
052                ftp.quit();
053                ftp.disconnect();
054                throw new JMSException("Cant Authentificate to FTP-Server");
055            }
056            String path = url.getPath();
057            String workingDir = path.substring(0, path.lastIndexOf("/"));
058            String file = path.substring(path.lastIndexOf("/")+1);
059    
060            ftp.changeWorkingDirectory(workingDir);
061            ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
062    
063            InputStream input = new FilterInputStream(ftp.retrieveFileStream(file)) {
064    
065                public void close() throws IOException {
066                    in.close();
067                    ftp.quit();
068                    ftp.disconnect();
069                }
070            };
071    
072            return input;
073        }
074    
075        private void setUserInformation(String userInfo) {
076            if(userInfo != null) {
077                String[] userPass = userInfo.split(":");
078                if(userPass.length > 0) this.ftpUser = userPass[0];
079                if(userPass.length > 1) this.ftpPass = userPass[1];
080            } else {
081                this.ftpUser = "anonymous";
082                this.ftpPass = "anonymous";
083            }
084        }
085    
086    }