// hydro_lang/live_collections/stream/mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26#[cfg(stageleft_runtime)]
27use crate::location::dynamic::{DynLocation, LocationId};
28use crate::location::tick::{Atomic, DeferTick, NoAtomic};
29use crate::location::{Location, NoTick, Tick, check_matching_location};
30use crate::manual_expr::ManualExpr;
31use crate::nondet::{NonDet, nondet};
32use crate::prelude::manual_proof;
33use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
34
35pub mod networking;
36
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds encode the two-point ordering lattice at the type level:
/// every marker is its own minimum with itself, combining with [`TotalOrder`]
/// leaves a marker unchanged, and combining with [`NoOrder`] always yields
/// [`NoOrder`].
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
45
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This is an uninhabited marker type; it only ever appears as a type parameter.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
55
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This is an uninhabited marker type; it only ever appears as a type parameter.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
67
/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
// Blanket impl: `O` is weaker than (or equal to) `O2` exactly when the minimum
// of the two orderings is `O` itself.
#[sealed::sealed]
impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
75
/// Helper trait for determining the weakest of two orderings.
///
/// NOTE(review): unlike [`MinRetries`], `Min` here is bounded only by
/// [`Ordering`], without `WeakerOrderingThan<Self> + WeakerOrderingThan<Other>`
/// bounds — confirm whether this asymmetry with the retries lattice is intentional.
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}

// `TotalOrder` is the strongest guarantee, so the minimum with any `O` is `O`.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for TotalOrder {
    type Min = O;
}

// `NoOrder` is the weakest guarantee, so the minimum with anything is `NoOrder`.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for NoOrder {
    type Min = NoOrder;
}
92
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// As with [`Ordering`], the supertrait bounds encode the two-point retries
/// lattice: every marker is its own minimum, combining with [`ExactlyOnce`]
/// (the strongest guarantee) leaves a marker unchanged, and combining with
/// [`AtLeastOnce`] (the weakest guarantee) always yields [`AtLeastOnce`].
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}
103
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This is an uninhabited marker type; it only ever appears as a type parameter.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}
112
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This is an uninhabited marker type; it only ever appears as a type parameter.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
121
/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
// Blanket impl: `R` is weaker than (or equal to) `R2` exactly when the minimum
// of the two retry guarantees is `R` itself.
#[sealed::sealed]
impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
129
/// Helper trait for determining the weakest of two retry guarantees.
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    ///
    /// The `WeakerRetryThan` bounds let downstream code convert either input
    /// guarantee to the computed minimum.
    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
}

// `ExactlyOnce` is the strongest guarantee, so the minimum with any `R` is `R`.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for ExactlyOnce {
    type Min = R;
}

// `AtLeastOnce` is the weakest guarantee, so the minimum with anything is `AtLeastOnce`.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for AtLeastOnce {
    type Min = AtLeastOnce;
}
146
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
///
/// Used as a bound on APIs that require a totally-ordered input; the
/// `on_unimplemented` diagnostic above steers users toward `.assume_ordering`
/// when the bound fails.
pub trait IsOrdered: Ordering {}

#[sealed::sealed]
// Keep this impl out of "required because of" diagnostic chains.
#[diagnostic::do_not_recommend]
impl IsOrdered for TotalOrder {}
159
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
///
/// Used as a bound on APIs that require exactly-once input; the
/// `on_unimplemented` diagnostic above steers users toward `.assume_retries`
/// when the bound fails.
pub trait IsExactlyOnce: Retries {}

#[sealed::sealed]
// Keep this impl out of "required because of" diagnostic chains.
#[diagnostic::do_not_recommend]
impl IsExactlyOnce for ExactlyOnce {}
172
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retry` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retry`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location where this stream is materialized.
    pub(crate) location: Loc,
    /// The IR node backing this stream; swapped for `HydroNode::Placeholder`
    /// once the stream has been consumed (see the `Drop` impl and the
    /// transformation methods, which all `replace` this cell).
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Shared handle to the flow graph under construction (cloned from
    /// `location.flow_state()` in `Stream::new`).
    pub(crate) flow_state: FlowState,

    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
205
impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
    /// If a stream handle is dropped without being consumed, register its IR
    /// node as a `Null` root so the node still ends up attached to the graph.
    fn drop(&mut self) {
        // Take the node out, leaving `Placeholder` (the marker for "already consumed").
        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
        // Only register if the node was never consumed and no other live handle
        // (e.g. a `Tee` created by `clone`) still shares it.
        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
            // NOTE(review): this is best-effort (`try_push_root`) while `complete`
            // uses `push_root` — presumably the flow may already be finalized at
            // drop time; confirm.
            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
                input: Box::new(ir_node),
                op_metadata: HydroIrOpMetadata::new(),
            });
        }
    }
}
217
218impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
219    for Stream<T, L, Unbounded, O, R>
220where
221    L: Location<'a>,
222{
223    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
224        let new_meta = stream
225            .location
226            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
227
228        Stream {
229            location: stream.location.clone(),
230            flow_state: stream.flow_state.clone(),
231            ir_node: RefCell::new(HydroNode::Cast {
232                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
233                metadata: new_meta,
234            }),
235            _phantom: PhantomData,
236        }
237    }
238}
239
impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
    for Stream<T, L, B, NoOrder, R>
where
    L: Location<'a>,
{
    /// Weakening [`TotalOrder`] to [`NoOrder`] is always safe; delegates to the
    /// inherent `weaken_ordering` method (defined elsewhere in this file).
    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
        stream.weaken_ordering()
    }
}
249
impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
    for Stream<T, L, B, O, AtLeastOnce>
where
    L: Location<'a>,
{
    /// Weakening [`ExactlyOnce`] to [`AtLeastOnce`] is always safe; delegates to
    /// the inherent `weaken_retries` method (defined elsewhere in this file).
    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
        stream.weaken_retries()
    }
}
259
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Delegates to the inherent `Stream::defer_tick` (defined elsewhere in
    /// this file), called via UFCS to avoid resolving back to this trait method.
    fn defer_tick(self) -> Self {
        Stream::defer_tick(self)
    }
}
268
269impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
270    for Stream<T, Tick<L>, Bounded, O, R>
271where
272    L: Location<'a>,
273{
274    type Location = Tick<L>;
275
276    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
277        Stream::new(
278            location.clone(),
279            HydroNode::CycleSource {
280                cycle_id,
281                metadata: location.new_node_metadata(Self::collection_kind()),
282            },
283        )
284    }
285}
286
impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    type Location = Tick<L>;

    /// Creates the receiving half of a tick cycle whose first-tick contents are
    /// `initial`, and whose contents on later ticks are whatever was written
    /// into the cycle on the previous tick.
    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
        // `DeferTick` shifts the cycle source's contents by one tick, so this
        // stream observes the values produced during the *previous* tick.
        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
            location.clone(),
            HydroNode::DeferTick {
                input: Box::new(HydroNode::CycleSource {
                    cycle_id,
                    metadata: location.new_node_metadata(Self::collection_kind()),
                }),
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        );

        // There is no previous tick at startup, so splice in `initial`, gated
        // by `optional_first_tick` so it only appears on the first tick.
        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
    }
}
309
impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Closes a tick-local cycle by registering this stream as the `CycleSink`
    /// root matching the `CycleSource` created with the same `cycle_id`.
    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
        // The sink must live at the same location where the cycle was created.
        assert_eq!(
            Location::id(&self.location),
            expected_location,
            "locations do not match"
        );
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::CycleSink {
                cycle_id,
                // Taking the node leaves `Placeholder` behind, so `Drop` will
                // not also register this stream as a `Null` root.
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
}
331
332impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
333    for Stream<T, L, B, O, R>
334where
335    L: Location<'a> + NoTick,
336{
337    type Location = L;
338
339    fn create_source(cycle_id: CycleId, location: L) -> Self {
340        Stream::new(
341            location.clone(),
342            HydroNode::CycleSource {
343                cycle_id,
344                metadata: location.new_node_metadata(Self::collection_kind()),
345            },
346        )
347    }
348}
349
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
    for Stream<T, L, B, O, R>
where
    L: Location<'a> + NoTick,
{
    /// Resolves a forward reference by registering this stream as the
    /// `CycleSink` root matching the `CycleSource` created with `cycle_id`.
    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
        // The sink must live at the same location where the reference was created.
        assert_eq!(
            Location::id(&self.location),
            expected_location,
            "locations do not match"
        );
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::CycleSink {
                cycle_id,
                // Taking the node leaves `Placeholder` behind, so `Drop` will
                // not also register this stream as a `Null` root.
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
}
371
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
where
    T: Clone,
    L: Location<'a>,
{
    /// Clones the stream handle by sharing the underlying IR node via `Tee`.
    fn clone(&self) -> Self {
        // If this handle does not already wrap a `Tee`, convert it in place:
        // move the original node behind a shared `Rc` and re-point `self` at a
        // `Tee` over it. This mutates `self` through the `RefCell` even though
        // `clone` takes `&self`.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // `self` is now guaranteed to hold a `Tee`; the clone gets its own
        // `Tee` node that shares the same inner `Rc`.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Stream {
                location: self.location.clone(),
                flow_state: self.flow_state.clone(),
                ir_node: HydroNode::Tee {
                    inner: SharedNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            // The branch above ensured the node is a `Tee`.
            unreachable!()
        }
    }
}
402
403impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
404where
405    L: Location<'a>,
406{
    /// Wraps an IR node in a typed `Stream` handle.
    ///
    /// Debug builds verify that the node's metadata already matches both the
    /// given location and this stream type's collection kind; callers are
    /// responsible for constructing the node with the right metadata.
    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());

        // The flow state is shared with the location the stream lives at.
        let flow_state = location.flow_state().clone();
        Stream {
            location,
            flow_state,
            ir_node: RefCell::new(ir_node),
            _phantom: PhantomData,
        }
    }
419
    /// Returns the [`Location`] where this stream is being materialized.
    ///
    /// Cheap accessor: borrows the handle stored on the stream.
    pub fn location(&self) -> &L {
        &self.location
    }
424
    /// Returns the [`CollectionKind`] tag for this stream type, capturing its
    /// boundedness, ordering, retry guarantee, and element type. Used to build
    /// IR node metadata throughout this module.
    pub(crate) fn collection_kind() -> CollectionKind {
        CollectionKind::Stream {
            bound: B::BOUND_KIND,
            order: O::ORDERING_KIND,
            retry: R::RETRIES_KIND,
            element_type: quote_type::<T>().into(),
        }
    }
