001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.transport;
019    
020    import java.io.IOException;
021    import java.net.Socket;
022    import java.util.Iterator;
023    import java.util.concurrent.ConcurrentLinkedQueue;
024    import java.util.concurrent.atomic.AtomicInteger;
025    import java.util.concurrent.locks.Condition;
026    import java.util.concurrent.locks.ReentrantLock;
027    
028    import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    
032    /**
033     * This filter implements write timeouts for socket write operations.
034     * When using blocking IO, the Java implementation doesn't have an explicit flag
035     * to set a timeout, and can cause operations to block forever (or until the TCP stack implementation times out the retransmissions,
036     * which is usually around 13-30 minutes).<br/>
037     * To enable this transport, in the transport URI, simpley add<br/>
038     * <code>transport.soWriteTimeout=<value in millis></code>.<br/>
039     * For example (15 second timeout on write operations to the socket):</br>
040     * <pre><code>
041     * &lt;transportConnector 
042     *     name=&quot;tcp1&quot; 
043     *     uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000"
044     * /&gt;
045     * </code></pre><br/>
046     * For example (enable default timeout on the socket):</br>
047     * <pre><code>
048     * &lt;transportConnector 
049     *     name=&quot;tcp1&quot; 
050     *     uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000"
051     * /&gt;
052     * </code></pre>
053     * @author Filip Hanik
054     *
055     */
056    public class WriteTimeoutFilter extends TransportFilter {
057    
058        private static final Log LOG = LogFactory.getLog(WriteTimeoutFilter.class);
059        protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue<WriteTimeoutFilter>();
060        protected static AtomicInteger messageCounter = new AtomicInteger(0);
061        protected static TimeoutThread timeoutThread = new TimeoutThread(); 
062        
063        protected static long sleep = 5000l;
064    
065        protected long writeTimeout = -1;
066        
067        public WriteTimeoutFilter(Transport next) {
068            super(next);
069        }
070    
071        @Override
072        public void oneway(Object command) throws IOException {
073            try {
074                registerWrite(this);
075                super.oneway(command);
076            } catch (IOException x) {
077                throw x;
078            } finally {
079                deRegisterWrite(this,false,null);
080            }
081        }
082        
083        public long getWriteTimeout() {
084            return writeTimeout;
085        }
086    
087        public void setWriteTimeout(long writeTimeout) {
088            this.writeTimeout = writeTimeout;
089        }
090        
091        public static long getSleep() {
092            return sleep;
093        }
094    
095        public static void setSleep(long sleep) {
096            WriteTimeoutFilter.sleep = sleep;
097        }
098    
099        
100        protected TcpBufferedOutputStream getWriter() {
101            return next.narrow(TcpBufferedOutputStream.class);
102        }
103        
104        protected Socket getSocket() {
105            return next.narrow(Socket.class);
106        }
107        
108        protected static void registerWrite(WriteTimeoutFilter filter) {
109            writers.add(filter);
110        }
111        
112        protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
113            boolean result = writers.remove(filter); 
114            if (result) {
115                if (fail) {
116                    String message = "Forced write timeout for:"+filter.getNext().getRemoteAddress();
117                    LOG.warn(message);
118                    Socket sock = filter.getSocket();
119                    if (sock==null) {
120                        LOG.error("Destination socket is null, unable to close socket.("+message+")");
121                    } else {
122                        try {
123                            sock.close();
124                        }catch (IOException ignore) {
125                        }
126                    }
127                }
128            }
129            return result;
130        }
131        
132        @Override
133        public void start() throws Exception {
134            super.start();
135        }
136        
137        @Override
138        public void stop() throws Exception {
139            super.stop();
140        }
141        
142        protected static class TimeoutThread extends Thread {
143            static AtomicInteger instance = new AtomicInteger(0);
144            boolean run = true;
145            public TimeoutThread() {
146                setName("WriteTimeoutFilter-Timeout-"+instance.incrementAndGet());
147                setDaemon(true);
148                setPriority(Thread.MIN_PRIORITY);
149                start();
150            }
151    
152            
153            public void run() {
154                while (run) {
155                    boolean error = false;
156                    try {
157                            if (!interrupted()) {
158                                    Iterator<WriteTimeoutFilter> filters = writers.iterator();
159                                while (run && filters.hasNext()) { 
160                                WriteTimeoutFilter filter = filters.next();
161                                if (filter.getWriteTimeout()<=0) continue; //no timeout set
162                                long writeStart = filter.getWriter().getWriteTimestamp();
163                                long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1;
164                                if (delta>filter.getWriteTimeout()) {
165                                    WriteTimeoutFilter.deRegisterWrite(filter, true,null);
166                                }//if timeout
167                            }//while
168                        }//if interrupted
169                        try {
170                            Thread.sleep(getSleep());
171                            error = false;
172                        } catch (InterruptedException x) {
173                            //do nothing
174                        }
175                    }catch (Throwable t) { //make sure this thread never dies
176                        if (!error) { //use error flag to avoid filling up the logs
177                            LOG.error("WriteTimeout thread unable validate existing sockets.",t);
178                            error = true;
179                        }
180                    }
181                }
182            }
183        }
184    
185    }