/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker;

import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;

/**
 * Used to add listeners for Broker actions.
 * <p>
 * Every broker operation overridden here is first delegated to the inherited
 * {@code next} broker and is then repeated, with the same arguments, on each
 * registered listener in array order. Values returned by listeners are
 * discarded; only the value produced by {@code next} is handed back to the
 * caller. An exception thrown by {@code next} or by a listener propagates
 * immediately, so any listeners later in the array are not invoked for that
 * call.
 * <p>
 * The listener array is held in a {@code volatile} field. {@link #addListener}
 * and {@link #removeListener} are {@code synchronized} and assign a newly built
 * array instead of modifying the current one in place.
 *
 * @version $Revision: 1.10 $
 */
public class BrokerBroadcaster extends BrokerFilter {

    /** Current listeners; reassigned as a whole by the add/remove methods. */
    protected volatile Broker[] listeners = new Broker[0];

    /**
     * Creates a broadcaster with no listeners.
     *
     * @param next broker passed through to the {@link BrokerFilter} constructor
     */
    public BrokerBroadcaster(Broker next) {
        super(next);
    }

    /**
     * Acknowledges on {@code next}, then on each listener.
     *
     * @throws Exception if {@code next} or any listener throws
     */
    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
        next.acknowledge(consumerExchange, ack);
        for (Broker listener : getListeners()) {
            listener.acknowledge(consumerExchange, ack);
        }
    }

    /**
     * Adds the connection on {@code next}, then on each listener.
     *
     * @throws Exception if {@code next} or any listener throws
     */
    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
        next.addConnection(context, info);
        for (Broker listener : getListeners()) {
            listener.addConnection(context, info);
        }
    }

    /**
     * Adds the consumer on {@code next}, then on each listener.
     *
     * @return the subscription returned by {@code next}; subscriptions returned
     *         by listeners are ignored
     * @throws Exception if {@code next} or any listener throws
     */
    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        final Subscription subscription = next.addConsumer(context, info);
        for (Broker listener : getListeners()) {
            listener.addConsumer(context, info);
        }
        return subscription;
    }

    /**
     * Adds the producer on {@code next}, then on each listener.
     *
     * @throws Exception if {@code next} or any listener throws
     */
    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        next.addProducer(context, info);
        for (Broker listener : getListeners()) {
            listener.addProducer(context, info);
        }
    }

    /**
     * Commits the transaction on {@code next}, then on each listener.
     *
     * @throws Exception if {@code next} or any listener throws
     */
    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
        next.commitTransaction(context, xid, onePhase);
        for (Broker listener : getListeners()) {
            listener.commitTransaction(context, xid, onePhase);
        }
    }

    /**
     * Removes the subscription on {@code next}, then on each listener.
     *
     * @throws Exception if {@code next} or any listener throws
     */
    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
        next.removeSubscription(context, info);
        for (Broker listener : getListeners()) {
            listener.removeSubscription(context, info);
        }
    }

    /**
     * Prepares the transaction on {@code next}, then on each listener.
     *
     * @return the value returned by {@code next}
     * @throws Exception if {@code next} or any listener throws
     */
    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        final int outcome = next.prepareTransaction(context, xid);
        for (Broker listener : getListeners()) {
            // TODO decide what to do with return values
            listener.prepareTransaction(context, xid);
        }
        return outcome;
    }

    /**
     * Removes the connection on {@code next}, then on each listener.
     *
     * @throws Exception if {@code next} or any listener throws
     */
    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
        next.removeConnection(context, info, error);
        for (Broker listener : getListeners()) {
            listener.removeConnection(context, info, error);
        }
    }

    /**
     * Removes the consumer on {@code next}, then on each listener.
     *
     * @throws Exception if {@code next} or any listener throws
     */
    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        next.removeConsumer(context, info);
        for (Broker listener : getListeners()) {
            listener.removeConsumer(context, info);
        }
    }

    /**
     * Removes the producer on {@code next}, then on each listener.
     *
     * @throws Exception if {@code next} or any listener throws
     */
    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        next.removeProducer(context, info);
        for (Broker listener : getListeners()) {
            listener.removeProducer(context, info);
        }
    }

    /**
     * Rolls the transaction back on {@code next}, then on each listener.
     *
     * @throws Exception if {@code next} or any listener throws
     */
    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        next.rollbackTransaction(context, xid);
        for (Broker listener : getListeners()) {
            listener.rollbackTransaction(context, xid);
        }
    }

    /**
     * Sends the message through {@code next}, then through each listener.
     *
     * @throws Exception if {@code next} or any listener throws
     */
    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
        next.send(producerExchange, messageSend);
        for (Broker listener : getListeners()) {
            listener.send(producerExchange, messageSend);
        }
    }

    /**
     * Begins the transaction on {@code next}, then on each listener.
     *
     * @throws Exception if {@code next} or any listener throws
     */
    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        next.beginTransaction(context, xid);
        for (Broker listener : getListeners()) {
            listener.beginTransaction(context, xid);
        }
    }

    /**
     * Forgets the transaction on {@code next}, then on each listener.
     *
     * @throws Exception if {@code next} or any listener throws
     */
    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
        next.forgetTransaction(context, transactionId);
        for (Broker listener : getListeners()) {
            listener.forgetTransaction(context, transactionId);
        }
    }

    /**
     * Adds the destination on {@code next}, then on each listener.
     *
     * @return the destination returned by {@code next}; destinations returned
     *         by listeners are ignored
     * @throws Exception if {@code next} or any listener throws
     */
    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
        final Destination created = next.addDestination(context, destination);
        for (Broker listener : getListeners()) {
            listener.addDestination(context, destination);
        }
        return created;
    }

    /**
     * Removes the destination on {@code next}, then on each listener.
     *
     * @throws Exception if {@code next} or any listener throws
     */
    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
        next.removeDestination(context, destination, timeout);
        for (Broker listener : getListeners()) {
            listener.removeDestination(context, destination, timeout);
        }
    }

    /**
     * Starts {@code next}, then each listener.
     *
     * @throws Exception if {@code next} or any listener throws
     */
    public void start() throws Exception {
        next.start();
        for (Broker listener : getListeners()) {
            listener.start();
        }
    }

    /**
     * Stops {@code next}, then each listener. If {@code next} or a listener
     * throws, the remaining listeners are not stopped by this call.
     *
     * @throws Exception if {@code next} or any listener throws
     */
    public void stop() throws Exception {
        next.stop();
        for (Broker listener : getListeners()) {
            listener.stop();
        }
    }

    /**
     * Adds the session on {@code next}, then on each listener.
     *
     * @throws Exception if {@code next} or any listener throws
     */
    public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
        next.addSession(context, info);
        for (Broker listener : getListeners()) {
            listener.addSession(context, info);
        }
    }

    /**
     * Removes the session on {@code next}, then on each listener.
     *
     * @throws Exception if {@code next} or any listener throws
     */
    public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
        next.removeSession(context, info);
        for (Broker listener : getListeners()) {
            listener.removeSession(context, info);
        }
    }

    /**
     * Invokes {@code gc()} on {@code next}, then on each listener.
     */
    public void gc() {
        next.gc();
        for (Broker listener : getListeners()) {
            listener.gc();
        }
    }

    /**
     * Registers the broker connection on {@code next}, then on each listener.
     */
    public void addBroker(Connection connection, BrokerInfo info) {
        next.addBroker(connection, info);
        for (Broker listener : getListeners()) {
            listener.addBroker(connection, info);
        }
    }

    /**
     * Returns the listener array currently held by this broadcaster.
     *
     * @return the value of the {@code listeners} field, not a copy
     */
    protected Broker[] getListeners() {
        return listeners;
    }

    /**
     * Appends a listener by building a new array containing the current
     * listeners followed by {@code broker}.
     *
     * @param broker listener to append
     */
    public synchronized void addListener(Broker broker) {
        final List<Broker> updated = getListenersAsList();
        updated.add(broker);
        listeners = updated.toArray(new Broker[updated.size()]);
    }

    /**
     * Removes a listener by building a new array without the first element
     * equal to {@code broker}; the array contents are unchanged when no such
     * element exists.
     *
     * @param broker listener to remove
     */
    public synchronized void removeListener(Broker broker) {
        final List<Broker> updated = getListenersAsList();
        updated.remove(broker);
        listeners = updated.toArray(new Broker[updated.size()]);
    }

    /**
     * Copies the current listeners into a new, modifiable list.
     *
     * @return a fresh {@link ArrayList} holding the listeners in array order
     */
    protected List<Broker> getListenersAsList() {
        final Broker[] current = getListeners();
        final List<Broker> copy = new ArrayList<Broker>(current.length);
        for (Broker listener : current) {
            copy.add(listener);
        }
        return copy;
    }
}