Skip to main content

hydro_lang/live_collections/
keyed_singleton.rs

1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::builder::{CycleId, FlowState};
18use crate::compile::ir::{
19    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, SharedNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::stream::{Ordering, Retries};
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::DeferTick;
28use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
29use crate::manual_expr::ManualExpr;
30use crate::nondet::{NonDet, nondet};
31use crate::properties::manual_proof;
32
33/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
34///
35/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
36/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
37/// indicates that entries may be added over time, but once an entry is added it will never be
38/// removed and its value will never change.
39pub trait KeyedSingletonBound {
40    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
41    type UnderlyingBound: Boundedness;
42    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
43    type ValueBound: Boundedness;
44
45    /// The type of the keyed singleton if the value for each key is immutable.
46    type WithBoundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Bounded>;
47
48    /// The type of the keyed singleton if the value for each key may change asynchronously.
49    type WithUnboundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Unbounded>;
50
51    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
52    fn bound_kind() -> KeyedSingletonBoundKind;
53}
54
55impl KeyedSingletonBound for Unbounded {
56    type UnderlyingBound = Unbounded;
57    type ValueBound = Unbounded;
58    type WithBoundedValue = BoundedValue;
59    type WithUnboundedValue = Unbounded;
60
61    fn bound_kind() -> KeyedSingletonBoundKind {
62        KeyedSingletonBoundKind::Unbounded
63    }
64}
65
66impl KeyedSingletonBound for Bounded {
67    type UnderlyingBound = Bounded;
68    type ValueBound = Bounded;
69    type WithBoundedValue = Bounded;
70    type WithUnboundedValue = UnreachableBound;
71
72    fn bound_kind() -> KeyedSingletonBoundKind {
73        KeyedSingletonBoundKind::Bounded
74    }
75}
76
77/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
78/// its value is bounded and will never change. If the `KeyBound` is [`Bounded`], then the entire set of entries
79/// is bounded, but if it is [`Unbounded`], then new entries may appear asynchronously.
80pub struct BoundedValue;
81
82impl KeyedSingletonBound for BoundedValue {
83    type UnderlyingBound = Unbounded;
84    type ValueBound = Bounded;
85    type WithBoundedValue = BoundedValue;
86    type WithUnboundedValue = Unbounded;
87
88    fn bound_kind() -> KeyedSingletonBoundKind {
89        KeyedSingletonBoundKind::BoundedValue
90    }
91}
92
93#[doc(hidden)]
94pub struct UnreachableBound;
95
96impl KeyedSingletonBound for UnreachableBound {
97    type UnderlyingBound = Bounded;
98    type ValueBound = Unbounded;
99
100    type WithBoundedValue = Bounded;
101    type WithUnboundedValue = UnreachableBound;
102
103    fn bound_kind() -> KeyedSingletonBoundKind {
104        unreachable!("UnreachableBound cannot be instantiated")
105    }
106}
107
108/// Mapping from keys of type `K` to values of type `V`.
109///
110/// Keyed Singletons capture an asynchronously updated mapping from keys of the `K` to values of
111/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
112/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
113/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
114/// keys cannot be removed and the value for each key is immutable.
115///
116/// Type Parameters:
117/// - `K`: the type of the key for each entry
118/// - `V`: the type of the value for each entry
119/// - `Loc`: the [`Location`] where the keyed singleton is materialized
120/// - `Bound`: tracks whether the entries are:
121///     - [`Bounded`] (local and finite)
122///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
123///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
124pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
125    pub(crate) location: Loc,
126    pub(crate) ir_node: RefCell<HydroNode>,
127    pub(crate) flow_state: FlowState,
128
129    _phantom: PhantomData<(K, V, Loc, Bound)>,
130}
131
132impl<K, V, L, B: KeyedSingletonBound> Drop for KeyedSingleton<K, V, L, B> {
133    fn drop(&mut self) {
134        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
135        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
136            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
137                input: Box::new(ir_node),
138                op_metadata: HydroIrOpMetadata::new(),
139            });
140        }
141    }
142}
143
144impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
145    for KeyedSingleton<K, V, Loc, Bound>
146{
147    fn clone(&self) -> Self {
148        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
149            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
150            *self.ir_node.borrow_mut() = HydroNode::Tee {
151                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
152                metadata: self.location.new_node_metadata(Self::collection_kind()),
153            };
154        }
155
156        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
157            KeyedSingleton {
158                location: self.location.clone(),
159                flow_state: self.flow_state.clone(),
160                ir_node: HydroNode::Tee {
161                    inner: SharedNode(inner.0.clone()),
162                    metadata: metadata.clone(),
163                }
164                .into(),
165                _phantom: PhantomData,
166            }
167        } else {
168            unreachable!()
169        }
170    }
171}
172
173impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
174    for KeyedSingleton<K, V, L, B>
175where
176    L: Location<'a> + NoTick,
177{
178    type Location = L;
179
180    fn create_source(cycle_id: CycleId, location: L) -> Self {
181        KeyedSingleton {
182            flow_state: location.flow_state().clone(),
183            location: location.clone(),
184            ir_node: RefCell::new(HydroNode::CycleSource {
185                cycle_id,
186                metadata: location.new_node_metadata(Self::collection_kind()),
187            }),
188            _phantom: PhantomData,
189        }
190    }
191}
192
193impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
194where
195    L: Location<'a>,
196{
197    type Location = Tick<L>;
198
199    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
200        KeyedSingleton::new(
201            location.clone(),
202            HydroNode::CycleSource {
203                cycle_id,
204                metadata: location.new_node_metadata(Self::collection_kind()),
205            },
206        )
207    }
208}
209
210impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
211where
212    L: Location<'a>,
213{
214    fn defer_tick(self) -> Self {
215        KeyedSingleton::defer_tick(self)
216    }
217}
218
219impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
220    for KeyedSingleton<K, V, L, B>
221where
222    L: Location<'a> + NoTick,
223{
224    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
225        assert_eq!(
226            Location::id(&self.location),
227            expected_location,
228            "locations do not match"
229        );
230        self.location
231            .flow_state()
232            .borrow_mut()
233            .push_root(HydroRoot::CycleSink {
234                cycle_id,
235                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
236                op_metadata: HydroIrOpMetadata::new(),
237            });
238    }
239}
240
241impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
242where
243    L: Location<'a>,
244{
245    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
246        assert_eq!(
247            Location::id(&self.location),
248            expected_location,
249            "locations do not match"
250        );
251        self.location
252            .flow_state()
253            .borrow_mut()
254            .push_root(HydroRoot::CycleSink {
255                cycle_id,
256                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
257                op_metadata: HydroIrOpMetadata::new(),
258            });
259    }
260}
261
262impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
263    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
264        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
265        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
266
267        let flow_state = location.flow_state().clone();
268        KeyedSingleton {
269            location,
270            flow_state,
271            ir_node: RefCell::new(ir_node),
272            _phantom: PhantomData,
273        }
274    }
275
276    /// Returns the [`Location`] where this keyed singleton is being materialized.
277    pub fn location(&self) -> &L {
278        &self.location
279    }
280}
281
282#[cfg(stageleft_runtime)]
283fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
284    me: KeyedSingleton<K, V, L, Bounded>,
285) -> Singleton<usize, L, Bounded> {
286    me.entries().count()
287}
288
289#[cfg(stageleft_runtime)]
290fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
291    me: KeyedSingleton<K, V, L, Bounded>,
292) -> Singleton<HashMap<K, V>, L, Bounded>
293where
294    K: Eq + Hash,
295{
296    me.entries()
297        .assume_ordering(nondet!(
298            /// Because this is a keyed singleton, there is only one value per key.
299        ))
300        .fold(
301            q!(|| HashMap::new()),
302            q!(|map, (k, v)| {
303                map.insert(k, v);
304            }),
305        )
306}
307
308impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
309    pub(crate) fn collection_kind() -> CollectionKind {
310        CollectionKind::KeyedSingleton {
311            bound: B::bound_kind(),
312            key_type: stageleft::quote_type::<K>().into(),
313            value_type: stageleft::quote_type::<V>().into(),
314        }
315    }
316
317    /// Transforms each value by invoking `f` on each element, with keys staying the same
318    /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
319    ///
320    /// If you do not want to modify the stream and instead only want to view
321    /// each item use [`KeyedSingleton::inspect`] instead.
322    ///
323    /// # Example
324    /// ```rust
325    /// # #[cfg(feature = "deploy")] {
326    /// # use hydro_lang::prelude::*;
327    /// # use futures::StreamExt;
328    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
329    /// let keyed_singleton = // { 1: 2, 2: 4 }
330    /// # process
331    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
332    /// #     .into_keyed()
333    /// #     .first();
334    /// keyed_singleton.map(q!(|v| v + 1))
335    /// #   .entries()
336    /// # }, |mut stream| async move {
337    /// // { 1: 3, 2: 5 }
338    /// # let mut results = Vec::new();
339    /// # for _ in 0..2 {
340    /// #     results.push(stream.next().await.unwrap());
341    /// # }
342    /// # results.sort();
343    /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
344    /// # }));
345    /// # }
346    /// ```
347    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
348    where
349        F: Fn(V) -> U + 'a,
350    {
351        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
352        let map_f = q!({
353            let orig = f;
354            move |(k, v)| (k, orig(v))
355        })
356        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
357        .into();
358
359        KeyedSingleton::new(
360            self.location.clone(),
361            HydroNode::Map {
362                f: map_f,
363                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
364                metadata: self
365                    .location
366                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
367            },
368        )
369    }
370
371    /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
372    /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
373    ///
374    /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
375    /// the new value `U`. The key remains unchanged in the output.
376    ///
377    /// # Example
378    /// ```rust
379    /// # #[cfg(feature = "deploy")] {
380    /// # use hydro_lang::prelude::*;
381    /// # use futures::StreamExt;
382    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
383    /// let keyed_singleton = // { 1: 2, 2: 4 }
384    /// # process
385    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
386    /// #     .into_keyed()
387    /// #     .first();
388    /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
389    /// #   .entries()
390    /// # }, |mut stream| async move {
391    /// // { 1: 3, 2: 6 }
392    /// # let mut results = Vec::new();
393    /// # for _ in 0..2 {
394    /// #     results.push(stream.next().await.unwrap());
395    /// # }
396    /// # results.sort();
397    /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
398    /// # }));
399    /// # }
400    /// ```
401    pub fn map_with_key<U, F>(
402        self,
403        f: impl IntoQuotedMut<'a, F, L> + Copy,
404    ) -> KeyedSingleton<K, U, L, B>
405    where
406        F: Fn((K, V)) -> U + 'a,
407        K: Clone,
408    {
409        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
410        let map_f = q!({
411            let orig = f;
412            move |(k, v)| {
413                let out = orig((Clone::clone(&k), v));
414                (k, out)
415            }
416        })
417        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
418        .into();
419
420        KeyedSingleton::new(
421            self.location.clone(),
422            HydroNode::Map {
423                f: map_f,
424                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
425                metadata: self
426                    .location
427                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
428            },
429        )
430    }
431
432    /// Gets the number of keys in the keyed singleton.
433    ///
434    /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
435    /// since keys may be added / removed over time. When the set of keys changes, the count will
436    /// be asynchronously updated.
437    ///
438    /// # Example
439    /// ```rust
440    /// # #[cfg(feature = "deploy")] {
441    /// # use hydro_lang::prelude::*;
442    /// # use futures::StreamExt;
443    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
444    /// # let tick = process.tick();
445    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
446    /// # process
447    /// #     .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
448    /// #     .into_keyed()
449    /// #     .batch(&tick, nondet!(/** test */))
450    /// #     .first();
451    /// keyed_singleton.key_count()
452    /// # .all_ticks()
453    /// # }, |mut stream| async move {
454    /// // 3
455    /// # assert_eq!(stream.next().await.unwrap(), 3);
456    /// # }));
457    /// # }
458    /// ```
459    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
460        if B::ValueBound::BOUNDED {
461            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
462                location: self.location.clone(),
463                flow_state: self.flow_state.clone(),
464                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
465                _phantom: PhantomData,
466            };
467
468            me.entries().count()
469        } else if L::is_top_level()
470            && let Some(tick) = self.location.try_tick()
471        {
472            let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
473                location: self.location.clone(),
474                flow_state: self.flow_state.clone(),
475                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
476                _phantom: PhantomData,
477            };
478
479            let out =
480                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
481                    .latest();
482            Singleton::new(
483                out.location.clone(),
484                out.ir_node.replace(HydroNode::Placeholder),
485            )
486        } else {
487            panic!("Unbounded KeyedSingleton inside a tick");
488        }
489    }
490
491    /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
492    ///
493    /// As the values for each key are updated asynchronously, the `HashMap` will be updated
494    /// asynchronously as well.
495    ///
496    /// # Example
497    /// ```rust
498    /// # #[cfg(feature = "deploy")] {
499    /// # use hydro_lang::prelude::*;
500    /// # use futures::StreamExt;
501    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
502    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
503    /// # process
504    /// #     .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
505    /// #     .into_keyed()
506    /// #     .batch(&process.tick(), nondet!(/** test */))
507    /// #     .first();
508    /// keyed_singleton.into_singleton()
509    /// # .all_ticks()
510    /// # }, |mut stream| async move {
511    /// // { 1: "a", 2: "b", 3: "c" }
512    /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
513    /// # }));
514    /// # }
515    /// ```
516    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
517    where
518        K: Eq + Hash,
519    {
520        if B::ValueBound::BOUNDED {
521            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
522                location: self.location.clone(),
523                flow_state: self.flow_state.clone(),
524                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
525                _phantom: PhantomData,
526            };
527
528            me.entries()
529                .assume_ordering(nondet!(
530                    /// Because this is a keyed singleton, there is only one value per key.
531                ))
532                .fold(
533                    q!(|| HashMap::new()),
534                    q!(|map, (k, v)| {
535                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
536                        map.insert(k, v);
537                    }),
538                )
539        } else if L::is_top_level()
540            && let Some(tick) = self.location.try_tick()
541        {
542            let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
543                location: self.location.clone(),
544                flow_state: self.flow_state.clone(),
545                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
546                _phantom: PhantomData,
547            };
548
549            let out = into_singleton_inside_tick(
550                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
551            )
552            .latest();
553            Singleton::new(
554                out.location.clone(),
555                out.ir_node.replace(HydroNode::Placeholder),
556            )
557        } else {
558            panic!("Unbounded KeyedSingleton inside a tick");
559        }
560    }
561
562    /// An operator which allows you to "name" a `HydroNode`.
563    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
564    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
565        {
566            let mut node = self.ir_node.borrow_mut();
567            let metadata = node.metadata_mut();
568            metadata.tag = Some(name.to_owned());
569        }
570        self
571    }
572
573    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
574    /// implies that `B == Bounded`.
575    pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
576    where
577        B: IsBounded,
578    {
579        KeyedSingleton::new(
580            self.location.clone(),
581            self.ir_node.replace(HydroNode::Placeholder),
582        )
583    }
584
585    /// Gets the value associated with a specific key from the keyed singleton.
586    /// Returns `None` if the key is `None` or there is no associated value.
587    ///
588    /// # Example
589    /// ```rust
590    /// # #[cfg(feature = "deploy")] {
591    /// # use hydro_lang::prelude::*;
592    /// # use futures::StreamExt;
593    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
594    /// let tick = process.tick();
595    /// let keyed_data = process
596    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
597    ///     .into_keyed()
598    ///     .batch(&tick, nondet!(/** test */))
599    ///     .first();
600    /// let key = tick.singleton(q!(1));
601    /// keyed_data.get(key).all_ticks()
602    /// # }, |mut stream| async move {
603    /// // 2
604    /// # assert_eq!(stream.next().await.unwrap(), 2);
605    /// # }));
606    /// # }
607    /// ```
608    pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
609    where
610        B: IsBounded,
611        K: Hash + Eq,
612    {
613        self.make_bounded()
614            .into_keyed_stream()
615            .get(key)
616            .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
617            .first()
618    }
619
620    /// Emit a keyed stream containing keys shared between the keyed singleton and the
621    /// keyed stream, where each value in the output keyed stream is a tuple of
622    /// (the keyed singleton's value, the keyed stream's value).
623    ///
624    /// # Example
625    /// ```rust
626    /// # #[cfg(feature = "deploy")] {
627    /// # use hydro_lang::prelude::*;
628    /// # use futures::StreamExt;
629    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
630    /// let tick = process.tick();
631    /// let keyed_data = process
632    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
633    ///     .into_keyed()
634    ///     .batch(&tick, nondet!(/** test */))
635    ///     .first();
636    /// let other_data = process
637    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
638    ///     .into_keyed()
639    ///     .batch(&tick, nondet!(/** test */));
640    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
641    /// # }, |mut stream| async move {
642    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
643    /// # let mut results = vec![];
644    /// # for _ in 0..3 {
645    /// #     results.push(stream.next().await.unwrap());
646    /// # }
647    /// # results.sort();
648    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
649    /// # }));
650    /// # }
651    /// ```
652    pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2>(
653        self,
654        keyed_stream: KeyedStream<K, V2, L, Bounded, O2, R2>,
655    ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R2>
656    where
657        B: IsBounded,
658        K: Eq + Hash,
659    {
660        self.make_bounded()
661            .entries()
662            .weaken_retries::<R2>()
663            .join(keyed_stream.entries())
664            .into_keyed()
665    }
666
667    /// Emit a keyed singleton containing all keys shared between two keyed singletons,
668    /// where each value in the output keyed singleton is a tuple of
669    /// (self.value, other.value).
670    ///
671    /// # Example
672    /// ```rust
673    /// # #[cfg(feature = "deploy")] {
674    /// # use hydro_lang::prelude::*;
675    /// # use futures::StreamExt;
676    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
677    /// # let tick = process.tick();
678    /// let requests = // { 1: 10, 2: 20, 3: 30 }
679    /// # process
680    /// #     .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
681    /// #     .into_keyed()
682    /// #     .batch(&tick, nondet!(/** test */))
683    /// #     .first();
684    /// let other = // { 1: 100, 2: 200, 4: 400 }
685    /// # process
686    /// #     .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
687    /// #     .into_keyed()
688    /// #     .batch(&tick, nondet!(/** test */))
689    /// #     .first();
690    /// requests.join_keyed_singleton(other)
691    /// # .entries().all_ticks()
692    /// # }, |mut stream| async move {
693    /// // { 1: (10, 100), 2: (20, 200) }
694    /// # let mut results = vec![];
695    /// # for _ in 0..2 {
696    /// #     results.push(stream.next().await.unwrap());
697    /// # }
698    /// # results.sort();
699    /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
700    /// # }));
701    /// # }
702    /// ```
703    pub fn join_keyed_singleton<V2: Clone>(
704        self,
705        other: KeyedSingleton<K, V2, L, Bounded>,
706    ) -> KeyedSingleton<K, (V, V2), L, Bounded>
707    where
708        B: IsBounded,
709        K: Eq + Hash,
710    {
711        let result_stream = self
712            .make_bounded()
713            .entries()
714            .join(other.entries())
715            .into_keyed();
716
717        // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
718        KeyedSingleton::new(
719            result_stream.location.clone(),
720            HydroNode::Cast {
721                inner: Box::new(result_stream.ir_node.replace(HydroNode::Placeholder)),
722                metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
723                    K,
724                    (V, V2),
725                    L,
726                    Bounded,
727                >::collection_kind(
728                )),
729            },
730        )
731    }
732
733    /// For each value in `self`, find the matching key in `lookup`.
734    /// The output is a keyed singleton with the key from `self`, and a value
735    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
736    /// If the key is not present in `lookup`, the option will be [`None`].
737    ///
738    /// # Example
739    /// ```rust
740    /// # #[cfg(feature = "deploy")] {
741    /// # use hydro_lang::prelude::*;
742    /// # use futures::StreamExt;
743    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
744    /// # let tick = process.tick();
745    /// let requests = // { 1: 10, 2: 20 }
746    /// # process
747    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
748    /// #     .into_keyed()
749    /// #     .batch(&tick, nondet!(/** test */))
750    /// #     .first();
751    /// let other_data = // { 10: 100, 11: 110 }
752    /// # process
753    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
754    /// #     .into_keyed()
755    /// #     .batch(&tick, nondet!(/** test */))
756    /// #     .first();
757    /// requests.lookup_keyed_singleton(other_data)
758    /// # .entries().all_ticks()
759    /// # }, |mut stream| async move {
760    /// // { 1: (10, Some(100)), 2: (20, None) }
761    /// # let mut results = vec![];
762    /// # for _ in 0..2 {
763    /// #     results.push(stream.next().await.unwrap());
764    /// # }
765    /// # results.sort();
766    /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
767    /// # }));
768    /// # }
769    /// ```
770    pub fn lookup_keyed_singleton<V2>(
771        self,
772        lookup: KeyedSingleton<V, V2, L, Bounded>,
773    ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
774    where
775        B: IsBounded,
776        K: Eq + Hash + Clone,
777        V: Eq + Hash + Clone,
778        V2: Clone,
779    {
780        let result_stream = self
781            .make_bounded()
782            .into_keyed_stream()
783            .lookup_keyed_stream(lookup.into_keyed_stream());
784
785        // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
786        KeyedSingleton::new(
787            result_stream.location.clone(),
788            HydroNode::Cast {
789                inner: Box::new(result_stream.ir_node.replace(HydroNode::Placeholder)),
790                metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
791                    K,
792                    (V, Option<V2>),
793                    L,
794                    Bounded,
795                >::collection_kind(
796                )),
797            },
798        )
799    }
800
801    /// For each value in `self`, find the matching key in `lookup`.
802    /// The output is a keyed stream with the key from `self`, and a value
803    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
804    /// If the key is not present in `lookup`, the option will be [`None`].
805    ///
806    /// # Example
807    /// ```rust
808    /// # #[cfg(feature = "deploy")] {
809    /// # use hydro_lang::prelude::*;
810    /// # use futures::StreamExt;
811    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
812    /// # let tick = process.tick();
813    /// let requests = // { 1: 10, 2: 20 }
814    /// # process
815    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
816    /// #     .into_keyed()
817    /// #     .batch(&tick, nondet!(/** test */))
818    /// #     .first();
819    /// let other_data = // { 10: 100, 10: 110 }
820    /// # process
821    /// #     .source_iter(q!(vec![(10, 100), (10, 110)]))
822    /// #     .into_keyed()
823    /// #     .batch(&tick, nondet!(/** test */));
824    /// requests.lookup_keyed_stream(other_data)
825    /// # .entries().all_ticks()
826    /// # }, |mut stream| async move {
827    /// // { 1: [(10, Some(100)), (10, Some(110))], 2: (20, None) }
828    /// # let mut results = vec![];
829    /// # for _ in 0..3 {
830    /// #     results.push(stream.next().await.unwrap());
831    /// # }
832    /// # results.sort();
833    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
834    /// # }));
835    /// # }
836    /// ```
837    pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
838        self,
839        lookup: KeyedStream<V, V2, L, Bounded, O, R>,
840    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
841    where
842        B: IsBounded,
843        K: Eq + Hash + Clone,
844        V: Eq + Hash + Clone,
845        V2: Clone,
846    {
847        self.make_bounded()
848            .entries()
849            .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
850            .into_keyed()
851            .lookup_keyed_stream(lookup)
852    }
853}
854
855impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
856    KeyedSingleton<K, V, L, B>
857{
858    /// Flattens the keyed singleton into an unordered stream of key-value pairs.
859    ///
860    /// The value for each key must be bounded, otherwise the resulting stream elements would be
861    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
862    /// into the output.
863    ///
864    /// # Example
865    /// ```rust
866    /// # #[cfg(feature = "deploy")] {
867    /// # use hydro_lang::prelude::*;
868    /// # use futures::StreamExt;
869    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
870    /// let keyed_singleton = // { 1: 2, 2: 4 }
871    /// # process
872    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
873    /// #     .into_keyed()
874    /// #     .first();
875    /// keyed_singleton.entries()
876    /// # }, |mut stream| async move {
877    /// // (1, 2), (2, 4) in any order
878    /// # let mut results = Vec::new();
879    /// # for _ in 0..2 {
880    /// #     results.push(stream.next().await.unwrap());
881    /// # }
882    /// # results.sort();
883    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
884    /// # }));
885    /// # }
886    /// ```
887    pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
888        self.into_keyed_stream().entries()
889    }
890
891    /// Flattens the keyed singleton into an unordered stream of just the values.
892    ///
893    /// The value for each key must be bounded, otherwise the resulting stream elements would be
894    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
895    /// into the output.
896    ///
897    /// # Example
898    /// ```rust
899    /// # #[cfg(feature = "deploy")] {
900    /// # use hydro_lang::prelude::*;
901    /// # use futures::StreamExt;
902    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
903    /// let keyed_singleton = // { 1: 2, 2: 4 }
904    /// # process
905    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
906    /// #     .into_keyed()
907    /// #     .first();
908    /// keyed_singleton.values()
909    /// # }, |mut stream| async move {
910    /// // 2, 4 in any order
911    /// # let mut results = Vec::new();
912    /// # for _ in 0..2 {
913    /// #     results.push(stream.next().await.unwrap());
914    /// # }
915    /// # results.sort();
916    /// # assert_eq!(results, vec![2, 4]);
917    /// # }));
918    /// # }
919    /// ```
920    pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
921        let map_f = q!(|(_, v)| v)
922            .splice_fn1_ctx::<(K, V), V>(&self.location)
923            .into();
924
925        Stream::new(
926            self.location.clone(),
927            HydroNode::Map {
928                f: map_f,
929                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
930                metadata: self.location.new_node_metadata(Stream::<
931                    V,
932                    L,
933                    B::UnderlyingBound,
934                    NoOrder,
935                    ExactlyOnce,
936                >::collection_kind()),
937            },
938        )
939    }
940
941    /// Flattens the keyed singleton into an unordered stream of just the keys.
942    ///
943    /// The value for each key must be bounded, otherwise the removal of keys would result in
944    /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
945    /// into the output.
946    ///
947    /// # Example
948    /// ```rust
949    /// # #[cfg(feature = "deploy")] {
950    /// # use hydro_lang::prelude::*;
951    /// # use futures::StreamExt;
952    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
953    /// let keyed_singleton = // { 1: 2, 2: 4 }
954    /// # process
955    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
956    /// #     .into_keyed()
957    /// #     .first();
958    /// keyed_singleton.keys()
959    /// # }, |mut stream| async move {
960    /// // 1, 2 in any order
961    /// # let mut results = Vec::new();
962    /// # for _ in 0..2 {
963    /// #     results.push(stream.next().await.unwrap());
964    /// # }
965    /// # results.sort();
966    /// # assert_eq!(results, vec![1, 2]);
967    /// # }));
968    /// # }
969    /// ```
970    pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
971        self.entries().map(q!(|(k, _)| k))
972    }
973
974    /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
975    /// entries whose keys are not in the provided stream.
976    ///
977    /// # Example
978    /// ```rust
979    /// # #[cfg(feature = "deploy")] {
980    /// # use hydro_lang::prelude::*;
981    /// # use futures::StreamExt;
982    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
983    /// let tick = process.tick();
984    /// let keyed_singleton = // { 1: 2, 2: 4 }
985    /// # process
986    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
987    /// #     .into_keyed()
988    /// #     .first()
989    /// #     .batch(&tick, nondet!(/** test */));
990    /// let keys_to_remove = process
991    ///     .source_iter(q!(vec![1]))
992    ///     .batch(&tick, nondet!(/** test */));
993    /// keyed_singleton.filter_key_not_in(keys_to_remove)
994    /// #   .entries().all_ticks()
995    /// # }, |mut stream| async move {
996    /// // { 2: 4 }
997    /// # for w in vec![(2, 4)] {
998    /// #     assert_eq!(stream.next().await.unwrap(), w);
999    /// # }
1000    /// # }));
1001    /// # }
1002    /// ```
1003    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1004        self,
1005        other: Stream<K, L, Bounded, O2, R2>,
1006    ) -> Self
1007    where
1008        K: Hash + Eq,
1009    {
1010        check_matching_location(&self.location, &other.location);
1011
1012        KeyedSingleton::new(
1013            self.location.clone(),
1014            HydroNode::AntiJoin {
1015                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1016                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1017                metadata: self.location.new_node_metadata(Self::collection_kind()),
1018            },
1019        )
1020    }
1021
1022    /// An operator which allows you to "inspect" each value of a keyed singleton without
1023    /// modifying it. The closure `f` is called on a reference to each value. This is
1024    /// mainly useful for debugging, and should not be used to generate side-effects.
1025    ///
1026    /// # Example
1027    /// ```rust
1028    /// # #[cfg(feature = "deploy")] {
1029    /// # use hydro_lang::prelude::*;
1030    /// # use futures::StreamExt;
1031    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1032    /// let keyed_singleton = // { 1: 2, 2: 4 }
1033    /// # process
1034    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1035    /// #     .into_keyed()
1036    /// #     .first();
1037    /// keyed_singleton
1038    ///     .inspect(q!(|v| println!("{}", v)))
1039    /// #   .entries()
1040    /// # }, |mut stream| async move {
1041    /// // { 1: 2, 2: 4 }
1042    /// # for w in vec![(1, 2), (2, 4)] {
1043    /// #     assert_eq!(stream.next().await.unwrap(), w);
1044    /// # }
1045    /// # }));
1046    /// # }
1047    /// ```
1048    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1049    where
1050        F: Fn(&V) + 'a,
1051    {
1052        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1053        let inspect_f = q!({
1054            let orig = f;
1055            move |t: &(_, _)| orig(&t.1)
1056        })
1057        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1058        .into();
1059
1060        KeyedSingleton::new(
1061            self.location.clone(),
1062            HydroNode::Inspect {
1063                f: inspect_f,
1064                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1065                metadata: self.location.new_node_metadata(Self::collection_kind()),
1066            },
1067        )
1068    }
1069
1070    /// An operator which allows you to "inspect" each entry of a keyed singleton without
1071    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1072    /// mainly useful for debugging, and should not be used to generate side-effects.
1073    ///
1074    /// # Example
1075    /// ```rust
1076    /// # #[cfg(feature = "deploy")] {
1077    /// # use hydro_lang::prelude::*;
1078    /// # use futures::StreamExt;
1079    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1080    /// let keyed_singleton = // { 1: 2, 2: 4 }
1081    /// # process
1082    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1083    /// #     .into_keyed()
1084    /// #     .first();
1085    /// keyed_singleton
1086    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1087    /// #   .entries()
1088    /// # }, |mut stream| async move {
1089    /// // { 1: 2, 2: 4 }
1090    /// # for w in vec![(1, 2), (2, 4)] {
1091    /// #     assert_eq!(stream.next().await.unwrap(), w);
1092    /// # }
1093    /// # }));
1094    /// # }
1095    /// ```
1096    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1097    where
1098        F: Fn(&(K, V)) + 'a,
1099    {
1100        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1101
1102        KeyedSingleton::new(
1103            self.location.clone(),
1104            HydroNode::Inspect {
1105                f: inspect_f,
1106                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1107                metadata: self.location.new_node_metadata(Self::collection_kind()),
1108            },
1109        )
1110    }
1111
1112    /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1113    ///
1114    /// Because this method requires values to be bounded, the output [`Optional`] will only be
1115    /// asynchronously updated if a new key is added that is higher than the previous max key.
1116    ///
1117    /// # Example
1118    /// ```rust
1119    /// # #[cfg(feature = "deploy")] {
1120    /// # use hydro_lang::prelude::*;
1121    /// # use futures::StreamExt;
1122    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1123    /// let tick = process.tick();
1124    /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1125    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1126    /// #     .into_keyed()
1127    /// #     .first();
1128    /// keyed_singleton.get_max_key()
1129    /// # .sample_eager(nondet!(/** test */))
1130    /// # }, |mut stream| async move {
1131    /// // (2, 456)
1132    /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1133    /// # }));
1134    /// # }
1135    /// ```
1136    pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
1137    where
1138        K: Ord,
1139    {
1140        self.entries()
1141            .assume_ordering_trusted(nondet!(
1142                /// There is only one element associated with each key, and the keys are totallly
1143                /// ordered so we will produce a deterministic value. The closure technically
1144                /// isn't commutative in the case where both passed entries have the same key
1145                /// but different values.
1146                ///
1147                /// In the future, we may want to have an `assume!(...)` statement in the UDF that
1148                /// the two inputs do not have the same key.
1149            ))
1150            .reduce(q!(
1151                move |curr, new| {
1152                    if new.0 > curr.0 {
1153                        *curr = new;
1154                    }
1155                },
1156                idempotent = manual_proof!(/** repeated elements are ignored */)
1157            ))
1158    }
1159
1160    /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1161    /// element, the value.
1162    ///
1163    /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1164    ///
1165    /// # Example
1166    /// ```rust
1167    /// # #[cfg(feature = "deploy")] {
1168    /// # use hydro_lang::prelude::*;
1169    /// # use futures::StreamExt;
1170    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1171    /// let keyed_singleton = // { 1: 2, 2: 4 }
1172    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1173    /// #     .into_keyed()
1174    /// #     .first();
1175    /// keyed_singleton
1176    ///     .clone()
1177    ///     .into_keyed_stream()
1178    ///     .merge_unordered(
1179    ///         keyed_singleton.into_keyed_stream()
1180    ///     )
1181    /// #   .entries()
1182    /// # }, |mut stream| async move {
1183    /// /// // { 1: [2, 2], 2: [4, 4] }
1184    /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1185    /// #     assert_eq!(stream.next().await.unwrap(), w);
1186    /// # }
1187    /// # }));
1188    /// # }
1189    /// ```
1190    pub fn into_keyed_stream(
1191        self,
1192    ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1193        KeyedStream::new(
1194            self.location.clone(),
1195            HydroNode::Cast {
1196                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1197                metadata: self.location.new_node_metadata(KeyedStream::<
1198                    K,
1199                    V,
1200                    L,
1201                    B::UnderlyingBound,
1202                    TotalOrder,
1203                    ExactlyOnce,
1204                >::collection_kind()),
1205            },
1206        )
1207    }
1208}
1209
1210impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1211where
1212    L: Location<'a>,
1213{
1214    /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1215    /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1216    ///
1217    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1218    /// processed before an acknowledgement is emitted.
1219    pub fn atomic(self) -> KeyedSingleton<K, V, Atomic<L>, B> {
1220        let id = self.location.flow_state().borrow_mut().next_clock_id();
1221        let out_location = Atomic {
1222            tick: Tick {
1223                id,
1224                l: self.location.clone(),
1225            },
1226        };
1227        KeyedSingleton::new(
1228            out_location.clone(),
1229            HydroNode::BeginAtomic {
1230                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1231                metadata: out_location
1232                    .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1233            },
1234        )
1235    }
1236}
1237
1238impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1239where
1240    L: Location<'a> + NoTick,
1241{
1242    /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1243    /// See [`KeyedSingleton::atomic`] for more details.
1244    pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1245        KeyedSingleton::new(
1246            self.location.tick.l.clone(),
1247            HydroNode::EndAtomic {
1248                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1249                metadata: self
1250                    .location
1251                    .tick
1252                    .l
1253                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1254            },
1255        )
1256    }
1257}
1258
1259impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1260    /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1261    /// tick `T` always has the entries of `self` at tick `T - 1`.
1262    ///
1263    /// At tick `0`, the output has no entries, since there is no previous tick.
1264    ///
1265    /// This operator enables stateful iterative processing with ticks, by sending data from one
1266    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1267    ///
1268    /// # Example
1269    /// ```rust
1270    /// # #[cfg(feature = "deploy")] {
1271    /// # use hydro_lang::prelude::*;
1272    /// # use futures::StreamExt;
1273    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1274    /// let tick = process.tick();
1275    /// # // ticks are lazy by default, forces the second tick to run
1276    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1277    /// # let batch_first_tick = process
1278    /// #   .source_iter(q!(vec![(1, 2), (2, 3)]))
1279    /// #   .batch(&tick, nondet!(/** test */))
1280    /// #   .into_keyed();
1281    /// # let batch_second_tick = process
1282    /// #   .source_iter(q!(vec![(2, 4), (3, 5)]))
1283    /// #   .batch(&tick, nondet!(/** test */))
1284    /// #   .into_keyed()
1285    /// #   .defer_tick(); // appears on the second tick
1286    /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1287    /// # batch_first_tick.chain(batch_second_tick).first();
1288    /// input_batch.clone().filter_key_not_in(
1289    ///     input_batch.defer_tick().keys() // keys present in the previous tick
1290    /// )
1291    /// # .entries().all_ticks()
1292    /// # }, |mut stream| async move {
1293    /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1294    /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1295    /// #     assert_eq!(stream.next().await.unwrap(), w);
1296    /// # }
1297    /// # }));
1298    /// # }
1299    /// ```
1300    pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1301        KeyedSingleton::new(
1302            self.location.clone(),
1303            HydroNode::DeferTick {
1304                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1305                metadata: self
1306                    .location
1307                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1308            },
1309        )
1310    }
1311}
1312
1313impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1314where
1315    L: Location<'a>,
1316{
1317    /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1318    /// point in time.
1319    ///
1320    /// # Non-Determinism
1321    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1322    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1323    pub fn snapshot(
1324        self,
1325        tick: &Tick<L>,
1326        _nondet: NonDet,
1327    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1328        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1329        KeyedSingleton::new(
1330            tick.clone(),
1331            HydroNode::Batch {
1332                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1333                metadata: tick
1334                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1335            },
1336        )
1337    }
1338}
1339
1340impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1341where
1342    L: Location<'a> + NoTick,
1343{
1344    /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1345    /// state of the keyed singleton being atomically processed.
1346    ///
1347    /// # Non-Determinism
1348    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1349    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1350    pub fn snapshot_atomic(
1351        self,
1352        tick: &Tick<L>,
1353        _nondet: NonDet,
1354    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1355        KeyedSingleton::new(
1356            tick.clone(),
1357            HydroNode::Batch {
1358                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1359                metadata: tick
1360                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1361            },
1362        )
1363    }
1364}
1365
1366impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1367where
1368    L: Location<'a>,
1369{
1370    /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1371    ///
1372    /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1373    /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1374    /// is filtered out.
1375    ///
1376    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1377    /// not modify or take ownership of the values. If you need to modify the values while filtering
1378    /// use [`KeyedSingleton::filter_map`] instead.
1379    ///
1380    /// # Example
1381    /// ```rust
1382    /// # #[cfg(feature = "deploy")] {
1383    /// # use hydro_lang::prelude::*;
1384    /// # use futures::StreamExt;
1385    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1386    /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1387    /// # process
1388    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1389    /// #     .into_keyed()
1390    /// #     .first();
1391    /// keyed_singleton.filter(q!(|&v| v > 1))
1392    /// #   .entries()
1393    /// # }, |mut stream| async move {
1394    /// // { 1: 2, 2: 4 }
1395    /// # let mut results = Vec::new();
1396    /// # for _ in 0..2 {
1397    /// #     results.push(stream.next().await.unwrap());
1398    /// # }
1399    /// # results.sort();
1400    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1401    /// # }));
1402    /// # }
1403    /// ```
1404    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1405    where
1406        F: Fn(&V) -> bool + 'a,
1407    {
1408        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1409        let filter_f = q!({
1410            let orig = f;
1411            move |t: &(_, _)| orig(&t.1)
1412        })
1413        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1414        .into();
1415
1416        KeyedSingleton::new(
1417            self.location.clone(),
1418            HydroNode::Filter {
1419                f: filter_f,
1420                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1421                metadata: self
1422                    .location
1423                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1424            },
1425        )
1426    }
1427
1428    /// An operator that both filters and maps values. It yields only the key-value pairs where
1429    /// the supplied closure `f` returns `Some(value)`.
1430    ///
1431    /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1432    /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1433    /// If it returns `None`, the key-value pair is filtered out.
1434    ///
1435    /// # Example
1436    /// ```rust
1437    /// # #[cfg(feature = "deploy")] {
1438    /// # use hydro_lang::prelude::*;
1439    /// # use futures::StreamExt;
1440    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1441    /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1442    /// # process
1443    /// #     .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1444    /// #     .into_keyed()
1445    /// #     .first();
1446    /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1447    /// #   .entries()
1448    /// # }, |mut stream| async move {
1449    /// // { 1: 42, 3: 100 }
1450    /// # let mut results = Vec::new();
1451    /// # for _ in 0..2 {
1452    /// #     results.push(stream.next().await.unwrap());
1453    /// # }
1454    /// # results.sort();
1455    /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1456    /// # }));
1457    /// # }
1458    /// ```
1459    pub fn filter_map<F, U>(
1460        self,
1461        f: impl IntoQuotedMut<'a, F, L> + Copy,
1462    ) -> KeyedSingleton<K, U, L, B>
1463    where
1464        F: Fn(V) -> Option<U> + 'a,
1465    {
1466        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1467        let filter_map_f = q!({
1468            let orig = f;
1469            move |(k, v)| orig(v).map(|o| (k, o))
1470        })
1471        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1472        .into();
1473
1474        KeyedSingleton::new(
1475            self.location.clone(),
1476            HydroNode::FilterMap {
1477                f: filter_map_f,
1478                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1479                metadata: self
1480                    .location
1481                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
1482            },
1483        )
1484    }
1485
1486    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1487    /// arrived since the previous batch was released.
1488    ///
1489    /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1490    /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1491    ///
1492    /// # Non-Determinism
1493    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1494    /// has a non-deterministic set of key-value pairs.
1495    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded>
1496    where
1497        L: NoTick,
1498    {
1499        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1500        KeyedSingleton::new(
1501            tick.clone(),
1502            HydroNode::Batch {
1503                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1504                metadata: tick
1505                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1506            },
1507        )
1508    }
1509}
1510
1511impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1512where
1513    L: Location<'a> + NoTick,
1514{
1515    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1516    /// atomically processed.
1517    ///
1518    /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1519    /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1520    ///
1521    /// # Non-Determinism
1522    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1523    /// has a non-deterministic set of key-value pairs.
1524    pub fn batch_atomic(
1525        self,
1526        tick: &Tick<L>,
1527        nondet: NonDet,
1528    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1529        let _ = nondet;
1530        KeyedSingleton::new(
1531            tick.clone(),
1532            HydroNode::Batch {
1533                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1534                metadata: tick
1535                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1536            },
1537        )
1538    }
1539}
1540
1541#[cfg(test)]
1542mod tests {
1543    #[cfg(feature = "deploy")]
1544    use futures::{SinkExt, StreamExt};
1545    #[cfg(feature = "deploy")]
1546    use hydro_deploy::Deployment;
1547    #[cfg(any(feature = "deploy", feature = "sim"))]
1548    use stageleft::q;
1549
1550    #[cfg(any(feature = "deploy", feature = "sim"))]
1551    use crate::compile::builder::FlowBuilder;
1552    #[cfg(any(feature = "deploy", feature = "sim"))]
1553    use crate::location::Location;
1554    #[cfg(any(feature = "deploy", feature = "sim"))]
1555    use crate::nondet::nondet;
1556
1557    #[cfg(feature = "deploy")]
1558    #[tokio::test]
1559    async fn key_count_bounded_value() {
1560        let mut deployment = Deployment::new();
1561
1562        let mut flow = FlowBuilder::new();
1563        let node = flow.process::<()>();
1564        let external = flow.external::<()>();
1565
1566        let (input_port, input) = node.source_external_bincode(&external);
1567        let out = input
1568            .into_keyed()
1569            .first()
1570            .key_count()
1571            .sample_eager(nondet!(/** test */))
1572            .send_bincode_external(&external);
1573
1574        let nodes = flow
1575            .with_process(&node, deployment.Localhost())
1576            .with_external(&external, deployment.Localhost())
1577            .deploy(&mut deployment);
1578
1579        deployment.deploy().await.unwrap();
1580
1581        let mut external_in = nodes.connect(input_port).await;
1582        let mut external_out = nodes.connect(out).await;
1583
1584        deployment.start().await.unwrap();
1585
1586        assert_eq!(external_out.next().await.unwrap(), 0);
1587
1588        external_in.send((1, 1)).await.unwrap();
1589        assert_eq!(external_out.next().await.unwrap(), 1);
1590
1591        external_in.send((2, 2)).await.unwrap();
1592        assert_eq!(external_out.next().await.unwrap(), 2);
1593    }
1594
1595    #[cfg(feature = "deploy")]
1596    #[tokio::test]
1597    async fn key_count_unbounded_value() {
1598        let mut deployment = Deployment::new();
1599
1600        let mut flow = FlowBuilder::new();
1601        let node = flow.process::<()>();
1602        let external = flow.external::<()>();
1603
1604        let (input_port, input) = node.source_external_bincode(&external);
1605        let out = input
1606            .into_keyed()
1607            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1608            .key_count()
1609            .sample_eager(nondet!(/** test */))
1610            .send_bincode_external(&external);
1611
1612        let nodes = flow
1613            .with_process(&node, deployment.Localhost())
1614            .with_external(&external, deployment.Localhost())
1615            .deploy(&mut deployment);
1616
1617        deployment.deploy().await.unwrap();
1618
1619        let mut external_in = nodes.connect(input_port).await;
1620        let mut external_out = nodes.connect(out).await;
1621
1622        deployment.start().await.unwrap();
1623
1624        assert_eq!(external_out.next().await.unwrap(), 0);
1625
1626        external_in.send((1, 1)).await.unwrap();
1627        assert_eq!(external_out.next().await.unwrap(), 1);
1628
1629        external_in.send((1, 2)).await.unwrap();
1630        assert_eq!(external_out.next().await.unwrap(), 1);
1631
1632        external_in.send((2, 2)).await.unwrap();
1633        assert_eq!(external_out.next().await.unwrap(), 2);
1634
1635        external_in.send((1, 1)).await.unwrap();
1636        assert_eq!(external_out.next().await.unwrap(), 2);
1637
1638        external_in.send((3, 1)).await.unwrap();
1639        assert_eq!(external_out.next().await.unwrap(), 3);
1640    }
1641
1642    #[cfg(feature = "deploy")]
1643    #[tokio::test]
1644    async fn into_singleton_bounded_value() {
1645        let mut deployment = Deployment::new();
1646
1647        let mut flow = FlowBuilder::new();
1648        let node = flow.process::<()>();
1649        let external = flow.external::<()>();
1650
1651        let (input_port, input) = node.source_external_bincode(&external);
1652        let out = input
1653            .into_keyed()
1654            .first()
1655            .into_singleton()
1656            .sample_eager(nondet!(/** test */))
1657            .send_bincode_external(&external);
1658
1659        let nodes = flow
1660            .with_process(&node, deployment.Localhost())
1661            .with_external(&external, deployment.Localhost())
1662            .deploy(&mut deployment);
1663
1664        deployment.deploy().await.unwrap();
1665
1666        let mut external_in = nodes.connect(input_port).await;
1667        let mut external_out = nodes.connect(out).await;
1668
1669        deployment.start().await.unwrap();
1670
1671        assert_eq!(
1672            external_out.next().await.unwrap(),
1673            std::collections::HashMap::new()
1674        );
1675
1676        external_in.send((1, 1)).await.unwrap();
1677        assert_eq!(
1678            external_out.next().await.unwrap(),
1679            vec![(1, 1)].into_iter().collect()
1680        );
1681
1682        external_in.send((2, 2)).await.unwrap();
1683        assert_eq!(
1684            external_out.next().await.unwrap(),
1685            vec![(1, 1), (2, 2)].into_iter().collect()
1686        );
1687    }
1688
1689    #[cfg(feature = "deploy")]
1690    #[tokio::test]
1691    async fn into_singleton_unbounded_value() {
1692        let mut deployment = Deployment::new();
1693
1694        let mut flow = FlowBuilder::new();
1695        let node = flow.process::<()>();
1696        let external = flow.external::<()>();
1697
1698        let (input_port, input) = node.source_external_bincode(&external);
1699        let out = input
1700            .into_keyed()
1701            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1702            .into_singleton()
1703            .sample_eager(nondet!(/** test */))
1704            .send_bincode_external(&external);
1705
1706        let nodes = flow
1707            .with_process(&node, deployment.Localhost())
1708            .with_external(&external, deployment.Localhost())
1709            .deploy(&mut deployment);
1710
1711        deployment.deploy().await.unwrap();
1712
1713        let mut external_in = nodes.connect(input_port).await;
1714        let mut external_out = nodes.connect(out).await;
1715
1716        deployment.start().await.unwrap();
1717
1718        assert_eq!(
1719            external_out.next().await.unwrap(),
1720            std::collections::HashMap::new()
1721        );
1722
1723        external_in.send((1, 1)).await.unwrap();
1724        assert_eq!(
1725            external_out.next().await.unwrap(),
1726            vec![(1, 1)].into_iter().collect()
1727        );
1728
1729        external_in.send((1, 2)).await.unwrap();
1730        assert_eq!(
1731            external_out.next().await.unwrap(),
1732            vec![(1, 2)].into_iter().collect()
1733        );
1734
1735        external_in.send((2, 2)).await.unwrap();
1736        assert_eq!(
1737            external_out.next().await.unwrap(),
1738            vec![(1, 2), (2, 1)].into_iter().collect()
1739        );
1740
1741        external_in.send((1, 1)).await.unwrap();
1742        assert_eq!(
1743            external_out.next().await.unwrap(),
1744            vec![(1, 3), (2, 1)].into_iter().collect()
1745        );
1746
1747        external_in.send((3, 1)).await.unwrap();
1748        assert_eq!(
1749            external_out.next().await.unwrap(),
1750            vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
1751        );
1752    }
1753
1754    #[cfg(feature = "sim")]
1755    #[test]
1756    fn sim_unbounded_singleton_snapshot() {
1757        let mut flow = FlowBuilder::new();
1758        let node = flow.process::<()>();
1759
1760        let (input_port, input) = node.sim_input();
1761        let output = input
1762            .into_keyed()
1763            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1764            .snapshot(&node.tick(), nondet!(/** test */))
1765            .entries()
1766            .all_ticks()
1767            .sim_output();
1768
1769        let count = flow.sim().exhaustive(async || {
1770            input_port.send((1, 123));
1771            input_port.send((1, 456));
1772            input_port.send((2, 123));
1773
1774            let all = output.collect_sorted::<Vec<_>>().await;
1775            assert_eq!(all.last().unwrap(), &(2, 1));
1776        });
1777
1778        assert_eq!(count, 8);
1779    }
1780
1781    #[cfg(feature = "deploy")]
1782    #[tokio::test]
1783    async fn join_keyed_stream() {
1784        let mut deployment = Deployment::new();
1785
1786        let mut flow = FlowBuilder::new();
1787        let node = flow.process::<()>();
1788        let external = flow.external::<()>();
1789
1790        let tick = node.tick();
1791        let keyed_data = node
1792            .source_iter(q!(vec![(1, 10), (2, 20)]))
1793            .into_keyed()
1794            .batch(&tick, nondet!(/** test */))
1795            .first();
1796        let requests = node
1797            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
1798            .into_keyed()
1799            .batch(&tick, nondet!(/** test */));
1800
1801        let out = keyed_data
1802            .join_keyed_stream(requests)
1803            .entries()
1804            .all_ticks()
1805            .send_bincode_external(&external);
1806
1807        let nodes = flow
1808            .with_process(&node, deployment.Localhost())
1809            .with_external(&external, deployment.Localhost())
1810            .deploy(&mut deployment);
1811
1812        deployment.deploy().await.unwrap();
1813
1814        let mut external_out = nodes.connect(out).await;
1815
1816        deployment.start().await.unwrap();
1817
1818        let mut results = vec![];
1819        for _ in 0..2 {
1820            results.push(external_out.next().await.unwrap());
1821        }
1822        results.sort();
1823
1824        assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
1825    }
1826}