Skip to main content

hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
11use super::optional::Optional;
12use super::sliced::sliced;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::{DynLocation, LocationId};
21use crate::location::tick::{Atomic, NoAtomic};
22use crate::location::{Location, NoTick, Tick, check_matching_location};
23use crate::nondet::{NonDet, nondet};
24
25/// A single Rust value that can asynchronously change over time.
26///
27/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
28/// [`Unbounded`], the value will asynchronously change over time.
29///
30/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
31/// a single number that will asynchronously change as events are processed. Singletons also appear
32/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
33/// such as getting the length of a batch of requests.
34///
35/// Type Parameters:
36/// - `Type`: the type of the value in this singleton
37/// - `Loc`: the [`Location`] where the singleton is materialized
38/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Singleton<Type, Loc, Bound: Boundedness> {
    /// The location handle this singleton was constructed with; returned by
    /// [`Singleton::location`].
    pub(crate) location: Loc,
    /// The IR node backing this singleton. It lives in a `RefCell` so that code holding
    /// only a shared reference can swap it out (operators and `Drop` replace it with
    /// `HydroNode::Placeholder` when they take the node).
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Flow state cloned from `location.flow_state()` at construction time; `Drop` uses it
    /// to register a node that was never consumed.
    pub(crate) flow_state: FlowState,

    /// Zero-sized marker that ties the otherwise-unused type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
46
47impl<T, L, B: Boundedness> Drop for Singleton<T, L, B> {
48    fn drop(&mut self) {
49        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
50        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
51            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
52                input: Box::new(ir_node),
53                op_metadata: HydroIrOpMetadata::new(),
54            });
55        }
56    }
57}
58
59impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
60where
61    T: Clone,
62    L: Location<'a> + NoTick,
63{
64    fn from(value: Singleton<T, L, Bounded>) -> Self {
65        let tick = value.location().tick();
66        value.clone_into_tick(&tick).latest()
67    }
68}
69
70impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
71where
72    L: Location<'a>,
73{
74    type Location = Tick<L>;
75
76    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
77        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
78            location.clone(),
79            HydroNode::DeferTick {
80                input: Box::new(HydroNode::CycleSource {
81                    cycle_id,
82                    metadata: location.new_node_metadata(Self::collection_kind()),
83                }),
84                metadata: location
85                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
86            },
87        );
88
89        from_previous_tick.unwrap_or(initial)
90    }
91}
92
93impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
94where
95    L: Location<'a>,
96{
97    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
98        assert_eq!(
99            Location::id(&self.location),
100            expected_location,
101            "locations do not match"
102        );
103        self.location
104            .flow_state()
105            .borrow_mut()
106            .push_root(HydroRoot::CycleSink {
107                cycle_id,
108                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
109                op_metadata: HydroIrOpMetadata::new(),
110            });
111    }
112}
113
114impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
115where
116    L: Location<'a>,
117{
118    type Location = Tick<L>;
119
120    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
121        Singleton::new(
122            location.clone(),
123            HydroNode::CycleSource {
124                cycle_id,
125                metadata: location.new_node_metadata(Self::collection_kind()),
126            },
127        )
128    }
129}
130
131impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
132where
133    L: Location<'a>,
134{
135    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
136        assert_eq!(
137            Location::id(&self.location),
138            expected_location,
139            "locations do not match"
140        );
141        self.location
142            .flow_state()
143            .borrow_mut()
144            .push_root(HydroRoot::CycleSink {
145                cycle_id,
146                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
147                op_metadata: HydroIrOpMetadata::new(),
148            });
149    }
150}
151
152impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
153where
154    L: Location<'a> + NoTick,
155{
156    type Location = L;
157
158    fn create_source(cycle_id: CycleId, location: L) -> Self {
159        Singleton::new(
160            location.clone(),
161            HydroNode::CycleSource {
162                cycle_id,
163                metadata: location.new_node_metadata(Self::collection_kind()),
164            },
165        )
166    }
167}
168
169impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
170where
171    L: Location<'a> + NoTick,
172{
173    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
174        assert_eq!(
175            Location::id(&self.location),
176            expected_location,
177            "locations do not match"
178        );
179        self.location
180            .flow_state()
181            .borrow_mut()
182            .push_root(HydroRoot::CycleSink {
183                cycle_id,
184                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
185                op_metadata: HydroIrOpMetadata::new(),
186            });
187    }
188}
189
190impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
191where
192    T: Clone,
193    L: Location<'a>,
194{
195    fn clone(&self) -> Self {
196        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
197            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
198            *self.ir_node.borrow_mut() = HydroNode::Tee {
199                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
200                metadata: self.location.new_node_metadata(Self::collection_kind()),
201            };
202        }
203
204        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
205            Singleton {
206                location: self.location.clone(),
207                flow_state: self.flow_state.clone(),
208                ir_node: HydroNode::Tee {
209                    inner: SharedNode(inner.0.clone()),
210                    metadata: metadata.clone(),
211                }
212                .into(),
213                _phantom: PhantomData,
214            }
215        } else {
216            unreachable!()
217        }
218    }
219}
220
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, L, B, O>(
    me: Singleton<T, Tick<L>, B>,
    other: Optional<O, Tick<L>, B>,
) -> Optional<(T, O), Tick<L>, B>
where
    L: Location<'a>,
    B: Boundedness,
{
    // Convert the singleton into an `Optional` and delegate to the optional-level zip.
    let lifted = Into::<Optional<T, Tick<L>, B>>::into(me);
    super::optional::zip_inside_tick(lifted, other)
}
229
230impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
231where
232    L: Location<'a>,
233{
234    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
235        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
236        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
237        let flow_state = location.flow_state().clone();
238        Singleton {
239            location,
240            flow_state,
241            ir_node: RefCell::new(ir_node),
242            _phantom: PhantomData,
243        }
244    }
245
246    pub(crate) fn collection_kind() -> CollectionKind {
247        CollectionKind::Singleton {
248            bound: B::BOUND_KIND,
249            element_type: stageleft::quote_type::<T>().into(),
250        }
251    }
252
253    /// Returns the [`Location`] where this singleton is being materialized.
254    pub fn location(&self) -> &L {
255        &self.location
256    }
257
258    /// Transforms the singleton value by applying a function `f` to it,
259    /// continuously as the input is updated.
260    ///
261    /// # Example
262    /// ```rust
263    /// # #[cfg(feature = "deploy")] {
264    /// # use hydro_lang::prelude::*;
265    /// # use futures::StreamExt;
266    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
267    /// let tick = process.tick();
268    /// let singleton = tick.singleton(q!(5));
269    /// singleton.map(q!(|v| v * 2)).all_ticks()
270    /// # }, |mut stream| async move {
271    /// // 10
272    /// # assert_eq!(stream.next().await.unwrap(), 10);
273    /// # }));
274    /// # }
275    /// ```
276    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
277    where
278        F: Fn(T) -> U + 'a,
279    {
280        let f = f.splice_fn1_ctx(&self.location).into();
281        Singleton::new(
282            self.location.clone(),
283            HydroNode::Map {
284                f,
285                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
286                metadata: self
287                    .location
288                    .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
289            },
290        )
291    }
292
293    /// Transforms the singleton value by applying a function `f` to it and then flattening
294    /// the result into a stream, preserving the order of elements.
295    ///
296    /// The function `f` is applied to the singleton value to produce an iterator, and all items
297    /// from that iterator are emitted in the output stream in deterministic order.
298    ///
299    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
300    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
301    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
302    ///
303    /// # Example
304    /// ```rust
305    /// # #[cfg(feature = "deploy")] {
306    /// # use hydro_lang::prelude::*;
307    /// # use futures::StreamExt;
308    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
309    /// let tick = process.tick();
310    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
311    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
312    /// # }, |mut stream| async move {
313    /// // 1, 2, 3
314    /// # for w in vec![1, 2, 3] {
315    /// #     assert_eq!(stream.next().await.unwrap(), w);
316    /// # }
317    /// # }));
318    /// # }
319    /// ```
320    pub fn flat_map_ordered<U, I, F>(
321        self,
322        f: impl IntoQuotedMut<'a, F, L>,
323    ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
324    where
325        B: IsBounded,
326        I: IntoIterator<Item = U>,
327        F: Fn(T) -> I + 'a,
328    {
329        self.into_stream().flat_map_ordered(f)
330    }
331
332    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
333    /// for the output type `I` to produce items in any order.
334    ///
335    /// The function `f` is applied to the singleton value to produce an iterator, and all items
336    /// from that iterator are emitted in the output stream in non-deterministic order.
337    ///
338    /// # Example
339    /// ```rust
340    /// # #[cfg(feature = "deploy")] {
341    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
342    /// # use futures::StreamExt;
343    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
344    /// let tick = process.tick();
345    /// let singleton = tick.singleton(q!(
346    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
347    /// ));
348    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
349    /// # }, |mut stream| async move {
350    /// // 1, 2, 3, but in no particular order
351    /// # let mut results = Vec::new();
352    /// # for _ in 0..3 {
353    /// #     results.push(stream.next().await.unwrap());
354    /// # }
355    /// # results.sort();
356    /// # assert_eq!(results, vec![1, 2, 3]);
357    /// # }));
358    /// # }
359    /// ```
360    pub fn flat_map_unordered<U, I, F>(
361        self,
362        f: impl IntoQuotedMut<'a, F, L>,
363    ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
364    where
365        B: IsBounded,
366        I: IntoIterator<Item = U>,
367        F: Fn(T) -> I + 'a,
368    {
369        self.into_stream().flat_map_unordered(f)
370    }
371
372    /// Flattens the singleton value into a stream, preserving the order of elements.
373    ///
374    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
375    /// are emitted in the output stream in deterministic order.
376    ///
377    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
378    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
379    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
380    ///
381    /// # Example
382    /// ```rust
383    /// # #[cfg(feature = "deploy")] {
384    /// # use hydro_lang::prelude::*;
385    /// # use futures::StreamExt;
386    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
387    /// let tick = process.tick();
388    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
389    /// singleton.flatten_ordered().all_ticks()
390    /// # }, |mut stream| async move {
391    /// // 1, 2, 3
392    /// # for w in vec![1, 2, 3] {
393    /// #     assert_eq!(stream.next().await.unwrap(), w);
394    /// # }
395    /// # }));
396    /// # }
397    /// ```
398    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
399    where
400        B: IsBounded,
401        T: IntoIterator<Item = U>,
402    {
403        self.flat_map_ordered(q!(|x| x))
404    }
405
406    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
407    /// for the element type `T` to produce items in any order.
408    ///
409    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
410    /// are emitted in the output stream in non-deterministic order.
411    ///
412    /// # Example
413    /// ```rust
414    /// # #[cfg(feature = "deploy")] {
415    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
416    /// # use futures::StreamExt;
417    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
418    /// let tick = process.tick();
419    /// let singleton = tick.singleton(q!(
420    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
421    /// ));
422    /// singleton.flatten_unordered().all_ticks()
423    /// # }, |mut stream| async move {
424    /// // 1, 2, 3, but in no particular order
425    /// # let mut results = Vec::new();
426    /// # for _ in 0..3 {
427    /// #     results.push(stream.next().await.unwrap());
428    /// # }
429    /// # results.sort();
430    /// # assert_eq!(results, vec![1, 2, 3]);
431    /// # }));
432    /// # }
433    /// ```
434    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
435    where
436        B: IsBounded,
437        T: IntoIterator<Item = U>,
438    {
439        self.flat_map_unordered(q!(|x| x))
440    }
441
442    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
443    ///
444    /// If the predicate returns `true`, the output optional contains the same value.
445    /// If the predicate returns `false`, the output optional is empty.
446    ///
447    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
448    /// not modify or take ownership of the value. If you need to modify the value while filtering
449    /// use [`Singleton::filter_map`] instead.
450    ///
451    /// # Example
452    /// ```rust
453    /// # #[cfg(feature = "deploy")] {
454    /// # use hydro_lang::prelude::*;
455    /// # use futures::StreamExt;
456    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
457    /// let tick = process.tick();
458    /// let singleton = tick.singleton(q!(5));
459    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
460    /// # }, |mut stream| async move {
461    /// // 5
462    /// # assert_eq!(stream.next().await.unwrap(), 5);
463    /// # }));
464    /// # }
465    /// ```
466    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
467    where
468        F: Fn(&T) -> bool + 'a,
469    {
470        let f = f.splice_fn1_borrow_ctx(&self.location).into();
471        Optional::new(
472            self.location.clone(),
473            HydroNode::Filter {
474                f,
475                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
476                metadata: self
477                    .location
478                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
479            },
480        )
481    }
482
483    /// An operator that both filters and maps. It yields the value only if the supplied
484    /// closure `f` returns `Some(value)`.
485    ///
486    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
487    /// If the closure returns `None`, the output optional is empty.
488    ///
489    /// # Example
490    /// ```rust
491    /// # #[cfg(feature = "deploy")] {
492    /// # use hydro_lang::prelude::*;
493    /// # use futures::StreamExt;
494    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
495    /// let tick = process.tick();
496    /// let singleton = tick.singleton(q!("42"));
497    /// singleton
498    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
499    ///     .all_ticks()
500    /// # }, |mut stream| async move {
501    /// // 42
502    /// # assert_eq!(stream.next().await.unwrap(), 42);
503    /// # }));
504    /// # }
505    /// ```
506    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
507    where
508        F: Fn(T) -> Option<U> + 'a,
509    {
510        let f = f.splice_fn1_ctx(&self.location).into();
511        Optional::new(
512            self.location.clone(),
513            HydroNode::FilterMap {
514                f,
515                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
516                metadata: self
517                    .location
518                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
519            },
520        )
521    }
522
523    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
524    ///
525    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
526    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
527    /// non-null. This is useful for combining several pieces of state together.
528    ///
529    /// # Example
530    /// ```rust
531    /// # #[cfg(feature = "deploy")] {
532    /// # use hydro_lang::prelude::*;
533    /// # use futures::StreamExt;
534    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
535    /// let tick = process.tick();
536    /// let numbers = process
537    ///   .source_iter(q!(vec![123, 456]))
538    ///   .batch(&tick, nondet!(/** test */));
539    /// let count = numbers.clone().count(); // Singleton
540    /// let max = numbers.max(); // Optional
541    /// count.zip(max).all_ticks()
542    /// # }, |mut stream| async move {
543    /// // [(2, 456)]
544    /// # for w in vec![(2, 456)] {
545    /// #     assert_eq!(stream.next().await.unwrap(), w);
546    /// # }
547    /// # }));
548    /// # }
549    /// ```
    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
    where
        Self: ZipResult<'a, O, Location = L>,
        B: IsBounded,
    {
        // Both operands must live at the same location before they can be combined.
        check_matching_location(&self.location, &Self::other_location(&other));

        // Two strategies follow. When the location type reports itself as top-level and can
        // hand out a tick, both sides are snapshotted into that tick, zipped there, and the
        // result is brought back out with `latest()`. Otherwise a single `CrossSingleton`
        // node is emitted directly.
        if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
            let out = zip_inside_tick(
                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
                // The other operand's IR node is wrapped in a `Cast` and re-typed as an
                // `Optional` so it can be snapshotted like one.
                // NOTE(review): the metadata kind is spelled with `Tick<L>`/`Bounded` while
                // the value is typed with `L`/`B`; presumably equivalent because
                // `B: IsBounded` — confirm.
                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
                    other_location.clone(),
                    HydroNode::Cast {
                        inner: Box::new(Self::other_ir_node(other)),
                        metadata: other_location.new_node_metadata(Optional::<
                            <Self as ZipResult<'a, O>>::OtherType,
                            Tick<L>,
                            Bounded,
                        >::collection_kind(
                        )),
                    },
                )
                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();

            // Re-wrap the zipped node in whatever output collection `ZipResult` prescribes.
            Self::make(
                out.location.clone(),
                out.ir_node.replace(HydroNode::Placeholder),
            )
        } else {
            Self::make(
                self.location.clone(),
                HydroNode::CrossSingleton {
                    left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    right: Box::new(Self::other_ir_node(other)),
                    // The node's metadata is always recorded as an `Optional` kind here,
                    // with the element type taken from `ZipResult::ElementType`.
                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
                        bound: B::BOUND_KIND,
                        element_type: stageleft::quote_type::<
                            <Self as ZipResult<'a, O>>::ElementType,
                        >()
                        .into(),
                    }),
                },
            )
        }
    }
