Skip to main content

lava_flow/channels/
mod.rs

1//! Layer-2 channel envelope types and local transport scaffolding.
2
3use crate::error::{LavaFlowError, Result};
4use crate::memory::{cpu, gpu};
5use serde::de::DeserializeOwned;
6use serde::{Deserialize, Serialize};
7use std::collections::BTreeMap;
8use std::convert::TryFrom;
9
10#[cfg_attr(not(test), allow(dead_code))]
11pub(crate) mod local;
12
/// Selects how channel envelope metadata is encoded.
///
/// The explicit `u8` discriminants are the same values that the `TryFrom<u8>` conversion for
/// this type accepts, so `encoding as u8` and `MetadataEncoding::try_from` mirror each other.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[repr(u8)]
pub enum MetadataEncoding {
    /// Metadata is encoded as JSON. This is the default encoding.
    #[default]
    Json = 0,
    /// Metadata is encoded as CBOR.
    ///
    /// Planned only: the current step-1 local transport does not implement this variant.
    Cbor = 1,
}
25
26impl TryFrom<u8> for MetadataEncoding {
27    type Error = LavaFlowError;
28
29    fn try_from(value: u8) -> Result<Self> {
30        match value {
31            0 => Ok(Self::Json),
32            1 => Ok(Self::Cbor),
33            _ => Err(LavaFlowError::ChannelTransportOperation {
34                operation: "decode_connection_header",
35                source: std::io::Error::new(
36                    std::io::ErrorKind::InvalidInput,
37                    "unknown metadata encoding",
38                ),
39            }),
40        }
41    }
42}
43
44/// User-defined typed metadata contract for channel payloads.
45pub trait ChannelMetadata: Serialize + DeserializeOwned {
46    /// Returns the number of payload bytes that are valid for this message.
47    fn used_size(&self) -> usize;
48}
49
50/// Dynamic metadata value used by schema-less receive paths.
51#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
52pub enum MetaValue {
53    /// Boolean value.
54    Bool(bool),
55    /// Signed integer value.
56    I64(i64),
57    /// Unsigned integer value.
58    U64(u64),
59    /// Floating-point value.
60    F64(f64),
61    /// UTF-8 string value.
62    String(String),
63    /// Opaque byte string.
64    Bytes(Vec<u8>),
65    /// Homogeneous or heterogeneous list.
66    List(Vec<MetaValue>),
67    /// Nested map value.
68    Map(BTreeMap<String, MetaValue>),
69}
70
71/// Dynamic metadata envelope for schema-less receive operations.
72#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
73pub struct MessageMeta {
74    /// Number of payload bytes that are valid for this message.
75    pub used_size: usize,
76    /// Additional metadata fields.
77    pub values: BTreeMap<String, MetaValue>,
78}
79
80impl ChannelMetadata for MessageMeta {
81    fn used_size(&self) -> usize {
82        self.used_size
83    }
84}
85
/// How a receiver endpoint observably hands a received payload to its caller.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReceiveRepresentation {
    /// The receiver hands back a reference to an externally shared buffer.
    ExternalShare,
    /// The receiver materializes the payload into a buffer it owns.
    Materialized,
}
94
95/// Payload exchanged by channels.
96///
97/// `Frame` is the high-level payload abstraction used by the public channel API. It distinguishes
98/// buffer backend kind (`CPU` or `GPU`) while keeping transport representation details internal to
99/// the channel runtime. Whether delivery used external sharing or local materialization is exposed
100/// through receiver introspection such as [`ReceiveRepresentation`].
101#[derive(Debug)]
102pub enum Frame {
103    /// CPU-backed payload.
104    Cpu(cpu::MemoryBuffer),
105    /// GPU-backed payload.
106    Gpu(gpu::MemoryBuffer),
107}
108
109impl Frame {
110    /// Returns the payload size in bytes.
111    pub fn size(&self) -> usize {
112        match self {
113            Self::Cpu(buffer) => buffer.size(),
114            Self::Gpu(buffer) => buffer.size(),
115        }
116    }
117
118    /// Returns the CPU buffer when this payload is CPU-backed.
119    pub fn into_cpu(self) -> Option<cpu::MemoryBuffer> {
120        match self {
121            Self::Cpu(buffer) => Some(buffer),
122            Self::Gpu(_) => None,
123        }
124    }
125
126    /// Returns the GPU buffer when this payload is GPU-backed.
127    pub fn into_gpu(self) -> Option<gpu::MemoryBuffer> {
128        match self {
129            Self::Cpu(_) => None,
130            Self::Gpu(buffer) => Some(buffer),
131        }
132    }
133}
134
135impl From<cpu::MemoryBuffer> for Frame {
136    fn from(buffer: cpu::MemoryBuffer) -> Self {
137        Self::Cpu(buffer)
138    }
139}
140
141impl From<gpu::MemoryBuffer> for Frame {
142    fn from(buffer: gpu::MemoryBuffer) -> Self {
143        Self::Gpu(buffer)
144    }
145}
146
#[cfg(test)]
mod tests {
    use super::*;

