/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.camel.component;

import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.util.ByteSequence;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.ExchangePattern;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.DefaultProducer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Camel endpoint that writes records to, and reads records from, a journal
 * kept in a directory and accessed through an {@link AsyncDataManager}.
 * <p>
 * The data manager is reference counted: it is opened when the count goes
 * from 0 to 1 and closed when it returns to 0. An active consumer holds one
 * reference for as long as it is active; every producer call holds one
 * reference for the duration of the call. The count and the data manager
 * field are only modified while holding {@code activationMutex}.
 * <p>
 * Only one consumer can be active at a time. Records are pushed to it from a
 * dedicated daemon thread that polls the journal.
 */
public class JournalEndpoint extends DefaultEndpoint {

    private static final Log LOG = LogFactory.getLog(JournalEndpoint.class);

    /** Directory handed to the data manager when it is opened. */
    private final File directory;
    /** The single active consumer, or {@code null} when none is active. */
    private final AtomicReference<DefaultConsumer> consumer = new AtomicReference<DefaultConsumer>();
    /** Guards {@link #referenceCount}, {@link #dataManager} and consumer (de)activation. */
    private final Object activationMutex = new Object();
    private int referenceCount;
    private AsyncDataManager dataManager;
    /** Dispatch thread of the currently active consumer. */
    private Thread thread;
    /** Location of the last record handed to the consumer (initially the journal mark). */
    private Location lastReadLocation;
    /** Milliseconds the dispatch thread sleeps when no new record is available. */
    private long idleDelay = 1000;
    private boolean syncProduce = true;
    private boolean syncConsume;

    /**
     * Creates an endpoint bound to the Camel context of the given component.
     *
     * @param uri the endpoint URI
     * @param journalComponent the component whose Camel context is used
     * @param directory the journal directory
     */
    public JournalEndpoint(String uri, JournalComponent journalComponent, File directory) {
        super(uri, journalComponent.getCamelContext());
        this.directory = directory;
    }

    /**
     * Creates an endpoint that is not bound to a component.
     *
     * @param endpointUri the endpoint URI
     * @param directory the journal directory
     */
    public JournalEndpoint(String endpointUri, File directory) {
        super(endpointUri);
        this.directory = directory;
    }

    /**
     * @return always {@code true}
     */
    public boolean isSingleton() {
        return true;
    }

    /**
     * @return the journal directory this endpoint was created with
     */
    public File getDirectory() {
        return directory;
    }

    /**
     * Creates a consumer that registers itself as this endpoint's active
     * consumer when started and unregisters itself when stopped.
     *
     * @param processor the processor that receives the journal records
     * @return the new, not yet started, consumer
     */
    public Consumer createConsumer(Processor processor) throws Exception {
        return new DefaultConsumer(this, processor) {
            @Override
            public void start() throws Exception {
                super.start();
                activateConsumer(this);
            }

            @Override
            public void stop() throws Exception {
                deactivateConsumer(this);
                super.stop();
            }
        };
    }

    /**
     * Releases one reference to the data manager and closes it when the last
     * reference is released.
     *
     * @throws IOException if closing the data manager fails; the reference
     *             to the data manager is dropped even in that case
     */
    protected void decrementReference() throws IOException {
        synchronized (activationMutex) {
            referenceCount--;
            if (referenceCount == 0) {
                LOG.debug("Closing data manager: " + directory);
                LOG.debug("Last mark at: " + lastReadLocation);
                try {
                    dataManager.close();
                } finally {
                    // Never keep a reference to a manager we tried to close.
                    dataManager = null;
                }
            }
        }
    }

    /**
     * Acquires one reference to the data manager, opening it on the first
     * reference and initialising the read position from its mark.
     * <p>
     * If opening fails the reference is not acquired: the count is rolled
     * back so that a later call attempts to open the data manager again
     * instead of using one that never started.
     *
     * @throws IOException if the data manager cannot be started
     */
    protected void incrementReference() throws IOException {
        synchronized (activationMutex) {
            referenceCount++;
            if (referenceCount == 1) {
                LOG.debug("Opening data manager: " + directory);
                boolean opened = false;
                try {
                    AsyncDataManager manager = new AsyncDataManager();
                    manager.setDirectory(directory);
                    manager.start();

                    lastReadLocation = manager.getMark();
                    // Publish the manager only once it is fully started.
                    dataManager = manager;
                    opened = true;
                } finally {
                    if (!opened) {
                        referenceCount--;
                    }
                }
                LOG.debug("Last mark at: " + lastReadLocation);
            }
        }
    }

