001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region.virtual; 018 019 import org.apache.activemq.broker.BrokerService; 020 import org.apache.activemq.broker.BrokerServiceAware; 021 import org.apache.activemq.broker.ProducerBrokerExchange; 022 import org.apache.activemq.broker.region.Destination; 023 import org.apache.activemq.broker.region.DestinationFilter; 024 import org.apache.activemq.broker.region.DestinationInterceptor; 025 import org.apache.activemq.command.ActiveMQDestination; 026 import org.apache.activemq.command.ActiveMQTopic; 027 import org.apache.activemq.command.Message; 028 import org.apache.commons.logging.Log; 029 import org.apache.commons.logging.LogFactory; 030 031 /** 032 * Creates <a href="http://activemq.org/site/mirrored-queues.html">Mirrored 033 * Queue</a> using a prefix and postfix to define the topic name on which to mirror the queue to. 034 * 035 * @version $Revision: 650766 $ 036 * @org.apache.xbean.XBean 037 */ 038 public class MirroredQueue implements DestinationInterceptor, BrokerServiceAware { 039 private static final transient Log LOG = LogFactory.getLog(MirroredQueue.class); 040 private String prefix = "VirtualTopic.Mirror."; 041 private String postfix = ""; 042 private boolean copyMessage = true; 043 private BrokerService brokerService; 044 045 public Destination intercept(final Destination destination) { 046 if (destination.getActiveMQDestination().isQueue()) { 047 if (!destination.getActiveMQDestination().isTemporary() || brokerService.isUseTempMirroredQueues()) { 048 try { 049 final Destination mirrorDestination = getMirrorDestination(destination); 050 if (mirrorDestination != null) { 051 return new DestinationFilter(destination) { 052 public void send(ProducerBrokerExchange context, Message message) throws Exception { 053 message.setDestination(mirrorDestination.getActiveMQDestination()); 054 mirrorDestination.send(context, message); 055 056 if (isCopyMessage()) { 057 message = message.copy(); 058 } 059 message.setDestination(destination.getActiveMQDestination()); 060 super.send(context, message); 061 } 062 }; 063 } 064 } 065 catch (Exception e) { 066 LOG.error("Failed to lookup the mirror destination for: " + destination + ". Reason: " + e, e); 067 } 068 } 069 } 070 return destination; 071 } 072 073 074 public void remove(Destination destination) { 075 if (brokerService == null) { 076 throw new IllegalArgumentException("No brokerService injected!"); 077 } 078 ActiveMQDestination topic = getMirrorTopic(destination.getActiveMQDestination()); 079 if (topic != null) { 080 try { 081 brokerService.removeDestination(topic); 082 } catch (Exception e) { 083 LOG.error("Failed to remove mirror destination for " + destination + ". Reason: " + e,e); 084 } 085 } 086 087 } 088 089 // Properties 090 // ------------------------------------------------------------------------- 091 092 public String getPostfix() { 093 return postfix; 094 } 095 096 /** 097 * Sets any postix used to identify the queue consumers 098 */ 099 public void setPostfix(String postfix) { 100 this.postfix = postfix; 101 } 102 103 public String getPrefix() { 104 return prefix; 105 } 106 107 /** 108 * Sets the prefix wildcard used to identify the queue consumers for a given 109 * topic 110 */ 111 public void setPrefix(String prefix) { 112 this.prefix = prefix; 113 } 114 115 public boolean isCopyMessage() { 116 return copyMessage; 117 } 118 119 /** 120 * Sets whether a copy of the message will be sent to each destination. 121 * Defaults to true so that the forward destination is set as the 122 * destination of the message 123 */ 124 public void setCopyMessage(boolean copyMessage) { 125 this.copyMessage = copyMessage; 126 } 127 128 public void setBrokerService(BrokerService brokerService) { 129 this.brokerService = brokerService; 130 } 131 132 // Implementation methods 133 //------------------------------------------------------------------------- 134 protected Destination getMirrorDestination(Destination destination) throws Exception { 135 if (brokerService == null) { 136 throw new IllegalArgumentException("No brokerService injected!"); 137 } 138 ActiveMQDestination topic = getMirrorTopic(destination.getActiveMQDestination()); 139 return brokerService.getDestination(topic); 140 } 141 142 protected ActiveMQDestination getMirrorTopic(ActiveMQDestination original) { 143 return new ActiveMQTopic(prefix + original.getPhysicalName() + postfix); 144 } 145 146 }