๐Ÿ“ฆ RightNow-AI / openfang

๐Ÿ“„ reply_directives.rs ยท 251 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251//! Reply directive parsing and streaming accumulation.
//!
//! Supports inline directives in agent output:
//! - `[[reply:id]]` โ€” reply to a specific message ID
//! - `[[@current]]` โ€” reply in the current thread
//! - `[[silent]]` โ€” suppress the response from being sent to the user
//!
//! Directives are stripped from the visible text and collected into a
//! `DirectiveSet`. The `StreamingDirectiveAccumulator` handles partial
//! directive splits at chunk boundaries during streaming.

use serde::{Deserialize, Serialize};

/// Collected directives parsed from agent output.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct DirectiveSet {
    /// Reply to a specific message ID.
    pub reply_to: Option<String>,
    /// Reply in the current thread.
    pub current_thread: bool,
    /// Suppress the response.
    pub silent: bool,
}

/// Accumulator that handles directive parsing across streaming chunk boundaries.
///
/// Holds a small partial buffer for cases where a directive tag is split
/// across two chunks (e.g., `[[re` then `ply:123]]`).
pub struct StreamingDirectiveAccumulator {
    /// Partial buffer for incomplete directive tags.
    partial: String,
    /// Accumulated directives (sticky โ€” once set, stays set).
    pub directives: DirectiveSet,
}

/// Maximum size of the partial buffer before we give up and flush it as text.
const MAX_PARTIAL_LEN: usize = 30;

impl StreamingDirectiveAccumulator {
    /// Create a new accumulator.
    pub fn new() -> Self {
        Self {
            partial: String::new(),
            directives: DirectiveSet::default(),
        }
    }

    /// Process a streaming chunk, extracting any directives.
    ///
    /// Returns the cleaned text to display. Handles partial directive tags
    /// that span chunk boundaries. On `is_final`, flushes any remaining
    /// partial buffer as literal text.
    pub fn consume(&mut self, chunk: &str, is_final: bool) -> String {
        // Prepend any partial from previous chunk
        let input = if self.partial.is_empty() {
            chunk.to_string()
        } else {
            let mut combined = std::mem::take(&mut self.partial);
            combined.push_str(chunk);
            combined
        };

        let mut output = String::with_capacity(input.len());
        let mut chars = input.chars().peekable();

        while let Some(&ch) = chars.peek() {
            if ch == '[' {
                // Collect potential directive tag
                let remaining: String = chars.clone().collect();

                // Check if we might be at the start of a directive
                if let Some(after_open) = remaining.strip_prefix("[[") {
                    // Look for closing ]]
                    if let Some(end) = after_open.find("]]") {
                        let tag_content = &after_open[..end];
                        let tag_len = 2 + end + 2; // [[ + content + ]]

                        // Parse the directive
                        self.parse_tag(tag_content);

                        // Advance past the full tag
                        for _ in 0..tag_len {
                            chars.next();
                        }
                        continue;
                    } else if !is_final && remaining.len() < MAX_PARTIAL_LEN {
                        // Might be split across chunks โ€” buffer it
                        self.partial = remaining;
                        return output;
                    }
                    // Else: too long or final โ€” treat as literal
                }
            }

            output.push(chars.next().unwrap());
        }

        // On final chunk, flush any remaining partial as literal text
        if is_final && !self.partial.is_empty() {
            output.push_str(&std::mem::take(&mut self.partial));
        }

        output
    }

    /// Parse a directive tag's inner content.
    fn parse_tag(&mut self, content: &str) {
        let trimmed = content.trim();
        if let Some(id) = trimmed.strip_prefix("reply:") {
            let id = id.trim();
            if !id.is_empty() {
                self.directives.reply_to = Some(id.to_string());
            }
        } else if trimmed == "@current" {
            self.directives.current_thread = true;
        } else if trimmed == "silent" {
            self.directives.silent = true;
        }
        // Unknown directives are silently dropped (stripped from output)
    }
}

impl Default for StreamingDirectiveAccumulator {
    fn default() -> Self {
        Self::new()
    }
}

