001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker; 018 019 import org.apache.activemq.command.ActiveMQDestination; 020 import org.apache.activemq.command.Message; 021 import org.apache.activemq.command.ProducerInfo; 022 023 /** 024 * This broker filter handles composite destinations. If a broker operation is 025 * invoked using a composite destination, this filter repeats the operation 026 * using each destination of the composite. HRC: I think this filter is 027 * dangerous to use to with the consumer operations. Multiple Subscription 028 * objects will be associated with a single JMS consumer each having a different 029 * idea of what the current pre-fetch dispatch size is. If this is used, then 030 * the client has to expect many more messages to be dispatched than the 031 * pre-fetch setting allows. 032 * 033 * @version $Revision: 1.8 $ 034 */ 035 public class CompositeDestinationBroker extends BrokerFilter { 036 037 public CompositeDestinationBroker(Broker next) { 038 super(next); 039 } 040 041 /** 042 * A producer may register to send to multiple destinations via a composite 043 * destination. 044 */ 045 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 046 // The destination may be null. 047 ActiveMQDestination destination = info.getDestination(); 048 if (destination != null && destination.isComposite()) { 049 ActiveMQDestination[] destinations = destination.getCompositeDestinations(); 050 for (int i = 0; i < destinations.length; i++) { 051 ProducerInfo copy = info.copy(); 052 copy.setDestination(destinations[i]); 053 next.addProducer(context, copy); 054 } 055 } else { 056 next.addProducer(context, info); 057 } 058 } 059 060 /** 061 * A producer may de-register from sending to multiple destinations via a 062 * composite destination. 063 */ 064 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 065 // The destination may be null. 066 ActiveMQDestination destination = info.getDestination(); 067 if (destination != null && destination.isComposite()) { 068 ActiveMQDestination[] destinations = destination.getCompositeDestinations(); 069 for (int i = 0; i < destinations.length; i++) { 070 ProducerInfo copy = info.copy(); 071 copy.setDestination(destinations[i]); 072 next.removeProducer(context, copy); 073 } 074 } else { 075 next.removeProducer(context, info); 076 } 077 } 078 079 /** 080 * 081 */ 082 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { 083 ActiveMQDestination destination = message.getDestination(); 084 if (destination.isComposite()) { 085 ActiveMQDestination[] destinations = destination.getCompositeDestinations(); 086 for (int i = 0; i < destinations.length; i++) { 087 if (i != 0) { 088 message = message.copy(); 089 } 090 message.setOriginalDestination(destination); 091 message.setDestination(destinations[i]); 092 next.send(producerExchange, message); 093 } 094 } else { 095 next.send(producerExchange, message); 096 } 097 } 098 099 }