001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    import org.apache.activemq.broker.region.Destination;
022    import org.apache.activemq.broker.region.Subscription;
023    import org.apache.activemq.command.ActiveMQDestination;
024    import org.apache.activemq.command.BrokerInfo;
025    import org.apache.activemq.command.ConnectionInfo;
026    import org.apache.activemq.command.ConsumerInfo;
027    import org.apache.activemq.command.Message;
028    import org.apache.activemq.command.MessageAck;
029    import org.apache.activemq.command.ProducerInfo;
030    import org.apache.activemq.command.RemoveSubscriptionInfo;
031    import org.apache.activemq.command.SessionInfo;
032    import org.apache.activemq.command.TransactionId;
033    
034    /**
035     * Used to add listeners for Broker actions
036     * 
037     * @version $Revision: 1.10 $
038     */
039    public class BrokerBroadcaster extends BrokerFilter {
040        protected volatile Broker[] listeners = new Broker[0];
041    
042        public BrokerBroadcaster(Broker next) {
043            super(next);
044        }
045    
046        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
047            next.acknowledge(consumerExchange, ack);
048            Broker brokers[] = getListeners();
049            for (int i = 0; i < brokers.length; i++) {
050                brokers[i].acknowledge(consumerExchange, ack);
051            }
052        }
053    
054        public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
055            next.addConnection(context, info);
056            Broker brokers[] = getListeners();
057            for (int i = 0; i < brokers.length; i++) {
058                brokers[i].addConnection(context, info);
059            }
060        }
061    
062        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
063            Subscription answer = next.addConsumer(context, info);
064            Broker brokers[] = getListeners();
065            for (int i = 0; i < brokers.length; i++) {
066                brokers[i].addConsumer(context, info);
067            }
068            return answer;
069        }
070    
071        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
072            next.addProducer(context, info);
073            Broker brokers[] = getListeners();
074            for (int i = 0; i < brokers.length; i++) {
075                brokers[i].addProducer(context, info);
076            }
077        }
078    
079        public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
080            next.commitTransaction(context, xid, onePhase);
081            Broker brokers[] = getListeners();
082            for (int i = 0; i < brokers.length; i++) {
083                brokers[i].commitTransaction(context, xid, onePhase);
084            }
085        }
086    
087        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
088            next.removeSubscription(context, info);
089            Broker brokers[] = getListeners();
090            for (int i = 0; i < brokers.length; i++) {
091                brokers[i].removeSubscription(context, info);
092            }
093        }
094    
095        public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
096            int result = next.prepareTransaction(context, xid);
097            Broker brokers[] = getListeners();
098            for (int i = 0; i < brokers.length; i++) {
099                // TODO decide what to do with return values
100                brokers[i].prepareTransaction(context, xid);
101            }
102            return result;
103        }
104    
105        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
106            next.removeConnection(context, info, error);
107            Broker brokers[] = getListeners();
108            for (int i = 0; i < brokers.length; i++) {
109                brokers[i].removeConnection(context, info, error);
110            }
111        }
112    
113        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
114            next.removeConsumer(context, info);
115            Broker brokers[] = getListeners();
116            for (int i = 0; i < brokers.length; i++) {
117                brokers[i].removeConsumer(context, info);
118            }
119        }
120    
121        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
122            next.removeProducer(context, info);
123            Broker brokers[] = getListeners();
124            for (int i = 0; i < brokers.length; i++) {
125                brokers[i].removeProducer(context, info);
126            }
127        }
128    
129        public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
130            next.rollbackTransaction(context, xid);
131            Broker brokers[] = getListeners();
132            for (int i = 0; i < brokers.length; i++) {
133                brokers[i].rollbackTransaction(context, xid);
134            }
135        }
136    
137        public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
138            next.send(producerExchange, messageSend);
139            Broker brokers[] = getListeners();
140            for (int i = 0; i < brokers.length; i++) {
141                brokers[i].send(producerExchange, messageSend);
142            }
143        }
144    
145        public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
146            next.beginTransaction(context, xid);
147            Broker brokers[] = getListeners();
148            for (int i = 0; i < brokers.length; i++) {
149                brokers[i].beginTransaction(context, xid);
150            }
151        }
152    
153        public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
154            next.forgetTransaction(context, transactionId);
155            Broker brokers[] = getListeners();
156            for (int i = 0; i < brokers.length; i++) {
157                brokers[i].forgetTransaction(context, transactionId);
158            }
159        }
160    
161        public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
162            Destination result = next.addDestination(context, destination);
163            Broker brokers[] = getListeners();
164            for (int i = 0; i < brokers.length; i++) {
165                brokers[i].addDestination(context, destination);
166            }
167            return result;
168        }
169    
170        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
171            next.removeDestination(context, destination, timeout);
172            Broker brokers[] = getListeners();
173            for (int i = 0; i < brokers.length; i++) {
174                brokers[i].removeDestination(context, destination, timeout);
175            }
176        }
177    
178        public void start() throws Exception {
179            next.start();
180            Broker brokers[] = getListeners();
181            for (int i = 0; i < brokers.length; i++) {
182                brokers[i].start();
183            }
184        }
185    
186        public void stop() throws Exception {
187            next.stop();
188            Broker brokers[] = getListeners();
189            for (int i = 0; i < brokers.length; i++) {
190                brokers[i].stop();
191            }
192        }
193    
194        public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
195            next.addSession(context, info);
196            Broker brokers[] = getListeners();
197            for (int i = 0; i < brokers.length; i++) {
198                brokers[i].addSession(context, info);
199            }
200        }
201    
202        public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
203            next.removeSession(context, info);
204            Broker brokers[] = getListeners();
205            for (int i = 0; i < brokers.length; i++) {
206                brokers[i].removeSession(context, info);
207            }
208        }
209    
210        public void gc() {
211            next.gc();
212            Broker brokers[] = getListeners();
213            for (int i = 0; i < brokers.length; i++) {
214                brokers[i].gc();
215            }
216        }
217    
218        public void addBroker(Connection connection, BrokerInfo info) {
219            next.addBroker(connection, info);
220            Broker brokers[] = getListeners();
221            for (int i = 0; i < brokers.length; i++) {
222                brokers[i].addBroker(connection, info);
223            }
224        }
225    
226        protected Broker[] getListeners() {
227            return listeners;
228        }
229    
230        public synchronized void addListener(Broker broker) {
231            List<Broker> tmp = getListenersAsList();
232            tmp.add(broker);
233            listeners = tmp.toArray(new Broker[tmp.size()]);
234        }
235    
236        public synchronized void removeListener(Broker broker) {
237            List<Broker> tmp = getListenersAsList();
238            tmp.remove(broker);
239            listeners = tmp.toArray(new Broker[tmp.size()]);
240        }
241    
242        protected List<Broker> getListenersAsList() {
243            List<Broker> tmp = new ArrayList<Broker>();
244            Broker brokers[] = getListeners();
245            for (int i = 0; i < brokers.length; i++) {
246                tmp.add(brokers[i]);
247            }
248            return tmp;
249        }
250    }