    /**
     * Unregisters the active consumer, waits for its dispatch thread to
     * finish and releases the consumer's reference to the data manager.
     *
     * @param consumer the consumer to deactivate
     * @throws RuntimeCamelException if the given consumer is not the active one
     * @throws InterruptedIOException if the calling thread is interrupted
     *             while waiting for the dispatch thread; the thread's
     *             interrupt status is restored before throwing
     * @throws IOException if closing the data manager fails
     */
    protected void deactivateConsumer(DefaultConsumer consumer) throws IOException {
        synchronized (activationMutex) {
            if (this.consumer.get() != consumer) {
                throw new RuntimeCamelException("Consumer was not active.");
            }
            // Clearing the reference is what tells the dispatch loop to exit.
            this.consumer.set(null);
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new InterruptedIOException();
            }
            decrementReference();
        }
    }

    /**
     * Registers the given consumer as the active one, acquires a reference
     * to the data manager on its behalf and starts the daemon thread that
     * dispatches journal records to it.
     *
     * @param consumer the consumer to activate
     * @throws RuntimeCamelException if another consumer is already active
     * @throws IOException if the data manager cannot be opened
     */
    protected void activateConsumer(DefaultConsumer consumer) throws IOException {
        synchronized (activationMutex) {
            if (this.consumer.get() != null) {
                throw new RuntimeCamelException("Consumer already active: journal endpoints only support 1 active consumer");
            }
            incrementReference();
            this.consumer.set(consumer);
            thread = new Thread() {
                @Override
                public void run() {
                    dispatchToConsumer();
                }
            };
            thread.setName("Dipatch thread: " + getEndpointUri());
            thread.setDaemon(true);
            thread.start();
        }
    }

    /**
     * Dispatch loop run by the consumer thread. While a consumer is active
     * it reads the record following the last read location, hands it to the
     * consumer's processor with the headers {@code journal} (endpoint URI)
     * and {@code location} (record location), then moves the journal mark to
     * that record. When no record is available it sleeps for the idle delay.
     * <p>
     * Any failure ends the loop; it is reported through the class logger.
     */
    protected void dispatchToConsumer() {
        try {
            DefaultConsumer active;
            while ((active = this.consumer.get()) != null) {
                // See if there is a new record to process
                Location location = dataManager.getNextLocation(lastReadLocation);
                if (location != null) {

                    // Send it on.
                    ByteSequence read = dataManager.read(location);
                    Exchange exchange = createExchange();
                    exchange.getIn().setBody(read);
                    exchange.getIn().setHeader("journal", getEndpointUri());
                    exchange.getIn().setHeader("location", location);
                    active.getProcessor().process(exchange);

                    // Setting the mark makes the data manager forget about
                    // everything before that record.
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Consumed record at: " + location);
                    }
                    dataManager.setMark(location, syncConsume);
                    lastReadLocation = location;
                } else {
                    // Avoid a tight CPU loop if there is no new record to read.
                    LOG.debug("Sleeping due to no records being available.");
                    Thread.sleep(idleDelay);
                }
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            LOG.debug("Dispatch thread interrupted: " + getEndpointUri());
        } catch (Throwable e) {
            LOG.error("Dispatch to consumer failed, stopping dispatch thread: " + getEndpointUri(), e);
        }
    }

    /**
     * Creates a producer that appends the body of each exchange's in message
     * to the journal. The body is obtained as a {@link ByteSequence}, or
     * failing that as a {@code byte[]} which is then wrapped in one. A
     * reference to the data manager is held for the duration of each call.
     *
     * @return the new producer
     */
    public Producer createProducer() throws Exception {
        return new DefaultProducer(this) {
            public void process(Exchange exchange) throws Exception {
                incrementReference();
                try {
                    ByteSequence body = exchange.getIn().getBody(ByteSequence.class);
                    if (body == null) {
                        byte[] bytes = exchange.getIn().getBody(byte[].class);
                        if (bytes != null) {
                            body = new ByteSequence(bytes);
                        }
                    }
                    if (body == null) {
                        throw new CamelExchangeException("In body message could not be converted to a ByteSequence or a byte array.", exchange);
                    }
                    dataManager.write(body, syncProduce);

                } finally {
                    decrementReference();
                }
            }
        };
    }

    /**
     * @return the sync flag passed to the data manager when the mark is
     *         moved after a record has been consumed
     */
    public boolean isSyncConsume() {
        return syncConsume;
    }

    /**
     * @param syncConsume the sync flag to pass to the data manager when the
     *            mark is moved after a record has been consumed
     */
    public void setSyncConsume(boolean syncConsume) {
        this.syncConsume = syncConsume;
    }

    /**
     * @return the sync flag passed to the data manager when a record is
     *         written by a producer
     */
    public boolean isSyncProduce() {
        return syncProduce;
    }

    /**
     * @param syncProduce the sync flag to pass to the data manager when a
     *            record is written by a producer
     */
    public void setSyncProduce(boolean syncProduce) {
        this.syncProduce = syncProduce;
    }

    /**
     * @return {@code true} while at least one reference to the data manager
     *         is held
     */
    boolean isOpen() {
        synchronized (activationMutex) {
            return referenceCount > 0;
        }
    }
}