From 92748d7a0641ca3f1abe16f27fe88f429270e4e8 Mon Sep 17 00:00:00 2001 From: carlo Date: Tue, 20 Feb 2024 13:13:12 +0100 Subject: [PATCH 1/2] training by passing sparse features and labels --- c-api/src/lib.rs | 112 ++++++++++++++++++++++++++++- python-wrapper/omikuji/__init__.py | 92 ++++++++++++++++++++++++ src/data.rs | 14 ++++ 3 files changed, 216 insertions(+), 2 deletions(-) diff --git a/c-api/src/lib.rs b/c-api/src/lib.rs index dfff6d5..d230d18 100644 --- a/c-api/src/lib.rs +++ b/c-api/src/lib.rs @@ -1,10 +1,11 @@ use itertools::Itertools; use libc::size_t; -use omikuji::rayon; +use omikuji::{rayon, Index}; use std::convert::TryInto; use std::ffi::CStr; use std::os::raw::{c_char, c_float, c_void}; use std::slice; +use omikuji::{IndexSet, IndexValueVec}; #[repr(C)] pub struct Model { @@ -242,7 +243,7 @@ pub unsafe extern "C" fn load_omikuji_data_set( .and_then(|path| { maybe_run_with_thread_pool(thread_pool_ptr, || { omikuji::DataSet::load_xc_repo_data_file(path) - .map_err(|_| "Failed to laod data file") + .map_err(|_| "Failed to load data file") }) }) { Ok(dataset) => Box::into_raw(Box::new(dataset)) as *mut DataSet, @@ -253,6 +254,113 @@ pub unsafe extern "C" fn load_omikuji_data_set( } } +pub fn extract_pairs(indices: &Vec, indptr: &Vec, data: &Vec) -> Vec<(u32, T)> { + let mut pairs: Vec<(u32, T)> = Vec::with_capacity(data.len()); // Preallocate memory + + for row in 0..indptr.len() - 1 { + let start = indptr[row] as usize; + let end = indptr[row + 1] as usize; + + for i in start..end { + pairs.push((indices[i], data[i])); + } + } + pairs +} + + +pub fn extract_label_sets(indices: &Vec, indptr: &Vec, data: &Vec) -> Vec { + let mut label_sets: Vec = Vec::with_capacity(indptr.len() - 1); // Preallocate memory for label_sets + + for row in 0..indptr.len() - 1 { + let start = indptr[row] as usize; + let end = indptr[row + 1] as usize; + + let mut label_set: IndexSet = IndexSet::with_capacity(end - start); // Preallocate memory for label_set + + // Iterate directly over non-zero elements + for (&index, &value) in indices[start..end].iter().zip(&data[start..end]) { + if value != 0 { + label_set.insert(index); + } + } + label_sets.push(label_set); + } + label_sets +} + + +#[no_mangle] +pub unsafe extern "C" fn load_omikuji_data_set_from_features_labels( + num_features: size_t, + num_labels: size_t, + num_nnz_features: size_t, + num_nnz_labels: size_t, + num_rows: size_t, + feature_indices: * const u32, + feature_indptr: * const u32, + feature_data: * const c_float, + label_indices: * const u32, + label_indptr: * const u32, + label_data: *const u32, + thread_pool_ptr: *const ThreadPool, +) -> *mut DataSet { + + // features + let vec_feature_indices = { + slice::from_raw_parts(feature_indices, num_nnz_features).iter().cloned().collect_vec() + }; + let vec_feature_indptr = { + slice::from_raw_parts(feature_indptr, num_rows+1).iter().cloned().collect_vec() + }; + let vec_feature_data = { + slice::from_raw_parts(feature_data, num_nnz_features).iter().cloned().collect_vec() + }; + // labels + let vec_labels_indices = { + slice::from_raw_parts(label_indices, num_nnz_labels).iter().cloned().collect_vec() + }; + let vec_labels_indptr = { + slice::from_raw_parts(label_indptr, num_rows+1).iter().cloned().collect_vec() + }; + let vec_labels_data = { + slice::from_raw_parts(label_data, num_nnz_labels).iter().cloned().collect_vec() + }; + + let features_list: Vec = vec_feature_indptr + .windows(2) + .map(|window| { + let start = window[0] as usize; + let end = window[1] as usize; + extract_pairs( + &vec_feature_indices[start..end].to_vec(), + &vec![0, (end - start).try_into().unwrap()], + &vec_feature_data[start..end].to_vec(), + ) + }) + .collect(); + + let label_sets: Vec = extract_label_sets(&vec_labels_indices, &vec_labels_indptr, &vec_labels_data); + + // // For debugging purpose + // println!("features_list={:?}", features_list); + // println!("labels_set={:?}", label_sets); + + // println!("==== RUST Features ===="); + // println!("indices={:?}", vec_feature_indices); + // println!("indptr={:?}", vec_feature_indptr); + // println!("data={:?}", vec_feature_data); + + // println!("==== RUST Labels ===="); + // println!("indices={:?}", vec_labels_indices); + // println!("indptr={:?}", vec_labels_indptr); + // println!("data={:?}", vec_labels_data); + + // Construct the DataSet and return a raw pointer + let dataset = maybe_run_with_thread_pool(thread_pool_ptr, || {omikuji::DataSet::from_x_y(num_features, num_labels, features_list, label_sets).map_err(|_| "Failed passing data to Rust")}); + return Box::into_raw(Box::new(dataset)) as *mut DataSet; +} + /// Free data set object. /// /// # Safety diff --git a/python-wrapper/omikuji/__init__.py b/python-wrapper/omikuji/__init__.py index 6dab009..8b3cced 100644 --- a/python-wrapper/omikuji/__init__.py +++ b/python-wrapper/omikuji/__init__.py @@ -1,6 +1,8 @@ __version__ = "0.5.1" __all__ = ["Model", "LossType"] +import numpy as np +from scipy.sparse import csr_matrix from ._libomikuji import lib, ffi from enum import Enum import os @@ -36,6 +38,16 @@ def ptr(self): return self._ptr +def extract_pairs(indices, indptr, data): + pairs = [] + for row in range(len(indptr) - 1): + start = indptr[row] + end = indptr[row + 1] + for i in range(start, end): + pairs.append((indices[i], data[i])) + return pairs + + class Model: """A Omikuji model object.""" @@ -167,6 +179,86 @@ def train_on_data( return Model(model_ptr) + @classmethod + def train_on_features_labels( + cls, + features: csr_matrix, + labels: csr_matrix, + hyper_param=None, + n_threads: Optional[int] = None, + ): + num_features = features.shape[1] + num_labels = labels.shape[1] + num_feature_rows = features.shape[0] + num_labels_rows = labels.shape[0] + assert num_feature_rows == num_labels_rows + thread_pool = _ThreadPoolHandle(n_threads) + + # Check if it is needed to cast data + if features.dtype is not np.float32: + features = csr_matrix(features, dtype=np.float32) + + if labels.dtype is not np.uint32: + labels = csr_matrix(labels, dtype=np.uint32) + + num_nnz_features = features.nnz + num_nnz_labels = labels.nnz + # print("======== FEATURES =========") + # print("indices=",features.indices) + # print("indptr=",features.indptr) + # print("data=",features.data) + # print("==== FEATURE PAIR ====") + # print(extract_pairs(features.indices, features.indptr, features.data)) + # print("======== LABELS =========") + # print("indices=",labels.indices) + # print("indptr=",labels.indptr) + # print("data=",labels.data) + # print("==== LABELS PAIR ====") + # print(extract_pairs(features.indices, features.indptr, features.data)) + + # Creates and map the rust feature vectors from the numpy arrays + feature_indices = ffi.new("uint32_t[]", num_nnz_features) + feature_indptr = ffi.new("uint32_t[]", num_feature_rows+1) + feature_data = ffi.new("float[]", num_nnz_features) + feature_indices = ffi.from_buffer("uint32_t[]", features.indices) + feature_indptr = ffi.from_buffer("uint32_t[]", features.indptr) + feature_data = ffi.from_buffer("float[]", features.data) + + # Creates and map the rust label vectors from the numpy arrays + label_indices = ffi.new("uint32_t[]", num_nnz_labels) + label_indptr = ffi.new("uint32_t[]", num_labels_rows+1) + label_data = ffi.new("uint32_t[]", num_nnz_labels) + label_indices = ffi.from_buffer("uint32_t[]", labels.indices) + label_indptr = ffi.from_buffer("uint32_t[]", labels.indptr) + label_data = ffi.from_buffer("uint32_t[]", labels.data) + + dataset_ptr = lib.load_omikuji_data_set_from_features_labels( + num_features, + num_labels, + num_nnz_features, + num_nnz_labels, + num_feature_rows, + feature_indices, + feature_indptr, + feature_data, + label_indices, + label_indptr, + label_data, + thread_pool.ptr, + ) + if dataset_ptr == ffi.NULL: + raise RuntimeError("Failed to pass data to Rust") + dataset_ptr = ffi.gc(dataset_ptr, lib.free_omikuji_data_set) + + if hyper_param is None: + hyper_param = cls.default_hyper_param() + + model_ptr = lib.train_omikuji_model(dataset_ptr, hyper_param, thread_pool.ptr) + if model_ptr == ffi.NULL: + raise RuntimeError("Failed to train model") + + return Model(model_ptr) + def init_logger(): """Initialize a simple logger that writes to stdout.""" diff --git a/src/data.rs b/src/data.rs index cb22ae3..4b0821c 100644 --- a/src/data.rs +++ b/src/data.rs @@ -125,6 +125,7 @@ impl DataSet { .skip(1) .map(|line| Self::parse_xc_repo_data_line(line, n_features)) .collect::>()?; + let (feature_lists, label_sets): (Vec<_>, Vec<_>) = lines.into_iter().unzip(); if n_examples != feature_lists.len() { @@ -150,6 +151,19 @@ impl DataSet { label_sets, }) } +//feature_lists: Vec + pub fn from_x_y(n_features: usize, n_labels: usize, feature_lists: Vec, label_sets: Vec) -> Result + { + info!("Total number of features {}", n_features); + info!("Total number of labels {}", n_labels); + info!("Num rows {}", feature_lists.len()); + Ok(Self { + n_features, + n_labels, + feature_lists, + label_sets, + }) + } } #[cfg(test)] From 2c7925524cb5a7010aea37fe313a81bb3c0696f4 Mon Sep 17 00:00:00 2001 From: carlo Date: Tue, 20 Feb 2024 13:49:07 +0100 Subject: [PATCH 2/2] Corrected python wrapper --- python-wrapper/omikuji/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python-wrapper/omikuji/__init__.py b/python-wrapper/omikuji/__init__.py index 8b3cced..91242fa 100644 --- a/python-wrapper/omikuji/__init__.py +++ b/python-wrapper/omikuji/__init__.py @@ -218,7 +218,7 @@ def train_on_features_labels( # Creates and map the rust feature vectors from the numpy arrays feature_indices = ffi.new("uint32_t[]", num_nnz_features) - feature_indptr = ffi.new("uint32_t[]", num_feature_rows+1) + feature_indptr = ffi.new("uint32_t[]", num_feature_rows + 1) feature_data = ffi.new("float[]", num_nnz_features) feature_indices = ffi.from_buffer("uint32_t[]", features.indices) feature_indptr = ffi.from_buffer("uint32_t[]", features.indptr) @@ -226,7 +226,7 @@ def train_on_features_labels( # Creates and map the rust label vectors from the numpy arrays label_indices = ffi.new("uint32_t[]", num_nnz_labels) - label_indptr = ffi.new("uint32_t[]", num_labels_rows+1) + label_indptr = ffi.new("uint32_t[]", num_labels_rows + 1) label_data = ffi.new("uint32_t[]", num_nnz_labels) label_indices = ffi.from_buffer("uint32_t[]", labels.indices) label_indptr = ffi.from_buffer("uint32_t[]", labels.indptr)