001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.tcp;
018    
019    import java.io.IOException;
020    import java.net.InetAddress;
021    import java.net.InetSocketAddress;
022    import java.net.ServerSocket;
023    import java.net.Socket;
024    import java.net.SocketException;
025    import java.net.SocketTimeoutException;
026    import java.net.URI;
027    import java.net.URISyntaxException;
028    import java.net.UnknownHostException;
029    import java.util.HashMap;
030    import java.util.Map;
031    import java.util.concurrent.BlockingQueue;
032    import java.util.concurrent.LinkedBlockingQueue;
033    import java.util.concurrent.TimeUnit;
034    
035    import javax.net.ServerSocketFactory;
036    
037    import org.apache.activemq.Service;
038    import org.apache.activemq.ThreadPriorities;
039    import org.apache.activemq.command.BrokerInfo;
040    import org.apache.activemq.openwire.OpenWireFormatFactory;
041    import org.apache.activemq.transport.Transport;
042    import org.apache.activemq.transport.TransportLoggerFactory;
043    import org.apache.activemq.transport.TransportServer;
044    import org.apache.activemq.transport.TransportServerThreadSupport;
045    import org.apache.activemq.util.IOExceptionSupport;
046    import org.apache.activemq.util.IntrospectionSupport;
047    import org.apache.activemq.util.ServiceListener;
048    import org.apache.activemq.util.ServiceStopper;
049    import org.apache.activemq.util.ServiceSupport;
050    import org.apache.activemq.wireformat.WireFormat;
051    import org.apache.activemq.wireformat.WireFormatFactory;
052    import org.apache.commons.logging.Log;
053    import org.apache.commons.logging.LogFactory;
054    
055    /**
056     * A TCP based implementation of {@link TransportServer}
057     * 
058     * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
059     * @version $Revision: 1.1 $
060     */
061    
062    public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{
063    
064        private static final Log LOG = LogFactory.getLog(TcpTransportServer.class);
065        protected ServerSocket serverSocket;
066        protected int backlog = 5000;
067        protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
068        protected final TcpTransportFactory transportFactory;
069        protected long maxInactivityDuration = 30000;
070        protected long maxInactivityDurationInitalDelay = 10000;
071        protected int minmumWireFormatVersion;
072        protected boolean useQueueForAccept=true;
073           
074        /**
075         * trace=true -> the Transport stack where this TcpTransport
076         * object will be, will have a TransportLogger layer
077         * trace=false -> the Transport stack where this TcpTransport
078         * object will be, will NOT have a TransportLogger layer, and therefore
079         * will never be able to print logging messages.
080         * This parameter is most probably set in Connection or TransportConnector URIs.
081         */
082        protected boolean trace = false;
083    
084        protected int soTimeout = 0;
085        protected int socketBufferSize = 64 * 1024;
086        protected int connectionTimeout =  30000;
087    
088        /**
089         * Name of the LogWriter implementation to use.
090         * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
091         * This parameter is most probably set in Connection or TransportConnector URIs.
092         */
093        protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
094        /**
095         * Specifies if the TransportLogger will be manageable by JMX or not.
096         * Also, as long as there is at least 1 TransportLogger which is manageable,
097         * a TransportLoggerControl MBean will me created.
098         */
099        protected boolean dynamicManagement = false;
100        /**
101         * startLogging=true -> the TransportLogger object of the Transport stack
102         * will initially write messages to the log.
103         * startLogging=false -> the TransportLogger object of the Transport stack
104         * will initially NOT write messages to the log.
105         * This parameter only has an effect if trace == true.
106         * This parameter is most probably set in Connection or TransportConnector URIs.
107         */
108        protected boolean startLogging = true;
109        protected Map<String, Object> transportOptions;
110        protected final ServerSocketFactory serverSocketFactory;
111        protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
112        protected Thread socketHandlerThread;
113        /**
114         * The maximum number of sockets allowed for this server
115         */
116        protected int maximumConnections = Integer.MAX_VALUE;
117        protected int currentTransportCount=0;
118      
119        public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
120            super(location);
121            this.transportFactory = transportFactory;
122            this.serverSocketFactory = serverSocketFactory;
123            
124        }
125    
126        public void bind() throws IOException {
127            URI bind = getBindLocation();
128    
129            String host = bind.getHost();
130            host = (host == null || host.length() == 0) ? "localhost" : host;
131            InetAddress addr = InetAddress.getByName(host);
132    
133            try {
134    
135                this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
136                configureServerSocket(this.serverSocket);
137                
138            } catch (IOException e) {
139                throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
140            }
141            try {
142                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
143                    .getFragment()));
144            } catch (URISyntaxException e) {
145    
146                // it could be that the host name contains invalid characters such
147                // as _ on unix platforms
148                // so lets try use the IP address instead
149                try {
150                    setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
151                } catch (URISyntaxException e2) {
152                    throw IOExceptionSupport.create(e2);
153                }
154            }
155        }
156    
157        private void configureServerSocket(ServerSocket socket) throws SocketException {
158            socket.setSoTimeout(2000);
159            if (transportOptions != null) {
160                IntrospectionSupport.setProperties(socket, transportOptions);
161            }
162        }
163    
164        /**
165         * @return Returns the wireFormatFactory.
166         */
167        public WireFormatFactory getWireFormatFactory() {
168            return wireFormatFactory;
169        }
170    
171        /**
172         * @param wireFormatFactory The wireFormatFactory to set.
173         */
174        public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
175            this.wireFormatFactory = wireFormatFactory;
176        }
177    
178        /**
179         * Associates a broker info with the transport server so that the transport
180         * can do discovery advertisements of the broker.
181         * 
182         * @param brokerInfo
183         */
184        public void setBrokerInfo(BrokerInfo brokerInfo) {
185        }
186    
187        public long getMaxInactivityDuration() {
188            return maxInactivityDuration;
189        }
190    
191        public void setMaxInactivityDuration(long maxInactivityDuration) {
192            this.maxInactivityDuration = maxInactivityDuration;
193        }
194        
195        public long getMaxInactivityDurationInitalDelay() {
196            return this.maxInactivityDurationInitalDelay;
197        }
198    
199        public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
200            this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
201        }
202    
203        public int getMinmumWireFormatVersion() {
204            return minmumWireFormatVersion;
205        }
206    
207        public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
208            this.minmumWireFormatVersion = minmumWireFormatVersion;
209        }
210    
211        public boolean isTrace() {
212            return trace;
213        }
214    
215        public void setTrace(boolean trace) {
216            this.trace = trace;
217        }
218        
219        public String getLogWriterName() {
220            return logWriterName;
221        }
222    
223        public void setLogWriterName(String logFormat) {
224            this.logWriterName = logFormat;
225        }        
226    
227        public boolean isDynamicManagement() {
228            return dynamicManagement;
229        }
230    
231        public void setDynamicManagement(boolean useJmx) {
232            this.dynamicManagement = useJmx;
233        }
234    
235        public boolean isStartLogging() {
236            return startLogging;
237        }
238    
239    
240        public void setStartLogging(boolean startLogging) {
241            this.startLogging = startLogging;
242        }
243        
244        /**
245         * @return the backlog
246         */
247        public int getBacklog() {
248            return backlog;
249        }
250    
251        /**
252         * @param backlog the backlog to set
253         */
254        public void setBacklog(int backlog) {
255            this.backlog = backlog;
256        }
257    
258        /**
259         * @return the useQueueForAccept
260         */
261        public boolean isUseQueueForAccept() {
262            return useQueueForAccept;
263        }
264    
265        /**
266         * @param useQueueForAccept the useQueueForAccept to set
267         */
268        public void setUseQueueForAccept(boolean useQueueForAccept) {
269            this.useQueueForAccept = useQueueForAccept;
270        }
271        
272    
273        /**
274         * pull Sockets from the ServerSocket
275         */
276        public void run() {
277            while (!isStopped()) {
278                Socket socket = null;
279                try {
280                    socket = serverSocket.accept();
281                    if (socket != null) {
282                        if (isStopped() || getAcceptListener() == null) {
283                            socket.close();
284                        } else {
285                            if (useQueueForAccept) {
286                                socketQueue.put(socket);
287                            }else {
288                                handleSocket(socket);
289                            }
290                        }
291                    }
292                } catch (SocketTimeoutException ste) {
293                    // expect this to happen
294                } catch (Exception e) {
295                    if (!isStopping()) {
296                        onAcceptError(e);
297                    } else if (!isStopped()) {
298                        LOG.warn("run()", e);
299                        onAcceptError(e);
300                    }
301                }
302            }
303        }
304    
305        /**
306         * Allow derived classes to override the Transport implementation that this
307         * transport server creates.
308         * 
309         * @param socket
310         * @param format
311         * @return
312         * @throws IOException
313         */
314        protected  Transport createTransport(Socket socket, WireFormat format) throws IOException {
315            return new TcpTransport(format, socket);
316        }
317    
318        /**
319         * @return pretty print of this
320         */
321        public String toString() {
322            return "" + getBindLocation();
323        }
324    
325        /**
326         * @param socket 
327         * @param inetAddress
328         * @return real hostName
329         * @throws UnknownHostException
330         */
331        protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
332            String result = null;
333            if (socket.isBound()) {
334                if (socket.getInetAddress().isAnyLocalAddress()) {
335                    // make it more human readable and useful, an alternative to 0.0.0.0
336                    result = InetAddress.getLocalHost().getHostName();
337                } else {
338                    result = socket.getInetAddress().getCanonicalHostName();
339                }
340            } else {
341                result = bindAddress.getCanonicalHostName();
342            }
343            return result;
344        }
345        
346        protected void doStart() throws Exception {
347            if(useQueueForAccept) {
348                Runnable run = new Runnable() {
349                    public void run() {
350                        try {
351                            while (!isStopped() && !isStopping()) {
352                                Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
353                                if (sock != null) {
354                                    handleSocket(sock);
355                                }
356                            }
357        
358                        } catch (InterruptedException e) {
359                            LOG.info("socketQueue interuppted - stopping");
360                            if (!isStopping()) {
361                                onAcceptError(e);
362                            }
363                        }
364        
365                    }
366        
367                };
368                socketHandlerThread = new Thread(null, run,
369                        "ActiveMQ Transport Server Thread Handler: " + toString(),
370                        getStackSize());
371                socketHandlerThread.setDaemon(true);
372                socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
373                socketHandlerThread.start();
374            }
375            super.doStart();
376            
377        }
378    
379        protected void doStop(ServiceStopper stopper) throws Exception {
380            super.doStop(stopper);
381            if (serverSocket != null) {
382                serverSocket.close();
383            }
384        }
385    
386        public InetSocketAddress getSocketAddress() {
387            return (InetSocketAddress)serverSocket.getLocalSocketAddress();
388        }
389    
390        public void setTransportOption(Map<String, Object> transportOptions) {
391            this.transportOptions = transportOptions;
392        }
393        
394        protected final void handleSocket(Socket socket) {
395            try {
396                if (this.currentTransportCount >= this.maximumConnections) {
397                    throw new ExceededMaximumConnectionsException("Exceeded the maximum " + 
398                        "number of allowed client connections. See the 'maximumConnections' " + 
399                        "property on the TCP transport configuration URI in the ActiveMQ " + 
400                        "configuration file (e.g., activemq.xml)"); 
401                    
402                } else {
403                    HashMap<String, Object> options = new HashMap<String, Object>();
404                    options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
405                    options.put("maxInactivityDurationInitalDelay", 
406                        Long.valueOf(maxInactivityDurationInitalDelay));
407                    options.put("minmumWireFormatVersion", 
408                        Integer.valueOf(minmumWireFormatVersion));
409                    options.put("trace", Boolean.valueOf(trace));
410                    options.put("soTimeout", Integer.valueOf(soTimeout));
411                    options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
412                    options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
413                    options.put("logWriterName", logWriterName);
414                    options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
415                    options.put("startLogging", Boolean.valueOf(startLogging));
416                    options.putAll(transportOptions);
417    
418                    WireFormat format = wireFormatFactory.createWireFormat();
419                    Transport transport = createTransport(socket, format);
420    
421                    if (transport instanceof ServiceSupport) {
422                        ((ServiceSupport) transport).addServiceListener(this);
423                    }
424    
425                    Transport configuredTransport = 
426                        transportFactory.serverConfigure( transport, format, options);
427    
428                    getAcceptListener().onAccept(configuredTransport);
429                }
430            } catch (SocketTimeoutException ste) {
431                // expect this to happen
432            } catch (Exception e) {
433                if (!isStopping()) {
434                    onAcceptError(e);
435                } else if (!isStopped()) {
436                    LOG.warn("run()", e);
437                    onAcceptError(e);
438                }
439            }
440            
441        }    
442    
443            public int getSoTimeout() {
444                    return soTimeout;
445            }
446    
447            public void setSoTimeout(int soTimeout) {
448                    this.soTimeout = soTimeout;
449            }
450    
451            public int getSocketBufferSize() {
452                    return socketBufferSize;
453            }
454    
455            public void setSocketBufferSize(int socketBufferSize) {
456                    this.socketBufferSize = socketBufferSize;
457            }
458    
459            public int getConnectionTimeout() {
460                    return connectionTimeout;
461            }
462    
463            public void setConnectionTimeout(int connectionTimeout) {
464                    this.connectionTimeout = connectionTimeout;
465            }
466    
467        /**
468         * @return the maximumConnections
469         */
470        public int getMaximumConnections() {
471            return maximumConnections;
472        }
473    
474        /**
475         * @param maximumConnections the maximumConnections to set
476         */
477        public void setMaximumConnections(int maximumConnections) {
478            this.maximumConnections = maximumConnections;
479        }
480    
481        
482        public void started(Service service) {
483           this.currentTransportCount++;
484        }
485    
486        public void stopped(Service service) {
487            this.currentTransportCount--;
488        }
489    }