Skip to main content

lava_flow/memory/cpu/
mod.rs

1use crate::error::{AllocationReason, LavaFlowError, Result};
2use crate::memory::allocator::InterprocessMemoryHandle;
3use std::env;
4
5#[cfg(unix)]
6mod unix;
7#[cfg(windows)]
8mod windows;
9
10#[cfg(unix)]
11use unix::SharedMemoryRegion;
12#[cfg(windows)]
13use windows::SharedMemoryRegion;
14
15/// Stateless API wrapper around CPU memory allocation strategies.
16#[derive(Debug)]
17pub struct Allocator {
18    max_allocation_size: usize,
19}
20
21impl Allocator {
22    /// Creates a new allocator configured from environment.
23    ///
24    /// Uses `LAVA_FLOW_MAX_CPU_ALLOCATION_SIZE` when valid and non-zero.
25    pub fn new() -> Self {
26        let hard = hard_max_cpu_allocation_size();
27        let raw = env::var("LAVA_FLOW_MAX_CPU_ALLOCATION_SIZE").ok();
28        let max_allocation_size = parse_cpu_allocation_cap(raw.as_deref(), hard);
29        Self {
30            max_allocation_size,
31        }
32    }
33
34    /// Creates a new allocator with an explicit allocation cap.
35    ///
36    /// A value of `0` means "use the platform hard limit".
37    pub fn with_max_allocation_size(max_allocation_size: usize) -> Self {
38        let hard = hard_max_cpu_allocation_size();
39        let capped = if max_allocation_size == 0 {
40            hard
41        } else {
42            max_allocation_size.min(hard)
43        };
44        Self {
45            max_allocation_size: capped,
46        }
47    }
48
49    /// Returns the effective maximum CPU allocation size in bytes.
50    pub fn max_allocation_size(&self) -> usize {
51        self.max_allocation_size
52    }
53
54    /// Allocates standard host memory.
55    pub fn allocate(&self, size: usize) -> Result<MemoryBuffer> {
56        let region = SharedMemoryRegion::create(size, self.max_allocation_size)?;
57        let mut buffer = MemoryBuffer { region };
58        buffer.zero_fill();
59        Ok(buffer)
60    }
61}
62
63impl Default for Allocator {
64    fn default() -> Self {
65        Self::new()
66    }
67}
68
69/// CPU-side allocation strategy.
70#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
71pub enum AllocationStrategy {
72    /// Default host allocation.
73    Standard,
74    /// Host memory that is page-locked/pinned.
75    GpuPinned,
76}
77
78/// CPU-backed memory buffer metadata and storage.
79#[derive(Debug)]
80pub struct MemoryBuffer {
81    region: SharedMemoryRegion,
82}
83
84impl MemoryBuffer {
85    /// Imports a CPU shared-memory handle into a buffer with platform hard-limit validation.
86    #[cfg_attr(not(test), allow(dead_code))]
87    pub(crate) fn from_shared_handle(
88        size: usize,
89        handle: InterprocessMemoryHandle,
90    ) -> Result<Self> {
91        let region = SharedMemoryRegion::from_handle(size, hard_max_cpu_allocation_size(), handle)?;
92        Ok(Self { region })
93    }
94
95    fn zero_fill(&mut self) {
96        unsafe { std::ptr::write_bytes(self.as_mut_ptr(), 0, self.size()) };
97    }
98
99    /// Returns an immutable byte slice view over the whole allocation.
100    pub fn as_slice(&self) -> &[u8] {
101        unsafe { std::slice::from_raw_parts(self.as_ptr(), self.size()) }
102    }
103
104    /// Returns a mutable byte slice view over the whole allocation.
105    pub fn as_mut_slice(&mut self) -> &mut [u8] {
106        unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.size()) }
107    }
108
109    /// Returns an immutable raw pointer to the beginning of the buffer.
110    pub fn as_ptr(&self) -> *const u8 {
111        self.region.as_ptr()
112    }
113
114    /// Returns a mutable raw pointer to the beginning of the buffer.
115    pub fn as_mut_ptr(&mut self) -> *mut u8 {
116        self.region.as_mut_ptr()
117    }
118
119    /// Returns the buffer size in bytes.
120    pub fn size(&self) -> usize {
121        self.region.size()
122    }
123
124    /// Returns the allocation strategy used to create this buffer.
125    pub fn strategy(&self) -> AllocationStrategy {
126        AllocationStrategy::Standard
127    }
128
129    /// Returns the interprocess shared-memory handle for this CPU buffer.
130    #[cfg_attr(not(test), allow(dead_code))]
131    pub(crate) fn shared_handle(&self) -> Result<InterprocessMemoryHandle> {
132        self.region.export_handle()
133    }
134}
135
136fn hard_max_cpu_allocation_size() -> usize {
137    // Keep allocations within platform addressability limits and (on Unix) ftruncate/off_t range.
138    #[cfg(unix)]
139    {
140        let off_t_max = libc::off_t::MAX as u128;
141        let pointer_max = isize::MAX as u128;
142        let cap = off_t_max.min(pointer_max);
143        usize::try_from(cap).unwrap_or(isize::MAX as usize)
144    }
145    #[cfg(not(unix))]
146    {
147        isize::MAX as usize
148    }
149}
150
151fn parse_cpu_allocation_cap(raw: Option<&str>, hard: usize) -> usize {
152    let configured = match raw {
153        Some(value) => match value.parse::<usize>() {
154            Ok(parsed) if parsed > 0 => parsed,
155            _ => hard,
156        },
157        None => hard,
158    };
159    configured.min(hard)
160}
161
162fn validate_size(size: usize, max_allocation_size: usize) -> Result<()> {
163    if size == 0 {
164        return Err(LavaFlowError::InvalidAllocationRequest {
165            size,
166            reason: AllocationReason::ZeroSize,
167        });
168    }
169    if size > max_allocation_size {
170        return Err(LavaFlowError::InvalidAllocationRequest {
171            size,
172            reason: AllocationReason::ExceedsMaxSize,
173        });
174    }
175    Ok(())
176}
177
178fn shared_memory_error(operation: &'static str) -> LavaFlowError {
179    let source = std::io::Error::last_os_error();
180    let source = if source.raw_os_error().is_none() || source.raw_os_error() == Some(0) {
181        std::io::Error::other(format!("{operation} failed"))
182    } else {
183        source
184    };
185    LavaFlowError::SharedMemoryOperation { operation, source }
186}
187
188#[cfg(test)]
189mod tests {
190    use super::*;
191    use crate::test_support::env::Guard as EnvGuard;
192
193    const BUFFER_SIZE: usize = 64;
194    const SMALL_CAP: usize = 64;
195    const OVER_CAP_SIZE: usize = 65;
196    const TEST_BYTE_OFFSET: usize = 3;
197    const TEST_BYTE_VALUE: u8 = 0x7f;
198    const ENV_MAX_CPU_ALLOCATION_SIZE: &str = "LAVA_FLOW_MAX_CPU_ALLOCATION_SIZE";
199    const TEST_HARD_CAP: usize = usize::MAX;
200    const ENV_CONFIGURED_CAP: usize = 1024;
201
202    fn test_allocator() -> Allocator {
203        Allocator::with_max_allocation_size(TEST_HARD_CAP)
204    }
205
206    #[test]
207    fn standard_allocation_exposes_mutable_and_immutable_views() {
208        let allocator = test_allocator();
209        let mut buffer = allocator.allocate(BUFFER_SIZE).expect("allocate standard");
210        assert_eq!(buffer.size(), BUFFER_SIZE);
211        assert!(!buffer.as_ptr().is_null());
212        assert!(!buffer.as_mut_ptr().is_null());
213        assert_eq!(buffer.strategy(), AllocationStrategy::Standard);
214        assert_eq!(buffer.as_slice().len(), BUFFER_SIZE);
215        assert_eq!(buffer.as_mut_slice().len(), BUFFER_SIZE);
216    }
217
218    #[test]
219    fn standard_allocation_is_zero_initialized() {
220        let allocator = test_allocator();
221        let buffer = allocator.allocate(BUFFER_SIZE).expect("allocate standard");
222        assert!(buffer.as_slice().iter().all(|&b| b == 0));
223    }
224
225    #[test]
226    fn imported_shared_handle_observes_written_bytes() {
227        let allocator = test_allocator();
228        let mut original = allocator.allocate(BUFFER_SIZE).expect("allocate original");
229        original.as_mut_slice()[TEST_BYTE_OFFSET] = TEST_BYTE_VALUE;
230
231        let handle = original.shared_handle().expect("export handle");
232        let imported =
233            MemoryBuffer::from_shared_handle(BUFFER_SIZE, handle).expect("import shared handle");
234        assert_eq!(imported.as_slice()[TEST_BYTE_OFFSET], TEST_BYTE_VALUE);
235    }
236
237    #[test]
238    fn configured_cap_is_enforced() {
239        let allocator = Allocator::with_max_allocation_size(SMALL_CAP);
240        let err = allocator
241            .allocate(OVER_CAP_SIZE)
242            .expect_err("allocation over configured cap must fail");
243        assert!(matches!(
244            err,
245            LavaFlowError::InvalidAllocationRequest {
246                size: OVER_CAP_SIZE,
247                reason: AllocationReason::ExceedsMaxSize,
248            }
249        ));
250    }
251
252    #[test]
253    fn zero_cap_configuration_uses_hard_limit() {
254        assert_eq!(
255            Allocator::with_max_allocation_size(0).max_allocation_size(),
256            hard_max_cpu_allocation_size()
257        );
258    }
259
260    #[test]
261    fn parse_cpu_allocation_cap_accepts_valid_value() {
262        assert_eq!(parse_cpu_allocation_cap(Some("128"), 1024), 128);
263    }
264
265    #[test]
266    fn parse_cpu_allocation_cap_rejects_zero_and_invalid_values() {
267        assert_eq!(parse_cpu_allocation_cap(Some("0"), 1024), 1024);
268        assert_eq!(parse_cpu_allocation_cap(Some("invalid"), 1024), 1024);
269        assert_eq!(parse_cpu_allocation_cap(None, 1024), 1024);
270    }
271
272    #[test]
273    fn default_allocator_reads_env_cap() {
274        let _guard = EnvGuard::set(ENV_MAX_CPU_ALLOCATION_SIZE, &ENV_CONFIGURED_CAP.to_string());
275        let allocator = Allocator::default();
276        assert_eq!(allocator.max_allocation_size(), ENV_CONFIGURED_CAP);
277    }
278
279    #[test]
280    fn shared_memory_error_keeps_operation_name() {
281        let err = shared_memory_error("unit_test");
282        assert!(matches!(
283            err,
284            LavaFlowError::SharedMemoryOperation { operation, .. } if operation == "unit_test"
285        ));
286    }
287
288    #[test]
289    fn zero_size_allocation_is_rejected() {
290        let allocator = test_allocator();
291        let err = allocator
292            .allocate(0)
293            .expect_err("zero-sized allocation must fail");
294        assert!(matches!(
295            err,
296            LavaFlowError::InvalidAllocationRequest {
297                size,
298                reason: AllocationReason::ZeroSize,
299            } if size == 0
300        ));
301    }
302
303    #[test]
304    fn oversized_allocation_is_rejected() {
305        let allocator = test_allocator();
306        let max = allocator.max_allocation_size();
307        let oversized = max
308            .checked_add(1)
309            .expect("allocation cap leaves room for overflow test");
310        let err = allocator
311            .allocate(oversized)
312            .expect_err("oversized allocation must fail");
313        assert!(matches!(
314            err,
315            LavaFlowError::InvalidAllocationRequest {
316                size,
317                reason: AllocationReason::ExceedsMaxSize,
318            } if size == oversized
319        ));
320    }
321}