001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.IOException;
020    import java.io.OutputStream;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.Map;
024    
025    import javax.jms.InvalidDestinationException;
026    import javax.jms.JMSException;
027    
028    import org.apache.activemq.command.ActiveMQBytesMessage;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ActiveMQMessage;
031    import org.apache.activemq.command.MessageId;
032    import org.apache.activemq.command.ProducerId;
033    import org.apache.activemq.command.ProducerInfo;
034    import org.apache.activemq.util.IOExceptionSupport;
035    
036    /**
037     * @version $Revision$
038     */
039    public class ActiveMQOutputStream extends OutputStream implements Disposable {
040    
041        // Send down 64k messages.
042        protected int count;
043    
044        final byte buffer[] = new byte[64 * 1024];
045    
046        private final ActiveMQConnection connection;
047        private final Map<String, Object> properties;
048        private final ProducerInfo info;
049    
050        private long messageSequence;
051        private boolean closed;
052        private final int deliveryMode;
053        private final int priority;
054        private final long timeToLive;
055    
056        public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority,
057                                    long timeToLive) throws JMSException {
058            this.connection = connection;
059            this.deliveryMode = deliveryMode;
060            this.priority = priority;
061            this.timeToLive = timeToLive;
062            this.properties = properties == null ? null : new HashMap<String, Object>(properties);
063    
064            if (destination == null) {
065                throw new InvalidDestinationException("Don't understand null destinations");
066            }
067    
068            this.info = new ProducerInfo(producerId);
069            this.info.setDestination(destination);
070    
071            this.connection.addOutputStream(this);
072            this.connection.asyncSendPacket(info);
073        }
074    
075        public void close() throws IOException {
076            if (!closed) {
077                flushBuffer();
078                try {
079                    // Send an EOS style empty message to signal EOS.
080                    send(new ActiveMQMessage(), true);
081                    dispose();
082                    this.connection.asyncSendPacket(info.createRemoveCommand());
083                } catch (JMSException e) {
084                    IOExceptionSupport.create(e);
085                }
086            }
087        }
088    
089        public void dispose() {
090            if (!closed) {
091                this.connection.removeOutputStream(this);
092                closed = true;
093            }
094        }
095    
096        public synchronized void write(int b) throws IOException {
097            buffer[count++] = (byte) b;
098            if (count == buffer.length) {
099                flushBuffer();
100            }
101        }
102    
103        public synchronized void write(byte b[], int off, int len) throws IOException {
104            while (len > 0) {
105                int max = Math.min(len, buffer.length - count);
106                System.arraycopy(b, off, buffer, count, max);
107    
108                len -= max;
109                count += max;
110                off += max;
111    
112                if (count == buffer.length) {
113                    flushBuffer();
114                }
115            }
116        }
117    
118        public synchronized void flush() throws IOException {
119            flushBuffer();
120        }
121    
122        private void flushBuffer() throws IOException {
123            if (count != 0) {
124                try {
125                    ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
126                    msg.writeBytes(buffer, 0, count);
127                    send(msg, false);
128                } catch (JMSException e) {
129                    throw IOExceptionSupport.create(e);
130                }
131                count = 0;
132            }
133        }
134    
135        /**
136         * @param msg
137         * @throws JMSException
138         */
139        private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
140            if (properties != null) {
141                for (Iterator iter = properties.keySet().iterator(); iter.hasNext();) {
142                    String key = (String) iter.next();
143                    Object value = properties.get(key);
144                    msg.setObjectProperty(key, value);
145                }
146            }
147            msg.setType("org.apache.activemq.Stream");
148            msg.setGroupID(info.getProducerId().toString());
149            if (eosMessage) {
150                msg.setGroupSequence(-1);
151            } else {
152                msg.setGroupSequence((int) messageSequence);
153            }
154            MessageId id = new MessageId(info.getProducerId(), messageSequence++);
155            connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage);
156        }
157    
158        public String toString() {
159            return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
160        }
161    
162    }