
Key-Value store using Rust: Linear hashing implementation

Ishaan Aggarwal (ishaan.aggarwal@sjsu.edu)

Purpose:

The purpose of this deliverable is to explore the implementation of a dynamic hashing scheme known as Linear Hashing. This will help in extending the simple key-value store from Deliverable 1.

Linear Hashing:

Linear Hashing is a dynamic hashing technique that grows an initial set of buckets one bucket at a time, according to some criterion; hence the name Linear Hashing.

A hash function always produces a fixed number of bits; say our hash function produces a 32-bit hash of a key. In Linear Hashing, however, only the low-order I bits of that hash are used to address the N current buckets. If we start with N = 2 buckets, then I = 1, so only one bit of the 32-bit hash is used to map a key to a bucket.

Let the criterion for adding a bucket be exceeding a load-factor threshold, where the load factor is the number of items divided by (number of buckets × records per bucket). Once an insertion pushes the load factor past the threshold, a bucket is added, so N grows by one. Whenever N exceeds 2^I, we increment I so that the new buckets can be addressed. Each time a bucket is added, we split the bucket at the split index S, which initially points at the first bucket. Splitting bucket S means rehashing all of its keys; any key whose new address is the freshly added bucket is moved there, and S then advances to the next bucket. Once N has doubled from its value at the start of the round, S is reset to 0.
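To make the addressing rule concrete, below is a minimal, standalone Rust sketch (not part of the library; address is an illustrative helper) that uses the low-order I bits of a hash and, when the resulting index points at a bucket that does not exist yet, drops the most significant of those bits:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Map a key to one of `nbuckets` buckets using the low `nbits` bits of its hash.
fn address(key: &str, nbits: usize, nbuckets: usize) -> usize {
    let mut s = DefaultHasher::new();
    key.hash(&mut s);
    // keep only the low-order `nbits` bits of the 64-bit hash
    let bucket = (s.finish() & ((1 << nbits) - 1)) as usize;
    if bucket < nbuckets {
        bucket
    } else {
        // bucket does not exist yet: drop the most significant of the `nbits` bits
        bucket - (1 << (nbits - 1))
    }
}

fn main() {
    // With N = 2 buckets only I = 1 bit is used; after one split N = 3 and I = 2.
    for &(nbits, nbuckets) in &[(1, 2), (2, 3)] {
        println!("nbits={} nbuckets={} -> bucket {}",
                 nbits, nbuckets, address("hello", nbits, nbuckets));
    }
}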

Implementation Overview:

Project Setup:

The project has two parts: a library that contains the actual implementation of the methods pertaining to linear hashing, and a driver project that uses the library to actually run linear hashing.
* The library is in the project named rust-linear-hash-kvstore. This directory also contains the main project, which uses the library, under the linear-hash-db directory.
* Both projects have their own Cargo.toml files, as shown below:


rust-linear-hash-kvstore library

[package]
name = "linearhash"
version = "0.1.0"

[lib]
name = "linearhash"
path = "src/lib.rs"

linear-hash-db

[package]
name = "linear-hash-db"
version = "0.1.0"

[dependencies]
linearhash = { path = "../" }

Note: We can see here that the latter declares the former as its dependency.

How to run:

To run this project, first run "cargo build" in the parent directory of the project. This builds the library to be used.
Next, run "cargo build" followed by "cargo run" in the linear-hash-db directory. This runs the main file and executes the test cases, using the linear hash library.

Code: Linear Hash library

The library consists of four files, with functions separated according to the part of the operation they deal with, as the file names suggest.
lib.rs


use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::path::Path;

// TODO: implement remove

pub mod util;
pub mod page;
pub mod disk;

use disk::{DbFile,SearchResult};

/// Linear Hashtable
pub struct LinHash {
    buckets: DbFile,
    nbits: usize,               // no of bits used from hash
    nitems: usize,              // number of items in hashtable
    nbuckets: usize,            // number of buckets
}

impl LinHash {
    /// "load factor" needed before the hashmap needs to grow.
    const THRESHOLD: f32 = 0.8;

    /// Creates a new Linear Hashtable.
    pub fn open(filename: &str, keysize: usize, valsize: usize) -> LinHash {
        let file_exists = Path::new(filename).exists();
        let mut dbfile = DbFile::new(filename, keysize, valsize);
        let (nbits, nitems, nbuckets) =
            if file_exists {
                dbfile.read_ctrlpage()
            } else {
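                // fresh file: start with 1 hash bit, 0 items, and 2 buckets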
                (1, 0, 2)
            };
        println!("{:?}", (nbits, nitems, nbuckets));
        LinHash {
            buckets: dbfile,
            nbits: nbits,
            nitems: nitems,
            nbuckets: nbuckets,
        }
    }

    fn hash(&self, key: &[u8]) -> u64 {
        let mut s = DefaultHasher::new();
        key.hash(&mut s);
        s.finish()
    }

    /// Which bucket to place the key-value pair in. If the target
    /// bucket does not yet exist, it is guaranteed that the MSB of
    /// the bucket index is a `1`. To find the bucket the pair should
    /// be placed in, subtract this `1`.
    fn bucket(&self, key: &[u8]) -> usize {
        let hash = self.hash(key);
        let bucket = (hash & ((1 << self.nbits) - 1)) as usize;
        let adjusted_bucket_index =
            if bucket < self.nbuckets {
                bucket
            } else {
                bucket - (1 << (self.nbits-1))
            };

        adjusted_bucket_index
    }

    /// Returns true if the `load` exceeds `LinHash::THRESHOLD`
    fn split_needed(&self) -> bool {
        (self.nitems as f32 / (self.buckets.records_per_page * self.nbuckets) as f32) >
            LinHash::THRESHOLD
    }

    /// If necessary, allocates a new bucket. If there's no more space
    /// in the buckets vector (ie. n > 2^i), increment the number of
    /// bits used (i).
    ///
    /// Note that the bucket that is split is not necessarily the one
    /// just inserted into.
    fn maybe_split(&mut self) -> bool {
        if self.split_needed() {
            println!("Load factor exceeding the threshold of {}; Split required!", LinHash::THRESHOLD);
            self.nbuckets += 1;

            self.buckets.allocate_new_bucket();
            if self.nbuckets > (1 << self.nbits) {
                self.nbits += 1;
            }

            // Take index of last item added and subtract the 1 at the
            // MSB position. eg: after bucket 11 is added, bucket 01
            // needs to be split
            let bucket_to_split =
                (self.nbuckets-1) ^ (1 << (self.nbits-1));
            println!("nbits: {} nitems: {} new nbuckets: {} after splitting bucket {:b} ({}))",
                     self.nbits, self.nitems, self.nbuckets, bucket_to_split, bucket_to_split);
            // Replace the bucket to split with a fresh, empty
            // page. And get a list of all records stored in the bucket
            let old_bucket_records =
                self.buckets.clear_bucket(bucket_to_split);

            // Re-hash all records in old_bucket. Ideally, about half
            // of the records will go into the new bucket.
            for (k, v) in old_bucket_records.into_iter() {
                self.reinsert(&k, &v);
            }
            return true
        }

        false
    }

    /// Does the hashmap contain a record with key `key`?
    pub fn contains(&mut self, key: &[u8]) -> bool {
        match self.get(key) {
            Some(_) => true,
            None => false,
        }
    }

    /// Update the mapping of record with key `key`.
    pub fn update(&mut self, key: &[u8], val: &[u8]) -> bool {
        let bucket_index = self.bucket(&key);
        match self.buckets.search_bucket(bucket_index, key.clone()) {
            SearchResult { page_id, row_num, val: old_val } => {
                match (page_id, row_num, old_val) {
                    (Some(page_id), Some(row_num), Some(_)) => {
                        println!("update: {:?}", (page_id, row_num, key.clone(), val.clone()));
                        self.buckets.write_record(page_id, row_num, key, val);
                        true
                    }
                    _ => false,
                }
            },
        }
    }

    /// Insert (key,value) pair into the hashtable.
    pub fn put(&mut self, key: &[u8], val: &[u8]) {
        let bucket_index = self.bucket(&key);
        match self.buckets.search_bucket(bucket_index, key.clone()) {
            SearchResult { page_id, row_num, val: old_val } => {
                match (page_id, row_num, old_val) {
                    // new insert
                    (Some(page_id), Some(pos), None) => {
                        self.buckets.write_record_incr(page_id, pos, key, val);
                        self.nitems += 1;
                    },
                    // case for update
                    (Some(_page_id), Some(pos), Some(_old_val)) => {
                        panic!("can't use put to reinsert old item: {:?}", (key, val));
                    },
                    // new insert, in overflow page
                    (Some(last_page_id), None, None) => { // overflow
                        self.buckets.allocate_overflow(bucket_index, last_page_id);
                        self.put(key, val);
                    },
                    _ => panic!("impossible case"),
                }
            },
        }

        self.maybe_split();
        self.buckets.write_ctrlpage((self.nbits, self.nitems, self.nbuckets));
    }

    /// Re-insert (key, value) pair after a split
    fn reinsert(&mut self, key: &[u8], val: &[u8]) {
        self.put(key, val);
        // correct for nitems increment in `put`
        self.nitems -= 1;
    }

    /// Lookup `key` in hashtable
    pub fn get(&mut self, key: &[u8]) -> Option<Vec<u8>> {
        let bucket_index = self.bucket(&key);
        match self.buckets.search_bucket(bucket_index, key) {
            SearchResult { page_id, row_num, val } => {
                match val {
                    Some(v) => Some(v),
                    _ => None,
                }
            },
        }
    }

    // Removes record with `key` in hashtable.
    // pub fn remove(&mut self, key: K) -> Option<V> {
    //     let bucket_index = self.bucket(&key);
    //     let index_to_delete = self.search_bucket(bucket_index, &key);

    //     // Delete item from bucket
    //     match index_to_delete {
    //         Some(x) => Some(self.buckets[bucket_index].remove(x).1),
    //         None => None,
    //     }
    // }

    pub fn close(&mut self) {
        self.buckets.write_ctrlpage((self.nbits, self.nitems, self.nbuckets));
        self.buckets.close();
    }
}

#[cfg(test)]
mod tests {
    use LinHash;
    use std::fs;
    use util::*;

    #[test]
    fn all_put_update() {
        let mut h = LinHash::open("/tmp/test_all_put_update", 32, 4);
        h.put(b"hello", &[12]);
        h.put(b"there", &[13]);
        h.put(b"foo", &[42]);
        h.put(b"bar", &[11]);
        h.update(b"foo", &[84]);
        h.update(b"bar", &[22]);

        println!("Starting asserts.");
        assert_eq!(h.get(b"hello"), Some(vec![12, 0, 0, 0]));
        assert_eq!(h.get(b"there"), Some(vec![13, 0, 0, 0]));
        assert_eq!(h.get(b"foo"), Some(vec![84, 0, 0, 0]));
        assert_eq!(h.get(b"bar"), Some(vec![22, 0, 0, 0]));

        // assert_eq!(h.update(String::from("doesn't exist"), 99), false);
        assert_eq!(h.contains(b"doesn't exist"), false);
        assert_eq!(h.contains(b"hello"), true);

        h.close();
        // fs::remove_file("/tmp/test_all_put_update").ok();
    }

    #[test]
    fn test_persistence() {
        let mut h = LinHash::open("/tmp/test_persistence", 32, 4);
        h.put(b"hello", &[12]);
        h.put(b"world", &[13]);
        h.put(b"linear", &[144]);
        h.put(b"hashing", &[255]);
        h.close();

        // This reloads the file and creates a new hashtable
        let mut h2 = LinHash::open("/tmp/test_persistence", 32, 4);
        assert_eq!(h2.get(b"hello"), Some(vec![12, 0, 0, 0]));

        h2.close();
        // fs::remove_file("/tmp/test_persistence").ok();
    }

    // TODO: figure out a testing strategy for this.
    // This test should insert 10,000 records, check that they are all
    // there, and also report the time it took to do so.
    #[test]
    fn test_overflow_and_splitting() {
        let mut h = LinHash::open("/tmp/test_overflow_and_splitting", 4, 4);
        for k in 0..10000 {
            h.put(&i32_to_bytearray(k),
                   &i32_to_bytearray(k+1));
        }
        h.close();

        let mut h2 = LinHash::open("/tmp/test_overflow_and_splitting", 4, 4);
        for k in 0..10000 {
            assert_eq!(h2.get(&i32_to_bytearray(k)),
                       Some(i32_to_bytearray(k+1).to_vec()));
        }

        fs::remove_file("/tmp/test_overflow_and_splitting").ok();
    }
}

disk.rs


use std::collections::VecDeque;
use std::io::prelude::*;
use std::fs::File;
use std::fs::OpenOptions;
use std::io::SeekFrom;
use std::mem;

use page::{Page, PAGE_SIZE, HEADER_SIZE};
use util::*;

const NUM_BUFFERS : usize = 16;

pub struct SearchResult {
    pub page_id: Option<usize>,
    pub row_num: Option<usize>,
    pub val: Option<Vec<u8>>
}

fn flatten<T>(v: Vec<(usize, Vec<T>)>) -> Vec<T> {
    let mut result = vec![];
    for (_, mut i) in v {
        result.append(&mut i);
    }
    result
}

pub struct DbFile {
    path: String,
    file: File,
    ctrl_buffer: Page,
    pub buffers: VecDeque<Page>,
    pub records_per_page: usize,
    bucket_to_page: Vec<usize>,
    keysize: usize,
    valsize: usize,
    num_pages: usize,
    // overflow pages no longer in use
    free_list: Option<usize>,
    num_free: usize,
}

impl DbFile {
    pub fn new(filename: &str, keysize: usize, valsize: usize) -> DbFile {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(filename);
        let file = match file {
            Ok(f) => f,
            Err(e) => panic!("could not open file: {}", e),
        };

        let total_size = keysize + valsize;
        let records_per_page = (PAGE_SIZE - HEADER_SIZE) / total_size;
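        // e.g. with 4-byte keys and 4-byte values: (4096 - 16) / 8 = 510 records per page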

        let mut buffers : VecDeque<Page> =
            VecDeque::with_capacity(NUM_BUFFERS);
        for _i in 0..NUM_BUFFERS {
            buffers.push_back(Page::new(keysize, valsize));
        }

        DbFile {
            path: String::from(filename),
            file: file,
            ctrl_buffer: Page::new(0, 0),
            buffers: buffers,
            records_per_page: records_per_page,
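            // page 0 is reserved for the control page; buckets 0 and 1
            // start on pages 1 and 2, so the next free page is 3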
            bucket_to_page: vec![1, 2],
            keysize: keysize,
            valsize: valsize,
            num_pages: 3,
            free_list: Some(3),
            num_free: 0,
        }
    }

    // Control page layout:
    //
    // | nbits | nitems | nbuckets | num_pages | free_list root |
    // num_free | bucket_to_page mappings .... |
    pub fn read_ctrlpage(&mut self) -> (usize, usize, usize) {
        self.get_ctrl_page();
        let nbits : usize = bytearray_to_usize(self.ctrl_buffer.storage[0..8].to_vec());
        let nitems : usize =
            bytearray_to_usize(self.ctrl_buffer.storage[8..16].to_vec());
        let nbuckets : usize =
            bytearray_to_usize(self.ctrl_buffer.storage[16..24].to_vec());

        self.num_pages =
            bytearray_to_usize(self.ctrl_buffer.storage[24..32].to_vec());
        let free_list_head = bytearray_to_usize(self.ctrl_buffer.storage[32..40].to_vec());
        self.free_list =
            if free_list_head == 0 {
                None
            } else {
                Some(free_list_head)
            };
        self.num_free =
            bytearray_to_usize(self.ctrl_buffer.storage[40..48].to_vec());
        self.bucket_to_page =
            bytevec_to_usize_vec(self.ctrl_buffer.storage[48..PAGE_SIZE].to_vec());
        (nbits, nitems, nbuckets)
    }

    pub fn write_ctrlpage(&mut self,
                          (nbits, nitems, nbuckets):
                          (usize, usize, usize)) {
        self.get_ctrl_page();

        let nbits_bytes = usize_to_bytearray(nbits);
        let nitems_bytes = usize_to_bytearray(nitems);
        let nbuckets_bytes = usize_to_bytearray(nbuckets);
        let num_pages_bytes = usize_to_bytearray(self.num_pages);
        let free_list_bytes = usize_to_bytearray(self.free_list.unwrap_or(0));
        let num_free_bytes = usize_to_bytearray(self.num_free);
        let bucket_to_page_bytevec = usize_vec_to_bytevec(self.bucket_to_page.clone());
        let mut bucket_to_page_bytearray = vec![];
        bucket_to_page_bytearray.write(&bucket_to_page_bytevec)
            .expect("Write to ctrlpage failed");

        println!("nbits: {:?} nitems: {:?} nbuckets: {:?}", nbits_bytes,
                 nitems_bytes, nbuckets_bytes);
        mem_move(&mut self.ctrl_buffer.storage[0..8],
                 &nbits_bytes);
        mem_move(&mut self.ctrl_buffer.storage[8..16],
                 &nitems_bytes);
        mem_move(&mut self.ctrl_buffer.storage[16..24],
                 &nbuckets_bytes);
        mem_move(&mut self.ctrl_buffer.storage[24..32],
                 &num_pages_bytes);
        mem_move(&mut self.ctrl_buffer.storage[32..40],
                 &free_list_bytes);
        mem_move(&mut self.ctrl_buffer.storage[40..48],
                 &num_free_bytes);
        mem_move(&mut self.ctrl_buffer.storage[48..PAGE_SIZE],
                 &bucket_to_page_bytearray);
        DbFile::write_page(&mut self.file,
                           0,
                           &self.ctrl_buffer.storage);
    }

    pub fn get_ctrl_page(&mut self) {
        self.file.seek(SeekFrom::Start(0))
            .expect("Could not seek to offset");
        self.file.read(&mut self.ctrl_buffer.storage)
            .expect("Could not read file");
    }

    fn bucket_to_page(&self, bucket_id: usize) -> usize {
        self.bucket_to_page[bucket_id]
    }

    fn search_buffer_pool(&self, page_id: usize) -> Option<usize> {
        for (i, b) in self.buffers.iter().enumerate() {
            if b.id == page_id {
                return Some(i);
            }
        }
        None
    }

    /// Reads page to self.buffer
    pub fn fetch_page(&mut self, page_id: usize) -> usize {
        let bufpool_index = self.search_buffer_pool(page_id);
        match bufpool_index {
            None => {
                match self.buffers.pop_front() {
                    Some(mut old_page) => {
                        if old_page.dirty {
                            old_page.write_header();
                            DbFile::write_page(&self.file,
                                               old_page.id,
                                               &old_page.storage);
                        }
                    },
                    _ => (),
                }

                let offset = (page_id * PAGE_SIZE) as u64;
                let mut new_page = Page::new(self.keysize, self.valsize);
                new_page.id = page_id;
                let buffer_index = NUM_BUFFERS - 1;

                self.file.seek(SeekFrom::Start(offset))
                    .expect("Could not seek to offset");
                self.file.read(&mut new_page.storage)
                    .expect("Could not read file");
                self.buffers.push_back(new_page);
                self.buffers[buffer_index].read_header();

                buffer_index
            },
            Some(p) => p,
        }
    }

    /// Writes data in `data` into page `page_id` in file.
    pub fn write_page(mut file: &File, page_id: usize, data: &[u8]) {
        let offset = (page_id * PAGE_SIZE) as u64;
        file.seek(SeekFrom::Start(offset))
            .expect("Could not seek to offset");
        file.write(data).expect("write failed");
        file.flush().expect("flush failed");
    }

    /// Write record but don't increment `num_records`. Used when
    /// updating already existing record.
    pub fn write_record(&mut self,
                        page_id: usize,
                        row_num: usize,
                        key: &[u8],
                        val: &[u8]) {
        let buffer_index = self.fetch_page(page_id);
        self.buffers[buffer_index].dirty = true;
        self.buffers[buffer_index].write_record(row_num, key, val);
    }

    /// Write record and increment `num_records`. Used when inserting
    /// new record.
    pub fn write_record_incr(&mut self, page_id: usize, row_num: usize,
                             key: &[u8], val: &[u8]) {
        let buffer_index = self.fetch_page(page_id);
        self.buffers[buffer_index].incr_num_records();
        self.write_record(page_id, row_num, key, val);
    }

    /// Searches for `key` in `bucket`. A bucket is a linked list of
    /// pages. Return value:
    ///
    /// If key is present in bucket returns as struct, SearchResult
    /// (page_id, row_num, val).
    ///
    /// If key is not present and:
    ///   1. there is enough space in last page, returns (page_id, row_num, None)
    ///
    ///   2. there is not enough space in last page, returns
    ///      (last_page_id, None, None)
    pub fn search_bucket(&mut self, bucket_id: usize, key: &[u8]) -> SearchResult {
        let mut page_id = self.bucket_to_page(bucket_id);
        let mut buffer_index;
        let mut first_free_row = SearchResult {
            page_id: None,
            row_num: None,
            val: None,
        };
        loop {
            buffer_index = self.fetch_page(page_id);
            let next_page = self.buffers[buffer_index].next;
            let page_records = self.all_records_in_page(page_id);

            let len = page_records.len();
            for (row_num, (k,v)) in page_records.into_iter().enumerate() {
                if slices_eq(&k, key) {
                    return SearchResult{
                        page_id: Some(page_id),
                        row_num: Some(row_num),
                        val: Some(v)
                    }
                }
            }

            let row_num = if len < self.records_per_page {
                Some(len)
            } else {
                None
            };

            match (first_free_row.page_id, first_free_row.row_num) {
                // this is the first free space for a row found, so
                // keep track of it.
                (Some(_), None) |
                (None, _) => {
                    first_free_row = SearchResult {
                        page_id: Some(page_id),
                        row_num: row_num,
                        val: None,
                    }
                },
                _ => (),
            }

            if let Some(p) = next_page {
                page_id = p;
            } else {
                break;
            }
        }

        first_free_row
    }

    /// Add a new overflow page to a `bucket`.
    pub fn allocate_overflow(&mut self, bucket_id: usize,
                             last_page_id: usize) -> (usize, usize) {
        let physical_index = self.allocate_new_page();

        let new_page_buffer_index = self.fetch_page(physical_index);
        self.buffers[new_page_buffer_index].next = None;
        self.buffers[new_page_buffer_index].dirty = true;

        // Write next of old page
        let old_page_buffer_index = self.fetch_page(last_page_id);
        self.buffers[old_page_buffer_index].next = Some(physical_index);
        self.buffers[old_page_buffer_index].dirty = true;

        println!("setting next of buffer_id {}(page_id: {}) to {:?}",
                 bucket_id,
                 self.buffers[old_page_buffer_index].id,
                 self.buffers[old_page_buffer_index].next);

        (physical_index, 0)
    }

    /// Write out page in bufferpool to file.
    pub fn write_buffer_page(&mut self, buffer_index: usize) {
        // Ignore page 0(ctrlpage)
        if self.buffers[buffer_index].id != 0 {
            self.buffers[buffer_index].dirty = false;
            self.buffers[buffer_index].write_header();
            DbFile::write_page(&mut self.file,
                               self.buffers[buffer_index].id,
                               &self.buffers[buffer_index].storage);
        }
    }

    fn all_records_in_page(&mut self, page_id: usize)
                           -> Vec<(Vec<u8>, Vec<u8>)> {
        let buffer_index = self.fetch_page(page_id);
        let mut page_records = vec![];
        for i in 0..self.buffers[buffer_index].num_records {
            let (k, v) = self.buffers[buffer_index].read_record(i);
            let (dk, dv) = (k.to_vec(), v.to_vec());
            page_records.push((dk, dv));
        }

        page_records
    }

    /// Returns a vec of (page_id, records_in_vec). ie. each inner
    /// vector represents the records in a page in the bucket.
    fn all_records_in_bucket(&mut self, bucket_id: usize)
                             -> Vec<(usize, Vec<(Vec<u8>,Vec<u8>)>)> {
        let first_page_id = self.bucket_to_page(bucket_id);
        let buffer_index = self.fetch_page(first_page_id);
        let mut records = Vec::new();
        records.push((self.buffers[buffer_index].id,
                      self.all_records_in_page(first_page_id)));

        let mut next_page = self.buffers[buffer_index].next;
        while let Some(page_id) = next_page {
            if page_id == 0 {
                break;
            }

            let buffer_index = self.fetch_page(page_id);
            records.push((page_id,
                          self.all_records_in_page(page_id)));

            next_page = self.buffers[buffer_index].next;
        }

        records
    }

    /// Allocate a new page. If available uses recycled overflow
    /// pages.
    fn allocate_new_page(&mut self) -> usize {
        let p = self.free_list;
        let page_id = p.expect("no page in free_list");
        // println!("[allocate_new_page] allocating page_id: {}", page_id);
        let buffer_index = self.fetch_page(page_id);

        self.free_list = match self.buffers[buffer_index].next {
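            // a `next` of 0 or None means the free list is exhausted,
            // so grow the file by one page instead of recycling one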
            Some(0) | None => {
                self.num_pages += 1;
                Some(self.num_pages)
            },
            _ => {
                self.num_free -= 1;
                self.buffers[buffer_index].next
            },
        };

        let new_page = Page::new(self.keysize, self.valsize);
        mem::replace(&mut self.buffers[buffer_index], new_page);
        self.buffers[buffer_index].id = page_id;
        self.buffers[buffer_index].dirty = false;
        self.buffers[buffer_index].next = None;

        page_id
    }

    /// Empties out root page for bucket. Overflow pages are added to
    /// `free_list`
    pub fn clear_bucket(&mut self, bucket_id: usize) -> Vec<(Vec<u8>,Vec<u8>)> {
        let all_records = self.all_records_in_bucket(bucket_id);
        let records = flatten(all_records.clone());

        // Add overflow pages to free_list
        let bucket_len = all_records.len();
        if bucket_len > 1 {
            // second page onwards are overflow pages
            let (second_page_id, _) = all_records[1];
            // println!("[clear_bucket] adding overflow chain starting page {} to free_list", second_page_id);
            let temp = self.free_list;
            self.free_list = Some(second_page_id);

            let second_page_buffer_index =
                self.fetch_page(second_page_id);
            // overflow pages only
            self.num_free += bucket_len - 1;
            self.buffers[second_page_buffer_index].next = temp;
        }

        let page_id = self.bucket_to_page(bucket_id);
        let buffer_index = self.fetch_page(page_id);
        let new_page = Page::new(self.keysize, self.valsize);
        mem::replace(&mut self.buffers[buffer_index], new_page);
        self.buffers[buffer_index].id = page_id;
        self.buffers[buffer_index].dirty = false;
        self.write_buffer_page(buffer_index);

        records
    }

    pub fn allocate_new_bucket(&mut self) {
        let page_id = self.allocate_new_page();
        self.bucket_to_page.push(page_id);
    }

    pub fn close(&mut self) {
        for b in 0..NUM_BUFFERS {
            self.write_buffer_page(b);
        }
    }
}

#[cfg(test)]
mod tests {
    use disk;
    use DbFile;
    use std::fs;

    #[test]
    fn dbfile_tests () {
        let mut bp = DbFile::new("/tmp/dbfile_tests", 4, 4);
        let bark = b"bark";
        let krab = b"krab";
        // write to page 1
        bp.write_record(1, 14, bark, krab);
        assert_eq!(bp.buffers[disk::NUM_BUFFERS-1].read_record(14),
                   (&bark[..], &krab[..]));
        bp.close();

        let mut bp2 = DbFile::new("/tmp/dbfile_tests", 4, 4);
        // read from page 1
        let buffer_index = bp2.fetch_page(1);
        assert_eq!(bp2.buffers[buffer_index].read_record(14),
                   (&bark[..], &krab[..]));

        fs::remove_file("/tmp/dbfile_tests").ok();
    }
}

page.rs


use util::*;

pub const PAGE_SIZE : usize = 4096; // bytes
pub const HEADER_SIZE : usize = 16; // bytes
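// header layout: num_records (8 bytes) | next page id (8 bytes)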

pub struct Page {
    pub id: usize,
    pub storage: [u8; PAGE_SIZE],
    pub num_records: usize,
    // page_id of overflow bucket
    pub next: Option<usize>,
    pub dirty: bool,

    keysize: usize,
    valsize: usize,
}

// Row layout:
// | key | val |
#[derive(Debug)]
struct RowOffsets {
    key_offset: usize,
    val_offset: usize,
    row_end: usize,
}

impl Page {
    pub fn new(keysize: usize, valsize: usize) -> Page {
        Page {
            id: 0,
            num_records: 0,
            storage: [0; PAGE_SIZE],
            next: None,
            keysize: keysize,
            valsize: valsize,
            dirty: false,
        }
    }

    /// Compute where in the page the row should be placed. Within the
    /// row, calculate the offsets of the header, key and value.
    fn compute_offsets(&self, row_num: usize) -> RowOffsets {
        let total_size = self.keysize + self.valsize;

        let row_offset = HEADER_SIZE + (row_num * total_size);
        let key_offset = row_offset;
        let val_offset = key_offset + self.keysize;
        let row_end = val_offset + self.valsize;

        RowOffsets {
            key_offset: key_offset,
            val_offset: val_offset,
            row_end: row_end,
        }
    }


    pub fn read_header(&mut self) {
        let num_records : usize = bytearray_to_usize(self.storage[0..8].to_vec());
        let next : usize = bytearray_to_usize(self.storage[8..16].to_vec());
        self.num_records = num_records;
        self.next = if next != 0 {
            Some(next)
        } else {
            None
        };
    }

    pub fn write_header(&mut self) {
        mem_move(&mut self.storage[0..8], &usize_to_bytearray(self.num_records));
        mem_move(&mut self.storage[8..16], &usize_to_bytearray(self.next.unwrap_or(0)));
    }

    pub fn read_record(&mut self, row_num: usize) -> (&[u8], &[u8]) {
        let offsets = self.compute_offsets(row_num);
        let key = &self.storage[offsets.key_offset..offsets.val_offset];
        let val = &self.storage[offsets.val_offset..offsets.row_end];
        (key, val)
    }

    /// Write record to offset specified by `row_num`. The offset is
    /// calculated to accomodate header as well.
    pub fn write_record(&mut self, row_num: usize, key: &[u8], val: &[u8]) {
        let offsets = self.compute_offsets(row_num);
        mem_move(&mut self.storage[offsets.key_offset..offsets.val_offset],
                 key);
        mem_move(&mut self.storage[offsets.val_offset..offsets.row_end],
                 val);
    }

    /// Increment number of records in page
    pub fn incr_num_records(&mut self) {
        self.num_records += 1;
    }
}

util.rs


use std::mem::transmute;

pub fn mem_move(dest: &mut [u8], src: &[u8]) {
    for (d, s) in dest.iter_mut().zip(src) {
        *d = *s
    }
}

pub fn usize_to_bytearray(n: usize) -> [u8; 8] {
    unsafe {
        transmute::<usize, [u8;8]>(n)
    }
}

pub fn i32_to_bytearray(n: i32) -> [u8; 4] {
    unsafe {
        transmute::<i32, [u8;4]>(n)
    }
}

pub fn usize_vec_to_bytevec(v: Vec<usize>) -> Vec<u8> {
    let mut bv : Vec<u8> = vec![];
    for i in v {
        bv.append(&mut usize_to_bytearray(i).to_vec());
    }
    bv
}

pub fn bytevec_to_usize_vec(b: Vec<u8>) -> Vec<usize> {
    let mut v = vec![];
    for i in 0..(b.len() / 8) {
        v.push(bytearray_to_usize(b[i*8..(i+1)*8].to_vec()));
    }
    v
}

pub fn bytearray_to_usize(b: Vec<u8>) -> usize {
    assert_eq!(b.len(), 8);
    let mut a = [0; 8];

    for i in 0..b.len() {
        a[i] = b[i];
    }

    unsafe {
        transmute::<[u8;8], usize>(a)
    }
}

pub fn slices_eq<T: PartialEq>(s1: &[T], s2: &[T]) -> bool {
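    // note: `zip` stops at the end of the shorter slice, so only the
    // common prefix of the two slices is compared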
    s1.iter().zip(s2).all(|(a,b)| a == b)
}

Code: Utilizing the library

The driver project contains a single main.rs file, which is shown below.



extern crate linearhash;

use linearhash::LinHash;
use std::time::Instant;
use std::fs;
use linearhash::util::*;

#[allow(dead_code)]
fn measure_perf(num_iters: i32) {
    // in each iteration, insert a larger number of records to see how
    // `insert` and `lookup` performs. `insert` should be O(n) and
    // `lookup` should be O(1).
    for i in 1..num_iters {
        fs::remove_file("/tmp/measure_perf").ok();
        let time_insert_start = Instant::now();
        let mut h2 = LinHash::open("/tmp/measure_perf", 4, 4);
        for k in 0..(10000*i) {
            h2.put(&linearhash::util::i32_to_bytearray(k),
                   &linearhash::util::i32_to_bytearray(k+1));
        }
        let time_insert_end = Instant::now();
        println!("[insert] 10000 records {:?}", time_insert_end.duration_since(time_insert_start));

        let time_get_start = Instant::now();
        for k in 8000..9000 {
            assert_eq!(h2.get(&linearhash::util::i32_to_bytearray(k)),
                       Some(linearhash::util::i32_to_bytearray(k+1).to_vec()));
            // println!("{}", k);
        }
        let time_get_end = Instant::now();
        println!("[get] 1000 records {:?}", time_get_end.duration_since(time_get_start));

        let all_end = Instant::now();
        println!("[insert+get] 10000 records {:?}", all_end.duration_since(time_insert_start));
        h2.close();
        // fs::remove_file("/tmp/measure_perf");
    }

}

fn main() {

    measure_perf(2);

    fs::remove_file("/tmp/main_tests").ok();

    let mut h = LinHash::open("/tmp/main_tests", 32, 4);
    h.put(b"Spin", &i32_to_bytearray(9));
    h.put(b"Axis", &i32_to_bytearray(6));
    h.put(b"foo", &[14]);
    h.put(b"bar", &[15]);
    h.put(b"linear", &[16]);
    h.put(b"hashing", &[17]);
    h.put(b"disk", &[18]);
    h.put(b"space", &[19]);
    h.put(b"random", &[20]);
    h.put(b"keys", &[21]);
    h.put(b"ishaan", &[22]);
    h.put(b"linearhash", &[21]);
    h.put(b"rust", &[21]);
    h.put(b"3:30", &[21]);
    h.put(b"xinu", &[21]);
    h.put(b"linearhash1", &[21]);
    h.put(b"rust1", &[22]);
    h.put(b"rust2", &[51]);
    h.put(b"rust3", &[52]);
    h.put(b"rust4", &[53]);
    h.put(b"rust5", &[54]);
    
    h.update(b"rust1", &[99]);
    h.put(b"xinu3", &[24]);
    h.close();

    println!("{:?}", h.get("rust3".as_bytes()));
}

Improvements/Tweaks which can be performed:

1. The key-value store should allow flexible value sizes.

2. An LRU cache-eviction mechanism could be used for the buffer pool instead of FIFO (a rough sketch follows this list).

3. Records could be removed based on their key.
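
As an illustration of item 2, here is a minimal, self-contained sketch (not part of the library; Lru, PageId, and touch are illustrative names) of how a buffer pool could track pages in least-recently-used order and pick an eviction victim:

use std::collections::VecDeque;

type PageId = usize;

struct Lru {
    order: VecDeque<PageId>, // front = least recently used, back = most recently used
    capacity: usize,
}

impl Lru {
    fn new(capacity: usize) -> Lru {
        Lru { order: VecDeque::with_capacity(capacity), capacity: capacity }
    }

    /// Record an access to `id`; returns the page to evict if the pool is over capacity.
    fn touch(&mut self, id: PageId) -> Option<PageId> {
        if let Some(pos) = self.order.iter().position(|&p| p == id) {
            // already resident: move it to the most-recently-used position
            let _ = self.order.remove(pos);
        }
        self.order.push_back(id);
        if self.order.len() > self.capacity {
            self.order.pop_front() // evict the least recently used page
        } else {
            None
        }
    }
}

fn main() {
    let mut lru = Lru::new(2);
    assert_eq!(lru.touch(1), None);
    assert_eq!(lru.touch(2), None);
    assert_eq!(lru.touch(1), None);    // page 1 becomes most recent
    assert_eq!(lru.touch(3), Some(2)); // page 2, not page 1, is evicted
    println!("LRU order: {:?}", lru.order);
}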

References:

[1] https://samrat.me/posts/2017-11-04-kvstore-linear-hashing/

[2] https://github.com/samrat/rust-linhash

[3] Zhang D., Manolopoulos Y., Theodoridis Y., Tsotras V.J. (2009) Linear Hashing. In: LIU L., OZSU M.T. (eds) Encyclopedia of Database Systems. Springer, Boston, MA. https://doi.org/10.1007/978-0-387-39940-9_742

[4] https://crates.io/crates