/// Parse directives from a complete text string.
///
/// Returns `(cleaned_text, directives)` where cleaned_text has all
/// directive tags removed.
pub fn parse_directives(text: &str) -> (String, DirectiveSet) {
    let mut acc = StreamingDirectiveAccumulator::new();
    let cleaned = acc.consume(text, true);
    (cleaned.trim().to_string(), acc.directives)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_reply_directive() {
        let (text, dirs) = parse_directives("[[reply:msg_123]] Hello!");
        assert_eq!(text, "Hello!");
        assert_eq!(dirs.reply_to.as_deref(), Some("msg_123"));
    }

    #[test]
    fn test_parse_current_thread() {
        let (text, dirs) = parse_directives("[[@current]] Replying in thread");
        assert_eq!(text, "Replying in thread");
        assert!(dirs.current_thread);
    }

    #[test]
    fn test_parse_silent() {
        let (text, dirs) = parse_directives("[[silent]] Internal note");
        assert_eq!(text, "Internal note");
        assert!(dirs.silent);
    }

    #[test]
    fn test_parse_multiple_directives() {
        let (text, dirs) = parse_directives("[[reply:456]] [[@current]] [[silent]] Done");
        assert_eq!(text, "Done");
        assert_eq!(dirs.reply_to.as_deref(), Some("456"));
        assert!(dirs.current_thread);
        assert!(dirs.silent);
    }

    #[test]
    fn test_no_directives() {
        let (text, dirs) = parse_directives("Just regular text");
        assert_eq!(text, "Just regular text");
        assert_eq!(dirs, DirectiveSet::default());
    }

    #[test]
    fn test_directive_in_middle() {
        let (text, dirs) = parse_directives("Hello [[silent]] world");
        assert_eq!(text, "Hello  world");
        assert!(dirs.silent);
    }

    #[test]
    fn test_streaming_split_directive() {
        let mut acc = StreamingDirectiveAccumulator::new();

        // First chunk ends mid-directive
        let out1 = acc.consume("Hello [[re", false);
        assert_eq!(out1, "Hello ");

        // Second chunk completes it
        let out2 = acc.consume("ply:xyz]] world", true);
        assert_eq!(out2, " world");
        assert_eq!(acc.directives.reply_to.as_deref(), Some("xyz"));
    }

    #[test]
    fn test_streaming_no_split() {
        let mut acc = StreamingDirectiveAccumulator::new();
        let out1 = acc.consume("[[silent]] chunk1", false);
        assert_eq!(out1, " chunk1");
        assert!(acc.directives.silent);

        let out2 = acc.consume(" chunk2", true);
        assert_eq!(out2, " chunk2");
    }

    #[test]
    fn test_streaming_sticky_directives() {
        let mut acc = StreamingDirectiveAccumulator::new();
        let _ = acc.consume("[[silent]]", false);
        assert!(acc.directives.silent);

        // Directive persists across chunks
        let _ = acc.consume("more text", true);
        assert!(acc.directives.silent);
    }

    #[test]
    fn test_partial_buffer_flush_on_final() {
        let mut acc = StreamingDirectiveAccumulator::new();
        // Looks like it could be a directive but never completes
        let out1 = acc.consume("text [[not_closed", false);
        assert_eq!(out1, "text ");

        // On final, partial is flushed as literal
        let out2 = acc.consume("", true);
        assert_eq!(out2, "[[not_closed");
    }

    #[test]
    fn test_backward_compat_no_reply() {
        // NO_REPLY token still works independently of directives
        let (text, dirs) = parse_directives("NO_REPLY");
        assert_eq!(text, "NO_REPLY");
        assert_eq!(dirs, DirectiveSet::default());
    }

    #[test]
    fn test_unknown_directive_stripped() {
        let (text, dirs) = parse_directives("[[unknown_thing]] visible");
        // Unknown directives are stripped from output but don't set any field
        assert_eq!(text, "visible");
        assert_eq!(dirs, DirectiveSet::default());
    }
}