lava_flow/memory/cpu/
mod.rs1use crate::error::{AllocationReason, LavaFlowError, Result};
2use crate::memory::allocator::InterprocessMemoryHandle;
3use std::env;
4
5#[cfg(unix)]
6mod unix;
7#[cfg(windows)]
8mod windows;
9
10#[cfg(unix)]
11use unix::SharedMemoryRegion;
12#[cfg(windows)]
13use windows::SharedMemoryRegion;
14
15#[derive(Debug)]
17pub struct Allocator {
18 max_allocation_size: usize,
19}
20
21impl Allocator {
22 pub fn new() -> Self {
26 let hard = hard_max_cpu_allocation_size();
27 let raw = env::var("LAVA_FLOW_MAX_CPU_ALLOCATION_SIZE").ok();
28 let max_allocation_size = parse_cpu_allocation_cap(raw.as_deref(), hard);
29 Self {
30 max_allocation_size,
31 }
32 }
33
34 pub fn with_max_allocation_size(max_allocation_size: usize) -> Self {
38 let hard = hard_max_cpu_allocation_size();
39 let capped = if max_allocation_size == 0 {
40 hard
41 } else {
42 max_allocation_size.min(hard)
43 };
44 Self {
45 max_allocation_size: capped,
46 }
47 }
48
49 pub fn max_allocation_size(&self) -> usize {
51 self.max_allocation_size
52 }
53
54 pub fn allocate(&self, size: usize) -> Result<MemoryBuffer> {
56 let region = SharedMemoryRegion::create(size, self.max_allocation_size)?;
57 let mut buffer = MemoryBuffer { region };
58 buffer.zero_fill();
59 Ok(buffer)
60 }
61}
62
63impl Default for Allocator {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
71pub enum AllocationStrategy {
72 Standard,
74 GpuPinned,
76}
77
78#[derive(Debug)]
80pub struct MemoryBuffer {
81 region: SharedMemoryRegion,
82}
83
84impl MemoryBuffer {
85 #[cfg_attr(not(test), allow(dead_code))]
87 pub(crate) fn from_shared_handle(
88 size: usize,
89 handle: InterprocessMemoryHandle,
90 ) -> Result<Self> {
91 let region = SharedMemoryRegion::from_handle(size, hard_max_cpu_allocation_size(), handle)?;
92 Ok(Self { region })
93 }
94
95 fn zero_fill(&mut self) {
96 unsafe { std::ptr::write_bytes(self.as_mut_ptr(), 0, self.size()) };
97 }
98
99 pub fn as_slice(&self) -> &[u8] {
101 unsafe { std::slice::from_raw_parts(self.as_ptr(), self.size()) }
102 }
103
104 pub fn as_mut_slice(&mut self) -> &mut [u8] {
106 unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.size()) }
107 }
108
109 pub fn as_ptr(&self) -> *const u8 {
111 self.region.as_ptr()
112 }
113
114 pub fn as_mut_ptr(&mut self) -> *mut u8 {
116 self.region.as_mut_ptr()
117 }
118
119 pub fn size(&self) -> usize {
121 self.region.size()
122 }
123
124 pub fn strategy(&self) -> AllocationStrategy {
126 AllocationStrategy::Standard
127 }
128
129 #[cfg_attr(not(test), allow(dead_code))]
131 pub(crate) fn shared_handle(&self) -> Result<InterprocessMemoryHandle> {
132 self.region.export_handle()
133 }
134}
135
136fn hard_max_cpu_allocation_size() -> usize {
137 #[cfg(unix)]
139 {
140 let off_t_max = libc::off_t::MAX as u128;
141 let pointer_max = isize::MAX as u128;
142 let cap = off_t_max.min(pointer_max);
143 usize::try_from(cap).unwrap_or(isize::MAX as usize)
144 }
145 #[cfg(not(unix))]
146 {
147 isize::MAX as usize
148 }
149}
150
151fn parse_cpu_allocation_cap(raw: Option<&str>, hard: usize) -> usize {
152 let configured = match raw {
153 Some(value) => match value.parse::<usize>() {
154 Ok(parsed) if parsed > 0 => parsed,
155 _ => hard,
156 },
157 None => hard,
158 };
159 configured.min(hard)
160}
161
162fn validate_size(size: usize, max_allocation_size: usize) -> Result<()> {
163 if size == 0 {
164 return Err(LavaFlowError::InvalidAllocationRequest {
165 size,
166 reason: AllocationReason::ZeroSize,
167 });
168 }
169 if size > max_allocation_size {
170 return Err(LavaFlowError::InvalidAllocationRequest {
171 size,
172 reason: AllocationReason::ExceedsMaxSize,
173 });
174 }
175 Ok(())
176}
177
178fn shared_memory_error(operation: &'static str) -> LavaFlowError {
179 let source = std::io::Error::last_os_error();
180 let source = if source.raw_os_error().is_none() || source.raw_os_error() == Some(0) {
181 std::io::Error::other(format!("{operation} failed"))
182 } else {
183 source
184 };
185 LavaFlowError::SharedMemoryOperation { operation, source }
186}
187
188#[cfg(test)]
189mod tests {
190 use super::*;
191 use crate::test_support::env::Guard as EnvGuard;
192
193 const BUFFER_SIZE: usize = 64;
194 const SMALL_CAP: usize = 64;
195 const OVER_CAP_SIZE: usize = 65;
196 const TEST_BYTE_OFFSET: usize = 3;
197 const TEST_BYTE_VALUE: u8 = 0x7f;
198 const ENV_MAX_CPU_ALLOCATION_SIZE: &str = "LAVA_FLOW_MAX_CPU_ALLOCATION_SIZE";
199 const TEST_HARD_CAP: usize = usize::MAX;
200 const ENV_CONFIGURED_CAP: usize = 1024;
201
202 fn test_allocator() -> Allocator {
203 Allocator::with_max_allocation_size(TEST_HARD_CAP)
204 }
205
206 #[test]
207 fn standard_allocation_exposes_mutable_and_immutable_views() {
208 let allocator = test_allocator();
209 let mut buffer = allocator.allocate(BUFFER_SIZE).expect("allocate standard");
210 assert_eq!(buffer.size(), BUFFER_SIZE);
211 assert!(!buffer.as_ptr().is_null());
212 assert!(!buffer.as_mut_ptr().is_null());
213 assert_eq!(buffer.strategy(), AllocationStrategy::Standard);
214 assert_eq!(buffer.as_slice().len(), BUFFER_SIZE);
215 assert_eq!(buffer.as_mut_slice().len(), BUFFER_SIZE);
216 }
217
218 #[test]
219 fn standard_allocation_is_zero_initialized() {
220 let allocator = test_allocator();
221 let buffer = allocator.allocate(BUFFER_SIZE).expect("allocate standard");
222 assert!(buffer.as_slice().iter().all(|&b| b == 0));
223 }
224
225 #[test]
226 fn imported_shared_handle_observes_written_bytes() {
227 let allocator = test_allocator();
228 let mut original = allocator.allocate(BUFFER_SIZE).expect("allocate original");
229 original.as_mut_slice()[TEST_BYTE_OFFSET] = TEST_BYTE_VALUE;
230
231 let handle = original.shared_handle().expect("export handle");
232 let imported =
233 MemoryBuffer::from_shared_handle(BUFFER_SIZE, handle).expect("import shared handle");
234 assert_eq!(imported.as_slice()[TEST_BYTE_OFFSET], TEST_BYTE_VALUE);
235 }
236
237 #[test]
238 fn configured_cap_is_enforced() {
239 let allocator = Allocator::with_max_allocation_size(SMALL_CAP);
240 let err = allocator
241 .allocate(OVER_CAP_SIZE)
242 .expect_err("allocation over configured cap must fail");
243 assert!(matches!(
244 err,
245 LavaFlowError::InvalidAllocationRequest {
246 size: OVER_CAP_SIZE,
247 reason: AllocationReason::ExceedsMaxSize,
248 }
249 ));
250 }
251
252 #[test]
253 fn zero_cap_configuration_uses_hard_limit() {
254 assert_eq!(
255 Allocator::with_max_allocation_size(0).max_allocation_size(),
256 hard_max_cpu_allocation_size()
257 );
258 }
259
260 #[test]
261 fn parse_cpu_allocation_cap_accepts_valid_value() {
262 assert_eq!(parse_cpu_allocation_cap(Some("128"), 1024), 128);
263 }
264
265 #[test]
266 fn parse_cpu_allocation_cap_rejects_zero_and_invalid_values() {
267 assert_eq!(parse_cpu_allocation_cap(Some("0"), 1024), 1024);
268 assert_eq!(parse_cpu_allocation_cap(Some("invalid"), 1024), 1024);
269 assert_eq!(parse_cpu_allocation_cap(None, 1024), 1024);
270 }
271
272 #[test]
273 fn default_allocator_reads_env_cap() {
274 let _guard = EnvGuard::set(ENV_MAX_CPU_ALLOCATION_SIZE, &ENV_CONFIGURED_CAP.to_string());
275 let allocator = Allocator::default();
276 assert_eq!(allocator.max_allocation_size(), ENV_CONFIGURED_CAP);
277 }
278
279 #[test]
280 fn shared_memory_error_keeps_operation_name() {
281 let err = shared_memory_error("unit_test");
282 assert!(matches!(
283 err,
284 LavaFlowError::SharedMemoryOperation { operation, .. } if operation == "unit_test"
285 ));
286 }
287
288 #[test]
289 fn zero_size_allocation_is_rejected() {
290 let allocator = test_allocator();
291 let err = allocator
292 .allocate(0)
293 .expect_err("zero-sized allocation must fail");
294 assert!(matches!(
295 err,
296 LavaFlowError::InvalidAllocationRequest {
297 size,
298 reason: AllocationReason::ZeroSize,
299 } if size == 0
300 ));
301 }
302
303 #[test]
304 fn oversized_allocation_is_rejected() {
305 let allocator = test_allocator();
306 let max = allocator.max_allocation_size();
307 let oversized = max
308 .checked_add(1)
309 .expect("allocation cap leaves room for overflow test");
310 let err = allocator
311 .allocate(oversized)
312 .expect_err("oversized allocation must fail");
313 assert!(matches!(
314 err,
315 LavaFlowError::InvalidAllocationRequest {
316 size,
317 reason: AllocationReason::ExceedsMaxSize,
318 } if size == oversized
319 ));
320 }
321}