001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import java.io.IOException; 020 import java.util.Collections; 021 import java.util.List; 022 import java.util.concurrent.CopyOnWriteArrayList; 023 import javax.jms.InvalidSelectorException; 024 import javax.jms.JMSException; 025 import javax.management.ObjectName; 026 import org.apache.activemq.broker.Broker; 027 import org.apache.activemq.broker.ConnectionContext; 028 import org.apache.activemq.command.ActiveMQDestination; 029 import org.apache.activemq.command.ConsumerId; 030 import org.apache.activemq.command.ConsumerInfo; 031 import org.apache.activemq.filter.BooleanExpression; 032 import org.apache.activemq.filter.DestinationFilter; 033 import org.apache.activemq.filter.LogicExpression; 034 import org.apache.activemq.filter.MessageEvaluationContext; 035 import org.apache.activemq.filter.NoLocalExpression; 036 import org.apache.activemq.selector.SelectorParser; 037 import org.apache.commons.logging.Log; 038 import org.apache.commons.logging.LogFactory; 039 040 public abstract class AbstractSubscription implements Subscription { 041 042 private static final Log LOG = LogFactory.getLog(AbstractSubscription.class); 043 protected Broker broker; 044 protected ConnectionContext context; 045 protected ConsumerInfo info; 046 protected final DestinationFilter destinationFilter; 047 protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>(); 048 private BooleanExpression selectorExpression; 049 private ObjectName objectName; 050 private int cursorMemoryHighWaterMark = 70; 051 052 053 public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { 054 this.broker = broker; 055 this.context = context; 056 this.info = info; 057 this.destinationFilter = DestinationFilter.parseFilter(info.getDestination()); 058 this.selectorExpression = parseSelector(info); 059 } 060 061 private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException { 062 BooleanExpression rc = null; 063 if (info.getSelector() != null) { 064 rc = SelectorParser.parse(info.getSelector()); 065 } 066 if (info.isNoLocal()) { 067 if (rc == null) { 068 rc = new NoLocalExpression(info.getConsumerId().getConnectionId()); 069 } else { 070 rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc); 071 } 072 } 073 if (info.getAdditionalPredicate() != null) { 074 if (rc == null) { 075 rc = info.getAdditionalPredicate(); 076 } else { 077 rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc); 078 } 079 } 080 return rc; 081 } 082 083 public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException { 084 ConsumerId targetConsumerId = node.getTargetConsumerId(); 085 if (targetConsumerId != null) { 086 if (!targetConsumerId.equals(info.getConsumerId())) { 087 return false; 088 } 089 } 090 try { 091 return (selectorExpression == null || selectorExpression.matches(context)) && this.context.isAllowedToConsume(node); 092 } catch (JMSException e) { 093 LOG.info("Selector failed to evaluate: " + e.getMessage(), e); 094 return false; 095 } 096 } 097 098 public boolean matches(ActiveMQDestination destination) { 099 return destinationFilter.matches(destination); 100 } 101 102 public void add(ConnectionContext context, Destination destination) throws Exception { 103 destinations.add(destination); 104 } 105 106 public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { 107 destinations.remove(destination); 108 return Collections.EMPTY_LIST; 109 } 110 111 public ConsumerInfo getConsumerInfo() { 112 return info; 113 } 114 115 public void gc() { 116 } 117 118 public boolean isSlave() { 119 return broker.getBrokerService().isSlave(); 120 } 121 122 public ConnectionContext getContext() { 123 return context; 124 } 125 126 public ConsumerInfo getInfo() { 127 return info; 128 } 129 130 public BooleanExpression getSelectorExpression() { 131 return selectorExpression; 132 } 133 134 public String getSelector() { 135 return info.getSelector(); 136 } 137 138 public void setSelector(String selector) throws InvalidSelectorException { 139 ConsumerInfo copy = info.copy(); 140 copy.setSelector(selector); 141 BooleanExpression newSelector = parseSelector(copy); 142 // its valid so lets actually update it now 143 info.setSelector(selector); 144 this.selectorExpression = newSelector; 145 } 146 147 public ObjectName getObjectName() { 148 return objectName; 149 } 150 151 public void setObjectName(ObjectName objectName) { 152 this.objectName = objectName; 153 } 154 155 public int getPrefetchSize() { 156 return info.getPrefetchSize(); 157 } 158 public void setPrefetchSize(int newSize) { 159 info.setPrefetchSize(newSize); 160 } 161 162 public boolean isRecoveryRequired() { 163 return true; 164 } 165 166 public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception { 167 boolean result = false; 168 MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); 169 try { 170 msgContext.setDestination(message.getRegionDestination().getActiveMQDestination()); 171 msgContext.setMessageReference(message); 172 result = matches(message, msgContext); 173 if (result) { 174 doAddRecoveredMessage(message); 175 } 176 177 } finally { 178 msgContext.clear(); 179 } 180 return result; 181 } 182 183 public ActiveMQDestination getActiveMQDestination() { 184 return info != null ? info.getDestination() : null; 185 } 186 187 public boolean isBrowser() { 188 return info != null && info.isBrowser(); 189 } 190 191 public int getInFlightUsage() { 192 if (info.getPrefetchSize() > 0) { 193 return (getInFlightSize() * 100)/info.getPrefetchSize(); 194 } 195 return Integer.MAX_VALUE; 196 } 197 198 /** 199 * Add a destination 200 * @param destination 201 */ 202 public void addDestination(Destination destination) { 203 204 } 205 206 207 /** 208 * Remove a destination 209 * @param destination 210 */ 211 public void removeDestination(Destination destination) { 212 213 } 214 215 public int getCursorMemoryHighWaterMark(){ 216 return this.cursorMemoryHighWaterMark; 217 } 218 219 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){ 220 this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark; 221 } 222 223 public int countBeforeFull() { 224 return getDispatchedQueueSize() - info.getPrefetchSize(); 225 } 226 227 protected void doAddRecoveredMessage(MessageReference message) throws Exception { 228 add(message); 229 } 230 }