/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.udp;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.CommandJoiner;
import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.reliable.DefaultReplayStrategy;
import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.transport.reliable.ReliableTransport;
import org.apache.activemq.transport.reliable.ReplayStrategy;
import org.apache.activemq.transport.reliable.Replayer;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A {@link TransportFactory} that creates {@link UdpTransport} instances and
 * wraps them in the filter chain built by the <code>configure</code> methods
 * below (trace logging, inactivity monitoring, response redirection,
 * reliability and command joining).
 *
 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
 * @version $Revision: 777806 $
 */
public class UdpTransportFactory extends TransportFactory {

    // Log under this factory's own category; the original used
    // TcpTransportFactory.class, which mis-attributed UDP log output.
    private static final Log log = LogFactory.getLog(UdpTransportFactory.class);

    /**
     * Creates a {@link UdpTransportServer} for the given location.
     * <p>
     * The URI parameters are used as transport options. A <code>port</code>
     * option is rejected; the port has to be given in the URI itself.
     *
     * @param location the URI to bind to
     * @return the newly created server
     * @throws IOException if anything goes wrong while creating the server;
     *         every exception raised in the body (including the
     *         {@link IllegalArgumentException} for a <code>port</code>
     *         option) is converted through {@link IOExceptionSupport}
     */
    public TransportServer doBind(final URI location) throws IOException {
        try {
            Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
            if (options.containsKey("port")) {
                throw new IllegalArgumentException("The port property cannot be specified on a UDP server transport - please use the port in the URI syntax");
            }
            WireFormat wf = createWireFormat(options);
            UdpTransport transport = (UdpTransport) createTransport(location, wf);

            // 'true': configure the transport as the accept-server side
            Transport configuredTransport = configure(transport, wf, options, true);
            return new UdpTransportServer(location, transport, configuredTransport, createReplayStrategy());
        } catch (Exception e) {
            throw IOExceptionSupport.create(e);
        }
    }

    /**
     * Configures the transport as a regular (non accept-server) transport by
     * delegating to {@link #configure(Transport, WireFormat, Map, boolean)}
     * with <code>acceptServer</code> set to <code>false</code>.
     *
     * @param transport the transport to configure; must be a {@link UdpTransport}
     * @param format the wire format; must be an {@link OpenWireFormat}
     * @param options the properties to apply to the transport
     * @return the outermost transport of the configured chain
     * @throws Exception if the configuration fails
     */
    public Transport configure(Transport transport, WireFormat format, Map options) throws Exception {
        return configure(transport, format, options, false);
    }

    /**
     * Configures the transport for composite use: applies the options, then
     * wraps it in a {@link CommandJoiner}, an optional transport logger, an
     * {@link InactivityMonitor} and, for OpenWire, the client side negotiator.
     * <p>
     * Unlike {@link #configure(Transport, WireFormat, Map, boolean)}, a
     * failure to create the transport logger is only logged and the chain is
     * built without it.
     *
     * @param transport the transport to configure; must be a {@link UdpTransport}
     * @param format the wire format; must be an {@link OpenWireFormat}
     * @param options the properties to apply to the transport
     * @return the outermost transport of the configured chain
     */
    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
        IntrospectionSupport.setProperties(transport, options);
        final UdpTransport udpTransport = (UdpTransport) transport;

        // deal with fragmentation
        transport = new CommandJoiner(transport, asOpenWireFormat(format));

        if (udpTransport.isTrace()) {
            try {
                transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
            } catch (Throwable e) {
                // best effort: continue without the logger
                log.error("Could not create TransportLogger object for: " + TransportLoggerFactory.defaultLogWriterName + ", reason: " + e, e);
            }
        }

        transport = new InactivityMonitor(transport, format);

        if (format instanceof OpenWireFormat) {
            transport = configureClientSideNegotiator(transport, format, udpTransport);
        }

