hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
11use super::optional::Optional;
12use super::sliced::sliced;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::{DynLocation, LocationId};
21use crate::location::tick::{Atomic, NoAtomic};
22use crate::location::{Location, NoTick, Tick, check_matching_location};
23use crate::nondet::{NonDet, nondet};
24
/// A single Rust value that can asynchronously change over time.
///
/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
/// [`Unbounded`], the value will asynchronously change over time.
///
/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
/// a single number that will asynchronously change as events are processed. Singletons also appear
/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
/// such as getting the length of a batch of requests.
///
/// Type Parameters:
/// - `Type`: the type of the value in this singleton
/// - `Loc`: the [`Location`] where the singleton is materialized
/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Singleton<Type, Loc, Bound: Boundedness> {
    /// The location where this singleton is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this singleton's value. Operators that consume the
    /// singleton swap this for `HydroNode::Placeholder` and embed the previous node
    /// as their own input.
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Flow state handle; used when the singleton is dropped to register a node that
    /// was never consumed.
    pub(crate) flow_state: FlowState,

    /// Zero-sized marker that ties the otherwise unused type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
46
47impl<T, L, B: Boundedness> Drop for Singleton<T, L, B> {
48 fn drop(&mut self) {
49 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
50 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
51 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
52 input: Box::new(ir_node),
53 op_metadata: HydroIrOpMetadata::new(),
54 });
55 }
56 }
57}
58
59impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
60where
61 T: Clone,
62 L: Location<'a> + NoTick,
63{
64 fn from(value: Singleton<T, L, Bounded>) -> Self {
65 let tick = value.location().tick();
66 value.clone_into_tick(&tick).latest()
67 }
68}
69
70impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
71where
72 L: Location<'a>,
73{
74 type Location = Tick<L>;
75
76 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
77 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
78 location.clone(),
79 HydroNode::DeferTick {
80 input: Box::new(HydroNode::CycleSource {
81 cycle_id,
82 metadata: location.new_node_metadata(Self::collection_kind()),
83 }),
84 metadata: location
85 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
86 },
87 );
88
89 from_previous_tick.unwrap_or(initial)
90 }
91}
92
93impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
94where
95 L: Location<'a>,
96{
97 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
98 assert_eq!(
99 Location::id(&self.location),
100 expected_location,
101 "locations do not match"
102 );
103 self.location
104 .flow_state()
105 .borrow_mut()
106 .push_root(HydroRoot::CycleSink {
107 cycle_id,
108 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
109 op_metadata: HydroIrOpMetadata::new(),
110 });
111 }
112}
113
114impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
115where
116 L: Location<'a>,
117{
118 type Location = Tick<L>;
119
120 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
121 Singleton::new(
122 location.clone(),
123 HydroNode::CycleSource {
124 cycle_id,
125 metadata: location.new_node_metadata(Self::collection_kind()),
126 },
127 )
128 }
129}
130
131impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
132where
133 L: Location<'a>,
134{
135 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
136 assert_eq!(
137 Location::id(&self.location),
138 expected_location,
139 "locations do not match"
140 );
141 self.location
142 .flow_state()
143 .borrow_mut()
144 .push_root(HydroRoot::CycleSink {
145 cycle_id,
146 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
147 op_metadata: HydroIrOpMetadata::new(),
148 });
149 }
150}
151
152impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
153where
154 L: Location<'a> + NoTick,
155{
156 type Location = L;
157
158 fn create_source(cycle_id: CycleId, location: L) -> Self {
159 Singleton::new(
160 location.clone(),
161 HydroNode::CycleSource {
162 cycle_id,
163 metadata: location.new_node_metadata(Self::collection_kind()),
164 },
165 )
166 }
167}
168
169impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
170where
171 L: Location<'a> + NoTick,
172{
173 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
174 assert_eq!(
175 Location::id(&self.location),
176 expected_location,
177 "locations do not match"
178 );
179 self.location
180 .flow_state()
181 .borrow_mut()
182 .push_root(HydroRoot::CycleSink {
183 cycle_id,
184 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
185 op_metadata: HydroIrOpMetadata::new(),
186 });
187 }
188}
189
190impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
191where
192 T: Clone,
193 L: Location<'a>,
194{
195 fn clone(&self) -> Self {
196 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
197 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
198 *self.ir_node.borrow_mut() = HydroNode::Tee {
199 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
200 metadata: self.location.new_node_metadata(Self::collection_kind()),
201 };
202 }
203
204 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
205 Singleton {
206 location: self.location.clone(),
207 flow_state: self.flow_state.clone(),
208 ir_node: HydroNode::Tee {
209 inner: SharedNode(inner.0.clone()),
210 metadata: metadata.clone(),
211 }
212 .into(),
213 _phantom: PhantomData,
214 }
215 } else {
216 unreachable!()
217 }
218 }
219}
220
/// Zips a singleton with an optional inside a tick by first viewing the singleton as an
/// [`Optional`] and then delegating to the optional-level `zip_inside_tick`.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
    me: Singleton<T, Tick<L>, B>,
    other: Optional<O, Tick<L>, B>,
) -> Optional<(T, O), Tick<L>, B> {
    super::optional::zip_inside_tick(Into::<Optional<T, Tick<L>, B>>::into(me), other)
}
229
230impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
231where
232 L: Location<'a>,
233{
234 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
235 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
236 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
237 let flow_state = location.flow_state().clone();
238 Singleton {
239 location,
240 flow_state,
241 ir_node: RefCell::new(ir_node),
242 _phantom: PhantomData,
243 }
244 }
245
246 pub(crate) fn collection_kind() -> CollectionKind {
247 CollectionKind::Singleton {
248 bound: B::BOUND_KIND,
249 element_type: stageleft::quote_type::<T>().into(),
250 }
251 }
252
253 /// Returns the [`Location`] where this singleton is being materialized.
254 pub fn location(&self) -> &L {
255 &self.location
256 }
257
258 /// Transforms the singleton value by applying a function `f` to it,
259 /// continuously as the input is updated.
260 ///
261 /// # Example
262 /// ```rust
263 /// # #[cfg(feature = "deploy")] {
264 /// # use hydro_lang::prelude::*;
265 /// # use futures::StreamExt;
266 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
267 /// let tick = process.tick();
268 /// let singleton = tick.singleton(q!(5));
269 /// singleton.map(q!(|v| v * 2)).all_ticks()
270 /// # }, |mut stream| async move {
271 /// // 10
272 /// # assert_eq!(stream.next().await.unwrap(), 10);
273 /// # }));
274 /// # }
275 /// ```
276 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
277 where
278 F: Fn(T) -> U + 'a,
279 {
280 let f = f.splice_fn1_ctx(&self.location).into();
281 Singleton::new(
282 self.location.clone(),
283 HydroNode::Map {
284 f,
285 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
286 metadata: self
287 .location
288 .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
289 },
290 )
291 }
292
293 /// Transforms the singleton value by applying a function `f` to it and then flattening
294 /// the result into a stream, preserving the order of elements.
295 ///
296 /// The function `f` is applied to the singleton value to produce an iterator, and all items
297 /// from that iterator are emitted in the output stream in deterministic order.
298 ///
299 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
300 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
301 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
302 ///
303 /// # Example
304 /// ```rust
305 /// # #[cfg(feature = "deploy")] {
306 /// # use hydro_lang::prelude::*;
307 /// # use futures::StreamExt;
308 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
309 /// let tick = process.tick();
310 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
311 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
312 /// # }, |mut stream| async move {
313 /// // 1, 2, 3
314 /// # for w in vec![1, 2, 3] {
315 /// # assert_eq!(stream.next().await.unwrap(), w);
316 /// # }
317 /// # }));
318 /// # }
319 /// ```
320 pub fn flat_map_ordered<U, I, F>(
321 self,
322 f: impl IntoQuotedMut<'a, F, L>,
323 ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
324 where
325 B: IsBounded,
326 I: IntoIterator<Item = U>,
327 F: Fn(T) -> I + 'a,
328 {
329 self.into_stream().flat_map_ordered(f)
330 }
331
332 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
333 /// for the output type `I` to produce items in any order.
334 ///
335 /// The function `f` is applied to the singleton value to produce an iterator, and all items
336 /// from that iterator are emitted in the output stream in non-deterministic order.
337 ///
338 /// # Example
339 /// ```rust
340 /// # #[cfg(feature = "deploy")] {
341 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
342 /// # use futures::StreamExt;
343 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
344 /// let tick = process.tick();
345 /// let singleton = tick.singleton(q!(
346 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
347 /// ));
348 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
349 /// # }, |mut stream| async move {
350 /// // 1, 2, 3, but in no particular order
351 /// # let mut results = Vec::new();
352 /// # for _ in 0..3 {
353 /// # results.push(stream.next().await.unwrap());
354 /// # }
355 /// # results.sort();
356 /// # assert_eq!(results, vec![1, 2, 3]);
357 /// # }));
358 /// # }
359 /// ```
360 pub fn flat_map_unordered<U, I, F>(
361 self,
362 f: impl IntoQuotedMut<'a, F, L>,
363 ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
364 where
365 B: IsBounded,
366 I: IntoIterator<Item = U>,
367 F: Fn(T) -> I + 'a,
368 {
369 self.into_stream().flat_map_unordered(f)
370 }
371
372 /// Flattens the singleton value into a stream, preserving the order of elements.
373 ///
374 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
375 /// are emitted in the output stream in deterministic order.
376 ///
377 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
378 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
379 /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
380 ///
381 /// # Example
382 /// ```rust
383 /// # #[cfg(feature = "deploy")] {
384 /// # use hydro_lang::prelude::*;
385 /// # use futures::StreamExt;
386 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
387 /// let tick = process.tick();
388 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
389 /// singleton.flatten_ordered().all_ticks()
390 /// # }, |mut stream| async move {
391 /// // 1, 2, 3
392 /// # for w in vec![1, 2, 3] {
393 /// # assert_eq!(stream.next().await.unwrap(), w);
394 /// # }
395 /// # }));
396 /// # }
397 /// ```
pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
where
    B: IsBounded,
    T: IntoIterator<Item = U>,
{
    // Flattening is a flat-map with the identity function: the value itself is the iterator.
    self.flat_map_ordered(q!(|x| x))
}
405
406 /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
407 /// for the element type `T` to produce items in any order.
408 ///
409 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
410 /// are emitted in the output stream in non-deterministic order.
411 ///
412 /// # Example
413 /// ```rust
414 /// # #[cfg(feature = "deploy")] {
415 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
416 /// # use futures::StreamExt;
417 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
418 /// let tick = process.tick();
419 /// let singleton = tick.singleton(q!(
420 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
421 /// ));
422 /// singleton.flatten_unordered().all_ticks()
423 /// # }, |mut stream| async move {
424 /// // 1, 2, 3, but in no particular order
425 /// # let mut results = Vec::new();
426 /// # for _ in 0..3 {
427 /// # results.push(stream.next().await.unwrap());
428 /// # }
429 /// # results.sort();
430 /// # assert_eq!(results, vec![1, 2, 3]);
431 /// # }));
432 /// # }
433 /// ```
pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
where
    B: IsBounded,
    T: IntoIterator<Item = U>,
{
    // Flattening is a flat-map with the identity function: the value itself is the iterator.
    self.flat_map_unordered(q!(|x| x))
}
441
442 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
443 ///
444 /// If the predicate returns `true`, the output optional contains the same value.
445 /// If the predicate returns `false`, the output optional is empty.
446 ///
447 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
448 /// not modify or take ownership of the value. If you need to modify the value while filtering
449 /// use [`Singleton::filter_map`] instead.
450 ///
451 /// # Example
452 /// ```rust
453 /// # #[cfg(feature = "deploy")] {
454 /// # use hydro_lang::prelude::*;
455 /// # use futures::StreamExt;
456 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
457 /// let tick = process.tick();
458 /// let singleton = tick.singleton(q!(5));
459 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
460 /// # }, |mut stream| async move {
461 /// // 5
462 /// # assert_eq!(stream.next().await.unwrap(), 5);
463 /// # }));
464 /// # }
465 /// ```
466 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
467 where
468 F: Fn(&T) -> bool + 'a,
469 {
470 let f = f.splice_fn1_borrow_ctx(&self.location).into();
471 Optional::new(
472 self.location.clone(),
473 HydroNode::Filter {
474 f,
475 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
476 metadata: self
477 .location
478 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
479 },
480 )
481 }
482
483 /// An operator that both filters and maps. It yields the value only if the supplied
484 /// closure `f` returns `Some(value)`.
485 ///
486 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
487 /// If the closure returns `None`, the output optional is empty.
488 ///
489 /// # Example
490 /// ```rust
491 /// # #[cfg(feature = "deploy")] {
492 /// # use hydro_lang::prelude::*;
493 /// # use futures::StreamExt;
494 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
495 /// let tick = process.tick();
496 /// let singleton = tick.singleton(q!("42"));
497 /// singleton
498 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
499 /// .all_ticks()
500 /// # }, |mut stream| async move {
501 /// // 42
502 /// # assert_eq!(stream.next().await.unwrap(), 42);
503 /// # }));
504 /// # }
505 /// ```
506 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
507 where
508 F: Fn(T) -> Option<U> + 'a,
509 {
510 let f = f.splice_fn1_ctx(&self.location).into();
511 Optional::new(
512 self.location.clone(),
513 HydroNode::FilterMap {
514 f,
515 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
516 metadata: self
517 .location
518 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
519 },
520 )
521 }
522
523 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
524 ///
525 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
526 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
527 /// non-null. This is useful for combining several pieces of state together.
528 ///
529 /// # Example
530 /// ```rust
531 /// # #[cfg(feature = "deploy")] {
532 /// # use hydro_lang::prelude::*;
533 /// # use futures::StreamExt;
534 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
535 /// let tick = process.tick();
536 /// let numbers = process
537 /// .source_iter(q!(vec![123, 456]))
538 /// .batch(&tick, nondet!(/** test */));
539 /// let count = numbers.clone().count(); // Singleton
540 /// let max = numbers.max(); // Optional
541 /// count.zip(max).all_ticks()
542 /// # }, |mut stream| async move {
543 /// // [(2, 456)]
544 /// # for w in vec![(2, 456)] {
545 /// # assert_eq!(stream.next().await.unwrap(), w);
546 /// # }
547 /// # }));
548 /// # }
549 /// ```
pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
where
    Self: ZipResult<'a, O, Location = L>,
    B: IsBounded,
{
    // Both operands must be materialized at the same location.
    check_matching_location(&self.location, &Self::other_location(&other));

    // Top-level case (and the location can supply a tick): snapshot both operands into
    // that tick, zip them there, and bring the result back out with `latest()`.
    if L::is_top_level()
        && let Some(tick) = self.location.try_tick()
    {
        let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
        let out = zip_inside_tick(
            self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
            // Wrap the other operand's IR node in a `Cast` carrying `Optional` metadata
            // so that it can be snapshotted as an `Optional`.
            Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
                other_location.clone(),
                HydroNode::Cast {
                    inner: Box::new(Self::other_ir_node(other)),
                    metadata: other_location.new_node_metadata(Optional::<
                        <Self as ZipResult<'a, O>>::OtherType,
                        Tick<L>,
                        Bounded,
                    >::collection_kind(
                    )),
                },
            )
            .snapshot(&tick, nondet!(/** eventually stabilizes */)),
        )
        .latest();

        // Re-wrap the zipped node as the output type chosen by `ZipResult`.
        Self::make(
            out.location.clone(),
            out.ir_node.replace(HydroNode::Placeholder),
        )
    } else {
        // Otherwise combine the two nodes directly with a `CrossSingleton` node.
        Self::make(
            self.location.clone(),
            HydroNode::CrossSingleton {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(Self::other_ir_node(other)),
                metadata: self.location.new_node_metadata(CollectionKind::Optional {
                    bound: B::BOUND_KIND,
                    element_type: stageleft::quote_type::<
                        <Self as ZipResult<'a, O>>::ElementType,
                    >()
                    .into(),
                }),
            },
        )
    }
}
600
601 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
602 /// boolean signal is `true`, otherwise the output is null.
603 ///
604 /// # Example
605 /// ```rust
606 /// # #[cfg(feature = "deploy")] {
607 /// # use hydro_lang::prelude::*;
608 /// # use futures::StreamExt;
609 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
610 /// let tick = process.tick();
611 /// // ticks are lazy by default, forces the second tick to run
612 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
613 ///
614 /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
615 /// let batch_first_tick = process
616 /// .source_iter(q!(vec![1]))
617 /// .batch(&tick, nondet!(/** test */));
618 /// let batch_second_tick = process
619 /// .source_iter(q!(vec![1, 2, 3]))
620 /// .batch(&tick, nondet!(/** test */))
621 /// .defer_tick();
622 /// batch_first_tick.chain(batch_second_tick).count()
623 /// .filter_if(signal)
624 /// .all_ticks()
625 /// # }, |mut stream| async move {
626 /// // [1]
627 /// # for w in vec![1] {
628 /// # assert_eq!(stream.next().await.unwrap(), w);
629 /// # }
630 /// # }));
631 /// # }
632 /// ```
633 pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
634 where
635 B: IsBounded,
636 {
637 self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
638 }
639
640 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
641 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
642 ///
643 /// Useful for conditionally processing, such as only emitting a singleton's value outside
644 /// a tick if some other condition is satisfied.
645 ///
646 /// # Example
647 /// ```rust
648 /// # #[cfg(feature = "deploy")] {
649 /// # use hydro_lang::prelude::*;
650 /// # use futures::StreamExt;
651 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
652 /// let tick = process.tick();
653 /// // ticks are lazy by default, forces the second tick to run
654 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
655 ///
656 /// let batch_first_tick = process
657 /// .source_iter(q!(vec![1]))
658 /// .batch(&tick, nondet!(/** test */));
659 /// let batch_second_tick = process
660 /// .source_iter(q!(vec![1, 2, 3]))
661 /// .batch(&tick, nondet!(/** test */))
662 /// .defer_tick(); // appears on the second tick
663 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
664 /// batch_first_tick.chain(batch_second_tick).count()
665 /// .filter_if_some(some_on_first_tick)
666 /// .all_ticks()
667 /// # }, |mut stream| async move {
668 /// // [1]
669 /// # for w in vec![1] {
670 /// # assert_eq!(stream.next().await.unwrap(), w);
671 /// # }
672 /// # }));
673 /// # }
674 /// ```
675 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
676 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
677 where
678 B: IsBounded,
679 {
680 self.filter_if(signal.is_some())
681 }
682
683 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
684 /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
685 ///
686 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
687 /// the condition.
688 ///
689 /// # Example
690 /// ```rust
691 /// # #[cfg(feature = "deploy")] {
692 /// # use hydro_lang::prelude::*;
693 /// # use futures::StreamExt;
694 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
695 /// let tick = process.tick();
696 /// // ticks are lazy by default, forces the second tick to run
697 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
698 ///
699 /// let batch_first_tick = process
700 /// .source_iter(q!(vec![1]))
701 /// .batch(&tick, nondet!(/** test */));
702 /// let batch_second_tick = process
703 /// .source_iter(q!(vec![1, 2, 3]))
704 /// .batch(&tick, nondet!(/** test */))
705 /// .defer_tick(); // appears on the second tick
706 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
707 /// batch_first_tick.chain(batch_second_tick).count()
708 /// .filter_if_none(some_on_first_tick)
709 /// .all_ticks()
710 /// # }, |mut stream| async move {
711 /// // [3]
712 /// # for w in vec![3] {
713 /// # assert_eq!(stream.next().await.unwrap(), w);
714 /// # }
715 /// # }));
716 /// # }
717 /// ```
718 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
719 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
720 where
721 B: IsBounded,
722 {
723 self.filter_if(other.is_none())
724 }
725
726 /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
727 ///
728 /// # Example
729 /// ```rust
730 /// # #[cfg(feature = "deploy")] {
731 /// # use hydro_lang::prelude::*;
732 /// # use futures::StreamExt;
733 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
734 /// let tick = process.tick();
735 /// let a = tick.singleton(q!(5));
736 /// let b = tick.singleton(q!(5));
737 /// a.equals(b).all_ticks()
738 /// # }, |mut stream| async move {
739 /// // [true]
740 /// # assert_eq!(stream.next().await.unwrap(), true);
741 /// # }));
742 /// # }
743 /// ```
744 pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
745 where
746 T: PartialEq,
747 B: IsBounded,
748 {
749 self.zip(other).map(q!(|(a, b)| a == b))
750 }
751
752 /// An operator which allows you to "name" a `HydroNode`.
753 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
754 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
755 {
756 let mut node = self.ir_node.borrow_mut();
757 let metadata = node.metadata_mut();
758 metadata.tag = Some(name.to_owned());
759 }
760 self
761 }
762}
763
/// Logical negation of a boolean singleton through the `!` operator.
impl<'a, L: Location<'a>, B: Boundedness> Not for Singleton<bool, L, B> {
    /// Negation yields a boolean singleton with the same location and boundedness.
    type Output = Singleton<bool, L, B>;