600
601    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
602    /// boolean signal is `true`, otherwise the output is null.
603    ///
604    /// # Example
605    /// ```rust
606    /// # #[cfg(feature = "deploy")] {
607    /// # use hydro_lang::prelude::*;
608    /// # use futures::StreamExt;
609    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
610    /// let tick = process.tick();
611    /// // ticks are lazy by default, forces the second tick to run
612    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
613    ///
614    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
615    /// let batch_first_tick = process
616    ///   .source_iter(q!(vec![1]))
617    ///   .batch(&tick, nondet!(/** test */));
618    /// let batch_second_tick = process
619    ///   .source_iter(q!(vec![1, 2, 3]))
620    ///   .batch(&tick, nondet!(/** test */))
621    ///   .defer_tick();
622    /// batch_first_tick.chain(batch_second_tick).count()
623    ///   .filter_if(signal)
624    ///   .all_ticks()
625    /// # }, |mut stream| async move {
626    /// // [1]
627    /// # for w in vec![1] {
628    /// #     assert_eq!(stream.next().await.unwrap(), w);
629    /// # }
630    /// # }));
631    /// # }
632    /// ```
633    pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
634    where
635        B: IsBounded,
636    {
637        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
638    }
639
640    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
642    ///
643    /// Useful for conditionally processing, such as only emitting a singleton's value outside
644    /// a tick if some other condition is satisfied.
645    ///
646    /// # Example
647    /// ```rust
648    /// # #[cfg(feature = "deploy")] {
649    /// # use hydro_lang::prelude::*;
650    /// # use futures::StreamExt;
651    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
652    /// let tick = process.tick();
653    /// // ticks are lazy by default, forces the second tick to run
654    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
655    ///
656    /// let batch_first_tick = process
657    ///   .source_iter(q!(vec![1]))
658    ///   .batch(&tick, nondet!(/** test */));
659    /// let batch_second_tick = process
660    ///   .source_iter(q!(vec![1, 2, 3]))
661    ///   .batch(&tick, nondet!(/** test */))
662    ///   .defer_tick(); // appears on the second tick
663    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
664    /// batch_first_tick.chain(batch_second_tick).count()
665    ///   .filter_if_some(some_on_first_tick)
666    ///   .all_ticks()
667    /// # }, |mut stream| async move {
668    /// // [1]
669    /// # for w in vec![1] {
670    /// #     assert_eq!(stream.next().await.unwrap(), w);
671    /// # }
672    /// # }));
673    /// # }
674    /// ```
675    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
676    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
677    where
678        B: IsBounded,
679    {
680        self.filter_if(signal.is_some())
681    }
682
683    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is null.
685    ///
686    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
687    /// the condition.
688    ///
689    /// # Example
690    /// ```rust
691    /// # #[cfg(feature = "deploy")] {
692    /// # use hydro_lang::prelude::*;
693    /// # use futures::StreamExt;
694    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
695    /// let tick = process.tick();
696    /// // ticks are lazy by default, forces the second tick to run
697    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
698    ///
699    /// let batch_first_tick = process
700    ///   .source_iter(q!(vec![1]))
701    ///   .batch(&tick, nondet!(/** test */));
702    /// let batch_second_tick = process
703    ///   .source_iter(q!(vec![1, 2, 3]))
704    ///   .batch(&tick, nondet!(/** test */))
705    ///   .defer_tick(); // appears on the second tick
706    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
707    /// batch_first_tick.chain(batch_second_tick).count()
708    ///   .filter_if_none(some_on_first_tick)
709    ///   .all_ticks()
710    /// # }, |mut stream| async move {
711    /// // [3]
712    /// # for w in vec![3] {
713    /// #     assert_eq!(stream.next().await.unwrap(), w);
714    /// # }
715    /// # }));
716    /// # }
717    /// ```
718    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
719    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
720    where
721        B: IsBounded,
722    {
723        self.filter_if(other.is_none())
724    }
725
726    /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
727    ///
728    /// # Example
729    /// ```rust
730    /// # #[cfg(feature = "deploy")] {
731    /// # use hydro_lang::prelude::*;
732    /// # use futures::StreamExt;
733    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
734    /// let tick = process.tick();
735    /// let a = tick.singleton(q!(5));
736    /// let b = tick.singleton(q!(5));
737    /// a.equals(b).all_ticks()
738    /// # }, |mut stream| async move {
739    /// // [true]
740    /// # assert_eq!(stream.next().await.unwrap(), true);
741    /// # }));
742    /// # }
743    /// ```
744    pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
745    where
746        T: PartialEq,
747        B: IsBounded,
748    {
749        self.zip(other).map(q!(|(a, b)| a == b))
750    }
751
752    /// An operator which allows you to "name" a `HydroNode`.
753    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
754    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
755        {
756            let mut node = self.ir_node.borrow_mut();
757            let metadata = node.metadata_mut();
758            metadata.tag = Some(name.to_owned());
759        }
760        self
761    }
762}
763
764impl<'a, L: Location<'a>, B: Boundedness> Not for Singleton<bool, L, B> {
765    type Output = Singleton<bool, L, B>;
766
767    fn not(self) -> Self::Output {
768        self.map(q!(|b| !b))
769    }
770}
771
772impl<'a, T, L, B: Boundedness> Singleton<Option<T>, L, B>
773where
774    L: Location<'a>,
775{
776    /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
777    /// the inner `Option`.
778    ///
779    /// This is implemented as an identity [`Singleton::filter_map`], passing through the
780    /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
781    /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
782    ///
783    /// # Example
784    /// ```rust
785    /// # #[cfg(feature = "deploy")] {
786    /// # use hydro_lang::prelude::*;
787    /// # use futures::StreamExt;
788    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
789    /// let tick = process.tick();
790    /// let singleton = tick.singleton(q!(Some(42)));
791    /// singleton.into_optional().all_ticks()
792    /// # }, |mut stream| async move {
793    /// // 42
794    /// # assert_eq!(stream.next().await.unwrap(), 42);
795    /// # }));
796    /// # }
797    /// ```
798    pub fn into_optional(self) -> Optional<T, L, B> {
799        self.filter_map(q!(|v| v))
800    }
801}
802
803impl<'a, L, B: Boundedness> Singleton<bool, L, B>
804where
805    L: Location<'a>,
806{
807    /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
808    ///
809    /// # Example
810    /// ```rust
811    /// # #[cfg(feature = "deploy")] {
812    /// # use hydro_lang::prelude::*;
813    /// # use futures::StreamExt;
814    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
815    /// let tick = process.tick();
816    /// // ticks are lazy by default, forces the second tick to run
817    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
818    ///
819    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
820    /// let b = tick.singleton(q!(true)); // true, true
821    /// a.and(b).all_ticks()
822    /// # }, |mut stream| async move {
823    /// // [true, false]
824    /// # for w in vec![true, false] {
825    /// #     assert_eq!(stream.next().await.unwrap(), w);
826    /// # }
827    /// # }));
828    /// # }
829    /// ```
830    pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, B>
831    where
832        B: IsBounded,
833    {
834        self.zip(other).map(q!(|(a, b)| a && b))
835    }
836
837    /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
838    ///
839    /// # Example
840    /// ```rust
841    /// # #[cfg(feature = "deploy")] {
842    /// # use hydro_lang::prelude::*;
843    /// # use futures::StreamExt;
844    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
845    /// let tick = process.tick();
846    /// // ticks are lazy by default, forces the second tick to run
847    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
848    ///
849    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
850    /// let b = tick.singleton(q!(false)); // false, false
851    /// a.or(b).all_ticks()
852    /// # }, |mut stream| async move {
853    /// // [true, false]
854    /// # for w in vec![true, false] {
855    /// #     assert_eq!(stream.next().await.unwrap(), w);
856    /// # }
857    /// # }));
858    /// # }
859    /// ```
860    pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, B>
861    where
862        B: IsBounded,
863    {
864        self.zip(other).map(q!(|(a, b)| a || b))
865    }
866}
867
868impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
869where
870    L: Location<'a> + NoTick,
871{
872    /// Returns a singleton value corresponding to the latest snapshot of the singleton
873    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
874    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
875    /// all snapshots of this singleton into the atomic-associated tick will observe the
876    /// same value each tick.
877    ///
878    /// # Non-Determinism
879    /// Because this picks a snapshot of a singleton whose value is continuously changing,
880    /// the output singleton has a non-deterministic value since the snapshot can be at an
881    /// arbitrary point in time.
882    pub fn snapshot_atomic(
883        self,
884        tick: &Tick<L>,
885        _nondet: NonDet,
886    ) -> Singleton<T, Tick<L>, Bounded> {
887        Singleton::new(
888            tick.clone(),
889            HydroNode::Batch {
890                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
891                metadata: tick
892                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
893            },
894        )
895    }
896
897    /// Returns this singleton back into a top-level, asynchronous execution context where updates
898    /// to the value will be asynchronously propagated.
899    pub fn end_atomic(self) -> Singleton<T, L, B> {
900        Singleton::new(
901            self.location.tick.l.clone(),
902            HydroNode::EndAtomic {
903                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
904                metadata: self
905                    .location
906                    .tick
907                    .l
908                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
909            },
910        )
911    }
912}
913
914impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
915where
916    L: Location<'a>,
917{
918    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
919    /// will observe the same version of the value and will be executed synchronously before any
920    /// outputs are yielded (in [`Optional::end_atomic`]).
921    ///
922    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
923    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
924    /// a different version).
925    pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
926        let id = self.location.flow_state().borrow_mut().next_clock_id();
927        let out_location = Atomic {
928            tick: Tick {
929                id,
930                l: self.location.clone(),
931            },
932        };
933        Singleton::new(
934            out_location.clone(),
935            HydroNode::BeginAtomic {
936                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
937                metadata: out_location
938                    .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
939            },
940        )
941    }
942
943    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
944    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
945    /// relevant data that contributed to the snapshot at tick `t`.
946    ///
947    /// # Non-Determinism
948    /// Because this picks a snapshot of a singleton whose value is continuously changing,
949    /// the output singleton has a non-deterministic value since the snapshot can be at an
950    /// arbitrary point in time.
951    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
952        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
953        Singleton::new(
954            tick.clone(),
955            HydroNode::Batch {
956                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
957                metadata: tick
958                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
959            },
960        )
961    }
962
963    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
964    /// with order corresponding to increasing prefixes of data contributing to the singleton.
965    ///
966    /// # Non-Determinism
967    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
968    /// to non-deterministic batching and arrival of inputs, the output stream is
969    /// non-deterministic.
970    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
971    where
972        L: NoTick,
973    {
974        sliced! {
975            let snapshot = use(self, nondet);
976            snapshot.into_stream()
977        }
978        .weaken_retries()
979    }
980
981    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
982    /// value taken at various points in time. Because the input singleton may be
983    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
984    /// represent the value of the singleton given some prefix of the streams leading up to
985    /// it.
986    ///
987    /// # Non-Determinism
988    /// The output stream is non-deterministic in which elements are sampled, since this
989    /// is controlled by a clock.
990    pub fn sample_every(
991        self,
992        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
993        nondet: NonDet,
994    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
995    where
996        L: NoTick + NoAtomic,
997    {
998        let samples = self.location.source_interval(interval, nondet);
999        sliced! {
1000            let snapshot = use(self, nondet);
1001            let sample_batch = use(samples, nondet);
1002
1003            snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1004        }
1005        .weaken_retries()
1006    }
1007
1008    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1009    /// implies that `B == Bounded`.
1010    pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1011    where
1012        B: IsBounded,
1013    {
1014        Singleton::new(
1015            self.location.clone(),
1016            self.ir_node.replace(HydroNode::Placeholder),
1017        )
1018    }
1019
1020    /// Clones this bounded singleton into a tick, returning a singleton that has the
1021    /// same value as the outer singleton. Because the outer singleton is bounded, this
1022    /// is deterministic because there is only a single immutable version.
1023    pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
1024    where
1025        B: IsBounded,
1026        T: Clone,
1027    {
1028        // TODO(shadaj): avoid printing simulator logs for this snapshot
1029        self.snapshot(
1030            tick,
1031            nondet!(/** bounded top-level singleton so deterministic */),
1032        )
1033    }
1034
1035    /// Converts this singleton into a [`Stream`] containing a single element, the value.
1036    ///
1037    /// # Example
1038    /// ```rust
1039    /// # #[cfg(feature = "deploy")] {
1040    /// # use hydro_lang::prelude::*;
1041    /// # use futures::StreamExt;
1042    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1043    /// let tick = process.tick();
1044    /// let batch_input = process
1045    ///   .source_iter(q!(vec![123, 456]))
1046    ///   .batch(&tick, nondet!(/** test */));
1047    /// batch_input.clone().chain(
1048    ///   batch_input.count().into_stream()
1049    /// ).all_ticks()
1050    /// # }, |mut stream| async move {
1051    /// // [123, 456, 2]
1052    /// # for w in vec![123, 456, 2] {
1053    /// #     assert_eq!(stream.next().await.unwrap(), w);
1054    /// # }
1055    /// # }));
1056    /// # }
1057    /// ```
1058    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1059    where
1060        B: IsBounded,
1061    {
1062        Stream::new(
1063            self.location.clone(),
1064            HydroNode::Cast {
1065                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1066                metadata: self.location.new_node_metadata(Stream::<
1067                    T,
1068                    Tick<L>,
1069                    Bounded,
1070                    TotalOrder,
1071                    ExactlyOnce,
1072                >::collection_kind()),
1073            },
1074        )
1075    }
1076
1077    /// Resolves the singleton's [`Future`] value by blocking until it completes,
1078    /// producing a singleton of the resolved output.
1079    ///
1080    /// This is useful when the singleton contains an async computation that must
1081    /// be awaited before further processing. The future is polled to completion
1082    /// before the output value is emitted.
1083    ///
1084    /// # Example
1085    /// ```rust
1086    /// # #[cfg(feature = "deploy")] {
1087    /// # use hydro_lang::prelude::*;
1088    /// # use futures::StreamExt;
1089    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1090    /// let tick = process.tick();
1091    /// let singleton = tick.singleton(q!(5));
1092    /// singleton
1093    ///     .map(q!(|v| async move { v * 2 }))
1094    ///     .resolve_future_blocking()
1095    ///     .all_ticks()
1096    /// # }, |mut stream| async move {
1097    /// // 10
1098    /// # assert_eq!(stream.next().await.unwrap(), 10);
1099    /// # }));
1100    /// # }
1101    /// ```
1102    pub fn resolve_future_blocking(self) -> Singleton<T::Output, L, B>
1103    where
1104        T: Future,
1105        B: IsBounded,
1106    {
1107        Singleton::new(
1108            self.location.clone(),
1109            HydroNode::ResolveFuturesBlocking {
1110                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1111                metadata: self
1112                    .location
1113                    .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1114            },
1115        )
1116    }
1117}
1118
1119impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1120where
1121    L: Location<'a>,
1122{
1123    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1124    /// which will stream the value computed in _each_ tick as a separate stream element.
1125    ///
1126    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1127    /// producing one element in the output for each tick. This is useful for batched computations,
1128    /// where the results from each tick must be combined together.
1129    ///
1130    /// # Example
1131    /// ```rust
1132    /// # #[cfg(feature = "deploy")] {
1133    /// # use hydro_lang::prelude::*;
1134    /// # use futures::StreamExt;
1135    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1136    /// let tick = process.tick();
1137    /// # // ticks are lazy by default, forces the second tick to run
1138    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1139    /// # let batch_first_tick = process
1140    /// #   .source_iter(q!(vec![1]))
1141    /// #   .batch(&tick, nondet!(/** test */));
1142    /// # let batch_second_tick = process
1143    /// #   .source_iter(q!(vec![1, 2, 3]))
1144    /// #   .batch(&tick, nondet!(/** test */))
1145    /// #   .defer_tick(); // appears on the second tick
1146    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1147    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1148    ///     .count()
1149    ///     .all_ticks()
1150    /// # }, |mut stream| async move {
1151    /// // [1, 3]
1152    /// # for w in vec![1, 3] {
1153    /// #     assert_eq!(stream.next().await.unwrap(), w);
1154    /// # }
1155    /// # }));
1156    /// # }
1157    /// ```
1158    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1159        self.into_stream().all_ticks()
1160    }
1161
1162    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1163    /// which will stream the value computed in _each_ tick as a separate stream element.
1164    ///
1165    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1166    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1167    /// singleton's [`Tick`] context.
1168    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1169        self.into_stream().all_ticks_atomic()
1170    }
1171
1172    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1173    /// be asynchronously updated with the latest value of the singleton inside the tick.
1174    ///
1175    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1176    /// tick that tracks the inner value. This is useful for getting the value as of the
1177    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1178    ///
1179    /// # Example
1180    /// ```rust
1181    /// # #[cfg(feature = "deploy")] {
1182    /// # use hydro_lang::prelude::*;
1183    /// # use futures::StreamExt;
1184    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1185    /// let tick = process.tick();
1186    /// # // ticks are lazy by default, forces the second tick to run
1187    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1188    /// # let batch_first_tick = process
1189    /// #   .source_iter(q!(vec![1]))
1190    /// #   .batch(&tick, nondet!(/** test */));
1191    /// # let batch_second_tick = process
1192    /// #   .source_iter(q!(vec![1, 2, 3]))
1193    /// #   .batch(&tick, nondet!(/** test */))
1194    /// #   .defer_tick(); // appears on the second tick
1195    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1196    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1197    ///     .count()
1198    ///     .latest()
1199    /// # .sample_eager(nondet!(/** test */))
1200    /// # }, |mut stream| async move {
1201    /// // asynchronously changes from 1 ~> 3
1202    /// # for w in vec![1, 3] {
1203    /// #     assert_eq!(stream.next().await.unwrap(), w);
1204    /// # }
1205    /// # }));
1206    /// # }
1207    /// ```
1208    pub fn latest(self) -> Singleton<T, L, Unbounded> {
1209        Singleton::new(
1210            self.location.outer().clone(),
1211            HydroNode::YieldConcat {
1212                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1213                metadata: self
1214                    .location
1215                    .outer()
1216                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1217            },
1218        )
1219    }
1220
1221    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1222    /// be updated with the latest value of the singleton inside the tick.
1223    ///
1224    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1225    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1226    /// singleton's [`Tick`] context.
1227    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1228        let out_location = Atomic {
1229            tick: self.location.clone(),
1230        };
1231        Singleton::new(
1232            out_location.clone(),
1233            HydroNode::YieldConcat {
1234                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1235                metadata: out_location
1236                    .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1237            },
1238        )
1239    }
1240}
1241
#[doc(hidden)]
/// Helper trait that determines the output collection type for [`Singleton::zip`].
///
/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
/// [`Singleton`]. `Other` is the type of the second input collection; the trait is implemented
/// on the first input (a [`Singleton`]) once per supported `Other`.
#[sealed::sealed]
pub trait ZipResult<'a, Other> {
    /// The output collection type.
    type Out;
    /// The type of the tupled output value, i.e. the pair of both inputs' values.
    type ElementType;
    /// The type of the other collection's value.
    type OtherType;
    /// The location where the tupled result will be materialized.
    type Location: Location<'a>;

    /// The location of the second input to the `zip`.
    fn other_location(other: &Other) -> Self::Location;
    /// The IR node of the second input to the `zip`, taken by consuming `other`.
    fn other_ir_node(other: Other) -> HydroNode;

    /// Constructs the output live collection given an IR node containing the zip result.
    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
}
1266
1267#[sealed::sealed]
1268impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1269where
1270    L: Location<'a>,
1271{
1272    type Out = Singleton<(T, U), L, B>;
1273    type ElementType = (T, U);
1274    type OtherType = U;
1275    type Location = L;
1276
1277    fn other_location(other: &Singleton<U, L, B>) -> L {
1278        other.location.clone()
1279    }
1280
1281    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1282        other.ir_node.replace(HydroNode::Placeholder)
1283    }
1284
1285    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1286        Singleton::new(
1287            location.clone(),
1288            HydroNode::Cast {
1289                inner: Box::new(ir_node),
1290                metadata: location.new_node_metadata(Self::Out::collection_kind()),
1291            },
1292        )
1293    }
1294}
1295
1296#[sealed::sealed]
1297impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1298where
1299    L: Location<'a>,
1300{
1301    type Out = Optional<(T, U), L, B>;
1302    type ElementType = (T, U);
1303    type OtherType = U;
1304    type Location = L;
1305
1306    fn other_location(other: &Optional<U, L, B>) -> L {
1307        other.location.clone()
1308    }
1309
1310    fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1311        other.ir_node.replace(HydroNode::Placeholder)
1312    }
1313
1314    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1315        Optional::new(location, ir_node)
1316    }
1317}
1318
1319#[cfg(test)]
1320mod tests {
1321    #[cfg(feature = "deploy")]
1322    use futures::{SinkExt, StreamExt};
1323    #[cfg(feature = "deploy")]
1324    use hydro_deploy::Deployment;
1325    #[cfg(any(feature = "deploy", feature = "sim"))]
1326    use stageleft::q;
1327
1328    #[cfg(any(feature = "deploy", feature = "sim"))]
1329    use crate::compile::builder::FlowBuilder;
1330    #[cfg(feature = "deploy")]
1331    use crate::live_collections::stream::ExactlyOnce;
1332    #[cfg(any(feature = "deploy", feature = "sim"))]
1333    use crate::location::Location;
1334    #[cfg(any(feature = "deploy", feature = "sim"))]
1335    use crate::nondet::nondet;
1336
1337    #[cfg(feature = "deploy")]
1338    #[tokio::test]
1339    async fn tick_cycle_cardinality() {
1340        let mut deployment = Deployment::new();
1341
1342        let mut flow = FlowBuilder::new();
1343        let node = flow.process::<()>();
1344        let external = flow.external::<()>();
1345
1346        let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1347
1348        let node_tick = node.tick();
1349        let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1350        let counts = singleton
1351            .clone()
1352            .into_stream()
1353            .count()
1354            .filter_if(
1355                input
1356                    .batch(&node_tick, nondet!(/** testing */))
1357                    .first()
1358                    .is_some(),
1359            )
1360            .all_ticks()
1361            .send_bincode_external(&external);
1362        complete_cycle.complete_next_tick(singleton);
1363
1364        let nodes = flow
1365            .with_process(&node, deployment.Localhost())
1366            .with_external(&external, deployment.Localhost())
1367            .deploy(&mut deployment);
1368
1369        deployment.deploy().await.unwrap();
1370
1371        let mut tick_trigger = nodes.connect(input_send).await;
1372        let mut external_out = nodes.connect(counts).await;
1373
1374        deployment.start().await.unwrap();
1375
1376        tick_trigger.send(()).await.unwrap();
1377
1378        assert_eq!(external_out.next().await.unwrap(), 1);
1379
1380        tick_trigger.send(()).await.unwrap();
1381
1382        assert_eq!(external_out.next().await.unwrap(), 1);
1383    }
1384
1385    #[cfg(feature = "sim")]
1386    #[test]
1387    #[should_panic]
1388    fn sim_fold_intermediate_states() {
1389        let mut flow = FlowBuilder::new();
1390        let node = flow.process::<()>();
1391
1392        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1393        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1394
1395        let tick = node.tick();
1396        let batch = folded.snapshot(&tick, nondet!(/** test */));
1397        let out_recv = batch.all_ticks().sim_output();
1398
1399        flow.sim().exhaustive(async || {
1400            assert_eq!(out_recv.next().await.unwrap(), 10);
1401        });
1402    }
1403
1404    #[cfg(feature = "sim")]
1405    #[test]
1406    fn sim_fold_intermediate_state_count() {
1407        let mut flow = FlowBuilder::new();
1408        let node = flow.process::<()>();
1409
1410        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1411        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1412
1413        let tick = node.tick();
1414        let batch = folded.snapshot(&tick, nondet!(/** test */));
1415        let out_recv = batch.all_ticks().sim_output();
1416
1417        let instance_count = flow.sim().exhaustive(async || {
1418            let out = out_recv.collect::<Vec<_>>().await;
1419            assert_eq!(out.last(), Some(&10));
1420        });
1421
1422        assert_eq!(
1423            instance_count,
1424            16 // 2^4 possible subsets of intermediates (including initial state)
1425        )
1426    }
1427
1428    #[cfg(feature = "sim")]
1429    #[test]
1430    fn sim_fold_no_repeat_initial() {
1431        // check that we don't repeat the initial state of the fold in autonomous decisions
1432
1433        let mut flow = FlowBuilder::new();
1434        let node = flow.process::<()>();
1435
1436        let (in_port, input) = node.sim_input();
1437        let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1438
1439        let tick = node.tick();
1440        let batch = folded.snapshot(&tick, nondet!(/** test */));
1441        let out_recv = batch.all_ticks().sim_output();
1442
1443        flow.sim().exhaustive(async || {
1444            assert_eq!(out_recv.next().await.unwrap(), 0);
1445
1446            in_port.send(123);
1447
1448            assert_eq!(out_recv.next().await.unwrap(), 123);
1449        });
1450    }
1451
1452    #[cfg(feature = "sim")]
1453    #[test]
1454    #[should_panic]
1455    fn sim_fold_repeats_snapshots() {
1456        // when the tick is driven by a snapshot AND something else, the snapshot can
1457        // "stutter" and repeat the same state multiple times
1458
1459        let mut flow = FlowBuilder::new();
1460        let node = flow.process::<()>();
1461
1462        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1463        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1464
1465        let tick = node.tick();
1466        let batch = source
1467            .batch(&tick, nondet!(/** test */))
1468            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1469        let out_recv = batch.all_ticks().sim_output();
1470
1471        flow.sim().exhaustive(async || {
1472            if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1473            {
1474                panic!("repeated snapshot");
1475            }
1476        });
1477    }
1478
1479    #[cfg(feature = "sim")]
1480    #[test]
1481    fn sim_fold_repeats_snapshots_count() {
1482        // check the number of instances
1483        let mut flow = FlowBuilder::new();
1484        let node = flow.process::<()>();
1485
1486        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1487        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1488
1489        let tick = node.tick();
1490        let batch = source
1491            .batch(&tick, nondet!(/** test */))
1492            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1493        let out_recv = batch.all_ticks().sim_output();
1494
1495        let count = flow.sim().exhaustive(async || {
1496            let _ = out_recv.collect::<Vec<_>>().await;
1497        });
1498
1499        assert_eq!(count, 52);
1500        // don't have a combinatorial explanation for this number yet, but checked via logs
1501    }
1502
1503    #[cfg(feature = "sim")]
1504    #[test]
1505    fn sim_top_level_singleton_exhaustive() {
1506        // ensures that top-level singletons have only one snapshot
1507        let mut flow = FlowBuilder::new();
1508        let node = flow.process::<()>();
1509
1510        let singleton = node.singleton(q!(1));
1511        let tick = node.tick();
1512        let batch = singleton.snapshot(&tick, nondet!(/** test */));
1513        let out_recv = batch.all_ticks().sim_output();
1514
1515        let count = flow.sim().exhaustive(async || {
1516            let _ = out_recv.collect::<Vec<_>>().await;
1517        });
1518
1519        assert_eq!(count, 1);
1520    }
1521
1522    #[cfg(feature = "sim")]
1523    #[test]
1524    fn sim_top_level_singleton_join_count() {
1525        // if a tick consumes a static snapshot and a stream batch, only the batch require space
1526        // exploration
1527
1528        let mut flow = FlowBuilder::new();
1529        let node = flow.process::<()>();
1530
1531        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1532        let tick = node.tick();
1533        let batch = source_iter
1534            .batch(&tick, nondet!(/** test */))
1535            .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1536        let out_recv = batch.all_ticks().sim_output();
1537
1538        let instance_count = flow.sim().exhaustive(async || {
1539            let _ = out_recv.collect::<Vec<_>>().await;
1540        });
1541
1542        assert_eq!(
1543            instance_count,
1544            16 // 2^4 ways to split up (including a possibly empty first batch)
1545        )
1546    }
1547
1548    #[cfg(feature = "sim")]
1549    #[test]
1550    fn top_level_singleton_into_stream_no_replay() {
1551        let mut flow = FlowBuilder::new();
1552        let node = flow.process::<()>();
1553
1554        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1555        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1556
1557        let out_recv = folded.into_stream().sim_output();
1558
1559        flow.sim().exhaustive(async || {
1560            out_recv.assert_yields_only([10]).await;
1561        });
1562    }
1563
1564    #[cfg(feature = "sim")]
1565    #[test]
1566    fn inside_tick_singleton_zip() {
1567        use crate::live_collections::Stream;
1568        use crate::live_collections::sliced::sliced;
1569
1570        let mut flow = FlowBuilder::new();
1571        let node = flow.process::<()>();
1572
1573        let source_iter: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
1574        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1575
1576        let out_recv = sliced! {
1577            let v = use(folded, nondet!(/** test */));
1578            v.clone().zip(v).into_stream()
1579        }
1580        .sim_output();
1581
1582        let count = flow.sim().exhaustive(async || {
1583            let out = out_recv.collect::<Vec<_>>().await;
1584            assert_eq!(out.last(), Some(&(3, 3)));
1585        });
1586
1587        assert_eq!(count, 4);
1588    }
1589}