001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import java.io.IOException; 020 import java.util.Set; 021 import org.apache.activemq.advisory.AdvisorySupport; 022 import org.apache.activemq.broker.BrokerService; 023 import org.apache.activemq.broker.ConnectionContext; 024 import org.apache.activemq.broker.region.policy.PolicyEntry; 025 import org.apache.activemq.command.ActiveMQDestination; 026 import org.apache.activemq.command.ActiveMQQueue; 027 import org.apache.activemq.command.ActiveMQTempDestination; 028 import org.apache.activemq.command.ActiveMQTopic; 029 import org.apache.activemq.command.SubscriptionInfo; 030 import org.apache.activemq.store.MessageStore; 031 import org.apache.activemq.store.PersistenceAdapter; 032 import org.apache.activemq.store.TopicMessageStore; 033 import org.apache.activemq.thread.TaskRunnerFactory; 034 035 /** 036 * Creates standard ActiveMQ implementations of 037 * {@link org.apache.activemq.broker.region.Destination}. 038 * 039 * @author fateev@amazon.com 040 * @version $Revision: 732259 $ 041 */ 042 public class DestinationFactoryImpl extends DestinationFactory { 043 044 protected final TaskRunnerFactory taskRunnerFactory; 045 protected final PersistenceAdapter persistenceAdapter; 046 protected RegionBroker broker; 047 private final BrokerService brokerService; 048 049 public DestinationFactoryImpl(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) { 050 this.brokerService = brokerService; 051 this.taskRunnerFactory = taskRunnerFactory; 052 if (persistenceAdapter == null) { 053 throw new IllegalArgumentException("null persistenceAdapter"); 054 } 055 this.persistenceAdapter = persistenceAdapter; 056 } 057 058 public void setRegionBroker(RegionBroker broker) { 059 if (broker == null) { 060 throw new IllegalArgumentException("null broker"); 061 } 062 this.broker = broker; 063 } 064 065 public Set<ActiveMQDestination> getDestinations() { 066 return persistenceAdapter.getDestinations(); 067 } 068 069 /** 070 * @return instance of {@link Queue} or {@link Topic} 071 */ 072 public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception { 073 if (destination.isQueue()) { 074 if (destination.isTemporary()) { 075 final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination; 076 Queue queue = new TempQueue(brokerService, destination, null, destinationStatistics, taskRunnerFactory); 077 queue.initialize(); 078 return queue; 079 } else { 080 MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination); 081 Queue queue = new Queue(brokerService, destination, store, destinationStatistics, taskRunnerFactory); 082 configureQueue(queue, destination); 083 queue.initialize(); 084 return queue; 085 } 086 } else if (destination.isTemporary()) { 087 088 Topic topic = new Topic(brokerService, destination, null, destinationStatistics, taskRunnerFactory); 089 topic.initialize(); 090 return topic; 091 } else { 092 TopicMessageStore store = null; 093 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 094 store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic)destination); 095 } 096 Topic topic = new Topic(brokerService, destination, store, destinationStatistics, taskRunnerFactory); 097 configureTopic(topic, destination); 098 topic.initialize(); 099 return topic; 100 } 101 } 102 103 public void removeDestination(Destination dest) { 104 ActiveMQDestination destination = dest.getActiveMQDestination(); 105 if (!destination.isTemporary()) { 106 if (destination.isQueue()) { 107 persistenceAdapter.removeQueueMessageStore((ActiveMQQueue) destination); 108 } 109 else if (!AdvisorySupport.isAdvisoryTopic(destination)) { 110 persistenceAdapter.removeTopicMessageStore((ActiveMQTopic) destination); 111 } 112 } 113 } 114 115 protected void configureQueue(Queue queue, ActiveMQDestination destination) { 116 if (broker == null) { 117 throw new IllegalStateException("broker property is not set"); 118 } 119 if (broker.getDestinationPolicy() != null) { 120 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 121 if (entry != null) { 122 entry.configure(broker,queue); 123 } 124 } 125 } 126 127 protected void configureTopic(Topic topic, ActiveMQDestination destination) { 128 if (broker == null) { 129 throw new IllegalStateException("broker property is not set"); 130 } 131 if (broker.getDestinationPolicy() != null) { 132 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 133 if (entry != null) { 134 entry.configure(topic); 135 } 136 } 137 } 138 139 public long getLastMessageBrokerSequenceId() throws IOException { 140 return persistenceAdapter.getLastMessageBrokerSequenceId(); 141 } 142 143 public PersistenceAdapter getPersistenceAdapter() { 144 return persistenceAdapter; 145 } 146 147 public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException { 148 return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions(); 149 } 150 }