1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31use crate::location::dynamic::LocationId;
32use crate::location::{LocationKey, NetworkHint};
33
34pub mod backtrace;
35use backtrace::Backtrace;
36
/// Newtype around a boxed [`syn::Expr`].
///
/// The [`Debug`] impl prints the raw token stream of the expression, while the [`Display`]
/// impl prints a simplified form wrapped in `q!(...)`.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
42
43impl From<syn::Expr> for DebugExpr {
44 fn from(expr: syn::Expr) -> Self {
45 Self(Box::new(expr))
46 }
47}
48
49impl Deref for DebugExpr {
50 type Target = syn::Expr;
51
52 fn deref(&self) -> &Self::Target {
53 &self.0
54 }
55}
56
57impl ToTokens for DebugExpr {
58 fn to_tokens(&self, tokens: &mut TokenStream) {
59 self.0.to_tokens(tokens);
60 }
61}
62
63impl Debug for DebugExpr {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 write!(f, "{}", self.0.to_token_stream())
66 }
67}
68
69impl Display for DebugExpr {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 let original = self.0.as_ref().clone();
72 let simplified = simplify_q_macro(original);
73
74 write!(f, "q!({})", quote::quote!(#simplified))
77 }
78}
79
80fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
82 let mut simplifier = QMacroSimplifier::new();
85 simplifier.visit_expr_mut(&mut expr);
86
87 if let Some(simplified) = simplifier.simplified_result {
89 simplified
90 } else {
91 expr
92 }
93}
94
/// Mutable expression visitor that looks for a call to a
/// `stageleft::runtime_support::…` function whose name contains `_type_hint` and records a
/// closure found among that call's arguments.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The extracted closure expression; `None` until a matching call has been visited.
    pub simplified_result: Option<syn::Expr>,
}
100
101impl QMacroSimplifier {
102 pub fn new() -> Self {
103 Self::default()
104 }
105}
106
107impl VisitMut for QMacroSimplifier {
108 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
109 if self.simplified_result.is_some() {
111 return;
112 }
113
114 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
115 && self.is_stageleft_runtime_support_call(&path_expr.path)
117 && let Some(closure) = self.extract_closure_from_args(&call.args)
119 {
120 self.simplified_result = Some(closure);
121 return;
122 }
123
124 syn::visit_mut::visit_expr_mut(self, expr);
127 }
128}
129
130impl QMacroSimplifier {
131 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
132 if let Some(last_segment) = path.segments.last() {
134 let fn_name = last_segment.ident.to_string();
135 fn_name.contains("_type_hint")
137 && path.segments.len() > 2
138 && path.segments[0].ident == "stageleft"
139 && path.segments[1].ident == "runtime_support"
140 } else {
141 false
142 }
143 }
144
145 fn extract_closure_from_args(
146 &self,
147 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
148 ) -> Option<syn::Expr> {
149 for arg in args {
151 if let syn::Expr::Closure(_) = arg {
152 return Some(arg.clone());
153 }
154 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
156 return Some(closure_expr);
157 }
158 }
159 None
160 }
161
162 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
163 let mut visitor = ClosureFinder {
164 found_closure: None,
165 prefer_inner_blocks: true,
166 };
167 visitor.visit_expr(expr);
168 visitor.found_closure
169 }
170}
171
/// Read-only expression visitor that stops at the first closure it encounters.
struct ClosureFinder {
    /// The expression recorded as the result: either a closure, or a block containing one
    /// (see `prefer_inner_blocks`). `None` until something is found.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block expression whose statement is itself a block containing a closure
    /// causes that whole inner block to be recorded instead of the bare closure.
    prefer_inner_blocks: bool,
}
177
178impl<'ast> Visit<'ast> for ClosureFinder {
179 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
180 if self.found_closure.is_some() {
182 return;
183 }
184
185 match expr {
186 syn::Expr::Closure(_) => {
187 self.found_closure = Some(expr.clone());
188 }
189 syn::Expr::Block(block) if self.prefer_inner_blocks => {
190 for stmt in &block.block.stmts {
192 if let syn::Stmt::Expr(stmt_expr, _) = stmt
193 && let syn::Expr::Block(_) = stmt_expr
194 {
195 let mut inner_visitor = ClosureFinder {
197 found_closure: None,
198 prefer_inner_blocks: false, };
200 inner_visitor.visit_expr(stmt_expr);
201 if inner_visitor.found_closure.is_some() {
202 self.found_closure = Some(stmt_expr.clone());
204 return;
205 }
206 }
207 }
208
209 visit::visit_expr(self, expr);
211
212 if self.found_closure.is_some() {
215 }
217 }
218 _ => {
219 visit::visit_expr(self, expr);
221 }
222 }
223 }
224}
225
/// Newtype around a boxed [`syn::Type`] whose [`Debug`] impl prints the type's token stream.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
231
232impl From<syn::Type> for DebugType {
233 fn from(t: syn::Type) -> Self {
234 Self(Box::new(t))
235 }
236}
237
238impl Deref for DebugType {
239 type Target = syn::Type;
240
241 fn deref(&self) -> &Self::Target {
242 &self.0
243 }
244}
245
246impl ToTokens for DebugType {
247 fn to_tokens(&self, tokens: &mut TokenStream) {
248 self.0.to_tokens(tokens);
249 }
250}
251
252impl Debug for DebugType {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 write!(f, "{}", self.0.to_token_stream())
255 }
256}
257
/// Instantiation state of a network-like IR element.
pub enum DebugInstantiate {
    /// Nothing has been instantiated yet.
    Building,
    /// Instantiation has happened; the boxed value holds the resulting sink/source
    /// expressions and the pending connect callback.
    Finalized(Box<DebugInstantiateFinalized>),
}
262
/// Payload of [`DebugInstantiate::Finalized`].
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression spliced into the generated DFIR as the sink (sending side).
    sink: syn::Expr,
    /// Expression spliced into the generated DFIR as the source (receiving side).
    source: syn::Expr,
    /// One-shot callback invoked by `connect_network`; it is `take`n when run, so it is
    /// `None` afterwards.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
