Skip to content

Commit

Permalink
Merge branch 'main' into use-latest-arrow-version
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanglei1949 authored Aug 21, 2024
2 parents 2757aef + 0305629 commit 43a58fb
Show file tree
Hide file tree
Showing 45 changed files with 11,819 additions and 18 deletions.
2 changes: 2 additions & 0 deletions interactive_engine/executor/common/huge_container/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/target
/Cargo.lock
10 changes: 10 additions & 0 deletions interactive_engine/executor/common/huge_container/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "huge_container"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
libc = "0.2"
lazy_static = "1.4"
144 changes: 144 additions & 0 deletions interactive_engine/executor/common/huge_container/src/huge_vec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//
//! Copyright 2020 Alibaba Group Holding Limited.
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//! See the License for the specific language governing permissions and
//! limitations under the License.
use std::fmt;
use std::ops;

pub struct HugeVec<T> {
ptr: *mut T,
cap: usize,
len: usize,
}

impl<T> HugeVec<T> {
pub fn new() -> Self {
Self { ptr: std::ptr::null_mut(), cap: 0, len: 0 }
}

pub fn with_capacity(capacity: usize) -> Self {
let cap_in_bytes = capacity * std::mem::size_of::<T>();
let ptr = crate::hugepage_alloc(cap_in_bytes) as *mut T;
Self { ptr, cap: capacity, len: 0 }
}

pub fn len(&self) -> usize {
self.len
}

pub fn capacity(&self) -> usize {
self.cap
}

pub fn reserve(&mut self, additional: usize) {
let new_cap = self.cap + additional;
let new_cap_in_bytes = new_cap * std::mem::size_of::<T>();
let new_ptr = crate::hugepage_alloc(new_cap_in_bytes) as *mut T;

if self.len > 0 {
unsafe {
std::ptr::copy_nonoverlapping(self.ptr, new_ptr, self.len);
}
}
if self.cap > 0 {
crate::hugepage_dealloc(self.ptr as *mut u8, self.cap * std::mem::size_of::<T>());
}

self.ptr = new_ptr;
self.cap = new_cap;
}

pub fn as_ptr(&self) -> *const T {
self.ptr
}

pub fn as_mut_ptr(&mut self) -> *mut T {
self.ptr
}

pub fn push(&mut self, value: T) {
if self.len == self.cap {
self.reserve(1);
}

unsafe {
self.ptr.add(self.len).write(value);
}

self.len += 1;
}

pub fn clear(&mut self) {
unsafe { std::ptr::drop_in_place(std::slice::from_raw_parts_mut(self.ptr, self.len)) }
self.len = 0;
}

pub fn resize(&mut self, new_len: usize, value: T)
where
T: Clone,
{
if new_len > self.len {
if new_len > self.cap {
self.reserve(new_len - self.len);
}

for i in self.len..new_len {
unsafe {
self.ptr.add(i).write(value.clone());
}
}
} else {
unsafe {
std::ptr::drop_in_place(std::slice::from_raw_parts_mut(
self.ptr.add(new_len),
self.len - new_len,
));
}
}

self.len = new_len;
}
}

impl<T> Drop for HugeVec<T> {
fn drop(&mut self) {
self.clear();
if self.cap > 0 {
crate::hugepage_dealloc(self.ptr as *mut u8, self.cap * std::mem::size_of::<T>());
}
}
}

impl<T> ops::Deref for HugeVec<T> {
type Target = [T];

fn deref(&self) -> &Self::Target {
unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
}
}

impl<T> ops::DerefMut for HugeVec<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) }
}
}

impl<T: fmt::Debug> fmt::Debug for HugeVec<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}

unsafe impl<T> Sync for HugeVec<T> {}
unsafe impl<T> Send for HugeVec<T> {}
57 changes: 57 additions & 0 deletions interactive_engine/executor/common/huge_container/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//
//! Copyright 2020 Alibaba Group Holding Limited.
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//! See the License for the specific language governing permissions and
//! limitations under the License.
#[cfg(target_os = "linux")]
mod linux_hugepages;
#[cfg(target_os = "linux")]
use linux_hugepages::hugepage_alloc;
#[cfg(target_os = "linux")]
use linux_hugepages::hugepage_dealloc;

#[cfg(not(target_os = "linux"))]
mod notlinux_hugepages;
#[cfg(not(target_os = "linux"))]
use notlinux_hugepages::hugepage_alloc;
#[cfg(not(target_os = "linux"))]
use notlinux_hugepages::hugepage_dealloc;

