1use std::collections::HashMap;
2use std::error::Error;
3use std::fmt::Write;
4
5use auto_impl::auto_impl;
6
7pub use super::graphviz::{HydroDot, escape_dot};
8pub use super::mermaid::{HydroMermaid, escape_mermaid};
10pub use super::reactflow::HydroReactFlow;
11use crate::compile::ir::{DebugExpr, HydroNode, HydroRoot, HydroSource};
12
13#[derive(Debug, Clone)]
15pub enum NodeLabel {
16 Static(String),
18 WithExprs {
20 op_name: String,
21 exprs: Vec<DebugExpr>,
22 },
23}
24
25impl NodeLabel {
26 pub fn static_label(s: String) -> Self {
28 Self::Static(s)
29 }
30
31 pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
33 Self::WithExprs { op_name, exprs }
34 }
35}
36
37impl std::fmt::Display for NodeLabel {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 Self::Static(s) => write!(f, "{}", s),
41 Self::WithExprs { op_name, exprs } => {
42 if exprs.is_empty() {
43 write!(f, "{}()", op_name)
44 } else {
45 let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
46 write!(f, "{}({})", op_name, expr_strs.join(", "))
47 }
48 }
49 }
50 }
51}
52
53pub struct IndentedGraphWriter<W> {
56 pub write: W,
57 pub indent: usize,
58 pub config: HydroWriteConfig,
59}
60
61impl<W> IndentedGraphWriter<W> {
62 pub fn new(write: W) -> Self {
64 Self {
65 write,
66 indent: 0,
67 config: HydroWriteConfig::default(),
68 }
69 }
70
71 pub fn new_with_config(write: W, config: &HydroWriteConfig) -> Self {
73 Self {
74 write,
75 indent: 0,
76 config: config.clone(),
77 }
78 }
79}
80
81impl<W: Write> IndentedGraphWriter<W> {
82 pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
84 writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
85 }
86}
87
88pub type GraphWriteError = std::fmt::Error;
90
91#[auto_impl(&mut, Box)]
93pub trait HydroGraphWrite {
94 type Err: Error;
96
97 fn write_prologue(&mut self) -> Result<(), Self::Err>;
99
100 fn write_node_definition(
102 &mut self,
103 node_id: usize,
104 node_label: &NodeLabel,
105 node_type: HydroNodeType,
106 location_id: Option<usize>,
107 location_type: Option<&str>,
108 ) -> Result<(), Self::Err>;
109
110 fn write_edge(
112 &mut self,
113 src_id: usize,
114 dst_id: usize,
115 edge_type: HydroEdgeType,
116 label: Option<&str>,
117 ) -> Result<(), Self::Err>;
118
119 fn write_location_start(
121 &mut self,
122 location_id: usize,
123 location_type: &str,
124 ) -> Result<(), Self::Err>;
125
126 fn write_node(&mut self, node_id: usize) -> Result<(), Self::Err>;
128
129 fn write_location_end(&mut self) -> Result<(), Self::Err>;
131
132 fn write_epilogue(&mut self) -> Result<(), Self::Err>;
134}
135
136#[derive(Debug, Clone, Copy)]
138pub enum HydroNodeType {
139 Source,
140 Transform,
141 Join,
142 Aggregation,
143 Network,
144 Sink,
145 Tee,
146}
147
148#[derive(Debug, Clone, Copy)]
150pub enum HydroEdgeType {
151 Stream,
152 Persistent,
153 Network,
154 Cycle,
155}
156
157#[derive(Debug, Clone)]
159pub struct HydroWriteConfig {
160 pub show_metadata: bool,
161 pub show_location_groups: bool,
162 pub use_short_labels: bool,
163 pub process_id_name: Vec<(usize, String)>,
164 pub cluster_id_name: Vec<(usize, String)>,
165 pub external_id_name: Vec<(usize, String)>,
166}
167
168impl Default for HydroWriteConfig {
169 fn default() -> Self {
170 Self {
171 show_metadata: false,
172 show_location_groups: true,
173 use_short_labels: true, process_id_name: vec![],
175 cluster_id_name: vec![],
176 external_id_name: vec![],
177 }
178 }
179}
180
181#[derive(Debug, Default)]
183pub struct HydroGraphStructure {
184 pub nodes: HashMap<usize, (NodeLabel, HydroNodeType, Option<usize>)>, pub edges: Vec<(usize, usize, HydroEdgeType, Option<String>)>, pub locations: HashMap<usize, String>, pub next_node_id: usize,
188}
189
190impl HydroGraphStructure {
191 pub fn new() -> Self {
192 Self::default()
193 }
194
195 pub fn add_node(
196 &mut self,
197 label: NodeLabel,
198 node_type: HydroNodeType,
199 location: Option<usize>,
200 ) -> usize {
201 let node_id = self.next_node_id;
202 self.next_node_id += 1;
203 self.nodes.insert(node_id, (label, node_type, location));
204 node_id
205 }
206
207 pub fn add_edge(
208 &mut self,
209 src: usize,
210 dst: usize,
211 edge_type: HydroEdgeType,
212 label: Option<String>,
213 ) {
214 self.edges.push((src, dst, edge_type, label));
215 }
216
217 pub fn add_location(&mut self, location_id: usize, location_type: String) {
218 self.locations.insert(location_id, location_type);
219 }
220}
221
222pub fn extract_op_name(full_label: String) -> String {
224 full_label
225 .split('(')
226 .next()
227 .unwrap_or("unknown")
228 .to_string()
229 .to_lowercase()
230}
231
232pub fn extract_short_label(full_label: &str) -> String {
234 if let Some(op_name) = full_label.split('(').next() {
236 let base_name = op_name.to_lowercase();
237 match base_name.as_str() {
238 "source" => {
240 if full_label.contains("Iter") {
241 "source_iter".to_string()
242 } else if full_label.contains("Stream") {
243 "source_stream".to_string()
244 } else if full_label.contains("ExternalNetwork") {
245 "external_network".to_string()
246 } else if full_label.contains("Spin") {
247 "spin".to_string()
248 } else {
249 "source".to_string()
250 }
251 }
252 "network" => {
253 if full_label.contains("deser") {
254 "network(recv)".to_string()
255 } else if full_label.contains("ser") {
256 "network(send)".to_string()
257 } else {
258 "network".to_string()
259 }
260 }
261 _ => base_name,
263 }
264 } else {
265 if full_label.len() > 20 {
267 format!("{}...", &full_label[..17])
268 } else {
269 full_label.to_string()
270 }
271 }
272}
273
274fn extract_location_id(
276 metadata: &crate::compile::ir::HydroIrMetadata,
277) -> (Option<usize>, Option<String>) {
278 use crate::location::dynamic::LocationId;
279 match &metadata.location_kind {
280 LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
281 LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
282 LocationId::Tick(_, inner) => match inner.as_ref() {
283 LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
284 LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
285 _ => (None, None),
286 },
287 }
288}
289
290fn setup_location(
292 structure: &mut HydroGraphStructure,
293 metadata: &crate::compile::ir::HydroIrMetadata,
294) -> Option<usize> {
295 let (location_id, location_type) = extract_location_id(metadata);
296 if let (Some(loc_id), Some(loc_type)) = (location_id, location_type) {
297 structure.add_location(loc_id, loc_type);
298 }
299 location_id
300}
301
302impl HydroRoot {
303 pub fn write_graph<W>(
305 &self,
306 mut graph_write: W,
307 config: &HydroWriteConfig,
308 ) -> Result<(), W::Err>
309 where
310 W: HydroGraphWrite,
311 {
312 let mut structure = HydroGraphStructure::new();
313 let mut seen_tees = HashMap::new();
314
315 let _sink_id = self.build_graph_structure(&mut structure, &mut seen_tees, config);
317
318 graph_write.write_prologue()?;
320
321 for (&node_id, (label, node_type, location)) in &structure.nodes {
323 let (location_id, location_type) = if let Some(loc_id) = location {
324 (
325 Some(*loc_id),
326 structure.locations.get(loc_id).map(|s| s.as_str()),
327 )
328 } else {
329 (None, None)
330 };
331
332 graph_write.write_node_definition(
335 node_id,
336 label,
337 *node_type,
338 location_id,
339 location_type,
340 )?;
341 }
342
343 if config.show_location_groups {
345 let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
346 for (&node_id, (_, _, location)) in &structure.nodes {
347 if let Some(location_id) = location {
348 nodes_by_location
349 .entry(*location_id)
350 .or_default()
351 .push(node_id);
352 }
353 }
354
355 for (&location_id, node_ids) in &nodes_by_location {
356 if let Some(location_type) = structure.locations.get(&location_id) {
357 graph_write.write_location_start(location_id, location_type)?;
358 for &node_id in node_ids {
359 graph_write.write_node(node_id)?;
360 }
361 graph_write.write_location_end()?;
362 }
363 }
364 }
365
366 for (src_id, dst_id, edge_type, label) in &structure.edges {
368 graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
369 }
370
371 graph_write.write_epilogue()?;
372 Ok(())
373 }
374
375 pub fn build_graph_structure(
377 &self,
378 structure: &mut HydroGraphStructure,
379 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
380 config: &HydroWriteConfig,
381 ) -> usize {
382 fn build_sink_node(
384 structure: &mut HydroGraphStructure,
385 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
386 config: &HydroWriteConfig,
387 input: &HydroNode,
388 metadata: Option<&crate::compile::ir::HydroIrMetadata>,
389 label: NodeLabel,
390 edge_type: HydroEdgeType,
391 ) -> usize {
392 let input_id = input.build_graph_structure(structure, seen_tees, config);
393 let location_id = metadata.and_then(|m| setup_location(structure, m));
394 let sink_id = structure.add_node(label, HydroNodeType::Sink, location_id);
395 structure.add_edge(input_id, sink_id, edge_type, None);
396 sink_id
397 }
398
399 match self {
400 HydroRoot::ForEach { f, input, .. } => build_sink_node(
402 structure,
403 seen_tees,
404 config,
405 input,
406 None,
407 NodeLabel::with_exprs("for_each".to_string(), vec![f.clone()]),
408 HydroEdgeType::Stream,
409 ),
410
411 HydroRoot::SendExternal {
412 to_external_id,
413 to_key,
414 input,
415 ..
416 } => build_sink_node(
417 structure,
418 seen_tees,
419 config,
420 input,
421 None,
422 NodeLabel::with_exprs(
423 format!("send_external({}:{})", to_external_id, to_key),
424 vec![],
425 ),
426 HydroEdgeType::Stream,
427 ),
428
429 HydroRoot::DestSink { sink, input, .. } => build_sink_node(
430 structure,
431 seen_tees,
432 config,
433 input,
434 None,
435 NodeLabel::with_exprs("dest_sink".to_string(), vec![sink.clone()]),
436 HydroEdgeType::Stream,
437 ),
438
439 HydroRoot::CycleSink { ident, input, .. } => build_sink_node(
441 structure,
442 seen_tees,
443 config,
444 input,
445 None,
446 NodeLabel::static_label(format!("cycle_sink({})", ident)),
447 HydroEdgeType::Cycle,
448 ),
449 }
450 }
451}
452
453impl HydroNode {
454 pub fn build_graph_structure(
456 &self,
457 structure: &mut HydroGraphStructure,
458 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
459 config: &HydroWriteConfig,
460 ) -> usize {
461 use crate::location::dynamic::LocationId;
462
463 struct TransformParams<'a> {
467 structure: &'a mut HydroGraphStructure,
468 seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
469 config: &'a HydroWriteConfig,
470 input: &'a HydroNode,
471 metadata: &'a crate::compile::ir::HydroIrMetadata,
472 op_name: String,
473 node_type: HydroNodeType,
474 edge_type: HydroEdgeType,
475 }
476
477 fn build_simple_transform(params: TransformParams) -> usize {
479 let input_id = params.input.build_graph_structure(
480 params.structure,
481 params.seen_tees,
482 params.config,
483 );
484 let location_id = setup_location(params.structure, params.metadata);
485 let node_id = params.structure.add_node(
486 NodeLabel::Static(params.op_name.to_string()),
487 params.node_type,
488 location_id,
489 );
490 params
491 .structure
492 .add_edge(input_id, node_id, params.edge_type, None);
493 node_id
494 }
495
496 fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> usize {
498 let input_id = params.input.build_graph_structure(
499 params.structure,
500 params.seen_tees,
501 params.config,
502 );
503 let location_id = setup_location(params.structure, params.metadata);
504 let node_id = params.structure.add_node(
505 NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
506 params.node_type,
507 location_id,
508 );
509 params
510 .structure
511 .add_edge(input_id, node_id, params.edge_type, None);
512 node_id
513 }
514
515 fn build_dual_expr_transform(
517 params: TransformParams,
518 expr1: &DebugExpr,
519 expr2: &DebugExpr,
520 ) -> usize {
521 let input_id = params.input.build_graph_structure(
522 params.structure,
523 params.seen_tees,
524 params.config,
525 );
526 let location_id = setup_location(params.structure, params.metadata);
527 let node_id = params.structure.add_node(
528 NodeLabel::with_exprs(
529 params.op_name.to_string(),
530 vec![expr1.clone(), expr2.clone()],
531 ),
532 params.node_type,
533 location_id,
534 );
535 params
536 .structure
537 .add_edge(input_id, node_id, params.edge_type, None);
538 node_id
539 }
540
541 fn build_source_node(
543 structure: &mut HydroGraphStructure,
544 metadata: &crate::compile::ir::HydroIrMetadata,
545 label: String,
546 ) -> usize {
547 let location_id = setup_location(structure, metadata);
548 structure.add_node(NodeLabel::Static(label), HydroNodeType::Source, location_id)
549 }
550
551 match self {
552 HydroNode::Placeholder => structure.add_node(
553 NodeLabel::Static("PLACEHOLDER".to_string()),
554 HydroNodeType::Transform,
555 None,
556 ),
557
558 HydroNode::Source {
559 source, metadata, ..
560 } => {
561 let label = match source {
562 HydroSource::Stream(expr) => format!("source_stream({})", expr),
563 HydroSource::ExternalNetwork() => "external_network()".to_string(),
564 HydroSource::Iter(expr) => format!("source_iter({})", expr),
565 HydroSource::Spin() => "spin()".to_string(),
566 };
567 build_source_node(structure, metadata, label)
568 }
569
570 HydroNode::ExternalInput {
571 from_external_id,
572 from_key,
573 metadata,
574 ..
575 } => build_source_node(
576 structure,
577 metadata,
578 format!("external_input({}:{})", from_external_id, from_key),
579 ),
580
581 HydroNode::CycleSource {
582 ident, metadata, ..
583 } => build_source_node(structure, metadata, format!("cycle_source({})", ident)),
584
585 HydroNode::Tee { inner, metadata } => {
586 let ptr = inner.as_ptr();
587 if let Some(&existing_id) = seen_tees.get(&ptr) {
588 return existing_id;
589 }
590
591 let input_id = inner
592 .0
593 .borrow()
594 .build_graph_structure(structure, seen_tees, config);
595 let location_id = setup_location(structure, metadata);
596
597 let tee_id = structure.add_node(
598 NodeLabel::Static(extract_op_name(self.print_root())),
599 HydroNodeType::Tee,
600 location_id,
601 );
602
603 seen_tees.insert(ptr, tee_id);
604
605 structure.add_edge(input_id, tee_id, HydroEdgeType::Stream, None);
606
607 tee_id
608 }
609
610 HydroNode::Delta { inner, metadata }
612 | HydroNode::DeferTick {
613 input: inner,
614 metadata,
615 }
616 | HydroNode::Enumerate {
617 input: inner,
618 metadata,
619 ..
620 }
621 | HydroNode::Unique {
622 input: inner,
623 metadata,
624 }
625 | HydroNode::ResolveFutures {
626 input: inner,
627 metadata,
628 }
629 | HydroNode::ResolveFuturesOrdered {
630 input: inner,
631 metadata,
632 } => build_simple_transform(TransformParams {
633 structure,
634 seen_tees,
635 config,
636 input: inner,
637 metadata,
638 op_name: extract_op_name(self.print_root()),
639 node_type: HydroNodeType::Transform,
640 edge_type: HydroEdgeType::Stream,
641 }),
642
643 HydroNode::Persist { inner, metadata } => build_simple_transform(TransformParams {
645 structure,
646 seen_tees,
647 config,
648 input: inner,
649 metadata,
650 op_name: extract_op_name(self.print_root()),
651 node_type: HydroNodeType::Transform,
652 edge_type: HydroEdgeType::Persistent,
653 }),
654
655 HydroNode::Sort {
657 input: inner,
658 metadata,
659 } => build_simple_transform(TransformParams {
660 structure,
661 seen_tees,
662 config,
663 input: inner,
664 metadata,
665 op_name: extract_op_name(self.print_root()),
666 node_type: HydroNodeType::Aggregation,
667 edge_type: HydroEdgeType::Stream,
668 }),
669
670 HydroNode::Map { f, input, metadata }
672 | HydroNode::Filter { f, input, metadata }
673 | HydroNode::FlatMap { f, input, metadata }
674 | HydroNode::FilterMap { f, input, metadata }
675 | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
676 TransformParams {
677 structure,
678 seen_tees,
679 config,
680 input,
681 metadata,
682 op_name: extract_op_name(self.print_root()),
683 node_type: HydroNodeType::Transform,
684 edge_type: HydroEdgeType::Stream,
685 },
686 f,
687 ),
688
689 HydroNode::Reduce { f, input, metadata }
691 | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
692 TransformParams {
693 structure,
694 seen_tees,
695 config,
696 input,
697 metadata,
698 op_name: extract_op_name(self.print_root()),
699 node_type: HydroNodeType::Aggregation,
700 edge_type: HydroEdgeType::Stream,
701 },
702 f,
703 ),
704
705 HydroNode::Join {
707 left,
708 right,
709 metadata,
710 }
711 | HydroNode::CrossProduct {
712 left,
713 right,
714 metadata,
715 }
716 | HydroNode::CrossSingleton {
717 left,
718 right,
719 metadata,
720 } => {
721 let left_id = left.build_graph_structure(structure, seen_tees, config);
722 let right_id = right.build_graph_structure(structure, seen_tees, config);
723 let location_id = setup_location(structure, metadata);
724 let node_id = structure.add_node(
725 NodeLabel::Static(extract_op_name(self.print_root())),
726 HydroNodeType::Join,
727 location_id,
728 );
729 structure.add_edge(
730 left_id,
731 node_id,
732 HydroEdgeType::Stream,
733 Some("left".to_string()),
734 );
735 structure.add_edge(
736 right_id,
737 node_id,
738 HydroEdgeType::Stream,
739 Some("right".to_string()),
740 );
741 node_id
742 }
743
744 HydroNode::Difference {
746 pos: left,
747 neg: right,
748 metadata,
749 }
750 | HydroNode::AntiJoin {
751 pos: left,
752 neg: right,
753 metadata,
754 } => {
755 let left_id = left.build_graph_structure(structure, seen_tees, config);
756 let right_id = right.build_graph_structure(structure, seen_tees, config);
757 let location_id = setup_location(structure, metadata);
758 let node_id = structure.add_node(
759 NodeLabel::Static(extract_op_name(self.print_root())),
760 HydroNodeType::Join,
761 location_id,
762 );
763 structure.add_edge(
764 left_id,
765 node_id,
766 HydroEdgeType::Stream,
767 Some("pos".to_string()),
768 );
769 structure.add_edge(
770 right_id,
771 node_id,
772 HydroEdgeType::Stream,
773 Some("neg".to_string()),
774 );
775 node_id
776 }
777
778 HydroNode::Fold {
780 init,
781 acc,
782 input,
783 metadata,
784 }
785 | HydroNode::FoldKeyed {
786 init,
787 acc,
788 input,
789 metadata,
790 }
791 | HydroNode::Scan {
792 init,
793 acc,
794 input,
795 metadata,
796 } => {
797 let node_type = HydroNodeType::Aggregation; build_dual_expr_transform(
800 TransformParams {
801 structure,
802 seen_tees,
803 config,
804 input,
805 metadata,
806 op_name: extract_op_name(self.print_root()),
807 node_type,
808 edge_type: HydroEdgeType::Stream,
809 },
810 init,
811 acc,
812 )
813 }
814
815 HydroNode::ReduceKeyedWatermark {
817 f,
818 input,
819 watermark,
820 metadata,
821 } => {
822 let input_id = input.build_graph_structure(structure, seen_tees, config);
823 let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
824 let location_id = setup_location(structure, metadata);
825 let join_node_id = structure.add_node(
826 NodeLabel::Static(extract_op_name(self.print_root())),
827 HydroNodeType::Join,
828 location_id,
829 );
830 structure.add_edge(
831 input_id,
832 join_node_id,
833 HydroEdgeType::Stream,
834 Some("input".to_string()),
835 );
836 structure.add_edge(
837 watermark_id,
838 join_node_id,
839 HydroEdgeType::Stream,
840 Some("watermark".to_string()),
841 );
842
843 let node_id = structure.add_node(
844 NodeLabel::with_exprs(
845 extract_op_name(self.print_root()).to_string(),
846 vec![f.clone()],
847 ),
848 HydroNodeType::Aggregation,
849 location_id,
850 );
851 structure.add_edge(join_node_id, node_id, HydroEdgeType::Stream, None);
852 node_id
853 }
854
855 HydroNode::Network {
856 serialize_fn,
857 deserialize_fn,
858 input,
859 metadata,
860 ..
861 } => {
862 let input_id = input.build_graph_structure(structure, seen_tees, config);
863 let _from_location_id = setup_location(structure, metadata);
864
865 let to_location_id = match metadata.location_kind.root() {
866 LocationId::Process(id) => {
867 structure.add_location(*id, "Process".to_string());
868 Some(*id)
869 }
870 LocationId::Cluster(id) => {
871 structure.add_location(*id, "Cluster".to_string());
872 Some(*id)
873 }
874 _ => None,
875 };
876
877 let mut label = "network(".to_string();
878 if serialize_fn.is_some() {
879 label.push_str("ser");
880 }
881 if deserialize_fn.is_some() {
882 if serialize_fn.is_some() {
883 label.push_str(" + ");
884 }
885 label.push_str("deser");
886 }
887 label.push(')');
888
889 let network_id = structure.add_node(
890 NodeLabel::Static(label),
891 HydroNodeType::Network,
892 to_location_id,
893 );
894 structure.add_edge(
895 input_id,
896 network_id,
897 HydroEdgeType::Network,
898 Some(format!("to {:?}", to_location_id)),
899 );
900 network_id
901 }
902
903 HydroNode::Unpersist { inner, .. } => {
905 inner.build_graph_structure(structure, seen_tees, config)
907 }
908
909 HydroNode::Chain {
910 first,
911 second,
912 metadata,
913 } => {
914 let first_id = first.build_graph_structure(structure, seen_tees, config);
915 let second_id = second.build_graph_structure(structure, seen_tees, config);
916 let location_id = setup_location(structure, metadata);
917 let chain_id = structure.add_node(
918 NodeLabel::Static(extract_op_name(self.print_root())),
919 HydroNodeType::Transform,
920 location_id,
921 );
922 structure.add_edge(
923 first_id,
924 chain_id,
925 HydroEdgeType::Stream,
926 Some("first".to_string()),
927 );
928 structure.add_edge(
929 second_id,
930 chain_id,
931 HydroEdgeType::Stream,
932 Some("second".to_string()),
933 );
934 chain_id
935 }
936
937 HydroNode::ChainFirst {
938 first,
939 second,
940 metadata,
941 } => {
942 let first_id = first.build_graph_structure(structure, seen_tees, config);
943 let second_id = second.build_graph_structure(structure, seen_tees, config);
944 let location_id = setup_location(structure, metadata);
945 let chain_id = structure.add_node(
946 NodeLabel::Static(extract_op_name(self.print_root())),
947 HydroNodeType::Transform,
948 location_id,
949 );
950 structure.add_edge(
951 first_id,
952 chain_id,
953 HydroEdgeType::Stream,
954 Some("first".to_string()),
955 );
956 structure.add_edge(
957 second_id,
958 chain_id,
959 HydroEdgeType::Stream,
960 Some("second".to_string()),
961 );
962 chain_id
963 }
964
965 HydroNode::Counter {
966 tag: _,
967 prefix: _,
968 duration,
969 input,
970 metadata,
971 } => build_single_expr_transform(
972 TransformParams {
973 structure,
974 seen_tees,
975 config,
976 input,
977 metadata,
978 op_name: extract_op_name(self.print_root()),
979 node_type: HydroNodeType::Transform,
980 edge_type: HydroEdgeType::Stream,
981 },
982 duration,
983 ),
984 }
985 }
986}
987
988macro_rules! render_hydro_ir {
991 ($name:ident, $write_fn:ident) => {
992 pub fn $name(roots: &[HydroRoot], config: &HydroWriteConfig) -> String {
993 let mut output = String::new();
994 $write_fn(&mut output, roots, config).unwrap();
995 output
996 }
997 };
998}
999
1000macro_rules! write_hydro_ir {
1002 ($name:ident, $writer_type:ty, $constructor:expr) => {
1003 pub fn $name(
1004 output: impl std::fmt::Write,
1005 roots: &[HydroRoot],
1006 config: &HydroWriteConfig,
1007 ) -> std::fmt::Result {
1008 let mut graph_write: $writer_type = $constructor(output, config);
1009 write_hydro_ir_graph(&mut graph_write, roots, config)
1010 }
1011 };
1012}
1013
1014render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
1015write_hydro_ir!(
1016 write_hydro_ir_mermaid,
1017 HydroMermaid<_>,
1018 HydroMermaid::new_with_config
1019);
1020
1021render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
1022write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
1023
1024render_hydro_ir!(render_hydro_ir_reactflow, write_hydro_ir_reactflow);
1025write_hydro_ir!(
1026 write_hydro_ir_reactflow,
1027 HydroReactFlow<_>,
1028 HydroReactFlow::new
1029);
1030
1031fn write_hydro_ir_graph<W>(
1032 mut graph_write: W,
1033 roots: &[HydroRoot],
1034 config: &HydroWriteConfig,
1035) -> Result<(), W::Err>
1036where
1037 W: HydroGraphWrite,
1038{
1039 let mut structure = HydroGraphStructure::new();
1040 let mut seen_tees = HashMap::new();
1041
1042 for leaf in roots {
1044 leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1045 }
1046
1047 graph_write.write_prologue()?;
1049
1050 for (&node_id, (label, node_type, location)) in &structure.nodes {
1051 let (location_id, location_type) = if let Some(loc_id) = location {
1052 (
1053 Some(*loc_id),
1054 structure.locations.get(loc_id).map(|s| s.as_str()),
1055 )
1056 } else {
1057 (None, None)
1058 };
1059 graph_write.write_node_definition(
1060 node_id,
1061 label,
1062 *node_type,
1063 location_id,
1064 location_type,
1065 )?;
1066 }
1067
1068 if config.show_location_groups {
1069 let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
1070 for (&node_id, (_, _, location)) in &structure.nodes {
1071 if let Some(location_id) = location {
1072 nodes_by_location
1073 .entry(*location_id)
1074 .or_default()
1075 .push(node_id);
1076 }
1077 }
1078
1079 for (&location_id, node_ids) in &nodes_by_location {
1080 if let Some(location_type) = structure.locations.get(&location_id) {
1081 graph_write.write_location_start(location_id, location_type)?;
1082 for &node_id in node_ids {
1083 graph_write.write_node(node_id)?;
1084 }
1085 graph_write.write_location_end()?;
1086 }
1087 }
1088 }
1089
1090 for (src_id, dst_id, edge_type, label) in &structure.edges {
1091 graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
1092 }
1093
1094 graph_write.write_epilogue()?;
1095 Ok(())
1096}