001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.kaha.impl.async;
018    
019    import java.io.ByteArrayInputStream;
020    import java.io.ByteArrayOutputStream;
021    import java.io.DataInputStream;
022    import java.io.DataOutputStream;
023    import java.io.File;
024    import java.io.FilenameFilter;
025    import java.io.IOException;
026    import java.util.ArrayList;
027    import java.util.Collections;
028    import java.util.HashMap;
029    import java.util.HashSet;
030    import java.util.Iterator;
031    import java.util.LinkedHashMap;
032    import java.util.List;
033    import java.util.Map;
034    import java.util.Set;
035    import java.util.concurrent.ConcurrentHashMap;
036    import java.util.concurrent.atomic.AtomicLong;
037    import java.util.concurrent.atomic.AtomicReference;
038    
039    import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
040    import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
041    import org.apache.activemq.thread.Scheduler;
042    import org.apache.activemq.util.ByteSequence;
043    import org.apache.activemq.util.IOHelper;
044    import org.apache.commons.logging.Log;
045    import org.apache.commons.logging.LogFactory;
046    
047    
048    
049    /**
050     * Manages DataFiles
051     * 
052     * @version $Revision: 1.1.1.1 $
053     */
054    public class AsyncDataManager {
055    
056        public static final int CONTROL_RECORD_MAX_LENGTH = 1024;
057        public static final int ITEM_HEAD_RESERVED_SPACE = 21;
058        // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
059        public static final int ITEM_HEAD_SPACE = 4 + 1 + ITEM_HEAD_RESERVED_SPACE + 3;
060        public static final int ITEM_HEAD_OFFSET_TO_SOR = ITEM_HEAD_SPACE - 3;
061        public static final int ITEM_FOOT_SPACE = 3; // EOR
062    
063        public static final int ITEM_HEAD_FOOT_SPACE = ITEM_HEAD_SPACE + ITEM_FOOT_SPACE;
064    
065        public static final byte[] ITEM_HEAD_SOR = new byte[] {'S', 'O', 'R'}; // 
066        public static final byte[] ITEM_HEAD_EOR = new byte[] {'E', 'O', 'R'}; // 
067    
068        public static final byte DATA_ITEM_TYPE = 1;
069        public static final byte REDO_ITEM_TYPE = 2;
070        public static final String DEFAULT_DIRECTORY = "data";
071        public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
072        public static final String DEFAULT_FILE_PREFIX = "data-";
073        public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
074        public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
075        public static final int PREFERED_DIFF = 1024 * 512;
076    
077        private static final Log LOG = LogFactory.getLog(AsyncDataManager.class);
078        protected static Scheduler scheduler  = Scheduler.getInstance();
079    
080        protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
081    
082        protected File directory = new File(DEFAULT_DIRECTORY);
083        protected File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY);
084        protected String filePrefix = DEFAULT_FILE_PREFIX;
085        protected ControlFile controlFile;
086        protected boolean started;
087        protected boolean useNio = true;
088    
089        protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
090        protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
091    
092        protected DataFileAppender appender;
093        protected DataFileAccessorPool accessorPool;
094    
095        protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
096        protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
097        protected DataFile currentWriteFile;
098    
099        protected Location mark;
100        protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
101        protected Runnable cleanupTask;
102        protected final AtomicLong storeSize;
103        protected boolean archiveDataLogs;
104        
105        public AsyncDataManager(AtomicLong storeSize) {
106            this.storeSize=storeSize;
107        }
108        
109        public AsyncDataManager() {
110            this(new AtomicLong());
111        }
112    
113        @SuppressWarnings("unchecked")
114        public synchronized void start() throws IOException {
115            if (started) {
116                return;
117            }
118    
119            started = true;
120            preferedFileLength=Math.max(PREFERED_DIFF, getMaxFileLength()-PREFERED_DIFF);
121            lock();
122    
123            accessorPool = new DataFileAccessorPool(this);
124            ByteSequence sequence = controlFile.load();
125            if (sequence != null && sequence.getLength() > 0) {
126                unmarshallState(sequence);
127            }
128            if (useNio) {
129                appender = new NIODataFileAppender(this);
130            } else {
131                appender = new DataFileAppender(this);
132            }
133    
134            File[] files = directory.listFiles(new FilenameFilter() {
135                public boolean accept(File dir, String n) {
136                    return dir.equals(directory) && n.startsWith(filePrefix);
137                }
138            });
139           
140            if (files != null) {
141                for (int i = 0; i < files.length; i++) {
142                    try {
143                        File file = files[i];
144                        String n = file.getName();
145                        String numStr = n.substring(filePrefix.length(), n.length());
146                        int num = Integer.parseInt(numStr);
147                        DataFile dataFile = new DataFile(file, num, preferedFileLength);
148                        fileMap.put(dataFile.getDataFileId(), dataFile);
149                        storeSize.addAndGet(dataFile.getLength());
150                    } catch (NumberFormatException e) {
151                        // Ignore file that do not match the pattern.
152                    }
153                }
154    
155                // Sort the list so that we can link the DataFiles together in the
156                // right order.
157                List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
158                Collections.sort(l);
159                currentWriteFile = null;
160                for (DataFile df : l) {
161                    if (currentWriteFile != null) {
162                        currentWriteFile.linkAfter(df);
163                    }
164                    currentWriteFile = df;
165                    fileByFileMap.put(df.getFile(), df);
166                }
167            }
168    
169            // Need to check the current Write File to see if there was a partial
170            // write to it.
171            if (currentWriteFile != null) {
172    
173                // See if the lastSyncedLocation is valid..
174                Location l = lastAppendLocation.get();
175                if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
176                    l = null;
177                }
178    
179                // If we know the last location that was ok.. then we can skip lots
180                // of checking
181                try{
182                l = recoveryCheck(currentWriteFile, l);
183                lastAppendLocation.set(l);
184                }catch(IOException e){
185                    LOG.warn("recovery check failed", e);
186                }
187            }
188    
189            storeState(false);
190    
191            cleanupTask = new Runnable() {
192                public void run() {
193                    cleanup();
194                }
195            };
196            scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
197        }
198    
199        public void lock() throws IOException {
200            synchronized (this) {
201                if (controlFile == null || controlFile.isDisposed()) {
202                    IOHelper.mkdirs(directory);
203                    controlFile = new ControlFile(new File(directory, filePrefix + "control"), CONTROL_RECORD_MAX_LENGTH);
204                }
205                controlFile.lock();
206            }
207        }
208    
209        protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
210            if (location == null) {
211                location = new Location();
212                location.setDataFileId(dataFile.getDataFileId());
213                location.setOffset(0);
214            }
215            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
216            try {
217                reader.readLocationDetails(location);
218                while (reader.readLocationDetailsAndValidate(location)) {
219                    location.setOffset(location.getOffset() + location.getSize());
220                }
221            } finally {
222                accessorPool.closeDataFileAccessor(reader);
223            }
224            dataFile.setLength(location.getOffset());
225            return location;
226        }
227    
228        protected void unmarshallState(ByteSequence sequence) throws IOException {
229            ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength());
230            DataInputStream dis = new DataInputStream(bais);
231            if (dis.readBoolean()) {
232                mark = new Location();
233                mark.readExternal(dis);
234            } else {
235                mark = null;
236            }
237            if (dis.readBoolean()) {
238                Location l = new Location();
239                l.readExternal(dis);
240                lastAppendLocation.set(l);
241            } else {
242                lastAppendLocation.set(null);
243            }
244        }
245    
246        private synchronized ByteSequence marshallState() throws IOException {
247            ByteArrayOutputStream baos = new ByteArrayOutputStream();
248            DataOutputStream dos = new DataOutputStream(baos);
249    
250            if (mark != null) {
251                dos.writeBoolean(true);
252                mark.writeExternal(dos);
253            } else {
254                dos.writeBoolean(false);
255            }
256            Location l = lastAppendLocation.get();
257            if (l != null) {
258                dos.writeBoolean(true);
259                l.writeExternal(dos);
260            } else {
261                dos.writeBoolean(false);
262            }
263    
264            byte[] bs = baos.toByteArray();
265            return new ByteSequence(bs, 0, bs.length);
266        }
267    
268        synchronized DataFile allocateLocation(Location location) throws IOException {
269            if (currentWriteFile == null || ((currentWriteFile.getLength() + location.getSize()) > maxFileLength)) {
270                int nextNum = currentWriteFile != null ? currentWriteFile.getDataFileId().intValue() + 1 : 1;
271    
272                String fileName = filePrefix + nextNum;
273                File file = new File(directory, fileName);
274                DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
275                //actually allocate the disk space
276                nextWriteFile.closeRandomAccessFile(nextWriteFile.openRandomAccessFile(true));
277                fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
278                fileByFileMap.put(file, nextWriteFile);
279                if (currentWriteFile != null) {
280                    currentWriteFile.linkAfter(nextWriteFile);
281                    if (currentWriteFile.isUnused()) {
282                        removeDataFile(currentWriteFile);
283                    }
284                }
285                currentWriteFile = nextWriteFile;
286    
287            }
288            location.setOffset(currentWriteFile.getLength());
289            location.setDataFileId(currentWriteFile.getDataFileId().intValue());
290            int size = location.getSize();
291            currentWriteFile.incrementLength(size);
292            currentWriteFile.increment();
293            storeSize.addAndGet(size);
294            return currentWriteFile;
295        }
296        
297        public synchronized void removeLocation(Location location) throws IOException{
298           
299            DataFile dataFile = getDataFile(location);
300            dataFile.decrement();
301        }
302    
303        synchronized DataFile getDataFile(Location item) throws IOException {
304            Integer key = Integer.valueOf(item.getDataFileId());
305            DataFile dataFile = fileMap.get(key);
306            if (dataFile == null) {
307                LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
308                throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId());
309            }
310            return dataFile;
311        }
312        
313        synchronized File getFile(Location item) throws IOException {
314            Integer key = Integer.valueOf(item.getDataFileId());
315            DataFile dataFile = fileMap.get(key);
316            if (dataFile == null) {
317                LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
318                throw new IOException("Could not locate data file " + filePrefix  + item.getDataFileId());
319            }
320            return dataFile.getFile();
321        }
322    
323        private DataFile getNextDataFile(DataFile dataFile) {
324            return (DataFile)dataFile.getNext();
325        }
326    
327        public synchronized void close() throws IOException {
328            if (!started) {
329                return;
330            }
331            scheduler.cancel(cleanupTask);
332            accessorPool.close();
333            storeState(false);
334            appender.close();
335            fileMap.clear();
336            fileByFileMap.clear();
337            controlFile.unlock();
338            controlFile.dispose();
339            started = false;
340        }
341    
342        synchronized void cleanup() {
343            if (accessorPool != null) {
344                accessorPool.disposeUnused();
345            }
346        }
347    
348        public synchronized boolean delete() throws IOException {
349    
350            // Close all open file handles...
351            appender.close();
352            accessorPool.close();
353    
354            boolean result = true;
355            for (Iterator i = fileMap.values().iterator(); i.hasNext();) {
356                DataFile dataFile = (DataFile)i.next();
357                storeSize.addAndGet(-dataFile.getLength());
358                result &= dataFile.delete();
359            }
360            fileMap.clear();
361            fileByFileMap.clear();
362            lastAppendLocation.set(null);
363            mark = null;
364            currentWriteFile = null;
365    
366            // reopen open file handles...
367            accessorPool = new DataFileAccessorPool(this);
368            if (useNio) {
369                appender = new NIODataFileAppender(this);
370            } else {
371                appender = new DataFileAppender(this);
372            }
373            return result;
374        }
375    
376        public synchronized void addInterestInFile(int file) throws IOException {
377            if (file >= 0) {
378                Integer key = Integer.valueOf(file);
379                DataFile dataFile = (DataFile)fileMap.get(key);
380                if (dataFile == null) {
381                    throw new IOException("That data file does not exist");
382                }
383                addInterestInFile(dataFile);
384            }
385        }
386    
387        synchronized void addInterestInFile(DataFile dataFile) {
388            if (dataFile != null) {
389                dataFile.increment();
390            }
391        }
392    
393        public synchronized void removeInterestInFile(int file) throws IOException {
394            if (file >= 0) {
395                Integer key = Integer.valueOf(file);
396                DataFile dataFile = (DataFile)fileMap.get(key);
397                removeInterestInFile(dataFile);
398            }
399           
400        }
401    
402        synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
403            if (dataFile != null) {
404                if (dataFile.decrement() <= 0) {
405                    removeDataFile(dataFile);
406                }
407            }
408        }
409    
410        public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Set<Integer>inProgress) throws IOException {
411            Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
412            unUsed.removeAll(inUse);
413            unUsed.removeAll(inProgress);
414                    
415            List<DataFile> purgeList = new ArrayList<DataFile>();
416            for (Integer key : unUsed) {
417                DataFile dataFile = (DataFile)fileMap.get(key);
418                purgeList.add(dataFile);
419            }
420            for (DataFile dataFile : purgeList) {
421                if (dataFile.getDataFileId() != currentWriteFile.getDataFileId()) {
422                    forceRemoveDataFile(dataFile);
423                }
424            }
425        }
426    
427        public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Integer lastFile) throws IOException {
428            Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
429            unUsed.removeAll(inUse);
430                    
431            List<DataFile> purgeList = new ArrayList<DataFile>();
432            for (Integer key : unUsed) {
433                    // Only add files less than the lastFile..
434                    if( key.intValue() < lastFile.intValue() ) {
435                    DataFile dataFile = (DataFile)fileMap.get(key);
436                    purgeList.add(dataFile);
437                    }
438            }
439            if (LOG.isDebugEnabled()) {
440                LOG.debug("lastFileId=" + lastFile + ", purgeList: (" + purgeList.size() + ") " + purgeList);
441            }
442            for (DataFile dataFile : purgeList) {
443                forceRemoveDataFile(dataFile);
444            }
445            }
446    
447        public synchronized void consolidateDataFiles() throws IOException {
448            List<DataFile> purgeList = new ArrayList<DataFile>();
449            for (DataFile dataFile : fileMap.values()) {
450                if (dataFile.isUnused()) {
451                    purgeList.add(dataFile);
452                }
453            }
454            for (DataFile dataFile : purgeList) {
455                removeDataFile(dataFile);
456            }
457        }
458    
459        private synchronized void removeDataFile(DataFile dataFile) throws IOException {
460    
461            // Make sure we don't delete too much data.
462            if (dataFile == currentWriteFile || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) {
463                LOG.debug("Won't remove DataFile" + dataFile);
464                    return;
465            }
466            forceRemoveDataFile(dataFile);
467        }
468        
469        private synchronized void forceRemoveDataFile(DataFile dataFile)
470                throws IOException {
471            accessorPool.disposeDataFileAccessors(dataFile);
472            fileByFileMap.remove(dataFile.getFile());
473            fileMap.remove(dataFile.getDataFileId());
474            storeSize.addAndGet(-dataFile.getLength());
475            dataFile.unlink();
476            if (archiveDataLogs) {
477                dataFile.move(getDirectoryArchive());
478                LOG.debug("moved data file " + dataFile + " to "
479                        + getDirectoryArchive());
480            } else {
481                boolean result = dataFile.delete();
482                if (!result) {
483                    LOG.info("Failed to discard data file " + dataFile);
484                }
485            }
486        }
487    
488        /**
489         * @return the maxFileLength
490         */
491        public int getMaxFileLength() {
492            return maxFileLength;
493        }
494    
495        /**
496         * @param maxFileLength the maxFileLength to set
497         */
498        public void setMaxFileLength(int maxFileLength) {
499            this.maxFileLength = maxFileLength;
500        }
501    
502        public String toString() {
503            return "DataManager:(" + filePrefix + ")";
504        }
505    
506        public synchronized Location getMark() throws IllegalStateException {
507            return mark;
508        }
509    
510        public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
511    
512            Location cur = null;
513            while (true) {
514                if (cur == null) {
515                    if (location == null) {
516                        DataFile head = (DataFile)currentWriteFile.getHeadNode();
517                        cur = new Location();
518                        cur.setDataFileId(head.getDataFileId());
519                        cur.setOffset(0);
520                    } else {
521                        // Set to the next offset..
522                            if( location.getSize() == -1 ) {
523                                    cur = new Location(location);
524                            }  else {
525                                    cur = new Location(location);
526                                    cur.setOffset(location.getOffset()+location.getSize());
527                            }
528                    }
529                } else {
530                    cur.setOffset(cur.getOffset() + cur.getSize());
531                }
532    
533                DataFile dataFile = getDataFile(cur);
534    
535                // Did it go into the next file??
536                if (dataFile.getLength() <= cur.getOffset()) {
537                    dataFile = getNextDataFile(dataFile);
538                    if (dataFile == null) {
539                        return null;
540                    } else {
541                        cur.setDataFileId(dataFile.getDataFileId().intValue());
542                        cur.setOffset(0);
543                    }
544                }
545    
546                // Load in location size and type.
547                DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
548                try {
549                    reader.readLocationDetails(cur);
550                } finally {
551                    accessorPool.closeDataFileAccessor(reader);
552                }
553    
554                if (cur.getType() == 0) {
555                    return null;
556                } else if (cur.getType() > 0) {
557                    // Only return user records.
558                    return cur;
559                }
560            }
561        }
562        
563        public synchronized Location getNextLocation(File file, Location lastLocation,boolean thisFileOnly) throws IllegalStateException, IOException{
564            DataFile df = fileByFileMap.get(file);
565            return getNextLocation(df, lastLocation,thisFileOnly);
566        }
567        
568        public synchronized Location getNextLocation(DataFile dataFile,
569                Location lastLocation,boolean thisFileOnly) throws IOException, IllegalStateException {
570    
571            Location cur = null;
572            while (true) {
573                if (cur == null) {
574                    if (lastLocation == null) {
575                        DataFile head = (DataFile)dataFile.getHeadNode();
576                        cur = new Location();
577                        cur.setDataFileId(head.getDataFileId());
578                        cur.setOffset(0);
579                    } else {
580                        // Set to the next offset..
581                        cur = new Location(lastLocation);
582                        cur.setOffset(cur.getOffset() + cur.getSize());
583                    }
584                } else {
585                    cur.setOffset(cur.getOffset() + cur.getSize());
586                }
587    
588                
589                // Did it go into the next file??
590                if (dataFile.getLength() <= cur.getOffset()) {
591                    if (thisFileOnly) {
592                        return null;
593                    }else {
594                    dataFile = getNextDataFile(dataFile);
595                    if (dataFile == null) {
596                        return null;
597                    } else {
598                        cur.setDataFileId(dataFile.getDataFileId().intValue());
599                        cur.setOffset(0);
600                    }
601                    }
602                }
603    
604                // Load in location size and type.
605                DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
606                try {
607                    reader.readLocationDetails(cur);
608                } finally {
609                    accessorPool.closeDataFileAccessor(reader);
610                }
611    
612                if (cur.getType() == 0) {
613                    return null;
614                } else if (cur.getType() > 0) {
615                    // Only return user records.
616                    return cur;
617                }
618            }
619        }
620    
621        public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
622            DataFile dataFile = getDataFile(location);
623            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
624            ByteSequence rc = null;
625            try {
626                rc = reader.readRecord(location);
627            } finally {
628                accessorPool.closeDataFileAccessor(reader);
629            }
630            return rc;
631        }
632    
633        public void setMark(Location location, boolean sync) throws IOException, IllegalStateException {
634            synchronized (this) {
635                mark = location;
636            }
637            storeState(sync);
638        }
639    
640        protected synchronized void storeState(boolean sync) throws IOException {
641            ByteSequence state = marshallState();
642            appender.storeItem(state, Location.MARK_TYPE, sync);
643            controlFile.store(state, sync);
644        }
645    
646        public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
647            Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
648            return loc;
649        }
650        
651        public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
652            Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
653            return loc;
654        }
655    
656        public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
657            return appender.storeItem(data, type, sync);
658        }
659    
660        public void update(Location location, ByteSequence data, boolean sync) throws IOException {
661            DataFile dataFile = getDataFile(location);
662            DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
663            try {
664                updater.updateRecord(location, data, sync);
665            } finally {
666                accessorPool.closeDataFileAccessor(updater);
667            }
668        }
669    
670        public File getDirectory() {
671            return directory;
672        }
673    
674        public void setDirectory(File directory) {
675            this.directory = directory;
676        }
677    
678        public String getFilePrefix() {
679            return filePrefix;
680        }
681    
682        public void setFilePrefix(String filePrefix) {
683            this.filePrefix = IOHelper.toFileSystemSafeName(filePrefix);
684        }
685    
686        public Map<WriteKey, WriteCommand> getInflightWrites() {
687            return inflightWrites;
688        }
689    
690        public Location getLastAppendLocation() {
691            return lastAppendLocation.get();
692        }
693    
694        public void setLastAppendLocation(Location lastSyncedLocation) {
695            this.lastAppendLocation.set(lastSyncedLocation);
696        }
697    
698            public boolean isUseNio() {
699                    return useNio;
700            }
701    
702            public void setUseNio(boolean useNio) {
703                    this.useNio = useNio;
704            }
705            
706            public File getDirectoryArchive() {
707            return directoryArchive;
708        }
709    
710        public void setDirectoryArchive(File directoryArchive) {
711            this.directoryArchive = directoryArchive;
712        }
713        
714        public boolean isArchiveDataLogs() {
715            return archiveDataLogs;
716        }
717    
718        public void setArchiveDataLogs(boolean archiveDataLogs) {
719            this.archiveDataLogs = archiveDataLogs;
720        }
721    
722        synchronized public Integer getCurrentDataFileId() {
723            if( currentWriteFile==null )
724                return null;
725            return currentWriteFile.getDataFileId();
726        }
727        
728        /**
729         * Get a set of files - only valid after start()
730         * @return files currently being used
731         */
732        public Set<File> getFiles(){
733            return fileByFileMap.keySet();
734        }
735    
736            synchronized public long getDiskSize() {
737                    long rc=0;
738            DataFile cur = (DataFile)currentWriteFile.getHeadNode();
739            while( cur !=null ) {
740                    rc += cur.getLength();
741                    cur = (DataFile) cur.getNext();
742            }
743                    return rc;
744            }
745    
746            synchronized public long getDiskSizeUntil(Location startPosition) {
747                    long rc=0;
748            DataFile cur = (DataFile)currentWriteFile.getHeadNode();
749            while( cur !=null ) {
750                    if( cur.getDataFileId().intValue() >= startPosition.getDataFileId() ) {
751                            return rc + startPosition.getOffset();
752                    }
753                    rc += cur.getLength();
754                    cur = (DataFile) cur.getNext();
755            }
756                    return rc;
757            }
758    
759    }