lava_flow/channels/
mod.rs1use crate::error::{LavaFlowError, Result};
4use crate::memory::{cpu, gpu};
5use serde::de::DeserializeOwned;
6use serde::{Deserialize, Serialize};
7use std::collections::BTreeMap;
8use std::convert::TryFrom;
9
10#[cfg_attr(not(test), allow(dead_code))]
11pub(crate) mod local;
12
/// Encoding used for channel metadata, stored as a single byte (`repr(u8)`).
///
/// The explicit discriminants are the byte values accepted by the
/// `TryFrom<u8>` conversion, so they must not change once assigned.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum MetadataEncoding {
    /// JSON encoding (byte value `0`); this is the default variant.
    #[default]
    Json = 0,
    /// CBOR encoding (byte value `1`).
    Cbor = 1,
}
25
26impl TryFrom<u8> for MetadataEncoding {
27 type Error = LavaFlowError;
28
29 fn try_from(value: u8) -> Result<Self> {
30 match value {
31 0 => Ok(Self::Json),
32 1 => Ok(Self::Cbor),
33 _ => Err(LavaFlowError::ChannelTransportOperation {
34 operation: "decode_connection_header",
35 source: std::io::Error::new(
36 std::io::ErrorKind::InvalidInput,
37 "unknown metadata encoding",
38 ),
39 }),
40 }
41 }
42}
43
44pub trait ChannelMetadata: Serialize + DeserializeOwned {
46 fn used_size(&self) -> usize;
48}
49
50#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
52pub enum MetaValue {
53 Bool(bool),
55 I64(i64),
57 U64(u64),
59 F64(f64),
61 String(String),
63 Bytes(Vec<u8>),
65 List(Vec<MetaValue>),
67 Map(BTreeMap<String, MetaValue>),
69}
70
71#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
73pub struct MessageMeta {
74 pub used_size: usize,
76 pub values: BTreeMap<String, MetaValue>,
78}
79
80impl ChannelMetadata for MessageMeta {
81 fn used_size(&self) -> usize {
82 self.used_size
83 }
84}
85
/// Selects between the two representations a receive can use.
///
/// NOTE(review): the precise semantics of each variant are not visible in
/// this module; the variant docs below are inferred from the names and
/// should be confirmed against the transport implementations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReceiveRepresentation {
    /// Presumably the payload is exposed as a share of external memory.
    ExternalShare,
    /// Presumably the payload is materialized into a buffer.
    Materialized,
}
94
95#[derive(Debug)]
102pub enum Frame {
103 Cpu(cpu::MemoryBuffer),
105 Gpu(gpu::MemoryBuffer),
107}
108
109impl Frame {
110 pub fn size(&self) -> usize {
112 match self {
113 Self::Cpu(buffer) => buffer.size(),
114 Self::Gpu(buffer) => buffer.size(),
115 }
116 }
117
118 pub fn into_cpu(self) -> Option<cpu::MemoryBuffer> {
120 match self {
121 Self::Cpu(buffer) => Some(buffer),
122 Self::Gpu(_) => None,
123 }
124 }
125
126 pub fn into_gpu(self) -> Option<gpu::MemoryBuffer> {
128 match self {
129 Self::Cpu(_) => None,
130 Self::Gpu(buffer) => Some(buffer),
131 }
132 }
133}
134
135impl From<cpu::MemoryBuffer> for Frame {
136 fn from(buffer: cpu::MemoryBuffer) -> Self {
137 Self::Cpu(buffer)
138 }
139}
140
141impl From<gpu::MemoryBuffer> for Frame {
142 fn from(buffer: gpu::MemoryBuffer) -> Self {
143 Self::Gpu(buffer)
144 }
145}
146
#[cfg(test)]
mod tests {
    use super::*;

    /// Allocates a fresh CPU buffer of size 64 and wraps it in a `Frame`.
    fn cpu_frame() -> Frame {
        let buffer = cpu::Allocator::with_max_allocation_size(usize::MAX)
            .allocate(64)
            .expect("allocate cpu buffer");
        Frame::from(buffer)
    }

    /// A byte outside the known discriminants must fail with the
    /// `decode_connection_header` operation tag.
    #[test]
    fn metadata_encoding_rejects_unknown_wire_value() {
        let outcome = MetadataEncoding::try_from(99);
        let error = outcome.expect_err("unknown encoding must fail");
        let tagged_as_header_decode = matches!(
            error,
            LavaFlowError::ChannelTransportOperation {
                operation: "decode_connection_header",
                ..
            }
        );
        assert!(tagged_as_header_decode);
    }

    /// The trait accessor must hand back exactly what the field holds.
    #[test]
    fn message_meta_used_size_returns_stored_value() {
        let stored = 17;
        let meta = MessageMeta {
            used_size: stored,
            values: BTreeMap::new(),
        };
        assert_eq!(meta.used_size(), stored);
    }

    /// A CPU frame reports its size, converts to a CPU buffer, and refuses
    /// to convert to a GPU buffer.
    #[test]
    fn frame_cpu_helpers_cover_size_and_variant_access() {
        let sized = cpu_frame();
        assert_eq!(sized.size(), 64);
        assert!(sized.into_cpu().is_some());

        // The first frame was consumed above, so build a second one.
        let wrong_kind = cpu_frame();
        assert!(wrong_kind.into_gpu().is_none());
    }

    /// A GPU frame reports its size, converts to a GPU buffer, and refuses
    /// to convert to a CPU buffer.
    #[test]
    fn frame_gpu_helpers_cover_size_and_variant_access() {
        // Skip the checks when a GPU allocator cannot be created.
        let allocator = if let Ok(allocator) = gpu::Allocator::new() {
            allocator
        } else {
            return;
        };

        let sized = Frame::from(allocator.allocate(64).expect("allocate gpu buffer"));
        assert_eq!(sized.size(), 64);
        assert!(sized.into_gpu().is_some());

        // The first frame was consumed above, so build a second one.
        let wrong_kind = Frame::from(allocator.allocate(64).expect("allocate gpu buffer"));
        assert!(wrong_kind.into_cpu().is_none());
    }
}