433
434    /// Produces a stream based on invoking `f` on each element.
435    /// If you do not want to modify the stream and instead only want to view
436    /// each item use [`Stream::inspect`] instead.
437    ///
438    /// # Example
439    /// ```rust
440    /// # #[cfg(feature = "deploy")] {
441    /// # use hydro_lang::prelude::*;
442    /// # use futures::StreamExt;
443    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
444    /// let words = process.source_iter(q!(vec!["hello", "world"]));
445    /// words.map(q!(|x| x.to_uppercase()))
446    /// # }, |mut stream| async move {
447    /// # for w in vec!["HELLO", "WORLD"] {
448    /// #     assert_eq!(stream.next().await.unwrap(), w);
449    /// # }
450    /// # }));
451    /// # }
452    /// ```
453    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
454    where
455        F: Fn(T) -> U + 'a,
456    {
457        let f = f.splice_fn1_ctx(&self.location).into();
458        Stream::new(
459            self.location.clone(),
460            HydroNode::Map {
461                f,
462                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
463                metadata: self
464                    .location
465                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
466            },
467        )
468    }
469
470    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
471    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
472    /// for the output type `U` must produce items in a **deterministic** order.
473    ///
474    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
475    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
476    ///
477    /// # Example
478    /// ```rust
479    /// # #[cfg(feature = "deploy")] {
480    /// # use hydro_lang::prelude::*;
481    /// # use futures::StreamExt;
482    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
483    /// process
484    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
485    ///     .flat_map_ordered(q!(|x| x))
486    /// # }, |mut stream| async move {
487    /// // 1, 2, 3, 4
488    /// # for w in (1..5) {
489    /// #     assert_eq!(stream.next().await.unwrap(), w);
490    /// # }
491    /// # }));
492    /// # }
493    /// ```
494    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
495    where
496        I: IntoIterator<Item = U>,
497        F: Fn(T) -> I + 'a,
498    {
499        let f = f.splice_fn1_ctx(&self.location).into();
500        Stream::new(
501            self.location.clone(),
502            HydroNode::FlatMap {
503                f,
504                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
505                metadata: self
506                    .location
507                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
508            },
509        )
510    }
511
512    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
513    /// for the output type `U` to produce items in any order.
514    ///
515    /// # Example
516    /// ```rust
517    /// # #[cfg(feature = "deploy")] {
518    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
519    /// # use futures::StreamExt;
520    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
521    /// process
522    ///     .source_iter(q!(vec![
523    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
524    ///         std::collections::HashSet::from_iter(vec![3, 4]),
525    ///     ]))
526    ///     .flat_map_unordered(q!(|x| x))
527    /// # }, |mut stream| async move {
528    /// // 1, 2, 3, 4, but in no particular order
529    /// # let mut results = Vec::new();
530    /// # for w in (1..5) {
531    /// #     results.push(stream.next().await.unwrap());
532    /// # }
533    /// # results.sort();
534    /// # assert_eq!(results, vec![1, 2, 3, 4]);
535    /// # }));
536    /// # }
537    /// ```
538    pub fn flat_map_unordered<U, I, F>(
539        self,
540        f: impl IntoQuotedMut<'a, F, L>,
541    ) -> Stream<U, L, B, NoOrder, R>
542    where
543        I: IntoIterator<Item = U>,
544        F: Fn(T) -> I + 'a,
545    {
546        let f = f.splice_fn1_ctx(&self.location).into();
547        Stream::new(
548            self.location.clone(),
549            HydroNode::FlatMap {
550                f,
551                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
552                metadata: self
553                    .location
554                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
555            },
556        )
557    }
558
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Identity flat-map: each element is itself the iterator to expand.
        self.flat_map_ordered(q!(|d| d))
    }
587
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Identity flat-map: each element is itself the iterator to expand.
        self.flat_map_unordered(q!(|d| d))
    }