275
276impl From<DebugInstantiateFinalized> for DebugInstantiate {
277 fn from(f: DebugInstantiateFinalized) -> Self {
278 Self::Finalized(Box::new(f))
279 }
280}
281
282impl Debug for DebugInstantiate {
283 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284 write!(f, "<network instantiate>")
285 }
286}
287
impl Hash for DebugInstantiate {
    /// Feeds nothing into the hasher, so every `DebugInstantiate` value contributes
    /// identically to an enclosing hash regardless of its variant or contents.
    fn hash<H: Hasher>(&self, _state: &mut H) {
        // Deliberately empty.
    }
}
293
294impl Clone for DebugInstantiate {
295 fn clone(&self) -> Self {
296 match self {
297 DebugInstantiate::Building => DebugInstantiate::Building,
298 DebugInstantiate::Finalized(_) => {
299 panic!("DebugInstantiate::Finalized should not be cloned")
300 }
301 }
302 }
303}
304
/// State of a [`HydroSource::ClusterMembers`] source across network compilation.
#[derive(Debug, Hash, Clone)]
pub enum ClusterMembersState {
    /// Initial state, before `compile_network` has processed the source.
    Uninit,
    /// Set by `compile_network` the first time a given (location, cluster) pair is seen;
    /// holds the spliced cluster-membership stream expression.
    Stream(DebugExpr),
    /// Set by `compile_network` when the same (location, cluster) pair was already seen;
    /// holds the observing location and the cluster location.
    Tee(LocationId, LocationId),
}
325
/// The kinds of source a `HydroNode::Source` can wrap.
///
/// NOTE(review): most per-variant descriptions are inferred from names and from the uses
/// visible in `compile_network`; confirm against the emitting code.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Presumably a source backed by a stream-producing expression.
    Stream(DebugExpr),
    /// Presumably a source fed by an external network connection.
    ExternalNetwork(),
    /// Presumably a source backed by an iterator-producing expression.
    Iter(DebugExpr),
    /// Presumably a source that continuously emits — TODO confirm.
    Spin(),
    /// Membership of the given cluster location, together with its compilation state.
    ClusterMembers(LocationId, ClusterMembersState),
    /// Embedded stream input identified by `ident`; `compile_network` registers it via
    /// `register_embedded_stream_input` and requires a `Stream` collection kind.
    Embedded(syn::Ident),
    /// Embedded singleton input identified by `ident`; `compile_network` registers it via
    /// `register_embedded_singleton_input` and requires a `Singleton` collection kind.
    EmbeddedSingleton(syn::Ident),
}
337
/// Backend that receives DFIR statements while the Hydro IR is being emitted.
///
/// NOTE(review): the per-method notes below describe what the
/// `SecondaryMap<LocationKey, FlatGraphBuilder>` implementation in this file does; other
/// implementors may differ.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// NOTE(review): meaning not visible here; the `SecondaryMap` implementation returns
    /// `false`.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the graph builder associated with `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Connects `in_ident` to `out_ident` for a batch operation.
    ///
    /// The `SecondaryMap` implementation inserts `persist::<'static>()` for bounded
    /// singleton-like inputs and a plain alias otherwise.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Connects `in_ident` to `out_ident` when a value is yielded out of a tick
    /// (a plain alias in the `SecondaryMap` implementation).
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Connects `in_ident` to `out_ident` at the start of an atomic section
    /// (a plain alias in the `SecondaryMap` implementation).
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Connects `in_ident` to `out_ident` at the end of an atomic section
    /// (a plain alias in the `SecondaryMap` implementation).
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Connects `in_ident` to `out_ident` for a non-determinism observation
    /// (a plain alias in the `SecondaryMap` implementation, which ignores `trusted`).
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits both halves of a network link: a sender on `from` that pipes `input_ident`
    /// (optionally through `serialize`) into `sink`, and a receiver on `to` that defines
    /// `out_ident` from `source` (optionally through `deserialize`).
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Defines `out_ident` on location `on` from `source_expr`, optionally mapped through
    /// `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Pipes `input_ident` on location `on` into `sink_expr`, optionally mapped through
    /// `serialize`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );
}
431
/// Default backend: one [`FlatGraphBuilder`] per root location key.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    /// Always `false` for this backend.
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Returns the builder stored under the root key of `location`, creating a default one
    /// on first use.
    ///
    /// # Panics
    ///
    /// Panics with "location was removed" when the map entry cannot be obtained.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        let slot = self.entry(root_key).expect("location was removed");
        slot.or_default()
    }

    /// Aliases `out_ident` to `in_ident`, inserting `persist::<'static>()` when the input is
    /// a bounded singleton, optional, or keyed singleton (which must be at the top level).
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );

        if !needs_persist {
            builder.add_dfir(
                parse_quote! {
                    #out_ident = #in_ident;
                },
                None,
                None,
            );
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Aliases `out_ident` to `in_ident` on the input's root location.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Aliases `out_ident` to `in_ident` on the input's root location.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Aliases `out_ident` to `in_ident` on the input's root location.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Aliases `out_ident` to `in_ident` on `location`; `_trusted` is ignored.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits the sending half on `from` (tagged `send<tag_id>`) and then the receiving half
    /// on `to` (tagged `recv<tag_id>`), each with an optional `map` stage.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        let sender_builder = self.get_dfir_mut(from);
        let send_tag = format!("send{}", tag_id);
        match serialize {
            Some(ser) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#ser) -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }

        let receiver_builder = self.get_dfir_mut(to);
        let recv_tag = format!("recv{}", tag_id);
        match deserialize {
            Some(de) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#de);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// Defines `out_ident` on `on` from `source_stream(source_expr)` with an optional `map`
    /// stage, tagged `recv<tag_id>`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let receiver_builder = self.get_dfir_mut(on);
        let recv_tag = format!("recv{}", tag_id);
        match deserialize {
            Some(de) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#de);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// Pipes `input_ident` on `on` into `dest_sink(sink_expr)` with an optional `map` stage,
    /// tagged `send<tag_id>`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let sender_builder = self.get_dfir_mut(on);
        let send_tag = format!("send{}", tag_id);
        match serialize {
            Some(ser) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#ser) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }
    }
}
665
/// Selects what `emit_core` does at each IR element: either generate DFIR through a
/// [`DfirBuilder`], or invoke user callbacks instead.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Generate DFIR into the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Call `L` for roots and `N` for nodes, passing the element and the next statement id.
    Callback(L, N),
}
675
/// A root (terminal) element of the Hydro IR. Every variant consumes exactly one `input`
/// node and carries operator metadata.
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Emitted as `input -> for_each(f)`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends the input to an external location; `instantiate_fn` is filled in by
    /// `compile_network`.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Emitted as `input -> dest_sink(sink)`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Binds the input to the identifier derived from `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Emitted as `input -> for_each(&mut ident)`; registered with the deploy backend during
    /// `compile_network`.
    EmbeddedOutput {
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Emitted as `input -> for_each(|_| {})`, i.e. the input is consumed and discarded.
    Null {
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
716
717impl HydroRoot {
718 #[cfg(feature = "build")]
719 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
720 pub fn compile_network<'a, D>(
721 &mut self,
722 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
723 seen_tees: &mut SeenSharedNodes,
724 seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
725 processes: &SparseSecondaryMap<LocationKey, D::Process>,
726 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
727 externals: &SparseSecondaryMap<LocationKey, D::External>,
728 env: &mut D::InstantiateEnv,
729 ) where
730 D: Deploy<'a>,
731 {
732 let refcell_extra_stmts = RefCell::new(extra_stmts);
733 let refcell_env = RefCell::new(env);
734 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
735 self.transform_bottom_up(
736 &mut |l| {
737 if let HydroRoot::SendExternal {
738 input,
739 to_external_key,
740 to_port_id,
741 to_many,
742 unpaired,
743 instantiate_fn,
744 ..
745 } = l
746 {
747 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
748 DebugInstantiate::Building => {
749 let to_node = externals
750 .get(*to_external_key)
751 .unwrap_or_else(|| {
752 panic!("A external used in the graph was not instantiated: {}", to_external_key)
753 })
754 .clone();
755
756 match input.metadata().location_id.root() {
757 &LocationId::Process(process_key) => {
758 if *to_many {
759 (
760 (
761 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
762 parse_quote!(DUMMY),
763 ),
764 Box::new(|| {}) as Box<dyn FnOnce()>,
765 )
766 } else {
767 let from_node = processes
768 .get(process_key)
769 .unwrap_or_else(|| {
770 panic!("A process used in the graph was not instantiated: {}", process_key)
771 })
772 .clone();
773
774 let sink_port = from_node.next_port();
775 let source_port = to_node.next_port();
776
777 if *unpaired {
778 use stageleft::quote_type;
779 use tokio_util::codec::LengthDelimitedCodec;
780
781 to_node.register(*to_port_id, source_port.clone());
782
783 let _ = D::e2o_source(
784 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
785 &to_node, &source_port,
786 &from_node, &sink_port,
787 "e_type::<LengthDelimitedCodec>(),
788 format!("{}_{}", *to_external_key, *to_port_id)
789 );
790 }
791
792 (
793 (
794 D::o2e_sink(
795 &from_node,
796 &sink_port,
797 &to_node,
798 &source_port,
799 format!("{}_{}", *to_external_key, *to_port_id)
800 ),
801 parse_quote!(DUMMY),
802 ),
803 if *unpaired {
804 D::e2o_connect(
805 &to_node,
806 &source_port,
807 &from_node,
808 &sink_port,
809 *to_many,
810 NetworkHint::Auto,
811 )
812 } else {
813 Box::new(|| {}) as Box<dyn FnOnce()>
814 },
815 )
816 }
817 }
818 LocationId::Cluster(_) => todo!("SendExternal from a cluster location is not yet supported"),
819 _ => panic!()
820 }
821 },
822
823 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
824 };
825
826 *instantiate_fn = DebugInstantiateFinalized {
827 sink: sink_expr,
828 source: source_expr,
829 connect_fn: Some(connect_fn),
830 }
831 .into();
832 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
833 let element_type = match &input.metadata().collection_kind {
834 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
835 _ => panic!("Embedded output must have Stream collection kind"),
836 };
837 let location_key = match input.metadata().location_id.root() {
838 LocationId::Process(key) | LocationId::Cluster(key) => *key,
839 _ => panic!("Embedded output must be on a process or cluster"),
840 };
841 D::register_embedded_output(
842 &mut refcell_env.borrow_mut(),
843 location_key,
844 ident,
845 &element_type,
846 );
847 }
848 },
849 &mut |n| {
850 if let HydroNode::Network {
851 name,
852 networking_info,
853 input,
854 instantiate_fn,
855 metadata,
856 ..
857 } = n
858 {
859 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
860 DebugInstantiate::Building => instantiate_network::<D>(
861 &mut refcell_env.borrow_mut(),
862 input.metadata().location_id.root(),
863 metadata.location_id.root(),
864 processes,
865 clusters,
866 name.as_deref(),
867 networking_info,
868 ),
869
870 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
871 };
872
873 *instantiate_fn = DebugInstantiateFinalized {
874 sink: sink_expr,
875 source: source_expr,
876 connect_fn: Some(connect_fn),
877 }
878 .into();
879 } else if let HydroNode::ExternalInput {
880 from_external_key,
881 from_port_id,
882 from_many,
883 codec_type,
884 port_hint,
885 instantiate_fn,
886 metadata,
887 ..
888 } = n
889 {
890 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
891 DebugInstantiate::Building => {
892 let from_node = externals
893 .get(*from_external_key)
894 .unwrap_or_else(|| {
895 panic!(
896 "A external used in the graph was not instantiated: {}",
897 from_external_key,
898 )
899 })
900 .clone();
901
902 match metadata.location_id.root() {
903 &LocationId::Process(process_key) => {
904 let to_node = processes
905 .get(process_key)
906 .unwrap_or_else(|| {
907 panic!("A process used in the graph was not instantiated: {}", process_key)
908 })
909 .clone();
910
911 let sink_port = from_node.next_port();
912 let source_port = to_node.next_port();
913
914 from_node.register(*from_port_id, sink_port.clone());
915
916 (
917 (
918 parse_quote!(DUMMY),
919 if *from_many {
920 D::e2o_many_source(
921 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
922 &to_node, &source_port,
923 codec_type.0.as_ref(),
924 format!("{}_{}", *from_external_key, *from_port_id)
925 )
926 } else {
927 D::e2o_source(
928 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
929 &from_node, &sink_port,
930 &to_node, &source_port,
931 codec_type.0.as_ref(),
932 format!("{}_{}", *from_external_key, *from_port_id)
933 )
934 },
935 ),
936 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
937 )
938 }
939 LocationId::Cluster(_) => todo!("ExternalInput to a cluster location is not yet supported"),
940 _ => panic!()
941 }
942 },
943
944 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
945 };
946
947 *instantiate_fn = DebugInstantiateFinalized {
948 sink: sink_expr,
949 source: source_expr,
950 connect_fn: Some(connect_fn),
951 }
952 .into();
953 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
954 let element_type = match &metadata.collection_kind {
955 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
956 _ => panic!("Embedded source must have Stream collection kind"),
957 };
958 let location_key = match metadata.location_id.root() {
959 LocationId::Process(key) | LocationId::Cluster(key) => *key,
960 _ => panic!("Embedded source must be on a process or cluster"),
961 };
962 D::register_embedded_stream_input(
963 &mut refcell_env.borrow_mut(),
964 location_key,
965 ident,
966 &element_type,
967 );
968 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
969 let element_type = match &metadata.collection_kind {
970 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
971 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
972 };
973 let location_key = match metadata.location_id.root() {
974 LocationId::Process(key) | LocationId::Cluster(key) => *key,
975 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
976 };
977 D::register_embedded_singleton_input(
978 &mut refcell_env.borrow_mut(),
979 location_key,
980 ident,
981 &element_type,
982 );
983 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
984 match state {
985 ClusterMembersState::Uninit => {
986 let at_location = metadata.location_id.root().clone();
987 let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
988 if refcell_seen_cluster_members.borrow_mut().insert(key) {
989 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
991 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
992 &(),
993 );
994 *state = ClusterMembersState::Stream(expr.into());
995 } else {
996 *state = ClusterMembersState::Tee(at_location, location_id.clone());
998 }
999 }
1000 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1001 panic!("cluster members already finalized");
1002 }
1003 }
1004 }
1005 },
1006 seen_tees,
1007 false,
1008 );
1009 }
1010
1011 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1012 self.transform_bottom_up(
1013 &mut |l| {
1014 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1015 match instantiate_fn {
1016 DebugInstantiate::Building => panic!("network not built"),
1017
1018 DebugInstantiate::Finalized(finalized) => {
1019 (finalized.connect_fn.take().unwrap())();
1020 }
1021 }
1022 }
1023 },
1024 &mut |n| {
1025 if let HydroNode::Network { instantiate_fn, .. }
1026 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1027 {
1028 match instantiate_fn {
1029 DebugInstantiate::Building => panic!("network not built"),
1030
1031 DebugInstantiate::Finalized(finalized) => {
1032 (finalized.connect_fn.take().unwrap())();
1033 }
1034 }
1035 }
1036 },
1037 seen_tees,
1038 false,
1039 );
1040 }
1041
1042 pub fn transform_bottom_up(
1043 &mut self,
1044 transform_root: &mut impl FnMut(&mut HydroRoot),
1045 transform_node: &mut impl FnMut(&mut HydroNode),
1046 seen_tees: &mut SeenSharedNodes,
1047 check_well_formed: bool,
1048 ) {
1049 self.transform_children(
1050 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1051 seen_tees,
1052 );
1053
1054 transform_root(self);
1055 }
1056
1057 pub fn transform_children(
1058 &mut self,
1059 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1060 seen_tees: &mut SeenSharedNodes,
1061 ) {
1062 match self {
1063 HydroRoot::ForEach { input, .. }
1064 | HydroRoot::SendExternal { input, .. }
1065 | HydroRoot::DestSink { input, .. }
1066 | HydroRoot::CycleSink { input, .. }
1067 | HydroRoot::EmbeddedOutput { input, .. }
1068 | HydroRoot::Null { input, .. } => {
1069 transform(input, seen_tees);
1070 }
1071 }
1072 }
1073
1074 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1075 match self {
1076 HydroRoot::ForEach {
1077 f,
1078 input,
1079 op_metadata,
1080 } => HydroRoot::ForEach {
1081 f: f.clone(),
1082 input: Box::new(input.deep_clone(seen_tees)),
1083 op_metadata: op_metadata.clone(),
1084 },
1085 HydroRoot::SendExternal {
1086 to_external_key,
1087 to_port_id,
1088 to_many,
1089 unpaired,
1090 serialize_fn,
1091 instantiate_fn,
1092 input,
1093 op_metadata,
1094 } => HydroRoot::SendExternal {
1095 to_external_key: *to_external_key,
1096 to_port_id: *to_port_id,
1097 to_many: *to_many,
1098 unpaired: *unpaired,
1099 serialize_fn: serialize_fn.clone(),
1100 instantiate_fn: instantiate_fn.clone(),
1101 input: Box::new(input.deep_clone(seen_tees)),
1102 op_metadata: op_metadata.clone(),
1103 },
1104 HydroRoot::DestSink {
1105 sink,
1106 input,
1107 op_metadata,
1108 } => HydroRoot::DestSink {
1109 sink: sink.clone(),
1110 input: Box::new(input.deep_clone(seen_tees)),
1111 op_metadata: op_metadata.clone(),
1112 },
1113 HydroRoot::CycleSink {
1114 cycle_id,
1115 input,
1116 op_metadata,
1117 } => HydroRoot::CycleSink {
1118 cycle_id: *cycle_id,
1119 input: Box::new(input.deep_clone(seen_tees)),
1120 op_metadata: op_metadata.clone(),
1121 },
1122 HydroRoot::EmbeddedOutput {
1123 ident,
1124 input,
1125 op_metadata,
1126 } => HydroRoot::EmbeddedOutput {
1127 ident: ident.clone(),
1128 input: Box::new(input.deep_clone(seen_tees)),
1129 op_metadata: op_metadata.clone(),
1130 },
1131 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1132 input: Box::new(input.deep_clone(seen_tees)),
1133 op_metadata: op_metadata.clone(),
1134 },
1135 }
1136 }
1137
1138 #[cfg(feature = "build")]
1139 pub fn emit(
1140 &mut self,
1141 graph_builders: &mut dyn DfirBuilder,
1142 seen_tees: &mut SeenSharedNodes,
1143 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1144 next_stmt_id: &mut usize,
1145 ) {
1146 self.emit_core(
1147 &mut BuildersOrCallback::<
1148 fn(&mut HydroRoot, &mut usize),
1149 fn(&mut HydroNode, &mut usize),
1150 >::Builders(graph_builders),
1151 seen_tees,
1152 built_tees,
1153 next_stmt_id,
1154 );
1155 }
1156
    /// Emits this root after first emitting its input subtree.
    ///
    /// In every variant the input node is emitted first, yielding the identifier of its
    /// output. Then, depending on `builders_or_callback`:
    ///
    /// * `Builders`: a DFIR statement consuming that identifier is added to the builder of
    ///   the input's location.
    /// * `Callback`: the root callback is invoked with `self` and `next_stmt_id`.
    ///
    /// `next_stmt_id` is incremented once per root, except for `CycleSink`, which neither
    /// invokes the callback nor consumes a statement id.
    #[cfg(feature = "build")]
    pub fn emit_core(
        &mut self,
        builders_or_callback: &mut BuildersOrCallback<
            impl FnMut(&mut HydroRoot, &mut usize),
            impl FnMut(&mut HydroNode, &mut usize),
        >,
        seen_tees: &mut SeenSharedNodes,
        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
        next_stmt_id: &mut usize,
    ) {
        match self {
            HydroRoot::ForEach { f, input, .. } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(#f);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::SendExternal {
                serialize_fn,
                instantiate_fn,
                input,
                ..
            } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Placeholder expressions are used when the network has not been
                        // instantiated yet; only the sink side is needed for an output.
                        let (sink_expr, _) = match instantiate_fn {
                            DebugInstantiate::Building => (
                                syn::parse_quote!(DUMMY_SINK),
                                syn::parse_quote!(DUMMY_SOURCE),
                            ),

                            DebugInstantiate::Finalized(finalized) => {
                                (finalized.sink.clone(), finalized.source.clone())
                            }
                        };

                        graph_builders.create_external_output(
                            &input.metadata().location_id,
                            sink_expr,
                            &input_ident,
                            serialize_fn.as_ref(),
                            *next_stmt_id,
                        );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::DestSink { sink, input, .. } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::CycleSink {
                cycle_id, input, ..
            } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Keyed collections flow as `(key, value)` tuples; everything else
                        // flows as its element type.
                        let elem_type: syn::Type = match &input.metadata().collection_kind {
                            CollectionKind::KeyedSingleton {
                                key_type,
                                value_type,
                                ..
                            }
                            | CollectionKind::KeyedStream {
                                key_type,
                                value_type,
                                ..
                            } => {
                                parse_quote!((#key_type, #value_type))
                            }
                            CollectionKind::Stream { element_type, .. }
                            | CollectionKind::Singleton { element_type, .. }
                            | CollectionKind::Optional { element_type, .. } => {
                                parse_quote!(#element_type)
                            }
                        };

                        let cycle_id_ident = cycle_id.as_ident();
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
                                },
                                None,
                                None,
                            );
                    }
                    // Cycle sinks are not reported to the callback and take no statement id.
                    BuildersOrCallback::Callback(_, _) => {}
                }
            }

            HydroRoot::EmbeddedOutput { ident, input, .. } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(&mut #ident);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::Null { input, .. } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(|_| {});
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }
        }
    }