    /// Payload size, in bytes, requested for every frame allocated by these tests.
    const FRAME_BYTES: usize = 64;

    /// Allocates a `FRAME_BYTES` CPU buffer and wraps it in a [`Frame`].
    fn cpu_frame() -> Frame {
        let buffer = cpu::Allocator::with_max_allocation_size(usize::MAX)
            .allocate(FRAME_BYTES)
            .expect("allocate cpu buffer");
        Frame::from(buffer)
    }

    #[test]
    fn metadata_encoding_rejects_unknown_wire_value() {
        let err = MetadataEncoding::try_from(99).expect_err("unknown encoding must fail");
        assert!(matches!(
            err,
            LavaFlowError::ChannelTransportOperation {
                operation: "decode_connection_header",
                ..
            }
        ));
    }

    #[test]
    fn metadata_encoding_accepts_known_wire_values() {
        assert!(matches!(
            MetadataEncoding::try_from(0u8),
            Ok(MetadataEncoding::Json)
        ));
        assert!(matches!(
            MetadataEncoding::try_from(1u8),
            Ok(MetadataEncoding::Cbor)
        ));
    }

    #[test]
    fn metadata_encoding_round_trips_through_discriminant() {
        // Encoding a variant with `as u8` and decoding it again must give the same variant.
        for encoding in [MetadataEncoding::Json, MetadataEncoding::Cbor] {
            assert!(matches!(
                MetadataEncoding::try_from(encoding as u8),
                Ok(decoded) if decoded == encoding
            ));
        }
    }

    #[test]
    fn metadata_encoding_defaults_to_json() {
        assert_eq!(MetadataEncoding::default(), MetadataEncoding::Json);
    }

    #[test]
    fn message_meta_used_size_returns_stored_value() {
        let meta = MessageMeta {
            used_size: 17,
            values: BTreeMap::new(),
        };
        assert_eq!(meta.used_size(), 17);
    }

    #[test]
    fn frame_cpu_helpers_cover_size_and_variant_access() {
        let frame = cpu_frame();
        assert_eq!(frame.size(), FRAME_BYTES);
        assert!(frame.into_cpu().is_some());

        // `into_cpu` consumed the first frame, so a fresh one is needed for the GPU accessor.
        assert!(cpu_frame().into_gpu().is_none());
    }

    #[test]
    fn frame_gpu_helpers_cover_size_and_variant_access() {
        // If no GPU allocator can be created the test returns early and counts as passing.
        let allocator = match gpu::Allocator::new() {
            Ok(allocator) => allocator,
            Err(_) => return,
        };
        let buffer = allocator
            .allocate(FRAME_BYTES)
            .expect("allocate gpu buffer");
        let frame = Frame::from(buffer);
        assert_eq!(frame.size(), FRAME_BYTES);
        assert!(frame.into_gpu().is_some());

        // `into_gpu` consumed the first frame, so a fresh one is needed for the CPU accessor.
        let buffer = allocator
            .allocate(FRAME_BYTES)
            .expect("allocate gpu buffer");
        let frame = Frame::from(buffer);
        assert!(frame.into_cpu().is_none());
    }
}