001    /*
002     *  Licensed to the Apache Software Foundation (ASF) under one
003     *  or more contributor license agreements.  See the NOTICE file
004     *  distributed with this work for additional information
005     *  regarding copyright ownership.  The ASF licenses this file
006     *  to you under the Apache License, Version 2.0 (the
007     *  "License"); you may not use this file except in compliance
008     *  with the License.  You may obtain a copy of the License at
009     *  
010     *    http://www.apache.org/licenses/LICENSE-2.0
011     *  
012     *  Unless required by applicable law or agreed to in writing,
013     *  software distributed under the License is distributed on an
014     *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015     *  KIND, either express or implied.  See the License for the
016     *  specific language governing permissions and limitations
017     *  under the License. 
018     *  
019     */
020    package org.apache.directory.server.core.partition.impl.btree.jdbm;
021    
022    
023    import java.io.IOException;
024    import java.util.Comparator;
025    import java.util.Map;
026    
027    import jdbm.RecordManager;
028    import jdbm.btree.BTree;
029    import jdbm.helper.Serializer;
030    import jdbm.helper.TupleBrowser;
031    
032    import org.apache.directory.server.core.avltree.ArrayMarshaller;
033    import org.apache.directory.server.core.avltree.ArrayTree;
034    import org.apache.directory.server.core.avltree.ArrayTreeCursor;
035    import org.apache.directory.server.core.avltree.Marshaller;
036    import org.apache.directory.server.i18n.I18n;
037    import org.apache.directory.server.xdbm.Table;
038    import org.apache.directory.shared.ldap.cursor.Cursor;
039    import org.apache.directory.shared.ldap.cursor.EmptyCursor;
040    import org.apache.directory.shared.ldap.cursor.SingletonCursor;
041    import org.apache.directory.shared.ldap.schema.SchemaManager;
042    import org.apache.directory.shared.ldap.schema.comparators.SerializableComparator;
043    import org.apache.directory.shared.ldap.util.StringTools;
044    import org.apache.directory.shared.ldap.util.SynchronizedLRUMap;
045    import org.slf4j.Logger;
046    import org.slf4j.LoggerFactory;
047    
048    
049    /**
050     * A jdbm Btree wrapper that enables duplicate sorted keys using collections.
051     *
052     * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
053     * @version $Rev: 902608 $
054     */
055    public class JdbmTable<K,V> implements Table<K,V>
056    {
057        /** A logger for this class */
058        private static final Logger LOG = LoggerFactory.getLogger( JdbmTable.class.getSimpleName() );
059    
060        /** the key to store and retreive the count information */
061        private static final String SZSUFFIX = "_btree_sz";
062    
063        /** the name of this table */
064        private final String name;
065    
066        /** the JDBM record manager for the file this table is managed in */
067        private final RecordManager recMan;
068        
069        /** whether or not this table allows for duplicates */
070        private final boolean allowsDuplicates;
071        
072        /** a key comparator for the keys in this Table */
073        private final Comparator<K> keyComparator;
074        
075        /** a value comparator for the values in this Table */
076        private final Comparator<V> valueComparator;
077    
078        /** the current count of entries in this Table */
079        private int count;
080        
081        /** the wrappedCursor JDBM btree used in this Table */
082        private BTree bt;
083    
084        /** the limit at which we start using btree redirection for duplicates */
085        private int numDupLimit = JdbmIndex.DEFAULT_DUPLICATE_LIMIT;
086    
087        /** a cache of duplicate BTrees */
088        private final Map<Long, BTree> duplicateBtrees;
089    
090        private final Serializer keySerializer;
091    
092        private final Serializer valueSerializer;
093    
094        Marshaller<ArrayTree<V>> marshaller;
095    
096        /** The global SchemaManager */
097        private SchemaManager schemaManager;
098    
099        // ------------------------------------------------------------------------
100        // C O N S T R U C T O R
101        // ------------------------------------------------------------------------
102    
103        
104        /**
105         * Creates a Jdbm BTree based tuple Table abstraction that enables 
106         * duplicates.
107         *
108         * @param name the name of the table
109         * @param numDupLimit the size limit of duplicates before switching to
110         * BTrees for values instead of AvlTrees
111         * @param manager the record manager to be used for this table
112         * @param keyComparator a key comparator
113         * @param valueComparator a value comparator
114         * @param keySerializer a serializer to use for the keys instead of using
115         * default Java serialization which could be very expensive
116         * @param valueSerializer a serializer to use for the values instead of
117         * using default Java serialization which could be very expensive
118         * @throws IOException if the table's file cannot be created
119         */
120        @SuppressWarnings("unchecked")
121        public JdbmTable( SchemaManager schemaManager, String name, int numDupLimit, RecordManager manager,
122            Comparator<K> keyComparator, Comparator<V> valueComparator,
123            Serializer keySerializer, Serializer valueSerializer )
124            throws IOException
125        {
126            this.schemaManager = schemaManager;
127    
128            // TODO make the size of the duplicate btree cache configurable via constructor
129            duplicateBtrees = new SynchronizedLRUMap( 100 );
130    
131            if ( valueSerializer != null )
132            {
133                marshaller = new ArrayMarshaller<V>( valueComparator,
134                        new MarshallerSerializerBridge<V>( valueSerializer ) );
135            }
136            else
137            {
138                marshaller = new ArrayMarshaller<V>( valueComparator );
139            }
140    
141            if ( keyComparator == null )
142            {
143                throw new NullPointerException( I18n.err( I18n.ERR_591 ) );
144            }
145            else
146            {
147                this.keyComparator = keyComparator;
148            }
149    
150            if ( valueComparator == null )
151            {
152                throw new NullPointerException( I18n.err( I18n.ERR_592 ) );
153            }
154            else
155            {
156                this.valueComparator = valueComparator;
157            }
158    
159            this.numDupLimit = numDupLimit;
160            this.name = name;
161            this.recMan = manager;
162    
163            this.keySerializer = keySerializer;
164            this.valueSerializer = valueSerializer;
165    
166            this.allowsDuplicates = true;
167    
168            long recId = recMan.getNamedObject( name );
169    
170            if ( recId == 0 ) // Create new main BTree
171            {
172                // we do not use the value serializer in the btree since duplicates will use
173                // either BTreeRedirect objects or AvlTree objects whose marshalling is
174                // explicitly managed by this code.  Value serialization is delegated to these
175                // marshallers.
176    
177                bt = BTree.createInstance( recMan, keyComparator, keySerializer, null );
178                recId = bt.getRecid();
179                recMan.setNamedObject( name, recId );
180                recId = recMan.insert( 0 );
181                recMan.setNamedObject( name + SZSUFFIX, recId );
182            }
183            else // Load existing BTree
184            {
185                bt = BTree.load( recMan, recId );
186                ((SerializableComparator<K>)bt.getComparator()).setSchemaManager( schemaManager );
187                recId = recMan.getNamedObject( name + SZSUFFIX );
188                count = ( Integer ) recMan.fetch( recId );
189            }
190    
191        }
192    
193    
194        /**
195         * Creates a Jdbm BTree based tuple Table abstraction without duplicates 
196         * enabled using a simple key comparator.
197         *
198         * @param name the name of the table
199         * @param manager the record manager to be used for this table
200         * @param keyComparator a tuple comparator
201         * @param keySerializer a serializer to use for the keys instead of using
202         * default Java serialization which could be very expensive
203         * @param valueSerializer a serializer to use for the values instead of
204         * using default Java serialization which could be very expensive
205         * @throws IOException if the table's file cannot be created
206         */
207        public JdbmTable( SchemaManager schemaManager, String name, RecordManager manager, Comparator<K> keyComparator,
208                          Serializer keySerializer, Serializer valueSerializer )
209            throws IOException
210        {
211            this.schemaManager = schemaManager;
212            this.duplicateBtrees = null;
213            this.numDupLimit = Integer.MAX_VALUE;
214            this.name = name;
215            this.recMan = manager;
216    
217            if ( keyComparator == null )
218            {
219                throw new NullPointerException( I18n.err( I18n.ERR_591 ) );
220            }
221            else
222            {
223                this.keyComparator = keyComparator;
224            }
225    
226            this.valueComparator = null;
227    
228            this.keySerializer = keySerializer;
229            this.valueSerializer = valueSerializer;
230    
231            this.allowsDuplicates = false;
232    
233            long recId = recMan.getNamedObject( name );
234    
235            if ( recId != 0 )
236            {
237                bt = BTree.load( recMan, recId );
238                ((SerializableComparator<K>)bt.getComparator()).setSchemaManager( schemaManager );
239                bt.setValueSerializer( valueSerializer );
240                recId = recMan.getNamedObject( name + SZSUFFIX );
241                count = ( Integer ) recMan.fetch( recId );
242            }
243            else
244            {
245                bt = BTree.createInstance( recMan, keyComparator, keySerializer, valueSerializer );
246                recId = bt.getRecid();
247                recMan.setNamedObject( name, recId );
248                recId = recMan.insert( 0 );
249                recMan.setNamedObject( name + SZSUFFIX, recId );
250            }
251        }
252    
253    
254        // ------------------------------------------------------------------------
255        // Simple Table Properties
256        // ------------------------------------------------------------------------
257    
258        
259        /**
260         * @see Table#getKeyComparator()
261         */
262        public Comparator<K> getKeyComparator()
263        {
264            return keyComparator;
265        }
266    
267    
268        /**
269         * @see Table#getValueComparator()
270         */
271        public Comparator<V> getValueComparator()
272        {
273            return valueComparator;
274        }
275    
276    
277        public Serializer getKeySerializer()
278        {
279            return keySerializer;
280        }
281    
282    
283        public Serializer getValueSerializer()
284        {
285            return valueSerializer;
286        }
287    
288    
289        /**
290         * @see org.apache.directory.server.xdbm.Table#isDupsEnabled()
291         */
292        public boolean isDupsEnabled()
293        {
294            return allowsDuplicates;
295        }
296    
297    
298        /**
299         * @see org.apache.directory.server.xdbm.Table#getName()
300         */
301        public String getName()
302        {
303            return name;
304        }
305    
306    
307        public boolean isCountExact()
308        {
309            return false;
310        }
311            
312    
313        // ------------------------------------------------------------------------
314        // Count Overloads
315        // ------------------------------------------------------------------------
316    
317        
318        /**
319         * @see Table#greaterThanCount(Object)
320         */
321        public int greaterThanCount( K key ) throws IOException
322        {
323            // take a best guess
324            return count;
325        }
326        
327        
328        /**
329         * @see Table#lessThanCount(Object)
330         */
331        public int lessThanCount( K key ) throws IOException
332        {
333            // take a best guess
334            return count;
335        }
336    
337    
338        /**
339         * @see org.apache.directory.server.xdbm.Table#count(java.lang.Object)
340         */
341        public int count( K key ) throws IOException
342        {
343            if ( key == null )
344            {
345                return 0;
346            }
347    
348            if ( ! allowsDuplicates )
349            {
350                if ( null == bt.find( key ) )
351                {
352                    return 0;
353                }
354                else
355                {
356                    return 1;
357                }
358            }
359    
360            DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
361            
362            if ( values.isArrayTree() )
363            {
364                return values.getArrayTree().size();
365            }
366    
367            return getBTree( values.getBTreeRedirect() ).size();
368        }
369    
370    
371        /**
372         * @see org.apache.directory.server.xdbm.Table#count()
373         */
374        public int count() throws IOException
375        {
376            return count;
377        }
378    
379        
380        // ------------------------------------------------------------------------
381        // get/has/put/remove Methods and Overloads
382        // ------------------------------------------------------------------------
383    
384    
385        @SuppressWarnings("unchecked")
386        public V get( K key ) throws Exception
387        {
388            if ( key == null )
389            {
390                return null;
391            }
392    
393            if ( ! allowsDuplicates )
394            {
395                return ( V ) bt.find( key );
396            }                         
397    
398            
399            DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
400            if ( values.isArrayTree() )
401            {
402                ArrayTree<V> set = values.getArrayTree();
403    
404                if ( set.getFirst() == null )
405                {
406                    return null;
407                }
408                
409                return set.getFirst();
410            }
411    
412            // Handle values if they are stored in another BTree
413            BTree tree = getBTree( values.getBTreeRedirect() );
414    
415            jdbm.helper.Tuple tuple = new jdbm.helper.Tuple();
416            tree.browse().getNext( tuple );
417            //noinspection unchecked
418            
419            return ( V ) tuple.getKey();
420        }
421    
422        
423        /**
424         * @see Table#hasGreaterOrEqual(Object,Object)
425         */
426        public boolean hasGreaterOrEqual( K key, V val ) throws IOException
427        {
428            if ( key == null )
429            {
430                return false;
431            }
432    
433            if ( ! allowsDuplicates )
434            {
435                throw new UnsupportedOperationException( I18n.err( I18n.ERR_593 ) );
436            }
437    
438            DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
439            
440            if ( values.isArrayTree() )
441            {
442                ArrayTree<V> set = values.getArrayTree();
443                V result = set.findGreaterOrEqual( val );
444                return result != null;
445            }
446    
447            // last option is to try a btree with BTreeRedirects
448            BTree tree = getBTree( values.getBTreeRedirect() );
449            
450            return tree.size() != 0 && btreeHas( tree, val, true );
451        }
452    
453    
454        /**
455         * @see Table#hasLessOrEqual(Object,Object)
456         */
457        public boolean hasLessOrEqual( K key, V val ) throws IOException
458        {
459            if ( key == null )
460            {
461                return false;
462            }
463    
464            if ( ! allowsDuplicates )
465            {
466                throw new UnsupportedOperationException( I18n.err( I18n.ERR_593 ) );
467            }
468    
469            DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
470            
471            if ( values.isArrayTree() )
472            {
473                ArrayTree<V> set = values.getArrayTree();
474                V result = set.findLessOrEqual( val );
475                return result != null;
476            }
477    
478            // last option is to try a btree with BTreeRedirects
479            BTree tree = getBTree( values.getBTreeRedirect() );
480            
481            return tree.size() != 0 && btreeHas( tree, val, false );
482        }
483    
484    
485        /**
486         * @see org.apache.directory.server.xdbm.Table#hasGreaterOrEqual(Object)
487         */
488        @SuppressWarnings("unchecked")
489        public boolean hasGreaterOrEqual( K key ) throws IOException
490        {
491            // See if we can find the border between keys greater than and less
492            // than in the set of keys.  This will be the spot we search from.
493            jdbm.helper.Tuple tuple = bt.findGreaterOrEqual( key );
494    
495            // Test for equality first since it satisfies both greater/less than
496            if ( null != tuple && keyComparator.compare( ( K ) tuple.getKey(), key ) == 0 )
497            {
498                return true;
499            }
500    
501            // Greater searches are easy and quick thanks to findGreaterOrEqual
502            // A null return above means there were no equal or greater keys
503            if ( null == tuple )
504            {
505                return false;
506            }
507    
508            // Not Null! - we found a tuple with equal or greater key value
509            return true;
510        }
511    
512    
513        /**
514         * @see Table#hasLessOrEqual(Object)
515         */
516        @SuppressWarnings("unchecked")
517        public boolean hasLessOrEqual( K key ) throws IOException
518        {
519            // Can only find greater than or equal to with JDBM so we find that
520            // and work backwards to see if we can find one less than the key
521            jdbm.helper.Tuple tuple = bt.findGreaterOrEqual( key );
522    
523            // Test for equality first since it satisfies equal to condition
524            if ( null != tuple && keyComparator.compare( ( K ) tuple.getKey(), key ) == 0 )
525            {
526                return true;
527            }
528    
529            if ( null == tuple )
530            {
531                /*
532                 * Jdbm failed to find a key greater than or equal to the argument
533                 * which means all the keys in the table are less than the
534                 * supplied key argument.  We can hence return true if the table
535                 * contains any Tuples.
536                 */
537                return count > 0;
538            }
539            else
540            {
541                /*
542                 * We have the next tuple whose key is the next greater than the
543                 * key argument supplied.  We use this key to advance a browser to
544                 * that tuple and scan down to lesser Tuples until we hit one
545                 * that is less than the key argument supplied.  Usually this will
546                 * be the previous tuple if it exists.
547                 */
548                TupleBrowser browser = bt.browse( tuple.getKey() );
549                if ( browser.getPrevious( tuple ) )
550                {
551                    return true;
552                }
553            }
554    
555            return false;
556        }
557    
558    
559        /**
560         * @see org.apache.directory.server.xdbm.Table#has(java.lang.Object,
561         * java.lang.Object)
562         */
563        @SuppressWarnings("unchecked")
564        public boolean has( K key, V value ) throws IOException
565        {
566            if ( key == null )
567            {
568                return false;
569            }
570    
571            if ( ! allowsDuplicates )
572            {
573                V stored = ( V ) bt.find( key );
574                return null != stored && stored.equals( value );
575            }
576            
577            DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
578            
579            if ( values.isArrayTree() )
580            {
581                return values.getArrayTree().find( value ) != null;
582            }
583            
584            return getBTree( values.getBTreeRedirect() ).find( value ) != null;
585        }
586        
587    
588        /**
589         * @see Table#has(java.lang.Object)
590         */
591        public boolean has( K key ) throws IOException
592        {
593            return key != null && bt.find(key) != null;
594        }
595    
596    
597        /**
598         * @see org.apache.directory.server.xdbm.Table#put(java.lang.Object,
599         * java.lang.Object)
600         */
601        @SuppressWarnings("unchecked")
602        public synchronized void put( K key, V value ) throws Exception
603        {
604            try
605            {
606                if ( LOG.isDebugEnabled() )
607                {
608                    LOG.debug( "---> Add {} = {}", name, key );
609                }
610    
611                if ( ( value == null ) || ( key == null ) )
612                {
613                    throw new IllegalArgumentException( I18n.err( I18n.ERR_594 ) );
614                }
615                
616                V replaced;
617        
618                if ( ! allowsDuplicates )
619                {
620                    replaced = ( V ) bt.insert( key, value, true );
621        
622                    if ( null == replaced )
623                    {
624                        count++;
625                    }
626        
627                    if ( LOG.isDebugEnabled() )
628                    {
629                        LOG.debug( "<--- Add ONE {} = {}", name, key );
630                    }
631                    
632                    return;
633                }
634                
635                DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
636                
637                if ( values.isArrayTree() )
638                {
639                    ArrayTree<V> set = values.getArrayTree();
640                    replaced = set.insert( value );
641                    
642                    if ( replaced != null )// if the value already present returns the same value
643                    {
644                        return;
645                    }
646                    if ( set.size() > numDupLimit )
647                    {
648                        BTree tree = convertToBTree( set );
649                        BTreeRedirect redirect = new BTreeRedirect( tree.getRecid() );
650                        bt.insert( key, BTreeRedirectMarshaller.INSTANCE.serialize( redirect ), true );
651                        
652                        if ( LOG.isDebugEnabled() )
653                        {
654                            LOG.debug( "<--- Add new BTREE {} = {}", name, key );
655                        }
656                    }
657                    else
658                    {
659                        bt.insert( key, marshaller.serialize( set ), true );
660                        
661                        if ( LOG.isDebugEnabled() )
662                        {
663                            LOG.debug( "<--- Add AVL {} = {}", name, key );
664                        }
665                    }
666        
667                    count++;
668                    return;
669                }
670                
671                BTree tree = getBTree( values.getBTreeRedirect() );
672                replaced = ( V ) tree.insert( value, StringTools.EMPTY_BYTES, true );
673                
674                if ( replaced == null )
675                {
676                    count++;
677                }
678                
679                if ( LOG.isDebugEnabled() )
680                {
681                    LOG.debug( "<--- Add BTREE {} = {}", name, key );
682                }
683            }
684            catch ( Exception e )
685            {
686                LOG.error( I18n.err( I18n.ERR_131, key, name ), e );
687                throw e;
688            }
689        }
690        
691    
692        /**
693         * @see org.apache.directory.server.xdbm.Table#remove(java.lang.Object,
694         * java.lang.Object)
695         */
696        @SuppressWarnings("unchecked")
697        public synchronized void remove( K key, V value ) throws IOException
698        {
699            try
700            {
701                if ( LOG.isDebugEnabled() )
702                {
703                    LOG.debug( "---> Remove " + name + " = " + key + ", " + value );
704                }
705                
706                if ( key == null )
707                {
708                    if ( LOG.isDebugEnabled() )
709                    {
710                        LOG.debug( "<--- Remove NULL key " + name );
711                    }
712                    return;
713                }
714        
715                if ( ! allowsDuplicates )
716                {
717                    V oldValue = ( V ) bt.find( key );
718                
719                    // Remove the value only if it is the same as value.
720                    if ( ( oldValue != null ) && oldValue.equals( value ) )
721                    {
722                        bt.remove( key );
723                        count--;
724                        
725                        if ( LOG.isDebugEnabled() )
726                        {
727                            LOG.debug( "<--- Remove ONE " + name + " = " + key + ", " + value );
728                        }
729                        
730                        return;
731                    }
732        
733                    return;
734                }
735        
736                DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
737                
738                if ( values.isArrayTree() )
739                {
740                    ArrayTree<V> set = values.getArrayTree();
741        
742                    // If removal succeeds then remove if set is empty else replace it
743                    if ( set.remove( value ) != null )
744                    {
745                        if ( set.isEmpty() )
746                        {
747                            bt.remove( key );
748                        }
749                        else
750                        {
751                            bt.insert( key, marshaller.serialize( set ), true );
752                        }
753                        count--;
754    
755                        if ( LOG.isDebugEnabled() )
756                        {
757                            LOG.debug( "<--- Remove AVL " + name + " = " + key + ", " + value );
758                        }
759                        
760                        return;
761                    }
762        
763                    return;
764                }
765        
766                // if the number of duplicates falls below the numDupLimit value
767                BTree tree = getBTree( values.getBTreeRedirect() );
768                
769                if ( tree.find( value ) != null )
770                {
771                    if ( tree.remove( value ) != null )
772                    {
773                        /*
774                         * If we drop below the duplicate limit then we revert from using
775                         * a Jdbm BTree to using an in memory AvlTree.
776                         */
777                        if ( tree.size() <= numDupLimit )
778                        {
779                            ArrayTree<V> avlTree = convertToArrayTree( tree );
780                            bt.insert( key, marshaller.serialize( avlTree ), true );
781                            recMan.delete( tree.getRecid() );
782                        }
783                        
784                        count--;
785                        
786                        if ( LOG.isDebugEnabled() )
787                        {
788                            LOG.debug( "<--- Remove BTREE " + name + " = " + key + ", " + value );
789                        }
790                        
791                        return;
792                    }
793                }
794            }
795            catch ( Exception e )
796            {
797                LOG.error( I18n.err( I18n.ERR_132, key, value, name ), e );
798            }
799        }
800    
801    
802        /**
803         * @see Table#remove(Object)
804         */
805        public synchronized void remove( K key ) throws IOException
806        {
807            try
808            {
809                if ( LOG.isDebugEnabled() )
810                {
811                    LOG.debug( "---> Remove {} = {}", name, key );
812                }
813                
814                if ( key == null )
815                {
816                    return;
817                }
818        
819                Object returned = bt.remove( key );
820        
821                if ( null == returned )
822                {
823                    if ( LOG.isDebugEnabled() )
824                    {
825                        LOG.debug( "<--- Remove AVL {} = {} (not found)", name, key );
826                    }
827                    
828                    return;
829                }
830        
831                if ( ! allowsDuplicates )
832                {
833                    this.count--;
834                 
835                    if ( LOG.isDebugEnabled() )
836                    {
837                        LOG.debug( "<--- Remove ONE {} = {}", name, key );
838                    }
839                    
840                    return;
841                }
842        
843                byte[] serialized = ( byte[] ) returned;
844        
845                if ( BTreeRedirectMarshaller.isRedirect( serialized ) )
846                {
847                    BTree tree = getBTree( BTreeRedirectMarshaller.INSTANCE.deserialize( serialized ) );
848                    this.count -= tree.size();
849                    
850                    if ( LOG.isDebugEnabled() )
851                    {
852                        LOG.debug( "<--- Remove BTree {} = {}", name, key );
853                    }
854    
855                    recMan.delete( tree.getRecid() );
856                    duplicateBtrees.remove( tree.getRecid() );
857                    return;
858                }
859                else
860                {
861                    ArrayTree<V> set = marshaller.deserialize( serialized );
862                    this.count -= set.size();
863        
864                    if ( LOG.isDebugEnabled() )
865                    {
866                        LOG.debug( "<--- Remove AVL {} = {}", name, key );
867                    }
868                    
869                    return;
870                }
871            }
872            catch ( Exception e )
873            {
874                LOG.error( I18n.err( I18n.ERR_133, key, name ), e );
875                
876                if ( e instanceof IOException )
877                {
878                    throw (IOException)e;
879                }
880            }
881        }
882    
883    
884        public Cursor<org.apache.directory.server.xdbm.Tuple<K,V>> cursor() throws Exception
885        {
886            if ( allowsDuplicates )
887            {
888                return new DupsCursor<K,V>( this );
889            }
890    
891            return new NoDupsCursor<K,V>( this );
892        }
893    
894    
895        @SuppressWarnings("unchecked")
896        public Cursor<org.apache.directory.server.xdbm.Tuple<K,V>> cursor( K key ) throws Exception
897        {
898            if ( key == null )
899            {
900                return new EmptyCursor<org.apache.directory.server.xdbm.Tuple<K,V>>();
901            }
902    
903            Object raw = bt.find( key );
904    
905            if ( null == raw )
906            {
907                return new EmptyCursor<org.apache.directory.server.xdbm.Tuple<K,V>>();
908            }
909    
910            if ( ! allowsDuplicates )
911            {
912                return new SingletonCursor<org.apache.directory.server.xdbm.Tuple<K,V>>( new org.apache.directory.server.xdbm.Tuple<K,V>( key, ( V ) raw ) );
913            }
914    
915            byte[] serialized = ( byte[] ) raw;
916            
917            if ( BTreeRedirectMarshaller.isRedirect( serialized ) )
918            {
919                BTree tree = getBTree( BTreeRedirectMarshaller.INSTANCE.deserialize( serialized ) );
920                return new KeyTupleBTreeCursor<K,V>( tree, key, valueComparator );
921            }
922    
923            ArrayTree<V> set = marshaller.deserialize( serialized );
924            return new KeyTupleArrayCursor<K,V>( set, key );
925        }
926    
927    
928        @SuppressWarnings("unchecked")
929        public Cursor<V> valueCursor( K key ) throws Exception
930        {
931            if ( key == null )
932            {
933                return new EmptyCursor<V>();
934            }
935    
936            Object raw = bt.find( key );
937    
938            if ( null == raw )
939            {
940                return new EmptyCursor<V>();
941            }
942    
943            if ( ! allowsDuplicates )
944            {
945                return new SingletonCursor<V>( ( V ) raw );
946            }
947    
948            byte[] serialized = ( byte[] ) raw;
949            if ( BTreeRedirectMarshaller.isRedirect( serialized ) )
950            {
951                BTree tree = getBTree( BTreeRedirectMarshaller.INSTANCE.deserialize( serialized ) );
952                return new KeyBTreeCursor<V>( tree, valueComparator );
953            }
954    
955            return new ArrayTreeCursor<V>( marshaller.deserialize( serialized ) );
956        }
957    
958    
959        // ------------------------------------------------------------------------
960        // Maintenance Operations 
961        // ------------------------------------------------------------------------
962    
963    
964        /**
965         * @see Table#close()
966         */
967        public synchronized void close() throws IOException
968        {
969            sync();
970        }
971    
972    
973        /**
974         * Synchronizes the buffers with disk.
975         *
976         * @throws IOException if errors are encountered on the flush
977         */
978        public synchronized void sync() throws IOException
979        {
980            long recId = recMan.getNamedObject( name + SZSUFFIX );
981            recMan.update( recId, count );
982        }
983    
984        
985        public Marshaller<ArrayTree<V>> getMarshaller()
986        {
987            return marshaller;
988        }
989        
990    
991        // ------------------------------------------------------------------------
992        // Private/Package Utility Methods 
993        // ------------------------------------------------------------------------
994        DupsContainer<V> getDupsContainer( byte[] serialized ) throws IOException
995        {
996            if ( serialized == null )
997            {
998                return new DupsContainer<V>( new ArrayTree<V>( valueComparator ) );
999            }
1000    
1001            if ( BTreeRedirectMarshaller.isRedirect( serialized ) )
1002            {
1003                return new DupsContainer<V>( BTreeRedirectMarshaller.INSTANCE.deserialize( serialized ) );
1004            }
1005    
1006            return new DupsContainer<V>( marshaller.deserialize( serialized ) );
1007        }
1008    
1009    
1010        /**
1011         * Returns the main BTree used by this table.
1012         *
1013         * @return the main JDBM BTree used by this table
1014         */
1015        BTree getBTree()
1016        {
1017            return bt;
1018        }
1019    
1020    
1021        BTree getBTree( BTreeRedirect redirect ) throws IOException
1022        {
1023            if ( duplicateBtrees.containsKey( redirect.getRecId() ) )
1024            {
1025                return duplicateBtrees.get( redirect.getRecId() );
1026            }
1027            
1028            BTree tree = BTree.load( recMan, redirect.getRecId() );
1029            ((SerializableComparator<K>)tree.getComparator()).setSchemaManager( schemaManager );
1030            duplicateBtrees.put( redirect.getRecId(), tree );
1031            return tree;
1032        }
1033    
1034    
1035        @SuppressWarnings("unchecked")
1036        private boolean btreeHas( BTree tree, V key, boolean isGreaterThan ) throws IOException
1037        {
1038            jdbm.helper.Tuple tuple = new jdbm.helper.Tuple();
1039            
1040            TupleBrowser browser = tree.browse( key );
1041            if ( isGreaterThan )
1042            {
1043                return browser.getNext( tuple );
1044            }
1045            else
1046            {
1047                if ( browser.getPrevious( tuple ) )
1048                {
1049                    return true;
1050                }
1051                else
1052                {
1053                    /*
1054                     * getPrevious() above fails which means the browser has is
1055                     * before the first Tuple of the btree.  A call to getNext()
1056                     * should work every time.
1057                     */
1058                    browser.getNext( tuple );
1059    
1060                    /*
1061                     * Since the browser is positioned now on the Tuple with the
1062                     * smallest key we just need to check if it equals this key
1063                     * which is the only chance for returning true.
1064                     */
1065                    V firstKey = ( V ) tuple.getKey();
1066                    return valueComparator.compare( key, firstKey ) == 0;
1067                }
1068            }
1069        }
1070    
1071    
1072        @SuppressWarnings("unchecked")
1073        private ArrayTree<V> convertToArrayTree( BTree bTree ) throws IOException
1074        {
1075            ArrayTree<V> avlTree = new ArrayTree<V>( valueComparator );
1076            TupleBrowser browser = bTree.browse();
1077            jdbm.helper.Tuple tuple = new jdbm.helper.Tuple();
1078            
1079            while ( browser.getNext( tuple ) )
1080            {
1081                avlTree.insert( ( V ) tuple.getKey() );
1082            }
1083    
1084            return avlTree;
1085        }
1086        
1087    
1088        private BTree convertToBTree( ArrayTree<V> arrayTree ) throws Exception
1089        {
1090            BTree bTree;
1091    
1092            if ( valueSerializer != null )
1093            {
1094                bTree = BTree.createInstance( recMan, valueComparator, valueSerializer, null );
1095            }
1096            else
1097            {
1098                bTree = BTree.createInstance( recMan, valueComparator );
1099            }
1100    
1101            Cursor<V> keys = new ArrayTreeCursor<V>( arrayTree );
1102            keys.beforeFirst();
1103            
1104            while ( keys.next() )
1105            {
1106                bTree.insert( keys.get(), StringTools.EMPTY_BYTES, true );
1107            }
1108            
1109            return bTree;
1110        }
1111    }