/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kahadb.page;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.IOExceptionSupport;
import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.IntrospectionSupport;
import org.apache.kahadb.util.LRUCache;
import org.apache.kahadb.util.Sequence;
import org.apache.kahadb.util.SequenceSet;

/**
 * A PageFile provides you random access to fixed sized disk pages. This object is not thread safe and therefore access to it should
 * be externally synchronized.
 *
 * The file has 3 parts:
 * Metadata Space: 4k : Reserved metadata area. Used to store persistent config about the file.
 * Recovery Buffer Space: Page Size * 1000 : This is a redo log used to prevent partial page writes from making the file inconsistent.
 * Page Space: The pages in the page file.
 *
 * @version $Revision: 882511 $
 */
public class PageFile {

    private static final String PAGEFILE_SUFFIX = ".data";
    private static final String RECOVERY_FILE_SUFFIX = ".redo";
    private static final String FREE_FILE_SUFFIX = ".free";
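    /*
     * Illustrative usage sketch (not part of the original source; the directory and
     * index name below are made up). It only uses methods that this class exposes:
     *
     *   PageFile pageFile = new PageFile(new File("data"), "example");
     *   pageFile.setEnableWriteThread(true);   // optional: batch writes on an async thread
     *   pageFile.load();                       // opens the files, recovering if needed
     *   try {
     *       Transaction tx = pageFile.tx();
     *       // allocate / read / write pages through the transaction...
     *       pageFile.flush();                  // force buffered page writes to disk
     *   } finally {
     *       pageFile.unload();                 // stores metadata + free list, closes handles
     *   }
     */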
    // 4k default page size.
    public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4));
    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.parseInt(System.getProperty("defaultWriteBatchSize", ""+1000));
    private static final int RECOVERY_FILE_HEADER_SIZE = 1024*4;
    private static final int PAGE_FILE_HEADER_SIZE = 1024*4;

    // Recovery header is (long offset)
    private static final Log LOG = LogFactory.getLog(PageFile.class);

    // A PageFile will use a couple of files in this directory.
    private File directory;
    // And the file names in that directory will be based on this name.
    private final String name;

    // File handle used for reading pages..
    private RandomAccessFile readFile;
    // File handle used for writing pages..
    private RandomAccessFile writeFile;
    // File handle used for writing to the recovery buffer..
    private RandomAccessFile recoveryFile;

    // The size of pages.
    private int pageSize = DEFAULT_PAGE_SIZE;

    // The minimum amount of space allocated to the recovery file, in number of pages.
    private int recoveryFileMinPageCount = 1000;
    // The max size that we let the recovery file grow to.. it may exceed the max, but the file will get resized
    // back to this max size as soon as possible.
    private int recoveryFileMaxPageCount = 10000;
    // The number of pages in the current recovery buffer.
    private int recoveryPageCount;

    private AtomicBoolean loaded = new AtomicBoolean();
    // The number of pages we are aiming to write every time we
    // write to disk.
    int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;

    // The cache of recently used pages.
    private LRUCache<Long, Page> pageCache;
    // Is read page caching enabled?
    private boolean enablePageCaching = true;
    // How many pages will we keep in the cache?
    private int pageCacheSize = 100;

    // Should we first log the page write to the recovery buffer? Avoids partial
    // page write failures..
    private boolean enableRecoveryFile = true;
    // Will we sync writes to disk? Ensures that data will not be lost after a checkpoint().
    private boolean enableDiskSyncs = true;
    // Will writes be done in an async thread?
    private boolean enabledWriteThread = false;

    // These are used if enabledWriteThread==true.
    private AtomicBoolean stopWriter = new AtomicBoolean();
    private Thread writerThread;
    private CountDownLatch checkpointLatch;

    // Keeps track of writes that are being written to disk.
    private TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();

    // Keeps track of free pages.
    private final AtomicLong nextFreePageId = new AtomicLong();
    private SequenceSet freeList = new SequenceSet();

    private AtomicLong nextTxid = new AtomicLong();

    // Persistent settings stored in the page file.
    private MetaData metaData;

    /**
     * Used to keep track of updated pages which have not yet been committed.
     */
    static class PageWrite {
        Page page;
        byte[] current;
        byte[] diskBound;

        public PageWrite(Page page, byte[] data) {
            this.page = page;
            current = data;
        }

        public void setCurrent(Page page, byte[] data) {
            this.page = page;
            current = data;
        }

        @Override
        public String toString() {
            return "[PageWrite:"+page.getPageId()+"]";
        }

        @SuppressWarnings("unchecked")
        public Page getPage() {
            return page;
        }

        void begin() {
            diskBound = current;
            current = null;
        }

        /**
         * @return true if there are no pending writes to do.
         */
        boolean done() {
            diskBound = null;
            return current == null;
        }

        boolean isDone() {
            return diskBound == null && current == null;
        }

    }

    /**
     * The MetaData object holds the persistent data associated with a PageFile object.
     */
    public static class MetaData {

        String fileType;
        String fileTypeVersion;

        long metaDataTxId = -1;
        int pageSize;
        boolean cleanShutdown;
        long lastTxId;
        long freePages;

        public String getFileType() {
            return fileType;
        }
        public void setFileType(String fileType) {
            this.fileType = fileType;
        }
        public String getFileTypeVersion() {
            return fileTypeVersion;
        }
        public void setFileTypeVersion(String version) {
            this.fileTypeVersion = version;
        }
        public long getMetaDataTxId() {
            return metaDataTxId;
        }
        public void setMetaDataTxId(long metaDataTxId) {
            this.metaDataTxId = metaDataTxId;
        }
        public int getPageSize() {
            return pageSize;
        }
        public void setPageSize(int pageSize) {
            this.pageSize = pageSize;
        }
        public boolean isCleanShutdown() {
            return cleanShutdown;
        }
        public void setCleanShutdown(boolean cleanShutdown) {
            this.cleanShutdown = cleanShutdown;
        }
        public long getLastTxId() {
            return lastTxId;
        }
        public void setLastTxId(long lastTxId) {
            this.lastTxId = lastTxId;
        }
        public long getFreePages() {
            return freePages;
        }
        public void setFreePages(long value) {
            this.freePages = value;
        }
    }

    public Transaction tx() {
        assertLoaded();
        return new Transaction(this);
    }

    /**
     * Creates a PageFile in the specified directory whose data files are named by name.
     *
     * @param directory
     * @param name
     */
    public PageFile(File directory, String name) {
        this.directory = directory;
        this.name = name;
    }

    /**
     * Deletes the files used by the PageFile object. This method can only be used when this object is not loaded.
     *
     * @throws IOException
     *         if the files cannot be deleted.
     * @throws IllegalStateException
     *         if this PageFile is loaded
     */
    public void delete() throws IOException {
        if( loaded.get() ) {
            throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
        }
        delete(getMainPageFile());
        delete(getFreeFile());
        delete(getRecoveryFile());
    }

    /**
     * @param file
     * @throws IOException
     */
    private void delete(File file) throws IOException {
        if( file.exists() ) {
            if( !file.delete() ) {
                throw new IOException("Could not delete: "+file.getPath());
            }
        }
    }

    /**
     * Loads the page file so that it can be accessed for read/write purposes. This allocates OS resources.
     * If this is the first time the page file is loaded, then this creates the page file in the file system.
     *
     * @throws IOException
     *         If the page file cannot be loaded. This could be because the existing page file is corrupt, is a bad version, or
     *         there was a disk error.
     * @throws IllegalStateException
     *         If the page file was already loaded.
     */
    public void load() throws IOException, IllegalStateException {
        if (loaded.compareAndSet(false, true)) {

            if( enablePageCaching ) {
                pageCache = new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true);
            }

            File file = getMainPageFile();
            IOHelper.mkdirs(file.getParentFile());
            writeFile = new RandomAccessFile(file, "rw");
            readFile = new RandomAccessFile(file, "r");

            if (readFile.length() > 0) {
                // Load the page size setting since that can't change once the file is created.
                loadMetaData();
                pageSize = metaData.getPageSize();
            } else {
                // Store the page size setting since that can't change once the file is created.
                metaData = new MetaData();
                metaData.setFileType(PageFile.class.getName());
                metaData.setFileTypeVersion("1");
                metaData.setPageSize(getPageSize());
                metaData.setCleanShutdown(true);
                metaData.setFreePages(-1);
                metaData.setLastTxId(0);
                storeMetaData();
            }

            if( enableRecoveryFile ) {
                recoveryFile = new RandomAccessFile(getRecoveryFile(), "rw");
            }

            if( metaData.isCleanShutdown() ) {
                nextTxid.set(metaData.getLastTxId()+1);
                if( metaData.getFreePages() > 0 ) {
                    loadFreeList();
                }
            } else {
                LOG.debug("Recovering page file...");
                nextTxid.set(redoRecoveryUpdates());

                // Scan all pages to find the free ones.
                freeList = new SequenceSet();
                for (Iterator i = tx().iterator(true); i.hasNext();) {
                    Page page = (Page)i.next();
                    if( page.getType() == Page.PAGE_FREE_TYPE ) {
                        freeList.add(page.getPageId());
                    }
                }

            }

            metaData.setCleanShutdown(false);
            storeMetaData();
            getFreeFile().delete();

            if( writeFile.length() < PAGE_FILE_HEADER_SIZE ) {
                writeFile.setLength(PAGE_FILE_HEADER_SIZE);
            }
            nextFreePageId.set((writeFile.length()-PAGE_FILE_HEADER_SIZE)/pageSize);
            startWriter();

        } else {
            throw new IllegalStateException("Cannot load the page file when it is already loaded.");
        }
    }
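    /*
     * On-disk layout sketch for the main ".data" page file, derived from
     * PAGE_FILE_HEADER_SIZE and toOffset() below (numbers assume the default 4k page size):
     *
     *   byte offset of page N = PAGE_FILE_HEADER_SIZE + N * pageSize
     *                         = 4096 + N * 4096
     *
     * Page 0 starts at byte 4096, page 1 at 8192, and so on. The first 4k of the file is
     * reserved for the two redundant copies of the metadata properties written by storeMetaData().
     */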
    /**
     * Unloads a previously loaded PageFile. This deallocates OS related resources like file handles.
     * Once unloaded, you can no longer use the page file to read or write Pages.
     *
     * @throws IOException
     *         if a disk error occurred while closing down the page file.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public void unload() throws IOException {
        if (loaded.compareAndSet(true, false)) {
            flush();
            try {
                stopWriter();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }

            if( freeList.isEmpty() ) {
                metaData.setFreePages(0);
            } else {
                storeFreeList();
                metaData.setFreePages(freeList.size());
            }

            metaData.setLastTxId( nextTxid.get()-1 );
            metaData.setCleanShutdown(true);
            storeMetaData();

            if (readFile != null) {
                readFile.close();
                readFile = null;
                writeFile.close();
                writeFile = null;
                if( enableRecoveryFile ) {
                    recoveryFile.close();
                    recoveryFile = null;
                }
                freeList.clear();
                if( pageCache != null ) {
                    pageCache = null;
                }
                synchronized(writes) {
                    writes.clear();
                }
            }
        } else {
            throw new IllegalStateException("Cannot unload the page file when it is not loaded");
        }
    }

    public boolean isLoaded() {
        return loaded.get();
    }

    /**
     * Flush and sync all write buffers to disk.
     *
     * @throws IOException
     *         If a disk error occurred.
     */
    public void flush() throws IOException {

        if( enabledWriteThread && stopWriter.get() ) {
            throw new IOException("Page file already stopped: checkpointing is not allowed");
        }

        // Setup a latch that gets notified when all buffered writes hit the disk.
        CountDownLatch checkpointLatch;
        synchronized( writes ) {
            if( writes.isEmpty() ) {
                return;
            }
            if( enabledWriteThread ) {
                if( this.checkpointLatch == null ) {
                    this.checkpointLatch = new CountDownLatch(1);
                }
                checkpointLatch = this.checkpointLatch;
                writes.notify();
            } else {
                writeBatch();
                return;
            }
        }
        try {
            int size = writes.size();
            long start = System.currentTimeMillis();
            checkpointLatch.await();
            long end = System.currentTimeMillis();
            if( end-start > 100 ) {
                LOG.warn("KahaDB PageFile flush: " + size + " queued writes, latch wait took "+(end-start));
            }
        } catch (InterruptedException e) {
            throw new InterruptedIOException();
        }
    }


    public String toString() {
        return "Page File: "+getMainPageFile();
    }

    ///////////////////////////////////////////////////////////////////
    // Private Implementation Methods
    ///////////////////////////////////////////////////////////////////
    private File getMainPageFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name)+PAGEFILE_SUFFIX);
    }

    public File getFreeFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name)+FREE_FILE_SUFFIX);
    }

    public File getRecoveryFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name)+RECOVERY_FILE_SUFFIX);
    }

    private long toOffset(long pageId) {
        return PAGE_FILE_HEADER_SIZE+(pageId*pageSize);
    }

    private void loadMetaData() throws IOException {

        ByteArrayInputStream is;
        MetaData v1 = new MetaData();
        MetaData v2 = new MetaData();
        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE/2];
            readFile.seek(0);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v1, p);
        } catch (IOException e) {
            v1 = null;
        }

        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE/2];
            readFile.seek(PAGE_FILE_HEADER_SIZE/2);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v2, p);
        } catch (IOException e) {
            v2 = null;
        }

        if( v1 == null && v2 == null ) {
            throw new IOException("Could not load page file meta data");
        }

        if( v1 == null || v1.metaDataTxId < 0 ) {
            metaData = v2;
        } else if( v2 == null || v2.metaDataTxId < 0 ) {
            metaData = v1;
        } else if( v1.metaDataTxId == v2.metaDataTxId ) {
            metaData = v1; // use the first since the 2nd could be a partial..
        } else {
            metaData = v2; // use the second because the first is probably a partial.
        }
    }

    private void storeMetaData() throws IOException {
        // Convert the metadata into a property format.
        metaData.metaDataTxId++;
        Properties p = new Properties();
        IntrospectionSupport.getProperties(metaData, p, null);

        ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
        p.store(os, "");
        if( os.size() > PAGE_FILE_HEADER_SIZE/2 ) {
            throw new IOException("Configuration is larger than: "+PAGE_FILE_HEADER_SIZE/2);
        }
        // Fill the rest with space...
        byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE/2)-os.size()];
        Arrays.fill(filler, (byte)' ');
        os.write(filler);
        os.flush();

        byte[] d = os.toByteArray();

        // So we don't lose it.. write it 2 times...
        writeFile.seek(0);
        writeFile.write(d);
        writeFile.getFD().sync();
        writeFile.seek(PAGE_FILE_HEADER_SIZE/2);
        writeFile.write(d);
        writeFile.getFD().sync();
    }

    private void storeFreeList() throws IOException {
        FileOutputStream os = new FileOutputStream(getFreeFile());
        DataOutputStream dos = new DataOutputStream(os);
        SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
        dos.close();
    }

    private void loadFreeList() throws IOException {
        freeList.clear();
        FileInputStream is = new FileInputStream(getFreeFile());
        DataInputStream dis = new DataInputStream(is);
        freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
        dis.close();
    }

    ///////////////////////////////////////////////////////////////////
    // Property Accessors
    ///////////////////////////////////////////////////////////////////

    /**
     * Is the recovery buffer used to double buffer page writes? Enabled by default.
     *
     * @return is the recovery buffer enabled.
     */
    public boolean isEnableRecoveryFile() {
        return enableRecoveryFile;
    }

    /**
     * Sets whether the recovery buffer is used to double buffer page writes. Enabled by default. Disabling this
     * may potentially cause partial page writes which can lead to page file corruption.
     */
    public void setEnableRecoveryFile(boolean doubleBuffer) {
        assertNotLoaded();
        this.enableRecoveryFile = doubleBuffer;
    }

    /**
     * @return Are page writes synced to disk?
     */
    public boolean isEnableDiskSyncs() {
        return enableDiskSyncs;
    }

    /**
     * Allows you to enable syncing writes to disk.
     * @param syncWrites
     */
    public void setEnableDiskSyncs(boolean syncWrites) {
        assertNotLoaded();
        this.enableDiskSyncs = syncWrites;
    }

    /**
     * @return the page size
     */
    public int getPageSize() {
        return this.pageSize;
    }

    /**
     * @return the amount of content data that a page can hold.
     */
    public int getPageContentSize() {
        return this.pageSize-Page.PAGE_HEADER_SIZE;
    }

    /**
     * Configures the page size used by the page file. By default it is 4k. Once a page file is created on disk,
     * subsequent loads of that file will use the original pageSize. Once the PageFile is loaded, this setting
     * can no longer be changed.
     *
     * @param pageSize the pageSize to set
     * @throws IllegalStateException
     *         once the page file is loaded.
     */
    public void setPageSize(int pageSize) throws IllegalStateException {
        assertNotLoaded();
        this.pageSize = pageSize;
    }

    /**
     * @return true if read page caching is enabled
     */
    public boolean isEnablePageCaching() {
        return this.enablePageCaching;
    }

    /**
     * @param enablePageCaching allows you to enable read page caching
     */
    public void setEnablePageCaching(boolean enablePageCaching) {
        assertNotLoaded();
        this.enablePageCaching = enablePageCaching;
    }

    /**
     * @return the maximum number of pages that will get stored in the read page cache.
     */
    public int getPageCacheSize() {
        return this.pageCacheSize;
    }

    /**
     * @param pageCacheSize the maximum number of pages that will get stored in the read page cache.
     */
    public void setPageCacheSize(int pageCacheSize) {
        assertNotLoaded();
        this.pageCacheSize = pageCacheSize;
    }

    public boolean isEnabledWriteThread() {
        return enabledWriteThread;
    }

    public void setEnableWriteThread(boolean enableAsyncWrites) {
        assertNotLoaded();
        this.enabledWriteThread = enableAsyncWrites;
    }

    public long getDiskSize() throws IOException {
        return toOffset(nextFreePageId.get());
    }

    /**
     * @return the number of pages allocated in the PageFile
     */
    public long getPageCount() {
        return nextFreePageId.get();
    }

    public int getRecoveryFileMinPageCount() {
        return recoveryFileMinPageCount;
    }

    public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
        assertNotLoaded();
        this.recoveryFileMinPageCount = recoveryFileMinPageCount;
    }

    public int getRecoveryFileMaxPageCount() {
        return recoveryFileMaxPageCount;
    }

    public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
        assertNotLoaded();
        this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        assertNotLoaded();
        this.writeBatchSize = writeBatchSize;
    }

    ///////////////////////////////////////////////////////////////////
    // Package Protected Methods exposed to Transaction
    ///////////////////////////////////////////////////////////////////

    /**
     * @throws IllegalStateException if the page file is not loaded.
     */
    void assertLoaded() throws IllegalStateException {
        if( !loaded.get() ) {
            throw new IllegalStateException("PageFile is not loaded");
        }
    }

    void assertNotLoaded() throws IllegalStateException {
        if( loaded.get() ) {
            throw new IllegalStateException("PageFile is loaded");
        }
    }
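    /*
     * Allocation behaviour (sketch of the method below): allocate(count) first tries to
     * carve a contiguous run of `count` page ids out of the in-memory free list; only when
     * no such run exists does it grow the file, writing `count` freshly-created free pages
     * starting at nextFreePageId. The returned Page is the first page of the run, so the
     * run occupies page ids [first.getPageId(), first.getPageId() + count - 1].
     */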
    /**
     * Allocates a block of free pages that you can write data to.
     *
     * @param count the number of sequential pages to allocate
     * @return the first page of the sequential set.
     * @throws IOException
     *         If a disk error occurred.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    <T> Page<T> allocate(int count) throws IOException {
        assertLoaded();
        if (count <= 0) {
            throw new IllegalArgumentException("The allocation count must be larger than zero");
        }

        Sequence seq = freeList.removeFirstSequence(count);

        // We may need to create new free pages...
        if (seq == null) {

            Page<T> first = null;
            int c = count;
            while (c > 0) {
                Page<T> page = new Page<T>(nextFreePageId.getAndIncrement());
                page.makeFree(getNextWriteTransactionId());

                if (first == null) {
                    first = page;
                }

                addToCache(page);
                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
                page.write(out);
                write(page, out.getData());

                // LOG.debug("allocate writing: "+page.getPageId());
                c--;
            }

            return first;
        }

        Page<T> page = new Page<T>(seq.getFirst());
        page.makeFree(0);
        // LOG.debug("allocated: "+page.getPageId());
        return page;
    }

    long getNextWriteTransactionId() {
        return nextTxid.incrementAndGet();
    }

    void readPage(long pageId, byte[] data) throws IOException {
        readFile.seek(toOffset(pageId));
        readFile.readFully(data);
    }

    public void freePage(long pageId) {
        freeList.add(pageId);
        if( enablePageCaching ) {
            pageCache.remove(pageId);
        }
    }

    @SuppressWarnings("unchecked")
    private <T> void write(Page<T> page, byte[] data) throws IOException {
        final PageWrite write = new PageWrite(page, data);
        Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
            public Long getKey() {
                return write.getPage().getPageId();
            }
            public PageWrite getValue() {
                return write;
            }
            public PageWrite setValue(PageWrite value) {
                return null;
            }
        };
        Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry};
        write(Arrays.asList(entries));
    }

    void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
        synchronized( writes ) {
            if( enabledWriteThread ) {
                while( writes.size() >= writeBatchSize && !stopWriter.get() ) {
                    try {
                        writes.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new InterruptedIOException();
                    }
                }
            }

            for (Map.Entry<Long, PageWrite> entry : updates) {
                Long key = entry.getKey();
                PageWrite value = entry.getValue();
                PageWrite write = writes.get(key);
                if( write == null ) {
                    writes.put(key, value);
                } else {
                    write.setCurrent(value.page, value.current);
                }
            }

            // Once we start approaching capacity, notify the writer to start writing.
            if( canStartWriteBatch() ) {
                if( enabledWriteThread ) {
                    writes.notify();
                } else {
                    writeBatch();
                }
            }
        }
    }
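    /*
     * Batch-trigger arithmetic for canStartWriteBatch() below, using the default
     * writeBatchSize of 1000 queued page writes:
     *
     *   capacityUsed = writes.size() * 100 / writeBatchSize
     *
     *   async writer thread enabled:  a batch starts once capacityUsed >= 10, i.e. ~100 queued pages.
     *   synchronous (caller) writes:  a batch starts once capacityUsed >= 80, i.e. ~800 queued pages.
     *
     * In both modes a pending checkpoint latch forces the batch to start immediately.
     */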
    private boolean canStartWriteBatch() {
        int capacityUsed = ((writes.size() * 100)/writeBatchSize);
        if( enabledWriteThread ) {
            // The constant 10 here controls how soon write batches start going to disk..
            // it would be nice to figure out how to auto tune that value. Make it too small and
            // we reduce throughput because we are locking the write mutex too often doing writes.
            return capacityUsed >= 10 || checkpointLatch != null;
        } else {
            return capacityUsed >= 80 || checkpointLatch != null;
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Cache Related operations
    ///////////////////////////////////////////////////////////////////
    @SuppressWarnings("unchecked")
    <T> Page<T> getFromCache(long pageId) {
        synchronized(writes) {
            PageWrite pageWrite = writes.get(pageId);
            if( pageWrite != null ) {
                return pageWrite.page;
            }
        }

        Page<T> result = null;
        if (enablePageCaching) {
            result = pageCache.get(pageId);
        }
        return result;
    }

    void addToCache(Page page) {
        if (enablePageCaching) {
            pageCache.put(page.getPageId(), page);
        }
    }

    void removeFromCache(Page page) {
        if (enablePageCaching) {
            pageCache.remove(page.getPageId());
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Internal Double write implementation follows...
    ///////////////////////////////////////////////////////////////////
    private void pollWrites() {
        try {
            while( !stopWriter.get() ) {
                // Wait for a notification...
                synchronized( writes ) {
                    writes.notifyAll();

                    // If there is not enough to write, wait for a notification...
                    while( writes.isEmpty() && checkpointLatch == null && !stopWriter.get() ) {
                        writes.wait(100);
                    }

                    if( writes.isEmpty() ) {
                        releaseCheckpointWaiter();
                    }
                }
                writeBatch();
            }
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            releaseCheckpointWaiter();
        }
    }

    /**
     * Writes the currently queued page updates to the recovery buffer (if enabled) and then to the page file.
     *
     * @throws IOException
     */
    private void writeBatch() throws IOException {

        CountDownLatch checkpointLatch;
        ArrayList<PageWrite> batch;
        synchronized( writes ) {
            // Build a write batch from the current write cache.
            batch = new ArrayList<PageWrite>(writes.size());
            for (PageWrite write : writes.values()) {
                batch.add(write);
                // Move the current write to the diskBound write, this lets folks update the
                // page again without blocking for this write.
                write.begin();
                if (write.diskBound == null) {
                    batch.remove(write);
                }
            }

            // Grab on to the existing checkpoint latch because once we do this write we can
            // release the folks that were waiting for those writes to hit disk.
            checkpointLatch = this.checkpointLatch;
            this.checkpointLatch = null;
        }

        try {
            if (enableRecoveryFile) {

                // Using Adler-32 instead of CRC-32 because it's much faster, and its
                // weakness for short messages of a few hundred bytes is not a factor
                // in this case since we know our write batches are going to be much larger.
                Checksum checksum = new Adler32();
                for (PageWrite w : batch) {
                    try {
                        checksum.update(w.diskBound, 0, pageSize);
                    } catch (Throwable t) {
                        throw IOExceptionSupport.create(
                                "Cannot create recovery file. Reason: " + t, t);
                    }
                }

                // Can we shrink the recovery buffer??
                if (recoveryPageCount > recoveryFileMaxPageCount) {
                    int t = Math.max(recoveryFileMinPageCount, batch.size());
                    recoveryFile.setLength(recoveryFileSizeForPages(t));
                }

                // Record the page writes in the recovery buffer.
                recoveryFile.seek(0);
                // Store the next tx id...
                recoveryFile.writeLong(nextTxid.get());
                // Store the checksum for the write batch so that on recovery we
                // know if we have a consistent write batch on disk.
                recoveryFile.writeLong(checksum.getValue());
                // Write the # of pages that will follow.
                recoveryFile.writeInt(batch.size());

                // Write the pages.
                recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);

                for (PageWrite w : batch) {
                    recoveryFile.writeLong(w.page.getPageId());
                    recoveryFile.write(w.diskBound, 0, pageSize);
                }

                if (enableDiskSyncs) {
                    // Sync to make sure recovery buffer writes land on disk..
                    recoveryFile.getFD().sync();
                }

                recoveryPageCount = batch.size();
            }

            for (PageWrite w : batch) {
                writeFile.seek(toOffset(w.page.getPageId()));
                writeFile.write(w.diskBound, 0, pageSize);
                w.done();
            }

            // Sync again.
            if (enableDiskSyncs) {
                writeFile.getFD().sync();
            }

        } finally {
            synchronized (writes) {
                for (PageWrite w : batch) {
                    // If there are no more pending writes, then remove it from
                    // the write cache.
                    if (w.isDone()) {
                        writes.remove(w.page.getPageId());
                    }
                }
            }

            if( checkpointLatch != null ) {
                checkpointLatch.countDown();
            }
        }
    }

    private long recoveryFileSizeForPages(int pageCount) {
        return RECOVERY_FILE_HEADER_SIZE+((pageSize+8)*pageCount);
    }

    private void releaseCheckpointWaiter() {
        if( checkpointLatch != null ) {
            checkpointLatch.countDown();
            checkpointLatch = null;
        }
    }

    /**
     * Inspects the recovery buffer and re-applies any
     * partially applied page writes.
     *
     * @return the next transaction id that can be used.
     * @throws IOException
     */
    private long redoRecoveryUpdates() throws IOException {
        if( !enableRecoveryFile ) {
            return 0;
        }
        recoveryPageCount = 0;

        // Are we initializing the recovery file?
        if( recoveryFile.length() == 0 ) {
            // Write an empty header..
            recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
            // Preallocate the minimum size for better performance.
            recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
            return 0;
        }

        // How many recovery pages do we have in the recovery buffer?
        recoveryFile.seek(0);
        long nextTxId = recoveryFile.readLong();
        long expectedChecksum = recoveryFile.readLong();
        int pageCounter = recoveryFile.readInt();

        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
        Checksum checksum = new Adler32();
        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
        try {
            for (int i = 0; i < pageCounter; i++) {
                long pageId = recoveryFile.readLong();
                byte[] data = new byte[pageSize];
                if( recoveryFile.read(data, 0, pageSize) != pageSize ) {
                    // Invalid recovery record: could not fully read the data.
                    // Probably due to a partial write to the recovery buffer.
                    return nextTxId;
                }
                checksum.update(data, 0, pageSize);
                batch.put(pageId, data);
            }
        } catch (Exception e) {
            // If an error occurred it was because the redo buffer was not fully written out correctly,
            // so don't redo it, as the pages should still be consistent.
            LOG.debug("Redo buffer was not fully intact: ", e);
            return nextTxId;
        }

        recoveryPageCount = pageCounter;

        // If the checksum is not valid then the recovery buffer was partially written to disk.
        if( checksum.getValue() != expectedChecksum ) {
            return nextTxId;
        }

        // Re-apply all the writes in the recovery buffer.
        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
            writeFile.seek(toOffset(e.getKey()));
            writeFile.write(e.getValue());
        }

        // And sync it to disk.
        writeFile.getFD().sync();
        return nextTxId;
    }

    private void startWriter() {
        synchronized( writes ) {
            if( enabledWriteThread ) {
                stopWriter.set(false);
                writerThread = new Thread("KahaDB Page Writer") {
                    @Override
                    public void run() {
                        pollWrites();
                    }
                };
                writerThread.setPriority(Thread.MAX_PRIORITY);
                writerThread.setDaemon(true);
                writerThread.start();
            }
        }
    }

    private void stopWriter() throws InterruptedException {
        if( enabledWriteThread ) {
            stopWriter.set(true);
            writerThread.join();
        }
    }

    public File getFile() {
        return getMainPageFile();
    }

}
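/*
 * Recovery (".redo") file format, summarized from writeBatch() and redoRecoveryUpdates() above:
 *
 *   header (4k, RECOVERY_FILE_HEADER_SIZE):
 *     long  nextTxid     transaction id in effect when the batch was logged
 *     long  checksum     Adler-32 over the diskBound bytes of every page in the batch
 *     int   page count   number of page records that follow
 *     (rest of the 4k header is unused padding)
 *
 *   body, starting at offset RECOVERY_FILE_HEADER_SIZE, one record per page:
 *     long    pageId
 *     byte[]  page data  exactly pageSize bytes
 *
 * The total size needed for N pages is therefore RECOVERY_FILE_HEADER_SIZE + N * (pageSize + 8),
 * matching recoveryFileSizeForPages(N). On recovery a batch is only re-applied when the recomputed
 * checksum matches the stored one; otherwise the partially written batch is ignored.
 */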