Skip to main content

hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::{DynLocation, LocationId};
21use crate::location::tick::{Atomic, DeferTick, NoAtomic};
22use crate::location::{Location, NoTick, Tick, check_matching_location};
23use crate::nondet::{NonDet, nondet};
24
25/// A *nullable* Rust value that can asynchronously change over time.
26///
27/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
28/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
29/// asynchronously change over time, including becoming present of uninhabited.
30///
31/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
32/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
33///
34/// Type Parameters:
35/// - `Type`: the type of the value in this optional (when it is not null)
36/// - `Loc`: the [`Location`] where the optional is materialized
37/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
38pub struct Optional<Type, Loc, Bound: Boundedness> {
39    pub(crate) location: Loc,
40    pub(crate) ir_node: RefCell<HydroNode>,
41    pub(crate) flow_state: FlowState,
42
43    _phantom: PhantomData<(Type, Loc, Bound)>,
44}
45
46impl<T, L, B: Boundedness> Drop for Optional<T, L, B> {
47    fn drop(&mut self) {
48        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
49        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
50            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
51                input: Box::new(ir_node),
52                op_metadata: HydroIrOpMetadata::new(),
53            });
54        }
55    }
56}
57
58impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
59where
60    T: Clone,
61    L: Location<'a> + NoTick,
62{
63    fn from(value: Optional<T, L, Bounded>) -> Self {
64        let tick = value.location().tick();
65        value.clone_into_tick(&tick).latest()
66    }
67}
68
69impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
70where
71    L: Location<'a>,
72{
73    fn defer_tick(self) -> Self {
74        Optional::defer_tick(self)
75    }
76}
77
78impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
79where
80    L: Location<'a>,
81{
82    type Location = Tick<L>;
83
84    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
85        Optional::new(
86            location.clone(),
87            HydroNode::CycleSource {
88                cycle_id,
89                metadata: location.new_node_metadata(Self::collection_kind()),
90            },
91        )
92    }
93}
94
95impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
96where
97    L: Location<'a>,
98{
99    type Location = Tick<L>;
100
101    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
102        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
103            location.clone(),
104            HydroNode::DeferTick {
105                input: Box::new(HydroNode::CycleSource {
106                    cycle_id,
107                    metadata: location.new_node_metadata(Self::collection_kind()),
108                }),
109                metadata: location
110                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
111            },
112        );
113
114        from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
115    }
116}
117
118impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
119where
120    L: Location<'a>,
121{
122    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
123        assert_eq!(
124            Location::id(&self.location),
125            expected_location,
126            "locations do not match"
127        );
128        self.location
129            .flow_state()
130            .borrow_mut()
131            .push_root(HydroRoot::CycleSink {
132                cycle_id,
133                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
134                op_metadata: HydroIrOpMetadata::new(),
135            });
136    }
137}
138
139impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
140where
141    L: Location<'a>,
142{
143    type Location = Tick<L>;
144
145    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
146        Optional::new(
147            location.clone(),
148            HydroNode::CycleSource {
149                cycle_id,
150                metadata: location.new_node_metadata(Self::collection_kind()),
151            },
152        )
153    }
154}
155
156impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
157where
158    L: Location<'a>,
159{
160    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
161        assert_eq!(
162            Location::id(&self.location),
163            expected_location,
164            "locations do not match"
165        );
166        self.location
167            .flow_state()
168            .borrow_mut()
169            .push_root(HydroRoot::CycleSink {
170                cycle_id,
171                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
172                op_metadata: HydroIrOpMetadata::new(),
173            });
174    }
175}
176
177impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
178where
179    L: Location<'a> + NoTick,
180{
181    type Location = L;
182
183    fn create_source(cycle_id: CycleId, location: L) -> Self {
184        Optional::new(
185            location.clone(),
186            HydroNode::CycleSource {
187                cycle_id,
188                metadata: location.new_node_metadata(Self::collection_kind()),
189            },
190        )
191    }
192}
193
194impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
195where
196    L: Location<'a> + NoTick,
197{
198    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
199        assert_eq!(
200            Location::id(&self.location),
201            expected_location,
202            "locations do not match"
203        );
204        self.location
205            .flow_state()
206            .borrow_mut()
207            .push_root(HydroRoot::CycleSink {
208                cycle_id,
209                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
210                op_metadata: HydroIrOpMetadata::new(),
211            });
212    }
213}
214
215impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
216where
217    L: Location<'a>,
218{
219    fn from(singleton: Singleton<T, L, B>) -> Self {
220        Optional::new(
221            singleton.location.clone(),
222            HydroNode::Cast {
223                inner: Box::new(singleton.ir_node.replace(HydroNode::Placeholder)),
224                metadata: singleton
225                    .location
226                    .new_node_metadata(Self::collection_kind()),
227            },
228        )
229    }
230}
231
232#[cfg(stageleft_runtime)]
233pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
234    me: Optional<T, L, B>,
235    other: Optional<O, L, B>,
236) -> Optional<(T, O), L, B> {
237    check_matching_location(&me.location, &other.location);
238
239    Optional::new(
240        me.location.clone(),
241        HydroNode::CrossSingleton {
242            left: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
243            right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
244            metadata: me
245                .location
246                .new_node_metadata(Optional::<(T, O), L, B>::collection_kind()),
247        },
248    )
249}
250
251#[cfg(stageleft_runtime)]
252fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
253    me: Optional<T, L, B>,
254    other: Optional<T, L, B>,
255) -> Optional<T, L, B> {
256    check_matching_location(&me.location, &other.location);
257
258    Optional::new(
259        me.location.clone(),
260        HydroNode::ChainFirst {
261            first: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
262            second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
263            metadata: me
264                .location
265                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
266        },
267    )
268}
269
270impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
271where
272    T: Clone,
273    L: Location<'a>,
274{
275    fn clone(&self) -> Self {
276        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
277            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
278            *self.ir_node.borrow_mut() = HydroNode::Tee {
279                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
280                metadata: self.location.new_node_metadata(Self::collection_kind()),
281            };
282        }
283
284        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
285            Optional {
286                location: self.location.clone(),
287                flow_state: self.flow_state.clone(),
288                ir_node: HydroNode::Tee {
289                    inner: SharedNode(inner.0.clone()),
290                    metadata: metadata.clone(),
291                }
292                .into(),
293                _phantom: PhantomData,
294            }
295        } else {
296            unreachable!()
297        }
298    }
299}
300
301impl<'a, T, L, B: Boundedness> Optional<T, L, B>
302where
303    L: Location<'a>,
304{
305    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
306        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
307        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
308        let flow_state = location.flow_state().clone();
309        Optional {
310            location,
311            flow_state,
312            ir_node: RefCell::new(ir_node),
313            _phantom: PhantomData,
314        }
315    }
316
317    pub(crate) fn collection_kind() -> CollectionKind {
318        CollectionKind::Optional {
319            bound: B::BOUND_KIND,
320            element_type: stageleft::quote_type::<T>().into(),
321        }
322    }
323
324    /// Returns the [`Location`] where this optional is being materialized.
325    pub fn location(&self) -> &L {
326        &self.location
327    }
328
329    /// Transforms the optional value by applying a function `f` to it,
330    /// continuously as the input is updated.
331    ///
332    /// Whenever the optional is empty, the output optional is also empty.
333    ///
334    /// # Example
335    /// ```rust
336    /// # #[cfg(feature = "deploy")] {
337    /// # use hydro_lang::prelude::*;
338    /// # use futures::StreamExt;
339    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
340    /// let tick = process.tick();
341    /// let optional = tick.optional_first_tick(q!(1));
342    /// optional.map(q!(|v| v + 1)).all_ticks()
343    /// # }, |mut stream| async move {
344    /// // 2
345    /// # assert_eq!(stream.next().await.unwrap(), 2);
346    /// # }));
347    /// # }
348    /// ```
349    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
350    where
351        F: Fn(T) -> U + 'a,
352    {
353        let f = f.splice_fn1_ctx(&self.location).into();
354        Optional::new(
355            self.location.clone(),
356            HydroNode::Map {
357                f,
358                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
359                metadata: self
360                    .location
361                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
362            },
363        )
364    }
365
366    /// Transforms the optional value by applying a function `f` to it and then flattening
367    /// the result into a stream, preserving the order of elements.
368    ///
369    /// If the optional is empty, the output stream is also empty. If the optional contains
370    /// a value, `f` is applied to produce an iterator, and all items from that iterator
371    /// are emitted in the output stream in deterministic order.
372    ///
373    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
374    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
375    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
376    ///
377    /// # Example
378    /// ```rust
379    /// # #[cfg(feature = "deploy")] {
380    /// # use hydro_lang::prelude::*;
381    /// # use futures::StreamExt;
382    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
383    /// let tick = process.tick();
384    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
385    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
386    /// # }, |mut stream| async move {
387    /// // 1, 2, 3
388    /// # for w in vec![1, 2, 3] {
389    /// #     assert_eq!(stream.next().await.unwrap(), w);
390    /// # }
391    /// # }));
392    /// # }
393    /// ```
394    pub fn flat_map_ordered<U, I, F>(
395        self,
396        f: impl IntoQuotedMut<'a, F, L>,
397    ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
398    where
399        B: IsBounded,
400        I: IntoIterator<Item = U>,
401        F: Fn(T) -> I + 'a,
402    {
403        self.into_stream().flat_map_ordered(f)
404    }
405
406    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
407    /// for the output type `I` to produce items in any order.
408    ///
409    /// If the optional is empty, the output stream is also empty. If the optional contains
410    /// a value, `f` is applied to produce an iterator, and all items from that iterator
411    /// are emitted in the output stream in non-deterministic order.
412    ///
413    /// # Example
414    /// ```rust
415    /// # #[cfg(feature = "deploy")] {
416    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
417    /// # use futures::StreamExt;
418    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
419    /// let tick = process.tick();
420    /// let optional = tick.optional_first_tick(q!(
421    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
422    /// ));
423    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
424    /// # }, |mut stream| async move {
425    /// // 1, 2, 3, but in no particular order
426    /// # let mut results = Vec::new();
427    /// # for _ in 0..3 {
428    /// #     results.push(stream.next().await.unwrap());
429    /// # }
430    /// # results.sort();
431    /// # assert_eq!(results, vec![1, 2, 3]);
432    /// # }));
433    /// # }
434    /// ```
435    pub fn flat_map_unordered<U, I, F>(
436        self,
437        f: impl IntoQuotedMut<'a, F, L>,
438    ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
439    where
440        B: IsBounded,
441        I: IntoIterator<Item = U>,
442        F: Fn(T) -> I + 'a,
443    {
444        self.into_stream().flat_map_unordered(f)
445    }
446
447    /// Flattens the optional value into a stream, preserving the order of elements.
448    ///
449    /// If the optional is empty, the output stream is also empty. If the optional contains
450    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
451    /// in the output stream in deterministic order.
452    ///
453    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
454    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
455    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
456    ///
457    /// # Example
458    /// ```rust
459    /// # #[cfg(feature = "deploy")] {
460    /// # use hydro_lang::prelude::*;
461    /// # use futures::StreamExt;
462    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
463    /// let tick = process.tick();
464    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
465    /// optional.flatten_ordered().all_ticks()
466    /// # }, |mut stream| async move {
467    /// // 1, 2, 3
468    /// # for w in vec![1, 2, 3] {
469    /// #     assert_eq!(stream.next().await.unwrap(), w);
470    /// # }
471    /// # }));
472    /// # }
473    /// ```
474    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
475    where
476        B: IsBounded,
477        T: IntoIterator<Item = U>,
478    {
479        self.flat_map_ordered(q!(|v| v))
480    }
481
482    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
483    /// for the element type `T` to produce items in any order.
484    ///
485    /// If the optional is empty, the output stream is also empty. If the optional contains
486    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
487    /// in the output stream in non-deterministic order.
488    ///
489    /// # Example
490    /// ```rust
491    /// # #[cfg(feature = "deploy")] {
492    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
493    /// # use futures::StreamExt;
494    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
495    /// let tick = process.tick();
496    /// let optional = tick.optional_first_tick(q!(
497    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
498    /// ));
499    /// optional.flatten_unordered().all_ticks()
500    /// # }, |mut stream| async move {
501    /// // 1, 2, 3, but in no particular order
502    /// # let mut results = Vec::new();
503    /// # for _ in 0..3 {
504    /// #     results.push(stream.next().await.unwrap());
505    /// # }
506    /// # results.sort();
507    /// # assert_eq!(results, vec![1, 2, 3]);
508    /// # }));
509    /// # }
510    /// ```
511    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
512    where
513        B: IsBounded,
514        T: IntoIterator<Item = U>,
515    {
516        self.flat_map_unordered(q!(|v| v))
517    }
518
519    /// Creates an optional containing only the value if it satisfies a predicate `f`.
520    ///
521    /// If the optional is empty, the output optional is also empty. If the optional contains
522    /// a value and the predicate returns `true`, the output optional contains the same value.
523    /// If the predicate returns `false`, the output optional is empty.
524    ///
525    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
526    /// not modify or take ownership of the value. If you need to modify the value while filtering
527    /// use [`Optional::filter_map`] instead.
528    ///
529    /// # Example
530    /// ```rust
531    /// # #[cfg(feature = "deploy")] {
532    /// # use hydro_lang::prelude::*;
533    /// # use futures::StreamExt;
534    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
535    /// let tick = process.tick();
536    /// let optional = tick.optional_first_tick(q!(5));
537    /// optional.filter(q!(|&x| x > 3)).all_ticks()
538    /// # }, |mut stream| async move {
539    /// // 5
540    /// # assert_eq!(stream.next().await.unwrap(), 5);
541    /// # }));
542    /// # }
543    /// ```
544    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
545    where
546        F: Fn(&T) -> bool + 'a,
547    {
548        let f = f.splice_fn1_borrow_ctx(&self.location).into();
549        Optional::new(
550            self.location.clone(),
551            HydroNode::Filter {
552                f,
553                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
554                metadata: self.location.new_node_metadata(Self::collection_kind()),
555            },
556        )
557    }
558
559    /// An operator that both filters and maps. It yields only the value if the supplied
560    /// closure `f` returns `Some(value)`.
561    ///
562    /// If the optional is empty, the output optional is also empty. If the optional contains
563    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
564    /// If the closure returns `None`, the output optional is empty.
565    ///
566    /// # Example
567    /// ```rust
568    /// # #[cfg(feature = "deploy")] {
569    /// # use hydro_lang::prelude::*;
570    /// # use futures::StreamExt;
571    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
572    /// let tick = process.tick();
573    /// let optional = tick.optional_first_tick(q!("42"));
574    /// optional
575    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
576    ///     .all_ticks()
577    /// # }, |mut stream| async move {
578    /// // 42
579    /// # assert_eq!(stream.next().await.unwrap(), 42);
580    /// # }));
581    /// # }
582    /// ```
583    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
584    where
585        F: Fn(T) -> Option<U> + 'a,
586    {
587        let f = f.splice_fn1_ctx(&self.location).into();
588        Optional::new(
589            self.location.clone(),
590            HydroNode::FilterMap {
591                f,
592                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
593                metadata: self
594                    .location
595                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
596            },
597        )
598    }
599
600    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
601    ///
602    /// If the other value is a [`Optional`], the output will be non-null only if the argument is
603    /// non-null. This is useful for combining several pieces of state together.
604    ///
605    /// # Example
606    /// ```rust
607    /// # #[cfg(feature = "deploy")] {
608    /// # use hydro_lang::prelude::*;
609    /// # use futures::StreamExt;
610    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
611    /// let tick = process.tick();
612    /// let numbers = process
613    ///   .source_iter(q!(vec![123, 456, 789]))
614    ///   .batch(&tick, nondet!(/** test */));
615    /// let min = numbers.clone().min(); // Optional
616    /// let max = numbers.max(); // Optional
617    /// min.zip(max).all_ticks()
618    /// # }, |mut stream| async move {
619    /// // [(123, 789)]
620    /// # for w in vec![(123, 789)] {
621    /// #     assert_eq!(stream.next().await.unwrap(), w);
622    /// # }
623    /// # }));
624    /// # }
625    /// ```
626    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
627    where
628        B: IsBounded,
629    {
630        let other: Optional<O, L, B> = other.into();
631        check_matching_location(&self.location, &other.location);
632
633        if L::is_top_level()
634            && let Some(tick) = self.location.try_tick()
635        {
636            let out = zip_inside_tick(
637                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
638                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
639            )
640            .latest();
641
642            Optional::new(
643                out.location.clone(),
644                out.ir_node.replace(HydroNode::Placeholder),
645            )
646        } else {
647            zip_inside_tick(self, other)
648        }
649    }
650
651    /// Passes through `self` when it has a value, otherwise passes through `other`.
652    ///
653    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
654    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
655    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
656    ///
657    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
658    /// of the inputs change (including to/from null states).
659    ///
660    /// # Example
661    /// ```rust
662    /// # #[cfg(feature = "deploy")] {
663    /// # use hydro_lang::prelude::*;
664    /// # use futures::StreamExt;
665    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
666    /// let tick = process.tick();
667    /// // ticks are lazy by default, forces the second tick to run
668    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
669    ///
670    /// let some_first_tick = tick.optional_first_tick(q!(123));
671    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
672    /// some_first_tick.or(some_second_tick).all_ticks()
673    /// # }, |mut stream| async move {
674    /// // [123 /* first tick */, 456 /* second tick */]
675    /// # for w in vec![123, 456] {
676    /// #     assert_eq!(stream.next().await.unwrap(), w);
677    /// # }
678    /// # }));
679    /// # }
680    /// ```
681    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
682        check_matching_location(&self.location, &other.location);
683
684        if L::is_top_level()
685            && !B::BOUNDED // only if unbounded we need to use a tick
686            && let Some(tick) = self.location.try_tick()
687        {
688            let out = or_inside_tick(
689                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
690                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
691            )
692            .latest();
693
694            Optional::new(
695                out.location.clone(),
696                out.ir_node.replace(HydroNode::Placeholder),
697            )
698        } else {
699            Optional::new(
700                self.location.clone(),
701                HydroNode::ChainFirst {
702                    first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
703                    second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
704                    metadata: self.location.new_node_metadata(Self::collection_kind()),
705                },
706            )
707        }
708    }
709
710    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
711    ///
712    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
713    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
714    ///
715    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
716    /// of the inputs change (including to/from null states).
717    ///
718    /// # Example
719    /// ```rust
720    /// # #[cfg(feature = "deploy")] {
721    /// # use hydro_lang::prelude::*;
722    /// # use futures::StreamExt;
723    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
724    /// let tick = process.tick();
725    /// // ticks are lazy by default, forces the later ticks to run
726    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
727    ///
728    /// let some_first_tick = tick.optional_first_tick(q!(123));
729    /// some_first_tick
730    ///     .unwrap_or(tick.singleton(q!(456)))
731    ///     .all_ticks()
732    /// # }, |mut stream| async move {
733    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
734    /// # for w in vec![123, 456, 456, 456] {
735    /// #     assert_eq!(stream.next().await.unwrap(), w);
736    /// # }
737    /// # }));
738    /// # }
739    /// ```
740    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
741        let res_option = self.or(other.into());
742        Singleton::new(
743            res_option.location.clone(),
744            HydroNode::Cast {
745                inner: Box::new(res_option.ir_node.replace(HydroNode::Placeholder)),
746                metadata: res_option
747                    .location
748                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
749            },
750        )
751    }
752
753    /// Gets the contents of `self` when it has a value, otherwise returns the default value of `T`.
754    ///
755    /// Like [`Option::unwrap_or_default`], this is helpful for defining a fallback for an
756    /// [`Optional`] when the default value of the type is a suitable fallback.
757    ///
758    /// # Example
759    /// ```rust
760    /// # #[cfg(feature = "deploy")] {
761    /// # use hydro_lang::prelude::*;
762    /// # use futures::StreamExt;
763    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
764    /// let tick = process.tick();
765    /// // ticks are lazy by default, forces the later ticks to run
766    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
767    ///
768    /// let some_first_tick = tick.optional_first_tick(q!(123i32));
769    /// some_first_tick.unwrap_or_default().all_ticks()
770    /// # }, |mut stream| async move {
771    /// // [123 /* first tick */, 0 /* second tick */, 0 /* third tick */, 0, ...]
772    /// # for w in vec![123, 0, 0, 0] {
773    /// #     assert_eq!(stream.next().await.unwrap(), w);
774    /// # }
775    /// # }));
776    /// # }
777    /// ```
778    pub fn unwrap_or_default(self) -> Singleton<T, L, B>
779    where
780        T: Default + Clone,
781    {
782        self.into_singleton().map(q!(|v| v.unwrap_or_default()))
783    }
784
785    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
786    ///
787    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
788    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
789    /// so that Hydro can skip any computation on null values.
790    ///
791    /// # Example
792    /// ```rust
793    /// # #[cfg(feature = "deploy")] {
794    /// # use hydro_lang::prelude::*;
795    /// # use futures::StreamExt;
796    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
797    /// let tick = process.tick();
798    /// // ticks are lazy by default, forces the later ticks to run
799    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
800    ///
801    /// let some_first_tick = tick.optional_first_tick(q!(123));
802    /// some_first_tick.into_singleton().all_ticks()
803    /// # }, |mut stream| async move {
804    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
805    /// # for w in vec![Some(123), None, None, None] {
806    /// #     assert_eq!(stream.next().await.unwrap(), w);
807    /// # }
808    /// # }));
809    /// # }
810    /// ```
811    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
812    where
813        T: Clone,
814    {
815        let none: syn::Expr = parse_quote!(::std::option::Option::None);
816
817        let none_singleton = Singleton::new(
818            self.location.clone(),
819            HydroNode::SingletonSource {
820                value: none.into(),
821                first_tick_only: false,
822                metadata: self
823                    .location
824                    .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
825            },
826        );
827
828        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
829    }
830
831    /// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
832    ///
833    /// # Example
834    /// ```rust
835    /// # #[cfg(feature = "deploy")] {
836    /// # use hydro_lang::prelude::*;
837    /// # use futures::StreamExt;
838    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
839    /// let tick = process.tick();
840    /// // ticks are lazy by default, forces the second tick to run
841    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
842    ///
843    /// let some_first_tick = tick.optional_first_tick(q!(42));
844    /// some_first_tick.is_some().all_ticks()
845    /// # }, |mut stream| async move {
846    /// // [true /* first tick */, false /* second tick */, ...]
847    /// # for w in vec![true, false] {
848    /// #     assert_eq!(stream.next().await.unwrap(), w);
849    /// # }
850    /// # }));
851    /// # }
852    /// ```
853    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
854    pub fn is_some(self) -> Singleton<bool, L, B> {
855        self.map(q!(|_| ()))
856            .into_singleton()
857            .map(q!(|o| o.is_some()))
858    }
859
860    /// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
861    ///
862    /// # Example
863    /// ```rust
864    /// # #[cfg(feature = "deploy")] {
865    /// # use hydro_lang::prelude::*;
866    /// # use futures::StreamExt;
867    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
868    /// let tick = process.tick();
869    /// // ticks are lazy by default, forces the second tick to run
870    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
871    ///
872    /// let some_first_tick = tick.optional_first_tick(q!(42));
873    /// some_first_tick.is_none().all_ticks()
874    /// # }, |mut stream| async move {
875    /// // [false /* first tick */, true /* second tick */, ...]
876    /// # for w in vec![false, true] {
877    /// #     assert_eq!(stream.next().await.unwrap(), w);
878    /// # }
879    /// # }));
880    /// # }
881    /// ```
882    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
883    pub fn is_none(self) -> Singleton<bool, L, B> {
884        self.map(q!(|_| ()))
885            .into_singleton()
886            .map(q!(|o| o.is_none()))
887    }
888
889    /// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
890    /// values are equal, `false` otherwise (including when either is null).
891    ///
892    /// # Example
893    /// ```rust
894    /// # #[cfg(feature = "deploy")] {
895    /// # use hydro_lang::prelude::*;
896    /// # use futures::StreamExt;
897    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
898    /// let tick = process.tick();
899    /// // ticks are lazy by default, forces the second tick to run
900    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
901    ///
902    /// let a = tick.optional_first_tick(q!(5)); // Some(5), None
903    /// let b = tick.optional_first_tick(q!(5)); // Some(5), None
904    /// a.is_some_and_equals(b).all_ticks()
905    /// # }, |mut stream| async move {
906    /// // [true, false]
907    /// # for w in vec![true, false] {
908    /// #     assert_eq!(stream.next().await.unwrap(), w);
909    /// # }
910    /// # }));
911    /// # }
912    /// ```
913    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
914    pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
915    where
916        T: PartialEq + Clone,
917        B: IsBounded,
918    {
919        self.into_singleton()
920            .zip(other.into_singleton())
921            .map(q!(|(a, b)| a.is_some() && a == b))
922    }
923
924    /// An operator which allows you to "name" a `HydroNode`.
925    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
926    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
927        {
928            let mut node = self.ir_node.borrow_mut();
929            let metadata = node.metadata_mut();
930            metadata.tag = Some(name.to_owned());
931        }
932        self
933    }
934
935    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
936    /// implies that `B == Bounded`.
937    pub fn make_bounded(self) -> Optional<T, L, Bounded>
938    where
939        B: IsBounded,
940    {
941        Optional::new(
942            self.location.clone(),
943            self.ir_node.replace(HydroNode::Placeholder),
944        )
945    }
946
947    /// Clones this bounded optional into a tick, returning a optional that has the
948    /// same value as the outer optional. Because the outer optional is bounded, this
949    /// is deterministic because there is only a single immutable version.
950    pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
951    where
952        B: IsBounded,
953        T: Clone,
954    {
955        // TODO(shadaj): avoid printing simulator logs for this snapshot
956        self.snapshot(
957            tick,
958            nondet!(/** bounded top-level optional so deterministic */),
959        )
960    }
961
962    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
963    /// non-null. Otherwise, the stream is empty.
964    ///
965    /// # Example
966    /// ```rust
967    /// # #[cfg(feature = "deploy")] {
968    /// # use hydro_lang::prelude::*;
969    /// # use futures::StreamExt;
970    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
971    /// # let tick = process.tick();
972    /// # // ticks are lazy by default, forces the second tick to run
973    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
974    /// # let batch_first_tick = process
975    /// #   .source_iter(q!(vec![]))
976    /// #   .batch(&tick, nondet!(/** test */));
977    /// # let batch_second_tick = process
978    /// #   .source_iter(q!(vec![123, 456]))
979    /// #   .batch(&tick, nondet!(/** test */))
980    /// #   .defer_tick(); // appears on the second tick
981    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
982    /// input_batch // first tick: [], second tick: [123, 456]
983    ///     .clone()
984    ///     .max()
985    ///     .into_stream()
986    ///     .chain(input_batch)
987    ///     .all_ticks()
988    /// # }, |mut stream| async move {
989    /// // [456, 123, 456]
990    /// # for w in vec![456, 123, 456] {
991    /// #     assert_eq!(stream.next().await.unwrap(), w);
992    /// # }
993    /// # }));
994    /// # }
995    /// ```
996    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
997    where
998        B: IsBounded,
999    {
1000        Stream::new(
1001            self.location.clone(),
1002            HydroNode::Cast {
1003                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1004                metadata: self.location.new_node_metadata(Stream::<
1005                    T,
1006                    Tick<L>,
1007                    Bounded,
1008                    TotalOrder,
1009                    ExactlyOnce,
1010                >::collection_kind()),
1011            },
1012        )
1013    }
1014
1015    /// Filters this optional, passing through the value if the boolean signal is `true`,
1016    /// otherwise the output is null.
1017    ///
1018    /// # Example
1019    /// ```rust
1020    /// # #[cfg(feature = "deploy")] {
1021    /// # use hydro_lang::prelude::*;
1022    /// # use futures::StreamExt;
1023    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1024    /// let tick = process.tick();
1025    /// // ticks are lazy by default, forces the second tick to run
1026    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1027    ///
1028    /// let some_first_tick = tick.optional_first_tick(q!(()));
1029    /// let signal = some_first_tick.is_some(); // true on first tick, false on second
1030    /// let batch_first_tick = process
1031    ///   .source_iter(q!(vec![456]))
1032    ///   .batch(&tick, nondet!(/** test */));
1033    /// let batch_second_tick = process
1034    ///   .source_iter(q!(vec![789]))
1035    ///   .batch(&tick, nondet!(/** test */))
1036    ///   .defer_tick();
1037    /// batch_first_tick.chain(batch_second_tick).first()
1038    ///   .filter_if(signal)
1039    ///   .unwrap_or(tick.singleton(q!(0)))
1040    ///   .all_ticks()
1041    /// # }, |mut stream| async move {
1042    /// // [456, 0]
1043    /// # for w in vec![456, 0] {
1044    /// #     assert_eq!(stream.next().await.unwrap(), w);
1045    /// # }
1046    /// # }));
1047    /// # }
1048    /// ```
1049    pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
1050    where
1051        B: IsBounded,
1052    {
1053        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
1054    }
1055
1056    /// Filters this optional, passing through the optional value if it is non-null **and** the
1057    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
1058    ///
1059    /// Useful for conditionally processing, such as only emitting an optional's value outside
1060    /// a tick if some other condition is satisfied.
1061    ///
1062    /// # Example
1063    /// ```rust
1064    /// # #[cfg(feature = "deploy")] {
1065    /// # use hydro_lang::prelude::*;
1066    /// # use futures::StreamExt;
1067    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1068    /// let tick = process.tick();
1069    /// // ticks are lazy by default, forces the second tick to run
1070    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1071    ///
1072    /// let batch_first_tick = process
1073    ///   .source_iter(q!(vec![]))
1074    ///   .batch(&tick, nondet!(/** test */));
1075    /// let batch_second_tick = process
1076    ///   .source_iter(q!(vec![456]))
1077    ///   .batch(&tick, nondet!(/** test */))
1078    ///   .defer_tick(); // appears on the second tick
1079    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1080    /// batch_first_tick.chain(batch_second_tick).first()
1081    ///   .filter_if_some(some_on_first_tick)
1082    ///   .unwrap_or(tick.singleton(q!(789)))
1083    ///   .all_ticks()
1084    /// # }, |mut stream| async move {
1085    /// // [789, 789]
1086    /// # for w in vec![789, 789] {
1087    /// #     assert_eq!(stream.next().await.unwrap(), w);
1088    /// # }
1089    /// # }));
1090    /// # }
1091    /// ```
1092    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1093    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
1094    where
1095        B: IsBounded,
1096    {
1097        self.filter_if(signal.is_some())
1098    }
1099
1100    /// Filters this optional, passing through the optional value if it is non-null **and** the
1101    /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
1102    ///
1103    /// Useful for conditionally processing, such as only emitting an optional's value outside
1104    /// a tick if some other condition is satisfied.
1105    ///
1106    /// # Example
1107    /// ```rust
1108    /// # #[cfg(feature = "deploy")] {
1109    /// # use hydro_lang::prelude::*;
1110    /// # use futures::StreamExt;
1111    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1112    /// let tick = process.tick();
1113    /// // ticks are lazy by default, forces the second tick to run
1114    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1115    ///
1116    /// let batch_first_tick = process
1117    ///   .source_iter(q!(vec![]))
1118    ///   .batch(&tick, nondet!(/** test */));
1119    /// let batch_second_tick = process
1120    ///   .source_iter(q!(vec![456]))
1121    ///   .batch(&tick, nondet!(/** test */))
1122    ///   .defer_tick(); // appears on the second tick
1123    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1124    /// batch_first_tick.chain(batch_second_tick).first()
1125    ///   .filter_if_none(some_on_first_tick)
1126    ///   .unwrap_or(tick.singleton(q!(789)))
1127    ///   .all_ticks()
1128    /// # }, |mut stream| async move {
1129    /// // [789, 789]
1130    /// # for w in vec![789, 456] {
1131    /// #     assert_eq!(stream.next().await.unwrap(), w);
1132    /// # }
1133    /// # }));
1134    /// # }
1135    /// ```
1136    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1137    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
1138    where
1139        B: IsBounded,
1140    {
1141        self.filter_if(other.is_none())
1142    }
1143
1144    /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
1145    ///
1146    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
1147    /// having a value, such as only releasing a piece of state if the node is the leader.
1148    ///
1149    /// # Example
1150    /// ```rust
1151    /// # #[cfg(feature = "deploy")] {
1152    /// # use hydro_lang::prelude::*;
1153    /// # use futures::StreamExt;
1154    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1155    /// let tick = process.tick();
1156    /// // ticks are lazy by default, forces the second tick to run
1157    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1158    ///
1159    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1160    /// some_on_first_tick
1161    ///     .if_some_then(tick.singleton(q!(456)))
1162    ///     .unwrap_or(tick.singleton(q!(123)))
1163    /// # .all_ticks()
1164    /// # }, |mut stream| async move {
1165    /// // 456 (first tick) ~> 123 (second tick onwards)
1166    /// # for w in vec![456, 123, 123] {
1167    /// #     assert_eq!(stream.next().await.unwrap(), w);
1168    /// # }
1169    /// # }));
1170    /// # }
1171    /// ```
1172    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1173    pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1174    where
1175        B: IsBounded,
1176    {
1177        value.filter_if(self.is_some())
1178    }
1179}
1180
1181impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1182where
1183    L: Location<'a> + NoTick,
1184{
1185    /// Returns an optional value corresponding to the latest snapshot of the optional
1186    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1187    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1188    /// all snapshots of this optional into the atomic-associated tick will observe the
1189    /// same value each tick.
1190    ///
1191    /// # Non-Determinism
1192    /// Because this picks a snapshot of a optional whose value is continuously changing,
1193    /// the output optional has a non-deterministic value since the snapshot can be at an
1194    /// arbitrary point in time.
1195    pub fn snapshot_atomic(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1196        Optional::new(
1197            tick.clone(),
1198            HydroNode::Batch {
1199                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1200                metadata: tick
1201                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1202            },
1203        )
1204    }
1205
1206    /// Returns this optional back into a top-level, asynchronous execution context where updates
1207    /// to the value will be asynchronously propagated.
1208    pub fn end_atomic(self) -> Optional<T, L, B> {
1209        Optional::new(
1210            self.location.tick.l.clone(),
1211            HydroNode::EndAtomic {
1212                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1213                metadata: self
1214                    .location
1215                    .tick
1216                    .l
1217                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1218            },
1219        )
1220    }
1221}
1222
1223impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1224where
1225    L: Location<'a>,
1226{
1227    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1228    /// will observe the same version of the value and will be executed synchronously before any
1229    /// outputs are yielded (in [`Optional::end_atomic`]).
1230    ///
1231    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1232    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1233    /// a different version).
1234    pub fn atomic(self) -> Optional<T, Atomic<L>, B> {
1235        let id = self.location.flow_state().borrow_mut().next_clock_id();
1236        let out_location = Atomic {
1237            tick: Tick {
1238                id,
1239                l: self.location.clone(),
1240            },
1241        };
1242        Optional::new(
1243            out_location.clone(),
1244            HydroNode::BeginAtomic {
1245                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1246                metadata: out_location
1247                    .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1248            },
1249        )
1250    }
1251
1252    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1253    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1254    /// relevant data that contributed to the snapshot at tick `t`.
1255    ///
1256    /// # Non-Determinism
1257    /// Because this picks a snapshot of a optional whose value is continuously changing,
1258    /// the output optional has a non-deterministic value since the snapshot can be at an
1259    /// arbitrary point in time.
1260    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1261        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1262        Optional::new(
1263            tick.clone(),
1264            HydroNode::Batch {
1265                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1266                metadata: tick
1267                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1268            },
1269        )
1270    }
1271
1272    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1273    /// with order corresponding to increasing prefixes of data contributing to the optional.
1274    ///
1275    /// # Non-Determinism
1276    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1277    /// to non-deterministic batching and arrival of inputs, the output stream is
1278    /// non-deterministic.
1279    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1280    where
1281        L: NoTick,
1282    {
1283        let tick = self.location.tick();
1284        self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1285    }
1286
1287    /// Given a time interval, returns a stream corresponding to snapshots of the optional
1288    /// value taken at various points in time. Because the input optional may be
1289    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1290    /// represent the value of the optional given some prefix of the streams leading up to
1291    /// it.
1292    ///
1293    /// # Non-Determinism
1294    /// The output stream is non-deterministic in which elements are sampled, since this
1295    /// is controlled by a clock.
1296    pub fn sample_every(
1297        self,
1298        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1299        nondet: NonDet,
1300    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1301    where
1302        L: NoTick + NoAtomic,
1303    {
1304        let samples = self.location.source_interval(interval, nondet);
1305        let tick = self.location.tick();
1306
1307        self.snapshot(&tick, nondet)
1308            .filter_if(samples.batch(&tick, nondet).first().is_some())
1309            .all_ticks()
1310            .weaken_retries()
1311    }
1312}
1313
1314impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1315where
1316    L: Location<'a>,
1317{
1318    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1319    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1320    /// null values).
1321    ///
1322    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1323    /// producing one element in the output for each (non-null) tick. This is useful for batched
1324    /// computations, where the results from each tick must be combined together.
1325    ///
1326    /// # Example
1327    /// ```rust
1328    /// # #[cfg(feature = "deploy")] {
1329    /// # use hydro_lang::prelude::*;
1330    /// # use futures::StreamExt;
1331    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1332    /// # let tick = process.tick();
1333    /// # // ticks are lazy by default, forces the second tick to run
1334    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1335    /// # let batch_first_tick = process
1336    /// #   .source_iter(q!(vec![]))
1337    /// #   .batch(&tick, nondet!(/** test */));
1338    /// # let batch_second_tick = process
1339    /// #   .source_iter(q!(vec![1, 2, 3]))
1340    /// #   .batch(&tick, nondet!(/** test */))
1341    /// #   .defer_tick(); // appears on the second tick
1342    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1343    /// input_batch // first tick: [], second tick: [1, 2, 3]
1344    ///     .max()
1345    ///     .all_ticks()
1346    /// # }, |mut stream| async move {
1347    /// // [3]
1348    /// # for w in vec![3] {
1349    /// #     assert_eq!(stream.next().await.unwrap(), w);
1350    /// # }
1351    /// # }));
1352    /// # }
1353    /// ```
1354    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1355        self.into_stream().all_ticks()
1356    }
1357
1358    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1359    /// which will stream the value computed in _each_ tick as a separate stream element.
1360    ///
1361    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1362    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1363    /// optional's [`Tick`] context.
1364    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1365        self.into_stream().all_ticks_atomic()
1366    }
1367
1368    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1369    /// be asynchronously updated with the latest value of the optional inside the tick, including
1370    /// whether the optional is null or not.
1371    ///
1372    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1373    /// tick that tracks the inner value. This is useful for getting the value as of the
1374    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1375    ///
1376    /// # Example
1377    /// ```rust
1378    /// # #[cfg(feature = "deploy")] {
1379    /// # use hydro_lang::prelude::*;
1380    /// # use futures::StreamExt;
1381    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1382    /// # let tick = process.tick();
1383    /// # // ticks are lazy by default, forces the second tick to run
1384    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1385    /// # let batch_first_tick = process
1386    /// #   .source_iter(q!(vec![]))
1387    /// #   .batch(&tick, nondet!(/** test */));
1388    /// # let batch_second_tick = process
1389    /// #   .source_iter(q!(vec![1, 2, 3]))
1390    /// #   .batch(&tick, nondet!(/** test */))
1391    /// #   .defer_tick(); // appears on the second tick
1392    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1393    /// input_batch // first tick: [], second tick: [1, 2, 3]
1394    ///     .max()
1395    ///     .latest()
1396    /// # .into_singleton()
1397    /// # .sample_eager(nondet!(/** test */))
1398    /// # }, |mut stream| async move {
1399    /// // asynchronously changes from None ~> 3
1400    /// # for w in vec![None, Some(3)] {
1401    /// #     assert_eq!(stream.next().await.unwrap(), w);
1402    /// # }
1403    /// # }));
1404    /// # }
1405    /// ```
1406    pub fn latest(self) -> Optional<T, L, Unbounded> {
1407        Optional::new(
1408            self.location.outer().clone(),
1409            HydroNode::YieldConcat {
1410                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1411                metadata: self
1412                    .location
1413                    .outer()
1414                    .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1415            },
1416        )
1417    }
1418
1419    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1420    /// be updated with the latest value of the optional inside the tick.
1421    ///
1422    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1423    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1424    /// optional's [`Tick`] context.
1425    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1426        let out_location = Atomic {
1427            tick: self.location.clone(),
1428        };
1429
1430        Optional::new(
1431            out_location.clone(),
1432            HydroNode::YieldConcat {
1433                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1434                metadata: out_location
1435                    .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1436            },
1437        )
1438    }
1439
1440    /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1441    /// always has the state of `self` at tick `T - 1`.
1442    ///
1443    /// At tick `0`, the output optional is null, since there is no previous tick.
1444    ///
1445    /// This operator enables stateful iterative processing with ticks, by sending data from one
1446    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1447    ///
1448    /// # Example
1449    /// ```rust
1450    /// # #[cfg(feature = "deploy")] {
1451    /// # use hydro_lang::prelude::*;
1452    /// # use futures::StreamExt;
1453    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1454    /// let tick = process.tick();
1455    /// // ticks are lazy by default, forces the second tick to run
1456    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1457    ///
1458    /// let batch_first_tick = process
1459    ///   .source_iter(q!(vec![1, 2]))
1460    ///   .batch(&tick, nondet!(/** test */));
1461    /// let batch_second_tick = process
1462    ///   .source_iter(q!(vec![3, 4]))
1463    ///   .batch(&tick, nondet!(/** test */))
1464    ///   .defer_tick(); // appears on the second tick
1465    /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1466    ///   .reduce(q!(|state, v| *state += v));
1467    ///
1468    /// current_tick_sum.clone().into_singleton().zip(
1469    ///   current_tick_sum.defer_tick().into_singleton() // state from previous tick
1470    /// ).all_ticks()
1471    /// # }, |mut stream| async move {
1472    /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1473    /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1474    /// #     assert_eq!(stream.next().await.unwrap(), w);
1475    /// # }
1476    /// # }));
1477    /// # }
1478    /// ```
1479    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1480        Optional::new(
1481            self.location.clone(),
1482            HydroNode::DeferTick {
1483                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1484                metadata: self.location.new_node_metadata(Self::collection_kind()),
1485            },
1486        )
1487    }
1488}
1489
1490#[cfg(test)]
1491mod tests {
1492    #[cfg(feature = "deploy")]
1493    use futures::StreamExt;
1494    #[cfg(feature = "deploy")]
1495    use hydro_deploy::Deployment;
1496    #[cfg(any(feature = "deploy", feature = "sim"))]
1497    use stageleft::q;
1498
1499    #[cfg(feature = "deploy")]
1500    use super::Optional;
1501    #[cfg(any(feature = "deploy", feature = "sim"))]
1502    use crate::compile::builder::FlowBuilder;
1503    #[cfg(any(feature = "deploy", feature = "sim"))]
1504    use crate::location::Location;
1505    #[cfg(feature = "deploy")]
1506    use crate::nondet::nondet;
1507
1508    #[cfg(feature = "deploy")]
1509    #[tokio::test]
1510    async fn optional_or_cardinality() {
1511        let mut deployment = Deployment::new();
1512
1513        let mut flow = FlowBuilder::new();
1514        let node = flow.process::<()>();
1515        let external = flow.external::<()>();
1516
1517        let node_tick = node.tick();
1518        let tick_singleton = node_tick.singleton(q!(123));
1519        let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1520        let counts = tick_optional_inhabited
1521            .clone()
1522            .or(tick_optional_inhabited)
1523            .into_stream()
1524            .count()
1525            .all_ticks()
1526            .send_bincode_external(&external);
1527
1528        let nodes = flow
1529            .with_process(&node, deployment.Localhost())
1530            .with_external(&external, deployment.Localhost())
1531            .deploy(&mut deployment);
1532
1533        deployment.deploy().await.unwrap();
1534
1535        let mut external_out = nodes.connect(counts).await;
1536
1537        deployment.start().await.unwrap();
1538
1539        assert_eq!(external_out.next().await.unwrap(), 1);
1540    }
1541
1542    #[cfg(feature = "deploy")]
1543    #[tokio::test]
1544    async fn into_singleton_top_level_none_cardinality() {
1545        let mut deployment = Deployment::new();
1546
1547        let mut flow = FlowBuilder::new();
1548        let node = flow.process::<()>();
1549        let external = flow.external::<()>();
1550
1551        let node_tick = node.tick();
1552        let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1553        let into_singleton = top_level_none.into_singleton();
1554
1555        let tick_driver = node.spin();
1556
1557        let counts = into_singleton
1558            .snapshot(&node_tick, nondet!(/** test */))
1559            .into_stream()
1560            .count()
1561            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1562            .map(q!(|(c, _)| c))
1563            .all_ticks()
1564            .send_bincode_external(&external);
1565
1566        let nodes = flow
1567            .with_process(&node, deployment.Localhost())
1568            .with_external(&external, deployment.Localhost())
1569            .deploy(&mut deployment);
1570
1571        deployment.deploy().await.unwrap();
1572
1573        let mut external_out = nodes.connect(counts).await;
1574
1575        deployment.start().await.unwrap();
1576
1577        assert_eq!(external_out.next().await.unwrap(), 1);
1578        assert_eq!(external_out.next().await.unwrap(), 1);
1579        assert_eq!(external_out.next().await.unwrap(), 1);
1580    }
1581
1582    #[cfg(feature = "deploy")]
1583    #[tokio::test]
1584    async fn into_singleton_unbounded_top_level_none_cardinality() {
1585        let mut deployment = Deployment::new();
1586
1587        let mut flow = FlowBuilder::new();
1588        let node = flow.process::<()>();
1589        let external = flow.external::<()>();
1590
1591        let node_tick = node.tick();
1592        let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
1593        let into_singleton = top_level_none.into_singleton();
1594
1595        let tick_driver = node.spin();
1596
1597        let counts = into_singleton
1598            .snapshot(&node_tick, nondet!(/** test */))
1599            .into_stream()
1600            .count()
1601            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1602            .map(q!(|(c, _)| c))
1603            .all_ticks()
1604            .send_bincode_external(&external);
1605
1606        let nodes = flow
1607            .with_process(&node, deployment.Localhost())
1608            .with_external(&external, deployment.Localhost())
1609            .deploy(&mut deployment);
1610
1611        deployment.deploy().await.unwrap();
1612
1613        let mut external_out = nodes.connect(counts).await;
1614
1615        deployment.start().await.unwrap();
1616
1617        assert_eq!(external_out.next().await.unwrap(), 1);
1618        assert_eq!(external_out.next().await.unwrap(), 1);
1619        assert_eq!(external_out.next().await.unwrap(), 1);
1620    }
1621
1622    #[cfg(feature = "sim")]
1623    #[test]
1624    fn top_level_optional_some_into_stream_no_replay() {
1625        let mut flow = FlowBuilder::new();
1626        let node = flow.process::<()>();
1627
1628        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1629        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1630        let filtered_some = folded.filter(q!(|_| true));
1631
1632        let out_recv = filtered_some.into_stream().sim_output();
1633
1634        flow.sim().exhaustive(async || {
1635            out_recv.assert_yields_only([10]).await;
1636        });
1637    }
1638
1639    #[cfg(feature = "sim")]
1640    #[test]
1641    fn top_level_optional_none_into_stream_no_replay() {
1642        let mut flow = FlowBuilder::new();
1643        let node = flow.process::<()>();
1644
1645        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1646        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1647        let filtered_none = folded.filter(q!(|_| false));
1648
1649        let out_recv = filtered_none.into_stream().sim_output();
1650
1651        flow.sim().exhaustive(async || {
1652            out_recv.assert_yields_only([] as [i32; 0]).await;
1653        });
1654    }
1655}