Skip to content

Commit

Permalink
Use djb2 as hash function (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
hesampakdaman authored May 12, 2024
1 parent da886b5 commit ac883e3
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 52 deletions.
11 changes: 6 additions & 5 deletions src/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
use crate::compute::CityKey;
use crate::record::Record;
use fxhash::FxHashMap;
use std::sync::mpsc::Receiver;

pub fn reduce(rx: Receiver<FxHashMap<String, Record>>) -> Vec<(String, Record)> {
pub fn reduce(rx: Receiver<FxHashMap<CityKey, Record>>) -> Vec<Record> {
let mut hmap = FxHashMap::default();
while let Ok(stats) = rx.recv() {
merge_records(&mut hmap, stats);
}
to_sorted_vec(hmap)
}

fn merge_records(dst: &mut FxHashMap<String, Record>, src: FxHashMap<String, Record>) {
fn merge_records(dst: &mut FxHashMap<CityKey, Record>, src: FxHashMap<CityKey, Record>) {
for (city, new_record) in src {
dst.entry(city)
.and_modify(|existing_record: &mut Record| existing_record.merge(&new_record))
.or_insert(new_record);
}
}

fn to_sorted_vec(hmap: FxHashMap<String, Record>) -> Vec<(String, Record)> {
let mut v = hmap.into_iter().collect::<Vec<_>>();
v.sort_unstable_by(|a, b| a.0.cmp(&b.0));
fn to_sorted_vec(hmap: FxHashMap<CityKey, Record>) -> Vec<Record> {
let mut v = hmap.into_values().collect::<Vec<_>>();
v.sort_unstable_by(|a, b| a.name.cmp(&b.name));
v
}
64 changes: 43 additions & 21 deletions src/compute.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,47 @@
use crate::record::Record;
use fxhash::FxHashMap;
use std::sync::mpsc::Sender;
use memchr::memchr;
use std::hash::{Hash, Hasher};
use std::sync::mpsc::Sender;

#[derive(PartialEq, Eq)]
pub struct CityKey(u64);

impl CityKey {
fn new(bytes: &[u8]) -> Self {
// djb2 hash fn
// hash(0) = 5381
// hash(i) = hash(i-1) * 33 ^ byte[i]
let hash_fn = |hash, byte: &u8| (hash * 33) ^ u64::from(*byte);
Self(bytes.iter().fold(5381, hash_fn))
}
}

pub fn stats(bytes: &[u8], tx: Sender<FxHashMap<String, Record>>) {
impl Hash for CityKey {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write_u64(self.0);
}
}

pub fn stats(bytes: &[u8], tx: Sender<FxHashMap<CityKey, Record>>) {
let hmap = calculate(bytes);
tx.send(hmap).unwrap();
}

fn calculate(mut bytes: &[u8]) -> FxHashMap<String, Record> {
let mut map: FxHashMap<String, Record> = FxHashMap::default();
fn calculate(mut bytes: &[u8]) -> FxHashMap<CityKey, Record> {
let mut map: FxHashMap<CityKey, Record> = FxHashMap::default();
while let Some(sep_idx) = memchr(b';', bytes) {
let end_idx = memchr(b'\n', bytes).unwrap_or(bytes.len());
let city = unsafe { std::str::from_utf8_unchecked(&bytes[..sep_idx]) };
let num = parse_float(&bytes[sep_idx+1..end_idx]);
if let Some(rec) = map.get_mut(city) {
let key = CityKey::new(&bytes[..sep_idx]);
let num = parse_float(&bytes[sep_idx + 1..end_idx]);
if let Some(rec) = map.get_mut(&key) {
rec.add(num);
} else {
map.insert(city.to_string(), Record::from(num));
let name = unsafe { std::str::from_utf8_unchecked(&bytes[..sep_idx]) };
map.insert(key, Record::from((name, num)));
}
bytes = if end_idx < bytes.len() {
&bytes[end_idx+1..]
&bytes[end_idx + 1..]
} else {
&[]
};
Expand Down Expand Up @@ -51,26 +72,27 @@ fn parse_float(bytes: &[u8]) -> i32 {
mod tests {
use super::*;

fn check(data: &str, expected: FxHashMap<String, Record>) {
let actual = calculate(data.as_bytes());
fn check(input: &str, expected: Vec<Record>) {
let map = calculate(input.as_bytes());
let mut actual: Vec<Record> = map.into_values().collect();
actual.sort_unstable_by(|a, b| a.name.cmp(&b.name));
assert_eq!(actual, expected);
}

#[test]
fn compute() {
let input = "Stockholm;1.5
let input = "
Stockholm;1.5
New York;2.0
Oslo;0.0
Stockholm;11.5
Oslo;10.2";
let mut expected = FxHashMap::default();
for (city, rec) in [
("Stockholm".to_string(), Record::new(15, 115, 130, 2)),
("New York".to_string(), Record::new(20, 20, 20, 1)),
("Oslo".to_string(), Record::new(0, 102, 102, 2)),
] {
expected.insert(city, rec);
}
Oslo;10.2"
.trim();
let expected = vec![
Record::new("New York", 20, 20, 20, 1),
Record::new("Oslo", 0, 102, 102, 2),
Record::new("Stockholm", 15, 115, 130, 2),
];
check(input, expected);
}
}
22 changes: 10 additions & 12 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn main() {
}
}

fn run(path: PathBuf) -> Result<Vec<WeatherReport>, RunErr> {
fn run(path: PathBuf) -> Result<Vec<Record>, RunErr> {
let file = File::open(path).unwrap();
let mmap = Arc::new(unsafe { MmapOptions::new().map(&file).map_err(RunErr::IO)? });
let (tx, rx) = mpsc::channel();
Expand All @@ -41,20 +41,18 @@ fn run(path: PathBuf) -> Result<Vec<WeatherReport>, RunErr> {
Ok(aggregate::reduce(rx))
}

type WeatherReport = (String, Record);

#[derive(Debug)]
enum RunErr {
IO(std::io::Error),
}

fn print_results(v: &[WeatherReport]) {
fn print_results(v: &[Record]) {
print!("{{");
for (i, (name, r)) in v.iter().enumerate() {
for (i, record) in v.iter().enumerate() {
if i < v.len() - 1 {
print!("{name}: {r}, ");
print!("{record}, ");
} else {
print!("{name}: {r}");
print!("{record}");
}
}
println!("}}")
Expand All @@ -69,11 +67,11 @@ mod test {
let path = PathBuf::from("./data/measurements-test.txt");
let actual = run(path).unwrap();
let expected = vec![
(String::from("London"), Record::new(85, 95, 180, 2)),
(String::from("New York"), Record::new(35, 150, 185, 2)),
(String::from("Oslo"), Record::new(-100, 102, 2, 2)),
(String::from("Paris"), Record::new(130, 130, 130, 1)),
(String::from("Stockholm"), Record::new(-5, 200, 210, 3)),
Record::new("London", 85, 95, 180, 2),
Record::new("New York", 35, 150, 185, 2),
Record::new("Oslo", -100, 102, 2, 2),
Record::new("Paris", 130, 130, 130, 1),
Record::new("Stockholm", -5, 200, 210, 3),
];
assert_eq!(actual, expected);
}
Expand Down
37 changes: 23 additions & 14 deletions src/record.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
#[derive(Debug, Default, PartialEq, Eq)]
pub struct Record {
pub name: String,
min: i32,
max: i32,
sum: i32,
count: usize,
}

impl Record {
#[cfg(test)]
pub fn new(min: i32, max: i32, sum: i32, count: usize) -> Self {
pub fn new(name: &str, min: i32, max: i32, sum: i32, count: usize) -> Self {
Self {
name: name.to_string(),
min,
max,
sum,
Expand All @@ -30,27 +31,35 @@ impl Record {
self.sum += t;
self.count += 1;
}

fn average(&self) -> f32 {
(self.sum as f32 / 10.0) / self.count as f32
}

fn min(&self) -> f32 {
self.min as f32 / 10.0
}

fn max(&self) -> f32 {
self.max as f32 / 10.0
}
}

impl From<i32> for Record {
fn from(value: i32) -> Self {
Self {
min: value,
max: value,
sum: value,
count: 1,
}
impl From<(&str, i32)> for Record {
fn from(value: (&str, i32)) -> Self {
Self::new(value.0, value.1, value.1, value.1, 1)
}
}

impl std::fmt::Display for Record {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{:.1}/{:.1}/{:.1}",
self.min as f32 / 10.0,
(self.sum as f32 / 10.0) / self.count as f32,
self.max as f32 / 10.0,
"{}: {:.1}/{:.1}/{:.1}",
self.name,
self.min(),
self.average(),
self.max(),
)
}
}

0 comments on commit ac883e3

Please sign in to comment.