    /// Maps the singleton's value `b` to `!b`.
    fn not(self) -> Self::Output {
        self.map(q!(|b| !b))
    }
}
771
impl<'a, T, L, B: Boundedness> Singleton<Option<T>, L, B>
where
    L: Location<'a>,
{
    /// Converts a `Singleton<Option<T>, L, B>` into an `Optional<T, L, B>` by unwrapping
    /// the inner `Option`.
    ///
    /// This is implemented as an identity [`Singleton::filter_map`], passing through the
    /// `Option<T>` directly. If the singleton's value is `Some(v)`, the resulting
    /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let singleton = tick.singleton(q!(Some(42)));
    /// singleton.into_optional().all_ticks()
    /// # }, |mut stream| async move {
    /// // 42
    /// # assert_eq!(stream.next().await.unwrap(), 42);
    /// # }));
    /// # }
    /// ```
    pub fn into_optional(self) -> Optional<T, L, B> {
        // The value already is an `Option<T>`, so the identity closure is the filter-map.
        self.filter_map(q!(|v| v))
    }
}
802
803impl<'a, L, B: Boundedness> Singleton<bool, L, B>
804where
805 L: Location<'a>,
806{
807 /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
808 ///
809 /// # Example
810 /// ```rust
811 /// # #[cfg(feature = "deploy")] {
812 /// # use hydro_lang::prelude::*;
813 /// # use futures::StreamExt;
814 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
815 /// let tick = process.tick();
816 /// // ticks are lazy by default, forces the second tick to run
817 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
818 ///
819 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
820 /// let b = tick.singleton(q!(true)); // true, true
821 /// a.and(b).all_ticks()
822 /// # }, |mut stream| async move {
823 /// // [true, false]
824 /// # for w in vec![true, false] {
825 /// # assert_eq!(stream.next().await.unwrap(), w);
826 /// # }
827 /// # }));
828 /// # }
829 /// ```
830 pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, B>
831 where
832 B: IsBounded,
833 {
834 self.zip(other).map(q!(|(a, b)| a && b))
835 }
836
837 /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
838 ///
839 /// # Example
840 /// ```rust
841 /// # #[cfg(feature = "deploy")] {
842 /// # use hydro_lang::prelude::*;
843 /// # use futures::StreamExt;
844 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
845 /// let tick = process.tick();
846 /// // ticks are lazy by default, forces the second tick to run
847 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
848 ///
849 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
850 /// let b = tick.singleton(q!(false)); // false, false
851 /// a.or(b).all_ticks()
852 /// # }, |mut stream| async move {
853 /// // [true, false]
854 /// # for w in vec![true, false] {
855 /// # assert_eq!(stream.next().await.unwrap(), w);
856 /// # }
857 /// # }));
858 /// # }
859 /// ```
860 pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, B>
861 where
862 B: IsBounded,
863 {
864 self.zip(other).map(q!(|(a, b)| a || b))
865 }
866}
867
868impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
869where
870 L: Location<'a> + NoTick,
871{
872 /// Returns a singleton value corresponding to the latest snapshot of the singleton
873 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
874 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
875 /// all snapshots of this singleton into the atomic-associated tick will observe the
876 /// same value each tick.
877 ///
878 /// # Non-Determinism
879 /// Because this picks a snapshot of a singleton whose value is continuously changing,
880 /// the output singleton has a non-deterministic value since the snapshot can be at an
881 /// arbitrary point in time.
882 pub fn snapshot_atomic(
883 self,
884 tick: &Tick<L>,
885 _nondet: NonDet,
886 ) -> Singleton<T, Tick<L>, Bounded> {
887 Singleton::new(
888 tick.clone(),
889 HydroNode::Batch {
890 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
891 metadata: tick
892 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
893 },
894 )
895 }
896
897 /// Returns this singleton back into a top-level, asynchronous execution context where updates
898 /// to the value will be asynchronously propagated.
899 pub fn end_atomic(self) -> Singleton<T, L, B> {
900 Singleton::new(
901 self.location.tick.l.clone(),
902 HydroNode::EndAtomic {
903 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
904 metadata: self
905 .location
906 .tick
907 .l
908 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
909 },
910 )
911 }
912}
913
914impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
915where
916 L: Location<'a>,
917{
918 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
919 /// will observe the same version of the value and will be executed synchronously before any
920 /// outputs are yielded (in [`Optional::end_atomic`]).
921 ///
922 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
923 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
924 /// a different version).
925 pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
926 let id = self.location.flow_state().borrow_mut().next_clock_id();
927 let out_location = Atomic {
928 tick: Tick {
929 id,
930 l: self.location.clone(),
931 },
932 };
933 Singleton::new(
934 out_location.clone(),
935 HydroNode::BeginAtomic {
936 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
937 metadata: out_location
938 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
939 },
940 )
941 }
942
943 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
944 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
945 /// relevant data that contributed to the snapshot at tick `t`.
946 ///
947 /// # Non-Determinism
948 /// Because this picks a snapshot of a singleton whose value is continuously changing,
949 /// the output singleton has a non-deterministic value since the snapshot can be at an
950 /// arbitrary point in time.
951 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
952 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
953 Singleton::new(
954 tick.clone(),
955 HydroNode::Batch {
956 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
957 metadata: tick
958 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
959 },
960 )
961 }
962
963 /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
964 /// with order corresponding to increasing prefixes of data contributing to the singleton.
965 ///
966 /// # Non-Determinism
967 /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
968 /// to non-deterministic batching and arrival of inputs, the output stream is
969 /// non-deterministic.
970 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
971 where
972 L: NoTick,
973 {
974 sliced! {
975 let snapshot = use(self, nondet);
976 snapshot.into_stream()
977 }
978 .weaken_retries()
979 }
980
981 /// Given a time interval, returns a stream corresponding to snapshots of the singleton
982 /// value taken at various points in time. Because the input singleton may be
983 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
984 /// represent the value of the singleton given some prefix of the streams leading up to
985 /// it.
986 ///
987 /// # Non-Determinism
988 /// The output stream is non-deterministic in which elements are sampled, since this
989 /// is controlled by a clock.
990 pub fn sample_every(
991 self,
992 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
993 nondet: NonDet,
994 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
995 where
996 L: NoTick + NoAtomic,
997 {
998 let samples = self.location.source_interval(interval, nondet);
999 sliced! {
1000 let snapshot = use(self, nondet);
1001 let sample_batch = use(samples, nondet);
1002
1003 snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1004 }
1005 .weaken_retries()
1006 }
1007
1008 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1009 /// implies that `B == Bounded`.
1010 pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1011 where
1012 B: IsBounded,
1013 {
1014 Singleton::new(
1015 self.location.clone(),
1016 self.ir_node.replace(HydroNode::Placeholder),
1017 )
1018 }
1019
1020 /// Clones this bounded singleton into a tick, returning a singleton that has the
1021 /// same value as the outer singleton. Because the outer singleton is bounded, this
1022 /// is deterministic because there is only a single immutable version.
1023 pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
1024 where
1025 B: IsBounded,
1026 T: Clone,
1027 {
1028 // TODO(shadaj): avoid printing simulator logs for this snapshot
1029 self.snapshot(
1030 tick,
1031 nondet!(/** bounded top-level singleton so deterministic */),
1032 )
1033 }
1034
1035 /// Converts this singleton into a [`Stream`] containing a single element, the value.
1036 ///
1037 /// # Example
1038 /// ```rust
1039 /// # #[cfg(feature = "deploy")] {
1040 /// # use hydro_lang::prelude::*;
1041 /// # use futures::StreamExt;
1042 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1043 /// let tick = process.tick();
1044 /// let batch_input = process
1045 /// .source_iter(q!(vec![123, 456]))
1046 /// .batch(&tick, nondet!(/** test */));
1047 /// batch_input.clone().chain(
1048 /// batch_input.count().into_stream()
1049 /// ).all_ticks()
1050 /// # }, |mut stream| async move {
1051 /// // [123, 456, 2]
1052 /// # for w in vec![123, 456, 2] {
1053 /// # assert_eq!(stream.next().await.unwrap(), w);
1054 /// # }
1055 /// # }));
1056 /// # }
1057 /// ```
1058 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1059 where
1060 B: IsBounded,
1061 {
1062 Stream::new(
1063 self.location.clone(),
1064 HydroNode::Cast {
1065 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1066 metadata: self.location.new_node_metadata(Stream::<
1067 T,
1068 Tick<L>,
1069 Bounded,
1070 TotalOrder,
1071 ExactlyOnce,
1072 >::collection_kind()),
1073 },
1074 )
1075 }
1076
1077 /// Resolves the singleton's [`Future`] value by blocking until it completes,
1078 /// producing a singleton of the resolved output.
1079 ///
1080 /// This is useful when the singleton contains an async computation that must
1081 /// be awaited before further processing. The future is polled to completion
1082 /// before the output value is emitted.
1083 ///
1084 /// # Example
1085 /// ```rust
1086 /// # #[cfg(feature = "deploy")] {
1087 /// # use hydro_lang::prelude::*;
1088 /// # use futures::StreamExt;
1089 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1090 /// let tick = process.tick();
1091 /// let singleton = tick.singleton(q!(5));
1092 /// singleton
1093 /// .map(q!(|v| async move { v * 2 }))
1094 /// .resolve_future_blocking()
1095 /// .all_ticks()
1096 /// # }, |mut stream| async move {
1097 /// // 10
1098 /// # assert_eq!(stream.next().await.unwrap(), 10);
1099 /// # }));
1100 /// # }
1101 /// ```
1102 pub fn resolve_future_blocking(self) -> Singleton<T::Output, L, B>
1103 where
1104 T: Future,
1105 B: IsBounded,
1106 {
1107 Singleton::new(
1108 self.location.clone(),
1109 HydroNode::ResolveFuturesBlocking {
1110 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1111 metadata: self
1112 .location
1113 .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1114 },
1115 )
1116 }
1117}
1118
1119impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1120where
1121 L: Location<'a>,
1122{
1123 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1124 /// which will stream the value computed in _each_ tick as a separate stream element.
1125 ///
1126 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1127 /// producing one element in the output for each tick. This is useful for batched computations,
1128 /// where the results from each tick must be combined together.
1129 ///
1130 /// # Example
1131 /// ```rust
1132 /// # #[cfg(feature = "deploy")] {
1133 /// # use hydro_lang::prelude::*;
1134 /// # use futures::StreamExt;
1135 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1136 /// let tick = process.tick();
1137 /// # // ticks are lazy by default, forces the second tick to run
1138 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1139 /// # let batch_first_tick = process
1140 /// # .source_iter(q!(vec![1]))
1141 /// # .batch(&tick, nondet!(/** test */));
1142 /// # let batch_second_tick = process
1143 /// # .source_iter(q!(vec![1, 2, 3]))
1144 /// # .batch(&tick, nondet!(/** test */))
1145 /// # .defer_tick(); // appears on the second tick
1146 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1147 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1148 /// .count()
1149 /// .all_ticks()
1150 /// # }, |mut stream| async move {
1151 /// // [1, 3]
1152 /// # for w in vec![1, 3] {
1153 /// # assert_eq!(stream.next().await.unwrap(), w);
1154 /// # }
1155 /// # }));
1156 /// # }
1157 /// ```
1158 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1159 self.into_stream().all_ticks()
1160 }
1161
1162 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1163 /// which will stream the value computed in _each_ tick as a separate stream element.
1164 ///
1165 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1166 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1167 /// singleton's [`Tick`] context.
1168 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1169 self.into_stream().all_ticks_atomic()
1170 }
1171
1172 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1173 /// be asynchronously updated with the latest value of the singleton inside the tick.
1174 ///
1175 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1176 /// tick that tracks the inner value. This is useful for getting the value as of the
1177 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1178 ///
1179 /// # Example
1180 /// ```rust
1181 /// # #[cfg(feature = "deploy")] {
1182 /// # use hydro_lang::prelude::*;
1183 /// # use futures::StreamExt;
1184 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1185 /// let tick = process.tick();
1186 /// # // ticks are lazy by default, forces the second tick to run
1187 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1188 /// # let batch_first_tick = process
1189 /// # .source_iter(q!(vec![1]))
1190 /// # .batch(&tick, nondet!(/** test */));
1191 /// # let batch_second_tick = process
1192 /// # .source_iter(q!(vec![1, 2, 3]))
1193 /// # .batch(&tick, nondet!(/** test */))
1194 /// # .defer_tick(); // appears on the second tick
1195 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1196 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1197 /// .count()
1198 /// .latest()
1199 /// # .sample_eager(nondet!(/** test */))
1200 /// # }, |mut stream| async move {
1201 /// // asynchronously changes from 1 ~> 3
1202 /// # for w in vec![1, 3] {
1203 /// # assert_eq!(stream.next().await.unwrap(), w);
1204 /// # }
1205 /// # }));
1206 /// # }
1207 /// ```
1208 pub fn latest(self) -> Singleton<T, L, Unbounded> {
1209 Singleton::new(
1210 self.location.outer().clone(),
1211 HydroNode::YieldConcat {
1212 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1213 metadata: self
1214 .location
1215 .outer()
1216 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1217 },
1218 )
1219 }
1220
1221 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1222 /// be updated with the latest value of the singleton inside the tick.
1223 ///
1224 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1225 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1226 /// singleton's [`Tick`] context.
1227 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1228 let out_location = Atomic {
1229 tick: self.location.clone(),
1230 };
1231 Singleton::new(
1232 out_location.clone(),
1233 HydroNode::YieldConcat {
1234 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1235 metadata: out_location
1236 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1237 },
1238 )
1239 }
1240}
1241
#[doc(hidden)]
/// Helper trait that determines the output collection type for [`Singleton::zip`].
///
/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
/// [`Singleton`].
///
/// The trait is implemented on the first input of the zip (a [`Singleton`]) and is
/// parameterized by `Other`, the type of the second input. It is sealed, so it cannot be
/// implemented outside of this crate.
#[sealed::sealed]
pub trait ZipResult<'a, Other> {
    /// The output collection type.
    type Out;
    /// The type of the tupled output value.
    type ElementType;
    /// The type of the other collection's value.
    type OtherType;
    /// The location where the tupled result will be materialized.
    type Location: Location<'a>;

    /// The location of the second input to the `zip`.
    ///
    /// Takes `other` by reference, so the second input remains usable afterwards.
    fn other_location(other: &Other) -> Self::Location;
    /// The IR node of the second input to the `zip`.
    ///
    /// Consumes `other`, handing its IR node over to the caller.
    fn other_ir_node(other: Other) -> HydroNode;

    /// Constructs the output live collection given an IR node containing the zip result.
    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
}
1266
1267#[sealed::sealed]
1268impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1269where
1270 L: Location<'a>,
1271{
1272 type Out = Singleton<(T, U), L, B>;
1273 type ElementType = (T, U);
1274 type OtherType = U;
1275 type Location = L;
1276
1277 fn other_location(other: &Singleton<U, L, B>) -> L {
1278 other.location.clone()
1279 }
1280
1281 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1282 other.ir_node.replace(HydroNode::Placeholder)
1283 }
1284
1285 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1286 Singleton::new(
1287 location.clone(),
1288 HydroNode::Cast {
1289 inner: Box::new(ir_node),
1290 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1291 },
1292 )
1293 }
1294}
1295
1296#[sealed::sealed]
1297impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1298where
1299 L: Location<'a>,
1300{
1301 type Out = Optional<(T, U), L, B>;
1302 type ElementType = (T, U);
1303 type OtherType = U;
1304 type Location = L;
1305
1306 fn other_location(other: &Optional<U, L, B>) -> L {
1307 other.location.clone()
1308 }
1309
1310 fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1311 other.ir_node.replace(HydroNode::Placeholder)
1312 }
1313
1314 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1315 Optional::new(location, ir_node)
1316 }
1317}
1318
1319#[cfg(test)]
1320mod tests {
1321 #[cfg(feature = "deploy")]
1322 use futures::{SinkExt, StreamExt};
1323 #[cfg(feature = "deploy")]
1324 use hydro_deploy::Deployment;
1325 #[cfg(any(feature = "deploy", feature = "sim"))]
1326 use stageleft::q;
1327
1328 #[cfg(any(feature = "deploy", feature = "sim"))]
1329 use crate::compile::builder::FlowBuilder;
1330 #[cfg(feature = "deploy")]
1331 use crate::live_collections::stream::ExactlyOnce;
1332 #[cfg(any(feature = "deploy", feature = "sim"))]
1333 use crate::location::Location;
1334 #[cfg(any(feature = "deploy", feature = "sim"))]
1335 use crate::nondet::nondet;
1336
/// A singleton fed back through a tick cycle must contain exactly one element in every
/// tick, including the ticks after the initial one.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn tick_cycle_cardinality() {
    let mut deployment = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let external = builder.external::<()>();

    let (trigger_port, triggers) =
        process.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

    let tick = process.tick();
    let (cycle_handle, cycled) = tick.cycle_with_initial(tick.singleton(q!(0)));

    // Number of elements held by the cycled singleton in the current tick.
    let cardinality = cycled.clone().into_stream().count();
    // Only report the cardinality in ticks where a trigger message arrived.
    let triggered = triggers
        .batch(&tick, nondet!(/** testing */))
        .first()
        .is_some();
    let counts_port = cardinality
        .filter_if(triggered)
        .all_ticks()
        .send_bincode_external(&external);
    cycle_handle.complete_next_tick(cycled);

    let nodes = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut tick_trigger = nodes.connect(trigger_port).await;
    let mut external_out = nodes.connect(counts_port).await;

    deployment.start().await.unwrap();

    // First trigger: the singleton holds its initial value.
    tick_trigger.send(()).await.unwrap();
    let first_count = external_out.next().await.unwrap();
    assert_eq!(first_count, 1);

    // Second trigger: the singleton holds the value cycled from the previous tick.
    tick_trigger.send(()).await.unwrap();
    let second_count = external_out.next().await.unwrap();
    assert_eq!(second_count, 1);
}
1384
/// Asserting that the first observed snapshot of a fold is the final sum is expected to
/// panic in at least one explored execution.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_fold_intermediate_states() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
    let running_sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
    let out_recv = snapshots.all_ticks().sim_output();

    builder.sim().exhaustive(async || {
        let first_snapshot = out_recv.next().await.unwrap();
        assert_eq!(first_snapshot, 10);
    });
}
1403
/// Checks the number of executions explored when snapshotting a fold over four inputs, and
/// that the last snapshot of every execution is the complete sum.
#[cfg(feature = "sim")]
#[test]
fn sim_fold_intermediate_state_count() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
    let running_sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
    let out_recv = snapshots.all_ticks().sim_output();

    let explored = builder.sim().exhaustive(async || {
        let observed = out_recv.collect::<Vec<_>>().await;
        assert_eq!(observed.last(), Some(&10));
    });

    // 2^4 possible subsets of intermediates (including initial state)
    assert_eq!(explored, 16);
}
1427
/// The initial state of a fold must not be repeated in autonomous decisions: after the
/// initial `0` is observed, the next snapshot reflects the input that was sent.
#[cfg(feature = "sim")]
#[test]
fn sim_fold_no_repeat_initial() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (in_port, inputs) = process.sim_input();
    let running_sum = inputs.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let snapshots = running_sum.snapshot(&tick, nondet!(/** test */));
    let out_recv = snapshots.all_ticks().sim_output();

    builder.sim().exhaustive(async || {
        // Before any input is sent, only the initial state can be observed.
        let initial = out_recv.next().await.unwrap();
        assert_eq!(initial, 0);

        in_port.send(123);

        let after_send = out_recv.next().await.unwrap();
        assert_eq!(after_send, 123);
    });
}
1451
/// When the tick is driven by a snapshot AND something else, the snapshot can "stutter"
/// and repeat the same state multiple times. The test panics when it observes such a
/// repetition, and is expected to do so.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_fold_repeats_snapshots() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
    let running_sum = numbers.clone().fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let number_batch = numbers.batch(&tick, nondet!(/** test */));
    let sum_snapshot = running_sum.snapshot(&tick, nondet!(/** test */));
    let paired = number_batch.cross_singleton(sum_snapshot);
    let out_recv = paired.all_ticks().sim_output();

    builder.sim().exhaustive(async || {
        // The second element is only pulled when the first one matches.
        let first = out_recv.next().await.unwrap();
        if first == (1, 3) {
            let second = out_recv.next().await.unwrap();
            if second == (2, 3) {
                panic!("repeated snapshot");
            }
        }
    });
}
1478
/// Checks the number of instances explored when a tick consumes both a batch of the source
/// and a snapshot of a fold over that same source.
#[cfg(feature = "sim")]
#[test]
fn sim_fold_repeats_snapshots_count() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let numbers = process.source_stream(q!(tokio_stream::iter(vec![1, 2])));
    let running_sum = numbers.clone().fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = process.tick();
    let number_batch = numbers.batch(&tick, nondet!(/** test */));
    let sum_snapshot = running_sum.snapshot(&tick, nondet!(/** test */));
    let paired = number_batch.cross_singleton(sum_snapshot);
    let out_recv = paired.all_ticks().sim_output();

    let explored = builder.sim().exhaustive(async || {
        let _ = out_recv.collect::<Vec<_>>().await;
    });

    // don't have a combinatorial explanation for this number yet, but checked via logs
    assert_eq!(explored, 52);
}
1502
/// Ensures that top-level singletons have only one snapshot, so that exhaustive
/// simulation explores exactly one instance.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_exhaustive() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let constant = process.singleton(q!(1));
    let tick = process.tick();
    let snapshots = constant.snapshot(&tick, nondet!(/** test */));
    let out_recv = snapshots.all_ticks().sim_output();

    let explored = builder.sim().exhaustive(async || {
        let _ = out_recv.collect::<Vec<_>>().await;
    });

    assert_eq!(explored, 1);
}
1521
/// If a tick consumes a static snapshot and a stream batch, only the batch requires
/// space exploration.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_join_count() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    let tick = process.tick();
    let number_batch = numbers.batch(&tick, nondet!(/** test */));
    let constant_in_tick = process.singleton(q!(123)).clone_into_tick(&tick);
    let paired = number_batch.cross_singleton(constant_in_tick);
    let out_recv = paired.all_ticks().sim_output();

    let explored = builder.sim().exhaustive(async || {
        let _ = out_recv.collect::<Vec<_>>().await;
    });

    // 2^4 ways to split up (including a possibly empty first batch)
    assert_eq!(explored, 16);
}
1547
/// Converting a top-level fold result into a stream must yield only the final value, in
/// every explored execution.
#[cfg(feature = "sim")]
#[test]
fn top_level_singleton_into_stream_no_replay() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    let total = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

    let out_recv = total.into_stream().sim_output();

    builder.sim().exhaustive(async || {
        out_recv.assert_yields_only([10]).await;
    });
}
1563
/// Zips a sliced singleton with a clone of itself and checks that the last emitted pair is
/// the final sum on both sides, along with the number of explored instances.
#[cfg(feature = "sim")]
#[test]
fn inside_tick_singleton_zip() {
    use crate::live_collections::Stream;
    use crate::live_collections::sliced::sliced;

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let numbers: Stream<_, _> = process.source_iter(q!(vec![1, 2])).into();
    let running_sum = numbers.fold(q!(|| 0), q!(|a, b| *a += b));

    let zipped = sliced! {
        let current = use(running_sum, nondet!(/** test */));
        current.clone().zip(current).into_stream()
    };
    let out_recv = zipped.sim_output();

    let explored = builder.sim().exhaustive(async || {
        let observed = out_recv.collect::<Vec<_>>().await;
        assert_eq!(observed.last(), Some(&(3, 3)));
    });

    assert_eq!(explored, 4);
}
1589}