620
621    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
622    /// `f`, preserving the order of the elements.
623    ///
624    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
625    /// not modify or take ownership of the values. If you need to modify the values while filtering
626    /// use [`Stream::filter_map`] instead.
627    ///
628    /// # Example
629    /// ```rust
630    /// # #[cfg(feature = "deploy")] {
631    /// # use hydro_lang::prelude::*;
632    /// # use futures::StreamExt;
633    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
634    /// process
635    ///     .source_iter(q!(vec![1, 2, 3, 4]))
636    ///     .filter(q!(|&x| x > 2))
637    /// # }, |mut stream| async move {
638    /// // 3, 4
639    /// # for w in (3..5) {
640    /// #     assert_eq!(stream.next().await.unwrap(), w);
641    /// # }
642    /// # }));
643    /// # }
644    /// ```
645    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
646    where
647        F: Fn(&T) -> bool + 'a,
648    {
649        let f = f.splice_fn1_borrow_ctx(&self.location).into();
650        Stream::new(
651            self.location.clone(),
652            HydroNode::Filter {
653                f,
654                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
655                metadata: self.location.new_node_metadata(Self::collection_kind()),
656            },
657        )
658    }
659
660    /// Splits the stream into two streams based on a predicate, without cloning elements.
661    ///
662    /// Elements for which `f` returns `true` are sent to the first output stream,
663    /// and elements for which `f` returns `false` are sent to the second output stream.
664    ///
665    /// Unlike using `filter` twice, this only evaluates the predicate once per element
666    /// and does not require `T: Clone`.
667    ///
668    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
669    /// the predicate is only used for routing; the element itself is moved to the
670    /// appropriate output stream.
671    ///
672    /// # Example
673    /// ```rust
674    /// # #[cfg(feature = "deploy")] {
675    /// # use hydro_lang::prelude::*;
676    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
677    /// # use futures::StreamExt;
678    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
679    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
680    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
681    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
682    /// evens.map(q!(|x| (x, true)))
683    ///     .merge_unordered(odds.map(q!(|x| (x, false))))
684    /// # }, |mut stream| async move {
685    /// # let mut results = Vec::new();
686    /// # for _ in 0..6 {
687    /// #     results.push(stream.next().await.unwrap());
688    /// # }
689    /// # results.sort();
690    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
691    /// # }));
692    /// # }
693    /// ```
694    #[expect(
695        clippy::type_complexity,
696        reason = "return type mirrors the input stream type"
697    )]
698    pub fn partition<F>(
699        self,
700        f: impl IntoQuotedMut<'a, F, L>,
701    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
702    where
703        F: Fn(&T) -> bool + 'a,
704    {
705        let f: crate::compile::ir::DebugExpr = f.splice_fn1_borrow_ctx(&self.location).into();
706        let shared = SharedNode(Rc::new(RefCell::new(
707            self.ir_node.replace(HydroNode::Placeholder),
708        )));
709
710        let true_stream = Stream::new(
711            self.location.clone(),
712            HydroNode::Partition {
713                inner: SharedNode(shared.0.clone()),
714                f: f.clone(),
715                is_true: true,
716                metadata: self.location.new_node_metadata(Self::collection_kind()),
717            },
718        );
719
720        let false_stream = Stream::new(
721            self.location.clone(),
722            HydroNode::Partition {
723                inner: SharedNode(shared.0),
724                f,
725                is_true: false,
726                metadata: self.location.new_node_metadata(Self::collection_kind()),
727            },
728        );
729
730        (true_stream, false_stream)
731    }
732
733    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
734    ///
735    /// # Example
736    /// ```rust
737    /// # #[cfg(feature = "deploy")] {
738    /// # use hydro_lang::prelude::*;
739    /// # use futures::StreamExt;
740    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
741    /// process
742    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
743    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
744    /// # }, |mut stream| async move {
745    /// // 1, 2
746    /// # for w in (1..3) {
747    /// #     assert_eq!(stream.next().await.unwrap(), w);
748    /// # }
749    /// # }));
750    /// # }
751    /// ```
752    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
753    where
754        F: Fn(T) -> Option<U> + 'a,
755    {
756        let f = f.splice_fn1_ctx(&self.location).into();
757        Stream::new(
758            self.location.clone(),
759            HydroNode::FilterMap {
760                f,
761                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
762                metadata: self
763                    .location
764                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
765            },
766        )
767    }
768
769    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
770    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
771    /// If `other` is an empty [`Optional`], no values will be produced.
772    ///
773    /// # Example
774    /// ```rust
775    /// # #[cfg(feature = "deploy")] {
776    /// # use hydro_lang::prelude::*;
777    /// # use futures::StreamExt;
778    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
779    /// let tick = process.tick();
780    /// let batch = process
781    ///   .source_iter(q!(vec![1, 2, 3, 4]))
782    ///   .batch(&tick, nondet!(/** test */));
783    /// let count = batch.clone().count(); // `count()` returns a singleton
784    /// batch.cross_singleton(count).all_ticks()
785    /// # }, |mut stream| async move {
786    /// // (1, 4), (2, 4), (3, 4), (4, 4)
787    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
788    /// #     assert_eq!(stream.next().await.unwrap(), w);
789    /// # }
790    /// # }));
791    /// # }
792    /// ```
793    pub fn cross_singleton<O2>(
794        self,
795        other: impl Into<Optional<O2, L, Bounded>>,
796    ) -> Stream<(T, O2), L, B, O, R>
797    where
798        O2: Clone,
799    {
800        let other: Optional<O2, L, Bounded> = other.into();
801        check_matching_location(&self.location, &other.location);
802
803        Stream::new(
804            self.location.clone(),
805            HydroNode::CrossSingleton {
806                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
807                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
808                metadata: self
809                    .location
810                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
811            },
812        )
813    }
814
815    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
816    ///
817    /// # Example
818    /// ```rust
819    /// # #[cfg(feature = "deploy")] {
820    /// # use hydro_lang::prelude::*;
821    /// # use futures::StreamExt;
822    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
823    /// let tick = process.tick();
824    /// // ticks are lazy by default, forces the second tick to run
825    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
826    ///
827    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
828    /// let batch_first_tick = process
829    ///   .source_iter(q!(vec![1, 2, 3, 4]))
830    ///   .batch(&tick, nondet!(/** test */));
831    /// let batch_second_tick = process
832    ///   .source_iter(q!(vec![5, 6, 7, 8]))
833    ///   .batch(&tick, nondet!(/** test */))
834    ///   .defer_tick();
835    /// batch_first_tick.chain(batch_second_tick)
836    ///   .filter_if(signal)
837    ///   .all_ticks()
838    /// # }, |mut stream| async move {
839    /// // [1, 2, 3, 4]
840    /// # for w in vec![1, 2, 3, 4] {
841    /// #     assert_eq!(stream.next().await.unwrap(), w);
842    /// # }
843    /// # }));
844    /// # }
845    /// ```
846    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
847        self.cross_singleton(signal.filter(q!(|b| *b)))
848            .map(q!(|(d, _)| d))
849    }
850
851    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
852    ///
853    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
854    /// leader of a cluster.
855    ///
856    /// # Example
857    /// ```rust
858    /// # #[cfg(feature = "deploy")] {
859    /// # use hydro_lang::prelude::*;
860    /// # use futures::StreamExt;
861    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
862    /// let tick = process.tick();
863    /// // ticks are lazy by default, forces the second tick to run
864    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
865    ///
866    /// let batch_first_tick = process
867    ///   .source_iter(q!(vec![1, 2, 3, 4]))
868    ///   .batch(&tick, nondet!(/** test */));
869    /// let batch_second_tick = process
870    ///   .source_iter(q!(vec![5, 6, 7, 8]))
871    ///   .batch(&tick, nondet!(/** test */))
872    ///   .defer_tick(); // appears on the second tick
873    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
874    /// batch_first_tick.chain(batch_second_tick)
875    ///   .filter_if_some(some_on_first_tick)
876    ///   .all_ticks()
877    /// # }, |mut stream| async move {
878    /// // [1, 2, 3, 4]
879    /// # for w in vec![1, 2, 3, 4] {
880    /// #     assert_eq!(stream.next().await.unwrap(), w);
881    /// # }
882    /// # }));
883    /// # }
884    /// ```
885    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
886    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
887        self.filter_if(signal.is_some())
888    }
889
890    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
891    ///
892    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
893    /// some local state.
894    ///
895    /// # Example
896    /// ```rust
897    /// # #[cfg(feature = "deploy")] {
898    /// # use hydro_lang::prelude::*;
899    /// # use futures::StreamExt;
900    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
901    /// let tick = process.tick();
902    /// // ticks are lazy by default, forces the second tick to run
903    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
904    ///
905    /// let batch_first_tick = process
906    ///   .source_iter(q!(vec![1, 2, 3, 4]))
907    ///   .batch(&tick, nondet!(/** test */));
908    /// let batch_second_tick = process
909    ///   .source_iter(q!(vec![5, 6, 7, 8]))
910    ///   .batch(&tick, nondet!(/** test */))
911    ///   .defer_tick(); // appears on the second tick
912    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
913    /// batch_first_tick.chain(batch_second_tick)
914    ///   .filter_if_none(some_on_first_tick)
915    ///   .all_ticks()
916    /// # }, |mut stream| async move {
917    /// // [5, 6, 7, 8]
918    /// # for w in vec![5, 6, 7, 8] {
919    /// #     assert_eq!(stream.next().await.unwrap(), w);
920    /// # }
921    /// # }));
922    /// # }
923    /// ```
924    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
925    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
926        self.filter_if(other.is_none())
927    }
928
929    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
930    /// tupled pairs in a non-deterministic order.
931    ///
932    /// # Example
933    /// ```rust
934    /// # #[cfg(feature = "deploy")] {
935    /// # use hydro_lang::prelude::*;
936    /// # use std::collections::HashSet;
937    /// # use futures::StreamExt;
938    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
939    /// let tick = process.tick();
940    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
941    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
942    /// stream1.cross_product(stream2)
943    /// # }, |mut stream| async move {
944    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
945    /// # stream.map(|i| assert!(expected.contains(&i)));
946    /// # }));
947    /// # }
948    /// ```
949    pub fn cross_product<T2, O2: Ordering>(
950        self,
951        other: Stream<T2, L, B, O2, R>,
952    ) -> Stream<(T, T2), L, B, NoOrder, R>
953    where
954        T: Clone,
955        T2: Clone,
956    {
957        check_matching_location(&self.location, &other.location);
958
959        Stream::new(
960            self.location.clone(),
961            HydroNode::CrossProduct {
962                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
963                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
964                metadata: self
965                    .location
966                    .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
967            },
968        )
969    }
970
971    /// Takes one stream as input and filters out any duplicate occurrences. The output
972    /// contains all unique values from the input.
973    ///
974    /// # Example
975    /// ```rust
976    /// # #[cfg(feature = "deploy")] {
977    /// # use hydro_lang::prelude::*;
978    /// # use futures::StreamExt;
979    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
980    /// let tick = process.tick();
981    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
982    /// # }, |mut stream| async move {
983    /// # for w in vec![1, 2, 3, 4] {
984    /// #     assert_eq!(stream.next().await.unwrap(), w);
985    /// # }
986    /// # }));
987    /// # }
988    /// ```
989    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
990    where
991        T: Eq + Hash,
992    {
993        Stream::new(
994            self.location.clone(),
995            HydroNode::Unique {
996                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
997                metadata: self
998                    .location
999                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1000            },
1001        )
1002    }
1003
1004    /// Outputs everything in this stream that is *not* contained in the `other` stream.
1005    ///
1006    /// The `other` stream must be [`Bounded`], since this function will wait until
1007    /// all its elements are available before producing any output.
1008    /// # Example
1009    /// ```rust
1010    /// # #[cfg(feature = "deploy")] {
1011    /// # use hydro_lang::prelude::*;
1012    /// # use futures::StreamExt;
1013    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1014    /// let tick = process.tick();
1015    /// let stream = process
1016    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1017    ///   .batch(&tick, nondet!(/** test */));
1018    /// let batch = process
1019    ///   .source_iter(q!(vec![1, 2]))
1020    ///   .batch(&tick, nondet!(/** test */));
1021    /// stream.filter_not_in(batch).all_ticks()
1022    /// # }, |mut stream| async move {
1023    /// # for w in vec![3, 4] {
1024    /// #     assert_eq!(stream.next().await.unwrap(), w);
1025    /// # }
1026    /// # }));
1027    /// # }
1028    /// ```
1029    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1030    where
1031        T: Eq + Hash,
1032        B2: IsBounded,
1033    {
1034        check_matching_location(&self.location, &other.location);
1035
1036        Stream::new(
1037            self.location.clone(),
1038            HydroNode::Difference {
1039                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1040                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1041                metadata: self
1042                    .location
1043                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1044            },
1045        )
1046    }
1047
1048    /// An operator which allows you to "inspect" each element of a stream without
1049    /// modifying it. The closure `f` is called on a reference to each item. This is
1050    /// mainly useful for debugging, and should not be used to generate side-effects.
1051    ///
1052    /// # Example
1053    /// ```rust
1054    /// # #[cfg(feature = "deploy")] {
1055    /// # use hydro_lang::prelude::*;
1056    /// # use futures::StreamExt;
1057    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1058    /// let nums = process.source_iter(q!(vec![1, 2]));
1059    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1060    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1061    /// # }, |mut stream| async move {
1062    /// # for w in vec![1, 2] {
1063    /// #     assert_eq!(stream.next().await.unwrap(), w);
1064    /// # }
1065    /// # }));
1066    /// # }
1067    /// ```
1068    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1069    where
1070        F: Fn(&T) + 'a,
1071    {
1072        let f = f.splice_fn1_borrow_ctx(&self.location).into();
1073
1074        Stream::new(
1075            self.location.clone(),
1076            HydroNode::Inspect {
1077                f,
1078                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1079                metadata: self.location.new_node_metadata(Self::collection_kind()),
1080            },
1081        )
1082    }
1083
1084    /// Executes the provided closure for every element in this stream.
1085    ///
1086    /// Because the closure may have side effects, the stream must have deterministic order
1087    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1088    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1089    /// [`Stream::assume_retries`] with an explanation for why this is the case.
1090    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1091    where
1092        O: IsOrdered,
1093        R: IsExactlyOnce,
1094    {
1095        let f = f.splice_fn1_ctx(&self.location).into();
1096        self.location
1097            .flow_state()
1098            .borrow_mut()
1099            .push_root(HydroRoot::ForEach {
1100                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1101                f,
1102                op_metadata: HydroIrOpMetadata::new(),
1103            });
1104    }
1105
1106    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1107    /// TCP socket to some other server. You should _not_ use this API for interacting with
1108    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1109    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1110    /// interaction with asynchronous sinks.
1111    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1112    where
1113        O: IsOrdered,
1114        R: IsExactlyOnce,
1115        S: 'a + futures::Sink<T> + Unpin,
1116    {
1117        self.location
1118            .flow_state()
1119            .borrow_mut()
1120            .push_root(HydroRoot::DestSink {
1121                sink: sink.splice_typed_ctx(&self.location).into(),
1122                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1123                op_metadata: HydroIrOpMetadata::new(),
1124            });
1125    }
1126
1127    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1128    ///
1129    /// # Example
1130    /// ```rust
1131    /// # #[cfg(feature = "deploy")] {
1132    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1133    /// # use futures::StreamExt;
1134    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1135    /// let tick = process.tick();
1136    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1137    /// numbers.enumerate()
1138    /// # }, |mut stream| async move {
1139    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1140    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1141    /// #     assert_eq!(stream.next().await.unwrap(), w);
1142    /// # }
1143    /// # }));
1144    /// # }
1145    /// ```
1146    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1147    where
1148        O: IsOrdered,
1149        R: IsExactlyOnce,
1150    {
1151        Stream::new(
1152            self.location.clone(),
1153            HydroNode::Enumerate {
1154                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1155                metadata: self.location.new_node_metadata(Stream::<
1156                    (usize, T),
1157                    L,
1158                    B,
1159                    TotalOrder,
1160                    ExactlyOnce,
1161                >::collection_kind()),
1162            },
1163        )
1164    }
1165
1166    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1167    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1168    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1169    ///
1170    /// Depending on the input stream guarantees, the closure may need to be commutative
1171    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1172    ///
1173    /// # Example
1174    /// ```rust
1175    /// # #[cfg(feature = "deploy")] {
1176    /// # use hydro_lang::prelude::*;
1177    /// # use futures::StreamExt;
1178    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1179    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1180    /// words
1181    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1182    ///     .into_stream()
1183    /// # }, |mut stream| async move {
1184    /// // "HELLOWORLD"
1185    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1186    /// # }));
1187    /// # }
1188    /// ```
1189    pub fn fold<A, I, F, C, Idemp>(
1190        self,
1191        init: impl IntoQuotedMut<'a, I, L>,
1192        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1193    ) -> Singleton<A, L, B>
1194    where
1195        I: Fn() -> A + 'a,
1196        F: Fn(&mut A, T),
1197        C: ValidCommutativityFor<O>,
1198        Idemp: ValidIdempotenceFor<R>,
1199    {
1200        let init = init.splice_fn0_ctx(&self.location).into();
1201        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1202        proof.register_proof(&comb);
1203
1204        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1205        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1206
1207        let core = HydroNode::Fold {
1208            init,
1209            acc: comb.into(),
1210            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1211            metadata: ordered_etc
1212                .location
1213                .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1214        };
1215
1216        Singleton::new(ordered_etc.location.clone(), core)
1217    }
1218
1219    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1220    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1221    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1222    /// reference, so that it can be modified in place.
1223    ///
1224    /// Depending on the input stream guarantees, the closure may need to be commutative
1225    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1226    ///
1227    /// # Example
1228    /// ```rust
1229    /// # #[cfg(feature = "deploy")] {
1230    /// # use hydro_lang::prelude::*;
1231    /// # use futures::StreamExt;
1232    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1233    /// let bools = process.source_iter(q!(vec![false, true, false]));
1234    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1235    /// # }, |mut stream| async move {
1236    /// // true
1237    /// # assert_eq!(stream.next().await.unwrap(), true);
1238    /// # }));
1239    /// # }
1240    /// ```
1241    pub fn reduce<F, C, Idemp>(
1242        self,
1243        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1244    ) -> Optional<T, L, B>
1245    where
1246        F: Fn(&mut T, T) + 'a,
1247        C: ValidCommutativityFor<O>,
1248        Idemp: ValidIdempotenceFor<R>,
1249    {
1250        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1251        proof.register_proof(&f);
1252
1253        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1254        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1255
1256        let core = HydroNode::Reduce {
1257            f: f.into(),
1258            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1259            metadata: ordered_etc
1260                .location
1261                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1262        };
1263
1264        Optional::new(ordered_etc.location.clone(), core)
1265    }
1266
1267    /// Computes the maximum element in the stream as an [`Optional`], which
1268    /// will be empty until the first element in the input arrives.
1269    ///
1270    /// # Example
1271    /// ```rust
1272    /// # #[cfg(feature = "deploy")] {
1273    /// # use hydro_lang::prelude::*;
1274    /// # use futures::StreamExt;
1275    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1276    /// let tick = process.tick();
1277    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1278    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1279    /// batch.max().all_ticks()
1280    /// # }, |mut stream| async move {
1281    /// // 4
1282    /// # assert_eq!(stream.next().await.unwrap(), 4);
1283    /// # }));
1284    /// # }
1285    /// ```
1286    pub fn max(self) -> Optional<T, L, B>
1287    where
1288        T: Ord,
1289    {
1290        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1291            .assume_ordering_trusted_bounded::<TotalOrder>(
1292                nondet!(/** max is commutative, but order affects intermediates */),
1293            )
1294            .reduce(q!(|curr, new| {
1295                if new > *curr {
1296                    *curr = new;
1297                }
1298            }))
1299    }
1300
1301    /// Computes the minimum element in the stream as an [`Optional`], which
1302    /// will be empty until the first element in the input arrives.
1303    ///
1304    /// # Example
1305    /// ```rust
1306    /// # #[cfg(feature = "deploy")] {
1307    /// # use hydro_lang::prelude::*;
1308    /// # use futures::StreamExt;
1309    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1310    /// let tick = process.tick();
1311    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1312    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1313    /// batch.min().all_ticks()
1314    /// # }, |mut stream| async move {
1315    /// // 1
1316    /// # assert_eq!(stream.next().await.unwrap(), 1);
1317    /// # }));
1318    /// # }
1319    /// ```
1320    pub fn min(self) -> Optional<T, L, B>
1321    where
1322        T: Ord,
1323    {
1324        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1325            .assume_ordering_trusted_bounded::<TotalOrder>(
1326                nondet!(/** max is commutative, but order affects intermediates */),
1327            )
1328            .reduce(q!(|curr, new| {
1329                if new < *curr {
1330                    *curr = new;
1331                }
1332            }))
1333    }
1334
1335    /// Computes the first element in the stream as an [`Optional`], which
1336    /// will be empty until the first element in the input arrives.
1337    ///
1338    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1339    /// re-ordering of elements may cause the first element to change.
1340    ///
1341    /// # Example
1342    /// ```rust
1343    /// # #[cfg(feature = "deploy")] {
1344    /// # use hydro_lang::prelude::*;
1345    /// # use futures::StreamExt;
1346    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1347    /// let tick = process.tick();
1348    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1349    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1350    /// batch.first().all_ticks()
1351    /// # }, |mut stream| async move {
1352    /// // 1
1353    /// # assert_eq!(stream.next().await.unwrap(), 1);
1354    /// # }));
1355    /// # }
1356    /// ```
1357    pub fn first(self) -> Optional<T, L, B>
1358    where
1359        O: IsOrdered,
1360    {
1361        self.make_totally_ordered()
1362            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1363            .reduce(q!(|_, _| {}))
1364    }
1365
1366    /// Computes the last element in the stream as an [`Optional`], which
1367    /// will be empty until an element in the input arrives.
1368    ///
1369    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1370    /// re-ordering of elements may cause the last element to change.
1371    ///
1372    /// # Example
1373    /// ```rust
1374    /// # #[cfg(feature = "deploy")] {
1375    /// # use hydro_lang::prelude::*;
1376    /// # use futures::StreamExt;
1377    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1378    /// let tick = process.tick();
1379    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1380    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1381    /// batch.last().all_ticks()
1382    /// # }, |mut stream| async move {
1383    /// // 4
1384    /// # assert_eq!(stream.next().await.unwrap(), 4);
1385    /// # }));
1386    /// # }
1387    /// ```
1388    pub fn last(self) -> Optional<T, L, B>
1389    where
1390        O: IsOrdered,
1391    {
1392        self.make_totally_ordered()
1393            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1394            .reduce(q!(|curr, new| *curr = new))
1395    }
1396
1397    /// Collects all the elements of this stream into a single [`Vec`] element.
1398    ///
1399    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1400    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1401    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1402    /// the vector at an arbitrary point in time.
1403    ///
1404    /// # Example
1405    /// ```rust
1406    /// # #[cfg(feature = "deploy")] {
1407    /// # use hydro_lang::prelude::*;
1408    /// # use futures::StreamExt;
1409    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1410    /// let tick = process.tick();
1411    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1412    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1413    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1414    /// # }, |mut stream| async move {
1415    /// // [ vec![1, 2, 3, 4] ]
1416    /// # for w in vec![vec![1, 2, 3, 4]] {
1417    /// #     assert_eq!(stream.next().await.unwrap(), w);
1418    /// # }
1419    /// # }));
1420    /// # }
1421    /// ```
1422    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1423    where
1424        O: IsOrdered,
1425        R: IsExactlyOnce,
1426    {
1427        self.make_totally_ordered().make_exactly_once().fold(
1428            q!(|| vec![]),
1429            q!(|acc, v| {
1430                acc.push(v);
1431            }),
1432        )
1433    }
1434
1435    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1436    /// and emitting each intermediate result.
1437    ///
1438    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1439    /// containing all intermediate accumulated values. The scan operation can also terminate early
1440    /// by returning `None`.
1441    ///
1442    /// The function takes a mutable reference to the accumulator and the current element, and returns
1443    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1444    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1445    ///
1446    /// # Examples
1447    ///
1448    /// Basic usage - running sum:
1449    /// ```rust
1450    /// # #[cfg(feature = "deploy")] {
1451    /// # use hydro_lang::prelude::*;
1452    /// # use futures::StreamExt;
1453    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1454    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1455    ///     q!(|| 0),
1456    ///     q!(|acc, x| {
1457    ///         *acc += x;
1458    ///         Some(*acc)
1459    ///     }),
1460    /// )
1461    /// # }, |mut stream| async move {
1462    /// // Output: 1, 3, 6, 10
1463    /// # for w in vec![1, 3, 6, 10] {
1464    /// #     assert_eq!(stream.next().await.unwrap(), w);
1465    /// # }
1466    /// # }));
1467    /// # }
1468    /// ```
1469    ///
1470    /// Early termination example:
1471    /// ```rust
1472    /// # #[cfg(feature = "deploy")] {
1473    /// # use hydro_lang::prelude::*;
1474    /// # use futures::StreamExt;
1475    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1476    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1477    ///     q!(|| 1),
1478    ///     q!(|state, x| {
1479    ///         *state = *state * x;
1480    ///         if *state > 6 {
1481    ///             None // Terminate the stream
1482    ///         } else {
1483    ///             Some(-*state)
1484    ///         }
1485    ///     }),
1486    /// )
1487    /// # }, |mut stream| async move {
1488    /// // Output: -1, -2, -6
1489    /// # for w in vec![-1, -2, -6] {
1490    /// #     assert_eq!(stream.next().await.unwrap(), w);
1491    /// # }
1492    /// # }));
1493    /// # }
1494    /// ```
1495    pub fn scan<A, U, I, F>(
1496        self,
1497        init: impl IntoQuotedMut<'a, I, L>,
1498        f: impl IntoQuotedMut<'a, F, L>,
1499    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1500    where
1501        O: IsOrdered,
1502        R: IsExactlyOnce,
1503        I: Fn() -> A + 'a,
1504        F: Fn(&mut A, T) -> Option<U> + 'a,
1505    {
1506        let init = init.splice_fn0_ctx(&self.location).into();
1507        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1508
1509        Stream::new(
1510            self.location.clone(),
1511            HydroNode::Scan {
1512                init,
1513                acc: f,
1514                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1515                metadata: self.location.new_node_metadata(
1516                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1517                ),
1518            },
1519        )
1520    }
1521
1522    /// Iteratively processes the elements of the stream using a state machine that can yield
1523    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1524    /// syntax in Rust, without requiring special syntax.
1525    ///
1526    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
1527    /// state. The second argument defines the processing logic, taking in a mutable reference
1528    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
1529    /// variants define what is emitted and whether further inputs should be processed.
1530    ///
1531    /// # Example
1532    /// ```rust
1533    /// # #[cfg(feature = "deploy")] {
1534    /// # use hydro_lang::prelude::*;
1535    /// # use futures::StreamExt;
1536    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1537    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
1538    ///     q!(|| 0),
1539    ///     q!(|acc, x| {
1540    ///         *acc += x;
1541    ///         if *acc > 100 {
1542    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
1543    ///         } else if *acc % 2 == 0 {
1544    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
1545    ///         } else {
1546    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
1547    ///         }
1548    ///     }),
1549    /// )
1550    /// # }, |mut stream| async move {
1551    /// // Output: "even", "done!"
1552    /// # let mut results = Vec::new();
1553    /// # for _ in 0..2 {
1554    /// #     results.push(stream.next().await.unwrap());
1555    /// # }
1556    /// # results.sort();
1557    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
1558    /// # }));
1559    /// # }
1560    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Generate<U> + 'a,
    {
        // Wrap the user closures in `ManualExpr` so splicing is deferred until
        // they are embedded inside the quoted scan closure below.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        // `IsOrdered` / `IsExactlyOnce` witness that these conversions are valid.
        let this = self.make_totally_ordered().make_exactly_once();

        // State is Option<Option<A>>:
        //   None = not yet initialized
        //   Some(Some(a)) = active with state a
        //   Some(None) = terminated
        let scan_init = q!(|| None)
            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
            .into();
        // Scan step: yields Some(Some(out)) to emit, Some(None) to emit
        // nothing this element, and None to terminate the scan entirely.
        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
            if state.is_none() {
                *state = Some(Some(init()));
            }
            match state {
                Some(Some(state_value)) => match f(state_value, v) {
                    Generate::Yield(out) => Some(Some(out)),
                    Generate::Return(out) => {
                        *state = Some(None);
                        Some(Some(out))
                    }
                    // Unlike KeyedStream, we can terminate the scan directly on
                    // Break/Return because there is only one state (no other keys
                    // that still need processing).
                    Generate::Break => None,
                    Generate::Continue => Some(None),
                },
                // State is Some(None) after Return; terminate the scan.
                _ => None,
            }
        })
        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
        .into();

        // Scan produces Option<U> per element; actual emission is decided by
        // the flatten step below.
        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<U>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten the Option<U> stream: None values (Generate::Continue) are
        // dropped, Some(out) values are emitted.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<U>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this
                .location
                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
        };

        Stream::new(this.location.clone(), flatten_node)
    }
