1use std::any::type_name;
2use std::cell::RefCell;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6use slotmap::{SecondaryMap, SlotMap};
7
8#[cfg(feature = "build")]
9use super::compiled::CompiledFlow;
10#[cfg(feature = "build")]
11use super::deploy::{DeployFlow, DeployResult};
12#[cfg(feature = "build")]
13use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
14use super::ir::HydroRoot;
15use crate::location::{Cluster, External, LocationKey, LocationType, Process};
16#[cfg(feature = "sim")]
17#[cfg(stageleft_runtime)]
18use crate::sim::flow::SimFlow;
19use crate::staging_util::Invariant;
20
21#[stageleft::export(ExternalPortId, CycleId, ClockId)]
22crate::newtype_counter! {
23 pub struct ExternalPortId(usize);
25
26 pub struct CycleId(usize);
28
29 pub struct ClockId(usize);
31}
32
33impl CycleId {
34 #[cfg(feature = "build")]
35 pub(crate) fn as_ident(&self) -> syn::Ident {
36 syn::Ident::new(&format!("cycle_{}", self), proc_macro2::Span::call_site())
37 }
38}
39
40pub(crate) type FlowState = Rc<RefCell<FlowStateInner>>;
41
42pub(crate) struct FlowStateInner {
43 roots: Option<Vec<HydroRoot>>,
47
48 next_external_port: ExternalPortId,
50
51 next_cycle_id: CycleId,
53
54 next_clock_id: ClockId,
56}
57
58impl FlowStateInner {
59 pub fn next_external_port(&mut self) -> ExternalPortId {
60 self.next_external_port.get_and_increment()
61 }
62
63 pub fn next_cycle_id(&mut self) -> CycleId {
64 self.next_cycle_id.get_and_increment()
65 }
66
67 pub fn next_clock_id(&mut self) -> ClockId {
68 self.next_clock_id.get_and_increment()
69 }
70
71 pub fn push_root(&mut self, root: HydroRoot) {
72 self.roots
73 .as_mut()
74 .expect("Attempted to add a root to a flow that has already been finalized. No roots can be added after the flow has been compiled.")
75 .push(root);
76 }
77
78 pub fn try_push_root(&mut self, root: HydroRoot) {
79 if let Some(roots) = self.roots.as_mut() {
80 roots.push(root);
81 }
82 }
83}
84
85pub struct FlowBuilder<'a> {
86 flow_state: FlowState,
88
89 locations: SlotMap<LocationKey, LocationType>,
91 location_names: SecondaryMap<LocationKey, String>,
93
94 #[cfg_attr(
96 not(feature = "build"),
97 expect(dead_code, reason = "unused without build")
98 )]
99 flow_name: String,
100
101 finalized: bool,
104
105 _phantom: Invariant<'a>,
110}
111
112impl Drop for FlowBuilder<'_> {
113 fn drop(&mut self) {
114 if !self.finalized && !std::thread::panicking() {
115 panic!(
116 "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
117 );
118 }
119 }
120}
121
122#[expect(missing_docs, reason = "TODO")]
123impl<'a> FlowBuilder<'a> {
124 #[expect(
126 clippy::new_without_default,
127 reason = "call `new` explicitly, not `default`"
128 )]
129 pub fn new() -> Self {
130 let mut name = std::env::var("CARGO_PKG_NAME").unwrap_or_else(|_| "unknown".to_owned());
131 if let Ok(bin_path) = std::env::current_exe()
132 && let Some(bin_name) = bin_path.file_stem()
133 {
134 name = format!("{}/{}", name, bin_name.display());
135 }
136 Self::with_name(name)
137 }
138
139 pub fn with_name(name: impl Into<String>) -> Self {
141 Self {
142 flow_state: Rc::new(RefCell::new(FlowStateInner {
143 roots: Some(vec![]),
144 next_external_port: ExternalPortId::default(),
145 next_cycle_id: CycleId::default(),
146 next_clock_id: ClockId::default(),
147 })),
148 locations: SlotMap::with_key(),
149 location_names: SecondaryMap::new(),
150 flow_name: name.into(),
151 finalized: false,
152 _phantom: PhantomData,
153 }
154 }
155
156 pub(crate) fn flow_state(&self) -> &FlowState {
157 &self.flow_state
158 }
159
160 pub fn process<P>(&mut self) -> Process<'a, P> {
161 let key = self.locations.insert(LocationType::Process);
162 self.location_names.insert(key, type_name::<P>().to_owned());
163 Process {
164 key,
165 flow_state: self.flow_state().clone(),
166 _phantom: PhantomData,
167 }
168 }
169
170 pub fn cluster<C>(&mut self) -> Cluster<'a, C> {
171 let key = self.locations.insert(LocationType::Cluster);
172 self.location_names.insert(key, type_name::<C>().to_owned());
173 Cluster {
174 key,
175 flow_state: self.flow_state().clone(),
176 _phantom: PhantomData,
177 }
178 }
179
180 pub fn external<E>(&mut self) -> External<'a, E> {
181 let key = self.locations.insert(LocationType::External);
182 self.location_names.insert(key, type_name::<E>().to_owned());
183 External {
184 key,
185 flow_state: self.flow_state().clone(),
186 _phantom: PhantomData,
187 }
188 }
189}
190
191#[cfg(feature = "build")]
192#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
193#[expect(missing_docs, reason = "TODO")]
194impl<'a> FlowBuilder<'a> {
195 pub fn finalize(mut self) -> super::built::BuiltFlow<'a> {
196 self.finalized = true;
197
198 let mut ir = self.flow_state.borrow_mut().roots.take().unwrap();
199 super::ir::unify_atomic_ticks(&mut ir);
200
201 super::built::BuiltFlow {
202 ir,
203 locations: std::mem::take(&mut self.locations),
204 location_names: std::mem::take(&mut self.location_names),
205 flow_name: std::mem::take(&mut self.flow_name),
206 _phantom: PhantomData,
207 }
208 }
209
210 pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
211 self.finalize().with_default_optimize()
212 }
213
214 pub fn optimize_with(self, f: impl FnOnce(&mut [HydroRoot])) -> super::built::BuiltFlow<'a> {
215 self.finalize().optimize_with(f)
216 }
217
218 pub fn with_process<P, D: Deploy<'a>>(
219 self,
220 process: &Process<P>,
221 spec: impl IntoProcessSpec<'a, D>,
222 ) -> DeployFlow<'a, D> {
223 self.with_default_optimize().with_process(process, spec)
224 }
225
226 pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
227 self,
228 spec: impl Fn() -> S,
229 ) -> DeployFlow<'a, D> {
230 self.with_default_optimize().with_remaining_processes(spec)
231 }
232
233 pub fn with_external<P, D: Deploy<'a>>(
234 self,
235 process: &External<P>,
236 spec: impl ExternalSpec<'a, D>,
237 ) -> DeployFlow<'a, D> {
238 self.with_default_optimize().with_external(process, spec)
239 }
240
241 pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
242 self,
243 spec: impl Fn() -> S,
244 ) -> DeployFlow<'a, D> {
245 self.with_default_optimize().with_remaining_externals(spec)
246 }
247
248 pub fn with_cluster<C, D: Deploy<'a>>(
249 self,
250 cluster: &Cluster<C>,
251 spec: impl ClusterSpec<'a, D>,
252 ) -> DeployFlow<'a, D> {
253 self.with_default_optimize().with_cluster(cluster, spec)
254 }
255
256 pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
257 self,
258 spec: impl Fn() -> S,
259 ) -> DeployFlow<'a, D> {
260 self.with_default_optimize().with_remaining_clusters(spec)
261 }
262
263 pub fn compile<D: Deploy<'a, InstantiateEnv = ()>>(self) -> CompiledFlow<'a> {
264 self.with_default_optimize::<D>().compile()
265 }
266
267 pub fn deploy<D: Deploy<'a>>(self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
268 self.with_default_optimize().deploy(env)
269 }
270
271 #[cfg(feature = "sim")]
272 pub fn sim(self) -> SimFlow<'a> {
275 self.finalize().sim()
276 }
277
278 pub fn from_built<'b>(built: &super::built::BuiltFlow) -> FlowBuilder<'b> {
279 FlowBuilder {
280 flow_state: Rc::new(RefCell::new(FlowStateInner {
281 roots: None,
282 next_external_port: ExternalPortId::default(),
283 next_cycle_id: CycleId::default(),
284 next_clock_id: ClockId::default(),
285 })),
286 locations: built.locations.clone(),
287 location_names: built.location_names.clone(),
288 flow_name: built.flow_name.clone(),
289 finalized: false,
290 _phantom: PhantomData,
291 }
292 }
293
294 #[doc(hidden)] pub fn replace_ir(&mut self, roots: Vec<HydroRoot>) {
296 self.flow_state.borrow_mut().roots = Some(roots);
297 }
298}