001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.jmx; 018 019 import javax.management.openmbean.CompositeData; 020 import javax.management.openmbean.OpenDataException; 021 import javax.jms.JMSException; 022 023 import org.apache.activemq.broker.ConnectionContext; 024 import org.apache.activemq.broker.region.Queue; 025 import org.apache.activemq.command.ActiveMQDestination; 026 import org.apache.activemq.command.Message; 027 028 /** 029 * Provides a JMX Management view of a Queue. 030 */ 031 public class QueueView extends DestinationView implements QueueViewMBean { 032 public QueueView(ManagedRegionBroker broker, Queue destination) { 033 super(broker, destination); 034 } 035 036 public CompositeData getMessage(String messageId) throws OpenDataException { 037 Message rc = ((Queue)destination).getMessage(messageId); 038 if (rc == null) { 039 return null; 040 } 041 return OpenTypeSupport.convert(rc); 042 } 043 044 public void purge() throws Exception { 045 ((Queue)destination).purge(); 046 } 047 048 public boolean removeMessage(String messageId) throws Exception { 049 return ((Queue)destination).removeMessage(messageId); 050 } 051 052 public int removeMatchingMessages(String selector) throws Exception { 053 return ((Queue)destination).removeMatchingMessages(selector); 054 } 055 056 public int removeMatchingMessages(String selector, int maximumMessages) throws Exception { 057 return ((Queue)destination).removeMatchingMessages(selector, maximumMessages); 058 } 059 060 public boolean copyMessageTo(String messageId, String destinationName) throws Exception { 061 ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker()); 062 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); 063 return ((Queue)destination).copyMessageTo(context, messageId, toDestination); 064 } 065 066 public int copyMatchingMessagesTo(String selector, String destinationName) throws Exception { 067 ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker()); 068 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); 069 return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination); 070 } 071 072 public int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception { 073 ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker()); 074 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); 075 return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination, maximumMessages); 076 } 077 078 public boolean moveMessageTo(String messageId, String destinationName) throws Exception { 079 ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker()); 080 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); 081 return ((Queue)destination).moveMessageTo(context, messageId, toDestination); 082 } 083 084 public int moveMatchingMessagesTo(String selector, String destinationName) throws Exception { 085 ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker()); 086 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); 087 return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination); 088 } 089 090 public int moveMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception { 091 ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker()); 092 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); 093 return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination, maximumMessages); 094 } 095 096 /** 097 * Moves a message back to its original destination 098 */ 099 public boolean retryMessage(String messageId) throws Exception { 100 Queue queue = (Queue) destination; 101 Message rc = queue.getMessage(messageId); 102 if (rc != null) { 103 rc = rc.copy(); 104 rc.getMessage().setRedeliveryCounter(0); 105 ActiveMQDestination originalDestination = rc.getOriginalDestination(); 106 if (originalDestination != null) { 107 ConnectionContext context = BrokerView.getConnectionContext(broker.getContextBroker()); 108 return queue.moveMessageTo(context, rc, originalDestination); 109 } 110 else { 111 throw new JMSException("No original destination for message: "+ messageId); 112 } 113 } 114 else { 115 throw new JMSException("Could not find message: "+ messageId); 116 } 117 } 118 119 public int cursorSize() { 120 Queue queue = (Queue) destination; 121 if (queue.getMessages() != null){ 122 return queue.getMessages().size(); 123 } 124 return 0; 125 } 126 127 128 public boolean doesCursorHaveMessagesBuffered() { 129 Queue queue = (Queue) destination; 130 if (queue.getMessages() != null){ 131 return queue.getMessages().hasMessagesBufferedToDeliver(); 132 } 133 return false; 134 135 } 136 137 138 public boolean doesCursorHaveSpace() { 139 Queue queue = (Queue) destination; 140 if (queue.getMessages() != null){ 141 return queue.getMessages().hasSpace(); 142 } 143 return false; 144 } 145 146 147 public long getCursorMemoryUsage() { 148 Queue queue = (Queue) destination; 149 if (queue.getMessages() != null && queue.getMessages().getSystemUsage() != null){ 150 return queue.getMessages().getSystemUsage().getMemoryUsage().getUsage(); 151 } 152 return 0; 153 } 154 155 public int getCursorPercentUsage() { 156 Queue queue = (Queue) destination; 157 if (queue.getMessages() != null && queue.getMessages().getSystemUsage() != null){ 158 return queue.getMessages().getSystemUsage().getMemoryUsage().getPercentUsage(); 159 } 160 return 0; 161 } 162 163 public boolean isCursorFull() { 164 Queue queue = (Queue) destination; 165 if (queue.getMessages() != null){ 166 return queue.getMessages().isFull(); 167 } 168 return false; 169 } 170 }