1634
1635    /// Given a time interval, returns a stream corresponding to samples taken from the
1636    /// stream roughly at that interval. The output will have elements in the same order
1637    /// as the input, but with arbitrary elements skipped between samples. There is also
1638    /// no guarantee on the exact timing of the samples.
1639    ///
1640    /// # Non-Determinism
1641    /// The output stream is non-deterministic in which elements are sampled, since this
1642    /// is controlled by a clock.
1643    pub fn sample_every(
1644        self,
1645        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1646        nondet: NonDet,
1647    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
1648    where
1649        L: NoTick + NoAtomic,
1650    {
1651        let samples = self.location.source_interval(interval, nondet);
1652
1653        let tick = self.location.tick();
1654        self.batch(&tick, nondet)
1655            .filter_if(samples.batch(&tick, nondet).first().is_some())
1656            .all_ticks()
1657            .weaken_retries()
1658    }
1659
    /// Given a timeout duration, returns an [`Optional`] which will have a value if the
    /// stream has not emitted a value since that duration.
    ///
    /// # Non-Determinism
    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
    /// samples take place, timeouts may be non-deterministically generated or missed,
    /// and the notification of the timeout may be delayed as well. There is also no
    /// guarantee on how long the [`Optional`] will have a value after the timeout is
    /// detected based on when the next sample is taken.
    pub fn timeout(
        self,
        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
        nondet: NonDet,
    ) -> Optional<(), L, Unbounded>
    where
        L: NoTick + NoAtomic,
    {
        let tick = self.location.tick();

        // Track the wall-clock time of the most recent arrival (`None` until the
        // first element). Each element overwrites the timestamp with `Instant::now()`.
        // NOTE(review): the commutativity argument for this fold is still a TODO
        // in the `manual_proof!` below — confirm before relying on it.
        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
            q!(|| None),
            q!(
                |latest, _| {
                    *latest = Some(Instant::now());
                },
                commutative = manual_proof!(/** TODO */)
            ),
        );

        // Periodically snapshot the latest-arrival time; produce `Some(())` when
        // nothing has ever arrived, or when the last arrival is older than
        // `duration` — i.e. the timeout condition currently holds.
        latest_received
            .snapshot(&tick, nondet)
            .filter_map(q!(move |latest_received| {
                if let Some(latest_received) = latest_received {
                    if Instant::now().duration_since(latest_received) > duration {
                        Some(())
                    } else {
                        None
                    }
                } else {
                    Some(())
                }
            }))
            .latest()
    }