        return transport;
    }

    /**
     * Creates the raw {@link UdpTransport} for the given location.
     *
     * @param location the URI handed to the {@link UdpTransport} constructor
     * @param wf the wire format; must be an {@link OpenWireFormat}
     * @return a new {@link UdpTransport}
     * @throws UnknownHostException declared for the {@link UdpTransport} constructor
     * @throws IOException declared for the {@link UdpTransport} constructor
     */
    protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
        OpenWireFormat wireFormat = asOpenWireFormat(wf);
        return new UdpTransport(wireFormat, location);
    }

    /**
     * Configures the transport
     *
     * @param transport the transport to configure; must be a {@link UdpTransport}
     * @param format the wire format; must be an {@link OpenWireFormat}
     * @param options the properties to apply to the transport
     * @param acceptServer true if this transport is used purely as an 'accept'
     *                transport for new connections which work like TCP
     *                SocketServers where new connections spin up a new separate
     *                UDP transport
     * @return the outermost transport of the configured chain, always a
     *         {@link CommandJoiner}
     * @throws Exception if the configuration fails, e.g. the transport logger
     *         cannot be created
     */
    protected Transport configure(Transport transport, WireFormat format, Map options, boolean acceptServer) throws Exception {
        IntrospectionSupport.setProperties(transport, options);
        UdpTransport udpTransport = (UdpTransport) transport;

        OpenWireFormat openWireFormat = asOpenWireFormat(format);

        if (udpTransport.isTrace()) {
            transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
        }

        transport = new InactivityMonitor(transport, format);

        if (!acceptServer && format instanceof OpenWireFormat) {
            transport = configureClientSideNegotiator(transport, format, udpTransport);
        }

        if (acceptServer) {
            // lets not support a buffer of messages to enable reliable
            // messaging on the 'accept server' transport
            udpTransport.setReplayEnabled(false);

            // we don't want to do reliable checks on this transport as we
            // delegate to one that does; only deal with fragmentation
            return new CommandJoiner(transport, openWireFormat);
        }

        ReliableTransport reliableTransport = new ReliableTransport(transport, udpTransport);
        Replayer replayer = reliableTransport.getReplayer();
        reliableTransport.setReplayStrategy(createReplayStrategy(replayer));

        // Joiner (which deals with fragmentation) must be on outside as the
        // inbound messages must be processed by the reliable transport first
        return new CommandJoiner(reliableTransport, openWireFormat);
    }

    /**
     * Chooses the replay strategy for a reliable transport.
     *
     * @param replayer the replayer of the reliable transport, may be null
     * @return a {@link DefaultReplayStrategy} when a replayer is available,
     *         otherwise an {@link ExceptionIfDroppedReplayStrategy}
     */
    protected ReplayStrategy createReplayStrategy(Replayer replayer) {
        if (replayer != null) {
            return new DefaultReplayStrategy(5);
        }
        return new ExceptionIfDroppedReplayStrategy(1);
    }

    /**
     * Creates the replay strategy handed to the {@link UdpTransportServer}
     * in {@link #doBind(URI)}.
     *
     * @return a new {@link DefaultReplayStrategy}
     */
    protected ReplayStrategy createReplayStrategy() {
        return new DefaultReplayStrategy(5);
    }

    /**
     * Wraps the transport in a {@link ResponseRedirectInterceptor}.
     *
     * @param transport the transport to wrap
     * @param format the wire format (not used by this implementation)
     * @param udpTransport the underlying UDP transport passed to the interceptor
     * @return the wrapping {@link ResponseRedirectInterceptor}
     */
    protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) {
        return new ResponseRedirectInterceptor(transport, udpTransport);
    }

    /**
     * Casts the given wire format to {@link OpenWireFormat}.
     *
     * @param wf the wire format to cast
     * @return the same instance typed as {@link OpenWireFormat}
     * @throws ClassCastException if <code>wf</code> is not an {@link OpenWireFormat}
     */
    protected OpenWireFormat asOpenWireFormat(WireFormat wf) {
        return (OpenWireFormat) wf;
    }
}