Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31use crate::location::dynamic::LocationId;
32use crate::location::{LocationKey, NetworkHint};
33
34pub mod backtrace;
35use backtrace::Backtrace;
36
37/// Wrapper that displays only the tokens of a parsed expr.
38///
39/// Boxes `syn::Type` which is ~240 bytes.
40#[derive(Clone, Hash)]
41pub struct DebugExpr(pub Box<syn::Expr>);
42
43impl From<syn::Expr> for DebugExpr {
44    fn from(expr: syn::Expr) -> Self {
45        Self(Box::new(expr))
46    }
47}
48
49impl Deref for DebugExpr {
50    type Target = syn::Expr;
51
52    fn deref(&self) -> &Self::Target {
53        &self.0
54    }
55}
56
57impl ToTokens for DebugExpr {
58    fn to_tokens(&self, tokens: &mut TokenStream) {
59        self.0.to_tokens(tokens);
60    }
61}
62
63impl Debug for DebugExpr {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        write!(f, "{}", self.0.to_token_stream())
66    }
67}
68
69impl Display for DebugExpr {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        let original = self.0.as_ref().clone();
72        let simplified = simplify_q_macro(original);
73
74        // For now, just use quote formatting without trying to parse as a statement
75        // This avoids the syn::parse_quote! issues entirely
76        write!(f, "q!({})", quote::quote!(#simplified))
77    }
78}
79
80/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
81fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
82    // Try to parse the token string as a syn::Expr
83    // Use a visitor to simplify q! macro expansions
84    let mut simplifier = QMacroSimplifier::new();
85    simplifier.visit_expr_mut(&mut expr);
86
87    // If we found and simplified a q! macro, return the simplified version
88    if let Some(simplified) = simplifier.simplified_result {
89        simplified
90    } else {
91        expr
92    }
93}
94
95/// AST visitor that simplifies q! macro expansions
96#[derive(Default)]
97pub struct QMacroSimplifier {
98    pub simplified_result: Option<syn::Expr>,
99}
100
101impl QMacroSimplifier {
102    pub fn new() -> Self {
103        Self::default()
104    }
105}
106
107impl VisitMut for QMacroSimplifier {
108    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
109        // Check if we already found a result to avoid further processing
110        if self.simplified_result.is_some() {
111            return;
112        }
113
114        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
115            // Look for calls to stageleft::runtime_support::fn*
116            && self.is_stageleft_runtime_support_call(&path_expr.path)
117            // Try to extract the closure from the arguments
118            && let Some(closure) = self.extract_closure_from_args(&call.args)
119        {
120            self.simplified_result = Some(closure);
121            return;
122        }
123
124        // Continue visiting child expressions using the default implementation
125        // Use the default visitor to avoid infinite recursion
126        syn::visit_mut::visit_expr_mut(self, expr);
127    }
128}
129
130impl QMacroSimplifier {
131    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
132        // Check if this is a call to stageleft::runtime_support::fn*
133        if let Some(last_segment) = path.segments.last() {
134            let fn_name = last_segment.ident.to_string();
135            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
136            fn_name.contains("_type_hint")
137                && path.segments.len() > 2
138                && path.segments[0].ident == "stageleft"
139                && path.segments[1].ident == "runtime_support"
140        } else {
141            false
142        }
143    }
144
145    fn extract_closure_from_args(
146        &self,
147        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
148    ) -> Option<syn::Expr> {
149        // Look through the arguments for a closure expression
150        for arg in args {
151            if let syn::Expr::Closure(_) = arg {
152                return Some(arg.clone());
153            }
154            // Also check for closures nested in other expressions (like blocks)
155            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
156                return Some(closure_expr);
157            }
158        }
159        None
160    }
161
162    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
163        let mut visitor = ClosureFinder {
164            found_closure: None,
165            prefer_inner_blocks: true,
166        };
167        visitor.visit_expr(expr);
168        visitor.found_closure
169    }
170}
171
172/// Visitor that finds closures in expressions with special block handling
173struct ClosureFinder {
174    found_closure: Option<syn::Expr>,
175    prefer_inner_blocks: bool,
176}
177
178impl<'ast> Visit<'ast> for ClosureFinder {
179    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
180        // If we already found a closure, don't continue searching
181        if self.found_closure.is_some() {
182            return;
183        }
184
185        match expr {
186            syn::Expr::Closure(_) => {
187                self.found_closure = Some(expr.clone());
188            }
189            syn::Expr::Block(block) if self.prefer_inner_blocks => {
190                // Special handling for blocks - look for inner blocks that contain closures
191                for stmt in &block.block.stmts {
192                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
193                        && let syn::Expr::Block(_) = stmt_expr
194                    {
195                        // Check if this nested block contains a closure
196                        let mut inner_visitor = ClosureFinder {
197                            found_closure: None,
198                            prefer_inner_blocks: false, // Avoid infinite recursion
199                        };
200                        inner_visitor.visit_expr(stmt_expr);
201                        if inner_visitor.found_closure.is_some() {
202                            // Found a closure in an inner block, return that block
203                            self.found_closure = Some(stmt_expr.clone());
204                            return;
205                        }
206                    }
207                }
208
209                // If no inner block with closure found, continue with normal visitation
210                visit::visit_expr(self, expr);
211
212                // If we found a closure, just return the closure itself, not the whole block
213                // unless we're in the special case where we want the containing block
214                if self.found_closure.is_some() {
215                    // The closure was found during visitation, no need to wrap in block
216                }
217            }
218            _ => {
219                // Use default visitor behavior for all other expressions
220                visit::visit_expr(self, expr);
221            }
222        }
223    }
224}
225
226/// Debug displays the type's tokens.
227///
228/// Boxes `syn::Type` which is ~320 bytes.
229#[derive(Clone, PartialEq, Eq, Hash)]
230pub struct DebugType(pub Box<syn::Type>);
231
232impl From<syn::Type> for DebugType {
233    fn from(t: syn::Type) -> Self {
234        Self(Box::new(t))
235    }
236}
237
238impl Deref for DebugType {
239    type Target = syn::Type;
240
241    fn deref(&self) -> &Self::Target {
242        &self.0
243    }
244}
245
246impl ToTokens for DebugType {
247    fn to_tokens(&self, tokens: &mut TokenStream) {
248        self.0.to_tokens(tokens);
249    }
250}
251
252impl Debug for DebugType {
253    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254        write!(f, "{}", self.0.to_token_stream())
255    }
256}
257
258pub enum DebugInstantiate {
259    Building,
260    Finalized(Box<DebugInstantiateFinalized>),
261}
262
263#[cfg_attr(
264    not(feature = "build"),
265    expect(
266        dead_code,
267        reason = "sink, source unused without `feature = \"build\"`."
268    )
269)]
270pub struct DebugInstantiateFinalized {
271    sink: syn::Expr,
272    source: syn::Expr,
273    connect_fn: Option<Box<dyn FnOnce()>>,
274}
275
276impl From<DebugInstantiateFinalized> for DebugInstantiate {
277    fn from(f: DebugInstantiateFinalized) -> Self {
278        Self::Finalized(Box::new(f))
279    }
280}
281
282impl Debug for DebugInstantiate {
283    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284        write!(f, "<network instantiate>")
285    }
286}
287
288impl Hash for DebugInstantiate {
289    fn hash<H: Hasher>(&self, _state: &mut H) {
290        // Do nothing
291    }
292}
293
294impl Clone for DebugInstantiate {
295    fn clone(&self) -> Self {
296        match self {
297            DebugInstantiate::Building => DebugInstantiate::Building,
298            DebugInstantiate::Finalized(_) => {
299                panic!("DebugInstantiate::Finalized should not be cloned")
300            }
301        }
302    }
303}
304
305/// Tracks the instantiation state of a `ClusterMembers` source.
306///
307/// During `compile_network`, the first `ClusterMembers` node for a given
308/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
309/// receives the expression returned by `Deploy::cluster_membership_stream`.
310/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
311/// during code-gen they simply reference the tee output of the first node
312/// instead of creating a redundant `source_stream`.
313#[derive(Debug, Hash, Clone)]
314pub enum ClusterMembersState {
315    /// Not yet instantiated.
316    Uninit,
317    /// The primary instance: holds the stream expression and will emit
318    /// `source_stream(expr) -> tee()` during code-gen.
319    Stream(DebugExpr),
320    /// A secondary instance that references the tee output of the primary.
321    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
322    /// can derive the deterministic tee ident without extra state.
323    Tee(LocationId, LocationId),
324}
325
326/// A source in a Hydro graph, where data enters the graph.
327#[derive(Debug, Hash, Clone)]
328pub enum HydroSource {
329    Stream(DebugExpr),
330    ExternalNetwork(),
331    Iter(DebugExpr),
332    Spin(),
333    ClusterMembers(LocationId, ClusterMembersState),
334    Embedded(syn::Ident),
335    EmbeddedSingleton(syn::Ident),
336}
337
338#[cfg(feature = "build")]
339/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
340/// and simulations.
341///
342/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
343/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
344pub trait DfirBuilder {
345    /// Whether the representation of singletons should include intermediate states.
346    fn singleton_intermediates(&self) -> bool;
347
348    /// Gets the DFIR builder for the given location, creating it if necessary.
349    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
350
351    fn batch(
352        &mut self,
353        in_ident: syn::Ident,
354        in_location: &LocationId,
355        in_kind: &CollectionKind,
356        out_ident: &syn::Ident,
357        out_location: &LocationId,
358        op_meta: &HydroIrOpMetadata,
359    );
360    fn yield_from_tick(
361        &mut self,
362        in_ident: syn::Ident,
363        in_location: &LocationId,
364        in_kind: &CollectionKind,
365        out_ident: &syn::Ident,
366        out_location: &LocationId,
367    );
368
369    fn begin_atomic(
370        &mut self,
371        in_ident: syn::Ident,
372        in_location: &LocationId,
373        in_kind: &CollectionKind,
374        out_ident: &syn::Ident,
375        out_location: &LocationId,
376        op_meta: &HydroIrOpMetadata,
377    );
378    fn end_atomic(
379        &mut self,
380        in_ident: syn::Ident,
381        in_location: &LocationId,
382        in_kind: &CollectionKind,
383        out_ident: &syn::Ident,
384    );
385
386    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
387    fn observe_nondet(
388        &mut self,
389        trusted: bool,
390        location: &LocationId,
391        in_ident: syn::Ident,
392        in_kind: &CollectionKind,
393        out_ident: &syn::Ident,
394        out_kind: &CollectionKind,
395        op_meta: &HydroIrOpMetadata,
396    );
397
398    #[expect(clippy::too_many_arguments, reason = "TODO")]
399    fn create_network(
400        &mut self,
401        from: &LocationId,
402        to: &LocationId,
403        input_ident: syn::Ident,
404        out_ident: &syn::Ident,
405        serialize: Option<&DebugExpr>,
406        sink: syn::Expr,
407        source: syn::Expr,
408        deserialize: Option<&DebugExpr>,
409        tag_id: usize,
410        networking_info: &crate::networking::NetworkingInfo,
411    );
412
413    fn create_external_source(
414        &mut self,
415        on: &LocationId,
416        source_expr: syn::Expr,
417        out_ident: &syn::Ident,
418        deserialize: Option<&DebugExpr>,
419        tag_id: usize,
420    );
421
422    fn create_external_output(
423        &mut self,
424        on: &LocationId,
425        sink_expr: syn::Expr,
426        input_ident: &syn::Ident,
427        serialize: Option<&DebugExpr>,
428        tag_id: usize,
429    );
430}
431
432#[cfg(feature = "build")]
433impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
434    fn singleton_intermediates(&self) -> bool {
435        false
436    }
437
438    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
439        self.entry(location.root().key())
440            .expect("location was removed")
441            .or_default()
442    }
443
444    fn batch(
445        &mut self,
446        in_ident: syn::Ident,
447        in_location: &LocationId,
448        in_kind: &CollectionKind,
449        out_ident: &syn::Ident,
450        _out_location: &LocationId,
451        _op_meta: &HydroIrOpMetadata,
452    ) {
453        let builder = self.get_dfir_mut(in_location.root());
454        if in_kind.is_bounded()
455            && matches!(
456                in_kind,
457                CollectionKind::Singleton { .. }
458                    | CollectionKind::Optional { .. }
459                    | CollectionKind::KeyedSingleton { .. }
460            )
461        {
462            assert!(in_location.is_top_level());
463            builder.add_dfir(
464                parse_quote! {
465                    #out_ident = #in_ident -> persist::<'static>();
466                },
467                None,
468                None,
469            );
470        } else {
471            builder.add_dfir(
472                parse_quote! {
473                    #out_ident = #in_ident;
474                },
475                None,
476                None,
477            );
478        }
479    }
480
481    fn yield_from_tick(
482        &mut self,
483        in_ident: syn::Ident,
484        in_location: &LocationId,
485        _in_kind: &CollectionKind,
486        out_ident: &syn::Ident,
487        _out_location: &LocationId,
488    ) {
489        let builder = self.get_dfir_mut(in_location.root());
490        builder.add_dfir(
491            parse_quote! {
492                #out_ident = #in_ident;
493            },
494            None,
495            None,
496        );
497    }
498
499    fn begin_atomic(
500        &mut self,
501        in_ident: syn::Ident,
502        in_location: &LocationId,
503        _in_kind: &CollectionKind,
504        out_ident: &syn::Ident,
505        _out_location: &LocationId,
506        _op_meta: &HydroIrOpMetadata,
507    ) {
508        let builder = self.get_dfir_mut(in_location.root());
509        builder.add_dfir(
510            parse_quote! {
511                #out_ident = #in_ident;
512            },
513            None,
514            None,
515        );
516    }
517
518    fn end_atomic(
519        &mut self,
520        in_ident: syn::Ident,
521        in_location: &LocationId,
522        _in_kind: &CollectionKind,
523        out_ident: &syn::Ident,
524    ) {
525        let builder = self.get_dfir_mut(in_location.root());
526        builder.add_dfir(
527            parse_quote! {
528                #out_ident = #in_ident;
529            },
530            None,
531            None,
532        );
533    }
534
535    fn observe_nondet(
536        &mut self,
537        _trusted: bool,
538        location: &LocationId,
539        in_ident: syn::Ident,
540        _in_kind: &CollectionKind,
541        out_ident: &syn::Ident,
542        _out_kind: &CollectionKind,
543        _op_meta: &HydroIrOpMetadata,
544    ) {
545        let builder = self.get_dfir_mut(location);
546        builder.add_dfir(
547            parse_quote! {
548                #out_ident = #in_ident;
549            },
550            None,
551            None,
552        );
553    }
554
555    fn create_network(
556        &mut self,
557        from: &LocationId,
558        to: &LocationId,
559        input_ident: syn::Ident,
560        out_ident: &syn::Ident,
561        serialize: Option<&DebugExpr>,
562        sink: syn::Expr,
563        source: syn::Expr,
564        deserialize: Option<&DebugExpr>,
565        tag_id: usize,
566        _networking_info: &crate::networking::NetworkingInfo,
567    ) {
568        let sender_builder = self.get_dfir_mut(from);
569        if let Some(serialize_pipeline) = serialize {
570            sender_builder.add_dfir(
571                parse_quote! {
572                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
573                },
574                None,
575                // operator tag separates send and receive, which otherwise have the same next_stmt_id
576                Some(&format!("send{}", tag_id)),
577            );
578        } else {
579            sender_builder.add_dfir(
580                parse_quote! {
581                    #input_ident -> dest_sink(#sink);
582                },
583                None,
584                Some(&format!("send{}", tag_id)),
585            );
586        }
587
588        let receiver_builder = self.get_dfir_mut(to);
589        if let Some(deserialize_pipeline) = deserialize {
590            receiver_builder.add_dfir(
591                parse_quote! {
592                    #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
593                },
594                None,
595                Some(&format!("recv{}", tag_id)),
596            );
597        } else {
598            receiver_builder.add_dfir(
599                parse_quote! {
600                    #out_ident = source_stream(#source);
601                },
602                None,
603                Some(&format!("recv{}", tag_id)),
604            );
605        }
606    }
607
608    fn create_external_source(
609        &mut self,
610        on: &LocationId,
611        source_expr: syn::Expr,
612        out_ident: &syn::Ident,
613        deserialize: Option<&DebugExpr>,
614        tag_id: usize,
615    ) {
616        let receiver_builder = self.get_dfir_mut(on);
617        if let Some(deserialize_pipeline) = deserialize {
618            receiver_builder.add_dfir(
619                parse_quote! {
620                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
621                },
622                None,
623                Some(&format!("recv{}", tag_id)),
624            );
625        } else {
626            receiver_builder.add_dfir(
627                parse_quote! {
628                    #out_ident = source_stream(#source_expr);
629                },
630                None,
631                Some(&format!("recv{}", tag_id)),
632            );
633        }
634    }
635
636    fn create_external_output(
637        &mut self,
638        on: &LocationId,
639        sink_expr: syn::Expr,
640        input_ident: &syn::Ident,
641        serialize: Option<&DebugExpr>,
642        tag_id: usize,
643    ) {
644        let sender_builder = self.get_dfir_mut(on);
645        if let Some(serialize_fn) = serialize {
646            sender_builder.add_dfir(
647                parse_quote! {
648                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
649                },
650                None,
651                // operator tag separates send and receive, which otherwise have the same next_stmt_id
652                Some(&format!("send{}", tag_id)),
653            );
654        } else {
655            sender_builder.add_dfir(
656                parse_quote! {
657                    #input_ident -> dest_sink(#sink_expr);
658                },
659                None,
660                Some(&format!("send{}", tag_id)),
661            );
662        }
663    }
664}
665
666#[cfg(feature = "build")]
667pub enum BuildersOrCallback<'a, L, N>
668where
669    L: FnMut(&mut HydroRoot, &mut usize),
670    N: FnMut(&mut HydroNode, &mut usize),
671{
672    Builders(&'a mut dyn DfirBuilder),
673    Callback(L, N),
674}
675
676/// An root in a Hydro graph, which is an pipeline that doesn't emit
677/// any downstream values. Traversals over the dataflow graph and
678/// generating DFIR IR start from roots.
679#[derive(Debug, Hash)]
680pub enum HydroRoot {
681    ForEach {
682        f: DebugExpr,
683        input: Box<HydroNode>,
684        op_metadata: HydroIrOpMetadata,
685    },
686    SendExternal {
687        to_external_key: LocationKey,
688        to_port_id: ExternalPortId,
689        to_many: bool,
690        unpaired: bool,
691        serialize_fn: Option<DebugExpr>,
692        instantiate_fn: DebugInstantiate,
693        input: Box<HydroNode>,
694        op_metadata: HydroIrOpMetadata,
695    },
696    DestSink {
697        sink: DebugExpr,
698        input: Box<HydroNode>,
699        op_metadata: HydroIrOpMetadata,
700    },
701    CycleSink {
702        cycle_id: CycleId,
703        input: Box<HydroNode>,
704        op_metadata: HydroIrOpMetadata,
705    },
706    EmbeddedOutput {
707        ident: syn::Ident,
708        input: Box<HydroNode>,
709        op_metadata: HydroIrOpMetadata,
710    },
711    Null {
712        input: Box<HydroNode>,
713        op_metadata: HydroIrOpMetadata,
714    },
715}
716
717impl HydroRoot {
718    #[cfg(feature = "build")]
719    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
720    pub fn compile_network<'a, D>(
721        &mut self,
722        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
723        seen_tees: &mut SeenSharedNodes,
724        seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
725        processes: &SparseSecondaryMap<LocationKey, D::Process>,
726        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
727        externals: &SparseSecondaryMap<LocationKey, D::External>,
728        env: &mut D::InstantiateEnv,
729    ) where
730        D: Deploy<'a>,
731    {
732        let refcell_extra_stmts = RefCell::new(extra_stmts);
733        let refcell_env = RefCell::new(env);
734        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
735        self.transform_bottom_up(
736            &mut |l| {
737                if let HydroRoot::SendExternal {
738                    input,
739                    to_external_key,
740                    to_port_id,
741                    to_many,
742                    unpaired,
743                    instantiate_fn,
744                    ..
745                } = l
746                {
747                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
748                        DebugInstantiate::Building => {
749                            let to_node = externals
750                                .get(*to_external_key)
751                                .unwrap_or_else(|| {
752                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
753                                })
754                                .clone();
755
756                            match input.metadata().location_id.root() {
757                                &LocationId::Process(process_key) => {
758                                    if *to_many {
759                                        (
760                                            (
761                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
762                                                parse_quote!(DUMMY),
763                                            ),
764                                            Box::new(|| {}) as Box<dyn FnOnce()>,
765                                        )
766                                    } else {
767                                        let from_node = processes
768                                            .get(process_key)
769                                            .unwrap_or_else(|| {
770                                                panic!("A process used in the graph was not instantiated: {}", process_key)
771                                            })
772                                            .clone();
773
774                                        let sink_port = from_node.next_port();
775                                        let source_port = to_node.next_port();
776
777                                        if *unpaired {
778                                            use stageleft::quote_type;
779                                            use tokio_util::codec::LengthDelimitedCodec;
780
781                                            to_node.register(*to_port_id, source_port.clone());
782
783                                            let _ = D::e2o_source(
784                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
785                                                &to_node, &source_port,
786                                                &from_node, &sink_port,
787                                                &quote_type::<LengthDelimitedCodec>(),
788                                                format!("{}_{}", *to_external_key, *to_port_id)
789                                            );
790                                        }
791
792                                        (
793                                            (
794                                                D::o2e_sink(
795                                                    &from_node,
796                                                    &sink_port,
797                                                    &to_node,
798                                                    &source_port,
799                                                    format!("{}_{}", *to_external_key, *to_port_id)
800                                                ),
801                                                parse_quote!(DUMMY),
802                                            ),
803                                            if *unpaired {
804                                                D::e2o_connect(
805                                                    &to_node,
806                                                    &source_port,
807                                                    &from_node,
808                                                    &sink_port,
809                                                    *to_many,
810                                                    NetworkHint::Auto,
811                                                )
812                                            } else {
813                                                Box::new(|| {}) as Box<dyn FnOnce()>
814                                            },
815                                        )
816                                    }
817                                }
818                                LocationId::Cluster(_) => todo!("SendExternal from a cluster location is not yet supported"),
819                                _ => panic!()
820                            }
821                        },
822
823                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
824                    };
825
826                    *instantiate_fn = DebugInstantiateFinalized {
827                        sink: sink_expr,
828                        source: source_expr,
829                        connect_fn: Some(connect_fn),
830                    }
831                    .into();
832                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
833                    let element_type = match &input.metadata().collection_kind {
834                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
835                        _ => panic!("Embedded output must have Stream collection kind"),
836                    };
837                    let location_key = match input.metadata().location_id.root() {
838                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
839                        _ => panic!("Embedded output must be on a process or cluster"),
840                    };
841                    D::register_embedded_output(
842                        &mut refcell_env.borrow_mut(),
843                        location_key,
844                        ident,
845                        &element_type,
846                    );
847                }
848            },
849            &mut |n| {
850                if let HydroNode::Network {
851                    name,
852                    networking_info,
853                    input,
854                    instantiate_fn,
855                    metadata,
856                    ..
857                } = n
858                {
859                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
860                        DebugInstantiate::Building => instantiate_network::<D>(
861                            &mut refcell_env.borrow_mut(),
862                            input.metadata().location_id.root(),
863                            metadata.location_id.root(),
864                            processes,
865                            clusters,
866                            name.as_deref(),
867                            networking_info,
868                        ),
869
870                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
871                    };
872
873                    *instantiate_fn = DebugInstantiateFinalized {
874                        sink: sink_expr,
875                        source: source_expr,
876                        connect_fn: Some(connect_fn),
877                    }
878                    .into();
879                } else if let HydroNode::ExternalInput {
880                    from_external_key,
881                    from_port_id,
882                    from_many,
883                    codec_type,
884                    port_hint,
885                    instantiate_fn,
886                    metadata,
887                    ..
888                } = n
889                {
890                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
891                        DebugInstantiate::Building => {
892                            let from_node = externals
893                                .get(*from_external_key)
894                                .unwrap_or_else(|| {
895                                    panic!(
896                                        "A external used in the graph was not instantiated: {}",
897                                        from_external_key,
898                                    )
899                                })
900                                .clone();
901
902                            match metadata.location_id.root() {
903                                &LocationId::Process(process_key) => {
904                                    let to_node = processes
905                                        .get(process_key)
906                                        .unwrap_or_else(|| {
907                                            panic!("A process used in the graph was not instantiated: {}", process_key)
908                                        })
909                                        .clone();
910
911                                    let sink_port = from_node.next_port();
912                                    let source_port = to_node.next_port();
913
914                                    from_node.register(*from_port_id, sink_port.clone());
915
916                                    (
917                                        (
918                                            parse_quote!(DUMMY),
919                                            if *from_many {
920                                                D::e2o_many_source(
921                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
922                                                    &to_node, &source_port,
923                                                    codec_type.0.as_ref(),
924                                                    format!("{}_{}", *from_external_key, *from_port_id)
925                                                )
926                                            } else {
927                                                D::e2o_source(
928                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
929                                                    &from_node, &sink_port,
930                                                    &to_node, &source_port,
931                                                    codec_type.0.as_ref(),
932                                                    format!("{}_{}", *from_external_key, *from_port_id)
933                                                )
934                                            },
935                                        ),
936                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
937                                    )
938                                }
939                                LocationId::Cluster(_) => todo!("ExternalInput to a cluster location is not yet supported"),
940                                _ => panic!()
941                            }
942                        },
943
944                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
945                    };
946
947                    *instantiate_fn = DebugInstantiateFinalized {
948                        sink: sink_expr,
949                        source: source_expr,
950                        connect_fn: Some(connect_fn),
951                    }
952                    .into();
953                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
954                    let element_type = match &metadata.collection_kind {
955                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
956                        _ => panic!("Embedded source must have Stream collection kind"),
957                    };
958                    let location_key = match metadata.location_id.root() {
959                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
960                        _ => panic!("Embedded source must be on a process or cluster"),
961                    };
962                    D::register_embedded_stream_input(
963                        &mut refcell_env.borrow_mut(),
964                        location_key,
965                        ident,
966                        &element_type,
967                    );
968                } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
969                    let element_type = match &metadata.collection_kind {
970                        CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
971                        _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
972                    };
973                    let location_key = match metadata.location_id.root() {
974                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
975                        _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
976                    };
977                    D::register_embedded_singleton_input(
978                        &mut refcell_env.borrow_mut(),
979                        location_key,
980                        ident,
981                        &element_type,
982                    );
983                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
984                    match state {
985                        ClusterMembersState::Uninit => {
986                            let at_location = metadata.location_id.root().clone();
987                            let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
988                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
989                                // First occurrence: call cluster_membership_stream and mark as Stream.
990                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
991                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
992                                    &(),
993                                );
994                                *state = ClusterMembersState::Stream(expr.into());
995                            } else {
996                                // Already instantiated for this (at, target) pair: just tee.
997                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
998                            }
999                        }
1000                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1001                            panic!("cluster members already finalized");
1002                        }
1003                    }
1004                }
1005            },
1006            seen_tees,
1007            false,
1008        );
1009    }
1010
1011    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1012        self.transform_bottom_up(
1013            &mut |l| {
1014                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1015                    match instantiate_fn {
1016                        DebugInstantiate::Building => panic!("network not built"),
1017
1018                        DebugInstantiate::Finalized(finalized) => {
1019                            (finalized.connect_fn.take().unwrap())();
1020                        }
1021                    }
1022                }
1023            },
1024            &mut |n| {
1025                if let HydroNode::Network { instantiate_fn, .. }
1026                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1027                {
1028                    match instantiate_fn {
1029                        DebugInstantiate::Building => panic!("network not built"),
1030
1031                        DebugInstantiate::Finalized(finalized) => {
1032                            (finalized.connect_fn.take().unwrap())();
1033                        }
1034                    }
1035                }
1036            },
1037            seen_tees,
1038            false,
1039        );
1040    }
1041
1042    pub fn transform_bottom_up(
1043        &mut self,
1044        transform_root: &mut impl FnMut(&mut HydroRoot),
1045        transform_node: &mut impl FnMut(&mut HydroNode),
1046        seen_tees: &mut SeenSharedNodes,
1047        check_well_formed: bool,
1048    ) {
1049        self.transform_children(
1050            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1051            seen_tees,
1052        );
1053
1054        transform_root(self);
1055    }
1056
1057    pub fn transform_children(
1058        &mut self,
1059        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1060        seen_tees: &mut SeenSharedNodes,
1061    ) {
1062        match self {
1063            HydroRoot::ForEach { input, .. }
1064            | HydroRoot::SendExternal { input, .. }
1065            | HydroRoot::DestSink { input, .. }
1066            | HydroRoot::CycleSink { input, .. }
1067            | HydroRoot::EmbeddedOutput { input, .. }
1068            | HydroRoot::Null { input, .. } => {
1069                transform(input, seen_tees);
1070            }
1071        }
1072    }
1073
1074    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1075        match self {
1076            HydroRoot::ForEach {
1077                f,
1078                input,
1079                op_metadata,
1080            } => HydroRoot::ForEach {
1081                f: f.clone(),
1082                input: Box::new(input.deep_clone(seen_tees)),
1083                op_metadata: op_metadata.clone(),
1084            },
1085            HydroRoot::SendExternal {
1086                to_external_key,
1087                to_port_id,
1088                to_many,
1089                unpaired,
1090                serialize_fn,
1091                instantiate_fn,
1092                input,
1093                op_metadata,
1094            } => HydroRoot::SendExternal {
1095                to_external_key: *to_external_key,
1096                to_port_id: *to_port_id,
1097                to_many: *to_many,
1098                unpaired: *unpaired,
1099                serialize_fn: serialize_fn.clone(),
1100                instantiate_fn: instantiate_fn.clone(),
1101                input: Box::new(input.deep_clone(seen_tees)),
1102                op_metadata: op_metadata.clone(),
1103            },
1104            HydroRoot::DestSink {
1105                sink,
1106                input,
1107                op_metadata,
1108            } => HydroRoot::DestSink {
1109                sink: sink.clone(),
1110                input: Box::new(input.deep_clone(seen_tees)),
1111                op_metadata: op_metadata.clone(),
1112            },
1113            HydroRoot::CycleSink {
1114                cycle_id,
1115                input,
1116                op_metadata,
1117            } => HydroRoot::CycleSink {
1118                cycle_id: *cycle_id,
1119                input: Box::new(input.deep_clone(seen_tees)),
1120                op_metadata: op_metadata.clone(),
1121            },
1122            HydroRoot::EmbeddedOutput {
1123                ident,
1124                input,
1125                op_metadata,
1126            } => HydroRoot::EmbeddedOutput {
1127                ident: ident.clone(),
1128                input: Box::new(input.deep_clone(seen_tees)),
1129                op_metadata: op_metadata.clone(),
1130            },
1131            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1132                input: Box::new(input.deep_clone(seen_tees)),
1133                op_metadata: op_metadata.clone(),
1134            },
1135        }
1136    }
1137
1138    #[cfg(feature = "build")]
1139    pub fn emit(
1140        &mut self,
1141        graph_builders: &mut dyn DfirBuilder,
1142        seen_tees: &mut SeenSharedNodes,
1143        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1144        next_stmt_id: &mut usize,
1145    ) {
1146        self.emit_core(
1147            &mut BuildersOrCallback::<
1148                fn(&mut HydroRoot, &mut usize),
1149                fn(&mut HydroNode, &mut usize),
1150            >::Builders(graph_builders),
1151            seen_tees,
1152            built_tees,
1153            next_stmt_id,
1154        );
1155    }
1156
1157    #[cfg(feature = "build")]
1158    pub fn emit_core(
1159        &mut self,
1160        builders_or_callback: &mut BuildersOrCallback<
1161            impl FnMut(&mut HydroRoot, &mut usize),
1162            impl FnMut(&mut HydroNode, &mut usize),
1163        >,
1164        seen_tees: &mut SeenSharedNodes,
1165        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1166        next_stmt_id: &mut usize,
1167    ) {
1168        match self {
1169            HydroRoot::ForEach { f, input, .. } => {
1170                let input_ident =
1171                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1172
1173                match builders_or_callback {
1174                    BuildersOrCallback::Builders(graph_builders) => {
1175                        graph_builders
1176                            .get_dfir_mut(&input.metadata().location_id)
1177                            .add_dfir(
1178                                parse_quote! {
1179                                    #input_ident -> for_each(#f);
1180                                },
1181                                None,
1182                                Some(&next_stmt_id.to_string()),
1183                            );
1184                    }
1185                    BuildersOrCallback::Callback(leaf_callback, _) => {
1186                        leaf_callback(self, next_stmt_id);
1187                    }
1188                }
1189
1190                *next_stmt_id += 1;
1191            }
1192
1193            HydroRoot::SendExternal {
1194                serialize_fn,
1195                instantiate_fn,
1196                input,
1197                ..
1198            } => {
1199                let input_ident =
1200                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1201
1202                match builders_or_callback {
1203                    BuildersOrCallback::Builders(graph_builders) => {
1204                        let (sink_expr, _) = match instantiate_fn {
1205                            DebugInstantiate::Building => (
1206                                syn::parse_quote!(DUMMY_SINK),
1207                                syn::parse_quote!(DUMMY_SOURCE),
1208                            ),
1209
1210                            DebugInstantiate::Finalized(finalized) => {
1211                                (finalized.sink.clone(), finalized.source.clone())
1212                            }
1213                        };
1214
1215                        graph_builders.create_external_output(
1216                            &input.metadata().location_id,
1217                            sink_expr,
1218                            &input_ident,
1219                            serialize_fn.as_ref(),
1220                            *next_stmt_id,
1221                        );
1222                    }
1223                    BuildersOrCallback::Callback(leaf_callback, _) => {
1224                        leaf_callback(self, next_stmt_id);
1225                    }
1226                }
1227
1228                *next_stmt_id += 1;
1229            }
1230
1231            HydroRoot::DestSink { sink, input, .. } => {
1232                let input_ident =
1233                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1234
1235                match builders_or_callback {
1236                    BuildersOrCallback::Builders(graph_builders) => {
1237                        graph_builders
1238                            .get_dfir_mut(&input.metadata().location_id)
1239                            .add_dfir(
1240                                parse_quote! {
1241                                    #input_ident -> dest_sink(#sink);
1242                                },
1243                                None,
1244                                Some(&next_stmt_id.to_string()),
1245                            );
1246                    }
1247                    BuildersOrCallback::Callback(leaf_callback, _) => {
1248                        leaf_callback(self, next_stmt_id);
1249                    }
1250                }
1251
1252                *next_stmt_id += 1;
1253            }
1254
1255            HydroRoot::CycleSink {
1256                cycle_id, input, ..
1257            } => {
1258                let input_ident =
1259                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1260
1261                match builders_or_callback {
1262                    BuildersOrCallback::Builders(graph_builders) => {
1263                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1264                            CollectionKind::KeyedSingleton {
1265                                key_type,
1266                                value_type,
1267                                ..
1268                            }
1269                            | CollectionKind::KeyedStream {
1270                                key_type,
1271                                value_type,
1272                                ..
1273                            } => {
1274                                parse_quote!((#key_type, #value_type))
1275                            }
1276                            CollectionKind::Stream { element_type, .. }
1277                            | CollectionKind::Singleton { element_type, .. }
1278                            | CollectionKind::Optional { element_type, .. } => {
1279                                parse_quote!(#element_type)
1280                            }
1281                        };
1282
1283                        let cycle_id_ident = cycle_id.as_ident();
1284                        graph_builders
1285                            .get_dfir_mut(&input.metadata().location_id)
1286                            .add_dfir(
1287                                parse_quote! {
1288                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1289                                },
1290                                None,
1291                                None,
1292                            );
1293                    }
1294                    // No ID, no callback
1295                    BuildersOrCallback::Callback(_, _) => {}
1296                }
1297            }
1298
1299            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1300                let input_ident =
1301                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1302
1303                match builders_or_callback {
1304                    BuildersOrCallback::Builders(graph_builders) => {
1305                        graph_builders
1306                            .get_dfir_mut(&input.metadata().location_id)
1307                            .add_dfir(
1308                                parse_quote! {
1309                                    #input_ident -> for_each(&mut #ident);
1310                                },
1311                                None,
1312                                Some(&next_stmt_id.to_string()),
1313                            );
1314                    }
1315                    BuildersOrCallback::Callback(leaf_callback, _) => {
1316                        leaf_callback(self, next_stmt_id);
1317                    }
1318                }
1319
1320                *next_stmt_id += 1;
1321            }
1322
1323            HydroRoot::Null { input, .. } => {
1324                let input_ident =
1325                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1326
1327                match builders_or_callback {
1328                    BuildersOrCallback::Builders(graph_builders) => {
1329                        graph_builders
1330                            .get_dfir_mut(&input.metadata().location_id)
1331                            .add_dfir(
1332                                parse_quote! {
1333                                    #input_ident -> for_each(|_| {});
1334                                },
1335                                None,
1336                                Some(&next_stmt_id.to_string()),
1337                            );
1338                    }
1339                    BuildersOrCallback::Callback(leaf_callback, _) => {
1340                        leaf_callback(self, next_stmt_id);
1341                    }
1342                }
1343
1344                *next_stmt_id += 1;
1345            }
1346        }
1347    }
1348
1349    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1350        match self {
1351            HydroRoot::ForEach { op_metadata, .. }
1352            | HydroRoot::SendExternal { op_metadata, .. }
1353            | HydroRoot::DestSink { op_metadata, .. }
1354            | HydroRoot::CycleSink { op_metadata, .. }
1355            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1356            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1357        }
1358    }
1359
1360    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1361        match self {
1362            HydroRoot::ForEach { op_metadata, .. }
1363            | HydroRoot::SendExternal { op_metadata, .. }
1364            | HydroRoot::DestSink { op_metadata, .. }
1365            | HydroRoot::CycleSink { op_metadata, .. }
1366            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1367            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1368        }
1369    }
1370
1371    pub fn input(&self) -> &HydroNode {
1372        match self {
1373            HydroRoot::ForEach { input, .. }
1374            | HydroRoot::SendExternal { input, .. }
1375            | HydroRoot::DestSink { input, .. }
1376            | HydroRoot::CycleSink { input, .. }
1377            | HydroRoot::EmbeddedOutput { input, .. }
1378            | HydroRoot::Null { input, .. } => input,
1379        }
1380    }
1381
1382    pub fn input_metadata(&self) -> &HydroIrMetadata {
1383        self.input().metadata()
1384    }
1385
1386    pub fn print_root(&self) -> String {
1387        match self {
1388            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1389            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1390            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1391            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1392            HydroRoot::EmbeddedOutput { ident, .. } => {
1393                format!("EmbeddedOutput({})", ident)
1394            }
1395            HydroRoot::Null { .. } => "Null".to_owned(),
1396        }
1397    }
1398
1399    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1400        match self {
1401            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1402                transform(f);
1403            }
1404            HydroRoot::SendExternal { .. }
1405            | HydroRoot::CycleSink { .. }
1406            | HydroRoot::EmbeddedOutput { .. }
1407            | HydroRoot::Null { .. } => {}
1408        }
1409    }
1410}
1411
1412#[cfg(feature = "build")]
1413fn tick_of(loc: &LocationId) -> Option<ClockId> {
1414    match loc {
1415        LocationId::Tick(id, _) => Some(*id),
1416        LocationId::Atomic(inner) => tick_of(inner),
1417        _ => None,
1418    }
1419}
1420
1421#[cfg(feature = "build")]
1422fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1423    match loc {
1424        LocationId::Tick(id, inner) => {
1425            *id = uf_find(uf, *id);
1426            remap_location(inner, uf);
1427        }
1428        LocationId::Atomic(inner) => {
1429            remap_location(inner, uf);
1430        }
1431        LocationId::Process(_) | LocationId::Cluster(_) => {}
1432    }
1433}
1434
1435#[cfg(feature = "build")]
1436fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1437    let p = *parent.get(&x).unwrap_or(&x);
1438    if p == x {
1439        return x;
1440    }
1441    let root = uf_find(parent, p);
1442    parent.insert(x, root);
1443    root
1444}
1445
1446#[cfg(feature = "build")]
1447fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1448    let ra = uf_find(parent, a);
1449    let rb = uf_find(parent, b);
1450    if ra != rb {
1451        parent.insert(ra, rb);
1452    }
1453}
1454
1455/// Traverse the IR to build a union-find that unifies tick IDs connected
1456/// through `Batch` and `YieldConcat` nodes at atomic boundaries, then
1457/// rewrite all `LocationId`s to use the representative tick ID.
1458#[cfg(feature = "build")]
1459pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
1460    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();
1461
1462    // Pass 1: collect unifications.
1463    transform_bottom_up(
1464        ir,
1465        &mut |_| {},
1466        &mut |node: &mut HydroNode| {
1467            if let HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } =
1468                node
1469                && let (Some(a), Some(b)) = (
1470                    tick_of(&inner.metadata().location_id),
1471                    tick_of(&metadata.location_id),
1472                )
1473            {
1474                uf_union(&mut uf, a, b);
1475            }
1476        },
1477        false,
1478    );
1479
1480    // Pass 2: rewrite all LocationIds.
1481    transform_bottom_up(
1482        ir,
1483        &mut |_| {},
1484        &mut |node: &mut HydroNode| {
1485            remap_location(&mut node.metadata_mut().location_id, &mut uf);
1486        },
1487        false,
1488    );
1489}
1490
1491#[cfg(feature = "build")]
1492pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1493    let mut builders = SecondaryMap::new();
1494    let mut seen_tees = HashMap::new();
1495    let mut built_tees = HashMap::new();
1496    let mut next_stmt_id = 0;
1497    for leaf in ir {
1498        leaf.emit(
1499            &mut builders,
1500            &mut seen_tees,
1501            &mut built_tees,
1502            &mut next_stmt_id,
1503        );
1504    }
1505    builders
1506}
1507
1508#[cfg(feature = "build")]
1509pub fn traverse_dfir(
1510    ir: &mut [HydroRoot],
1511    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1512    transform_node: impl FnMut(&mut HydroNode, &mut usize),
1513) {
1514    let mut seen_tees = HashMap::new();
1515    let mut built_tees = HashMap::new();
1516    let mut next_stmt_id = 0;
1517    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1518    ir.iter_mut().for_each(|leaf| {
1519        leaf.emit_core(
1520            &mut callback,
1521            &mut seen_tees,
1522            &mut built_tees,
1523            &mut next_stmt_id,
1524        );
1525    });
1526}
1527
1528pub fn transform_bottom_up(
1529    ir: &mut [HydroRoot],
1530    transform_root: &mut impl FnMut(&mut HydroRoot),
1531    transform_node: &mut impl FnMut(&mut HydroNode),
1532    check_well_formed: bool,
1533) {
1534    let mut seen_tees = HashMap::new();
1535    ir.iter_mut().for_each(|leaf| {
1536        leaf.transform_bottom_up(
1537            transform_root,
1538            transform_node,
1539            &mut seen_tees,
1540            check_well_formed,
1541        );
1542    });
1543}
1544
1545pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1546    let mut seen_tees = HashMap::new();
1547    ir.iter()
1548        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1549        .collect()
1550}
1551
1552type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1553thread_local! {
1554    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1555}
1556
1557pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1558    PRINTED_TEES.with(|printed_tees| {
1559        let mut printed_tees_mut = printed_tees.borrow_mut();
1560        *printed_tees_mut = Some((0, HashMap::new()));
1561        drop(printed_tees_mut);
1562
1563        let ret = f();
1564
1565        let mut printed_tees_mut = printed_tees.borrow_mut();
1566        *printed_tees_mut = None;
1567
1568        ret
1569    })
1570}
1571
1572pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1573
1574impl SharedNode {
1575    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1576        Rc::as_ptr(&self.0)
1577    }
1578}
1579
1580impl Debug for SharedNode {
1581    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1582        PRINTED_TEES.with(|printed_tees| {
1583            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1584            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1585
1586            if let Some(printed_tees_mut) = printed_tees_mut {
1587                if let Some(existing) = printed_tees_mut
1588                    .1
1589                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1590                {
1591                    write!(f, "<shared {}>", existing)
1592                } else {
1593                    let next_id = printed_tees_mut.0;
1594                    printed_tees_mut.0 += 1;
1595                    printed_tees_mut
1596                        .1
1597                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1598                    drop(printed_tees_mut_borrow);
1599                    write!(f, "<shared {}>: ", next_id)?;
1600                    Debug::fmt(&self.0.borrow(), f)
1601                }
1602            } else {
1603                drop(printed_tees_mut_borrow);
1604                write!(f, "<shared>: ")?;
1605                Debug::fmt(&self.0.borrow(), f)
1606            }
1607        })
1608    }
1609}
1610
1611impl Hash for SharedNode {
1612    fn hash<H: Hasher>(&self, state: &mut H) {
1613        self.0.borrow_mut().hash(state);
1614    }
1615}
1616
1617#[derive(Clone, PartialEq, Eq, Debug)]
1618pub enum BoundKind {
1619    Unbounded,
1620    Bounded,
1621}
1622
1623#[derive(Clone, PartialEq, Eq, Debug)]
1624pub enum StreamOrder {
1625    NoOrder,
1626    TotalOrder,
1627}
1628
1629#[derive(Clone, PartialEq, Eq, Debug)]
1630pub enum StreamRetry {
1631    AtLeastOnce,
1632    ExactlyOnce,
1633}
1634
1635#[derive(Clone, PartialEq, Eq, Debug)]
1636pub enum KeyedSingletonBoundKind {
1637    Unbounded,
1638    BoundedValue,
1639    Bounded,
1640}
1641
1642#[derive(Clone, PartialEq, Eq, Debug)]
1643pub enum CollectionKind {
1644    Stream {
1645        bound: BoundKind,
1646        order: StreamOrder,
1647        retry: StreamRetry,
1648        element_type: DebugType,
1649    },
1650    Singleton {
1651        bound: BoundKind,
1652        element_type: DebugType,
1653    },
1654    Optional {
1655        bound: BoundKind,
1656        element_type: DebugType,
1657    },
1658    KeyedStream {
1659        bound: BoundKind,
1660        value_order: StreamOrder,
1661        value_retry: StreamRetry,
1662        key_type: DebugType,
1663        value_type: DebugType,
1664    },
1665    KeyedSingleton {
1666        bound: KeyedSingletonBoundKind,
1667        key_type: DebugType,
1668        value_type: DebugType,
1669    },
1670}
1671
1672impl CollectionKind {
1673    pub fn is_bounded(&self) -> bool {
1674        matches!(
1675            self,
1676            CollectionKind::Stream {
1677                bound: BoundKind::Bounded,
1678                ..
1679            } | CollectionKind::Singleton {
1680                bound: BoundKind::Bounded,
1681                ..
1682            } | CollectionKind::Optional {
1683                bound: BoundKind::Bounded,
1684                ..
1685            } | CollectionKind::KeyedStream {
1686                bound: BoundKind::Bounded,
1687                ..
1688            } | CollectionKind::KeyedSingleton {
1689                bound: KeyedSingletonBoundKind::Bounded,
1690                ..
1691            }
1692        )
1693    }
1694}
1695
1696#[derive(Clone)]
1697pub struct HydroIrMetadata {
1698    pub location_id: LocationId,
1699    pub collection_kind: CollectionKind,
1700    pub cardinality: Option<usize>,
1701    pub tag: Option<String>,
1702    pub op: HydroIrOpMetadata,
1703}
1704
1705// HydroIrMetadata shouldn't be used to hash or compare
1706impl Hash for HydroIrMetadata {
1707    fn hash<H: Hasher>(&self, _: &mut H) {}
1708}
1709
1710impl PartialEq for HydroIrMetadata {
1711    fn eq(&self, _: &Self) -> bool {
1712        true
1713    }
1714}
1715
1716impl Eq for HydroIrMetadata {}
1717
1718impl Debug for HydroIrMetadata {
1719    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1720        f.debug_struct("HydroIrMetadata")
1721            .field("location_id", &self.location_id)
1722            .field("collection_kind", &self.collection_kind)
1723            .finish()
1724    }
1725}
1726
1727/// Metadata that is specific to the operator itself, rather than its outputs.
1728/// This is available on _both_ inner nodes and roots.
1729#[derive(Clone)]
1730pub struct HydroIrOpMetadata {
1731    pub backtrace: Backtrace,
1732    pub cpu_usage: Option<f64>,
1733    pub network_recv_cpu_usage: Option<f64>,
1734    pub id: Option<usize>,
1735}
1736
1737impl HydroIrOpMetadata {
1738    #[expect(
1739        clippy::new_without_default,
1740        reason = "explicit calls to new ensure correct backtrace bounds"
1741    )]
1742    pub fn new() -> HydroIrOpMetadata {
1743        Self::new_with_skip(1)
1744    }
1745
1746    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1747        HydroIrOpMetadata {
1748            backtrace: Backtrace::get_backtrace(2 + skip_count),
1749            cpu_usage: None,
1750            network_recv_cpu_usage: None,
1751            id: None,
1752        }
1753    }
1754}
1755
1756impl Debug for HydroIrOpMetadata {
1757    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1758        f.debug_struct("HydroIrOpMetadata").finish()
1759    }
1760}
1761
1762impl Hash for HydroIrOpMetadata {
1763    fn hash<H: Hasher>(&self, _: &mut H) {}
1764}
1765
1766/// An intermediate node in a Hydro graph, which consumes data
1767/// from upstream nodes and emits data to downstream nodes.
1768#[derive(Debug, Hash)]
1769pub enum HydroNode {
1770    Placeholder,
1771
1772    /// Manually "casts" between two different collection kinds.
1773    ///
1774    /// Using this IR node requires special care, since it bypasses many of Hydro's core
1775    /// correctness checks. In particular, the user must ensure that every possible
1776    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
1777    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
1778    /// collection. This ensures that the simulator does not miss any possible outputs.
1779    Cast {
1780        inner: Box<HydroNode>,
1781        metadata: HydroIrMetadata,
1782    },
1783
1784    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
1785    /// interpretation of the input stream.
1786    ///
1787    /// In production, this simply passes through the input, but in simulation, this operator
1788    /// explicitly selects a randomized interpretation.
1789    ObserveNonDet {
1790        inner: Box<HydroNode>,
1791        trusted: bool, // if true, we do not need to simulate non-determinism
1792        metadata: HydroIrMetadata,
1793    },
1794
1795    Source {
1796        source: HydroSource,
1797        metadata: HydroIrMetadata,
1798    },
1799
1800    SingletonSource {
1801        value: DebugExpr,
1802        first_tick_only: bool,
1803        metadata: HydroIrMetadata,
1804    },
1805
1806    CycleSource {
1807        cycle_id: CycleId,
1808        metadata: HydroIrMetadata,
1809    },
1810
1811    Tee {
1812        inner: SharedNode,
1813        metadata: HydroIrMetadata,
1814    },
1815
1816    Partition {
1817        inner: SharedNode,
1818        f: DebugExpr,
1819        is_true: bool,
1820        metadata: HydroIrMetadata,
1821    },
1822
1823    BeginAtomic {
1824        inner: Box<HydroNode>,
1825        metadata: HydroIrMetadata,
1826    },
1827
1828    EndAtomic {
1829        inner: Box<HydroNode>,
1830        metadata: HydroIrMetadata,
1831    },
1832
1833    Batch {
1834        inner: Box<HydroNode>,
1835        metadata: HydroIrMetadata,
1836    },
1837
1838    YieldConcat {
1839        inner: Box<HydroNode>,
1840        metadata: HydroIrMetadata,
1841    },
1842
1843    Chain {
1844        first: Box<HydroNode>,
1845        second: Box<HydroNode>,
1846        metadata: HydroIrMetadata,
1847    },
1848
1849    ChainFirst {
1850        first: Box<HydroNode>,
1851        second: Box<HydroNode>,
1852        metadata: HydroIrMetadata,
1853    },
1854
1855    CrossProduct {
1856        left: Box<HydroNode>,
1857        right: Box<HydroNode>,
1858        metadata: HydroIrMetadata,
1859    },
1860
1861    CrossSingleton {
1862        left: Box<HydroNode>,
1863        right: Box<HydroNode>,
1864        metadata: HydroIrMetadata,
1865    },
1866
1867    Join {
1868        left: Box<HydroNode>,
1869        right: Box<HydroNode>,
1870        metadata: HydroIrMetadata,
1871    },
1872
1873    Difference {
1874        pos: Box<HydroNode>,
1875        neg: Box<HydroNode>,
1876        metadata: HydroIrMetadata,
1877    },
1878
1879    AntiJoin {
1880        pos: Box<HydroNode>,
1881        neg: Box<HydroNode>,
1882        metadata: HydroIrMetadata,
1883    },
1884
1885    ResolveFutures {
1886        input: Box<HydroNode>,
1887        metadata: HydroIrMetadata,
1888    },
1889    ResolveFuturesBlocking {
1890        input: Box<HydroNode>,
1891        metadata: HydroIrMetadata,
1892    },
1893    ResolveFuturesOrdered {
1894        input: Box<HydroNode>,
1895        metadata: HydroIrMetadata,
1896    },
1897
1898    Map {
1899        f: DebugExpr,
1900        input: Box<HydroNode>,
1901        metadata: HydroIrMetadata,
1902    },
1903    FlatMap {
1904        f: DebugExpr,
1905        input: Box<HydroNode>,
1906        metadata: HydroIrMetadata,
1907    },
1908    Filter {
1909        f: DebugExpr,
1910        input: Box<HydroNode>,
1911        metadata: HydroIrMetadata,
1912    },
1913    FilterMap {
1914        f: DebugExpr,
1915        input: Box<HydroNode>,
1916        metadata: HydroIrMetadata,
1917    },
1918
1919    DeferTick {
1920        input: Box<HydroNode>,
1921        metadata: HydroIrMetadata,
1922    },
1923    Enumerate {
1924        input: Box<HydroNode>,
1925        metadata: HydroIrMetadata,
1926    },
1927    Inspect {
1928        f: DebugExpr,
1929        input: Box<HydroNode>,
1930        metadata: HydroIrMetadata,
1931    },
1932
1933    Unique {
1934        input: Box<HydroNode>,
1935        metadata: HydroIrMetadata,
1936    },
1937
1938    Sort {
1939        input: Box<HydroNode>,
1940        metadata: HydroIrMetadata,
1941    },
1942    Fold {
1943        init: DebugExpr,
1944        acc: DebugExpr,
1945        input: Box<HydroNode>,
1946        metadata: HydroIrMetadata,
1947    },
1948
1949    Scan {
1950        init: DebugExpr,
1951        acc: DebugExpr,
1952        input: Box<HydroNode>,
1953        metadata: HydroIrMetadata,
1954    },
1955    FoldKeyed {
1956        init: DebugExpr,
1957        acc: DebugExpr,
1958        input: Box<HydroNode>,
1959        metadata: HydroIrMetadata,
1960    },
1961
1962    Reduce {
1963        f: DebugExpr,
1964        input: Box<HydroNode>,
1965        metadata: HydroIrMetadata,
1966    },
1967    ReduceKeyed {
1968        f: DebugExpr,
1969        input: Box<HydroNode>,
1970        metadata: HydroIrMetadata,
1971    },
1972    ReduceKeyedWatermark {
1973        f: DebugExpr,
1974        input: Box<HydroNode>,
1975        watermark: Box<HydroNode>,
1976        metadata: HydroIrMetadata,
1977    },
1978
1979    Network {
1980        name: Option<String>,
1981        networking_info: crate::networking::NetworkingInfo,
1982        serialize_fn: Option<DebugExpr>,
1983        instantiate_fn: DebugInstantiate,
1984        deserialize_fn: Option<DebugExpr>,
1985        input: Box<HydroNode>,
1986        metadata: HydroIrMetadata,
1987    },
1988
1989    ExternalInput {
1990        from_external_key: LocationKey,
1991        from_port_id: ExternalPortId,
1992        from_many: bool,
1993        codec_type: DebugType,
1994        port_hint: NetworkHint,
1995        instantiate_fn: DebugInstantiate,
1996        deserialize_fn: Option<DebugExpr>,
1997        metadata: HydroIrMetadata,
1998    },
1999
2000    Counter {
2001        tag: String,
2002        duration: DebugExpr,
2003        prefix: String,
2004        input: Box<HydroNode>,
2005        metadata: HydroIrMetadata,
2006    },
2007}
2008
2009pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
2010pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2011
2012impl HydroNode {
2013    pub fn transform_bottom_up(
2014        &mut self,
2015        transform: &mut impl FnMut(&mut HydroNode),
2016        seen_tees: &mut SeenSharedNodes,
2017        check_well_formed: bool,
2018    ) {
2019        self.transform_children(
2020            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2021            seen_tees,
2022        );
2023
2024        transform(self);
2025
2026        let self_location = self.metadata().location_id.root();
2027
2028        if check_well_formed {
2029            match &*self {
2030                HydroNode::Network { .. } => {}
2031                _ => {
2032                    self.input_metadata().iter().for_each(|i| {
2033                        if i.location_id.root() != self_location {
2034                            panic!(
2035                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2036                                i,
2037                                i.location_id.root(),
2038                                self,
2039                                self_location
2040                            )
2041                        }
2042                    });
2043                }
2044            }
2045        }
2046    }
2047
2048    #[inline(always)]
2049    pub fn transform_children(
2050        &mut self,
2051        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2052        seen_tees: &mut SeenSharedNodes,
2053    ) {
2054        match self {
2055            HydroNode::Placeholder => {
2056                panic!();
2057            }
2058
2059            HydroNode::Source { .. }
2060            | HydroNode::SingletonSource { .. }
2061            | HydroNode::CycleSource { .. }
2062            | HydroNode::ExternalInput { .. } => {}
2063
2064            HydroNode::Tee { inner, .. } => {
2065                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2066                    *inner = SharedNode(transformed.clone());
2067                } else {
2068                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2069                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2070                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2071                    transform(&mut orig, seen_tees);
2072                    *transformed_cell.borrow_mut() = orig;
2073                    *inner = SharedNode(transformed_cell);
2074                }
2075            }
2076
2077            HydroNode::Partition { inner, .. } => {
2078                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2079                    *inner = SharedNode(transformed.clone());
2080                } else {
2081                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2082                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2083                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2084                    transform(&mut orig, seen_tees);
2085                    *transformed_cell.borrow_mut() = orig;
2086                    *inner = SharedNode(transformed_cell);
2087                }
2088            }
2089
2090            HydroNode::Cast { inner, .. }
2091            | HydroNode::ObserveNonDet { inner, .. }
2092            | HydroNode::BeginAtomic { inner, .. }
2093            | HydroNode::EndAtomic { inner, .. }
2094            | HydroNode::Batch { inner, .. }
2095            | HydroNode::YieldConcat { inner, .. } => {
2096                transform(inner.as_mut(), seen_tees);
2097            }
2098
2099            HydroNode::Chain { first, second, .. } => {
2100                transform(first.as_mut(), seen_tees);
2101                transform(second.as_mut(), seen_tees);
2102            }
2103
2104            HydroNode::ChainFirst { first, second, .. } => {
2105                transform(first.as_mut(), seen_tees);
2106                transform(second.as_mut(), seen_tees);
2107            }
2108
2109            HydroNode::CrossSingleton { left, right, .. }
2110            | HydroNode::CrossProduct { left, right, .. }
2111            | HydroNode::Join { left, right, .. } => {
2112                transform(left.as_mut(), seen_tees);
2113                transform(right.as_mut(), seen_tees);
2114            }
2115
2116            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2117                transform(pos.as_mut(), seen_tees);
2118                transform(neg.as_mut(), seen_tees);
2119            }
2120
2121            HydroNode::ReduceKeyedWatermark {
2122                input, watermark, ..
2123            } => {
2124                transform(input.as_mut(), seen_tees);
2125                transform(watermark.as_mut(), seen_tees);
2126            }
2127
2128            HydroNode::Map { input, .. }
2129            | HydroNode::ResolveFutures { input, .. }
2130            | HydroNode::ResolveFuturesBlocking { input, .. }
2131            | HydroNode::ResolveFuturesOrdered { input, .. }
2132            | HydroNode::FlatMap { input, .. }
2133            | HydroNode::Filter { input, .. }
2134            | HydroNode::FilterMap { input, .. }
2135            | HydroNode::Sort { input, .. }
2136            | HydroNode::DeferTick { input, .. }
2137            | HydroNode::Enumerate { input, .. }
2138            | HydroNode::Inspect { input, .. }
2139            | HydroNode::Unique { input, .. }
2140            | HydroNode::Network { input, .. }
2141            | HydroNode::Fold { input, .. }
2142            | HydroNode::Scan { input, .. }
2143            | HydroNode::FoldKeyed { input, .. }
2144            | HydroNode::Reduce { input, .. }
2145            | HydroNode::ReduceKeyed { input, .. }
2146            | HydroNode::Counter { input, .. } => {
2147                transform(input.as_mut(), seen_tees);
2148            }
2149        }
2150    }
2151
2152    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2153        match self {
2154            HydroNode::Placeholder => HydroNode::Placeholder,
2155            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2156                inner: Box::new(inner.deep_clone(seen_tees)),
2157                metadata: metadata.clone(),
2158            },
2159            HydroNode::ObserveNonDet {
2160                inner,
2161                trusted,
2162                metadata,
2163            } => HydroNode::ObserveNonDet {
2164                inner: Box::new(inner.deep_clone(seen_tees)),
2165                trusted: *trusted,
2166                metadata: metadata.clone(),
2167            },
2168            HydroNode::Source { source, metadata } => HydroNode::Source {
2169                source: source.clone(),
2170                metadata: metadata.clone(),
2171            },
2172            HydroNode::SingletonSource {
2173                value,
2174                first_tick_only,
2175                metadata,
2176            } => HydroNode::SingletonSource {
2177                value: value.clone(),
2178                first_tick_only: *first_tick_only,
2179                metadata: metadata.clone(),
2180            },
2181            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2182                cycle_id: *cycle_id,
2183                metadata: metadata.clone(),
2184            },
2185            HydroNode::Tee { inner, metadata } => {
2186                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2187                    HydroNode::Tee {
2188                        inner: SharedNode(transformed.clone()),
2189                        metadata: metadata.clone(),
2190                    }
2191                } else {
2192                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2193                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2194                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2195                    *new_rc.borrow_mut() = cloned;
2196                    HydroNode::Tee {
2197                        inner: SharedNode(new_rc),
2198                        metadata: metadata.clone(),
2199                    }
2200                }
2201            }
2202            HydroNode::Partition {
2203                inner,
2204                f,
2205                is_true,
2206                metadata,
2207            } => {
2208                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2209                    HydroNode::Partition {
2210                        inner: SharedNode(transformed.clone()),
2211                        f: f.clone(),
2212                        is_true: *is_true,
2213                        metadata: metadata.clone(),
2214                    }
2215                } else {
2216                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2217                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2218                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2219                    *new_rc.borrow_mut() = cloned;
2220                    HydroNode::Partition {
2221                        inner: SharedNode(new_rc),
2222                        f: f.clone(),
2223                        is_true: *is_true,
2224                        metadata: metadata.clone(),
2225                    }
2226                }
2227            }
2228            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2229                inner: Box::new(inner.deep_clone(seen_tees)),
2230                metadata: metadata.clone(),
2231            },
2232            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2233                inner: Box::new(inner.deep_clone(seen_tees)),
2234                metadata: metadata.clone(),
2235            },
2236            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2237                inner: Box::new(inner.deep_clone(seen_tees)),
2238                metadata: metadata.clone(),
2239            },
2240            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2241                inner: Box::new(inner.deep_clone(seen_tees)),
2242                metadata: metadata.clone(),
2243            },
2244            HydroNode::Chain {
2245                first,
2246                second,
2247                metadata,
2248            } => HydroNode::Chain {
2249                first: Box::new(first.deep_clone(seen_tees)),
2250                second: Box::new(second.deep_clone(seen_tees)),
2251                metadata: metadata.clone(),
2252            },
2253            HydroNode::ChainFirst {
2254                first,
2255                second,
2256                metadata,
2257            } => HydroNode::ChainFirst {
2258                first: Box::new(first.deep_clone(seen_tees)),
2259                second: Box::new(second.deep_clone(seen_tees)),
2260                metadata: metadata.clone(),
2261            },
2262            HydroNode::CrossProduct {
2263                left,
2264                right,
2265                metadata,
2266            } => HydroNode::CrossProduct {
2267                left: Box::new(left.deep_clone(seen_tees)),
2268                right: Box::new(right.deep_clone(seen_tees)),
2269                metadata: metadata.clone(),
2270            },
2271            HydroNode::CrossSingleton {
2272                left,
2273                right,
2274                metadata,
2275            } => HydroNode::CrossSingleton {
2276                left: Box::new(left.deep_clone(seen_tees)),
2277                right: Box::new(right.deep_clone(seen_tees)),
2278                metadata: metadata.clone(),
2279            },
2280            HydroNode::Join {
2281                left,
2282                right,
2283                metadata,
2284            } => HydroNode::Join {
2285                left: Box::new(left.deep_clone(seen_tees)),
2286                right: Box::new(right.deep_clone(seen_tees)),
2287                metadata: metadata.clone(),
2288            },
2289            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2290                pos: Box::new(pos.deep_clone(seen_tees)),
2291                neg: Box::new(neg.deep_clone(seen_tees)),
2292                metadata: metadata.clone(),
2293            },
2294            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2295                pos: Box::new(pos.deep_clone(seen_tees)),
2296                neg: Box::new(neg.deep_clone(seen_tees)),
2297                metadata: metadata.clone(),
2298            },
2299            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2300                input: Box::new(input.deep_clone(seen_tees)),
2301                metadata: metadata.clone(),
2302            },
2303            HydroNode::ResolveFuturesBlocking { input, metadata } => {
2304                HydroNode::ResolveFuturesBlocking {
2305                    input: Box::new(input.deep_clone(seen_tees)),
2306                    metadata: metadata.clone(),
2307                }
2308            }
2309            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2310                HydroNode::ResolveFuturesOrdered {
2311                    input: Box::new(input.deep_clone(seen_tees)),
2312                    metadata: metadata.clone(),
2313                }
2314            }
2315            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2316                f: f.clone(),
2317                input: Box::new(input.deep_clone(seen_tees)),
2318                metadata: metadata.clone(),
2319            },
2320            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2321                f: f.clone(),
2322                input: Box::new(input.deep_clone(seen_tees)),
2323                metadata: metadata.clone(),
2324            },
2325            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2326                f: f.clone(),
2327                input: Box::new(input.deep_clone(seen_tees)),
2328                metadata: metadata.clone(),
2329            },
2330            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2331                f: f.clone(),
2332                input: Box::new(input.deep_clone(seen_tees)),
2333                metadata: metadata.clone(),
2334            },
2335            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2336                input: Box::new(input.deep_clone(seen_tees)),
2337                metadata: metadata.clone(),
2338            },
2339            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2340                input: Box::new(input.deep_clone(seen_tees)),
2341                metadata: metadata.clone(),
2342            },
2343            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2344                f: f.clone(),
2345                input: Box::new(input.deep_clone(seen_tees)),
2346                metadata: metadata.clone(),
2347            },
2348            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2349                input: Box::new(input.deep_clone(seen_tees)),
2350                metadata: metadata.clone(),
2351            },
2352            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2353                input: Box::new(input.deep_clone(seen_tees)),
2354                metadata: metadata.clone(),
2355            },
2356            HydroNode::Fold {
2357                init,
2358                acc,
2359                input,
2360                metadata,
2361            } => HydroNode::Fold {
2362                init: init.clone(),
2363                acc: acc.clone(),
2364                input: Box::new(input.deep_clone(seen_tees)),
2365                metadata: metadata.clone(),
2366            },
2367            HydroNode::Scan {
2368                init,
2369                acc,
2370                input,
2371                metadata,
2372            } => HydroNode::Scan {
2373                init: init.clone(),
2374                acc: acc.clone(),
2375                input: Box::new(input.deep_clone(seen_tees)),
2376                metadata: metadata.clone(),
2377            },
2378            HydroNode::FoldKeyed {
2379                init,
2380                acc,
2381                input,
2382                metadata,
2383            } => HydroNode::FoldKeyed {
2384                init: init.clone(),
2385                acc: acc.clone(),
2386                input: Box::new(input.deep_clone(seen_tees)),
2387                metadata: metadata.clone(),
2388            },
2389            HydroNode::ReduceKeyedWatermark {
2390                f,
2391                input,
2392                watermark,
2393                metadata,
2394            } => HydroNode::ReduceKeyedWatermark {
2395                f: f.clone(),
2396                input: Box::new(input.deep_clone(seen_tees)),
2397                watermark: Box::new(watermark.deep_clone(seen_tees)),
2398                metadata: metadata.clone(),
2399            },
2400            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2401                f: f.clone(),
2402                input: Box::new(input.deep_clone(seen_tees)),
2403                metadata: metadata.clone(),
2404            },
2405            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2406                f: f.clone(),
2407                input: Box::new(input.deep_clone(seen_tees)),
2408                metadata: metadata.clone(),
2409            },
2410            HydroNode::Network {
2411                name,
2412                networking_info,
2413                serialize_fn,
2414                instantiate_fn,
2415                deserialize_fn,
2416                input,
2417                metadata,
2418            } => HydroNode::Network {
2419                name: name.clone(),
2420                networking_info: networking_info.clone(),
2421                serialize_fn: serialize_fn.clone(),
2422                instantiate_fn: instantiate_fn.clone(),
2423                deserialize_fn: deserialize_fn.clone(),
2424                input: Box::new(input.deep_clone(seen_tees)),
2425                metadata: metadata.clone(),
2426            },
2427            HydroNode::ExternalInput {
2428                from_external_key,
2429                from_port_id,
2430                from_many,
2431                codec_type,
2432                port_hint,
2433                instantiate_fn,
2434                deserialize_fn,
2435                metadata,
2436            } => HydroNode::ExternalInput {
2437                from_external_key: *from_external_key,
2438                from_port_id: *from_port_id,
2439                from_many: *from_many,
2440                codec_type: codec_type.clone(),
2441                port_hint: *port_hint,
2442                instantiate_fn: instantiate_fn.clone(),
2443                deserialize_fn: deserialize_fn.clone(),
2444                metadata: metadata.clone(),
2445            },
2446            HydroNode::Counter {
2447                tag,
2448                duration,
2449                prefix,
2450                input,
2451                metadata,
2452            } => HydroNode::Counter {
2453                tag: tag.clone(),
2454                duration: duration.clone(),
2455                prefix: prefix.clone(),
2456                input: Box::new(input.deep_clone(seen_tees)),
2457                metadata: metadata.clone(),
2458            },
2459        }
2460    }
2461
2462    #[cfg(feature = "build")]
2463    pub fn emit_core(
2464        &mut self,
2465        builders_or_callback: &mut BuildersOrCallback<
2466            impl FnMut(&mut HydroRoot, &mut usize),
2467            impl FnMut(&mut HydroNode, &mut usize),
2468        >,
2469        seen_tees: &mut SeenSharedNodes,
2470        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2471        next_stmt_id: &mut usize,
2472    ) -> syn::Ident {
2473        let mut ident_stack: Vec<syn::Ident> = Vec::new();
2474
2475        self.transform_bottom_up(
2476            &mut |node: &mut HydroNode| {
2477                let out_location = node.metadata().location_id.clone();
2478                match node {
2479                    HydroNode::Placeholder => {
2480                        panic!()
2481                    }
2482
2483                    HydroNode::Cast { .. } => {
2484                        // Cast passes through the input ident unchanged
2485                        // The input ident is already on the stack from processing the child
2486                        match builders_or_callback {
2487                            BuildersOrCallback::Builders(_) => {}
2488                            BuildersOrCallback::Callback(_, node_callback) => {
2489                                node_callback(node, next_stmt_id);
2490                            }
2491                        }
2492
2493                        *next_stmt_id += 1;
2494                        // input_ident stays on stack as output
2495                    }
2496
2497                    HydroNode::ObserveNonDet {
2498                        inner,
2499                        trusted,
2500                        metadata,
2501                        ..
2502                    } => {
2503                        let inner_ident = ident_stack.pop().unwrap();
2504
2505                        let observe_ident =
2506                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2507
2508                        match builders_or_callback {
2509                            BuildersOrCallback::Builders(graph_builders) => {
2510                                graph_builders.observe_nondet(
2511                                    *trusted,
2512                                    &inner.metadata().location_id,
2513                                    inner_ident,
2514                                    &inner.metadata().collection_kind,
2515                                    &observe_ident,
2516                                    &metadata.collection_kind,
2517                                    &metadata.op,
2518                                );
2519                            }
2520                            BuildersOrCallback::Callback(_, node_callback) => {
2521                                node_callback(node, next_stmt_id);
2522                            }
2523                        }
2524
2525                        *next_stmt_id += 1;
2526
2527                        ident_stack.push(observe_ident);
2528                    }
2529
2530                    HydroNode::Batch {
2531                        inner, metadata, ..
2532                    } => {
2533                        let inner_ident = ident_stack.pop().unwrap();
2534
2535                        let batch_ident =
2536                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2537
2538                        match builders_or_callback {
2539                            BuildersOrCallback::Builders(graph_builders) => {
2540                                graph_builders.batch(
2541                                    inner_ident,
2542                                    &inner.metadata().location_id,
2543                                    &inner.metadata().collection_kind,
2544                                    &batch_ident,
2545                                    &out_location,
2546                                    &metadata.op,
2547                                );
2548                            }
2549                            BuildersOrCallback::Callback(_, node_callback) => {
2550                                node_callback(node, next_stmt_id);
2551                            }
2552                        }
2553
2554                        *next_stmt_id += 1;
2555
2556                        ident_stack.push(batch_ident);
2557                    }
2558
2559                    HydroNode::YieldConcat { inner, .. } => {
2560                        let inner_ident = ident_stack.pop().unwrap();
2561
2562                        let yield_ident =
2563                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2564
2565                        match builders_or_callback {
2566                            BuildersOrCallback::Builders(graph_builders) => {
2567                                graph_builders.yield_from_tick(
2568                                    inner_ident,
2569                                    &inner.metadata().location_id,
2570                                    &inner.metadata().collection_kind,
2571                                    &yield_ident,
2572                                    &out_location,
2573                                );
2574                            }
2575                            BuildersOrCallback::Callback(_, node_callback) => {
2576                                node_callback(node, next_stmt_id);
2577                            }
2578                        }
2579
2580                        *next_stmt_id += 1;
2581
2582                        ident_stack.push(yield_ident);
2583                    }
2584
2585                    HydroNode::BeginAtomic { inner, metadata } => {
2586                        let inner_ident = ident_stack.pop().unwrap();
2587
2588                        let begin_ident =
2589                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2590
2591                        match builders_or_callback {
2592                            BuildersOrCallback::Builders(graph_builders) => {
2593                                graph_builders.begin_atomic(
2594                                    inner_ident,
2595                                    &inner.metadata().location_id,
2596                                    &inner.metadata().collection_kind,
2597                                    &begin_ident,
2598                                    &out_location,
2599                                    &metadata.op,
2600                                );
2601                            }
2602                            BuildersOrCallback::Callback(_, node_callback) => {
2603                                node_callback(node, next_stmt_id);
2604                            }
2605                        }
2606
2607                        *next_stmt_id += 1;
2608
2609                        ident_stack.push(begin_ident);
2610                    }
2611
2612                    HydroNode::EndAtomic { inner, .. } => {
2613                        let inner_ident = ident_stack.pop().unwrap();
2614
2615                        let end_ident =
2616                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2617
2618                        match builders_or_callback {
2619                            BuildersOrCallback::Builders(graph_builders) => {
2620                                graph_builders.end_atomic(
2621                                    inner_ident,
2622                                    &inner.metadata().location_id,
2623                                    &inner.metadata().collection_kind,
2624                                    &end_ident,
2625                                );
2626                            }
2627                            BuildersOrCallback::Callback(_, node_callback) => {
2628                                node_callback(node, next_stmt_id);
2629                            }
2630                        }
2631
2632                        *next_stmt_id += 1;
2633
2634                        ident_stack.push(end_ident);
2635                    }
2636
2637                    HydroNode::Source {
2638                        source, metadata, ..
2639                    } => {
2640                        if let HydroSource::ExternalNetwork() = source {
2641                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2642                        } else {
2643                            let source_ident =
2644                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2645
2646                            let source_stmt = match source {
2647                                HydroSource::Stream(expr) => {
2648                                    debug_assert!(metadata.location_id.is_top_level());
2649                                    parse_quote! {
2650                                        #source_ident = source_stream(#expr);
2651                                    }
2652                                }
2653
2654                                HydroSource::ExternalNetwork() => {
2655                                    unreachable!()
2656                                }
2657
2658                                HydroSource::Iter(expr) => {
2659                                    if metadata.location_id.is_top_level() {
2660                                        parse_quote! {
2661                                            #source_ident = source_iter(#expr);
2662                                        }
2663                                    } else {
2664                                        // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
2665                                        parse_quote! {
2666                                            #source_ident = source_iter(#expr) -> persist::<'static>();
2667                                        }
2668                                    }
2669                                }
2670
2671                                HydroSource::Spin() => {
2672                                    debug_assert!(metadata.location_id.is_top_level());
2673                                    parse_quote! {
2674                                        #source_ident = spin();
2675                                    }
2676                                }
2677
2678                                HydroSource::ClusterMembers(target_loc, state) => {
2679                                    debug_assert!(metadata.location_id.is_top_level());
2680
2681                                    let members_tee_ident = syn::Ident::new(
2682                                        &format!(
2683                                            "__cluster_members_tee_{}_{}",
2684                                            metadata.location_id.root().key(),
2685                                            target_loc.key(),
2686                                        ),
2687                                        Span::call_site(),
2688                                    );
2689
2690                                    match state {
2691                                        ClusterMembersState::Stream(d) => {
2692                                            parse_quote! {
2693                                                #members_tee_ident = source_stream(#d) -> tee();
2694                                                #source_ident = #members_tee_ident;
2695                                            }
2696                                        },
2697                                        ClusterMembersState::Uninit => syn::parse_quote! {
2698                                            #source_ident = source_stream(DUMMY);
2699                                        },
2700                                        ClusterMembersState::Tee(..) => parse_quote! {
2701                                            #source_ident = #members_tee_ident;
2702                                        },
2703                                    }
2704                                }
2705
2706                                HydroSource::Embedded(ident) => {
2707                                    parse_quote! {
2708                                        #source_ident = source_stream(#ident);
2709                                    }
2710                                }
2711
2712                                HydroSource::EmbeddedSingleton(ident) => {
2713                                    parse_quote! {
2714                                        #source_ident = source_iter([#ident]);
2715                                    }
2716                                }
2717                            };
2718
2719                            match builders_or_callback {
2720                                BuildersOrCallback::Builders(graph_builders) => {
2721                                    let builder = graph_builders.get_dfir_mut(&out_location);
2722                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2723                                }
2724                                BuildersOrCallback::Callback(_, node_callback) => {
2725                                    node_callback(node, next_stmt_id);
2726                                }
2727                            }
2728
2729                            *next_stmt_id += 1;
2730
2731                            ident_stack.push(source_ident);
2732                        }
2733                    }
2734
2735                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
2736                        let source_ident =
2737                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2738
2739                        match builders_or_callback {
2740                            BuildersOrCallback::Builders(graph_builders) => {
2741                                let builder = graph_builders.get_dfir_mut(&out_location);
2742
2743                                if *first_tick_only {
2744                                    assert!(
2745                                        !metadata.location_id.is_top_level(),
2746                                        "first_tick_only SingletonSource must be inside a tick"
2747                                    );
2748                                }
2749
2750                                if *first_tick_only
2751                                    || (metadata.location_id.is_top_level()
2752                                        && metadata.collection_kind.is_bounded())
2753                                {
2754                                    builder.add_dfir(
2755                                        parse_quote! {
2756                                            #source_ident = source_iter([#value]);
2757                                        },
2758                                        None,
2759                                        Some(&next_stmt_id.to_string()),
2760                                    );
2761                                } else {
2762                                    builder.add_dfir(
2763                                        parse_quote! {
2764                                            #source_ident = source_iter([#value]) -> persist::<'static>();
2765                                        },
2766                                        None,
2767                                        Some(&next_stmt_id.to_string()),
2768                                    );
2769                                }
2770                            }
2771                            BuildersOrCallback::Callback(_, node_callback) => {
2772                                node_callback(node, next_stmt_id);
2773                            }
2774                        }
2775
2776                        *next_stmt_id += 1;
2777
2778                        ident_stack.push(source_ident);
2779                    }
2780
2781                    HydroNode::CycleSource { cycle_id, .. } => {
2782                        let ident = cycle_id.as_ident();
2783
2784                        match builders_or_callback {
2785                            BuildersOrCallback::Builders(_) => {}
2786                            BuildersOrCallback::Callback(_, node_callback) => {
2787                                node_callback(node, next_stmt_id);
2788                            }
2789                        }
2790
2791                        // consume a stmt id even though we did not emit anything so that we can instrument this
2792                        *next_stmt_id += 1;
2793
2794                        ident_stack.push(ident);
2795                    }
2796
2797                    HydroNode::Tee { inner, .. } => {
2798                        let ret_ident = if let Some(built_idents) =
2799                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2800                        {
2801                            match builders_or_callback {
2802                                BuildersOrCallback::Builders(_) => {}
2803                                BuildersOrCallback::Callback(_, node_callback) => {
2804                                    node_callback(node, next_stmt_id);
2805                                }
2806                            }
2807
2808                            built_idents[0].clone()
2809                        } else {
2810                            // The inner node was already processed by transform_bottom_up,
2811                            // so its ident is on the stack
2812                            let inner_ident = ident_stack.pop().unwrap();
2813
2814                            let tee_ident =
2815                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2816
2817                            built_tees.insert(
2818                                inner.0.as_ref() as *const RefCell<HydroNode>,
2819                                vec![tee_ident.clone()],
2820                            );
2821
2822                            match builders_or_callback {
2823                                BuildersOrCallback::Builders(graph_builders) => {
2824                                    let builder = graph_builders.get_dfir_mut(&out_location);
2825                                    builder.add_dfir(
2826                                        parse_quote! {
2827                                            #tee_ident = #inner_ident -> tee();
2828                                        },
2829                                        None,
2830                                        Some(&next_stmt_id.to_string()),
2831                                    );
2832                                }
2833                                BuildersOrCallback::Callback(_, node_callback) => {
2834                                    node_callback(node, next_stmt_id);
2835                                }
2836                            }
2837
2838                            tee_ident
2839                        };
2840
2841                        // we consume a stmt id regardless of if we emit the tee() operator,
2842                        // so that during rewrites we touch all recipients of the tee()
2843
2844                        *next_stmt_id += 1;
2845                        ident_stack.push(ret_ident);
2846                    }
2847
2848                    HydroNode::Partition {
2849                        inner, f, is_true, ..
2850                    } => {
2851                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
2852                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
2853                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
2854                            match builders_or_callback {
2855                                BuildersOrCallback::Builders(_) => {}
2856                                BuildersOrCallback::Callback(_, node_callback) => {
2857                                    node_callback(node, next_stmt_id);
2858                                }
2859                            }
2860
2861                            let idx = if is_true { 0 } else { 1 };
2862                            built_idents[idx].clone()
2863                        } else {
2864                            // The inner node was already processed by transform_bottom_up,
2865                            // so its ident is on the stack
2866                            let inner_ident = ident_stack.pop().unwrap();
2867
2868                            let partition_ident = syn::Ident::new(
2869                                &format!("stream_{}_partition", *next_stmt_id),
2870                                Span::call_site(),
2871                            );
2872                            let true_ident = syn::Ident::new(
2873                                &format!("stream_{}_true", *next_stmt_id),
2874                                Span::call_site(),
2875                            );
2876                            let false_ident = syn::Ident::new(
2877                                &format!("stream_{}_false", *next_stmt_id),
2878                                Span::call_site(),
2879                            );
2880
2881                            built_tees.insert(
2882                                ptr,
2883                                vec![true_ident.clone(), false_ident.clone()],
2884                            );
2885
2886                            match builders_or_callback {
2887                                BuildersOrCallback::Builders(graph_builders) => {
2888                                    let builder = graph_builders.get_dfir_mut(&out_location);
2889                                    builder.add_dfir(
2890                                        parse_quote! {
2891                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
2892                                            #true_ident = #partition_ident[0];
2893                                            #false_ident = #partition_ident[1];
2894                                        },
2895                                        None,
2896                                        Some(&next_stmt_id.to_string()),
2897                                    );
2898                                }
2899                                BuildersOrCallback::Callback(_, node_callback) => {
2900                                    node_callback(node, next_stmt_id);
2901                                }
2902                            }
2903
2904                            if is_true { true_ident } else { false_ident }
2905                        };
2906
2907                        *next_stmt_id += 1;
2908                        ident_stack.push(ret_ident);
2909                    }
2910
2911                    HydroNode::Chain { .. } => {
2912                        // Children are processed left-to-right, so second is on top
2913                        let second_ident = ident_stack.pop().unwrap();
2914                        let first_ident = ident_stack.pop().unwrap();
2915
2916                        let chain_ident =
2917                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2918
2919                        match builders_or_callback {
2920                            BuildersOrCallback::Builders(graph_builders) => {
2921                                let builder = graph_builders.get_dfir_mut(&out_location);
2922                                builder.add_dfir(
2923                                    parse_quote! {
2924                                        #chain_ident = chain();
2925                                        #first_ident -> [0]#chain_ident;
2926                                        #second_ident -> [1]#chain_ident;
2927                                    },
2928                                    None,
2929                                    Some(&next_stmt_id.to_string()),
2930                                );
2931                            }
2932                            BuildersOrCallback::Callback(_, node_callback) => {
2933                                node_callback(node, next_stmt_id);
2934                            }
2935                        }
2936
2937                        *next_stmt_id += 1;
2938
2939                        ident_stack.push(chain_ident);
2940                    }
2941
2942                    HydroNode::ChainFirst { .. } => {
2943                        let second_ident = ident_stack.pop().unwrap();
2944                        let first_ident = ident_stack.pop().unwrap();
2945
2946                        let chain_ident =
2947                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2948
2949                        match builders_or_callback {
2950                            BuildersOrCallback::Builders(graph_builders) => {
2951                                let builder = graph_builders.get_dfir_mut(&out_location);
2952                                builder.add_dfir(
2953                                    parse_quote! {
2954                                        #chain_ident = chain_first_n(1);
2955                                        #first_ident -> [0]#chain_ident;
2956                                        #second_ident -> [1]#chain_ident;
2957                                    },
2958                                    None,
2959                                    Some(&next_stmt_id.to_string()),
2960                                );
2961                            }
2962                            BuildersOrCallback::Callback(_, node_callback) => {
2963                                node_callback(node, next_stmt_id);
2964                            }
2965                        }
2966
2967                        *next_stmt_id += 1;
2968
2969                        ident_stack.push(chain_ident);
2970                    }
2971
2972                    HydroNode::CrossSingleton { right, .. } => {
2973                        let right_ident = ident_stack.pop().unwrap();
2974                        let left_ident = ident_stack.pop().unwrap();
2975
2976                        let cross_ident =
2977                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2978
2979                        match builders_or_callback {
2980                            BuildersOrCallback::Builders(graph_builders) => {
2981                                let builder = graph_builders.get_dfir_mut(&out_location);
2982
2983                                if right.metadata().location_id.is_top_level()
2984                                    && right.metadata().collection_kind.is_bounded()
2985                                {
2986                                    builder.add_dfir(
2987                                        parse_quote! {
2988                                            #cross_ident = cross_singleton();
2989                                            #left_ident -> [input]#cross_ident;
2990                                            #right_ident -> persist::<'static>() -> [single]#cross_ident;
2991                                        },
2992                                        None,
2993                                        Some(&next_stmt_id.to_string()),
2994                                    );
2995                                } else {
2996                                    builder.add_dfir(
2997                                        parse_quote! {
2998                                            #cross_ident = cross_singleton();
2999                                            #left_ident -> [input]#cross_ident;
3000                                            #right_ident -> [single]#cross_ident;
3001                                        },
3002                                        None,
3003                                        Some(&next_stmt_id.to_string()),
3004                                    );
3005                                }
3006                            }
3007                            BuildersOrCallback::Callback(_, node_callback) => {
3008                                node_callback(node, next_stmt_id);
3009                            }
3010                        }
3011
3012                        *next_stmt_id += 1;
3013
3014                        ident_stack.push(cross_ident);
3015                    }
3016
3017                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3018                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3019                            parse_quote!(cross_join_multiset)
3020                        } else {
3021                            parse_quote!(join_multiset)
3022                        };
3023
3024                        let (HydroNode::CrossProduct { left, right, .. }
3025                        | HydroNode::Join { left, right, .. }) = node
3026                        else {
3027                            unreachable!()
3028                        };
3029
3030                        let is_top_level = left.metadata().location_id.is_top_level()
3031                            && right.metadata().location_id.is_top_level();
3032                        let left_lifetime = if left.metadata().location_id.is_top_level() {
3033                            quote!('static)
3034                        } else {
3035                            quote!('tick)
3036                        };
3037
3038                        let right_lifetime = if right.metadata().location_id.is_top_level() {
3039                            quote!('static)
3040                        } else {
3041                            quote!('tick)
3042                        };
3043
3044                        let right_ident = ident_stack.pop().unwrap();
3045                        let left_ident = ident_stack.pop().unwrap();
3046
3047                        let stream_ident =
3048                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3049
3050                        match builders_or_callback {
3051                            BuildersOrCallback::Builders(graph_builders) => {
3052                                let builder = graph_builders.get_dfir_mut(&out_location);
3053                                builder.add_dfir(
3054                                    if is_top_level {
3055                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
3056                                        // a multiset_delta() to negate the replay behavior
3057                                        parse_quote! {
3058                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3059                                            #left_ident -> [0]#stream_ident;
3060                                            #right_ident -> [1]#stream_ident;
3061                                        }
3062                                    } else {
3063                                        parse_quote! {
3064                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3065                                            #left_ident -> [0]#stream_ident;
3066                                            #right_ident -> [1]#stream_ident;
3067                                        }
3068                                    }
3069                                    ,
3070                                    None,
3071                                    Some(&next_stmt_id.to_string()),
3072                                );
3073                            }
3074                            BuildersOrCallback::Callback(_, node_callback) => {
3075                                node_callback(node, next_stmt_id);
3076                            }
3077                        }
3078
3079                        *next_stmt_id += 1;
3080
3081                        ident_stack.push(stream_ident);
3082                    }
3083
3084                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3085                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3086                            parse_quote!(difference)
3087                        } else {
3088                            parse_quote!(anti_join)
3089                        };
3090
3091                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3092                            node
3093                        else {
3094                            unreachable!()
3095                        };
3096
3097                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3098                            quote!('static)
3099                        } else {
3100                            quote!('tick)
3101                        };
3102
3103                        let neg_ident = ident_stack.pop().unwrap();
3104                        let pos_ident = ident_stack.pop().unwrap();
3105
3106                        let stream_ident =
3107                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3108
3109                        match builders_or_callback {
3110                            BuildersOrCallback::Builders(graph_builders) => {
3111                                let builder = graph_builders.get_dfir_mut(&out_location);
3112                                builder.add_dfir(
3113                                    parse_quote! {
3114                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
3115                                        #pos_ident -> [pos]#stream_ident;
3116                                        #neg_ident -> [neg]#stream_ident;
3117                                    },
3118                                    None,
3119                                    Some(&next_stmt_id.to_string()),
3120                                );
3121                            }
3122                            BuildersOrCallback::Callback(_, node_callback) => {
3123                                node_callback(node, next_stmt_id);
3124                            }
3125                        }
3126
3127                        *next_stmt_id += 1;
3128
3129                        ident_stack.push(stream_ident);
3130                    }
3131
3132                    HydroNode::ResolveFutures { .. } => {
3133                        let input_ident = ident_stack.pop().unwrap();
3134
3135                        let futures_ident =
3136                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3137
3138                        match builders_or_callback {
3139                            BuildersOrCallback::Builders(graph_builders) => {
3140                                let builder = graph_builders.get_dfir_mut(&out_location);
3141                                builder.add_dfir(
3142                                    parse_quote! {
3143                                        #futures_ident = #input_ident -> resolve_futures();
3144                                    },
3145                                    None,
3146                                    Some(&next_stmt_id.to_string()),
3147                                );
3148                            }
3149                            BuildersOrCallback::Callback(_, node_callback) => {
3150                                node_callback(node, next_stmt_id);
3151                            }
3152                        }
3153
3154                        *next_stmt_id += 1;
3155
3156                        ident_stack.push(futures_ident);
3157                    }
3158
3159                    HydroNode::ResolveFuturesBlocking { .. } => {
3160                        let input_ident = ident_stack.pop().unwrap();
3161
3162                        let futures_ident =
3163                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3164
3165                        match builders_or_callback {
3166                            BuildersOrCallback::Builders(graph_builders) => {
3167                                let builder = graph_builders.get_dfir_mut(&out_location);
3168                                builder.add_dfir(
3169                                    parse_quote! {
3170                                        #futures_ident = #input_ident -> resolve_futures_blocking();
3171                                    },
3172                                    None,
3173                                    Some(&next_stmt_id.to_string()),
3174                                );
3175                            }
3176                            BuildersOrCallback::Callback(_, node_callback) => {
3177                                node_callback(node, next_stmt_id);
3178                            }
3179                        }
3180
3181                        *next_stmt_id += 1;
3182
3183                        ident_stack.push(futures_ident);
3184                    }
3185
3186                    HydroNode::ResolveFuturesOrdered { .. } => {
3187                        let input_ident = ident_stack.pop().unwrap();
3188
3189                        let futures_ident =
3190                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3191
3192                        match builders_or_callback {
3193                            BuildersOrCallback::Builders(graph_builders) => {
3194                                let builder = graph_builders.get_dfir_mut(&out_location);
3195                                builder.add_dfir(
3196                                    parse_quote! {
3197                                        #futures_ident = #input_ident -> resolve_futures_ordered();
3198                                    },
3199                                    None,
3200                                    Some(&next_stmt_id.to_string()),
3201                                );
3202                            }
3203                            BuildersOrCallback::Callback(_, node_callback) => {
3204                                node_callback(node, next_stmt_id);
3205                            }
3206                        }
3207
3208                        *next_stmt_id += 1;
3209
3210                        ident_stack.push(futures_ident);
3211                    }
3212
3213                    HydroNode::Map { f, .. } => {
3214                        let input_ident = ident_stack.pop().unwrap();
3215
3216                        let map_ident =
3217                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3218
3219                        match builders_or_callback {
3220                            BuildersOrCallback::Builders(graph_builders) => {
3221                                let builder = graph_builders.get_dfir_mut(&out_location);
3222                                builder.add_dfir(
3223                                    parse_quote! {
3224                                        #map_ident = #input_ident -> map(#f);
3225                                    },
3226                                    None,
3227                                    Some(&next_stmt_id.to_string()),
3228                                );
3229                            }
3230                            BuildersOrCallback::Callback(_, node_callback) => {
3231                                node_callback(node, next_stmt_id);
3232                            }
3233                        }
3234
3235                        *next_stmt_id += 1;
3236
3237                        ident_stack.push(map_ident);
3238                    }
3239
3240                    HydroNode::FlatMap { f, .. } => {
3241                        let input_ident = ident_stack.pop().unwrap();
3242
3243                        let flat_map_ident =
3244                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3245
3246                        match builders_or_callback {
3247                            BuildersOrCallback::Builders(graph_builders) => {
3248                                let builder = graph_builders.get_dfir_mut(&out_location);
3249                                builder.add_dfir(
3250                                    parse_quote! {
3251                                        #flat_map_ident = #input_ident -> flat_map(#f);
3252                                    },
3253                                    None,
3254                                    Some(&next_stmt_id.to_string()),
3255                                );
3256                            }
3257                            BuildersOrCallback::Callback(_, node_callback) => {
3258                                node_callback(node, next_stmt_id);
3259                            }
3260                        }
3261
3262                        *next_stmt_id += 1;
3263
3264                        ident_stack.push(flat_map_ident);
3265                    }
3266
3267                    HydroNode::Filter { f, .. } => {
3268                        let input_ident = ident_stack.pop().unwrap();
3269
3270                        let filter_ident =
3271                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3272
3273                        match builders_or_callback {
3274                            BuildersOrCallback::Builders(graph_builders) => {
3275                                let builder = graph_builders.get_dfir_mut(&out_location);
3276                                builder.add_dfir(
3277                                    parse_quote! {
3278                                        #filter_ident = #input_ident -> filter(#f);
3279                                    },
3280                                    None,
3281                                    Some(&next_stmt_id.to_string()),
3282                                );
3283                            }
3284                            BuildersOrCallback::Callback(_, node_callback) => {
3285                                node_callback(node, next_stmt_id);
3286                            }
3287                        }
3288
3289                        *next_stmt_id += 1;
3290
3291                        ident_stack.push(filter_ident);
3292                    }
3293
3294                    HydroNode::FilterMap { f, .. } => {
3295                        let input_ident = ident_stack.pop().unwrap();
3296
3297                        let filter_map_ident =
3298                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3299
3300                        match builders_or_callback {
3301                            BuildersOrCallback::Builders(graph_builders) => {
3302                                let builder = graph_builders.get_dfir_mut(&out_location);
3303                                builder.add_dfir(
3304                                    parse_quote! {
3305                                        #filter_map_ident = #input_ident -> filter_map(#f);
3306                                    },
3307                                    None,
3308                                    Some(&next_stmt_id.to_string()),
3309                                );
3310                            }
3311                            BuildersOrCallback::Callback(_, node_callback) => {
3312                                node_callback(node, next_stmt_id);
3313                            }
3314                        }
3315
3316                        *next_stmt_id += 1;
3317
3318                        ident_stack.push(filter_map_ident);
3319                    }
3320
3321                    HydroNode::Sort { .. } => {
3322                        let input_ident = ident_stack.pop().unwrap();
3323
3324                        let sort_ident =
3325                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3326
3327                        match builders_or_callback {
3328                            BuildersOrCallback::Builders(graph_builders) => {
3329                                let builder = graph_builders.get_dfir_mut(&out_location);
3330                                builder.add_dfir(
3331                                    parse_quote! {
3332                                        #sort_ident = #input_ident -> sort();
3333                                    },
3334                                    None,
3335                                    Some(&next_stmt_id.to_string()),
3336                                );
3337                            }
3338                            BuildersOrCallback::Callback(_, node_callback) => {
3339                                node_callback(node, next_stmt_id);
3340                            }
3341                        }
3342
3343                        *next_stmt_id += 1;
3344
3345                        ident_stack.push(sort_ident);
3346                    }
3347
3348                    HydroNode::DeferTick { .. } => {
3349                        let input_ident = ident_stack.pop().unwrap();
3350
3351                        let defer_tick_ident =
3352                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3353
3354                        match builders_or_callback {
3355                            BuildersOrCallback::Builders(graph_builders) => {
3356                                let builder = graph_builders.get_dfir_mut(&out_location);
3357                                builder.add_dfir(
3358                                    parse_quote! {
3359                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
3360                                    },
3361                                    None,
3362                                    Some(&next_stmt_id.to_string()),
3363                                );
3364                            }
3365                            BuildersOrCallback::Callback(_, node_callback) => {
3366                                node_callback(node, next_stmt_id);
3367                            }
3368                        }
3369
3370                        *next_stmt_id += 1;
3371
3372                        ident_stack.push(defer_tick_ident);
3373                    }
3374
3375                    HydroNode::Enumerate { input, .. } => {
3376                        let input_ident = ident_stack.pop().unwrap();
3377
3378                        let enumerate_ident =
3379                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3380
3381                        match builders_or_callback {
3382                            BuildersOrCallback::Builders(graph_builders) => {
3383                                let builder = graph_builders.get_dfir_mut(&out_location);
3384                                let lifetime = if input.metadata().location_id.is_top_level() {
3385                                    quote!('static)
3386                                } else {
3387                                    quote!('tick)
3388                                };
3389                                builder.add_dfir(
3390                                    parse_quote! {
3391                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3392                                    },
3393                                    None,
3394                                    Some(&next_stmt_id.to_string()),
3395                                );
3396                            }
3397                            BuildersOrCallback::Callback(_, node_callback) => {
3398                                node_callback(node, next_stmt_id);
3399                            }
3400                        }
3401
3402                        *next_stmt_id += 1;
3403
3404                        ident_stack.push(enumerate_ident);
3405                    }
3406
3407                    HydroNode::Inspect { f, .. } => {
3408                        let input_ident = ident_stack.pop().unwrap();
3409
3410                        let inspect_ident =
3411                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3412
3413                        match builders_or_callback {
3414                            BuildersOrCallback::Builders(graph_builders) => {
3415                                let builder = graph_builders.get_dfir_mut(&out_location);
3416                                builder.add_dfir(
3417                                    parse_quote! {
3418                                        #inspect_ident = #input_ident -> inspect(#f);
3419                                    },
3420                                    None,
3421                                    Some(&next_stmt_id.to_string()),
3422                                );
3423                            }
3424                            BuildersOrCallback::Callback(_, node_callback) => {
3425                                node_callback(node, next_stmt_id);
3426                            }
3427                        }
3428
3429                        *next_stmt_id += 1;
3430
3431                        ident_stack.push(inspect_ident);
3432                    }
3433
3434                    HydroNode::Unique { input, .. } => {
3435                        let input_ident = ident_stack.pop().unwrap();
3436
3437                        let unique_ident =
3438                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3439
3440                        match builders_or_callback {
3441                            BuildersOrCallback::Builders(graph_builders) => {
3442                                let builder = graph_builders.get_dfir_mut(&out_location);
3443                                let lifetime = if input.metadata().location_id.is_top_level() {
3444                                    quote!('static)
3445                                } else {
3446                                    quote!('tick)
3447                                };
3448
3449                                builder.add_dfir(
3450                                    parse_quote! {
3451                                        #unique_ident = #input_ident -> unique::<#lifetime>();
3452                                    },
3453                                    None,
3454                                    Some(&next_stmt_id.to_string()),
3455                                );
3456                            }
3457                            BuildersOrCallback::Callback(_, node_callback) => {
3458                                node_callback(node, next_stmt_id);
3459                            }
3460                        }
3461
3462                        *next_stmt_id += 1;
3463
3464                        ident_stack.push(unique_ident);
3465                    }
3466
3467                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3468                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3469                            if input.metadata().location_id.is_top_level()
3470                                && input.metadata().collection_kind.is_bounded()
3471                            {
3472                                parse_quote!(fold_no_replay)
3473                            } else {
3474                                parse_quote!(fold)
3475                            }
3476                        } else if matches!(node, HydroNode::Scan { .. }) {
3477                            parse_quote!(scan)
3478                        } else if let HydroNode::FoldKeyed { input, .. } = node {
3479                            if input.metadata().location_id.is_top_level()
3480                                && input.metadata().collection_kind.is_bounded()
3481                            {
3482                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
3483                            } else {
3484                                parse_quote!(fold_keyed)
3485                            }
3486                        } else {
3487                            unreachable!()
3488                        };
3489
3490                        let (HydroNode::Fold { input, .. }
3491                        | HydroNode::FoldKeyed { input, .. }
3492                        | HydroNode::Scan { input, .. }) = node
3493                        else {
3494                            unreachable!()
3495                        };
3496
3497                        let lifetime = if input.metadata().location_id.is_top_level() {
3498                            quote!('static)
3499                        } else {
3500                            quote!('tick)
3501                        };
3502
3503                        let input_ident = ident_stack.pop().unwrap();
3504
3505                        let (HydroNode::Fold { init, acc, .. }
3506                        | HydroNode::FoldKeyed { init, acc, .. }
3507                        | HydroNode::Scan { init, acc, .. }) = &*node
3508                        else {
3509                            unreachable!()
3510                        };
3511
3512                        let fold_ident =
3513                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3514
3515                        match builders_or_callback {
3516                            BuildersOrCallback::Builders(graph_builders) => {
3517                                if matches!(node, HydroNode::Fold { .. })
3518                                    && node.metadata().location_id.is_top_level()
3519                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3520                                    && graph_builders.singleton_intermediates()
3521                                    && !node.metadata().collection_kind.is_bounded()
3522                                {
3523                                    let builder = graph_builders.get_dfir_mut(&out_location);
3524
3525                                    let acc: syn::Expr = parse_quote!({
3526                                        let mut __inner = #acc;
3527                                        move |__state, __value| {
3528                                            __inner(__state, __value);
3529                                            Some(__state.clone())
3530                                        }
3531                                    });
3532
3533                                    builder.add_dfir(
3534                                        parse_quote! {
3535                                            source_iter([(#init)()]) -> [0]#fold_ident;
3536                                            #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3537                                            #fold_ident = chain();
3538                                        },
3539                                        None,
3540                                        Some(&next_stmt_id.to_string()),
3541                                    );
3542                                } else if matches!(node, HydroNode::FoldKeyed { .. })
3543                                    && node.metadata().location_id.is_top_level()
3544                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3545                                    && graph_builders.singleton_intermediates()
3546                                    && !node.metadata().collection_kind.is_bounded()
3547                                {
3548                                    let builder = graph_builders.get_dfir_mut(&out_location);
3549
3550                                    let acc: syn::Expr = parse_quote!({
3551                                        let mut __init = #init;
3552                                        let mut __inner = #acc;
3553                                        move |__state, __kv: (_, _)| {
3554                                            // TODO(shadaj): we can avoid the clone when the entry exists
3555                                            let __state = __state
3556                                                .entry(::std::clone::Clone::clone(&__kv.0))
3557                                                .or_insert_with(|| (__init)());
3558                                            __inner(__state, __kv.1);
3559                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3560                                        }
3561                                    });
3562
3563                                    builder.add_dfir(
3564                                        parse_quote! {
3565                                            #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3566                                        },
3567                                        None,
3568                                        Some(&next_stmt_id.to_string()),
3569                                    );
3570                                } else {
3571                                    let builder = graph_builders.get_dfir_mut(&out_location);
3572                                    builder.add_dfir(
3573                                        parse_quote! {
3574                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3575                                        },
3576                                        None,
3577                                        Some(&next_stmt_id.to_string()),
3578                                    );
3579                                }
3580                            }
3581                            BuildersOrCallback::Callback(_, node_callback) => {
3582                                node_callback(node, next_stmt_id);
3583                            }
3584                        }
3585
3586                        *next_stmt_id += 1;
3587
3588                        ident_stack.push(fold_ident);
3589                    }
3590
3591                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3592                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3593                            if input.metadata().location_id.is_top_level()
3594                                && input.metadata().collection_kind.is_bounded()
3595                            {
3596                                parse_quote!(reduce_no_replay)
3597                            } else {
3598                                parse_quote!(reduce)
3599                            }
3600                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
3601                            if input.metadata().location_id.is_top_level()
3602                                && input.metadata().collection_kind.is_bounded()
3603                            {
3604                                todo!(
3605                                    "Calling keyed reduce on a top-level bounded collection is not supported"
3606                                )
3607                            } else {
3608                                parse_quote!(reduce_keyed)
3609                            }
3610                        } else {
3611                            unreachable!()
3612                        };
3613
3614                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3615                        else {
3616                            unreachable!()
3617                        };
3618
3619                        let lifetime = if input.metadata().location_id.is_top_level() {
3620                            quote!('static)
3621                        } else {
3622                            quote!('tick)
3623                        };
3624
3625                        let input_ident = ident_stack.pop().unwrap();
3626
3627                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3628                        else {
3629                            unreachable!()
3630                        };
3631
3632                        let reduce_ident =
3633                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3634
3635                        match builders_or_callback {
3636                            BuildersOrCallback::Builders(graph_builders) => {
3637                                if matches!(node, HydroNode::Reduce { .. })
3638                                    && node.metadata().location_id.is_top_level()
3639                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3640                                    && graph_builders.singleton_intermediates()
3641                                    && !node.metadata().collection_kind.is_bounded()
3642                                {
3643                                    todo!(
3644                                        "Reduce with optional intermediates is not yet supported in simulator"
3645                                    );
3646                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
3647                                    && node.metadata().location_id.is_top_level()
3648                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3649                                    && graph_builders.singleton_intermediates()
3650                                    && !node.metadata().collection_kind.is_bounded()
3651                                {
3652                                    todo!(
3653                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
3654                                    );
3655                                } else {
3656                                    let builder = graph_builders.get_dfir_mut(&out_location);
3657                                    builder.add_dfir(
3658                                        parse_quote! {
3659                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3660                                        },
3661                                        None,
3662                                        Some(&next_stmt_id.to_string()),
3663                                    );
3664                                }
3665                            }
3666                            BuildersOrCallback::Callback(_, node_callback) => {
3667                                node_callback(node, next_stmt_id);
3668                            }
3669                        }
3670
3671                        *next_stmt_id += 1;
3672
3673                        ident_stack.push(reduce_ident);
3674                    }
3675
3676                    HydroNode::ReduceKeyedWatermark {
3677                        f,
3678                        input,
3679                        metadata,
3680                        ..
3681                    } => {
3682                        let lifetime = if input.metadata().location_id.is_top_level() {
3683                            quote!('static)
3684                        } else {
3685                            quote!('tick)
3686                        };
3687
3688                        // watermark is processed second, so it's on top
3689                        let watermark_ident = ident_stack.pop().unwrap();
3690                        let input_ident = ident_stack.pop().unwrap();
3691
3692                        let chain_ident = syn::Ident::new(
3693                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3694                            Span::call_site(),
3695                        );
3696
3697                        let fold_ident =
3698                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3699
3700                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3701                            && input.metadata().collection_kind.is_bounded()
3702                        {
3703                            parse_quote!(fold_no_replay)
3704                        } else {
3705                            parse_quote!(fold)
3706                        };
3707
3708                        match builders_or_callback {
3709                            BuildersOrCallback::Builders(graph_builders) => {
3710                                if metadata.location_id.is_top_level()
3711                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3712                                    && graph_builders.singleton_intermediates()
3713                                    && !metadata.collection_kind.is_bounded()
3714                                {
3715                                    todo!(
3716                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3717                                    )
3718                                } else {
3719                                    let builder = graph_builders.get_dfir_mut(&out_location);
3720                                    builder.add_dfir(
3721                                        parse_quote! {
3722                                            #chain_ident = chain();
3723                                            #input_ident
3724                                                -> map(|x| (Some(x), None))
3725                                                -> [0]#chain_ident;
3726                                            #watermark_ident
3727                                                -> map(|watermark| (None, Some(watermark)))
3728                                                -> [1]#chain_ident;
3729
3730                                            #fold_ident = #chain_ident
3731                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3732                                                    let __reduce_keyed_fn = #f;
3733                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3734                                                        if let Some((k, v)) = opt_payload {
3735                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3736                                                                if k < curr_watermark {
3737                                                                    return;
3738                                                                }
3739                                                            }
3740                                                            match map.entry(k) {
3741                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
3742                                                                    e.insert(v);
3743                                                                }
3744                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
3745                                                                    __reduce_keyed_fn(e.get_mut(), v);
3746                                                                }
3747                                                            }
3748                                                        } else {
3749                                                            let watermark = opt_watermark.unwrap();
3750                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3751                                                                if watermark <= curr_watermark {
3752                                                                    return;
3753                                                                }
3754                                                            }
3755                                                            *opt_curr_watermark = opt_watermark;
3756                                                            map.retain(|k, _| *k >= watermark);
3757                                                        }
3758                                                    }
3759                                                })
3760                                                -> flat_map(|(map, _curr_watermark)| map);
3761                                        },
3762                                        None,
3763                                        Some(&next_stmt_id.to_string()),
3764                                    );
3765                                }
3766                            }
3767                            BuildersOrCallback::Callback(_, node_callback) => {
3768                                node_callback(node, next_stmt_id);
3769                            }
3770                        }
3771
3772                        *next_stmt_id += 1;
3773
3774                        ident_stack.push(fold_ident);
3775                    }
3776
3777                    HydroNode::Network {
3778                        networking_info,
3779                        serialize_fn: serialize_pipeline,
3780                        instantiate_fn,
3781                        deserialize_fn: deserialize_pipeline,
3782                        input,
3783                        ..
3784                    } => {
3785                        let input_ident = ident_stack.pop().unwrap();
3786
3787                        let receiver_stream_ident =
3788                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3789
3790                        match builders_or_callback {
3791                            BuildersOrCallback::Builders(graph_builders) => {
3792                                let (sink_expr, source_expr) = match instantiate_fn {
3793                                    DebugInstantiate::Building => (
3794                                        syn::parse_quote!(DUMMY_SINK),
3795                                        syn::parse_quote!(DUMMY_SOURCE),
3796                                    ),
3797
3798                                    DebugInstantiate::Finalized(finalized) => {
3799                                        (finalized.sink.clone(), finalized.source.clone())
3800                                    }
3801                                };
3802
3803                                graph_builders.create_network(
3804                                    &input.metadata().location_id,
3805                                    &out_location,
3806                                    input_ident,
3807                                    &receiver_stream_ident,
3808                                    serialize_pipeline.as_ref(),
3809                                    sink_expr,
3810                                    source_expr,
3811                                    deserialize_pipeline.as_ref(),
3812                                    *next_stmt_id,
3813                                    networking_info,
3814                                );
3815                            }
3816                            BuildersOrCallback::Callback(_, node_callback) => {
3817                                node_callback(node, next_stmt_id);
3818                            }
3819                        }
3820
3821                        *next_stmt_id += 1;
3822
3823                        ident_stack.push(receiver_stream_ident);
3824                    }
3825
3826                    HydroNode::ExternalInput {
3827                        instantiate_fn,
3828                        deserialize_fn: deserialize_pipeline,
3829                        ..
3830                    } => {
3831                        let receiver_stream_ident =
3832                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3833
3834                        match builders_or_callback {
3835                            BuildersOrCallback::Builders(graph_builders) => {
3836                                let (_, source_expr) = match instantiate_fn {
3837                                    DebugInstantiate::Building => (
3838                                        syn::parse_quote!(DUMMY_SINK),
3839                                        syn::parse_quote!(DUMMY_SOURCE),
3840                                    ),
3841
3842                                    DebugInstantiate::Finalized(finalized) => {
3843                                        (finalized.sink.clone(), finalized.source.clone())
3844                                    }
3845                                };
3846
3847                                graph_builders.create_external_source(
3848                                    &out_location,
3849                                    source_expr,
3850                                    &receiver_stream_ident,
3851                                    deserialize_pipeline.as_ref(),
3852                                    *next_stmt_id,
3853                                );
3854                            }
3855                            BuildersOrCallback::Callback(_, node_callback) => {
3856                                node_callback(node, next_stmt_id);
3857                            }
3858                        }
3859
3860                        *next_stmt_id += 1;
3861
3862                        ident_stack.push(receiver_stream_ident);
3863                    }
3864
3865                    HydroNode::Counter {
3866                        tag,
3867                        duration,
3868                        prefix,
3869                        ..
3870                    } => {
3871                        let input_ident = ident_stack.pop().unwrap();
3872
3873                        let counter_ident =
3874                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3875
3876                        match builders_or_callback {
3877                            BuildersOrCallback::Builders(graph_builders) => {
3878                                let arg = format!("{}({})", prefix, tag);
3879                                let builder = graph_builders.get_dfir_mut(&out_location);
3880                                builder.add_dfir(
3881                                    parse_quote! {
3882                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
3883                                    },
3884                                    None,
3885                                    Some(&next_stmt_id.to_string()),
3886                                );
3887                            }
3888                            BuildersOrCallback::Callback(_, node_callback) => {
3889                                node_callback(node, next_stmt_id);
3890                            }
3891                        }
3892
3893                        *next_stmt_id += 1;
3894
3895                        ident_stack.push(counter_ident);
3896                    }
3897                }
3898            },
3899            seen_tees,
3900            false,
3901        );
3902
3903        ident_stack
3904            .pop()
3905            .expect("ident_stack should have exactly one element after traversal")
3906    }
3907
3908    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3909        match self {
3910            HydroNode::Placeholder => {
3911                panic!()
3912            }
3913            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3914            HydroNode::Source { source, .. } => match source {
3915                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3916                HydroSource::ExternalNetwork()
3917                | HydroSource::Spin()
3918                | HydroSource::ClusterMembers(_, _)
3919                | HydroSource::Embedded(_)
3920                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
3921            },
3922            HydroNode::SingletonSource { value, .. } => {
3923                transform(value);
3924            }
3925            HydroNode::CycleSource { .. }
3926            | HydroNode::Tee { .. }
3927            | HydroNode::YieldConcat { .. }
3928            | HydroNode::BeginAtomic { .. }
3929            | HydroNode::EndAtomic { .. }
3930            | HydroNode::Batch { .. }
3931            | HydroNode::Chain { .. }
3932            | HydroNode::ChainFirst { .. }
3933            | HydroNode::CrossProduct { .. }
3934            | HydroNode::CrossSingleton { .. }
3935            | HydroNode::ResolveFutures { .. }
3936            | HydroNode::ResolveFuturesBlocking { .. }
3937            | HydroNode::ResolveFuturesOrdered { .. }
3938            | HydroNode::Join { .. }
3939            | HydroNode::Difference { .. }
3940            | HydroNode::AntiJoin { .. }
3941            | HydroNode::DeferTick { .. }
3942            | HydroNode::Enumerate { .. }
3943            | HydroNode::Unique { .. }
3944            | HydroNode::Sort { .. } => {}
3945            HydroNode::Map { f, .. }
3946            | HydroNode::FlatMap { f, .. }
3947            | HydroNode::Filter { f, .. }
3948            | HydroNode::FilterMap { f, .. }
3949            | HydroNode::Inspect { f, .. }
3950            | HydroNode::Partition { f, .. }
3951            | HydroNode::Reduce { f, .. }
3952            | HydroNode::ReduceKeyed { f, .. }
3953            | HydroNode::ReduceKeyedWatermark { f, .. } => {
3954                transform(f);
3955            }
3956            HydroNode::Fold { init, acc, .. }
3957            | HydroNode::Scan { init, acc, .. }
3958            | HydroNode::FoldKeyed { init, acc, .. } => {
3959                transform(init);
3960                transform(acc);
3961            }
3962            HydroNode::Network {
3963                serialize_fn,
3964                deserialize_fn,
3965                ..
3966            } => {
3967                if let Some(serialize_fn) = serialize_fn {
3968                    transform(serialize_fn);
3969                }
3970                if let Some(deserialize_fn) = deserialize_fn {
3971                    transform(deserialize_fn);
3972                }
3973            }
3974            HydroNode::ExternalInput { deserialize_fn, .. } => {
3975                if let Some(deserialize_fn) = deserialize_fn {
3976                    transform(deserialize_fn);
3977                }
3978            }
3979            HydroNode::Counter { duration, .. } => {
3980                transform(duration);
3981            }
3982        }
3983    }
3984
3985    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3986        &self.metadata().op
3987    }
3988
3989    pub fn metadata(&self) -> &HydroIrMetadata {
3990        match self {
3991            HydroNode::Placeholder => {
3992                panic!()
3993            }
3994            HydroNode::Cast { metadata, .. } => metadata,
3995            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3996            HydroNode::Source { metadata, .. } => metadata,
3997            HydroNode::SingletonSource { metadata, .. } => metadata,
3998            HydroNode::CycleSource { metadata, .. } => metadata,
3999            HydroNode::Tee { metadata, .. } => metadata,
4000            HydroNode::Partition { metadata, .. } => metadata,
4001            HydroNode::YieldConcat { metadata, .. } => metadata,
4002            HydroNode::BeginAtomic { metadata, .. } => metadata,
4003            HydroNode::EndAtomic { metadata, .. } => metadata,
4004            HydroNode::Batch { metadata, .. } => metadata,
4005            HydroNode::Chain { metadata, .. } => metadata,
4006            HydroNode::ChainFirst { metadata, .. } => metadata,
4007            HydroNode::CrossProduct { metadata, .. } => metadata,
4008            HydroNode::CrossSingleton { metadata, .. } => metadata,
4009            HydroNode::Join { metadata, .. } => metadata,
4010            HydroNode::Difference { metadata, .. } => metadata,
4011            HydroNode::AntiJoin { metadata, .. } => metadata,
4012            HydroNode::ResolveFutures { metadata, .. } => metadata,
4013            HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4014            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4015            HydroNode::Map { metadata, .. } => metadata,
4016            HydroNode::FlatMap { metadata, .. } => metadata,
4017            HydroNode::Filter { metadata, .. } => metadata,
4018            HydroNode::FilterMap { metadata, .. } => metadata,
4019            HydroNode::DeferTick { metadata, .. } => metadata,
4020            HydroNode::Enumerate { metadata, .. } => metadata,
4021            HydroNode::Inspect { metadata, .. } => metadata,
4022            HydroNode::Unique { metadata, .. } => metadata,
4023            HydroNode::Sort { metadata, .. } => metadata,
4024            HydroNode::Scan { metadata, .. } => metadata,
4025            HydroNode::Fold { metadata, .. } => metadata,
4026            HydroNode::FoldKeyed { metadata, .. } => metadata,
4027            HydroNode::Reduce { metadata, .. } => metadata,
4028            HydroNode::ReduceKeyed { metadata, .. } => metadata,
4029            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4030            HydroNode::ExternalInput { metadata, .. } => metadata,
4031            HydroNode::Network { metadata, .. } => metadata,
4032            HydroNode::Counter { metadata, .. } => metadata,
4033        }
4034    }
4035
4036    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4037        &mut self.metadata_mut().op
4038    }
4039
4040    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4041        match self {
4042            HydroNode::Placeholder => {
4043                panic!()
4044            }
4045            HydroNode::Cast { metadata, .. } => metadata,
4046            HydroNode::ObserveNonDet { metadata, .. } => metadata,
4047            HydroNode::Source { metadata, .. } => metadata,
4048            HydroNode::SingletonSource { metadata, .. } => metadata,
4049            HydroNode::CycleSource { metadata, .. } => metadata,
4050            HydroNode::Tee { metadata, .. } => metadata,
4051            HydroNode::Partition { metadata, .. } => metadata,
4052            HydroNode::YieldConcat { metadata, .. } => metadata,
4053            HydroNode::BeginAtomic { metadata, .. } => metadata,
4054            HydroNode::EndAtomic { metadata, .. } => metadata,
4055            HydroNode::Batch { metadata, .. } => metadata,
4056            HydroNode::Chain { metadata, .. } => metadata,
4057            HydroNode::ChainFirst { metadata, .. } => metadata,
4058            HydroNode::CrossProduct { metadata, .. } => metadata,
4059            HydroNode::CrossSingleton { metadata, .. } => metadata,
4060            HydroNode::Join { metadata, .. } => metadata,
4061            HydroNode::Difference { metadata, .. } => metadata,
4062            HydroNode::AntiJoin { metadata, .. } => metadata,
4063            HydroNode::ResolveFutures { metadata, .. } => metadata,
4064            HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4065            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4066            HydroNode::Map { metadata, .. } => metadata,
4067            HydroNode::FlatMap { metadata, .. } => metadata,
4068            HydroNode::Filter { metadata, .. } => metadata,
4069            HydroNode::FilterMap { metadata, .. } => metadata,
4070            HydroNode::DeferTick { metadata, .. } => metadata,
4071            HydroNode::Enumerate { metadata, .. } => metadata,
4072            HydroNode::Inspect { metadata, .. } => metadata,
4073            HydroNode::Unique { metadata, .. } => metadata,
4074            HydroNode::Sort { metadata, .. } => metadata,
4075            HydroNode::Scan { metadata, .. } => metadata,
4076            HydroNode::Fold { metadata, .. } => metadata,
4077            HydroNode::FoldKeyed { metadata, .. } => metadata,
4078            HydroNode::Reduce { metadata, .. } => metadata,
4079            HydroNode::ReduceKeyed { metadata, .. } => metadata,
4080            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4081            HydroNode::ExternalInput { metadata, .. } => metadata,
4082            HydroNode::Network { metadata, .. } => metadata,
4083            HydroNode::Counter { metadata, .. } => metadata,
4084        }
4085    }
4086
4087    pub fn input(&self) -> Vec<&HydroNode> {
4088        match self {
4089            HydroNode::Placeholder => {
4090                panic!()
4091            }
4092            HydroNode::Source { .. }
4093            | HydroNode::SingletonSource { .. }
4094            | HydroNode::ExternalInput { .. }
4095            | HydroNode::CycleSource { .. }
4096            | HydroNode::Tee { .. }
4097            | HydroNode::Partition { .. } => {
4098                // Tee/Partition should find their input in separate special ways
4099                vec![]
4100            }
4101            HydroNode::Cast { inner, .. }
4102            | HydroNode::ObserveNonDet { inner, .. }
4103            | HydroNode::YieldConcat { inner, .. }
4104            | HydroNode::BeginAtomic { inner, .. }
4105            | HydroNode::EndAtomic { inner, .. }
4106            | HydroNode::Batch { inner, .. } => {
4107                vec![inner]
4108            }
4109            HydroNode::Chain { first, second, .. } => {
4110                vec![first, second]
4111            }
4112            HydroNode::ChainFirst { first, second, .. } => {
4113                vec![first, second]
4114            }
4115            HydroNode::CrossProduct { left, right, .. }
4116            | HydroNode::CrossSingleton { left, right, .. }
4117            | HydroNode::Join { left, right, .. } => {
4118                vec![left, right]
4119            }
4120            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
4121                vec![pos, neg]
4122            }
4123            HydroNode::Map { input, .. }
4124            | HydroNode::FlatMap { input, .. }
4125            | HydroNode::Filter { input, .. }
4126            | HydroNode::FilterMap { input, .. }
4127            | HydroNode::Sort { input, .. }
4128            | HydroNode::DeferTick { input, .. }
4129            | HydroNode::Enumerate { input, .. }
4130            | HydroNode::Inspect { input, .. }
4131            | HydroNode::Unique { input, .. }
4132            | HydroNode::Network { input, .. }
4133            | HydroNode::Counter { input, .. }
4134            | HydroNode::ResolveFutures { input, .. }
4135            | HydroNode::ResolveFuturesBlocking { input, .. }
4136            | HydroNode::ResolveFuturesOrdered { input, .. }
4137            | HydroNode::Fold { input, .. }
4138            | HydroNode::FoldKeyed { input, .. }
4139            | HydroNode::Reduce { input, .. }
4140            | HydroNode::ReduceKeyed { input, .. }
4141            | HydroNode::Scan { input, .. } => {
4142                vec![input]
4143            }
4144            HydroNode::ReduceKeyedWatermark {
4145                input, watermark, ..
4146            } => {
4147                vec![input, watermark]
4148            }
4149        }
4150    }
4151
4152    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4153        self.input()
4154            .iter()
4155            .map(|input_node| input_node.metadata())
4156            .collect()
4157    }
4158
4159    /// Returns `true` if this node is a Tee or Partition whose inner Rc
4160    /// has other live references, meaning the upstream is already driven
4161    /// by another consumer and does not need a Null sink.
4162    pub fn is_shared_with_others(&self) -> bool {
4163        match self {
4164            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4165                Rc::strong_count(&inner.0) > 1
4166            }
4167            _ => false,
4168        }
4169    }
4170
4171    pub fn print_root(&self) -> String {
4172        match self {
4173            HydroNode::Placeholder => {
4174                panic!()
4175            }
4176            HydroNode::Cast { .. } => "Cast()".to_owned(),
4177            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4178            HydroNode::Source { source, .. } => format!("Source({:?})", source),
4179            HydroNode::SingletonSource {
4180                value,
4181                first_tick_only,
4182                ..
4183            } => format!(
4184                "SingletonSource({:?}, first_tick_only={})",
4185                value, first_tick_only
4186            ),
4187            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
4188            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
4189            HydroNode::Partition { f, is_true, .. } => {
4190                format!("Partition({:?}, is_true={})", f, is_true)
4191            }
4192            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
4193            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
4194            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
4195            HydroNode::Batch { .. } => "Batch()".to_owned(),
4196            HydroNode::Chain { first, second, .. } => {
4197                format!("Chain({}, {})", first.print_root(), second.print_root())
4198            }
4199            HydroNode::ChainFirst { first, second, .. } => {
4200                format!(
4201                    "ChainFirst({}, {})",
4202                    first.print_root(),
4203                    second.print_root()
4204                )
4205            }
4206            HydroNode::CrossProduct { left, right, .. } => {
4207                format!(
4208                    "CrossProduct({}, {})",
4209                    left.print_root(),
4210                    right.print_root()
4211                )
4212            }
4213            HydroNode::CrossSingleton { left, right, .. } => {
4214                format!(
4215                    "CrossSingleton({}, {})",
4216                    left.print_root(),
4217                    right.print_root()
4218                )
4219            }
4220            HydroNode::Join { left, right, .. } => {
4221                format!("Join({}, {})", left.print_root(), right.print_root())
4222            }
4223            HydroNode::Difference { pos, neg, .. } => {
4224                format!("Difference({}, {})", pos.print_root(), neg.print_root())
4225            }
4226            HydroNode::AntiJoin { pos, neg, .. } => {
4227                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4228            }
4229            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4230            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
4231            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4232            HydroNode::Map { f, .. } => format!("Map({:?})", f),
4233            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4234            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4235            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4236            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4237            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4238            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4239            HydroNode::Unique { .. } => "Unique()".to_owned(),
4240            HydroNode::Sort { .. } => "Sort()".to_owned(),
4241            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4242            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4243            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4244            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4245            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4246            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4247            HydroNode::Network { .. } => "Network()".to_owned(),
4248            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4249            HydroNode::Counter { tag, duration, .. } => {
4250                format!("Counter({:?}, {:?})", tag, duration)
4251            }
4252        }
4253    }
4254}
4255
4256#[cfg(feature = "build")]
4257fn instantiate_network<'a, D>(
4258    env: &mut D::InstantiateEnv,
4259    from_location: &LocationId,
4260    to_location: &LocationId,
4261    processes: &SparseSecondaryMap<LocationKey, D::Process>,
4262    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
4263    name: Option<&str>,
4264    networking_info: &crate::networking::NetworkingInfo,
4265) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
4266where
4267    D: Deploy<'a>,
4268{
4269    let ((sink, source), connect_fn) = match (from_location, to_location) {
4270        (&LocationId::Process(from), &LocationId::Process(to)) => {
4271            let from_node = processes
4272                .get(from)
4273                .unwrap_or_else(|| {
4274                    panic!("A process used in the graph was not instantiated: {}", from)
4275                })
4276                .clone();
4277            let to_node = processes
4278                .get(to)
4279                .unwrap_or_else(|| {
4280                    panic!("A process used in the graph was not instantiated: {}", to)
4281                })
4282                .clone();
4283
4284            let sink_port = from_node.next_port();
4285            let source_port = to_node.next_port();
4286
4287            (
4288                D::o2o_sink_source(
4289                    env,
4290                    &from_node,
4291                    &sink_port,
4292                    &to_node,
4293                    &source_port,
4294                    name,
4295                    networking_info,
4296                ),
4297                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
4298            )
4299        }
4300        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
4301            let from_node = processes
4302                .get(from)
4303                .unwrap_or_else(|| {
4304                    panic!("A process used in the graph was not instantiated: {}", from)
4305                })
4306                .clone();
4307            let to_node = clusters
4308                .get(to)
4309                .unwrap_or_else(|| {
4310                    panic!("A cluster used in the graph was not instantiated: {}", to)
4311                })
4312                .clone();
4313
4314            let sink_port = from_node.next_port();
4315            let source_port = to_node.next_port();
4316
4317            (
4318                D::o2m_sink_source(
4319                    env,
4320                    &from_node,
4321                    &sink_port,
4322                    &to_node,
4323                    &source_port,
4324                    name,
4325                    networking_info,
4326                ),
4327                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
4328            )
4329        }
4330        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
4331            let from_node = clusters
4332                .get(from)
4333                .unwrap_or_else(|| {
4334                    panic!("A cluster used in the graph was not instantiated: {}", from)
4335                })
4336                .clone();
4337            let to_node = processes
4338                .get(to)
4339                .unwrap_or_else(|| {
4340                    panic!("A process used in the graph was not instantiated: {}", to)
4341                })
4342                .clone();
4343
4344            let sink_port = from_node.next_port();
4345            let source_port = to_node.next_port();
4346
4347            (
4348                D::m2o_sink_source(
4349                    env,
4350                    &from_node,
4351                    &sink_port,
4352                    &to_node,
4353                    &source_port,
4354                    name,
4355                    networking_info,
4356                ),
4357                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
4358            )
4359        }
4360        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
4361            let from_node = clusters
4362                .get(from)
4363                .unwrap_or_else(|| {
4364                    panic!("A cluster used in the graph was not instantiated: {}", from)
4365                })
4366                .clone();
4367            let to_node = clusters
4368                .get(to)
4369                .unwrap_or_else(|| {
4370                    panic!("A cluster used in the graph was not instantiated: {}", to)
4371                })
4372                .clone();
4373
4374            let sink_port = from_node.next_port();
4375            let source_port = to_node.next_port();
4376
4377            (
4378                D::m2m_sink_source(
4379                    env,
4380                    &from_node,
4381                    &sink_port,
4382                    &to_node,
4383                    &source_port,
4384                    name,
4385                    networking_info,
4386                ),
4387                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
4388            )
4389        }
4390        (LocationId::Tick(_, _), _) => panic!(),
4391        (_, LocationId::Tick(_, _)) => panic!(),
4392        (LocationId::Atomic(_), _) => panic!(),
4393        (_, LocationId::Atomic(_)) => panic!(),
4394    };
4395    (sink, source, connect_fn)
4396}
4397
4398#[cfg(test)]
4399mod test {
4400    use std::mem::size_of;
4401
4402    use stageleft::{QuotedWithContext, q};
4403
4404    use super::*;
4405
4406    #[test]
4407    #[cfg_attr(
4408        not(feature = "build"),
4409        ignore = "expects inclusion of feature-gated fields"
4410    )]
4411    fn hydro_node_size() {
4412        assert_eq!(size_of::<HydroNode>(), 248);
4413    }
4414
4415    #[test]
4416    #[cfg_attr(
4417        not(feature = "build"),
4418        ignore = "expects inclusion of feature-gated fields"
4419    )]
4420    fn hydro_root_size() {
4421        assert_eq!(size_of::<HydroRoot>(), 136);
4422    }
4423
4424    #[test]
4425    fn test_simplify_q_macro_basic() {
4426        // Test basic non-q! expression
4427        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
4428        let result = simplify_q_macro(simple_expr.clone());
4429        assert_eq!(result, simple_expr);
4430    }
4431
4432    #[test]
4433    fn test_simplify_q_macro_actual_stageleft_call() {
4434        // Test a simplified version of what a real stageleft call might look like
4435        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
4436        let result = simplify_q_macro(stageleft_call);
4437        // This should be processed by our visitor and simplified to q!(...)
4438        // since we detect the stageleft::runtime_support::fn_* pattern
4439        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4440    }
4441
4442    #[test]
4443    fn test_closure_no_pipe_at_start() {
4444        // Test a closure that does not start with a pipe
4445        let stageleft_call = q!({
4446            let foo = 123;
4447            move |b: usize| b + foo
4448        })
4449        .splice_fn1_ctx(&());
4450        let result = simplify_q_macro(stageleft_call);
4451        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4452    }
4453}