1use std::fmt::Debug;
17use std::future::Future;
18use std::marker::PhantomData;
19use std::num::ParseIntError;
20use std::time::Duration;
21
22use bytes::{Bytes, BytesMut};
23use futures::stream::Stream as FuturesStream;
24use proc_macro2::Span;
25use quote::quote;
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use slotmap::{Key, new_key_type};
29use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
30use stageleft::{QuotedWithContext, q, quote_type};
31use syn::parse_quote;
32use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
33
34use crate::compile::ir::{
35 ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
36};
37use crate::forward_handle::ForwardRef;
38#[cfg(stageleft_runtime)]
39use crate::forward_handle::{CycleCollection, ForwardHandle};
40use crate::live_collections::boundedness::{Bounded, Unbounded};
41use crate::live_collections::keyed_stream::KeyedStream;
42use crate::live_collections::singleton::Singleton;
43use crate::live_collections::stream::{
44 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
45};
46use crate::location::dynamic::LocationId;
47use crate::location::external_process::{
48 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
49};
50use crate::nondet::NonDet;
51#[cfg(feature = "sim")]
52use crate::sim::SimSender;
53use crate::staging_util::get_this_crate;
54
55pub mod dynamic;
56
57pub mod external_process;
58pub use external_process::External;
59
60pub mod process;
61pub use process::Process;
62
63pub mod cluster;
64pub use cluster::Cluster;
65
66pub mod member_id;
67pub use member_id::{MemberId, TaglessMemberId};
68
69pub mod tick;
70pub use tick::{Atomic, NoTick, Tick};
71
72#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
75pub enum MembershipEvent {
76 Joined,
78 Left,
80}
81
82#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
88pub enum NetworkHint {
89 Auto,
91 TcpPort(Option<u16>),
96}
97
98pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
99 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
100}
101
102#[stageleft::export(LocationKey)]
103new_key_type! {
104 pub struct LocationKey;
106}
107
108impl std::fmt::Display for LocationKey {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 write!(f, "loc{:?}", self.data()) }
112}
113
114impl std::str::FromStr for LocationKey {
117 type Err = Option<ParseIntError>;
118
119 fn from_str(s: &str) -> Result<Self, Self::Err> {
120 let nvn = s.strip_prefix("loc").ok_or(None)?;
121 let (idx, ver) = nvn.split_once("v").ok_or(None)?;
122 let idx: u64 = idx.parse()?;
123 let ver: u64 = ver.parse()?;
124 Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
125 }
126}
127
128impl LocationKey {
129 pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); #[cfg(test)]
135 pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000001)); #[cfg(test)]
139 pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000002)); }
141
142impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
144 type O = LocationKey;
145
146 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
147 where
148 Self: Sized,
149 {
150 let root = get_this_crate();
151 let n = Key::data(&self).as_ffi();
152 (
153 QuoteTokens {
154 prelude: None,
155 expr: Some(quote! {
156 #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
157 }),
158 },
159 (),
160 )
161 }
162}
163
164#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
166pub enum LocationType {
167 Process,
169 Cluster,
171 External,
173}
174
175#[expect(
189 private_bounds,
190 reason = "only internal Hydro code can define location types"
191)]
192pub trait Location<'a>: dynamic::DynLocation {
193 type Root: Location<'a>;
198
199 fn root(&self) -> Self::Root;
204
205 fn try_tick(&self) -> Option<Tick<Self>> {
212 if Self::is_top_level() {
213 let id = self.flow_state().borrow_mut().next_clock_id();
214 Some(Tick {
215 id,
216 l: self.clone(),
217 })
218 } else {
219 None
220 }
221 }
222
223 fn id(&self) -> LocationId {
225 dynamic::DynLocation::id(self)
226 }
227
228 fn tick(&self) -> Tick<Self>
254 where
255 Self: NoTick,
256 {
257 let id = self.flow_state().borrow_mut().next_clock_id();
258 Tick {
259 id,
260 l: self.clone(),
261 }
262 }
263
264 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
289 where
290 Self: Sized + NoTick,
291 {
292 Stream::new(
293 self.clone(),
294 HydroNode::Source {
295 source: HydroSource::Spin(),
296 metadata: self.new_node_metadata(Stream::<
297 (),
298 Self,
299 Unbounded,
300 TotalOrder,
301 ExactlyOnce,
302 >::collection_kind()),
303 },
304 )
305 }
306
307 fn source_stream<T, E>(
328 &self,
329 e: impl QuotedWithContext<'a, E, Self>,
330 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
331 where
332 E: FuturesStream<Item = T> + Unpin,
333 Self: Sized + NoTick,
334 {
335 let e = e.splice_untyped_ctx(self);
336
337 Stream::new(
338 self.clone(),
339 HydroNode::Source {
340 source: HydroSource::Stream(e.into()),
341 metadata: self.new_node_metadata(Stream::<
342 T,
343 Self,
344 Unbounded,
345 TotalOrder,
346 ExactlyOnce,
347 >::collection_kind()),
348 },
349 )
350 }
351
352 fn source_iter<T, E>(
374 &self,
375 e: impl QuotedWithContext<'a, E, Self>,
376 ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
377 where
378 E: IntoIterator<Item = T>,
379 Self: Sized,
380 {
381 let e = e.splice_typed_ctx(self);
382
383 Stream::new(
384 self.clone(),
385 HydroNode::Source {
386 source: HydroSource::Iter(e.into()),
387 metadata: self.new_node_metadata(
388 Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
389 ),
390 },
391 )
392 }
393
394 fn source_cluster_members<C: 'a>(
428 &self,
429 cluster: &Cluster<'a, C>,
430 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
431 where
432 Self: Sized + NoTick,
433 {
434 Stream::new(
435 self.clone(),
436 HydroNode::Source {
437 source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
438 metadata: self.new_node_metadata(Stream::<
439 (TaglessMemberId, MembershipEvent),
440 Self,
441 Unbounded,
442 TotalOrder,
443 ExactlyOnce,
444 >::collection_kind()),
445 },
446 )
447 .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
448 .into_keyed()
449 }
450
451 fn source_external_bytes<L>(
459 &self,
460 from: &External<L>,
461 ) -> (
462 ExternalBytesPort,
463 Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
464 )
465 where
466 Self: Sized + NoTick,
467 {
468 let (port, stream, sink) =
469 self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
470
471 sink.complete(self.source_iter(q!([])));
472
473 (port, stream)
474 }
475
476 #[expect(clippy::type_complexity, reason = "stream markers")]
483 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
484 &self,
485 from: &External<L>,
486 ) -> (
487 ExternalBincodeSink<T, NotMany, O, R>,
488 Stream<T, Self, Unbounded, O, R>,
489 )
490 where
491 Self: Sized + NoTick,
492 T: Serialize + DeserializeOwned,
493 {
494 let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
495 sink.complete(self.source_iter(q!([])));
496
497 (
498 ExternalBincodeSink {
499 process_key: from.key,
500 port_id: port.port_id,
501 _phantom: PhantomData,
502 },
503 stream.weaken_ordering().weaken_retries(),
504 )
505 }
506
507 #[cfg(feature = "sim")]
512 #[expect(clippy::type_complexity, reason = "stream markers")]
513 fn sim_input<T, O: Ordering, R: Retries>(
514 &self,
515 ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
516 where
517 Self: Sized + NoTick,
518 T: Serialize + DeserializeOwned,
519 {
520 let external_location: External<'a, ()> = External {
521 key: LocationKey::FIRST,
522 flow_state: self.flow_state().clone(),
523 _phantom: PhantomData,
524 };
525
526 let (external, stream) = self.source_external_bincode(&external_location);
527
528 (SimSender(external.port_id, PhantomData), stream)
529 }
530
531 fn embedded_input<T>(
537 &self,
538 name: impl Into<String>,
539 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
540 where
541 Self: Sized + NoTick,
542 {
543 let ident = syn::Ident::new(&name.into(), Span::call_site());
544
545 Stream::new(
546 self.clone(),
547 HydroNode::Source {
548 source: HydroSource::Embedded(ident),
549 metadata: self.new_node_metadata(Stream::<
550 T,
551 Self,
552 Unbounded,
553 TotalOrder,
554 ExactlyOnce,
555 >::collection_kind()),
556 },
557 )
558 }
559
560 fn embedded_singleton_input<T>(&self, name: impl Into<String>) -> Singleton<T, Self, Bounded>
566 where
567 Self: Sized + NoTick,
568 {
569 let ident = syn::Ident::new(&name.into(), Span::call_site());
570
571 Singleton::new(
572 self.clone(),
573 HydroNode::Source {
574 source: HydroSource::EmbeddedSingleton(ident),
575 metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
576 },
577 )
578 }
579
580 #[expect(clippy::type_complexity, reason = "stream markers")]
625 fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
626 &self,
627 from: &External<L>,
628 port_hint: NetworkHint,
629 ) -> (
630 ExternalBytesPort<NotMany>,
631 Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
632 ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
633 )
634 where
635 Self: Sized + NoTick,
636 {
637 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
638
639 let (fwd_ref, to_sink) =
640 self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
641 let mut flow_state_borrow = self.flow_state().borrow_mut();
642
643 flow_state_borrow.push_root(HydroRoot::SendExternal {
644 to_external_key: from.key,
645 to_port_id: next_external_port_id,
646 to_many: false,
647 unpaired: false,
648 serialize_fn: None,
649 instantiate_fn: DebugInstantiate::Building,
650 input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
651 op_metadata: HydroIrOpMetadata::new(),
652 });
653
654 let raw_stream: Stream<
655 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
656 Self,
657 Unbounded,
658 TotalOrder,
659 ExactlyOnce,
660 > = Stream::new(
661 self.clone(),
662 HydroNode::ExternalInput {
663 from_external_key: from.key,
664 from_port_id: next_external_port_id,
665 from_many: false,
666 codec_type: quote_type::<Codec>().into(),
667 port_hint,
668 instantiate_fn: DebugInstantiate::Building,
669 deserialize_fn: None,
670 metadata: self.new_node_metadata(Stream::<
671 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
672 Self,
673 Unbounded,
674 TotalOrder,
675 ExactlyOnce,
676 >::collection_kind()),
677 },
678 );
679
680 (
681 ExternalBytesPort {
682 process_key: from.key,
683 port_id: next_external_port_id,
684 _phantom: PhantomData,
685 },
686 raw_stream.flatten_ordered(),
687 fwd_ref,
688 )
689 }
690
691 #[expect(clippy::type_complexity, reason = "stream markers")]
701 fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
702 &self,
703 from: &External<L>,
704 ) -> (
705 ExternalBincodeBidi<InT, OutT, NotMany>,
706 Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
707 ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
708 )
709 where
710 Self: Sized + NoTick,
711 {
712 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
713
714 let (fwd_ref, to_sink) =
715 self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
716 let mut flow_state_borrow = self.flow_state().borrow_mut();
717
718 let root = get_this_crate();
719
720 let out_t_type = quote_type::<OutT>();
721 let ser_fn: syn::Expr = syn::parse_quote! {
722 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
723 |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
724 )
725 };
726
727 flow_state_borrow.push_root(HydroRoot::SendExternal {
728 to_external_key: from.key,
729 to_port_id: next_external_port_id,
730 to_many: false,
731 unpaired: false,
732 serialize_fn: Some(ser_fn.into()),
733 instantiate_fn: DebugInstantiate::Building,
734 input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
735 op_metadata: HydroIrOpMetadata::new(),
736 });
737
738 let in_t_type = quote_type::<InT>();
739
740 let deser_fn: syn::Expr = syn::parse_quote! {
741 |res| {
742 let b = res.unwrap();
743 #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
744 }
745 };
746
747 let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
748 self.clone(),
749 HydroNode::ExternalInput {
750 from_external_key: from.key,
751 from_port_id: next_external_port_id,
752 from_many: false,
753 codec_type: quote_type::<LengthDelimitedCodec>().into(),
754 port_hint: NetworkHint::Auto,
755 instantiate_fn: DebugInstantiate::Building,
756 deserialize_fn: Some(deser_fn.into()),
757 metadata: self.new_node_metadata(Stream::<
758 InT,
759 Self,
760 Unbounded,
761 TotalOrder,
762 ExactlyOnce,
763 >::collection_kind()),
764 },
765 );
766
767 (
768 ExternalBincodeBidi {
769 process_key: from.key,
770 port_id: next_external_port_id,
771 _phantom: PhantomData,
772 },
773 raw_stream,
774 fwd_ref,
775 )
776 }
777
778 #[expect(clippy::type_complexity, reason = "stream markers")]
790 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
791 &self,
792 from: &External<L>,
793 port_hint: NetworkHint,
794 ) -> (
795 ExternalBytesPort<Many>,
796 KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
797 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
798 ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
799 )
800 where
801 Self: Sized + NoTick,
802 {
803 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
804
805 let (fwd_ref, to_sink) =
806 self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
807 let mut flow_state_borrow = self.flow_state().borrow_mut();
808
809 flow_state_borrow.push_root(HydroRoot::SendExternal {
810 to_external_key: from.key,
811 to_port_id: next_external_port_id,
812 to_many: true,
813 unpaired: false,
814 serialize_fn: None,
815 instantiate_fn: DebugInstantiate::Building,
816 input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
817 op_metadata: HydroIrOpMetadata::new(),
818 });
819
820 let raw_stream: Stream<
821 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
822 Self,
823 Unbounded,
824 TotalOrder,
825 ExactlyOnce,
826 > = Stream::new(
827 self.clone(),
828 HydroNode::ExternalInput {
829 from_external_key: from.key,
830 from_port_id: next_external_port_id,
831 from_many: true,
832 codec_type: quote_type::<Codec>().into(),
833 port_hint,
834 instantiate_fn: DebugInstantiate::Building,
835 deserialize_fn: None,
836 metadata: self.new_node_metadata(Stream::<
837 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
838 Self,
839 Unbounded,
840 TotalOrder,
841 ExactlyOnce,
842 >::collection_kind()),
843 },
844 );
845
846 let membership_stream_ident = syn::Ident::new(
847 &format!(
848 "__hydro_deploy_many_{}_{}_membership",
849 from.key, next_external_port_id
850 ),
851 Span::call_site(),
852 );
853 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
854 let raw_membership_stream: KeyedStream<
855 u64,
856 bool,
857 Self,
858 Unbounded,
859 TotalOrder,
860 ExactlyOnce,
861 > = KeyedStream::new(
862 self.clone(),
863 HydroNode::Source {
864 source: HydroSource::Stream(membership_stream_expr.into()),
865 metadata: self.new_node_metadata(KeyedStream::<
866 u64,
867 bool,
868 Self,
869 Unbounded,
870 TotalOrder,
871 ExactlyOnce,
872 >::collection_kind()),
873 },
874 );
875
876 (
877 ExternalBytesPort {
878 process_key: from.key,
879 port_id: next_external_port_id,
880 _phantom: PhantomData,
881 },
882 raw_stream
883 .flatten_ordered() .into_keyed(),
885 raw_membership_stream.map(q!(|join| {
886 if join {
887 MembershipEvent::Joined
888 } else {
889 MembershipEvent::Left
890 }
891 })),
892 fwd_ref,
893 )
894 }
895
896 #[expect(clippy::type_complexity, reason = "stream markers")]
912 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
913 &self,
914 from: &External<L>,
915 ) -> (
916 ExternalBincodeBidi<InT, OutT, Many>,
917 KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
918 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
919 ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
920 )
921 where
922 Self: Sized + NoTick,
923 {
924 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
925
926 let (fwd_ref, to_sink) =
927 self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
928 let mut flow_state_borrow = self.flow_state().borrow_mut();
929
930 let root = get_this_crate();
931
932 let out_t_type = quote_type::<OutT>();
933 let ser_fn: syn::Expr = syn::parse_quote! {
934 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
935 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
936 )
937 };
938
939 flow_state_borrow.push_root(HydroRoot::SendExternal {
940 to_external_key: from.key,
941 to_port_id: next_external_port_id,
942 to_many: true,
943 unpaired: false,
944 serialize_fn: Some(ser_fn.into()),
945 instantiate_fn: DebugInstantiate::Building,
946 input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
947 op_metadata: HydroIrOpMetadata::new(),
948 });
949
950 let in_t_type = quote_type::<InT>();
951
952 let deser_fn: syn::Expr = syn::parse_quote! {
953 |res| {
954 let (id, b) = res.unwrap();
955 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
956 }
957 };
958
959 let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
960 KeyedStream::new(
961 self.clone(),
962 HydroNode::ExternalInput {
963 from_external_key: from.key,
964 from_port_id: next_external_port_id,
965 from_many: true,
966 codec_type: quote_type::<LengthDelimitedCodec>().into(),
967 port_hint: NetworkHint::Auto,
968 instantiate_fn: DebugInstantiate::Building,
969 deserialize_fn: Some(deser_fn.into()),
970 metadata: self.new_node_metadata(KeyedStream::<
971 u64,
972 InT,
973 Self,
974 Unbounded,
975 TotalOrder,
976 ExactlyOnce,
977 >::collection_kind()),
978 },
979 );
980
981 let membership_stream_ident = syn::Ident::new(
982 &format!(
983 "__hydro_deploy_many_{}_{}_membership",
984 from.key, next_external_port_id
985 ),
986 Span::call_site(),
987 );
988 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
989 let raw_membership_stream: KeyedStream<
990 u64,
991 bool,
992 Self,
993 Unbounded,
994 TotalOrder,
995 ExactlyOnce,
996 > = KeyedStream::new(
997 self.clone(),
998 HydroNode::Source {
999 source: HydroSource::Stream(membership_stream_expr.into()),
1000 metadata: self.new_node_metadata(KeyedStream::<
1001 u64,
1002 bool,
1003 Self,
1004 Unbounded,
1005 TotalOrder,
1006 ExactlyOnce,
1007 >::collection_kind()),
1008 },
1009 );
1010
1011 (
1012 ExternalBincodeBidi {
1013 process_key: from.key,
1014 port_id: next_external_port_id,
1015 _phantom: PhantomData,
1016 },
1017 raw_stream,
1018 raw_membership_stream.map(q!(|join| {
1019 if join {
1020 MembershipEvent::Joined
1021 } else {
1022 MembershipEvent::Left
1023 }
1024 })),
1025 fwd_ref,
1026 )
1027 }
1028
1029 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
1049 where
1050 Self: Sized + NoTick,
1051 {
1052 let e = e.splice_untyped_ctx(self);
1053
1054 Singleton::new(
1055 self.clone(),
1056 HydroNode::SingletonSource {
1057 value: e.into(),
1058 first_tick_only: false,
1059 metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
1060 },
1061 )
1062 }
1063
1064 fn singleton_future<F>(
1087 &self,
1088 e: impl QuotedWithContext<'a, F, Self>,
1089 ) -> Singleton<F::Output, Self, Bounded>
1090 where
1091 F: Future,
1092 Self: Sized + NoTick,
1093 {
1094 self.singleton(e).resolve_future_blocking()
1095 }
1096
1097 fn source_interval(
1107 &self,
1108 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1109 _nondet: NonDet,
1110 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1111 where
1112 Self: Sized + NoTick,
1113 {
1114 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1115 tokio::time::interval(interval)
1116 )))
1117 }
1118
1119 fn source_interval_delayed(
1130 &self,
1131 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1132 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1133 _nondet: NonDet,
1134 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1135 where
1136 Self: Sized + NoTick,
1137 {
1138 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1139 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
1140 )))
1141 }
1142
1143 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1177 where
1178 S: CycleCollection<'a, ForwardRef, Location = Self>,
1179 {
1180 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1181 (
1182 ForwardHandle::new(cycle_id, Location::id(self)),
1183 S::create_source(cycle_id, self.clone()),
1184 )
1185 }
1186}
1187
1188#[cfg(feature = "deploy")]
1189#[cfg(test)]
1190mod tests {
1191 use std::collections::HashSet;
1192
1193 use futures::{SinkExt, StreamExt};
1194 use hydro_deploy::Deployment;
1195 use stageleft::q;
1196 use tokio_util::codec::LengthDelimitedCodec;
1197
1198 use crate::compile::builder::FlowBuilder;
1199 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1200 use crate::location::{Location, NetworkHint};
1201 use crate::nondet::nondet;
1202
1203 #[tokio::test]
1204 async fn top_level_singleton_replay_cardinality() {
1205 let mut deployment = Deployment::new();
1206
1207 let mut flow = FlowBuilder::new();
1208 let node = flow.process::<()>();
1209 let external = flow.external::<()>();
1210
1211 let (in_port, input) =
1212 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1213 let singleton = node.singleton(q!(123));
1214 let tick = node.tick();
1215 let out = input
1216 .batch(&tick, nondet!())
1217 .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
1218 .cross_singleton(
1219 singleton
1220 .snapshot(&tick, nondet!())
1221 .into_stream()
1222 .count(),
1223 )
1224 .all_ticks()
1225 .send_bincode_external(&external);
1226
1227 let nodes = flow
1228 .with_process(&node, deployment.Localhost())
1229 .with_external(&external, deployment.Localhost())
1230 .deploy(&mut deployment);
1231
1232 deployment.deploy().await.unwrap();
1233
1234 let mut external_in = nodes.connect(in_port).await;
1235 let mut external_out = nodes.connect(out).await;
1236
1237 deployment.start().await.unwrap();
1238
1239 external_in.send(1).await.unwrap();
1240 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1241
1242 external_in.send(2).await.unwrap();
1243 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1244 }
1245
1246 #[tokio::test]
1247 async fn tick_singleton_replay_cardinality() {
1248 let mut deployment = Deployment::new();
1249
1250 let mut flow = FlowBuilder::new();
1251 let node = flow.process::<()>();
1252 let external = flow.external::<()>();
1253
1254 let (in_port, input) =
1255 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1256 let tick = node.tick();
1257 let singleton = tick.singleton(q!(123));
1258 let out = input
1259 .batch(&tick, nondet!())
1260 .cross_singleton(singleton.clone())
1261 .cross_singleton(singleton.into_stream().count())
1262 .all_ticks()
1263 .send_bincode_external(&external);
1264
1265 let nodes = flow
1266 .with_process(&node, deployment.Localhost())
1267 .with_external(&external, deployment.Localhost())
1268 .deploy(&mut deployment);
1269
1270 deployment.deploy().await.unwrap();
1271
1272 let mut external_in = nodes.connect(in_port).await;
1273 let mut external_out = nodes.connect(out).await;
1274
1275 deployment.start().await.unwrap();
1276
1277 external_in.send(1).await.unwrap();
1278 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1279
1280 external_in.send(2).await.unwrap();
1281 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1282 }
1283
1284 #[tokio::test]
1285 async fn external_bytes() {
1286 let mut deployment = Deployment::new();
1287
1288 let mut flow = FlowBuilder::new();
1289 let first_node = flow.process::<()>();
1290 let external = flow.external::<()>();
1291
1292 let (in_port, input) = first_node.source_external_bytes(&external);
1293 let out = input.send_bincode_external(&external);
1294
1295 let nodes = flow
1296 .with_process(&first_node, deployment.Localhost())
1297 .with_external(&external, deployment.Localhost())
1298 .deploy(&mut deployment);
1299
1300 deployment.deploy().await.unwrap();
1301
1302 let mut external_in = nodes.connect(in_port).await.1;
1303 let mut external_out = nodes.connect(out).await;
1304
1305 deployment.start().await.unwrap();
1306
1307 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1308
1309 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1310 }
1311
1312 #[tokio::test]
1313 async fn multi_external_source() {
1314 let mut deployment = Deployment::new();
1315
1316 let mut flow = FlowBuilder::new();
1317 let first_node = flow.process::<()>();
1318 let external = flow.external::<()>();
1319
1320 let (in_port, input, _membership, complete_sink) =
1321 first_node.bidi_external_many_bincode(&external);
1322 let out = input.entries().send_bincode_external(&external);
1323 complete_sink.complete(
1324 first_node
1325 .source_iter::<(u64, ()), _>(q!([]))
1326 .into_keyed()
1327 .weaken_ordering(),
1328 );
1329
1330 let nodes = flow
1331 .with_process(&first_node, deployment.Localhost())
1332 .with_external(&external, deployment.Localhost())
1333 .deploy(&mut deployment);
1334
1335 deployment.deploy().await.unwrap();
1336
1337 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1338 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1339 let external_out = nodes.connect(out).await;
1340
1341 deployment.start().await.unwrap();
1342
1343 external_in_1.send(123).await.unwrap();
1344 external_in_2.send(456).await.unwrap();
1345
1346 assert_eq!(
1347 external_out.take(2).collect::<HashSet<_>>().await,
1348 vec![(0, 123), (1, 456)].into_iter().collect()
1349 );
1350 }
1351
1352 #[tokio::test]
1353 async fn second_connection_only_multi_source() {
1354 let mut deployment = Deployment::new();
1355
1356 let mut flow = FlowBuilder::new();
1357 let first_node = flow.process::<()>();
1358 let external = flow.external::<()>();
1359
1360 let (in_port, input, _membership, complete_sink) =
1361 first_node.bidi_external_many_bincode(&external);
1362 let out = input.entries().send_bincode_external(&external);
1363 complete_sink.complete(
1364 first_node
1365 .source_iter::<(u64, ()), _>(q!([]))
1366 .into_keyed()
1367 .weaken_ordering(),
1368 );
1369
1370 let nodes = flow
1371 .with_process(&first_node, deployment.Localhost())
1372 .with_external(&external, deployment.Localhost())
1373 .deploy(&mut deployment);
1374
1375 deployment.deploy().await.unwrap();
1376
1377 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1379 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1380 let mut external_out = nodes.connect(out).await;
1381
1382 deployment.start().await.unwrap();
1383
1384 external_in_2.send(456).await.unwrap();
1385
1386 assert_eq!(external_out.next().await.unwrap(), (1, 456));
1387 }
1388
1389 #[tokio::test]
1390 async fn multi_external_bytes() {
1391 let mut deployment = Deployment::new();
1392
1393 let mut flow = FlowBuilder::new();
1394 let first_node = flow.process::<()>();
1395 let external = flow.external::<()>();
1396
1397 let (in_port, input, _membership, complete_sink) = first_node
1398 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1399 let out = input.entries().send_bincode_external(&external);
1400 complete_sink.complete(
1401 first_node
1402 .source_iter(q!([]))
1403 .into_keyed()
1404 .weaken_ordering(),
1405 );
1406
1407 let nodes = flow
1408 .with_process(&first_node, deployment.Localhost())
1409 .with_external(&external, deployment.Localhost())
1410 .deploy(&mut deployment);
1411
1412 deployment.deploy().await.unwrap();
1413
1414 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1415 let mut external_in_2 = nodes.connect(in_port).await.1;
1416 let external_out = nodes.connect(out).await;
1417
1418 deployment.start().await.unwrap();
1419
1420 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1421 external_in_2.send(vec![4, 5].into()).await.unwrap();
1422
1423 assert_eq!(
1424 external_out.take(2).collect::<HashSet<_>>().await,
1425 vec![
1426 (0, (&[1u8, 2, 3] as &[u8]).into()),
1427 (1, (&[4u8, 5] as &[u8]).into())
1428 ]
1429 .into_iter()
1430 .collect()
1431 );
1432 }
1433
1434 #[tokio::test]
1435 async fn single_client_external_bytes() {
1436 let mut deployment = Deployment::new();
1437 let mut flow = FlowBuilder::new();
1438 let first_node = flow.process::<()>();
1439 let external = flow.external::<()>();
1440 let (port, input, complete_sink) = first_node
1441 .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1442 complete_sink.complete(input.map(q!(|data| {
1443 let mut resp: Vec<u8> = data.into();
1444 resp.push(42);
1445 resp.into() })));
1447
1448 let nodes = flow
1449 .with_process(&first_node, deployment.Localhost())
1450 .with_external(&external, deployment.Localhost())
1451 .deploy(&mut deployment);
1452
1453 deployment.deploy().await.unwrap();
1454 deployment.start().await.unwrap();
1455
1456 let (mut external_out, mut external_in) = nodes.connect(port).await;
1457
1458 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1459 assert_eq!(
1460 external_out.next().await.unwrap().unwrap(),
1461 vec![1, 2, 3, 42]
1462 );
1463 }
1464
1465 #[tokio::test]
1466 async fn echo_external_bytes() {
1467 let mut deployment = Deployment::new();
1468
1469 let mut flow = FlowBuilder::new();
1470 let first_node = flow.process::<()>();
1471 let external = flow.external::<()>();
1472
1473 let (port, input, _membership, complete_sink) = first_node
1474 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1475 complete_sink
1476 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1477
1478 let nodes = flow
1479 .with_process(&first_node, deployment.Localhost())
1480 .with_external(&external, deployment.Localhost())
1481 .deploy(&mut deployment);
1482
1483 deployment.deploy().await.unwrap();
1484
1485 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1486 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1487
1488 deployment.start().await.unwrap();
1489
1490 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1491 external_in_2.send(vec![4, 5].into()).await.unwrap();
1492
1493 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1494 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1495 }
1496
1497 #[tokio::test]
1498 async fn echo_external_bincode() {
1499 let mut deployment = Deployment::new();
1500
1501 let mut flow = FlowBuilder::new();
1502 let first_node = flow.process::<()>();
1503 let external = flow.external::<()>();
1504
1505 let (port, input, _membership, complete_sink) =
1506 first_node.bidi_external_many_bincode(&external);
1507 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1508
1509 let nodes = flow
1510 .with_process(&first_node, deployment.Localhost())
1511 .with_external(&external, deployment.Localhost())
1512 .deploy(&mut deployment);
1513
1514 deployment.deploy().await.unwrap();
1515
1516 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1517 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1518
1519 deployment.start().await.unwrap();
1520
1521 external_in_1.send("hi".to_owned()).await.unwrap();
1522 external_in_2.send("hello".to_owned()).await.unwrap();
1523
1524 assert_eq!(external_out_1.next().await.unwrap(), "HI");
1525 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1526 }
1527
1528 #[tokio::test]
1529 async fn closure_location_name() {
1530 let mut deployment = Deployment::new();
1531 let mut flow = FlowBuilder::new();
1532
1533 enum ClosureProcess {}
1534
1535 let node = flow.process::<ClosureProcess>();
1536 let external = flow.external::<()>();
1537
1538 let (in_port, input) =
1539 node.source_external_bincode::<_, i32, TotalOrder, ExactlyOnce>(&external);
1540 let out = input.send_bincode_external(&external);
1541
1542 let nodes = flow
1543 .with_process(&node, deployment.Localhost())
1544 .with_external(&external, deployment.Localhost())
1545 .deploy(&mut deployment);
1546
1547 deployment.deploy().await.unwrap();
1548
1549 let mut external_in = nodes.connect(in_port).await;
1550 let mut external_out = nodes.connect(out).await;
1551
1552 deployment.start().await.unwrap();
1553
1554 external_in.send(42).await.unwrap();
1555 assert_eq!(external_out.next().await.unwrap(), 42);
1556 }
1557}