1704
1705    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1706    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1707    ///
1708    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1709    /// processed before an acknowledgement is emitted.
1710    pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1711        let id = self.location.flow_state().borrow_mut().next_clock_id();
1712        let out_location = Atomic {
1713            tick: Tick {
1714                id,
1715                l: self.location.clone(),
1716            },
1717        };
1718        Stream::new(
1719            out_location.clone(),
1720            HydroNode::BeginAtomic {
1721                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1722                metadata: out_location
1723                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1724            },
1725        )
1726    }
1727
1728    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1729    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1730    /// the order of the input. The output stream will execute in the [`Tick`] that was
1731    /// used to create the atomic section.
1732    ///
1733    /// # Non-Determinism
1734    /// The batch boundaries are non-deterministic and may change across executions.
1735    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1736        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1737        Stream::new(
1738            tick.clone(),
1739            HydroNode::Batch {
1740                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1741                metadata: tick
1742                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1743            },
1744        )
1745    }
1746
1747    /// An operator which allows you to "name" a `HydroNode`.
1748    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1749    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1750        {
1751            let mut node = self.ir_node.borrow_mut();
1752            let metadata = node.metadata_mut();
1753            metadata.tag = Some(name.to_owned());
1754        }
1755        self
1756    }
1757
1758    /// Explicitly "casts" the stream to a type with a different ordering
1759    /// guarantee. Useful in unsafe code where the ordering cannot be proven
1760    /// by the type-system.
1761    ///
1762    /// # Non-Determinism
1763    /// This function is used as an escape hatch, and any mistakes in the
1764    /// provided ordering guarantee will propagate into the guarantees
1765    /// for the rest of the program.
1766    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
1767        if O::ORDERING_KIND == O2::ORDERING_KIND {
1768            Stream::new(
1769                self.location.clone(),
1770                self.ir_node.replace(HydroNode::Placeholder),
1771            )
1772        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1773            // We can always weaken the ordering guarantee
1774            Stream::new(
1775                self.location.clone(),
1776                HydroNode::Cast {
1777                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1778                    metadata: self
1779                        .location
1780                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1781                },
1782            )
1783        } else {
1784            Stream::new(
1785                self.location.clone(),
1786                HydroNode::ObserveNonDet {
1787                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1788                    trusted: false,
1789                    metadata: self
1790                        .location
1791                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1792                },
1793            )
1794        }
1795    }
1796
1797    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1798    // intermediate states will not be revealed
1799    fn assume_ordering_trusted_bounded<O2: Ordering>(
1800        self,
1801        nondet: NonDet,
1802    ) -> Stream<T, L, B, O2, R> {
1803        if B::BOUNDED {
1804            self.assume_ordering_trusted(nondet)
1805        } else {
1806            self.assume_ordering(nondet)
1807        }
1808    }
1809
1810    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1811    // is not observable
1812    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1813        self,
1814        _nondet: NonDet,
1815    ) -> Stream<T, L, B, O2, R> {
1816        if O::ORDERING_KIND == O2::ORDERING_KIND {
1817            Stream::new(
1818                self.location.clone(),
1819                self.ir_node.replace(HydroNode::Placeholder),
1820            )
1821        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1822            // We can always weaken the ordering guarantee
1823            Stream::new(
1824                self.location.clone(),
1825                HydroNode::Cast {
1826                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1827                    metadata: self
1828                        .location
1829                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1830                },
1831            )
1832        } else {
1833            Stream::new(
1834                self.location.clone(),
1835                HydroNode::ObserveNonDet {
1836                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1837                    trusted: true,
1838                    metadata: self
1839                        .location
1840                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1841                },
1842            )
1843        }
1844    }
1845
1846    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
1847    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
1848    /// which is always safe because that is the weakest possible guarantee.
1849    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
1850        self.weaken_ordering::<NoOrder>()
1851    }
1852
1853    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1854    /// enforcing that `O2` is weaker than the input ordering guarantee.
1855    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
1856        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1857        self.assume_ordering::<O2>(nondet)
1858    }
1859
1860    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
1861    /// implies that `O == TotalOrder`.
1862    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
1863    where
1864        O: IsOrdered,
1865    {
1866        self.assume_ordering(nondet!(/** no-op */))
1867    }
1868
1869    /// Explicitly "casts" the stream to a type with a different retries
1870    /// guarantee. Useful in unsafe code where the lack of retries cannot
1871    /// be proven by the type-system.
1872    ///
1873    /// # Non-Determinism
1874    /// This function is used as an escape hatch, and any mistakes in the
1875    /// provided retries guarantee will propagate into the guarantees
1876    /// for the rest of the program.
1877    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1878        if R::RETRIES_KIND == R2::RETRIES_KIND {
1879            Stream::new(
1880                self.location.clone(),
1881                self.ir_node.replace(HydroNode::Placeholder),
1882            )
1883        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1884            // We can always weaken the retries guarantee
1885            Stream::new(
1886                self.location.clone(),
1887                HydroNode::Cast {
1888                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1889                    metadata: self
1890                        .location
1891                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1892                },
1893            )
1894        } else {
1895            Stream::new(
1896                self.location.clone(),
1897                HydroNode::ObserveNonDet {
1898                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1899                    trusted: false,
1900                    metadata: self
1901                        .location
1902                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1903                },
1904            )
1905        }
1906    }
1907
1908    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1909    // is not observable
1910    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1911        if R::RETRIES_KIND == R2::RETRIES_KIND {
1912            Stream::new(
1913                self.location.clone(),
1914                self.ir_node.replace(HydroNode::Placeholder),
1915            )
1916        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1917            // We can always weaken the retries guarantee
1918            Stream::new(
1919                self.location.clone(),
1920                HydroNode::Cast {
1921                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1922                    metadata: self
1923                        .location
1924                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1925                },
1926            )
1927        } else {
1928            Stream::new(
1929                self.location.clone(),
1930                HydroNode::ObserveNonDet {
1931                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1932                    trusted: true,
1933                    metadata: self
1934                        .location
1935                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1936                },
1937            )
1938        }
1939    }
1940
1941    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
1942    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1943    /// which is always safe because that is the weakest possible guarantee.
1944    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1945        self.weaken_retries::<AtLeastOnce>()
1946    }
1947
1948    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1949    /// enforcing that `R2` is weaker than the input retries guarantee.
1950    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
1951        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1952        self.assume_retries::<R2>(nondet)
1953    }
1954
1955    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
1956    /// implies that `R == ExactlyOnce`.
1957    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
1958    where
1959        R: IsExactlyOnce,
1960    {
1961        self.assume_retries(nondet!(/** no-op */))
1962    }
1963
1964    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1965    /// implies that `B == Bounded`.
1966    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
1967    where
1968        B: IsBounded,
1969    {
1970        Stream::new(
1971            self.location.clone(),
1972            self.ir_node.replace(HydroNode::Placeholder),
1973        )
1974    }
1975}
1976
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(&[1, 2, 3])).cloned()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn cloned(self) -> Stream<T, L, B, O, R>
    where
        T: Clone,
    {
        // Convenience wrapper: clone each borrowed element to produce owned values.
        self.map(q!(|d| d.clone()))
    }
}
2005
2006impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2007where
2008    L: Location<'a>,
2009{
2010    /// Computes the number of elements in the stream as a [`Singleton`].
2011    ///
2012    /// # Example
2013    /// ```rust
2014    /// # #[cfg(feature = "deploy")] {
2015    /// # use hydro_lang::prelude::*;
2016    /// # use futures::StreamExt;
2017    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2018    /// let tick = process.tick();
2019    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2020    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2021    /// batch.count().all_ticks()
2022    /// # }, |mut stream| async move {
2023    /// // 4
2024    /// # assert_eq!(stream.next().await.unwrap(), 4);
2025    /// # }));
2026    /// # }
2027    /// ```
2028    pub fn count(self) -> Singleton<usize, L, B> {
2029        self.assume_ordering_trusted::<TotalOrder>(nondet!(
2030            /// Order does not affect eventual count, and also does not affect intermediate states.
2031        ))
2032        .fold(q!(|| 0usize), q!(|count, _| *count += 1))
2033    }
2034}
2035
2036impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2037    /// Produces a new stream that merges the elements of the two input streams.
2038    /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2039    ///
2040    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2041    /// [`Bounded`], you can use [`Stream::chain`] instead.
2042    ///
2043    /// # Example
2044    /// ```rust
2045    /// # #[cfg(feature = "deploy")] {
2046    /// # use hydro_lang::prelude::*;
2047    /// # use futures::StreamExt;
2048    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2049    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2050    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2051    /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2052    /// # }, |mut stream| async move {
2053    /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2054    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2055    /// #     assert_eq!(stream.next().await.unwrap(), w);
2056    /// # }
2057    /// # }));
2058    /// # }
2059    /// ```
2060    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2061        self,
2062        other: Stream<T, L, Unbounded, O2, R2>,
2063    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2064    where
2065        R: MinRetries<R2>,
2066    {
2067        Stream::new(
2068            self.location.clone(),
2069            HydroNode::Chain {
2070                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2071                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2072                metadata: self.location.new_node_metadata(Stream::<
2073                    T,
2074                    L,
2075                    Unbounded,
2076                    NoOrder,
2077                    <R as MinRetries<R2>>::Min,
2078                >::collection_kind()),
2079            },
2080        )
2081    }
2082
2083    /// Deprecated: use [`Stream::merge_unordered`] instead.
2084    #[deprecated(note = "use `merge_unordered` instead")]
2085    pub fn interleave<O2: Ordering, R2: Retries>(
2086        self,
2087        other: Stream<T, L, Unbounded, O2, R2>,
2088    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2089    where
2090        R: MinRetries<R2>,
2091    {
2092        self.merge_unordered(other)
2093    }
2094}
2095
2096impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
2097    /// Produces a new stream that combines the elements of the two input streams,
2098    /// preserving the relative order of elements within each input.
2099    ///
2100    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2101    /// [`Bounded`], you can use [`Stream::chain`] instead.
2102    ///
2103    /// # Non-Determinism
2104    /// The order in which elements *across* the two streams will be interleaved is
2105    /// non-deterministic, so the order of elements will vary across runs. If the output order
2106    /// is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic but emits an
2107    /// unordered stream.
2108    ///
2109    /// # Example
2110    /// ```rust
2111    /// # #[cfg(feature = "deploy")] {
2112    /// # use hydro_lang::prelude::*;
2113    /// # use futures::StreamExt;
2114    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2115    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2116    /// # process.source_iter(q!(vec![1, 3])).into();
2117    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2118    /// # }, |mut stream| async move {
2119    /// // 1, 3 and 2, 4 in some order, preserving the original local order
2120    /// # for w in vec![1, 3, 2, 4] {
2121    /// #     assert_eq!(stream.next().await.unwrap(), w);
2122    /// # }
2123    /// # }));
2124    /// # }
2125    /// ```
2126    pub fn merge_ordered<R2: Retries>(
2127        self,
2128        other: Stream<T, L, Unbounded, TotalOrder, R2>,
2129        _nondet: NonDet,
2130    ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
2131    where
2132        R: MinRetries<R2>,
2133    {
2134        Stream::new(
2135            self.location.clone(),
2136            HydroNode::Chain {
2137                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2138                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2139                metadata: self.location.new_node_metadata(Stream::<
2140                    T,
2141                    L,
2142                    Unbounded,
2143                    TotalOrder,
2144                    <R as MinRetries<R2>>::Min,
2145                >::collection_kind()),
2146            },
2147        )
2148    }
2149}
2150
2151impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2152where
2153    L: Location<'a>,
2154{
2155    /// Produces a new stream that emits the input elements in sorted order.
2156    ///
2157    /// The input stream can have any ordering guarantee, but the output stream
2158    /// will have a [`TotalOrder`] guarantee. This operator will block until all
2159    /// elements in the input stream are available, so it requires the input stream
2160    /// to be [`Bounded`].
2161    ///
2162    /// # Example
2163    /// ```rust
2164    /// # #[cfg(feature = "deploy")] {
2165    /// # use hydro_lang::prelude::*;
2166    /// # use futures::StreamExt;
2167    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2168    /// let tick = process.tick();
2169    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2170    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2171    /// batch.sort().all_ticks()
2172    /// # }, |mut stream| async move {
2173    /// // 1, 2, 3, 4
2174    /// # for w in (1..5) {
2175    /// #     assert_eq!(stream.next().await.unwrap(), w);
2176    /// # }
2177    /// # }));
2178    /// # }
2179    /// ```
2180    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2181    where
2182        B: IsBounded,
2183        T: Ord,
2184    {
2185        let this = self.make_bounded();
2186        Stream::new(
2187            this.location.clone(),
2188            HydroNode::Sort {
2189                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2190                metadata: this
2191                    .location
2192                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2193            },
2194        )
2195    }
2196
2197    /// Produces a new stream that first emits the elements of the `self` stream,
2198    /// and then emits the elements of the `other` stream. The output stream has
2199    /// a [`TotalOrder`] guarantee if and only if both input streams have a
2200    /// [`TotalOrder`] guarantee.
2201    ///
2202    /// Currently, both input streams must be [`Bounded`]. This operator will block
2203    /// on the first stream until all its elements are available. In a future version,
2204    /// we will relax the requirement on the `other` stream.
2205    ///
2206    /// # Example
2207    /// ```rust
2208    /// # #[cfg(feature = "deploy")] {
2209    /// # use hydro_lang::prelude::*;
2210    /// # use futures::StreamExt;
2211    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2212    /// let tick = process.tick();
2213    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2214    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2215    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2216    /// # }, |mut stream| async move {
2217    /// // 2, 3, 4, 5, 1, 2, 3, 4
2218    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2219    /// #     assert_eq!(stream.next().await.unwrap(), w);
2220    /// # }
2221    /// # }));
2222    /// # }
2223    /// ```
2224    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2225        self,
2226        other: Stream<T, L, B2, O2, R2>,
2227    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2228    where
2229        B: IsBounded,
2230        O: MinOrder<O2>,
2231        R: MinRetries<R2>,
2232    {
2233        check_matching_location(&self.location, &other.location);
2234
2235        Stream::new(
2236            self.location.clone(),
2237            HydroNode::Chain {
2238                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2239                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2240                metadata: self.location.new_node_metadata(Stream::<
2241                    T,
2242                    L,
2243                    B2,
2244                    <O as MinOrder<O2>>::Min,
2245                    <R as MinRetries<R2>>::Min,
2246                >::collection_kind()),
2247            },
2248        )
2249    }
2250
2251    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2252    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2253    /// because this is compiled into a nested loop.
2254    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
2255        self,
2256        other: Stream<T2, L, Bounded, O2, R>,
2257    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
2258    where
2259        B: IsBounded,
2260        T: Clone,
2261        T2: Clone,
2262    {
2263        let this = self.make_bounded();
2264        check_matching_location(&this.location, &other.location);
2265
2266        Stream::new(
2267            this.location.clone(),
2268            HydroNode::CrossProduct {
2269                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2270                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2271                metadata: this.location.new_node_metadata(Stream::<
2272                    (T, T2),
2273                    L,
2274                    Bounded,
2275                    <O2 as MinOrder<O>>::Min,
2276                    R,
2277                >::collection_kind()),
2278            },
2279        )
2280    }
2281
2282    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2283    /// `self` used as the values for *each* key.
2284    ///
2285    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2286    /// values. For example, it can be used to send the same set of elements to several cluster
2287    /// members, if the membership information is available as a [`KeyedSingleton`].
2288    ///
2289    /// # Example
2290    /// ```rust
2291    /// # #[cfg(feature = "deploy")] {
2292    /// # use hydro_lang::prelude::*;
2293    /// # use futures::StreamExt;
2294    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2295    /// # let tick = process.tick();
2296    /// let keyed_singleton = // { 1: (), 2: () }
2297    /// # process
2298    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2299    /// #     .into_keyed()
2300    /// #     .batch(&tick, nondet!(/** test */))
2301    /// #     .first();
2302    /// let stream = // [ "a", "b" ]
2303    /// # process
2304    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2305    /// #     .batch(&tick, nondet!(/** test */));
2306    /// stream.repeat_with_keys(keyed_singleton)
2307    /// # .entries().all_ticks()
2308    /// # }, |mut stream| async move {
2309    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2310    /// # let mut results = Vec::new();
2311    /// # for _ in 0..4 {
2312    /// #     results.push(stream.next().await.unwrap());
2313    /// # }
2314    /// # results.sort();
2315    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2316    /// # }));
2317    /// # }
2318    /// ```
2319    pub fn repeat_with_keys<K, V2>(
2320        self,
2321        keys: KeyedSingleton<K, V2, L, Bounded>,
2322    ) -> KeyedStream<K, T, L, Bounded, O, R>
2323    where
2324        B: IsBounded,
2325        K: Clone,
2326        T: Clone,
2327    {
2328        keys.keys()
2329            .weaken_retries()
2330            .assume_ordering_trusted::<TotalOrder>(
2331                nondet!(/** keyed stream does not depend on ordering of keys */),
2332            )
2333            .cross_product_nested_loop(self.make_bounded())
2334            .into_keyed()
2335    }
2336
2337    /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2338    /// execution until all results are available. The output order is based on when futures
2339    /// complete, and may be different than the input order.
2340    ///
2341    /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2342    /// while futures are pending, this variant blocks until the futures resolve.
2343    ///
2344    /// # Example
2345    /// ```rust
2346    /// # #[cfg(feature = "deploy")] {
2347    /// # use std::collections::HashSet;
2348    /// # use futures::StreamExt;
2349    /// # use hydro_lang::prelude::*;
2350    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2351    /// process
2352    ///     .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2353    ///     .map(q!(|x| async move {
2354    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2355    ///         x
2356    ///     }))
2357    ///     .resolve_futures_blocking()
2358    /// #   },
2359    /// #   |mut stream| async move {
2360    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2361    /// #       let mut output = HashSet::new();
2362    /// #       for _ in 1..10 {
2363    /// #           output.insert(stream.next().await.unwrap());
2364    /// #       }
2365    /// #       assert_eq!(
2366    /// #           output,
2367    /// #           HashSet::<i32>::from_iter(1..10)
2368    /// #       );
2369    /// #   },
2370    /// # ));
2371    /// # }
2372    /// ```
2373    pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2374    where
2375        T: Future,
2376    {
2377        Stream::new(
2378            self.location.clone(),
2379            HydroNode::ResolveFuturesBlocking {
2380                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2381                metadata: self
2382                    .location
2383                    .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2384            },
2385        )
2386    }
2387}
2388
impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
where
    L: Location<'a>,
{
    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
    /// by equi-joining the two streams on the key attribute `K`.
    ///
    /// The output is [`NoOrder`] (matches may be emitted in any order) and its retry
    /// guarantee is the weaker of the two inputs' guarantees ([`MinRetries`]).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
    /// stream1.join(stream2)
    /// # }, |mut stream| async move {
    /// // (1, ('a', 'x')), (2, ('b', 'y'))
    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
    /// # let mut results = HashSet::new();
    /// # for _ in 0..2 {
    /// #     results.insert(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(results, expected);
    /// # }));
    /// # }
    /// ```
    pub fn join<V2, O2: Ordering, R2: Retries>(
        self,
        n: Stream<(K, V2), L, B, O2, R2>,
    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
    where
        K: Eq + Hash,
        R: MinRetries<R2>,
    {
        // Joins are only defined between collections at the same location.
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Join {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Stream::<
                    (K, (V1, V2)),
                    L,
                    B,
                    NoOrder,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }

    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
    /// computes the anti-join of the items in the input -- i.e. returns
    /// unique items in the first input that do not have a matching key
    /// in the second input.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream = process
    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch = process
    ///   .source_iter(q!(vec![1, 2]))
    ///   .batch(&tick, nondet!(/** test */));
    /// stream.anti_join(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![(3, 'c'), (4, 'd')] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn anti_join<O2: Ordering, R2: Retries>(
        self,
        n: Stream<K, L, Bounded, O2, R2>,
    ) -> Stream<(K, V1), L, B, O, R>
    where
        K: Eq + Hash,
    {
        // Anti-joins are only defined between collections at the same location.
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::AntiJoin {
                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
            },
        )
    }
}
2486
2487impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2488    Stream<(K, V), L, B, O, R>
2489{
2490    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2491    /// is used as the key and the second element is added to the entries associated with that key.
2492    ///
2493    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2494    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2495    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2496    /// total ordering _within_ each group but no ordering _across_ groups.
2497    ///
2498    /// # Example
2499    /// ```rust
2500    /// # #[cfg(feature = "deploy")] {
2501    /// # use hydro_lang::prelude::*;
2502    /// # use futures::StreamExt;
2503    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2504    /// process
2505    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2506    ///     .into_keyed()
2507    /// #   .entries()
2508    /// # }, |mut stream| async move {
2509    /// // { 1: [2, 3], 2: [4] }
2510    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2511    /// #     assert_eq!(stream.next().await.unwrap(), w);
2512    /// # }
2513    /// # }));
2514    /// # }
2515    /// ```
2516    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2517        KeyedStream::new(
2518            self.location.clone(),
2519            HydroNode::Cast {
2520                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2521                metadata: self
2522                    .location
2523                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2524            },
2525        )
2526    }
2527}
2528
2529impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2530where
2531    K: Eq + Hash,
2532    L: Location<'a>,
2533{
2534    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2535    /// # Example
2536    /// ```rust
2537    /// # #[cfg(feature = "deploy")] {
2538    /// # use hydro_lang::prelude::*;
2539    /// # use futures::StreamExt;
2540    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2541    /// let tick = process.tick();
2542    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2543    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2544    /// batch.keys().all_ticks()
2545    /// # }, |mut stream| async move {
2546    /// // 1, 2
2547    /// # assert_eq!(stream.next().await.unwrap(), 1);
2548    /// # assert_eq!(stream.next().await.unwrap(), 2);
2549    /// # }));
2550    /// # }
2551    /// ```
2552    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2553        self.into_keyed()
2554            .fold(
2555                q!(|| ()),
2556                q!(
2557                    |_, _| {},
2558                    commutative = manual_proof!(/** values are ignored */),
2559                    idempotent = manual_proof!(/** values are ignored */)
2560                ),
2561            )
2562            .keys()
2563    }
2564}
2565
2566impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2567where
2568    L: Location<'a> + NoTick,
2569{
2570    /// Returns a stream corresponding to the latest batch of elements being atomically
2571    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2572    /// the order of the input.
2573    ///
2574    /// # Non-Determinism
2575    /// The batch boundaries are non-deterministic and may change across executions.
2576    pub fn batch_atomic(
2577        self,
2578        tick: &Tick<L>,
2579        _nondet: NonDet,
2580    ) -> Stream<T, Tick<L>, Bounded, O, R> {
2581        Stream::new(
2582            tick.clone(),
2583            HydroNode::Batch {
2584                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2585                metadata: tick
2586                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2587            },
2588        )
2589    }
2590
2591    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2592    /// See [`Stream::atomic`] for more details.
2593    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2594        Stream::new(
2595            self.location.tick.l.clone(),
2596            HydroNode::EndAtomic {
2597                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2598                metadata: self
2599                    .location
2600                    .tick
2601                    .l
2602                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2603            },
2604        )
2605    }
2606}
2607
impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
where
    L: Location<'a> + NoTick + NoAtomic,
    F: Future<Output = T>,
{
    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced as available, regardless of input arrival order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::prelude::*;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures()
    /// #   },
    /// #   |mut stream| async move {
    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
    /// #       let mut output = HashSet::new();
    /// #       for _ in 1..10 {
    /// #           output.insert(stream.next().await.unwrap());
    /// #       }
    /// #       assert_eq!(
    /// #           output,
    /// #           HashSet::<i32>::from_iter(1..10)
    /// #       );
    /// #   },
    /// # ));
    /// # }
    /// ```
    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
        // Outputs arrive in completion order, so the result is NoOrder.
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFutures {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
            },
        )
    }

    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced in the same order as the input stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::prelude::*;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures_ordered()
    /// #   },
    /// #   |mut stream| async move {
    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
    /// #       let mut output = Vec::new();
    /// #       for _ in 1..10 {
    /// #           output.push(stream.next().await.unwrap());
    /// #       }
    /// #       assert_eq!(
    /// #           output,
    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
    /// #       );
    /// #   },
    /// # ));
    /// # }
    /// ```
    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
        // Input order is preserved, so the ordering marker `O` carries through.
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFuturesOrdered {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
            },
        )
    }
}
2697
impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
        Stream::new(
            self.location.outer().clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .outer()
                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
            },
        )
    }

    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
    ///
    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
    /// stream's [`Tick`] context.
    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
        // The output stays tied to this tick via the Atomic wrapper.
        let out_location = Atomic {
            tick: self.location.clone(),
        };

        Stream::new(
            out_location.clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: out_location
                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
            },
        )
    }

    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
    /// such as `fold` retain their memory across ticks rather than resetting across batches of
    /// input.
    ///
    /// This API is particularly useful for stateful computation on batches of data, such as
    /// maintaining an accumulated state that is up to date with the current batch.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// # // ticks are lazy by default, forces the second tick to run
    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    /// # let batch_first_tick = process
    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
    /// #  .batch(&tick, nondet!(/** test */));
    /// # let batch_second_tick = process
    /// #   .source_iter(q!(vec![5, 6, 7]))
    /// #   .batch(&tick, nondet!(/** test */))
    /// #   .defer_tick(); // appears on the second tick
    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
    ///
    /// input.batch(&tick, nondet!(/** test */))
    ///     .across_ticks(|s| s.count()).all_ticks()
    /// # }, |mut stream| async move {
    /// // [4, 7]
    /// assert_eq!(stream.next().await.unwrap(), 4);
    /// assert_eq!(stream.next().await.unwrap(), 7);
    /// # }));
    /// # }
    /// ```
    pub fn across_ticks<Out: BatchAtomic>(
        self,
        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
    ) -> Out::Batched {
        // Run `thunk` in the atomic cross-tick context, then convert the result
        // back into a per-tick batch.
        thunk(self.all_ticks_atomic()).batched_atomic()
    }

    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
    /// always has the elements of `self` at tick `T - 1`.
    ///
    /// At tick `0`, the output stream is empty, since there is no previous tick.
    ///
    /// This operator enables stateful iterative processing with ticks, by sending data from one
    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///   .source_iter(q!(vec![1, 2, 3, 4]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
    ///   .batch(&tick, nondet!(/** test */))
    ///   .defer_tick(); // appears on the second tick
    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
    ///
    /// changes_across_ticks.clone().filter_not_in(
    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
    /// ).all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
        Stream::new(
            self.location.clone(),
            HydroNode::DeferTick {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
            },
        )
    }
}
2830
2831#[cfg(test)]
2832mod tests {
2833    #[cfg(feature = "deploy")]
2834    use futures::{SinkExt, StreamExt};
2835    #[cfg(feature = "deploy")]
2836    use hydro_deploy::Deployment;
2837    #[cfg(feature = "deploy")]
2838    use serde::{Deserialize, Serialize};
2839    #[cfg(any(feature = "deploy", feature = "sim"))]
2840    use stageleft::q;
2841
2842    #[cfg(any(feature = "deploy", feature = "sim"))]
2843    use crate::compile::builder::FlowBuilder;
2844    #[cfg(feature = "deploy")]
2845    use crate::live_collections::sliced::sliced;
2846    #[cfg(feature = "deploy")]
2847    use crate::live_collections::stream::ExactlyOnce;
2848    #[cfg(feature = "sim")]
2849    use crate::live_collections::stream::NoOrder;
2850    #[cfg(any(feature = "deploy", feature = "sim"))]
2851    use crate::live_collections::stream::TotalOrder;
2852    #[cfg(any(feature = "deploy", feature = "sim"))]
2853    use crate::location::Location;
2854    #[cfg(any(feature = "deploy", feature = "sim"))]
2855    use crate::nondet::nondet;
2856
2857    mod backtrace_chained_ops;
2858
    /// Marker type naming the first process in the distributed deployment tests.
    #[cfg(feature = "deploy")]
    struct P1 {}
    /// Marker type naming the second process in the distributed deployment tests.
    #[cfg(feature = "deploy")]
    struct P2 {}

    /// Payload used to exercise bincode serialization across the network boundary
    /// in `first_ten_distributed`.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }
2869
    /// End-to-end test: 0..10 originates on one process, hops to a second process
    /// over a fail-stop TCP link with bincode serialization, then exits to the
    /// external test harness, which checks that all ten values arrive in order.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_ten_distributed() {
        use crate::networking::TCP;

        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external::<P2>();

        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send(&second_node, TCP.fail_stop().bincode())
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect the external port before starting the dataflow, matching the
        // pattern used throughout these tests.
        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }
2904
    /// Checks that `first()` on a tick-local stream yields a collection whose
    /// stream view has exactly one element: a 3-element singleton is flattened
    /// into a stream, `first()` is taken, and the resulting count must be 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let count = node_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(count).await;

        deployment.start().await.unwrap();

        // `first()` produced at most one element, so the count is 1.
        assert_eq!(external_out.next().await.unwrap(), 1);
    }
2938
    /// Checks that `reduce` over an unbounded input accumulates across inputs
    /// rather than resetting: after sending 1 then 2, the sampled totals are
    /// 1 and then 3.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .reduce(q!(|acc, v| *acc += v))
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Running total after 1 is 1; after the subsequent 2 it is 3.
        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
2972
    /// Checks `cross_singleton` against a top-level bounded fold: the fold over
    /// `[1, 2, 3]` totals 6, and each external input is paired with that total.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_cross_singleton() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = input
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Each input is paired with the completed fold result (6).
        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
3010
    /// Checks, via `sliced!`, that a top-level `reduce` over a bounded source
    /// produces a single value: counting its stream view yields 1 for each input.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_reduce_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // The reduce yields one value, so its stream's count is 1.
        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
3048
    /// Same shape as `top_level_bounded_reduce_cardinality`, but with the reduce
    /// result passed through `into_singleton()`: the cardinality must still be 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_into_singleton_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // The singleton view still has exactly one element.
        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
