001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.io.IOException; 020 import java.io.InputStream; 021 import java.util.HashMap; 022 import java.util.Map; 023 024 import javax.jms.IllegalStateException; 025 import javax.jms.InvalidDestinationException; 026 import javax.jms.JMSException; 027 028 import org.apache.activemq.command.ActiveMQBytesMessage; 029 import org.apache.activemq.command.ActiveMQDestination; 030 import org.apache.activemq.command.ActiveMQMessage; 031 import org.apache.activemq.command.CommandTypes; 032 import org.apache.activemq.command.ConsumerId; 033 import org.apache.activemq.command.ConsumerInfo; 034 import org.apache.activemq.command.MessageAck; 035 import org.apache.activemq.command.MessageDispatch; 036 import org.apache.activemq.command.ProducerId; 037 import org.apache.activemq.selector.SelectorParser; 038 import org.apache.activemq.util.IOExceptionSupport; 039 import org.apache.activemq.util.IntrospectionSupport; 040 import org.apache.activemq.util.JMSExceptionSupport; 041 042 /** 043 * @version $Revision$ 044 */ 045 public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher { 046 047 private final ActiveMQConnection connection; 048 private final ConsumerInfo info; 049 // These are the messages waiting to be delivered to the client 050 private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel(); 051 052 private int deliveredCounter; 053 private MessageDispatch lastDelivered; 054 private boolean eosReached; 055 private byte buffer[]; 056 private int pos; 057 058 private ProducerId producerId; 059 private long nextSequenceId; 060 061 public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch) 062 throws JMSException { 063 this.connection = connection; 064 065 if (dest == null) { 066 throw new InvalidDestinationException("Don't understand null destinations"); 067 } else if (dest.isTemporary()) { 068 String physicalName = dest.getPhysicalName(); 069 070 if (physicalName == null) { 071 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest); 072 } 073 074 String connectionID = connection.getConnectionInfo().getConnectionId().getValue(); 075 076 if (physicalName.indexOf(connectionID) < 0) { 077 throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection"); 078 } 079 080 if (connection.isDeleted(dest)) { 081 throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted"); 082 } 083 } 084 085 this.info = new ConsumerInfo(consumerId); 086 this.info.setSubscriptionName(name); 087 088 if (selector != null && selector.trim().length() != 0) { 089 selector = "JMSType='org.apache.activemq.Stream' AND ( " + selector + " ) "; 090 } else { 091 selector = "JMSType='org.apache.activemq.Stream'"; 092 } 093 094 SelectorParser.parse(selector); 095 this.info.setSelector(selector); 096 097 this.info.setPrefetchSize(prefetch); 098 this.info.setNoLocal(noLocal); 099 this.info.setBrowser(false); 100 this.info.setDispatchAsync(false); 101 102 // Allows the options on the destination to configure the consumerInfo 103 if (dest.getOptions() != null) { 104 Map<String, String> options = new HashMap<String, String>(dest.getOptions()); 105 IntrospectionSupport.setProperties(this.info, options, "consumer."); 106 } 107 108 this.info.setDestination(dest); 109 110 this.connection.addInputStream(this); 111 this.connection.addDispatcher(info.getConsumerId(), this); 112 this.connection.syncSendPacket(info); 113 unconsumedMessages.start(); 114 } 115 116 public void close() throws IOException { 117 if (!unconsumedMessages.isClosed()) { 118 try { 119 if (lastDelivered != null) { 120 MessageAck ack = new MessageAck(lastDelivered, MessageAck.STANDARD_ACK_TYPE, deliveredCounter); 121 connection.asyncSendPacket(ack); 122 } 123 dispose(); 124 this.connection.syncSendPacket(info.createRemoveCommand()); 125 } catch (JMSException e) { 126 throw IOExceptionSupport.create(e); 127 } 128 } 129 } 130 131 public void dispose() { 132 if (!unconsumedMessages.isClosed()) { 133 unconsumedMessages.close(); 134 this.connection.removeDispatcher(info.getConsumerId()); 135 this.connection.removeInputStream(this); 136 } 137 } 138 139 public ActiveMQMessage receive() throws JMSException { 140 checkClosed(); 141 MessageDispatch md; 142 try { 143 md = unconsumedMessages.dequeue(-1); 144 } catch (InterruptedException e) { 145 Thread.currentThread().interrupt(); 146 throw JMSExceptionSupport.create(e); 147 } 148 149 if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired()) { 150 return null; 151 } 152 153 deliveredCounter++; 154 if ((0.75 * info.getPrefetchSize()) <= deliveredCounter) { 155 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter); 156 connection.asyncSendPacket(ack); 157 deliveredCounter = 0; 158 lastDelivered = null; 159 } else { 160 lastDelivered = md; 161 } 162 163 return (ActiveMQMessage)md.getMessage(); 164 } 165 166 /** 167 * @throws IllegalStateException 168 */ 169 protected void checkClosed() throws IllegalStateException { 170 if (unconsumedMessages.isClosed()) { 171 throw new IllegalStateException("The Consumer is closed"); 172 } 173 } 174 175 public int read() throws IOException { 176 fillBuffer(); 177 if (eosReached || buffer.length == 0) { 178 return -1; 179 } 180 181 return buffer[pos++] & 0xff; 182 } 183 184 public int read(byte[] b, int off, int len) throws IOException { 185 fillBuffer(); 186 if (eosReached || buffer.length == 0) { 187 return -1; 188 } 189 190 int max = Math.min(len, buffer.length - pos); 191 System.arraycopy(buffer, pos, b, off, max); 192 193 pos += max; 194 return max; 195 } 196 197 private void fillBuffer() throws IOException { 198 if (eosReached || (buffer != null && buffer.length > pos)) { 199 return; 200 } 201 try { 202 while (true) { 203 ActiveMQMessage m = receive(); 204 if (m != null && m.getDataStructureType() == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) { 205 // First message. 206 long producerSequenceId = m.getMessageId().getProducerSequenceId(); 207 if (producerId == null) { 208 // We have to start a stream at sequence id = 0 209 if (producerSequenceId != 0) { 210 continue; 211 } 212 nextSequenceId++; 213 producerId = m.getMessageId().getProducerId(); 214 } else { 215 // Verify it's the next message of the sequence. 216 if (!m.getMessageId().getProducerId().equals(producerId)) { 217 throw new IOException("Received an unexpected message: invalid producer: " + m); 218 } 219 if (producerSequenceId != nextSequenceId++) { 220 throw new IOException("Received an unexpected message: expected ID: " + (nextSequenceId - 1) + " but was: " + producerSequenceId + " for message: " + m); 221 } 222 } 223 224 // Read the buffer in. 225 ActiveMQBytesMessage bm = (ActiveMQBytesMessage)m; 226 buffer = new byte[(int)bm.getBodyLength()]; 227 bm.readBytes(buffer); 228 pos = 0; 229 } else { 230 eosReached = true; 231 } 232 return; 233 } 234 } catch (JMSException e) { 235 eosReached = true; 236 throw IOExceptionSupport.create(e); 237 } 238 } 239 240 public void dispatch(MessageDispatch md) { 241 unconsumedMessages.enqueue(md); 242 } 243 244 public String toString() { 245 return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }"; 246 } 247 248 }