1348
1349 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1350 match self {
1351 HydroRoot::ForEach { op_metadata, .. }
1352 | HydroRoot::SendExternal { op_metadata, .. }
1353 | HydroRoot::DestSink { op_metadata, .. }
1354 | HydroRoot::CycleSink { op_metadata, .. }
1355 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1356 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1357 }
1358 }
1359
1360 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1361 match self {
1362 HydroRoot::ForEach { op_metadata, .. }
1363 | HydroRoot::SendExternal { op_metadata, .. }
1364 | HydroRoot::DestSink { op_metadata, .. }
1365 | HydroRoot::CycleSink { op_metadata, .. }
1366 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1367 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1368 }
1369 }
1370
1371 pub fn input(&self) -> &HydroNode {
1372 match self {
1373 HydroRoot::ForEach { input, .. }
1374 | HydroRoot::SendExternal { input, .. }
1375 | HydroRoot::DestSink { input, .. }
1376 | HydroRoot::CycleSink { input, .. }
1377 | HydroRoot::EmbeddedOutput { input, .. }
1378 | HydroRoot::Null { input, .. } => input,
1379 }
1380 }
1381
1382 pub fn input_metadata(&self) -> &HydroIrMetadata {
1383 self.input().metadata()
1384 }
1385
1386 pub fn print_root(&self) -> String {
1387 match self {
1388 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1389 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1390 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1391 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1392 HydroRoot::EmbeddedOutput { ident, .. } => {
1393 format!("EmbeddedOutput({})", ident)
1394 }
1395 HydroRoot::Null { .. } => "Null".to_owned(),
1396 }
1397 }
1398
1399 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1400 match self {
1401 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1402 transform(f);
1403 }
1404 HydroRoot::SendExternal { .. }
1405 | HydroRoot::CycleSink { .. }
1406 | HydroRoot::EmbeddedOutput { .. }
1407 | HydroRoot::Null { .. } => {}
1408 }
1409 }
1410}
1411
#[cfg(feature = "build")]
/// Returns the clock id of the tick that `loc` is (or is atomically nested in).
///
/// `Atomic` wrappers are looked through recursively; any other location yields `None`.
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    if let LocationId::Tick(id, _) = loc {
        return Some(*id);
    }
    if let LocationId::Atomic(inner) = loc {
        return tick_of(inner);
    }
    None
}
1420
#[cfg(feature = "build")]
/// Rewrites every tick clock id inside `loc` to its union-find representative.
///
/// `Tick` ids are replaced by `uf_find`'s result, `Atomic` wrappers are looked
/// through, and `Process` / `Cluster` locations are left unchanged.
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    // Rewrite this level (if it is a tick) and pick the nested location to continue with.
    let nested = match loc {
        LocationId::Process(_) | LocationId::Cluster(_) => return,
        LocationId::Atomic(inner) => inner,
        LocationId::Tick(id, inner) => {
            *id = uf_find(uf, *id);
            inner
        }
    };
    remap_location(nested, uf);
}
1434
#[cfg(feature = "build")]
/// Union-find lookup: returns the representative of `x`'s set.
///
/// An id with no entry in `parent` (or mapped to itself) is its own representative.
/// Every non-root id visited on the way is re-pointed directly at the root.
fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
    // Pass 1: follow parent links up to the root.
    let mut root = x;
    while let Some(&up) = parent.get(&root) {
        if up == root {
            break;
        }
        root = up;
    }

    // Pass 2: walk the same path again, pointing each id straight at the root.
    let mut cursor = x;
    while cursor != root {
        // Each id on the path below the root has an entry (pass 1 walked through
        // it), so `insert` hands back its previous parent.
        let Some(previous_parent) = parent.insert(cursor, root) else {
            break;
        };
        cursor = previous_parent;
    }

    root
}
1445
#[cfg(feature = "build")]
/// Union-find merge: puts `a` and `b` in the same set by pointing `a`'s
/// representative at `b`'s. Does nothing when they already share one.
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let (root_a, root_b) = (uf_find(parent, a), uf_find(parent, b));
    if root_a == root_b {
        return;
    }
    parent.insert(root_a, root_b);
}
1454
#[cfg(feature = "build")]
/// Merges the tick clocks on both sides of every `Batch` / `YieldConcat` node
/// and rewrites node locations so that merged ticks share a single clock id.
///
/// Runs two bottom-up passes over `ir`: the first records which clock ids must
/// be unified, the second remaps each node's `location_id` to the representative.
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: for each Batch / YieldConcat whose input and output both resolve
    // to a tick, union the two clock ids.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| {
            let (HydroNode::Batch { inner, metadata }
            | HydroNode::YieldConcat { inner, metadata }) = node
            else {
                return;
            };

            let inner_tick = tick_of(&inner.metadata().location_id);
            let outer_tick = tick_of(&metadata.location_id);
            if let (Some(a), Some(b)) = (inner_tick, outer_tick) {
                uf_union(&mut uf, a, b);
            }
        },
        false,
    );

    // Pass 2: replace every tick id in every node's location by its representative.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| {
            let location = &mut node.metadata_mut().location_id;
            remap_location(location, &mut uf);
        },
        false,
    );
}
1490
#[cfg(feature = "build")]
/// Emits every root in `ir` into per-location graph builders and returns them.
///
/// The shared-node bookkeeping and the statement counter are created once and
/// threaded through all roots, in order.
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders = SecondaryMap::new();
    let (mut seen_tees, mut built_tees) = (HashMap::new(), HashMap::new());
    let mut next_stmt_id = 0;

    ir.iter_mut().for_each(|leaf| {
        leaf.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    });

    builders
}
1507
#[cfg(feature = "build")]
/// Walks `ir` with the emission traversal (`emit_core`) in callback mode.
///
/// `transform_root` and `transform_node` are handed to the traversal wrapped in
/// `BuildersOrCallback::Callback`; both receive the running statement id.
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);

    // State shared across all roots so shared nodes and statement ids stay consistent.
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    for leaf in ir.iter_mut() {
        leaf.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    }
}
1527
1528pub fn transform_bottom_up(
1529 ir: &mut [HydroRoot],
1530 transform_root: &mut impl FnMut(&mut HydroRoot),
1531 transform_node: &mut impl FnMut(&mut HydroNode),
1532 check_well_formed: bool,
1533) {
1534 let mut seen_tees = HashMap::new();
1535 ir.iter_mut().for_each(|leaf| {
1536 leaf.transform_bottom_up(
1537 transform_root,
1538 transform_node,
1539 &mut seen_tees,
1540 check_well_formed,
1541 );
1542 });
1543}
1544
1545pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1546 let mut seen_tees = HashMap::new();
1547 ir.iter()
1548 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1549 .collect()
1550}
1551
// State used by `SharedNode`'s `Debug` impl to print each shared node only once.
// `None` means dedup is off. `Some((next_id, ids))` holds the next id to hand out
// and, keyed by the shared cell's address, the id each shared node was first printed under.
1552type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1553thread_local! {
 // Per-thread dedup table; starts as `None` and is switched on by `dbg_dedup_tee`.
1554 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1555}
1556
1557pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1558 PRINTED_TEES.with(|printed_tees| {
1559 let mut printed_tees_mut = printed_tees.borrow_mut();
1560 *printed_tees_mut = Some((0, HashMap::new()));
1561 drop(printed_tees_mut);
1562
1563 let ret = f();
1564
1565 let mut printed_tees_mut = printed_tees.borrow_mut();
1566 *printed_tees_mut = None;
1567
1568 ret
1569 })
1570}
1571
/// A reference-counted, interior-mutable handle to a [`HydroNode`]; used as the
/// `inner` of the `Tee` and `Partition` variants.
1572pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1573
1574impl SharedNode {
1575 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1576 Rc::as_ptr(&self.0)
1577 }
1578}
1579
1580impl Debug for SharedNode {
1581 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1582 PRINTED_TEES.with(|printed_tees| {
1583 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1584 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1585
1586 if let Some(printed_tees_mut) = printed_tees_mut {
1587 if let Some(existing) = printed_tees_mut
1588 .1
1589 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1590 {
1591 write!(f, "<shared {}>", existing)
1592 } else {
1593 let next_id = printed_tees_mut.0;
1594 printed_tees_mut.0 += 1;
1595 printed_tees_mut
1596 .1
1597 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1598 drop(printed_tees_mut_borrow);
1599 write!(f, "<shared {}>: ", next_id)?;
1600 Debug::fmt(&self.0.borrow(), f)
1601 }
1602 } else {
1603 drop(printed_tees_mut_borrow);
1604 write!(f, "<shared>: ")?;
1605 Debug::fmt(&self.0.borrow(), f)
1606 }
1607 })
1608 }
1609}
1610
1611impl Hash for SharedNode {
1612 fn hash<H: Hasher>(&self, state: &mut H) {
1613 self.0.borrow_mut().hash(state);
1614 }
1615}
1616
/// Boundedness marker used by the `bound` field of most [`CollectionKind`] variants.
1617#[derive(Clone, PartialEq, Eq, Debug)]
1618pub enum BoundKind {
1619 Unbounded,
1620 Bounded,
1621}
1622
/// Ordering marker stored as `order` / `value_order` on the stream variants of
/// [`CollectionKind`].
1623#[derive(Clone, PartialEq, Eq, Debug)]
1624pub enum StreamOrder {
1625 NoOrder,
1626 TotalOrder,
1627}
1628
/// Retry marker stored as `retry` / `value_retry` on the stream variants of
/// [`CollectionKind`].
1629#[derive(Clone, PartialEq, Eq, Debug)]
1630pub enum StreamRetry {
1631 AtLeastOnce,
1632 ExactlyOnce,
1633}
1634
/// Boundedness marker for `CollectionKind::KeyedSingleton`, which has a third
/// state (`BoundedValue`) in addition to `Unbounded` and `Bounded`.
/// Only `Bounded` counts as bounded in `CollectionKind::is_bounded`.
1635#[derive(Clone, PartialEq, Eq, Debug)]
1636pub enum KeyedSingletonBoundKind {
1637 Unbounded,
1638 BoundedValue,
1639 Bounded,
1640}
1641
/// The kind of collection recorded in a node's metadata (`HydroIrMetadata::collection_kind`),
/// together with its boundedness markers and element / key / value types.
1642#[derive(Clone, PartialEq, Eq, Debug)]
1643pub enum CollectionKind {
 // Un-keyed stream: one element type plus ordering and retry markers.
1644 Stream {
1645 bound: BoundKind,
1646 order: StreamOrder,
1647 retry: StreamRetry,
1648 element_type: DebugType,
1649 },
 // Singleton: one element type.
1650 Singleton {
1651 bound: BoundKind,
1652 element_type: DebugType,
1653 },
 // Optional: one element type.
1654 Optional {
1655 bound: BoundKind,
1656 element_type: DebugType,
1657 },
 // Keyed stream: separate key and value types; the ordering and retry markers
 // are named for the values.
1658 KeyedStream {
1659 bound: BoundKind,
1660 value_order: StreamOrder,
1661 value_retry: StreamRetry,
1662 key_type: DebugType,
1663 value_type: DebugType,
1664 },
 // Keyed singleton: separate key and value types, with its own three-state bound.
1665 KeyedSingleton {
1666 bound: KeyedSingletonBoundKind,
1667 key_type: DebugType,
1668 value_type: DebugType,
1669 },
1670}
1671
1672impl CollectionKind {
1673 pub fn is_bounded(&self) -> bool {
1674 matches!(
1675 self,
1676 CollectionKind::Stream {
1677 bound: BoundKind::Bounded,
1678 ..
1679 } | CollectionKind::Singleton {
1680 bound: BoundKind::Bounded,
1681 ..
1682 } | CollectionKind::Optional {
1683 bound: BoundKind::Bounded,
1684 ..
1685 } | CollectionKind::KeyedStream {
1686 bound: BoundKind::Bounded,
1687 ..
1688 } | CollectionKind::KeyedSingleton {
1689 bound: KeyedSingletonBoundKind::Bounded,
1690 ..
1691 }
1692 )
1693 }
1694}
1695
/// Metadata carried by every non-placeholder [`HydroNode`] variant.
///
/// The hand-written `Hash` and `PartialEq` impls in this file ignore every
/// field, and `Debug` prints only `location_id` and `collection_kind`.
1696#[derive(Clone)]
1697pub struct HydroIrMetadata {
 // Location the node is assigned to.
1698 pub location_id: LocationId,
 // Kind and element types of the collection associated with the node.
1699 pub collection_kind: CollectionKind,
 // NOTE(review): meaning and producer of `cardinality` are not visible in this part of the file.
1700 pub cardinality: Option<usize>,
 // NOTE(review): meaning and producer of `tag` are not visible in this part of the file.
1701 pub tag: Option<String>,
 // Operator-level metadata (backtrace, CPU usage figures, optional id).
1702 pub op: HydroIrOpMetadata,
1703}
1704
1705impl Hash for HydroIrMetadata {
1707 fn hash<H: Hasher>(&self, _: &mut H) {}
1708}
1709
1710impl PartialEq for HydroIrMetadata {
1711 fn eq(&self, _: &Self) -> bool {
1712 true
1713 }
1714}
1715
1716impl Eq for HydroIrMetadata {}
1717
1718impl Debug for HydroIrMetadata {
1719 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1720 f.debug_struct("HydroIrMetadata")
1721 .field("location_id", &self.location_id)
1722 .field("collection_kind", &self.collection_kind)
1723 .finish()
1724 }
1725}
1726
/// Operator-level metadata stored on IR nodes (via `HydroIrMetadata::op`) and
/// on roots (their `op_metadata` field).
///
/// `Debug` prints only the type name and `Hash` contributes nothing.
1727#[derive(Clone)]
1730pub struct HydroIrOpMetadata {
 // Backtrace captured when the metadata was constructed.
1731 pub backtrace: Backtrace,
 // Initialised to `None` by the constructors in this file; presumably filled in later by profiling — confirm.
1732 pub cpu_usage: Option<f64>,
 // Initialised to `None` by the constructors in this file; presumably filled in later by profiling — confirm.
1733 pub network_recv_cpu_usage: Option<f64>,
 // Initialised to `None` by the constructors in this file.
1734 pub id: Option<usize>,
1735}
1736
1737impl HydroIrOpMetadata {
 /// Creates metadata with a freshly captured backtrace and all optional fields `None`.
 ///
 /// Delegates to `new_with_skip(1)`. Keep the call nesting as it is: the skip
 /// count passed to `Backtrace::get_backtrace` is derived from it.
1738 #[expect(
1739 clippy::new_without_default,
1740 reason = "explicit calls to new ensure correct backtrace bounds"
1741 )]
1742 pub fn new() -> HydroIrOpMetadata {
1743 Self::new_with_skip(1)
1744 }
1745
 /// Same as `new`, but asks the backtrace capture to skip `2 + skip_count` frames.
 // NOTE(review): the constant 2 presumably accounts for this function and
 // `get_backtrace` itself — confirm against `Backtrace::get_backtrace`.