3086
    /// Checks that a snapshot of an atomic `fold` is visible on every tick: each
    /// batched input pairs with the completed total (6), on both the first and
    /// second tick.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn atomic_fold_replays_each_tick() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let tick = node.tick();

        let out = input
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .atomic()
                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
                    .snapshot_atomic(&tick, nondet!(/** test */)),
            )
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Both sends observe the same fold total, across separate ticks.
        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
3129
    /// Checks that `scan` over an unbounded input keeps its accumulator across
    /// inputs: sending 1 then 2 yields the running prefix sums 1 and 3.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_scan_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .scan(
                q!(|| 0),
                q!(|acc, v| {
                    *acc += v;
                    Some(*acc)
                }),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Prefix sums: 1, then 1 + 2 = 3.
        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
3168
3169    #[cfg(feature = "deploy")]
3170    #[tokio::test]
3171    async fn unbounded_enumerate_remembers_state() {
3172        let mut deployment = Deployment::new();
3173
3174        let mut flow = FlowBuilder::new();
3175        let node = flow.process::<()>();
3176        let external = flow.external::<()>();
3177
3178        let (input_port, input) = node.source_external_bincode(&external);
3179        let out = input.enumerate().send_bincode_external(&external);
3180
3181        let nodes = flow
3182            .with_process(&node, deployment.Localhost())
3183            .with_external(&external, deployment.Localhost())
3184            .deploy(&mut deployment);
3185
3186        deployment.deploy().await.unwrap();
3187
3188        let mut external_in = nodes.connect(input_port).await;
3189        let mut external_out = nodes.connect(out).await;
3190
3191        deployment.start().await.unwrap();
3192
3193        external_in.send(1).await.unwrap();
3194        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3195
3196        external_in.send(2).await.unwrap();
3197        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3198    }
3199
3200    #[cfg(feature = "deploy")]
3201    #[tokio::test]
3202    async fn unbounded_unique_remembers_state() {
3203        let mut deployment = Deployment::new();
3204
3205        let mut flow = FlowBuilder::new();
3206        let node = flow.process::<()>();
3207        let external = flow.external::<()>();
3208
3209        let (input_port, input) =
3210            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3211        let out = input.unique().send_bincode_external(&external);
3212
3213        let nodes = flow
3214            .with_process(&node, deployment.Localhost())
3215            .with_external(&external, deployment.Localhost())
3216            .deploy(&mut deployment);
3217
3218        deployment.deploy().await.unwrap();
3219
3220        let mut external_in = nodes.connect(input_port).await;
3221        let mut external_out = nodes.connect(out).await;
3222
3223        deployment.start().await.unwrap();
3224
3225        external_in.send(1).await.unwrap();
3226        assert_eq!(external_out.next().await.unwrap(), 1);
3227
3228        external_in.send(2).await.unwrap();
3229        assert_eq!(external_out.next().await.unwrap(), 2);
3230
3231        external_in.send(1).await.unwrap();
3232        external_in.send(3).await.unwrap();
3233        assert_eq!(external_out.next().await.unwrap(), 3);
3234    }
3235
    // The sim runtime must choose batch boundaries nondeterministically:
    // exhaustive exploration finds a schedule that splits the three inputs
    // across ticks, so asserting a per-tick count of 3 panics on some run.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();

        let tick = node.tick();
        // Count the elements that land in each nondeterministic batch.
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .count()
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(());
            in_send.send(());
            in_send.send(());

            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
        });
    }