mod huge_vec;

pub use huge_vec::HugeVec;

pub fn add(left: usize, right: usize) -> usize {
left + right
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);

let mut vec = HugeVec::<i32>::new();
vec.push(1);
vec.push(2);
vec.push(3);

assert_eq!(vec.len(), 3);
assert_eq!(vec[0], 1);
assert_eq!(vec[1], 2);
assert_eq!(vec[2], 3);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//
//! Copyright 2020 Alibaba Group Holding Limited.
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//! See the License for the specific language governing permissions and
//! limitations under the License.
use lazy_static::lazy_static;
use std::{
fs::File,
io::{self, BufRead, BufReader},
};

fn get_hugepage_size() -> io::Result<usize> {
let file = File::open("/proc/meminfo")?;
let reader = BufReader::new(file);

for line in reader.lines() {
let line = line?;
if line.starts_with("Hugepagesize:") {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 2 {
if let Ok(size_kb) = parts[1].parse::<usize>() {
match parts[2] {
"kB" => return Ok(size_kb * 1024),
"MB" => return Ok(size_kb * 1024 * 1024),
"GB" => return Ok(size_kb * 1024 * 1024 * 1024),
_ => {}
}
}
}
}
}

Err(io::Error::new(io::ErrorKind::NotFound, "Hugepagesize info not found"))
}

lazy_static! {
static ref HUGE_PAGE_SIZE: usize = get_hugepage_size().unwrap();
}

fn align_to(size: usize, align: usize) -> usize {
(size + align - 1) & !(align - 1)
}

pub(crate) fn hugepage_alloc(size: usize) -> *mut u8 {
let len = align_to(size, *HUGE_PAGE_SIZE);
let p = unsafe {
libc::mmap(
std::ptr::null_mut(),
len,
libc::PROT_READ | libc::PROT_WRITE,
libc::MAP_PRIVATE | libc::MAP_ANONYMOUS | libc::MAP_HUGETLB,
-1,
0,
)
};
p as *mut u8
}

pub(crate) fn hugepage_dealloc(ptr: *mut u8, size: usize) {
let len = align_to(size, *HUGE_PAGE_SIZE);
let ret = unsafe { libc::munmap(ptr as *mut libc::c_void, len) };
if ret != 0 {
panic!("hugepage deallocation failed, {} - {} -> {}", ret, size, len);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//
//! Copyright 2020 Alibaba Group Holding Limited.
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//! See the License for the specific language governing permissions and
//! limitations under the License.
pub(crate) fn hugepage_alloc(size: usize) -> *mut u8 {
let ptr = unsafe {
libc::mmap(
std::ptr::null_mut(),
size,
libc::PROT_READ | libc::PROT_WRITE,
libc::MAP_PRIVATE | libc::MAP_ANONYMOUS,
-1,
0,
)
};

if ptr == libc::MAP_FAILED {
panic!("hugepage allocation failed");
}

ptr as *mut u8
}

pub(crate) fn hugepage_dealloc(ptr: *mut u8, size: usize) {
let ret = unsafe { libc::munmap(ptr as *mut libc::c_void, size) };
if ret != 0 {
panic!("hugepage deallocation failed, {}", ret);
}
}
3 changes: 2 additions & 1 deletion interactive_engine/executor/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ members = [
"mcsr",
"global_query",
"groot",
"exp_store"
"exp_store",
"bmcsr"
]

[profile.release]
Expand Down
34 changes: 34 additions & 0 deletions interactive_engine/executor/store/bmcsr/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[package]
name = "bmcsr"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_derive = "1.0"
serde_json = "1.0"
pegasus_common = { path = "../../engine/pegasus/common"}
dyn_type = { path = "../../common/dyn_type" }
huge_container = { path = "../../common/huge_container" }
log = "0.4"
bincode = "1.0.1"
itertools = "0.9"
csv = "1.1"
abomonation = "0.7.3"
abomonation_derive = "0.5"
env_logger = "0.7.1"
chrono = "0.4.23"
fnv = "1.0.3"
regex = "1.7.1"
rust-htslib = { version = "0.39.5", default-features = false, features = ["bzip2", "lzma"] }
clap = "2.32.0"
byteorder = "1.5.0"
glob = "0.3"
rayon = "1.5.1"
dashmap = "5.5.3"

[features]
hugepage_csr = []
hugepage_table = []
Loading

0 comments on commit 43a58fb

Please sign in to comment.