1746 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1747 HydroIrOpMetadata {
1748 backtrace: Backtrace::get_backtrace(2 + skip_count),
1749 cpu_usage: None,
1750 network_recv_cpu_usage: None,
1751 id: None,
1752 }
1753 }
1754}
1755
1756impl Debug for HydroIrOpMetadata {
1757 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1758 f.debug_struct("HydroIrOpMetadata").finish()
1759 }
1760}
1761
1762impl Hash for HydroIrOpMetadata {
1763 fn hash<H: Hasher>(&self, _: &mut H) {}
1764}
1765
/// An intermediate node of the Hydro IR.
///
/// Every variant except `Placeholder` carries a `metadata` field. Upstream
/// nodes are owned as `Box<HydroNode>`, except for `Tee` and `Partition`, whose
/// input is a [`SharedNode`] that several nodes may point to.
1766#[derive(Debug, Hash)]
1769pub enum HydroNode {
 // Stand-in value swapped into a shared cell while its real contents are being
 // transformed or cloned; `transform_children` panics if it is asked to visit one.
1770 Placeholder,
1771
 // --- Single-input wrappers whose input field is named `inner` ---
1772 Cast {
1780 inner: Box<HydroNode>,
1781 metadata: HydroIrMetadata,
1782 },
1783
1784 ObserveNonDet {
1790 inner: Box<HydroNode>,
1791 trusted: bool, metadata: HydroIrMetadata,
1793 },
1794
 // --- Leaves: variants with no upstream node ---
1795 Source {
1796 source: HydroSource,
1797 metadata: HydroIrMetadata,
1798 },
1799
1800 SingletonSource {
1801 value: DebugExpr,
1802 first_tick_only: bool,
1803 metadata: HydroIrMetadata,
1804 },
1805
1806 CycleSource {
1807 cycle_id: CycleId,
1808 metadata: HydroIrMetadata,
1809 },
1810
 // --- Variants whose input is shared (`SharedNode`) rather than owned ---
1811 Tee {
1812 inner: SharedNode,
1813 metadata: HydroIrMetadata,
1814 },
1815
 // NOTE(review): presumably yields the elements for which `f` evaluates to
 // `is_true` — confirm against the emission code.
1816 Partition {
1817 inner: SharedNode,
1818 f: DebugExpr,
1819 is_true: bool,
1820 metadata: HydroIrMetadata,
1821 },
1822
 // --- More single-input wrappers with an `inner` field ---
1823 BeginAtomic {
1824 inner: Box<HydroNode>,
1825 metadata: HydroIrMetadata,
1826 },
1827
1828 EndAtomic {
1829 inner: Box<HydroNode>,
1830 metadata: HydroIrMetadata,
1831 },
1832
1833 Batch {
1834 inner: Box<HydroNode>,
1835 metadata: HydroIrMetadata,
1836 },
1837
1838 YieldConcat {
1839 inner: Box<HydroNode>,
1840 metadata: HydroIrMetadata,
1841 },
1842
 // --- Two-input variants ---
1843 Chain {
1844 first: Box<HydroNode>,
1845 second: Box<HydroNode>,
1846 metadata: HydroIrMetadata,
1847 },
1848
1849 ChainFirst {
1850 first: Box<HydroNode>,
1851 second: Box<HydroNode>,
1852 metadata: HydroIrMetadata,
1853 },
1854
1855 CrossProduct {
1856 left: Box<HydroNode>,
1857 right: Box<HydroNode>,
1858 metadata: HydroIrMetadata,
1859 },
1860
1861 CrossSingleton {
1862 left: Box<HydroNode>,
1863 right: Box<HydroNode>,
1864 metadata: HydroIrMetadata,
1865 },
1866
1867 Join {
1868 left: Box<HydroNode>,
1869 right: Box<HydroNode>,
1870 metadata: HydroIrMetadata,
1871 },
1872
1873 Difference {
1874 pos: Box<HydroNode>,
1875 neg: Box<HydroNode>,
1876 metadata: HydroIrMetadata,
1877 },
1878
1879 AntiJoin {
1880 pos: Box<HydroNode>,
1881 neg: Box<HydroNode>,
1882 metadata: HydroIrMetadata,
1883 },
1884
 // --- Single-input variants whose input field is named `input` ---
1885 ResolveFutures {
1886 input: Box<HydroNode>,
1887 metadata: HydroIrMetadata,
1888 },
1889 ResolveFuturesBlocking {
1890 input: Box<HydroNode>,
1891 metadata: HydroIrMetadata,
1892 },
1893 ResolveFuturesOrdered {
1894 input: Box<HydroNode>,
1895 metadata: HydroIrMetadata,
1896 },
1897
1898 Map {
1899 f: DebugExpr,
1900 input: Box<HydroNode>,
1901 metadata: HydroIrMetadata,
1902 },
1903 FlatMap {
1904 f: DebugExpr,
1905 input: Box<HydroNode>,
1906 metadata: HydroIrMetadata,
1907 },
1908 Filter {
1909 f: DebugExpr,
1910 input: Box<HydroNode>,
1911 metadata: HydroIrMetadata,
1912 },
1913 FilterMap {
1914 f: DebugExpr,
1915 input: Box<HydroNode>,
1916 metadata: HydroIrMetadata,
1917 },
1918
1919 DeferTick {
1920 input: Box<HydroNode>,
1921 metadata: HydroIrMetadata,
1922 },
1923 Enumerate {
1924 input: Box<HydroNode>,
1925 metadata: HydroIrMetadata,
1926 },
1927 Inspect {
1928 f: DebugExpr,
1929 input: Box<HydroNode>,
1930 metadata: HydroIrMetadata,
1931 },
1932
1933 Unique {
1934 input: Box<HydroNode>,
1935 metadata: HydroIrMetadata,
1936 },
1937
1938 Sort {
1939 input: Box<HydroNode>,
1940 metadata: HydroIrMetadata,
1941 },
 // --- Aggregations: an initial-value expression (`init`) and/or a combining expression ---
1942 Fold {
1943 init: DebugExpr,
1944 acc: DebugExpr,
1945 input: Box<HydroNode>,
1946 metadata: HydroIrMetadata,
1947 },
1948
1949 Scan {
1950 init: DebugExpr,
1951 acc: DebugExpr,
1952 input: Box<HydroNode>,
1953 metadata: HydroIrMetadata,
1954 },
1955 FoldKeyed {
1956 init: DebugExpr,
1957 acc: DebugExpr,
1958 input: Box<HydroNode>,
1959 metadata: HydroIrMetadata,
1960 },
1961
1962 Reduce {
1963 f: DebugExpr,
1964 input: Box<HydroNode>,
1965 metadata: HydroIrMetadata,
1966 },
1967 ReduceKeyed {
1968 f: DebugExpr,
1969 input: Box<HydroNode>,
1970 metadata: HydroIrMetadata,
1971 },
 // Has a second upstream node (`watermark`) in addition to `input`.
1972 ReduceKeyedWatermark {
1973 f: DebugExpr,
1974 input: Box<HydroNode>,
1975 watermark: Box<HydroNode>,
1976 metadata: HydroIrMetadata,
1977 },
1978
 // The one variant exempt from the location check in `transform_bottom_up`:
 // its input may have a different root location than the node itself.
1979 Network {
1980 name: Option<String>,
1981 networking_info: crate::networking::NetworkingInfo,
1982 serialize_fn: Option<DebugExpr>,
1983 instantiate_fn: DebugInstantiate,
1984 deserialize_fn: Option<DebugExpr>,
1985 input: Box<HydroNode>,
1986 metadata: HydroIrMetadata,
1987 },
1988
 // A leaf: it has no upstream node inside the IR.
1989 ExternalInput {
1990 from_external_key: LocationKey,
1991 from_port_id: ExternalPortId,
1992 from_many: bool,
1993 codec_type: DebugType,
1994 port_hint: NetworkHint,
1995 instantiate_fn: DebugInstantiate,
1996 deserialize_fn: Option<DebugExpr>,
1997 metadata: HydroIrMetadata,
1998 },
1999
2000 Counter {
2001 tag: String,
2002 duration: DebugExpr,
2003 prefix: String,
2004 input: Box<HydroNode>,
2005 metadata: HydroIrMetadata,
2006 },
2007}
2008
// Bookkeeping for traversals over shared nodes: maps the address of an original
// shared cell to the `Rc` holding its already transformed or cloned counterpart,
// so each shared node is processed once per traversal.
2009pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
// Maps the address of a shared cell to a `LocationId`.
// NOTE(review): its users are not visible in this part of the file.
2010pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2011
2012impl HydroNode {
2013 pub fn transform_bottom_up(
2014 &mut self,
2015 transform: &mut impl FnMut(&mut HydroNode),
2016 seen_tees: &mut SeenSharedNodes,
2017 check_well_formed: bool,
2018 ) {
2019 self.transform_children(
2020 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2021 seen_tees,
2022 );
2023
2024 transform(self);
2025
2026 let self_location = self.metadata().location_id.root();
2027
2028 if check_well_formed {
2029 match &*self {
2030 HydroNode::Network { .. } => {}
2031 _ => {
2032 self.input_metadata().iter().for_each(|i| {
2033 if i.location_id.root() != self_location {
2034 panic!(
2035 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2036 i,
2037 i.location_id.root(),
2038 self,
2039 self_location
2040 )
2041 }
2042 });
2043 }
2044 }
2045 }
2046 }
2047
2048 #[inline(always)]
2049 pub fn transform_children(
2050 &mut self,
2051 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2052 seen_tees: &mut SeenSharedNodes,
2053 ) {
2054 match self {
2055 HydroNode::Placeholder => {
2056 panic!();
2057 }
2058
2059 HydroNode::Source { .. }
2060 | HydroNode::SingletonSource { .. }
2061 | HydroNode::CycleSource { .. }
2062 | HydroNode::ExternalInput { .. } => {}
2063
2064 HydroNode::Tee { inner, .. } => {
2065 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2066 *inner = SharedNode(transformed.clone());
2067 } else {
2068 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2069 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2070 let mut orig = inner.0.replace(HydroNode::Placeholder);
2071 transform(&mut orig, seen_tees);
2072 *transformed_cell.borrow_mut() = orig;
2073 *inner = SharedNode(transformed_cell);
2074 }
2075 }
2076
2077 HydroNode::Partition { inner, .. } => {
2078 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2079 *inner = SharedNode(transformed.clone());
2080 } else {
2081 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2082 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2083 let mut orig = inner.0.replace(HydroNode::Placeholder);
2084 transform(&mut orig, seen_tees);
2085 *transformed_cell.borrow_mut() = orig;
2086 *inner = SharedNode(transformed_cell);
2087 }
2088 }
2089
2090 HydroNode::Cast { inner, .. }
2091 | HydroNode::ObserveNonDet { inner, .. }
2092 | HydroNode::BeginAtomic { inner, .. }
2093 | HydroNode::EndAtomic { inner, .. }
2094 | HydroNode::Batch { inner, .. }
2095 | HydroNode::YieldConcat { inner, .. } => {
2096 transform(inner.as_mut(), seen_tees);
2097 }
2098
2099 HydroNode::Chain { first, second, .. } => {
2100 transform(first.as_mut(), seen_tees);
2101 transform(second.as_mut(), seen_tees);
2102 }
2103
2104 HydroNode::ChainFirst { first, second, .. } => {
2105 transform(first.as_mut(), seen_tees);
2106 transform(second.as_mut(), seen_tees);
2107 }
2108
2109 HydroNode::CrossSingleton { left, right, .. }
2110 | HydroNode::CrossProduct { left, right, .. }
2111 | HydroNode::Join { left, right, .. } => {
2112 transform(left.as_mut(), seen_tees);
2113 transform(right.as_mut(), seen_tees);
2114 }
2115
2116 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2117 transform(pos.as_mut(), seen_tees);
2118 transform(neg.as_mut(), seen_tees);
2119 }
2120
2121 HydroNode::ReduceKeyedWatermark {
2122 input, watermark, ..
2123 } => {
2124 transform(input.as_mut(), seen_tees);
2125 transform(watermark.as_mut(), seen_tees);
2126 }
2127
2128 HydroNode::Map { input, .. }
2129 | HydroNode::ResolveFutures { input, .. }
2130 | HydroNode::ResolveFuturesBlocking { input, .. }
2131 | HydroNode::ResolveFuturesOrdered { input, .. }
2132 | HydroNode::FlatMap { input, .. }
2133 | HydroNode::Filter { input, .. }
2134 | HydroNode::FilterMap { input, .. }
2135 | HydroNode::Sort { input, .. }
2136 | HydroNode::DeferTick { input, .. }
2137 | HydroNode::Enumerate { input, .. }
2138 | HydroNode::Inspect { input, .. }
2139 | HydroNode::Unique { input, .. }
2140 | HydroNode::Network { input, .. }
2141 | HydroNode::Fold { input, .. }
2142 | HydroNode::Scan { input, .. }
2143 | HydroNode::FoldKeyed { input, .. }
2144 | HydroNode::Reduce { input, .. }
2145 | HydroNode::ReduceKeyed { input, .. }
2146 | HydroNode::Counter { input, .. } => {
2147 transform(input.as_mut(), seen_tees);
2148 }
2149 }
2150 }
2151
2152 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2153 match self {
2154 HydroNode::Placeholder => HydroNode::Placeholder,
2155 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2156 inner: Box::new(inner.deep_clone(seen_tees)),
2157 metadata: metadata.clone(),
2158 },
2159 HydroNode::ObserveNonDet {
2160 inner,
2161 trusted,
2162 metadata,
2163 } => HydroNode::ObserveNonDet {
2164 inner: Box::new(inner.deep_clone(seen_tees)),
2165 trusted: *trusted,
2166 metadata: metadata.clone(),
2167 },
2168 HydroNode::Source { source, metadata } => HydroNode::Source {
2169 source: source.clone(),
2170 metadata: metadata.clone(),
2171 },
2172 HydroNode::SingletonSource {
2173 value,
2174 first_tick_only,
2175 metadata,
2176 } => HydroNode::SingletonSource {
2177 value: value.clone(),
2178 first_tick_only: *first_tick_only,
2179 metadata: metadata.clone(),
2180 },
2181 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2182 cycle_id: *cycle_id,
2183 metadata: metadata.clone(),
2184 },
2185 HydroNode::Tee { inner, metadata } => {
2186 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2187 HydroNode::Tee {
2188 inner: SharedNode(transformed.clone()),
2189 metadata: metadata.clone(),
2190 }
2191 } else {
2192 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2193 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2194 let cloned = inner.0.borrow().deep_clone(seen_tees);
2195 *new_rc.borrow_mut() = cloned;
2196 HydroNode::Tee {
2197 inner: SharedNode(new_rc),
2198 metadata: metadata.clone(),
2199 }
2200 }
2201 }
2202 HydroNode::Partition {
2203 inner,
2204 f,
2205 is_true,
2206 metadata,
2207 } => {
2208 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2209 HydroNode::Partition {
2210 inner: SharedNode(transformed.clone()),
2211 f: f.clone(),
2212 is_true: *is_true,
2213 metadata: metadata.clone(),
2214 }
2215 } else {
2216 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2217 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2218 let cloned = inner.0.borrow().deep_clone(seen_tees);
2219 *new_rc.borrow_mut() = cloned;
2220 HydroNode::Partition {
2221 inner: SharedNode(new_rc),
2222 f: f.clone(),
2223 is_true: *is_true,
2224 metadata: metadata.clone(),
2225 }
2226 }
2227 }
2228 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2229 inner: Box::new(inner.deep_clone(seen_tees)),
2230 metadata: metadata.clone(),
2231 },
2232 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2233 inner: Box::new(inner.deep_clone(seen_tees)),
2234 metadata: metadata.clone(),
2235 },
2236 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2237 inner: Box::new(inner.deep_clone(seen_tees)),
2238 metadata: metadata.clone(),
2239 },
2240 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2241 inner: Box::new(inner.deep_clone(seen_tees)),
2242 metadata: metadata.clone(),
2243 },
2244 HydroNode::Chain {
2245 first,
2246 second,
2247 metadata,
2248 } => HydroNode::Chain {
2249 first: Box::new(first.deep_clone(seen_tees)),
2250 second: Box::new(second.deep_clone(seen_tees)),
2251 metadata: metadata.clone(),
2252 },
2253 HydroNode::ChainFirst {
2254 first,
2255 second,
2256 metadata,
2257 } => HydroNode::ChainFirst {
2258 first: Box::new(first.deep_clone(seen_tees)),
2259 second: Box::new(second.deep_clone(seen_tees)),
2260 metadata: metadata.clone(),
2261 },
2262 HydroNode::CrossProduct {
2263 left,
2264 right,
2265 metadata,
2266 } => HydroNode::CrossProduct {
2267 left: Box::new(left.deep_clone(seen_tees)),
2268 right: Box::new(right.deep_clone(seen_tees)),
2269 metadata: metadata.clone(),
2270 },
2271 HydroNode::CrossSingleton {
2272 left,
2273 right,
2274 metadata,
2275 } => HydroNode::CrossSingleton {
2276 left: Box::new(left.deep_clone(seen_tees)),
2277 right: Box::new(right.deep_clone(seen_tees)),
2278 metadata: metadata.clone(),
2279 },
2280 HydroNode::Join {
2281 left,
2282 right,
2283 metadata,
2284 } => HydroNode::Join {
2285 left: Box::new(left.deep_clone(seen_tees)),
2286 right: Box::new(right.deep_clone(seen_tees)),
2287 metadata: metadata.clone(),
2288 },
2289 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2290 pos: Box::new(pos.deep_clone(seen_tees)),
2291 neg: Box::new(neg.deep_clone(seen_tees)),
2292 metadata: metadata.clone(),
2293 },
2294 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2295 pos: Box::new(pos.deep_clone(seen_tees)),
2296 neg: Box::new(neg.deep_clone(seen_tees)),
2297 metadata: metadata.clone(),
2298 },
2299 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2300 input: Box::new(input.deep_clone(seen_tees)),
2301 metadata: metadata.clone(),
2302 },
2303 HydroNode::ResolveFuturesBlocking { input, metadata } => {
2304 HydroNode::ResolveFuturesBlocking {
2305 input: Box::new(input.deep_clone(seen_tees)),
2306 metadata: metadata.clone(),
2307 }
2308 }
2309 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2310 HydroNode::ResolveFuturesOrdered {
2311 input: Box::new(input.deep_clone(seen_tees)),
2312 metadata: metadata.clone(),
2313 }
2314 }
2315 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2316 f: f.clone(),
2317 input: Box::new(input.deep_clone(seen_tees)),
2318 metadata: metadata.clone(),
2319 },
2320 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2321 f: f.clone(),
2322 input: Box::new(input.deep_clone(seen_tees)),
2323 metadata: metadata.clone(),
2324 },
2325 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2326 f: f.clone(),
2327 input: Box::new(input.deep_clone(seen_tees)),
2328 metadata: metadata.clone(),
2329 },
2330 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2331 f: f.clone(),
2332 input: Box::new(input.deep_clone(seen_tees)),
2333 metadata: metadata.clone(),
2334 },
2335 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2336 input: Box::new(input.deep_clone(seen_tees)),
2337 metadata: metadata.clone(),
2338 },
2339 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2340 input: Box::new(input.deep_clone(seen_tees)),
2341 metadata: metadata.clone(),
2342 },
2343 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2344 f: f.clone(),
2345 input: Box::new(input.deep_clone(seen_tees)),
2346 metadata: metadata.clone(),
2347 },
2348 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2349 input: Box::new(input.deep_clone(seen_tees)),
2350 metadata: metadata.clone(),
2351 },
2352 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2353 input: Box::new(input.deep_clone(seen_tees)),
2354 metadata: metadata.clone(),
2355 },
2356 HydroNode::Fold {
2357 init,
2358 acc,
2359 input,
2360 metadata,
2361 } => HydroNode::Fold {
2362 init: init.clone(),
2363 acc: acc.clone(),
2364 input: Box::new(input.deep_clone(seen_tees)),
2365 metadata: metadata.clone(),
2366 },
2367 HydroNode::Scan {
2368 init,
2369 acc,
2370 input,
2371 metadata,
2372 } => HydroNode::Scan {
2373 init: init.clone(),
2374 acc: acc.clone(),
2375 input: Box::new(input.deep_clone(seen_tees)),
2376 metadata: metadata.clone(),
2377 },
2378 HydroNode::FoldKeyed {
2379 init,
2380 acc,
2381 input,
2382 metadata,
2383 } => HydroNode::FoldKeyed {
2384 init: init.clone(),
2385 acc: acc.clone(),
2386 input: Box::new(input.deep_clone(seen_tees)),
2387 metadata: metadata.clone(),
2388 },
2389 HydroNode::ReduceKeyedWatermark {
2390 f,
2391 input,
2392 watermark,
2393 metadata,
2394 } => HydroNode::ReduceKeyedWatermark {
2395 f: f.clone(),
2396 input: Box::new(input.deep_clone(seen_tees)),
2397 watermark: Box::new(watermark.deep_clone(seen_tees)),
2398 metadata: metadata.clone(),
2399 },
2400 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2401 f: f.clone(),
2402 input: Box::new(input.deep_clone(seen_tees)),
2403 metadata: metadata.clone(),
2404 },
2405 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2406 f: f.clone(),
2407 input: Box::new(input.deep_clone(seen_tees)),
2408 metadata: metadata.clone(),
2409 },
2410 HydroNode::Network {
2411 name,
2412 networking_info,
2413 serialize_fn,
2414 instantiate_fn,
2415 deserialize_fn,
2416 input,
2417 metadata,
2418 } => HydroNode::Network {
2419 name: name.clone(),
2420 networking_info: networking_info.clone(),
2421 serialize_fn: serialize_fn.clone(),
2422 instantiate_fn: instantiate_fn.clone(),
2423 deserialize_fn: deserialize_fn.clone(),
2424 input: Box::new(input.deep_clone(seen_tees)),
2425 metadata: metadata.clone(),
2426 },
2427 HydroNode::ExternalInput {
2428 from_external_key,
2429 from_port_id,
2430 from_many,
2431 codec_type,
2432 port_hint,
2433 instantiate_fn,
2434 deserialize_fn,
2435 metadata,
2436 } => HydroNode::ExternalInput {
2437 from_external_key: *from_external_key,
2438 from_port_id: *from_port_id,
2439 from_many: *from_many,
2440 codec_type: codec_type.clone(),
2441 port_hint: *port_hint,
2442 instantiate_fn: instantiate_fn.clone(),
2443 deserialize_fn: deserialize_fn.clone(),
2444 metadata: metadata.clone(),
2445 },
2446 HydroNode::Counter {
2447 tag,
2448 duration,
2449 prefix,
2450 input,
2451 metadata,
2452 } => HydroNode::Counter {
2453 tag: tag.clone(),
2454 duration: duration.clone(),
2455 prefix: prefix.clone(),
2456 input: Box::new(input.deep_clone(seen_tees)),
2457 metadata: metadata.clone(),
2458 },
2459 }
2460 }
2461
2462 #[cfg(feature = "build")]
2463 pub fn emit_core(
2464 &mut self,
2465 builders_or_callback: &mut BuildersOrCallback<
2466 impl FnMut(&mut HydroRoot, &mut usize),
2467 impl FnMut(&mut HydroNode, &mut usize),
2468 >,
2469 seen_tees: &mut SeenSharedNodes,
2470 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2471 next_stmt_id: &mut usize,
2472 ) -> syn::Ident {
2473 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2474
2475 self.transform_bottom_up(
2476 &mut |node: &mut HydroNode| {
2477 let out_location = node.metadata().location_id.clone();
2478 match node {
2479 HydroNode::Placeholder => {
2480 panic!()
2481 }
2482
2483 HydroNode::Cast { .. } => {
2484 match builders_or_callback {
2487 BuildersOrCallback::Builders(_) => {}
2488 BuildersOrCallback::Callback(_, node_callback) => {
2489 node_callback(node, next_stmt_id);
2490 }
2491 }
2492
2493 *next_stmt_id += 1;
2494 }
2496
2497 HydroNode::ObserveNonDet {
2498 inner,
2499 trusted,
2500 metadata,
2501 ..
2502 } => {
2503 let inner_ident = ident_stack.pop().unwrap();
2504
2505 let observe_ident =
2506 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2507
2508 match builders_or_callback {
2509 BuildersOrCallback::Builders(graph_builders) => {
2510 graph_builders.observe_nondet(
2511 *trusted,
2512 &inner.metadata().location_id,
2513 inner_ident,
2514 &inner.metadata().collection_kind,
2515 &observe_ident,
2516 &metadata.collection_kind,
2517 &metadata.op,
2518 );
2519 }
2520 BuildersOrCallback::Callback(_, node_callback) => {
2521 node_callback(node, next_stmt_id);
2522 }
2523 }
2524
2525 *next_stmt_id += 1;
2526
2527 ident_stack.push(observe_ident);
2528 }
2529
2530 HydroNode::Batch {
2531 inner, metadata, ..
2532 } => {
2533 let inner_ident = ident_stack.pop().unwrap();
2534
2535 let batch_ident =
2536 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2537
2538 match builders_or_callback {
2539 BuildersOrCallback::Builders(graph_builders) => {
2540 graph_builders.batch(
2541 inner_ident,
2542 &inner.metadata().location_id,
2543 &inner.metadata().collection_kind,
2544 &batch_ident,
2545 &out_location,
2546 &metadata.op,
2547 );
2548 }
2549 BuildersOrCallback::Callback(_, node_callback) => {
2550 node_callback(node, next_stmt_id);
2551 }
2552 }
2553
2554 *next_stmt_id += 1;
2555
2556 ident_stack.push(batch_ident);
2557 }
2558
2559 HydroNode::YieldConcat { inner, .. } => {
2560 let inner_ident = ident_stack.pop().unwrap();
2561
2562 let yield_ident =
2563 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2564
2565 match builders_or_callback {
2566 BuildersOrCallback::Builders(graph_builders) => {
2567 graph_builders.yield_from_tick(
2568 inner_ident,
2569 &inner.metadata().location_id,
2570 &inner.metadata().collection_kind,
2571 &yield_ident,
2572 &out_location,
2573 );
2574 }
2575 BuildersOrCallback::Callback(_, node_callback) => {
2576 node_callback(node, next_stmt_id);
2577 }
2578 }
2579
2580 *next_stmt_id += 1;
2581
2582 ident_stack.push(yield_ident);
2583 }
2584
2585 HydroNode::BeginAtomic { inner, metadata } => {
2586 let inner_ident = ident_stack.pop().unwrap();
2587
2588 let begin_ident =
2589 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2590
2591 match builders_or_callback {
2592 BuildersOrCallback::Builders(graph_builders) => {
2593 graph_builders.begin_atomic(
2594 inner_ident,
2595 &inner.metadata().location_id,
2596 &inner.metadata().collection_kind,
2597 &begin_ident,
2598 &out_location,
2599 &metadata.op,
2600 );
2601 }
2602 BuildersOrCallback::Callback(_, node_callback) => {
2603 node_callback(node, next_stmt_id);
2604 }
2605 }
2606
2607 *next_stmt_id += 1;
2608
2609 ident_stack.push(begin_ident);
2610 }
2611
2612 HydroNode::EndAtomic { inner, .. } => {
2613 let inner_ident = ident_stack.pop().unwrap();
2614
2615 let end_ident =
2616 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2617
2618 match builders_or_callback {
2619 BuildersOrCallback::Builders(graph_builders) => {
2620 graph_builders.end_atomic(
2621 inner_ident,
2622 &inner.metadata().location_id,
2623 &inner.metadata().collection_kind,
2624 &end_ident,
2625 );
2626 }
2627 BuildersOrCallback::Callback(_, node_callback) => {
2628 node_callback(node, next_stmt_id);
2629 }
2630 }
2631
2632 *next_stmt_id += 1;
2633
2634 ident_stack.push(end_ident);
2635 }
2636
2637 HydroNode::Source {
2638 source, metadata, ..
2639 } => {
2640 if let HydroSource::ExternalNetwork() = source {
2641 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2642 } else {
2643 let source_ident =
2644 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2645
2646 let source_stmt = match source {
2647 HydroSource::Stream(expr) => {
2648 debug_assert!(metadata.location_id.is_top_level());
2649 parse_quote! {
2650 #source_ident = source_stream(#expr);
2651 }
2652 }
2653
2654 HydroSource::ExternalNetwork() => {
2655 unreachable!()
2656 }
2657
2658 HydroSource::Iter(expr) => {
2659 if metadata.location_id.is_top_level() {
2660 parse_quote! {
2661 #source_ident = source_iter(#expr);
2662 }
2663 } else {
2664 parse_quote! {
2666 #source_ident = source_iter(#expr) -> persist::<'static>();
2667 }
2668 }
2669 }
2670
2671 HydroSource::Spin() => {
2672 debug_assert!(metadata.location_id.is_top_level());
2673 parse_quote! {
2674 #source_ident = spin();
2675 }
2676 }
2677
2678 HydroSource::ClusterMembers(target_loc, state) => {
2679 debug_assert!(metadata.location_id.is_top_level());
2680
2681 let members_tee_ident = syn::Ident::new(
2682 &format!(
2683 "__cluster_members_tee_{}_{}",
2684 metadata.location_id.root().key(),
2685 target_loc.key(),
2686 ),
2687 Span::call_site(),
2688 );
2689
2690 match state {
2691 ClusterMembersState::Stream(d) => {
2692 parse_quote! {
2693 #members_tee_ident = source_stream(#d) -> tee();
2694 #source_ident = #members_tee_ident;
2695 }
2696 },
2697 ClusterMembersState::Uninit => syn::parse_quote! {
2698 #source_ident = source_stream(DUMMY);
2699 },
2700 ClusterMembersState::Tee(..) => parse_quote! {
2701 #source_ident = #members_tee_ident;
2702 },
2703 }
2704 }
2705
2706 HydroSource::Embedded(ident) => {
2707 parse_quote! {
2708 #source_ident = source_stream(#ident);
2709 }
2710 }
2711
2712 HydroSource::EmbeddedSingleton(ident) => {
2713 parse_quote! {
2714 #source_ident = source_iter([#ident]);
2715 }
2716 }
2717 };
2718
2719 match builders_or_callback {
2720 BuildersOrCallback::Builders(graph_builders) => {
2721 let builder = graph_builders.get_dfir_mut(&out_location);
2722 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2723 }
2724 BuildersOrCallback::Callback(_, node_callback) => {
2725 node_callback(node, next_stmt_id);
2726 }
2727 }
2728
2729 *next_stmt_id += 1;
2730
2731 ident_stack.push(source_ident);
2732 }
2733 }
2734
2735 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
2736 let source_ident =
2737 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2738
2739 match builders_or_callback {
2740 BuildersOrCallback::Builders(graph_builders) => {
2741 let builder = graph_builders.get_dfir_mut(&out_location);
2742
2743 if *first_tick_only {
2744 assert!(
2745 !metadata.location_id.is_top_level(),
2746 "first_tick_only SingletonSource must be inside a tick"
2747 );
2748 }
2749
2750 if *first_tick_only
2751 || (metadata.location_id.is_top_level()
2752 && metadata.collection_kind.is_bounded())
2753 {
2754 builder.add_dfir(
2755 parse_quote! {
2756 #source_ident = source_iter([#value]);
2757 },
2758 None,
2759 Some(&next_stmt_id.to_string()),
2760 );
2761 } else {
2762 builder.add_dfir(
2763 parse_quote! {
2764 #source_ident = source_iter([#value]) -> persist::<'static>();
2765 },
2766 None,
2767 Some(&next_stmt_id.to_string()),
2768 );
2769 }
2770 }
2771 BuildersOrCallback::Callback(_, node_callback) => {
2772 node_callback(node, next_stmt_id);
2773 }
2774 }
2775
2776 *next_stmt_id += 1;
2777
2778 ident_stack.push(source_ident);
2779 }
2780
2781 HydroNode::CycleSource { cycle_id, .. } => {
2782 let ident = cycle_id.as_ident();
2783
2784 match builders_or_callback {
2785 BuildersOrCallback::Builders(_) => {}
2786 BuildersOrCallback::Callback(_, node_callback) => {
2787 node_callback(node, next_stmt_id);
2788 }
2789 }
2790
2791 *next_stmt_id += 1;
2793
2794 ident_stack.push(ident);
2795 }
2796
2797 HydroNode::Tee { inner, .. } => {
2798 let ret_ident = if let Some(built_idents) =
2799 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2800 {
2801 match builders_or_callback {
2802 BuildersOrCallback::Builders(_) => {}
2803 BuildersOrCallback::Callback(_, node_callback) => {
2804 node_callback(node, next_stmt_id);
2805 }
2806 }
2807
2808 built_idents[0].clone()
2809 } else {
2810 let inner_ident = ident_stack.pop().unwrap();
2813
2814 let tee_ident =
2815 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2816
2817 built_tees.insert(
2818 inner.0.as_ref() as *const RefCell<HydroNode>,
2819 vec![tee_ident.clone()],
2820 );
2821
2822 match builders_or_callback {
2823 BuildersOrCallback::Builders(graph_builders) => {
2824 let builder = graph_builders.get_dfir_mut(&out_location);
2825 builder.add_dfir(
2826 parse_quote! {
2827 #tee_ident = #inner_ident -> tee();
2828 },
2829 None,
2830 Some(&next_stmt_id.to_string()),
2831 );
2832 }
2833 BuildersOrCallback::Callback(_, node_callback) => {
2834 node_callback(node, next_stmt_id);
2835 }
2836 }
2837
2838 tee_ident
2839 };
2840
2841 *next_stmt_id += 1;
2845 ident_stack.push(ret_ident);
2846 }
2847
2848 HydroNode::Partition {
2849 inner, f, is_true, ..
2850 } => {
2851 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
2853 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
2854 match builders_or_callback {
2855 BuildersOrCallback::Builders(_) => {}
2856 BuildersOrCallback::Callback(_, node_callback) => {
2857 node_callback(node, next_stmt_id);
2858 }
2859 }
2860
2861 let idx = if is_true { 0 } else { 1 };
2862 built_idents[idx].clone()
2863 } else {
2864 let inner_ident = ident_stack.pop().unwrap();
2867
2868 let partition_ident = syn::Ident::new(
2869 &format!("stream_{}_partition", *next_stmt_id),
2870 Span::call_site(),
2871 );
2872 let true_ident = syn::Ident::new(
2873 &format!("stream_{}_true", *next_stmt_id),
2874 Span::call_site(),
2875 );
2876 let false_ident = syn::Ident::new(
2877 &format!("stream_{}_false", *next_stmt_id),
2878 Span::call_site(),
2879 );
2880
2881 built_tees.insert(
2882 ptr,
2883 vec![true_ident.clone(), false_ident.clone()],
2884 );
2885
2886 match builders_or_callback {
2887 BuildersOrCallback::Builders(graph_builders) => {
2888 let builder = graph_builders.get_dfir_mut(&out_location);
2889 builder.add_dfir(
2890 parse_quote! {
2891 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
2892 #true_ident = #partition_ident[0];
2893 #false_ident = #partition_ident[1];
2894 },
2895 None,
2896 Some(&next_stmt_id.to_string()),
2897 );
2898 }
2899 BuildersOrCallback::Callback(_, node_callback) => {
2900 node_callback(node, next_stmt_id);
2901 }
2902 }
2903
2904 if is_true { true_ident } else { false_ident }
2905 };
2906
2907 *next_stmt_id += 1;
2908 ident_stack.push(ret_ident);
2909 }
2910
2911 HydroNode::Chain { .. } => {
2912 let second_ident = ident_stack.pop().unwrap();
2914 let first_ident = ident_stack.pop().unwrap();
2915
2916 let chain_ident =
2917 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2918
2919 match builders_or_callback {
2920 BuildersOrCallback::Builders(graph_builders) => {
2921 let builder = graph_builders.get_dfir_mut(&out_location);
2922 builder.add_dfir(
2923 parse_quote! {
2924 #chain_ident = chain();
2925 #first_ident -> [0]#chain_ident;
2926 #second_ident -> [1]#chain_ident;
2927 },
2928 None,
2929 Some(&next_stmt_id.to_string()),
2930 );
2931 }
2932 BuildersOrCallback::Callback(_, node_callback) => {
2933 node_callback(node, next_stmt_id);
2934 }
2935 }
2936
2937 *next_stmt_id += 1;
2938
2939 ident_stack.push(chain_ident);
2940 }
2941
2942 HydroNode::ChainFirst { .. } => {
2943 let second_ident = ident_stack.pop().unwrap();
2944 let first_ident = ident_stack.pop().unwrap();
2945
2946 let chain_ident =
2947 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2948
2949 match builders_or_callback {
2950 BuildersOrCallback::Builders(graph_builders) => {
2951 let builder = graph_builders.get_dfir_mut(&out_location);
2952 builder.add_dfir(
2953 parse_quote! {
2954 #chain_ident = chain_first_n(1);
2955 #first_ident -> [0]#chain_ident;
2956 #second_ident -> [1]#chain_ident;
2957 },
2958 None,
2959 Some(&next_stmt_id.to_string()),
2960 );
2961 }
2962 BuildersOrCallback::Callback(_, node_callback) => {
2963 node_callback(node, next_stmt_id);
2964 }
2965 }
2966
2967 *next_stmt_id += 1;
2968
2969 ident_stack.push(chain_ident);
2970 }
2971
2972 HydroNode::CrossSingleton { right, .. } => {
2973 let right_ident = ident_stack.pop().unwrap();
2974 let left_ident = ident_stack.pop().unwrap();
2975
2976 let cross_ident =
2977 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2978
2979 match builders_or_callback {
2980 BuildersOrCallback::Builders(graph_builders) => {
2981 let builder = graph_builders.get_dfir_mut(&out_location);
2982
2983 if right.metadata().location_id.is_top_level()
2984 && right.metadata().collection_kind.is_bounded()
2985 {
2986 builder.add_dfir(
2987 parse_quote! {
2988 #cross_ident = cross_singleton();
2989 #left_ident -> [input]#cross_ident;
2990 #right_ident -> persist::<'static>() -> [single]#cross_ident;
2991 },
2992 None,
2993 Some(&next_stmt_id.to_string()),
2994 );
2995 } else {
2996 builder.add_dfir(
2997 parse_quote! {
2998 #cross_ident = cross_singleton();
2999 #left_ident -> [input]#cross_ident;
3000 #right_ident -> [single]#cross_ident;
3001 },
3002 None,
3003 Some(&next_stmt_id.to_string()),
3004 );
3005 }
3006 }
3007 BuildersOrCallback::Callback(_, node_callback) => {
3008 node_callback(node, next_stmt_id);
3009 }
3010 }
3011
3012 *next_stmt_id += 1;
3013
3014 ident_stack.push(cross_ident);
3015 }
3016
3017 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3018 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3019 parse_quote!(cross_join_multiset)
3020 } else {
3021 parse_quote!(join_multiset)
3022 };
3023
3024 let (HydroNode::CrossProduct { left, right, .. }
3025 | HydroNode::Join { left, right, .. }) = node
3026 else {
3027 unreachable!()
3028 };
3029
3030 let is_top_level = left.metadata().location_id.is_top_level()
3031 && right.metadata().location_id.is_top_level();
3032 let left_lifetime = if left.metadata().location_id.is_top_level() {
3033 quote!('static)
3034 } else {
3035 quote!('tick)
3036 };
3037
3038 let right_lifetime = if right.metadata().location_id.is_top_level() {
3039 quote!('static)
3040 } else {
3041 quote!('tick)
3042 };
3043
3044 let right_ident = ident_stack.pop().unwrap();
3045 let left_ident = ident_stack.pop().unwrap();
3046
3047 let stream_ident =
3048 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3049
3050 match builders_or_callback {
3051 BuildersOrCallback::Builders(graph_builders) => {
3052 let builder = graph_builders.get_dfir_mut(&out_location);
3053 builder.add_dfir(
3054 if is_top_level {
3055 parse_quote! {
3058 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3059 #left_ident -> [0]#stream_ident;
3060 #right_ident -> [1]#stream_ident;
3061 }
3062 } else {
3063 parse_quote! {
3064 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3065 #left_ident -> [0]#stream_ident;
3066 #right_ident -> [1]#stream_ident;
3067 }
3068 }
3069 ,
3070 None,
3071 Some(&next_stmt_id.to_string()),
3072 );
3073 }
3074 BuildersOrCallback::Callback(_, node_callback) => {
3075 node_callback(node, next_stmt_id);
3076 }
3077 }
3078
3079 *next_stmt_id += 1;
3080
3081 ident_stack.push(stream_ident);
3082 }
3083
3084 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3085 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3086 parse_quote!(difference)
3087 } else {
3088 parse_quote!(anti_join)
3089 };
3090
3091 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3092 node
3093 else {
3094 unreachable!()
3095 };
3096
3097 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3098 quote!('static)
3099 } else {
3100 quote!('tick)
3101 };
3102
3103 let neg_ident = ident_stack.pop().unwrap();
3104 let pos_ident = ident_stack.pop().unwrap();
3105
3106 let stream_ident =
3107 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3108
3109 match builders_or_callback {
3110 BuildersOrCallback::Builders(graph_builders) => {
3111 let builder = graph_builders.get_dfir_mut(&out_location);
3112 builder.add_dfir(
3113 parse_quote! {
3114 #stream_ident = #operator::<'tick, #neg_lifetime>();
3115 #pos_ident -> [pos]#stream_ident;
3116 #neg_ident -> [neg]#stream_ident;
3117 },
3118 None,
3119 Some(&next_stmt_id.to_string()),
3120 );
3121 }
3122 BuildersOrCallback::Callback(_, node_callback) => {
3123 node_callback(node, next_stmt_id);
3124 }
3125 }
3126
3127 *next_stmt_id += 1;
3128
3129 ident_stack.push(stream_ident);
3130 }
3131
3132 HydroNode::ResolveFutures { .. } => {
3133 let input_ident = ident_stack.pop().unwrap();
3134
3135 let futures_ident =
3136 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3137
3138 match builders_or_callback {
3139 BuildersOrCallback::Builders(graph_builders) => {
3140 let builder = graph_builders.get_dfir_mut(&out_location);
3141 builder.add_dfir(
3142 parse_quote! {
3143 #futures_ident = #input_ident -> resolve_futures();
3144 },
3145 None,
3146 Some(&next_stmt_id.to_string()),
3147 );
3148 }
3149 BuildersOrCallback::Callback(_, node_callback) => {
3150 node_callback(node, next_stmt_id);
3151 }
3152 }
3153
3154 *next_stmt_id += 1;
3155
3156 ident_stack.push(futures_ident);
3157 }
3158
3159 HydroNode::ResolveFuturesBlocking { .. } => {
3160 let input_ident = ident_stack.pop().unwrap();
3161
3162 let futures_ident =
3163 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3164
3165 match builders_or_callback {
3166 BuildersOrCallback::Builders(graph_builders) => {
3167 let builder = graph_builders.get_dfir_mut(&out_location);
3168 builder.add_dfir(
3169 parse_quote! {
3170 #futures_ident = #input_ident -> resolve_futures_blocking();
3171 },
3172 None,
3173 Some(&next_stmt_id.to_string()),
3174 );
3175 }
3176 BuildersOrCallback::Callback(_, node_callback) => {
3177 node_callback(node, next_stmt_id);
3178 }
3179 }
3180
3181 *next_stmt_id += 1;
3182
3183 ident_stack.push(futures_ident);
3184 }
3185
3186 HydroNode::ResolveFuturesOrdered { .. } => {
3187 let input_ident = ident_stack.pop().unwrap();
3188
3189 let futures_ident =
3190 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3191
3192 match builders_or_callback {
3193 BuildersOrCallback::Builders(graph_builders) => {
3194 let builder = graph_builders.get_dfir_mut(&out_location);
3195 builder.add_dfir(
3196 parse_quote! {
3197 #futures_ident = #input_ident -> resolve_futures_ordered();
3198 },
3199 None,
3200 Some(&next_stmt_id.to_string()),
3201 );
3202 }
3203 BuildersOrCallback::Callback(_, node_callback) => {
3204 node_callback(node, next_stmt_id);
3205 }
3206 }
3207
3208 *next_stmt_id += 1;
3209
3210 ident_stack.push(futures_ident);
3211 }
3212
3213 HydroNode::Map { f, .. } => {
3214 let input_ident = ident_stack.pop().unwrap();
3215
3216 let map_ident =
3217 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3218
3219 match builders_or_callback {
3220 BuildersOrCallback::Builders(graph_builders) => {
3221 let builder = graph_builders.get_dfir_mut(&out_location);
3222 builder.add_dfir(
3223 parse_quote! {
3224 #map_ident = #input_ident -> map(#f);
3225 },
3226 None,
3227 Some(&next_stmt_id.to_string()),
3228 );
3229 }
3230 BuildersOrCallback::Callback(_, node_callback) => {
3231 node_callback(node, next_stmt_id);
3232 }
3233 }
3234
3235 *next_stmt_id += 1;
3236
3237 ident_stack.push(map_ident);
3238 }
3239
3240 HydroNode::FlatMap { f, .. } => {
3241 let input_ident = ident_stack.pop().unwrap();
3242
3243 let flat_map_ident =
3244 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3245
3246 match builders_or_callback {
3247 BuildersOrCallback::Builders(graph_builders) => {
3248 let builder = graph_builders.get_dfir_mut(&out_location);
3249 builder.add_dfir(
3250 parse_quote! {
3251 #flat_map_ident = #input_ident -> flat_map(#f);
3252 },
3253 None,
3254 Some(&next_stmt_id.to_string()),
3255 );
3256 }
3257 BuildersOrCallback::Callback(_, node_callback) => {
3258 node_callback(node, next_stmt_id);
3259 }
3260 }
3261
3262 *next_stmt_id += 1;
3263
3264 ident_stack.push(flat_map_ident);
3265 }
3266
3267 HydroNode::Filter { f, .. } => {
3268 let input_ident = ident_stack.pop().unwrap();
3269
3270 let filter_ident =
3271 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3272
3273 match builders_or_callback {
3274 BuildersOrCallback::Builders(graph_builders) => {
3275 let builder = graph_builders.get_dfir_mut(&out_location);
3276 builder.add_dfir(
3277 parse_quote! {
3278 #filter_ident = #input_ident -> filter(#f);
3279 },
3280 None,
3281 Some(&next_stmt_id.to_string()),
3282 );
3283 }
3284 BuildersOrCallback::Callback(_, node_callback) => {
3285 node_callback(node, next_stmt_id);
3286 }
3287 }
3288
3289 *next_stmt_id += 1;
3290
3291 ident_stack.push(filter_ident);
3292 }
3293
3294 HydroNode::FilterMap { f, .. } => {
3295 let input_ident = ident_stack.pop().unwrap();
3296
3297 let filter_map_ident =
3298 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3299
3300 match builders_or_callback {
3301 BuildersOrCallback::Builders(graph_builders) => {
3302 let builder = graph_builders.get_dfir_mut(&out_location);
3303 builder.add_dfir(
3304 parse_quote! {
3305 #filter_map_ident = #input_ident -> filter_map(#f);
3306 },
3307 None,
3308 Some(&next_stmt_id.to_string()),
3309 );
3310 }
3311 BuildersOrCallback::Callback(_, node_callback) => {
3312 node_callback(node, next_stmt_id);
3313 }
3314 }
3315
3316 *next_stmt_id += 1;
3317
3318 ident_stack.push(filter_map_ident);
3319 }
3320
3321 HydroNode::Sort { .. } => {
3322 let input_ident = ident_stack.pop().unwrap();
3323
3324 let sort_ident =
3325 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3326
3327 match builders_or_callback {
3328 BuildersOrCallback::Builders(graph_builders) => {
3329 let builder = graph_builders.get_dfir_mut(&out_location);
3330 builder.add_dfir(
3331 parse_quote! {
3332 #sort_ident = #input_ident -> sort();
3333 },
3334 None,
3335 Some(&next_stmt_id.to_string()),
3336 );
3337 }
3338 BuildersOrCallback::Callback(_, node_callback) => {
3339 node_callback(node, next_stmt_id);
3340 }
3341 }
3342
3343 *next_stmt_id += 1;
3344
3345 ident_stack.push(sort_ident);
3346 }
3347
3348 HydroNode::DeferTick { .. } => {
3349 let input_ident = ident_stack.pop().unwrap();
3350
3351 let defer_tick_ident =
3352 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3353
3354 match builders_or_callback {
3355 BuildersOrCallback::Builders(graph_builders) => {
3356 let builder = graph_builders.get_dfir_mut(&out_location);
3357 builder.add_dfir(
3358 parse_quote! {
3359 #defer_tick_ident = #input_ident -> defer_tick_lazy();
3360 },
3361 None,
3362 Some(&next_stmt_id.to_string()),
3363 );
3364 }
3365 BuildersOrCallback::Callback(_, node_callback) => {
3366 node_callback(node, next_stmt_id);
3367 }
3368 }
3369
3370 *next_stmt_id += 1;
3371
3372 ident_stack.push(defer_tick_ident);
3373 }
3374
3375 HydroNode::Enumerate { input, .. } => {
3376 let input_ident = ident_stack.pop().unwrap();
3377
3378 let enumerate_ident =
3379 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3380
3381 match builders_or_callback {
3382 BuildersOrCallback::Builders(graph_builders) => {
3383 let builder = graph_builders.get_dfir_mut(&out_location);
3384 let lifetime = if input.metadata().location_id.is_top_level() {
3385 quote!('static)
3386 } else {
3387 quote!('tick)
3388 };
3389 builder.add_dfir(
3390 parse_quote! {
3391 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3392 },
3393 None,
3394 Some(&next_stmt_id.to_string()),
3395 );
3396 }
3397 BuildersOrCallback::Callback(_, node_callback) => {
3398 node_callback(node, next_stmt_id);
3399 }
3400 }
3401
3402 *next_stmt_id += 1;
3403
3404 ident_stack.push(enumerate_ident);
3405 }
3406
3407 HydroNode::Inspect { f, .. } => {
3408 let input_ident = ident_stack.pop().unwrap();
3409
3410 let inspect_ident =
3411 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3412
3413 match builders_or_callback {
3414 BuildersOrCallback::Builders(graph_builders) => {
3415 let builder = graph_builders.get_dfir_mut(&out_location);
3416 builder.add_dfir(
3417 parse_quote! {
3418 #inspect_ident = #input_ident -> inspect(#f);
3419 },
3420 None,
3421 Some(&next_stmt_id.to_string()),
3422 );
3423 }
3424 BuildersOrCallback::Callback(_, node_callback) => {
3425 node_callback(node, next_stmt_id);
3426 }
3427 }
3428
3429 *next_stmt_id += 1;
3430
3431 ident_stack.push(inspect_ident);
3432 }
3433
3434 HydroNode::Unique { input, .. } => {
3435 let input_ident = ident_stack.pop().unwrap();
3436
3437 let unique_ident =
3438 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3439
3440 match builders_or_callback {
3441 BuildersOrCallback::Builders(graph_builders) => {
3442 let builder = graph_builders.get_dfir_mut(&out_location);
3443 let lifetime = if input.metadata().location_id.is_top_level() {
3444 quote!('static)
3445 } else {
3446 quote!('tick)
3447 };
3448
3449 builder.add_dfir(
3450 parse_quote! {
3451 #unique_ident = #input_ident -> unique::<#lifetime>();
3452 },
3453 None,
3454 Some(&next_stmt_id.to_string()),
3455 );
3456 }
3457 BuildersOrCallback::Callback(_, node_callback) => {
3458 node_callback(node, next_stmt_id);
3459 }
3460 }
3461
3462 *next_stmt_id += 1;
3463
3464 ident_stack.push(unique_ident);
3465 }
3466
3467 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3468 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3469 if input.metadata().location_id.is_top_level()
3470 && input.metadata().collection_kind.is_bounded()
3471 {
3472 parse_quote!(fold_no_replay)
3473 } else {
3474 parse_quote!(fold)
3475 }
3476 } else if matches!(node, HydroNode::Scan { .. }) {
3477 parse_quote!(scan)
3478 } else if let HydroNode::FoldKeyed { input, .. } = node {
3479 if input.metadata().location_id.is_top_level()
3480 && input.metadata().collection_kind.is_bounded()
3481 {
3482 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3483 } else {
3484 parse_quote!(fold_keyed)
3485 }
3486 } else {
3487 unreachable!()
3488 };
3489
3490 let (HydroNode::Fold { input, .. }
3491 | HydroNode::FoldKeyed { input, .. }
3492 | HydroNode::Scan { input, .. }) = node
3493 else {
3494 unreachable!()
3495 };
3496
3497 let lifetime = if input.metadata().location_id.is_top_level() {
3498 quote!('static)
3499 } else {
3500 quote!('tick)
3501 };
3502
3503 let input_ident = ident_stack.pop().unwrap();
3504
3505 let (HydroNode::Fold { init, acc, .. }
3506 | HydroNode::FoldKeyed { init, acc, .. }
3507 | HydroNode::Scan { init, acc, .. }) = &*node
3508 else {
3509 unreachable!()
3510 };
3511
3512 let fold_ident =
3513 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3514
3515 match builders_or_callback {
3516 BuildersOrCallback::Builders(graph_builders) => {
3517 if matches!(node, HydroNode::Fold { .. })
3518 && node.metadata().location_id.is_top_level()
3519 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3520 && graph_builders.singleton_intermediates()
3521 && !node.metadata().collection_kind.is_bounded()
3522 {
3523 let builder = graph_builders.get_dfir_mut(&out_location);
3524
3525 let acc: syn::Expr = parse_quote!({
3526 let mut __inner = #acc;
3527 move |__state, __value| {
3528 __inner(__state, __value);
3529 Some(__state.clone())
3530 }
3531 });
3532
3533 builder.add_dfir(
3534 parse_quote! {
3535 source_iter([(#init)()]) -> [0]#fold_ident;
3536 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3537 #fold_ident = chain();
3538 },
3539 None,
3540 Some(&next_stmt_id.to_string()),
3541 );
3542 } else if matches!(node, HydroNode::FoldKeyed { .. })
3543 && node.metadata().location_id.is_top_level()
3544 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3545 && graph_builders.singleton_intermediates()
3546 && !node.metadata().collection_kind.is_bounded()
3547 {
3548 let builder = graph_builders.get_dfir_mut(&out_location);
3549
3550 let acc: syn::Expr = parse_quote!({
3551 let mut __init = #init;
3552 let mut __inner = #acc;
3553 move |__state, __kv: (_, _)| {
3554 let __state = __state
3556 .entry(::std::clone::Clone::clone(&__kv.0))
3557 .or_insert_with(|| (__init)());
3558 __inner(__state, __kv.1);
3559 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3560 }
3561 });
3562
3563 builder.add_dfir(
3564 parse_quote! {
3565 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3566 },
3567 None,
3568 Some(&next_stmt_id.to_string()),
3569 );
3570 } else {
3571 let builder = graph_builders.get_dfir_mut(&out_location);
3572 builder.add_dfir(
3573 parse_quote! {
3574 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3575 },
3576 None,
3577 Some(&next_stmt_id.to_string()),
3578 );
3579 }
3580 }
3581 BuildersOrCallback::Callback(_, node_callback) => {
3582 node_callback(node, next_stmt_id);
3583 }
3584 }
3585
3586 *next_stmt_id += 1;
3587
3588 ident_stack.push(fold_ident);
3589 }
3590
3591 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3592 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3593 if input.metadata().location_id.is_top_level()
3594 && input.metadata().collection_kind.is_bounded()
3595 {
3596 parse_quote!(reduce_no_replay)
3597 } else {
3598 parse_quote!(reduce)
3599 }
3600 } else if let HydroNode::ReduceKeyed { input, .. } = node {
3601 if input.metadata().location_id.is_top_level()
3602 && input.metadata().collection_kind.is_bounded()
3603 {
3604 todo!(
3605 "Calling keyed reduce on a top-level bounded collection is not supported"
3606 )
3607 } else {
3608 parse_quote!(reduce_keyed)
3609 }
3610 } else {
3611 unreachable!()
3612 };
3613
3614 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3615 else {
3616 unreachable!()
3617 };
3618
3619 let lifetime = if input.metadata().location_id.is_top_level() {
3620 quote!('static)
3621 } else {
3622 quote!('tick)
3623 };
3624
3625 let input_ident = ident_stack.pop().unwrap();
3626
3627 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3628 else {
3629 unreachable!()
3630 };
3631
3632 let reduce_ident =
3633 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3634
3635 match builders_or_callback {
3636 BuildersOrCallback::Builders(graph_builders) => {
3637 if matches!(node, HydroNode::Reduce { .. })
3638 && node.metadata().location_id.is_top_level()
3639 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3640 && graph_builders.singleton_intermediates()
3641 && !node.metadata().collection_kind.is_bounded()
3642 {
3643 todo!(
3644 "Reduce with optional intermediates is not yet supported in simulator"
3645 );
3646 } else if matches!(node, HydroNode::ReduceKeyed { .. })
3647 && node.metadata().location_id.is_top_level()
3648 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3649 && graph_builders.singleton_intermediates()
3650 && !node.metadata().collection_kind.is_bounded()
3651 {
3652 todo!(
3653 "Reduce keyed with optional intermediates is not yet supported in simulator"
3654 );
3655 } else {
3656 let builder = graph_builders.get_dfir_mut(&out_location);
3657 builder.add_dfir(
3658 parse_quote! {
3659 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3660 },
3661 None,
3662 Some(&next_stmt_id.to_string()),
3663 );
3664 }
3665 }
3666 BuildersOrCallback::Callback(_, node_callback) => {
3667 node_callback(node, next_stmt_id);
3668 }
3669 }
3670
3671 *next_stmt_id += 1;
3672
3673 ident_stack.push(reduce_ident);
3674 }
3675
3676 HydroNode::ReduceKeyedWatermark {
3677 f,
3678 input,
3679 metadata,
3680 ..
3681 } => {
3682 let lifetime = if input.metadata().location_id.is_top_level() {
3683 quote!('static)
3684 } else {
3685 quote!('tick)
3686 };
3687
3688 let watermark_ident = ident_stack.pop().unwrap();
3690 let input_ident = ident_stack.pop().unwrap();
3691
3692 let chain_ident = syn::Ident::new(
3693 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3694 Span::call_site(),
3695 );
3696
3697 let fold_ident =
3698 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3699
3700 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3701 && input.metadata().collection_kind.is_bounded()
3702 {
3703 parse_quote!(fold_no_replay)
3704 } else {
3705 parse_quote!(fold)
3706 };
3707
3708 match builders_or_callback {
3709 BuildersOrCallback::Builders(graph_builders) => {
3710 if metadata.location_id.is_top_level()
3711 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3712 && graph_builders.singleton_intermediates()
3713 && !metadata.collection_kind.is_bounded()
3714 {
3715 todo!(
3716 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3717 )
3718 } else {
3719 let builder = graph_builders.get_dfir_mut(&out_location);
3720 builder.add_dfir(
3721 parse_quote! {
3722 #chain_ident = chain();
3723 #input_ident
3724 -> map(|x| (Some(x), None))
3725 -> [0]#chain_ident;
3726 #watermark_ident
3727 -> map(|watermark| (None, Some(watermark)))
3728 -> [1]#chain_ident;
3729
3730 #fold_ident = #chain_ident
3731 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3732 let __reduce_keyed_fn = #f;
3733 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3734 if let Some((k, v)) = opt_payload {
3735 if let Some(curr_watermark) = *opt_curr_watermark {
3736 if k < curr_watermark {
3737 return;
3738 }
3739 }
3740 match map.entry(k) {
3741 ::std::collections::hash_map::Entry::Vacant(e) => {
3742 e.insert(v);
3743 }
3744 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3745 __reduce_keyed_fn(e.get_mut(), v);
3746 }
3747 }
3748 } else {
3749 let watermark = opt_watermark.unwrap();
3750 if let Some(curr_watermark) = *opt_curr_watermark {
3751 if watermark <= curr_watermark {
3752 return;
3753 }
3754 }
3755 *opt_curr_watermark = opt_watermark;
3756 map.retain(|k, _| *k >= watermark);
3757 }
3758 }
3759 })
3760 -> flat_map(|(map, _curr_watermark)| map);
3761 },
3762 None,
3763 Some(&next_stmt_id.to_string()),
3764 );
3765 }
3766 }
3767 BuildersOrCallback::Callback(_, node_callback) => {
3768 node_callback(node, next_stmt_id);
3769 }
3770 }
3771
3772 *next_stmt_id += 1;
3773
3774 ident_stack.push(fold_ident);
3775 }
3776
3777 HydroNode::Network {
3778 networking_info,
3779 serialize_fn: serialize_pipeline,
3780 instantiate_fn,
3781 deserialize_fn: deserialize_pipeline,
3782 input,
3783 ..
3784 } => {
3785 let input_ident = ident_stack.pop().unwrap();
3786
3787 let receiver_stream_ident =
3788 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3789
3790 match builders_or_callback {
3791 BuildersOrCallback::Builders(graph_builders) => {
3792 let (sink_expr, source_expr) = match instantiate_fn {
3793 DebugInstantiate::Building => (
3794 syn::parse_quote!(DUMMY_SINK),
3795 syn::parse_quote!(DUMMY_SOURCE),
3796 ),
3797
3798 DebugInstantiate::Finalized(finalized) => {
3799 (finalized.sink.clone(), finalized.source.clone())
3800 }
3801 };
3802
3803 graph_builders.create_network(
3804 &input.metadata().location_id,
3805 &out_location,
3806 input_ident,
3807 &receiver_stream_ident,
3808 serialize_pipeline.as_ref(),
3809 sink_expr,
3810 source_expr,
3811 deserialize_pipeline.as_ref(),
3812 *next_stmt_id,
3813 networking_info,
3814 );
3815 }
3816 BuildersOrCallback::Callback(_, node_callback) => {
3817 node_callback(node, next_stmt_id);
3818 }
3819 }
3820
3821 *next_stmt_id += 1;
3822
3823 ident_stack.push(receiver_stream_ident);
3824 }
3825
3826 HydroNode::ExternalInput {
3827 instantiate_fn,
3828 deserialize_fn: deserialize_pipeline,
3829 ..
3830 } => {
3831 let receiver_stream_ident =
3832 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3833
3834 match builders_or_callback {
3835 BuildersOrCallback::Builders(graph_builders) => {
3836 let (_, source_expr) = match instantiate_fn {
3837 DebugInstantiate::Building => (
3838 syn::parse_quote!(DUMMY_SINK),
3839 syn::parse_quote!(DUMMY_SOURCE),
3840 ),
3841
3842 DebugInstantiate::Finalized(finalized) => {
3843 (finalized.sink.clone(), finalized.source.clone())
3844 }
3845 };
3846
3847 graph_builders.create_external_source(
3848 &out_location,
3849 source_expr,
3850 &receiver_stream_ident,
3851 deserialize_pipeline.as_ref(),
3852 *next_stmt_id,
3853 );
3854 }
3855 BuildersOrCallback::Callback(_, node_callback) => {
3856 node_callback(node, next_stmt_id);
3857 }
3858 }
3859
3860 *next_stmt_id += 1;
3861
3862 ident_stack.push(receiver_stream_ident);
3863 }
3864
3865 HydroNode::Counter {
3866 tag,
3867 duration,
3868 prefix,
3869 ..
3870 } => {
3871 let input_ident = ident_stack.pop().unwrap();
3872
3873 let counter_ident =
3874 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3875
3876 match builders_or_callback {
3877 BuildersOrCallback::Builders(graph_builders) => {
3878 let arg = format!("{}({})", prefix, tag);
3879 let builder = graph_builders.get_dfir_mut(&out_location);
3880 builder.add_dfir(
3881 parse_quote! {
3882 #counter_ident = #input_ident -> _counter(#arg, #duration);
3883 },
3884 None,
3885 Some(&next_stmt_id.to_string()),
3886 );
3887 }
3888 BuildersOrCallback::Callback(_, node_callback) => {
3889 node_callback(node, next_stmt_id);
3890 }
3891 }
3892
3893 *next_stmt_id += 1;
3894
3895 ident_stack.push(counter_ident);
3896 }
3897 }
3898 },
3899 seen_tees,
3900 false,
3901 );
3902
3903 ident_stack
3904 .pop()
3905 .expect("ident_stack should have exactly one element after traversal")
3906 }
3907
3908 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3909 match self {
3910 HydroNode::Placeholder => {
3911 panic!()
3912 }
3913 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3914 HydroNode::Source { source, .. } => match source {
3915 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3916 HydroSource::ExternalNetwork()
3917 | HydroSource::Spin()
3918 | HydroSource::ClusterMembers(_, _)
3919 | HydroSource::Embedded(_)
3920 | HydroSource::EmbeddedSingleton(_) => {} },
3922 HydroNode::SingletonSource { value, .. } => {
3923 transform(value);
3924 }
3925 HydroNode::CycleSource { .. }
3926 | HydroNode::Tee { .. }
3927 | HydroNode::YieldConcat { .. }
3928 | HydroNode::BeginAtomic { .. }
3929 | HydroNode::EndAtomic { .. }
3930 | HydroNode::Batch { .. }
3931 | HydroNode::Chain { .. }
3932 | HydroNode::ChainFirst { .. }
3933 | HydroNode::CrossProduct { .. }
3934 | HydroNode::CrossSingleton { .. }
3935 | HydroNode::ResolveFutures { .. }
3936 | HydroNode::ResolveFuturesBlocking { .. }
3937 | HydroNode::ResolveFuturesOrdered { .. }
3938 | HydroNode::Join { .. }
3939 | HydroNode::Difference { .. }
3940 | HydroNode::AntiJoin { .. }
3941 | HydroNode::DeferTick { .. }
3942 | HydroNode::Enumerate { .. }
3943 | HydroNode::Unique { .. }
3944 | HydroNode::Sort { .. } => {}
3945 HydroNode::Map { f, .. }
3946 | HydroNode::FlatMap { f, .. }
3947 | HydroNode::Filter { f, .. }
3948 | HydroNode::FilterMap { f, .. }
3949 | HydroNode::Inspect { f, .. }
3950 | HydroNode::Partition { f, .. }
3951 | HydroNode::Reduce { f, .. }
3952 | HydroNode::ReduceKeyed { f, .. }
3953 | HydroNode::ReduceKeyedWatermark { f, .. } => {
3954 transform(f);
3955 }
3956 HydroNode::Fold { init, acc, .. }
3957 | HydroNode::Scan { init, acc, .. }
3958 | HydroNode::FoldKeyed { init, acc, .. } => {
3959 transform(init);
3960 transform(acc);
3961 }
3962 HydroNode::Network {
3963 serialize_fn,
3964 deserialize_fn,
3965 ..
3966 } => {
3967 if let Some(serialize_fn) = serialize_fn {
3968 transform(serialize_fn);
3969 }
3970 if let Some(deserialize_fn) = deserialize_fn {
3971 transform(deserialize_fn);
3972 }
3973 }
3974 HydroNode::ExternalInput { deserialize_fn, .. } => {
3975 if let Some(deserialize_fn) = deserialize_fn {
3976 transform(deserialize_fn);
3977 }
3978 }
3979 HydroNode::Counter { duration, .. } => {
3980 transform(duration);
3981 }
3982 }
3983 }
3984
3985 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3986 &self.metadata().op
3987 }
3988
3989 pub fn metadata(&self) -> &HydroIrMetadata {
3990 match self {
3991 HydroNode::Placeholder => {
3992 panic!()
3993 }
3994 HydroNode::Cast { metadata, .. } => metadata,
3995 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3996 HydroNode::Source { metadata, .. } => metadata,
3997 HydroNode::SingletonSource { metadata, .. } => metadata,
3998 HydroNode::CycleSource { metadata, .. } => metadata,
3999 HydroNode::Tee { metadata, .. } => metadata,
4000 HydroNode::Partition { metadata, .. } => metadata,
4001 HydroNode::YieldConcat { metadata, .. } => metadata,
4002 HydroNode::BeginAtomic { metadata, .. } => metadata,
4003 HydroNode::EndAtomic { metadata, .. } => metadata,
4004 HydroNode::Batch { metadata, .. } => metadata,
4005 HydroNode::Chain { metadata, .. } => metadata,
4006 HydroNode::ChainFirst { metadata, .. } => metadata,
4007 HydroNode::CrossProduct { metadata, .. } => metadata,
4008 HydroNode::CrossSingleton { metadata, .. } => metadata,
4009 HydroNode::Join { metadata, .. } => metadata,
4010 HydroNode::Difference { metadata, .. } => metadata,
4011 HydroNode::AntiJoin { metadata, .. } => metadata,
4012 HydroNode::ResolveFutures { metadata, .. } => metadata,
4013 HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4014 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4015 HydroNode::Map { metadata, .. } => metadata,
4016 HydroNode::FlatMap { metadata, .. } => metadata,
4017 HydroNode::Filter { metadata, .. } => metadata,
4018 HydroNode::FilterMap { metadata, .. } => metadata,
4019 HydroNode::DeferTick { metadata, .. } => metadata,
4020 HydroNode::Enumerate { metadata, .. } => metadata,
4021 HydroNode::Inspect { metadata, .. } => metadata,
4022 HydroNode::Unique { metadata, .. } => metadata,
4023 HydroNode::Sort { metadata, .. } => metadata,
4024 HydroNode::Scan { metadata, .. } => metadata,
4025 HydroNode::Fold { metadata, .. } => metadata,
4026 HydroNode::FoldKeyed { metadata, .. } => metadata,
4027 HydroNode::Reduce { metadata, .. } => metadata,
4028 HydroNode::ReduceKeyed { metadata, .. } => metadata,
4029 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4030 HydroNode::ExternalInput { metadata, .. } => metadata,
4031 HydroNode::Network { metadata, .. } => metadata,
4032 HydroNode::Counter { metadata, .. } => metadata,
4033 }
4034 }
4035
4036 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4037 &mut self.metadata_mut().op
4038 }
4039
4040 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4041 match self {
4042 HydroNode::Placeholder => {
4043 panic!()
4044 }
4045 HydroNode::Cast { metadata, .. } => metadata,
4046 HydroNode::ObserveNonDet { metadata, .. } => metadata,
4047 HydroNode::Source { metadata, .. } => metadata,
4048 HydroNode::SingletonSource { metadata, .. } => metadata,
4049 HydroNode::CycleSource { metadata, .. } => metadata,
4050 HydroNode::Tee { metadata, .. } => metadata,
4051 HydroNode::Partition { metadata, .. } => metadata,
4052 HydroNode::YieldConcat { metadata, .. } => metadata,
4053 HydroNode::BeginAtomic { metadata, .. } => metadata,
4054 HydroNode::EndAtomic { metadata, .. } => metadata,
4055 HydroNode::Batch { metadata, .. } => metadata,
4056 HydroNode::Chain { metadata, .. } => metadata,
4057 HydroNode::ChainFirst { metadata, .. } => metadata,
4058 HydroNode::CrossProduct { metadata, .. } => metadata,
4059 HydroNode::CrossSingleton { metadata, .. } => metadata,
4060 HydroNode::Join { metadata, .. } => metadata,
4061 HydroNode::Difference { metadata, .. } => metadata,
4062 HydroNode::AntiJoin { metadata, .. } => metadata,
4063 HydroNode::ResolveFutures { metadata, .. } => metadata,
4064 HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4065 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4066 HydroNode::Map { metadata, .. } => metadata,
4067 HydroNode::FlatMap { metadata, .. } => metadata,
4068 HydroNode::Filter { metadata, .. } => metadata,
4069 HydroNode::FilterMap { metadata, .. } => metadata,
4070 HydroNode::DeferTick { metadata, .. } => metadata,
4071 HydroNode::Enumerate { metadata, .. } => metadata,
4072 HydroNode::Inspect { metadata, .. } => metadata,
4073 HydroNode::Unique { metadata, .. } => metadata,
4074 HydroNode::Sort { metadata, .. } => metadata,
4075 HydroNode::Scan { metadata, .. } => metadata,
4076 HydroNode::Fold { metadata, .. } => metadata,
4077 HydroNode::FoldKeyed { metadata, .. } => metadata,
4078 HydroNode::Reduce { metadata, .. } => metadata,
4079 HydroNode::ReduceKeyed { metadata, .. } => metadata,
4080 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4081 HydroNode::ExternalInput { metadata, .. } => metadata,
4082 HydroNode::Network { metadata, .. } => metadata,
4083 HydroNode::Counter { metadata, .. } => metadata,
4084 }
4085 }
4086
4087 pub fn input(&self) -> Vec<&HydroNode> {
4088 match self {
4089 HydroNode::Placeholder => {
4090 panic!()
4091 }
4092 HydroNode::Source { .. }
4093 | HydroNode::SingletonSource { .. }
4094 | HydroNode::ExternalInput { .. }
4095 | HydroNode::CycleSource { .. }
4096 | HydroNode::Tee { .. }
4097 | HydroNode::Partition { .. } => {
4098 vec![]
4100 }
4101 HydroNode::Cast { inner, .. }
4102 | HydroNode::ObserveNonDet { inner, .. }
4103 | HydroNode::YieldConcat { inner, .. }
4104 | HydroNode::BeginAtomic { inner, .. }
4105 | HydroNode::EndAtomic { inner, .. }
4106 | HydroNode::Batch { inner, .. } => {
4107 vec![inner]
4108 }
4109 HydroNode::Chain { first, second, .. } => {
4110 vec![first, second]
4111 }
4112 HydroNode::ChainFirst { first, second, .. } => {
4113 vec![first, second]
4114 }
4115 HydroNode::CrossProduct { left, right, .. }
4116 | HydroNode::CrossSingleton { left, right, .. }
4117 | HydroNode::Join { left, right, .. } => {
4118 vec![left, right]
4119 }
4120 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
4121 vec![pos, neg]
4122 }
4123 HydroNode::Map { input, .. }
4124 | HydroNode::FlatMap { input, .. }
4125 | HydroNode::Filter { input, .. }
4126 | HydroNode::FilterMap { input, .. }
4127 | HydroNode::Sort { input, .. }
4128 | HydroNode::DeferTick { input, .. }
4129 | HydroNode::Enumerate { input, .. }
4130 | HydroNode::Inspect { input, .. }
4131 | HydroNode::Unique { input, .. }
4132 | HydroNode::Network { input, .. }
4133 | HydroNode::Counter { input, .. }
4134 | HydroNode::ResolveFutures { input, .. }
4135 | HydroNode::ResolveFuturesBlocking { input, .. }
4136 | HydroNode::ResolveFuturesOrdered { input, .. }
4137 | HydroNode::Fold { input, .. }
4138 | HydroNode::FoldKeyed { input, .. }
4139 | HydroNode::Reduce { input, .. }
4140 | HydroNode::ReduceKeyed { input, .. }
4141 | HydroNode::Scan { input, .. } => {
4142 vec![input]
4143 }
4144 HydroNode::ReduceKeyedWatermark {
4145 input, watermark, ..
4146 } => {
4147 vec![input, watermark]
4148 }
4149 }
4150 }
4151
4152 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4153 self.input()
4154 .iter()
4155 .map(|input_node| input_node.metadata())
4156 .collect()
4157 }
4158
4159 pub fn is_shared_with_others(&self) -> bool {
4163 match self {
4164 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4165 Rc::strong_count(&inner.0) > 1
4166 }
4167 _ => false,
4168 }
4169 }
4170
4171 pub fn print_root(&self) -> String {
4172 match self {
4173 HydroNode::Placeholder => {
4174 panic!()
4175 }
4176 HydroNode::Cast { .. } => "Cast()".to_owned(),
4177 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4178 HydroNode::Source { source, .. } => format!("Source({:?})", source),
4179 HydroNode::SingletonSource {
4180 value,
4181 first_tick_only,
4182 ..
4183 } => format!(
4184 "SingletonSource({:?}, first_tick_only={})",
4185 value, first_tick_only
4186 ),
4187 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
4188 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
4189 HydroNode::Partition { f, is_true, .. } => {
4190 format!("Partition({:?}, is_true={})", f, is_true)
4191 }
4192 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
4193 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
4194 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
4195 HydroNode::Batch { .. } => "Batch()".to_owned(),
4196 HydroNode::Chain { first, second, .. } => {
4197 format!("Chain({}, {})", first.print_root(), second.print_root())
4198 }
4199 HydroNode::ChainFirst { first, second, .. } => {
4200 format!(
4201 "ChainFirst({}, {})",
4202 first.print_root(),
4203 second.print_root()
4204 )
4205 }
4206 HydroNode::CrossProduct { left, right, .. } => {
4207 format!(
4208 "CrossProduct({}, {})",
4209 left.print_root(),
4210 right.print_root()
4211 )
4212 }
4213 HydroNode::CrossSingleton { left, right, .. } => {
4214 format!(
4215 "CrossSingleton({}, {})",
4216 left.print_root(),
4217 right.print_root()
4218 )
4219 }
4220 HydroNode::Join { left, right, .. } => {
4221 format!("Join({}, {})", left.print_root(), right.print_root())
4222 }
4223 HydroNode::Difference { pos, neg, .. } => {
4224 format!("Difference({}, {})", pos.print_root(), neg.print_root())
4225 }
4226 HydroNode::AntiJoin { pos, neg, .. } => {
4227 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4228 }
4229 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4230 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
4231 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4232 HydroNode::Map { f, .. } => format!("Map({:?})", f),
4233 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4234 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4235 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4236 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4237 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4238 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4239 HydroNode::Unique { .. } => "Unique()".to_owned(),
4240 HydroNode::Sort { .. } => "Sort()".to_owned(),
4241 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4242 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4243 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4244 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4245 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4246 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4247 HydroNode::Network { .. } => "Network()".to_owned(),
4248 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4249 HydroNode::Counter { tag, duration, .. } => {
4250 format!("Counter({:?}, {:?})", tag, duration)
4251 }
4252 }
4253 }
4254}
4255
/// Builds the sink/source expressions and the connect callback for one network edge.
///
/// The deployed node for each endpoint is looked up in `processes` or `clusters`, a port is
/// taken from each via `next_port`, and the matching `Deploy` methods are called: the
/// `o2o`/`o2m`/`m2o`/`m2m` pair is picked by whether the sender and receiver are a process or
/// a cluster.
///
/// Returns `(sink, source, connect_fn)` as produced by those `Deploy` methods.
///
/// # Panics
///
/// Panics if an endpoint's key is missing from `processes`/`clusters`, or if either location
/// is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetch an owned copy of a deployed process, panicking if it was never instantiated.
    let process_at = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", key))
            .clone()
    };
    // Fetch an owned copy of a deployed cluster, panicking if it was never instantiated.
    let cluster_at = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", key))
            .clone()
    };

    // In every arm the sender is resolved before the receiver, and the sender's port is
    // allocated before the receiver's.
    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let sender = process_at(from);
            let receiver = process_at(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::o2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::o2o_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let sender = process_at(from);
            let receiver = cluster_at(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::o2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::o2m_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let sender = cluster_at(from);
            let receiver = process_at(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::m2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::m2o_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let sender = cluster_at(from);
            let receiver = cluster_at(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::m2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::m2m_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        // Tick and atomic locations are not accepted on either side of a network edge.
        (LocationId::Tick(_, _) | LocationId::Atomic(_), _)
        | (_, LocationId::Tick(_, _) | LocationId::Atomic(_)) => panic!(),
    };

    (sink, source, connect_fn)
}
4397
/// Unit tests pinning the sizes of the IR enums and exercising `simplify_q_macro`.
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins `size_of::<HydroNode>()` to 248 bytes.
    ///
    /// Ignored without the `build` feature, because the expected size includes
    /// feature-gated fields (see the `ignore` reason).
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        // NOTE(review): presumably a guard against accidental growth of the enum; update the
        // constant deliberately when the layout is changed on purpose.
        assert_eq!(size_of::<HydroNode>(), 248);
    }

    /// Pins `size_of::<HydroRoot>()` to 136 bytes; ignored without the `build` feature for the
    /// same reason as `hydro_node_size`.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 136);
    }

    /// An expression parsed from plain source must come back from `simplify_q_macro` unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        // The input is cloned so the original can be compared against the result.
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    /// Runs `simplify_q_macro` on the expression spliced from a real `q!` closure and
    /// snapshot-tests the resulting token string.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        // The expected output lives in the snapshot recorded for this test.
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Same as the previous test, but the quoted code is a block ending in a `move` closure,
    /// so the quoted source does not begin with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}