3260
    // Batching a totally-ordered stream may cut it at arbitrary points, but
    // must never reorder elements within or across batches, so concatenating
    // all ticks always yields the inputs in send order.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_order() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(1);
            in_send.send(2);
            in_send.send(3);

            // Must hold on every explored schedule, whatever the batch cuts.
            out_recv.assert_yields_only([1, 2, 3]).await;
        });
    }
3283
    // For a NoOrder input the simulator must also explore shuffled batch
    // contents: the output [(1, 3), (2, 2)] requires a first batch {1, 3}
    // (min 1, max 3) followed by a batch {2}, which is only reachable if
    // batching shuffled the send order. `#[should_panic]` asserts that some
    // explored schedule reaches it.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_unordered_shuffles() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        // Per-tick (min, max) summary of each batch.
        let out_recv = batch
            .clone()
            .min()
            .zip(batch.max())
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3]);

            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
            }
        });
    }
3310
    // Pins the exact number of schedules the exhaustive simulator explores
    // when batching 4 unordered elements: every ordered set partition of the
    // inputs into nonempty batches (see the Stirling-number sum below).
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            // Every schedule still delivers exactly these four elements.
            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
        });

        assert_eq!(
            instance_count,
            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
        )
    }
3333
    // `assume_ordering` on a NoOrder batch is an unchecked claim: the
    // simulator still explores shuffled element orders, so asserting the
    // original send order fails on some schedule (hence `#[should_panic]`).
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_observe_order_batched() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
        });
    }
3355
    // Pins the schedule count when a NoOrder stream is batched and then has
    // its order assumed: 4! element orders times 2^3 batch-boundary choices
    // (a cut either side of each of the 3 gaps) = 192 executions.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            // Output is intentionally discarded; only the count matters.
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(
            instance_count,
            192 // 4! * 2^{4 - 1}
        )
    }
3381
    // `count` on an unbounded NoOrder stream, snapshotted per tick: each of
    // the 4 elements either arrives before or after an observed snapshot,
    // giving 2^4 explored executions; the final count of 4 always appears.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unordered_count_instance_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let out_recv = input
            .count()
            .snapshot(&tick, nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
        });

        assert_eq!(
            instance_count,
            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
        )
    }
3407
    // A top-level `assume_ordering` on a NoOrder input makes the simulator
    // explore every permutation of the 3 elements (3! = 6 executions); the
    // multiset of outputs is the same on each.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let out_recv = input
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3]);
            let mut out = out_recv.collect::<Vec<_>>().await;
            // Sort so the assertion holds regardless of the assumed order.
            out.sort();
            assert_eq!(out, vec![1, 2, 3]);
        });

        assert_eq!(instance_count, 6)
    }
3429
    // `assume_ordering` applied inside a feedback cycle: each value v feeds
    // back v + 1 when that is odd (0 -> 1; 1's successor 2 is dropped, as is
    // 2's successor 4... wait — 2 -> 3 is odd, but 3 is never sent here since
    // input is [0, 2] — NOTE(review): confirm expected feedback set against
    // the sim semantics. The test only requires that some schedule observes
    // 0, 1, 2 as a prefix, and that exactly 6 schedules are explored.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        // Feed back v + 1, keeping only odd successors, to close the cycle.
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        assert_eq!(instance_count, 6)
    }
3465
    // A feedback cycle through `assume_ordering` where the fed-back stream
    // additionally passes through a tick batch; the batch boundaries enlarge
    // the schedule space (58 explored executions). Some schedule must still
    // observe 0, 1, 2 as an ordered prefix of the output.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back_tick() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        // The feedback path is batched into a tick before mapping/filtering.
        complete_cycle_back.complete(
            ordered
                .clone()
                .batch(&node.tick(), nondet!(/** test */))
                .all_ticks()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        assert_eq!(instance_count, 58)
    }
3503
    // Two chained `assume_ordering` calls with a feedback cycle between them:
    // the second ordering point (`foo`) feeds the value 3 (0 + 3) back into
    // the first, so it can interleave between the inputs 0 and 1 — some
    // schedule yields the prefix 0, 3, 1. Also pins the explored schedule
    // count at 24.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_multiple() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
        // Second input is never sent to; it only contributes a merge point.
        let (_, input2) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let input1_ordered = input
            .clone()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        let foo = input1_ordered
            .clone()
            .map(q!(|v| v + 3))
            .weaken_ordering::<NoOrder>()
            .merge_unordered(input2)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        // Only the value 3 (from input 0) is fed back into the cycle.
        complete_cycle_back.complete(foo.filter(q!(|v| *v == 3)));

        let out_recv = input1_ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 1]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 3, 1]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 3, 1 in order");
        assert_eq!(instance_count, 24)
    }
3543
    // `assume_ordering` inside an atomic section of a feedback cycle: the
    // atomic region constrains which interleavings are distinguishable,
    // reducing the explored schedule count to 22. Every schedule emits
    // exactly 4 elements: the inputs 0 and 2 plus their fed-back odd
    // successors 1 and 3 (whose own successors 2 and 4 are even, ending the
    // cycle).
    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_assume_ordering_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .atomic()
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .end_atomic();
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;
            assert_eq!(out.len(), 4);
        });

        assert_eq!(instance_count, 22)
    }
3576
3577    #[cfg(feature = "deploy")]
3578    #[tokio::test]
3579    async fn partition_evens_odds() {
3580        let mut deployment = Deployment::new();
3581
3582        let mut flow = FlowBuilder::new();
3583        let node = flow.process::<()>();
3584        let external = flow.external::<()>();
3585
3586        let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
3587        let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
3588        let evens_port = evens.send_bincode_external(&external);
3589        let odds_port = odds.send_bincode_external(&external);
3590
3591        let nodes = flow
3592            .with_process(&node, deployment.Localhost())
3593            .with_external(&external, deployment.Localhost())
3594            .deploy(&mut deployment);
3595
3596        deployment.deploy().await.unwrap();
3597
3598        let mut evens_out = nodes.connect(evens_port).await;
3599        let mut odds_out = nodes.connect(odds_port).await;
3600
3601        deployment.start().await.unwrap();
3602
3603        let mut even_results = Vec::new();
3604        for _ in 0..3 {
3605            even_results.push(evens_out.next().await.unwrap());
3606        }
3607        even_results.sort();
3608        assert_eq!(even_results, vec![2, 4, 6]);
3609
3610        let mut odd_results = Vec::new();
3611        for _ in 0..3 {
3612            odd_results.push(odds_out.next().await.unwrap());
3613        }
3614        odd_results.sort();
3615        assert_eq!(odd_results, vec![1, 3, 5]);
3616    }
3617
3618    #[cfg(feature = "deploy")]
3619    #[tokio::test]
3620    async fn unconsumed_inspect_still_runs() {
3621        use crate::deploy::DeployCrateWrapper;
3622
3623        let mut deployment = Deployment::new();
3624
3625        let mut flow = FlowBuilder::new();
3626        let node = flow.process::<()>();
3627
3628        // The return value of .inspect() is intentionally dropped.
3629        // Before the Null-root fix, this would silently do nothing.
3630        node.source_iter(q!(0..5))
3631            .inspect(q!(|x| println!("inspect: {}", x)));
3632
3633        let nodes = flow
3634            .with_process(&node, deployment.Localhost())
3635            .deploy(&mut deployment);
3636
3637        deployment.deploy().await.unwrap();
3638
3639        let mut stdout = nodes.get_process(&node).stdout();
3640
3641        deployment.start().await.unwrap();
3642
3643        let mut lines = Vec::new();
3644        for _ in 0..5 {
3645            lines.push(stdout.recv().await.unwrap());
3646        }
3647        lines.sort();
3648        assert_eq!(
3649            lines,
3650            vec![
3651                "inspect: 0",
3652                "inspect: 1",
3653                "inspect: 2",
3654                "inspect: 3",
3655                "inspect: 4",
3656            ]
3657        );
3658    }
3659}