001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadb; 018 019 import java.io.DataInput; 020 import java.io.DataOutput; 021 import java.io.File; 022 import java.io.IOException; 023 import java.io.InputStream; 024 import java.io.OutputStream; 025 import java.util.ArrayList; 026 import java.util.Collection; 027 import java.util.Date; 028 import java.util.HashMap; 029 import java.util.HashSet; 030 import java.util.Iterator; 031 import java.util.LinkedHashMap; 032 import java.util.List; 033 import java.util.SortedSet; 034 import java.util.TreeMap; 035 import java.util.TreeSet; 036 import java.util.Map.Entry; 037 import java.util.concurrent.atomic.AtomicBoolean; 038 039 import org.apache.activemq.broker.BrokerService; 040 import org.apache.activemq.broker.BrokerServiceAware; 041 import org.apache.activemq.command.ConnectionId; 042 import org.apache.activemq.command.LocalTransactionId; 043 import org.apache.activemq.command.SubscriptionInfo; 044 import org.apache.activemq.command.TransactionId; 045 import org.apache.activemq.command.XATransactionId; 046 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 047 import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 048 import org.apache.activemq.store.kahadb.data.KahaDestination; 049 import org.apache.activemq.store.kahadb.data.KahaEntryType; 050 import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId; 051 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 052 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 053 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 054 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; 055 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 056 import org.apache.activemq.store.kahadb.data.KahaTraceCommand; 057 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 058 import org.apache.activemq.store.kahadb.data.KahaXATransactionId; 059 import org.apache.activemq.util.Callback; 060 import org.apache.commons.logging.Log; 061 import org.apache.commons.logging.LogFactory; 062 import org.apache.kahadb.index.BTreeIndex; 063 import org.apache.kahadb.index.BTreeVisitor; 064 import org.apache.kahadb.journal.DataFile; 065 import org.apache.kahadb.journal.Journal; 066 import org.apache.kahadb.journal.Location; 067 import org.apache.kahadb.page.Page; 068 import org.apache.kahadb.page.PageFile; 069 import org.apache.kahadb.page.Transaction; 070 import org.apache.kahadb.util.ByteSequence; 071 import org.apache.kahadb.util.DataByteArrayInputStream; 072 import org.apache.kahadb.util.DataByteArrayOutputStream; 073 import org.apache.kahadb.util.LockFile; 074 import org.apache.kahadb.util.LongMarshaller; 075 import org.apache.kahadb.util.Marshaller; 076 import org.apache.kahadb.util.Sequence; 077 import org.apache.kahadb.util.SequenceSet; 078 import org.apache.kahadb.util.StringMarshaller; 079 import org.apache.kahadb.util.VariableMarshaller; 080 import org.springframework.core.enums.LetterCodedLabeledEnum; 081 082 public class MessageDatabase implements BrokerServiceAware { 083 084 private BrokerService brokerService; 085 086 public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; 087 public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "500")); 088 089 private static final Log LOG = LogFactory.getLog(MessageDatabase.class); 090 private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; 091 092 public static final int CLOSED_STATE = 1; 093 public static final int OPEN_STATE = 2; 094 095 096 protected class Metadata { 097 protected Page<Metadata> page; 098 protected int state; 099 protected BTreeIndex<String, StoredDestination> destinations; 100 protected Location lastUpdate; 101 protected Location firstInProgressTransactionLocation; 102 103 public void read(DataInput is) throws IOException { 104 state = is.readInt(); 105 destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong()); 106 if (is.readBoolean()) { 107 lastUpdate = LocationMarshaller.INSTANCE.readPayload(is); 108 } else { 109 lastUpdate = null; 110 } 111 if (is.readBoolean()) { 112 firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is); 113 } else { 114 firstInProgressTransactionLocation = null; 115 } 116 } 117 118 public void write(DataOutput os) throws IOException { 119 os.writeInt(state); 120 os.writeLong(destinations.getPageId()); 121 122 if (lastUpdate != null) { 123 os.writeBoolean(true); 124 LocationMarshaller.INSTANCE.writePayload(lastUpdate, os); 125 } else { 126 os.writeBoolean(false); 127 } 128 129 if (firstInProgressTransactionLocation != null) { 130 os.writeBoolean(true); 131 LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os); 132 } else { 133 os.writeBoolean(false); 134 } 135 } 136 } 137 138 class MetadataMarshaller extends VariableMarshaller<Metadata> { 139 public Metadata readPayload(DataInput dataIn) throws IOException { 140 Metadata rc = new Metadata(); 141 rc.read(dataIn); 142 return rc; 143 } 144 145 public void writePayload(Metadata object, DataOutput dataOut) throws IOException { 146 object.write(dataOut); 147 } 148 } 149 150 protected PageFile pageFile; 151 protected Journal journal; 152 protected Metadata metadata = new Metadata(); 153 154 protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); 155 156 protected boolean failIfDatabaseIsLocked; 157 158 protected boolean deleteAllMessages; 159 protected File directory; 160 protected Thread checkpointThread; 161 protected boolean enableJournalDiskSyncs=true; 162 long checkpointInterval = 5*1000; 163 long cleanupInterval = 30*1000; 164 int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 165 int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 166 boolean enableIndexWriteAsync = false; 167 int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 168 169 protected AtomicBoolean started = new AtomicBoolean(); 170 protected AtomicBoolean opened = new AtomicBoolean(); 171 private LockFile lockFile; 172 private boolean ignoreMissingJournalfiles = false; 173 private int indexCacheSize = 100; 174 private boolean checkForCorruptJournalFiles = false; 175 private boolean checksumJournalFiles = false; 176 177 public MessageDatabase() { 178 } 179 180 public void start() throws Exception { 181 if (started.compareAndSet(false, true)) { 182 load(); 183 } 184 } 185 186 public void stop() throws Exception { 187 if (started.compareAndSet(true, false)) { 188 unload(); 189 } 190 } 191 192 private void loadPageFile() throws IOException { 193 synchronized (indexMutex) { 194 final PageFile pageFile = getPageFile(); 195 pageFile.load(); 196 pageFile.tx().execute(new Transaction.Closure<IOException>() { 197 public void execute(Transaction tx) throws IOException { 198 if (pageFile.getPageCount() == 0) { 199 // First time this is created.. Initialize the metadata 200 Page<Metadata> page = tx.allocate(); 201 assert page.getPageId() == 0; 202 page.set(metadata); 203 metadata.page = page; 204 metadata.state = CLOSED_STATE; 205 metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId()); 206 207 tx.store(metadata.page, metadataMarshaller, true); 208 } else { 209 Page<Metadata> page = tx.load(0, metadataMarshaller); 210 metadata = page.get(); 211 metadata.page = page; 212 } 213 metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE); 214 metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller()); 215 metadata.destinations.load(tx); 216 } 217 }); 218 pageFile.flush(); 219 220 // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted. 221 // Perhaps we should just keep an index of file 222 storedDestinations.clear(); 223 pageFile.tx().execute(new Transaction.Closure<IOException>() { 224 public void execute(Transaction tx) throws IOException { 225 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { 226 Entry<String, StoredDestination> entry = iterator.next(); 227 StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null); 228 storedDestinations.put(entry.getKey(), sd); 229 } 230 } 231 }); 232 } 233 } 234 235 private void startCheckpoint() { 236 checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") { 237 public void run() { 238 try { 239 long lastCleanup = System.currentTimeMillis(); 240 long lastCheckpoint = System.currentTimeMillis(); 241 // Sleep for a short time so we can periodically check 242 // to see if we need to exit this thread. 243 long sleepTime = Math.min(checkpointInterval, 500); 244 while (opened.get()) { 245 246 Thread.sleep(sleepTime); 247 long now = System.currentTimeMillis(); 248 if( now - lastCleanup >= cleanupInterval ) { 249 checkpointCleanup(true); 250 lastCleanup = now; 251 lastCheckpoint = now; 252 } else if( now - lastCheckpoint >= checkpointInterval ) { 253 checkpointCleanup(false); 254 lastCheckpoint = now; 255 } 256 } 257 } catch (InterruptedException e) { 258 // Looks like someone really wants us to exit this thread... 259 } catch (IOException ioe) { 260 LOG.error("Checkpoint failed", ioe); 261 brokerService.handleIOException(ioe); 262 } 263 } 264 265 }; 266 checkpointThread.setDaemon(true); 267 checkpointThread.start(); 268 } 269 270 /** 271 * @throws IOException 272 */ 273 public void open() throws IOException { 274 if( opened.compareAndSet(false, true) ) { 275 getJournal().start(); 276 277 loadPageFile(); 278 279 startCheckpoint(); 280 recover(); 281 } 282 } 283 284 private void lock() throws IOException { 285 if( lockFile == null ) { 286 File lockFileName = new File(directory, "lock"); 287 lockFile = new LockFile(lockFileName, true); 288 if (failIfDatabaseIsLocked) { 289 lockFile.lock(); 290 } else { 291 while (true) { 292 try { 293 lockFile.lock(); 294 break; 295 } catch (IOException e) { 296 LOG.info("Database "+lockFileName+" is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000) + " seconds for the database to be unlocked. Reason: " + e); 297 try { 298 Thread.sleep(DATABASE_LOCKED_WAIT_DELAY); 299 } catch (InterruptedException e1) { 300 } 301 } 302 } 303 } 304 } 305 } 306 307 public void load() throws IOException { 308 309 synchronized (indexMutex) { 310 lock(); 311 if (deleteAllMessages) { 312 getJournal().start(); 313 getJournal().delete(); 314 getJournal().close(); 315 journal = null; 316 getPageFile().delete(); 317 LOG.info("Persistence store purged."); 318 deleteAllMessages = false; 319 } 320 321 open(); 322 store(new KahaTraceCommand().setMessage("LOADED " + new Date())); 323 324 } 325 326 } 327 328 329 public void close() throws IOException, InterruptedException { 330 if( opened.compareAndSet(true, false)) { 331 synchronized (indexMutex) { 332 pageFile.unload(); 333 metadata = new Metadata(); 334 } 335 journal.close(); 336 checkpointThread.join(); 337 lockFile.unlock(); 338 lockFile=null; 339 } 340 } 341 342 public void unload() throws IOException, InterruptedException { 343 synchronized (indexMutex) { 344 if( pageFile != null && pageFile.isLoaded() ) { 345 metadata.state = CLOSED_STATE; 346 metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); 347 348 pageFile.tx().execute(new Transaction.Closure<IOException>() { 349 public void execute(Transaction tx) throws IOException { 350 tx.store(metadata.page, metadataMarshaller, true); 351 } 352 }); 353 } 354 } 355 close(); 356 } 357 358 /** 359 * @return 360 */ 361 private Location getFirstInProgressTxLocation() { 362 Location l = null; 363 if (!inflightTransactions.isEmpty()) { 364 l = inflightTransactions.values().iterator().next().get(0).getLocation(); 365 } 366 if (!preparedTransactions.isEmpty()) { 367 Location t = preparedTransactions.values().iterator().next().get(0).getLocation(); 368 if (l==null || t.compareTo(l) <= 0) { 369 l = t; 370 } 371 } 372 return l; 373 } 374 375 /** 376 * Move all the messages that were in the journal into long term storage. We 377 * just replay and do a checkpoint. 378 * 379 * @throws IOException 380 * @throws IOException 381 * @throws IllegalStateException 382 */ 383 private void recover() throws IllegalStateException, IOException { 384 synchronized (indexMutex) { 385 long start = System.currentTimeMillis(); 386 387 Location recoveryPosition = getRecoveryPosition(); 388 if( recoveryPosition!=null ) { 389 int redoCounter = 0; 390 while (recoveryPosition != null) { 391 JournalCommand message = load(recoveryPosition); 392 metadata.lastUpdate = recoveryPosition; 393 process(message, recoveryPosition); 394 redoCounter++; 395 recoveryPosition = journal.getNextLocation(recoveryPosition); 396 } 397 long end = System.currentTimeMillis(); 398 LOG.info("Replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); 399 } 400 401 // We may have to undo some index updates. 402 pageFile.tx().execute(new Transaction.Closure<IOException>() { 403 public void execute(Transaction tx) throws IOException { 404 recoverIndex(tx); 405 } 406 }); 407 } 408 } 409 410 protected void recoverIndex(Transaction tx) throws IOException { 411 long start = System.currentTimeMillis(); 412 // It is possible index updates got applied before the journal updates.. 413 // in that case we need to removed references to messages that are not in the journal 414 final Location lastAppendLocation = journal.getLastAppendLocation(); 415 long undoCounter=0; 416 417 // Go through all the destinations to see if they have messages past the lastAppendLocation 418 for (StoredDestination sd : storedDestinations.values()) { 419 420 final ArrayList<Long> matches = new ArrayList<Long>(); 421 // Find all the Locations that are >= than the last Append Location. 422 sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) { 423 @Override 424 protected void matched(Location key, Long value) { 425 matches.add(value); 426 } 427 }); 428 429 430 for (Long sequenceId : matches) { 431 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 432 sd.locationIndex.remove(tx, keys.location); 433 sd.messageIdIndex.remove(tx, keys.messageId); 434 undoCounter++; 435 // TODO: do we need to modify the ack positions for the pub sub case? 436 } 437 } 438 439 long end = System.currentTimeMillis(); 440 if( undoCounter > 0 ) { 441 // The rolledback operations are basically in flight journal writes. To avoid getting these the end user 442 // should do sync writes to the journal. 443 LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 444 } 445 446 undoCounter = 0; 447 start = System.currentTimeMillis(); 448 449 // Lets be extra paranoid here and verify that all the datafiles being referenced 450 // by the indexes still exists. 451 452 final SequenceSet ss = new SequenceSet(); 453 for (StoredDestination sd : storedDestinations.values()) { 454 // Use a visitor to cut down the number of pages that we load 455 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 456 int last=-1; 457 458 public boolean isInterestedInKeysBetween(Location first, Location second) { 459 if( first==null ) { 460 return !ss.contains(0, second.getDataFileId()); 461 } else if( second==null ) { 462 return true; 463 } else { 464 return !ss.contains(first.getDataFileId(), second.getDataFileId()); 465 } 466 } 467 468 public void visit(List<Location> keys, List<Long> values) { 469 for (Location l : keys) { 470 int fileId = l.getDataFileId(); 471 if( last != fileId ) { 472 ss.add(fileId); 473 last = fileId; 474 } 475 } 476 } 477 478 }); 479 } 480 HashSet<Integer> missingJournalFiles = new HashSet<Integer>(); 481 while( !ss.isEmpty() ) { 482 missingJournalFiles.add( (int)ss.removeFirst() ); 483 } 484 missingJournalFiles.removeAll( journal.getFileMap().keySet() ); 485 486 if( !missingJournalFiles.isEmpty() ) { 487 LOG.info("Some journal files are missing: "+missingJournalFiles); 488 } 489 490 ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>(); 491 for (Integer missing : missingJournalFiles) { 492 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing,0), new Location(missing+1,0))); 493 } 494 495 if ( checkForCorruptJournalFiles ) { 496 Collection<DataFile> dataFiles = journal.getFileMap().values(); 497 for (DataFile dataFile : dataFiles) { 498 int id = dataFile.getDataFileId(); 499 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id,dataFile.getLength()), new Location(id+1,0))); 500 Sequence seq = dataFile.getCorruptedBlocks().getHead(); 501 while( seq!=null ) { 502 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast()+1))); 503 seq = seq.getNext(); 504 } 505 } 506 } 507 508 if( !missingPredicates.isEmpty() ) { 509 for (StoredDestination sd : storedDestinations.values()) { 510 511 final ArrayList<Long> matches = new ArrayList<Long>(); 512 sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) { 513 protected void matched(Location key, Long value) { 514 matches.add(value); 515 } 516 }); 517 518 // If somes message references are affected by the missing data files... 519 if( !matches.isEmpty() ) { 520 521 // We either 'gracefully' recover dropping the missing messages or 522 // we error out. 523 if( ignoreMissingJournalfiles ) { 524 // Update the index to remove the references to the missing data 525 for (Long sequenceId : matches) { 526 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 527 sd.locationIndex.remove(tx, keys.location); 528 sd.messageIdIndex.remove(tx, keys.messageId); 529 undoCounter++; 530 // TODO: do we need to modify the ack positions for the pub sub case? 531 } 532 533 } else { 534 throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected."); 535 } 536 } 537 } 538 } 539 540 end = System.currentTimeMillis(); 541 if( undoCounter > 0 ) { 542 // The rolledback operations are basically in flight journal writes. To avoid getting these the end user 543 // should do sync writes to the journal. 544 LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 545 } 546 } 547 548 private Location nextRecoveryPosition; 549 private Location lastRecoveryPosition; 550 551 public void incrementalRecover() throws IOException { 552 synchronized (indexMutex) { 553 if( nextRecoveryPosition == null ) { 554 if( lastRecoveryPosition==null ) { 555 nextRecoveryPosition = getRecoveryPosition(); 556 } else { 557 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 558 } 559 } 560 while (nextRecoveryPosition != null) { 561 lastRecoveryPosition = nextRecoveryPosition; 562 metadata.lastUpdate = lastRecoveryPosition; 563 JournalCommand message = load(lastRecoveryPosition); 564 process(message, lastRecoveryPosition); 565 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 566 } 567 } 568 } 569 570 public Location getLastUpdatePosition() throws IOException { 571 return metadata.lastUpdate; 572 } 573 574 private Location getRecoveryPosition() throws IOException { 575 576 // If we need to recover the transactions.. 577 if (metadata.firstInProgressTransactionLocation != null) { 578 return metadata.firstInProgressTransactionLocation; 579 } 580 581 // Perhaps there were no transactions... 582 if( metadata.lastUpdate!=null) { 583 // Start replay at the record after the last one recorded in the index file. 584 return journal.getNextLocation(metadata.lastUpdate); 585 } 586 587 // This loads the first position. 588 return journal.getNextLocation(null); 589 } 590 591 protected void checkpointCleanup(final boolean cleanup) throws IOException { 592 long start = System.currentTimeMillis(); 593 synchronized (indexMutex) { 594 if( !opened.get() ) { 595 return; 596 } 597 pageFile.tx().execute(new Transaction.Closure<IOException>() { 598 public void execute(Transaction tx) throws IOException { 599 checkpointUpdate(tx, cleanup); 600 } 601 }); 602 } 603 long end = System.currentTimeMillis(); 604 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { 605 LOG.info("Slow KahaDB access: cleanup took "+(end-start)); 606 } 607 } 608 609 610 public void checkpoint(Callback closure) throws Exception { 611 synchronized (indexMutex) { 612 pageFile.tx().execute(new Transaction.Closure<IOException>() { 613 public void execute(Transaction tx) throws IOException { 614 checkpointUpdate(tx, false); 615 } 616 }); 617 closure.execute(); 618 } 619 } 620 621 // ///////////////////////////////////////////////////////////////// 622 // Methods call by the broker to update and query the store. 623 // ///////////////////////////////////////////////////////////////// 624 public Location store(JournalCommand data) throws IOException { 625 return store(data, false); 626 } 627 628 /** 629 * All updated are are funneled through this method. The updates are converted 630 * to a JournalMessage which is logged to the journal and then the data from 631 * the JournalMessage is used to update the index just like it would be done 632 * during a recovery process. 633 */ 634 public Location store(JournalCommand data, boolean sync) throws IOException { 635 try { 636 int size = data.serializedSizeFramed(); 637 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); 638 os.writeByte(data.type().getNumber()); 639 data.writeFramed(os); 640 641 long start = System.currentTimeMillis(); 642 Location location = journal.write(os.toByteSequence(), sync); 643 long start2 = System.currentTimeMillis(); 644 process(data, location); 645 long end = System.currentTimeMillis(); 646 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { 647 LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms"); 648 } 649 650 synchronized (indexMutex) { 651 metadata.lastUpdate = location; 652 } 653 if (!checkpointThread.isAlive()) { 654 LOG.info("KahaDB: Recovering checkpoint thread after exception"); 655 startCheckpoint(); 656 } 657 return location; 658 } catch (IOException ioe) { 659 LOG.error("KahaDB failed to store to Journal", ioe); 660 brokerService.handleIOException(ioe); 661 throw ioe; 662 } 663 } 664 665 /** 666 * Loads a previously stored JournalMessage 667 * 668 * @param location 669 * @return 670 * @throws IOException 671 */ 672 public JournalCommand load(Location location) throws IOException { 673 ByteSequence data = journal.read(location); 674 DataByteArrayInputStream is = new DataByteArrayInputStream(data); 675 byte readByte = is.readByte(); 676 KahaEntryType type = KahaEntryType.valueOf(readByte); 677 if( type == null ) { 678 throw new IOException("Could not load journal record. Invalid location: "+location); 679 } 680 JournalCommand message = (JournalCommand)type.createMessage(); 681 message.mergeFramed(is); 682 return message; 683 } 684 685 // ///////////////////////////////////////////////////////////////// 686 // Journaled record processing methods. Once the record is journaled, 687 // these methods handle applying the index updates. These may be called 688 // from the recovery method too so they need to be idempotent 689 // ///////////////////////////////////////////////////////////////// 690 691 private void process(JournalCommand data, final Location location) throws IOException { 692 data.visit(new Visitor() { 693 @Override 694 public void visit(KahaAddMessageCommand command) throws IOException { 695 process(command, location); 696 } 697 698 @Override 699 public void visit(KahaRemoveMessageCommand command) throws IOException { 700 process(command, location); 701 } 702 703 @Override 704 public void visit(KahaPrepareCommand command) throws IOException { 705 process(command, location); 706 } 707 708 @Override 709 public void visit(KahaCommitCommand command) throws IOException { 710 process(command, location); 711 } 712 713 @Override 714 public void visit(KahaRollbackCommand command) throws IOException { 715 process(command, location); 716 } 717 718 @Override 719 public void visit(KahaRemoveDestinationCommand command) throws IOException { 720 process(command, location); 721 } 722 723 @Override 724 public void visit(KahaSubscriptionCommand command) throws IOException { 725 process(command, location); 726 } 727 }); 728 } 729 730 private void process(final KahaAddMessageCommand command, final Location location) throws IOException { 731 if (command.hasTransactionInfo()) { 732 synchronized (indexMutex) { 733 ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location); 734 inflightTx.add(new AddOpperation(command, location)); 735 } 736 } else { 737 synchronized (indexMutex) { 738 pageFile.tx().execute(new Transaction.Closure<IOException>() { 739 public void execute(Transaction tx) throws IOException { 740 upadateIndex(tx, command, location); 741 } 742 }); 743 } 744 } 745 } 746 747 protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException { 748 if (command.hasTransactionInfo()) { 749 synchronized (indexMutex) { 750 ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location); 751 inflightTx.add(new RemoveOpperation(command, location)); 752 } 753 } else { 754 synchronized (indexMutex) { 755 pageFile.tx().execute(new Transaction.Closure<IOException>() { 756 public void execute(Transaction tx) throws IOException { 757 updateIndex(tx, command, location); 758 } 759 }); 760 } 761 } 762 763 } 764 765 protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException { 766 synchronized (indexMutex) { 767 pageFile.tx().execute(new Transaction.Closure<IOException>() { 768 public void execute(Transaction tx) throws IOException { 769 updateIndex(tx, command, location); 770 } 771 }); 772 } 773 } 774 775 protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException { 776 synchronized (indexMutex) { 777 pageFile.tx().execute(new Transaction.Closure<IOException>() { 778 public void execute(Transaction tx) throws IOException { 779 updateIndex(tx, command, location); 780 } 781 }); 782 } 783 } 784 785 protected void process(KahaCommitCommand command, Location location) throws IOException { 786 TransactionId key = key(command.getTransactionInfo()); 787 synchronized (indexMutex) { 788 ArrayList<Operation> inflightTx = inflightTransactions.remove(key); 789 if (inflightTx == null) { 790 inflightTx = preparedTransactions.remove(key); 791 } 792 if (inflightTx == null) { 793 return; 794 } 795 796 final ArrayList<Operation> messagingTx = inflightTx; 797 pageFile.tx().execute(new Transaction.Closure<IOException>() { 798 public void execute(Transaction tx) throws IOException { 799 for (Operation op : messagingTx) { 800 op.execute(tx); 801 } 802 } 803 }); 804 } 805 } 806 807 protected void process(KahaPrepareCommand command, Location location) { 808 synchronized (indexMutex) { 809 TransactionId key = key(command.getTransactionInfo()); 810 ArrayList<Operation> tx = inflightTransactions.remove(key); 811 if (tx != null) { 812 preparedTransactions.put(key, tx); 813 } 814 } 815 } 816 817 protected void process(KahaRollbackCommand command, Location location) { 818 synchronized (indexMutex) { 819 TransactionId key = key(command.getTransactionInfo()); 820 ArrayList<Operation> tx = inflightTransactions.remove(key); 821 if (tx == null) { 822 preparedTransactions.remove(key); 823 } 824 } 825 } 826 827 // ///////////////////////////////////////////////////////////////// 828 // These methods do the actual index updates. 829 // ///////////////////////////////////////////////////////////////// 830 831 protected final Object indexMutex = new Object(); 832 private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>(); 833 834 private void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { 835 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 836 837 // Skip adding the message to the index if this is a topic and there are 838 // no subscriptions. 839 if (sd.subscriptions != null && sd.ackPositions.isEmpty()) { 840 return; 841 } 842 843 // Add the message. 844 long id = sd.nextMessageId++; 845 Long previous = sd.locationIndex.put(tx, location, id); 846 if( previous == null ) { 847 previous = sd.messageIdIndex.put(tx, command.getMessageId(), id); 848 if( previous == null ) { 849 sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location)); 850 } else { 851 // If the message ID as indexed, then the broker asked us to store a DUP 852 // message. Bad BOY! Don't do it, and log a warning. 853 LOG.warn("Duplicate message add attempt rejected. Message id: "+command.getMessageId()); 854 // TODO: consider just rolling back the tx. 855 sd.messageIdIndex.put(tx, command.getMessageId(), previous); 856 } 857 } else { 858 // restore the previous value.. Looks like this was a redo of a previously 859 // added message. We don't want to assign it a new id as the other indexes would 860 // be wrong.. 861 // 862 // TODO: consider just rolling back the tx. 863 sd.locationIndex.put(tx, location, previous); 864 } 865 866 } 867 868 private void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { 869 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 870 if (!command.hasSubscriptionKey()) { 871 872 // In the queue case we just remove the message from the index.. 873 Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId()); 874 if (sequenceId != null) { 875 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 876 if (keys != null) { 877 sd.locationIndex.remove(tx, keys.location); 878 } 879 } 880 } else { 881 // In the topic case we need remove the message once it's been acked 882 // by all the subs 883 Long sequence = sd.messageIdIndex.get(tx, command.getMessageId()); 884 885 // Make sure it's a valid message id... 886 if (sequence != null) { 887 String subscriptionKey = command.getSubscriptionKey(); 888 Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence); 889 890 // The following method handles deleting un-referenced messages. 891 removeAckLocation(tx, sd, subscriptionKey, prev); 892 893 // Add it to the new location set. 894 addAckLocation(sd, sequence, subscriptionKey); 895 } 896 897 } 898 } 899 900 private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { 901 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 902 sd.orderIndex.clear(tx); 903 sd.orderIndex.unload(tx); 904 tx.free(sd.orderIndex.getPageId()); 905 906 sd.locationIndex.clear(tx); 907 sd.locationIndex.unload(tx); 908 tx.free(sd.locationIndex.getPageId()); 909 910 sd.messageIdIndex.clear(tx); 911 sd.messageIdIndex.unload(tx); 912 tx.free(sd.messageIdIndex.getPageId()); 913 914 if (sd.subscriptions != null) { 915 sd.subscriptions.clear(tx); 916 sd.subscriptions.unload(tx); 917 tx.free(sd.subscriptions.getPageId()); 918 919 sd.subscriptionAcks.clear(tx); 920 sd.subscriptionAcks.unload(tx); 921 tx.free(sd.subscriptionAcks.getPageId()); 922 } 923 924 String key = key(command.getDestination()); 925 storedDestinations.remove(key); 926 metadata.destinations.remove(tx, key); 927 } 928 929 private void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { 930 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 931 932 // If set then we are creating it.. otherwise we are destroying the sub 933 if (command.hasSubscriptionInfo()) { 934 String subscriptionKey = command.getSubscriptionKey(); 935 sd.subscriptions.put(tx, subscriptionKey, command); 936 long ackLocation=-1; 937 if (!command.getRetroactive()) { 938 ackLocation = sd.nextMessageId-1; 939 } 940 941 sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation); 942 addAckLocation(sd, ackLocation, subscriptionKey); 943 } else { 944 // delete the sub... 945 String subscriptionKey = command.getSubscriptionKey(); 946 sd.subscriptions.remove(tx, subscriptionKey); 947 Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey); 948 if( prev!=null ) { 949 removeAckLocation(tx, sd, subscriptionKey, prev); 950 } 951 } 952 953 } 954 955 /** 956 * @param tx 957 * @throws IOException 958 */ 959 private void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { 960 961 LOG.debug("Checkpoint started."); 962 963 metadata.state = OPEN_STATE; 964 metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); 965 tx.store(metadata.page, metadataMarshaller, true); 966 pageFile.flush(); 967 968 if( cleanup ) { 969 970 final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(journal.getFileMap().keySet()); 971 972 // Don't GC files under replication 973 if( journalFilesBeingReplicated!=null ) { 974 gcCandidateSet.removeAll(journalFilesBeingReplicated); 975 } 976 977 // Don't GC files after the first in progress tx 978 Location firstTxLocation = metadata.lastUpdate; 979 if( metadata.firstInProgressTransactionLocation!=null ) { 980 firstTxLocation = metadata.firstInProgressTransactionLocation; 981 } 982 983 if( firstTxLocation!=null ) { 984 while( !gcCandidateSet.isEmpty() ) { 985 Integer last = gcCandidateSet.last(); 986 if( last >= firstTxLocation.getDataFileId() ) { 987 gcCandidateSet.remove(last); 988 } else { 989 break; 990 } 991 } 992 } 993 994 // Go through all the destinations to see if any of them can remove GC candidates. 995 for (StoredDestination sd : storedDestinations.values()) { 996 if( gcCandidateSet.isEmpty() ) { 997 break; 998 } 999 1000 // Use a visitor to cut down the number of pages that we load 1001 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 1002 int last=-1; 1003 public boolean isInterestedInKeysBetween(Location first, Location second) { 1004 if( first==null ) { 1005 SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1); 1006 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { 1007 subset.remove(second.getDataFileId()); 1008 } 1009 return !subset.isEmpty(); 1010 } else if( second==null ) { 1011 SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId()); 1012 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { 1013 subset.remove(first.getDataFileId()); 1014 } 1015 return !subset.isEmpty(); 1016 } else { 1017 SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1); 1018 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { 1019 subset.remove(first.getDataFileId()); 1020 } 1021 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { 1022 subset.remove(second.getDataFileId()); 1023 } 1024 return !subset.isEmpty(); 1025 } 1026 } 1027 1028 public void visit(List<Location> keys, List<Long> values) { 1029 for (Location l : keys) { 1030 int fileId = l.getDataFileId(); 1031 if( last != fileId ) { 1032 gcCandidateSet.remove(fileId); 1033 last = fileId; 1034 } 1035 } 1036 } 1037 1038 }); 1039 } 1040 1041 if( !gcCandidateSet.isEmpty() ) { 1042 LOG.debug("Cleanup removing the data files: "+gcCandidateSet); 1043 journal.removeDataFiles(gcCandidateSet); 1044 } 1045 } 1046 1047 LOG.debug("Checkpoint done."); 1048 } 1049 1050 public HashSet<Integer> getJournalFilesBeingReplicated() { 1051 return journalFilesBeingReplicated; 1052 } 1053 1054 // ///////////////////////////////////////////////////////////////// 1055 // StoredDestination related implementation methods. 1056 // ///////////////////////////////////////////////////////////////// 1057 1058 1059 private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>(); 1060 1061 class StoredSubscription { 1062 SubscriptionInfo subscriptionInfo; 1063 String lastAckId; 1064 Location lastAckLocation; 1065 Location cursor; 1066 } 1067 1068 static class MessageKeys { 1069 final String messageId; 1070 final Location location; 1071 1072 public MessageKeys(String messageId, Location location) { 1073 this.messageId=messageId; 1074 this.location=location; 1075 } 1076 1077 @Override 1078 public String toString() { 1079 return "["+messageId+","+location+"]"; 1080 } 1081 } 1082 1083 static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> { 1084 static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller(); 1085 1086 public MessageKeys readPayload(DataInput dataIn) throws IOException { 1087 return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn)); 1088 } 1089 1090 public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException { 1091 dataOut.writeUTF(object.messageId); 1092 LocationMarshaller.INSTANCE.writePayload(object.location, dataOut); 1093 } 1094 } 1095 1096 static class StoredDestination { 1097 long nextMessageId; 1098 BTreeIndex<Long, MessageKeys> orderIndex; 1099 BTreeIndex<Location, Long> locationIndex; 1100 BTreeIndex<String, Long> messageIdIndex; 1101 1102 // These bits are only set for Topics 1103 BTreeIndex<String, KahaSubscriptionCommand> subscriptions; 1104 BTreeIndex<String, Long> subscriptionAcks; 1105 HashMap<String, Long> subscriptionCursors; 1106 TreeMap<Long, HashSet<String>> ackPositions; 1107 } 1108 1109 protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> { 1110 1111 public StoredDestination readPayload(DataInput dataIn) throws IOException { 1112 StoredDestination value = new StoredDestination(); 1113 value.orderIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 1114 value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong()); 1115 value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong()); 1116 1117 if (dataIn.readBoolean()) { 1118 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong()); 1119 value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong()); 1120 } 1121 return value; 1122 } 1123 1124 public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException { 1125 dataOut.writeLong(value.orderIndex.getPageId()); 1126 dataOut.writeLong(value.locationIndex.getPageId()); 1127 dataOut.writeLong(value.messageIdIndex.getPageId()); 1128 if (value.subscriptions != null) { 1129 dataOut.writeBoolean(true); 1130 dataOut.writeLong(value.subscriptions.getPageId()); 1131 dataOut.writeLong(value.subscriptionAcks.getPageId()); 1132 } else { 1133 dataOut.writeBoolean(false); 1134 } 1135 } 1136 } 1137 1138 static class LocationMarshaller implements Marshaller<Location> { 1139 final static LocationMarshaller INSTANCE = new LocationMarshaller(); 1140 1141 public Location readPayload(DataInput dataIn) throws IOException { 1142 Location rc = new Location(); 1143 rc.setDataFileId(dataIn.readInt()); 1144 rc.setOffset(dataIn.readInt()); 1145 return rc; 1146 } 1147 1148 public void writePayload(Location object, DataOutput dataOut) throws IOException { 1149 dataOut.writeInt(object.getDataFileId()); 1150 dataOut.writeInt(object.getOffset()); 1151 } 1152 1153 public int getFixedSize() { 1154 return 8; 1155 } 1156 1157 public Location deepCopy(Location source) { 1158 return new Location(source); 1159 } 1160 1161 public boolean isDeepCopySupported() { 1162 return true; 1163 } 1164 } 1165 1166 static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> { 1167 final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller(); 1168 1169 public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException { 1170 KahaSubscriptionCommand rc = new KahaSubscriptionCommand(); 1171 rc.mergeFramed((InputStream)dataIn); 1172 return rc; 1173 } 1174 1175 public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException { 1176 object.writeFramed((OutputStream)dataOut); 1177 } 1178 } 1179 1180 protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 1181 String key = key(destination); 1182 StoredDestination rc = storedDestinations.get(key); 1183 if (rc == null) { 1184 boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC; 1185 rc = loadStoredDestination(tx, key, topic); 1186 // Cache it. We may want to remove/unload destinations from the 1187 // cache that are not used for a while 1188 // to reduce memory usage. 1189 storedDestinations.put(key, rc); 1190 } 1191 return rc; 1192 } 1193 1194 /** 1195 * @param tx 1196 * @param key 1197 * @param topic 1198 * @return 1199 * @throws IOException 1200 */ 1201 private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException { 1202 // Try to load the existing indexes.. 1203 StoredDestination rc = metadata.destinations.get(tx, key); 1204 if (rc == null) { 1205 // Brand new destination.. allocate indexes for it. 1206 rc = new StoredDestination(); 1207 rc.orderIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 1208 rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate()); 1209 rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate()); 1210 1211 if (topic) { 1212 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate()); 1213 rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate()); 1214 } 1215 metadata.destinations.put(tx, key, rc); 1216 } 1217 1218 // Configure the marshalers and load. 1219 rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 1220 rc.orderIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 1221 rc.orderIndex.load(tx); 1222 1223 // Figure out the next key using the last entry in the destination. 1224 Entry<Long, MessageKeys> lastEntry = rc.orderIndex.getLast(tx); 1225 if( lastEntry!=null ) { 1226 rc.nextMessageId = lastEntry.getKey()+1; 1227 } 1228 1229 rc.locationIndex.setKeyMarshaller(LocationMarshaller.INSTANCE); 1230 rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE); 1231 rc.locationIndex.load(tx); 1232 1233 rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE); 1234 rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE); 1235 rc.messageIdIndex.load(tx); 1236 1237 // If it was a topic... 1238 if (topic) { 1239 1240 rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE); 1241 rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE); 1242 rc.subscriptions.load(tx); 1243 1244 rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE); 1245 rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE); 1246 rc.subscriptionAcks.load(tx); 1247 1248 rc.ackPositions = new TreeMap<Long, HashSet<String>>(); 1249 rc.subscriptionCursors = new HashMap<String, Long>(); 1250 1251 for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) { 1252 Entry<String, Long> entry = iterator.next(); 1253 addAckLocation(rc, entry.getValue(), entry.getKey()); 1254 } 1255 1256 } 1257 return rc; 1258 } 1259 1260 /** 1261 * @param sd 1262 * @param messageSequence 1263 * @param subscriptionKey 1264 */ 1265 private void addAckLocation(StoredDestination sd, Long messageSequence, String subscriptionKey) { 1266 HashSet<String> hs = sd.ackPositions.get(messageSequence); 1267 if (hs == null) { 1268 hs = new HashSet<String>(); 1269 sd.ackPositions.put(messageSequence, hs); 1270 } 1271 hs.add(subscriptionKey); 1272 } 1273 1274 /** 1275 * @param tx 1276 * @param sd 1277 * @param subscriptionKey 1278 * @param sequenceId 1279 * @throws IOException 1280 */ 1281 private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException { 1282 // Remove the sub from the previous location set.. 1283 if (sequenceId != null) { 1284 HashSet<String> hs = sd.ackPositions.get(sequenceId); 1285 if (hs != null) { 1286 hs.remove(subscriptionKey); 1287 if (hs.isEmpty()) { 1288 HashSet<String> firstSet = sd.ackPositions.values().iterator().next(); 1289 sd.ackPositions.remove(sequenceId); 1290 1291 // Did we just empty out the first set in the 1292 // ordered list of ack locations? Then it's time to 1293 // delete some messages. 1294 if (hs == firstSet) { 1295 1296 // Find all the entries that need to get deleted. 1297 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); 1298 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { 1299 Entry<Long, MessageKeys> entry = iterator.next(); 1300 if (entry.getKey().compareTo(sequenceId) <= 0) { 1301 // We don't do the actually delete while we are 1302 // iterating the BTree since 1303 // iterating would fail. 1304 deletes.add(entry); 1305 }else { 1306 //no point in iterating the in-order sequences anymore 1307 break; 1308 } 1309 } 1310 1311 // Do the actual deletes. 1312 for (Entry<Long, MessageKeys> entry : deletes) { 1313 sd.locationIndex.remove(tx, entry.getValue().location); 1314 sd.messageIdIndex.remove(tx,entry.getValue().messageId); 1315 sd.orderIndex.remove(tx,entry.getKey()); 1316 } 1317 } 1318 } 1319 } 1320 } 1321 } 1322 1323 private String key(KahaDestination destination) { 1324 return destination.getType().getNumber() + ":" + destination.getName(); 1325 } 1326 1327 // ///////////////////////////////////////////////////////////////// 1328 // Transaction related implementation methods. 1329 // ///////////////////////////////////////////////////////////////// 1330 protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>(); 1331 protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>(); 1332 1333 private ArrayList<Operation> getInflightTx(KahaTransactionInfo info, Location location) { 1334 TransactionId key = key(info); 1335 ArrayList<Operation> tx = inflightTransactions.get(key); 1336 if (tx == null) { 1337 tx = new ArrayList<Operation>(); 1338 inflightTransactions.put(key, tx); 1339 } 1340 return tx; 1341 } 1342 1343 private TransactionId key(KahaTransactionInfo transactionInfo) { 1344 if (transactionInfo.hasLocalTransacitonId()) { 1345 KahaLocalTransactionId tx = transactionInfo.getLocalTransacitonId(); 1346 LocalTransactionId rc = new LocalTransactionId(); 1347 rc.setConnectionId(new ConnectionId(tx.getConnectionId())); 1348 rc.setValue(tx.getTransacitonId()); 1349 return rc; 1350 } else { 1351 KahaXATransactionId tx = transactionInfo.getXaTransacitonId(); 1352 XATransactionId rc = new XATransactionId(); 1353 rc.setBranchQualifier(tx.getBranchQualifier().toByteArray()); 1354 rc.setGlobalTransactionId(tx.getGlobalTransactionId().toByteArray()); 1355 rc.setFormatId(tx.getFormatId()); 1356 return rc; 1357 } 1358 } 1359 1360 abstract class Operation { 1361 final Location location; 1362 1363 public Operation(Location location) { 1364 this.location = location; 1365 } 1366 1367 public Location getLocation() { 1368 return location; 1369 } 1370 1371 abstract public void execute(Transaction tx) throws IOException; 1372 } 1373 1374 class AddOpperation extends Operation { 1375 final KahaAddMessageCommand command; 1376 1377 public AddOpperation(KahaAddMessageCommand command, Location location) { 1378 super(location); 1379 this.command = command; 1380 } 1381 1382 public void execute(Transaction tx) throws IOException { 1383 upadateIndex(tx, command, location); 1384 } 1385 1386 public KahaAddMessageCommand getCommand() { 1387 return command; 1388 } 1389 } 1390 1391 class RemoveOpperation extends Operation { 1392 final KahaRemoveMessageCommand command; 1393 1394 public RemoveOpperation(KahaRemoveMessageCommand command, Location location) { 1395 super(location); 1396 this.command = command; 1397 } 1398 1399 public void execute(Transaction tx) throws IOException { 1400 updateIndex(tx, command, location); 1401 } 1402 1403 public KahaRemoveMessageCommand getCommand() { 1404 return command; 1405 } 1406 } 1407 1408 // ///////////////////////////////////////////////////////////////// 1409 // Initialization related implementation methods. 1410 // ///////////////////////////////////////////////////////////////// 1411 1412 private PageFile createPageFile() { 1413 PageFile index = new PageFile(directory, "db"); 1414 index.setEnableWriteThread(isEnableIndexWriteAsync()); 1415 index.setWriteBatchSize(getIndexWriteBatchSize()); 1416 index.setPageCacheSize(indexCacheSize); 1417 return index; 1418 } 1419 1420 private Journal createJournal() { 1421 Journal manager = new Journal(); 1422 manager.setDirectory(directory); 1423 manager.setMaxFileLength(getJournalMaxFileLength()); 1424 manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles); 1425 manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles); 1426 manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); 1427 return manager; 1428 } 1429 1430 public int getJournalMaxWriteBatchSize() { 1431 return journalMaxWriteBatchSize; 1432 } 1433 1434 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 1435 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 1436 } 1437 1438 public File getDirectory() { 1439 return directory; 1440 } 1441 1442 public void setDirectory(File directory) { 1443 this.directory = directory; 1444 } 1445 1446 public boolean isDeleteAllMessages() { 1447 return deleteAllMessages; 1448 } 1449 1450 public void setDeleteAllMessages(boolean deleteAllMessages) { 1451 this.deleteAllMessages = deleteAllMessages; 1452 } 1453 1454 public void setIndexWriteBatchSize(int setIndexWriteBatchSize) { 1455 this.setIndexWriteBatchSize = setIndexWriteBatchSize; 1456 } 1457 1458 public int getIndexWriteBatchSize() { 1459 return setIndexWriteBatchSize; 1460 } 1461 1462 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 1463 this.enableIndexWriteAsync = enableIndexWriteAsync; 1464 } 1465 1466 boolean isEnableIndexWriteAsync() { 1467 return enableIndexWriteAsync; 1468 } 1469 1470 public boolean isEnableJournalDiskSyncs() { 1471 return enableJournalDiskSyncs; 1472 } 1473 1474 public void setEnableJournalDiskSyncs(boolean syncWrites) { 1475 this.enableJournalDiskSyncs = syncWrites; 1476 } 1477 1478 public long getCheckpointInterval() { 1479 return checkpointInterval; 1480 } 1481 1482 public void setCheckpointInterval(long checkpointInterval) { 1483 this.checkpointInterval = checkpointInterval; 1484 } 1485 1486 public long getCleanupInterval() { 1487 return cleanupInterval; 1488 } 1489 1490 public void setCleanupInterval(long cleanupInterval) { 1491 this.cleanupInterval = cleanupInterval; 1492 } 1493 1494 public void setJournalMaxFileLength(int journalMaxFileLength) { 1495 this.journalMaxFileLength = journalMaxFileLength; 1496 } 1497 1498 public int getJournalMaxFileLength() { 1499 return journalMaxFileLength; 1500 } 1501 1502 public PageFile getPageFile() { 1503 if (pageFile == null) { 1504 pageFile = createPageFile(); 1505 } 1506 return pageFile; 1507 } 1508 1509 public Journal getJournal() { 1510 if (journal == null) { 1511 journal = createJournal(); 1512 } 1513 return journal; 1514 } 1515 1516 public boolean isFailIfDatabaseIsLocked() { 1517 return failIfDatabaseIsLocked; 1518 } 1519 1520 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 1521 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 1522 } 1523 1524 public boolean isIgnoreMissingJournalfiles() { 1525 return ignoreMissingJournalfiles; 1526 } 1527 1528 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { 1529 this.ignoreMissingJournalfiles = ignoreMissingJournalfiles; 1530 } 1531 1532 public int getIndexCacheSize() { 1533 return indexCacheSize; 1534 } 1535 1536 public void setIndexCacheSize(int indexCacheSize) { 1537 this.indexCacheSize = indexCacheSize; 1538 } 1539 1540 public boolean isCheckForCorruptJournalFiles() { 1541 return checkForCorruptJournalFiles; 1542 } 1543 1544 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { 1545 this.checkForCorruptJournalFiles = checkForCorruptJournalFiles; 1546 } 1547 1548 public boolean isChecksumJournalFiles() { 1549 return checksumJournalFiles; 1550 } 1551 1552 public void setChecksumJournalFiles(boolean checksumJournalFiles) { 1553 this.checksumJournalFiles = checksumJournalFiles; 1554 } 1555 1556 public void setBrokerService(BrokerService brokerService) { 1557 this.brokerService = brokerService; 1558 } 1559 }