Skip to main content

mecab_ko_core/
streaming.rs

1//! # Streaming Tokenizer Module
2//!
3//! 대용량 텍스트 스트리밍 처리를 위한 API
4//!
5//! ## 주요 기능
6//!
7//! - 청크 단위 토큰화
8//! - 문장 경계 감지 및 버퍼링
9//! - 메모리 효율적인 대용량 파일 처리
10//!
11//! ## Example
12//!
13//! ```rust,no_run
14//! use mecab_ko_core::streaming::StreamingTokenizer;
15//! use mecab_ko_core::tokenizer::Tokenizer;
16//!
17//! let tokenizer = Tokenizer::new().unwrap();
18//! let mut stream = StreamingTokenizer::new(tokenizer);
19//!
20//! // 청크 단위로 처리
21//! let text_chunks = vec!["안녕하세요. ", "오늘 날씨가 좋네요."];
22//! for chunk in text_chunks {
23//!     let tokens = stream.process_chunk(chunk);
24//!     for token in tokens {
25//!         println!("{}: {}", token.surface, token.pos);
26//!     }
27//! }
28//!
29//! // 남은 버퍼 flush
30//! let remaining = stream.flush();
31//! ```
32
33use std::collections::VecDeque;
34use std::io::{self, BufRead, BufReader, Read};
35
36use crate::tokenizer::{Token, Tokenizer};
37use crate::Result;
38
39/// 스트리밍 토크나이저
40///
41/// 대용량 텍스트를 청크 단위로 처리하며, 문장 경계를 고려하여
42/// 올바른 토큰화를 보장합니다.
43pub struct StreamingTokenizer {
44    /// 내부 토크나이저
45    tokenizer: Tokenizer,
46
47    /// 버퍼 (문장 경계를 고려하여 이전 청크의 일부를 보관)
48    buffer: String,
49
50    /// 청크 크기 (바이트)
51    chunk_size: usize,
52
53    /// 문장 구분자
54    sentence_delimiters: Vec<char>,
55
56    /// 전체 처리된 문자 수
57    total_chars_processed: usize,
58
59    /// 전체 처리된 바이트 수
60    total_bytes_processed: usize,
61
62    /// 버퍼 최대 크기 (바이트). 초과 시 강제 flush.
63    max_buffer_size: usize,
64}
65
66impl StreamingTokenizer {
67    /// 기본 청크 크기 (8KB)
68    pub const DEFAULT_CHUNK_SIZE: usize = 8192;
69
70    /// 기본 버퍼 최대 크기 (16MB)
71    pub const DEFAULT_MAX_BUFFER_SIZE: usize = 16 * 1024 * 1024;
72
73    /// 새 스트리밍 토크나이저 생성
74    ///
75    /// # Arguments
76    ///
77    /// * `tokenizer` - 내부 토크나이저
78    ///
79    /// # Example
80    ///
81    /// ```rust,no_run
82    /// use mecab_ko_core::tokenizer::Tokenizer;
83    /// use mecab_ko_core::streaming::StreamingTokenizer;
84    ///
85    /// let tokenizer = Tokenizer::new().unwrap();
86    /// let stream = StreamingTokenizer::new(tokenizer);
87    /// ```
88    #[must_use]
89    pub fn new(tokenizer: Tokenizer) -> Self {
90        Self {
91            tokenizer,
92            buffer: String::with_capacity(Self::DEFAULT_CHUNK_SIZE),
93            chunk_size: Self::DEFAULT_CHUNK_SIZE,
94            sentence_delimiters: vec!['.', '!', '?', '。', '.', '\n'],
95            total_chars_processed: 0,
96            total_bytes_processed: 0,
97            max_buffer_size: Self::DEFAULT_MAX_BUFFER_SIZE,
98        }
99    }
100
101    /// 청크 크기 설정
102    ///
103    /// # Arguments
104    ///
105    /// * `size` - 청크 크기 (바이트)
106    #[must_use]
107    pub fn with_chunk_size(mut self, size: usize) -> Self {
108        self.chunk_size = size;
109        self.buffer = String::with_capacity(size);
110        self
111    }
112
113    /// 문장 구분자 설정
114    ///
115    /// # Arguments
116    ///
117    /// * `delimiters` - 문장 구분자 목록
118    #[must_use]
119    pub fn with_sentence_delimiters(mut self, delimiters: Vec<char>) -> Self {
120        self.sentence_delimiters = delimiters;
121        self
122    }
123
124    /// 청크 처리
125    ///
126    /// 입력 청크를 버퍼에 추가하고, 완전한 문장을 토큰화합니다.
127    ///
128    /// # Arguments
129    ///
130    /// * `chunk` - 입력 청크
131    ///
132    /// # Returns
133    ///
134    /// 토큰 목록
135    pub fn process_chunk(&mut self, chunk: &str) -> Vec<Token> {
136        self.buffer.push_str(chunk);
137
138        let split_pos = self.find_last_sentence_boundary();
139
140        if let Some(pos) = split_pos {
141            let to_process = self.buffer[..=pos].to_string();
142            let remaining = self.buffer[pos + 1..].to_string();
143
144            let mut tokens = self.tokenizer.tokenize(&to_process);
145
146            for token in &mut tokens {
147                token.start_pos += self.total_chars_processed;
148                token.end_pos += self.total_chars_processed;
149                token.start_byte += self.total_bytes_processed;
150                token.end_byte += self.total_bytes_processed;
151            }
152
153            self.total_chars_processed += to_process.chars().count();
154            self.total_bytes_processed += to_process.len();
155            self.buffer = remaining;
156
157            tokens
158        } else if self.buffer.len() > self.max_buffer_size {
159            self.force_flush_partial()
160        } else {
161            Vec::new()
162        }
163    }
164
165    /// 마지막 문장 경계 찾기 (역방향 탐색으로 최적화)
166    ///
167    /// Returns the byte index of the last byte of the delimiter character,
168    /// so that `buffer[..=pos]` includes the full delimiter and
169    /// `buffer[pos+1..]` starts at the next character boundary.
170    fn find_last_sentence_boundary(&self) -> Option<usize> {
171        let bytes = self.buffer.as_bytes();
172        for (i, ch) in self.buffer.char_indices().rev() {
173            if self.sentence_delimiters.contains(&ch) {
174                // Decimal number exception: digit.digit is not a boundary.
175                if ch == '.' && i > 0 && i + ch.len_utf8() < bytes.len() {
176                    let prev_byte = bytes[i - 1];
177                    let next_byte = bytes[i + ch.len_utf8()];
178                    if prev_byte.is_ascii_digit() && next_byte.is_ascii_digit() {
179                        continue;
180                    }
181                }
182                return Some(i + ch.len_utf8() - 1);
183            }
184        }
185        None
186    }
187
188    /// 문장 경계에서 분할 (단어 중간 분할 방지)
189    fn find_safe_split_point(&self, target_pos: usize) -> usize {
190        // Snap target_pos to a valid char boundary first.
191        let mut pos = target_pos.min(self.buffer.len());
192        while pos > 0 && !self.buffer.is_char_boundary(pos) {
193            pos -= 1;
194        }
195
196        // Walk backwards through valid char boundaries looking for whitespace
197        // or a sentence delimiter.
198        for (byte_idx, ch) in self.buffer[..pos].char_indices().rev() {
199            if ch.is_whitespace() || self.sentence_delimiters.contains(&ch) {
200                return byte_idx + ch.len_utf8();
201            }
202        }
203
204        // No safe split point found — fall back to the snapped boundary.
205        pos
206    }
207
208    /// 부분 버퍼 강제 flush (문장 경계가 없을 때)
209    fn force_flush_partial(&mut self) -> Vec<Token> {
210        // 안전한 분할점에서 처리 (단어 중간 분할 방지)
211        let target_pos = self.buffer.len() / 2;
212        let split_pos = self.find_safe_split_point(target_pos);
213
214        if split_pos == 0 {
215            // 분할점을 찾을 수 없으면 전체 버퍼 처리
216            return self.flush();
217        }
218
219        let to_process = self.buffer[..split_pos].to_string();
220        let remaining = self.buffer[split_pos..].to_string();
221
222        let mut tokens = self.tokenizer.tokenize(&to_process);
223
224        for token in &mut tokens {
225            token.start_pos += self.total_chars_processed;
226            token.end_pos += self.total_chars_processed;
227            token.start_byte += self.total_bytes_processed;
228            token.end_byte += self.total_bytes_processed;
229        }
230
231        self.total_chars_processed += to_process.chars().count();
232        self.total_bytes_processed += to_process.len();
233        self.buffer = remaining;
234
235        tokens
236    }
237
238    /// 남은 버퍼 처리
239    ///
240    /// 스트림 처리가 끝난 후 버퍼에 남아있는 텍스트를 처리합니다.
241    ///
242    /// # Returns
243    ///
244    /// 남은 토큰 목록
245    pub fn flush(&mut self) -> Vec<Token> {
246        if self.buffer.is_empty() {
247            return Vec::new();
248        }
249
250        let to_process = std::mem::take(&mut self.buffer);
251        let mut tokens = self.tokenizer.tokenize(&to_process);
252
253        for token in &mut tokens {
254            token.start_pos += self.total_chars_processed;
255            token.end_pos += self.total_chars_processed;
256            token.start_byte += self.total_bytes_processed;
257            token.end_byte += self.total_bytes_processed;
258        }
259
260        self.total_chars_processed += to_process.chars().count();
261        self.total_bytes_processed += to_process.len();
262
263        tokens
264    }
265
266    /// Reader에서 스트리밍 처리
267    ///
268    /// # Arguments
269    ///
270    /// * `reader` - 입력 Reader
271    ///
272    /// # Returns
273    ///
274    /// 모든 토큰 목록
275    ///
276    /// # Errors
277    ///
278    /// I/O 에러 발생 시
279    pub fn process_reader<R: Read>(&mut self, reader: R) -> Result<Vec<Token>> {
280        let mut buf_reader = BufReader::with_capacity(self.chunk_size, reader);
281        let mut all_tokens = Vec::new();
282
283        loop {
284            let mut line = String::new();
285            let bytes_read = buf_reader
286                .read_line(&mut line)
287                .map_err(|e| crate::Error::Analysis(format!("Failed to read line: {e}")))?;
288
289            if bytes_read == 0 {
290                break; // EOF
291            }
292
293            let tokens = self.process_chunk(&line);
294            all_tokens.extend(tokens);
295        }
296
297        // Flush 남은 버퍼
298        let remaining = self.flush();
299        all_tokens.extend(remaining);
300
301        Ok(all_tokens)
302    }
303
304    /// 파일에서 스트리밍 처리
305    ///
306    /// # Arguments
307    ///
308    /// * `path` - 파일 경로
309    ///
310    /// # Returns
311    ///
312    /// 모든 토큰 목록
313    ///
314    /// # Errors
315    ///
316    /// 파일을 열 수 없거나 읽기 실패 시
317    pub fn process_file<P: AsRef<std::path::Path>>(&mut self, path: P) -> Result<Vec<Token>> {
318        let file = std::fs::File::open(path)
319            .map_err(|e| crate::Error::Analysis(format!("Failed to open file: {e}")))?;
320        self.process_reader(file)
321    }
322
323    /// 버퍼 크기 확인
324    #[must_use]
325    pub fn buffer_len(&self) -> usize {
326        self.buffer.len()
327    }
328
329    /// 처리된 문자 수
330    #[must_use]
331    pub const fn total_chars_processed(&self) -> usize {
332        self.total_chars_processed
333    }
334
335    /// 처리된 바이트 수
336    #[must_use]
337    pub const fn total_bytes_processed(&self) -> usize {
338        self.total_bytes_processed
339    }
340
341    /// 스트림 리셋
342    pub fn reset(&mut self) {
343        self.buffer.clear();
344        self.total_chars_processed = 0;
345        self.total_bytes_processed = 0;
346    }
347}
348
349/// Iterator 기반 스트리밍 토크나이저
350///
351/// 텍스트 청크 iterator를 받아 토큰을 생성합니다.
352/// `VecDeque`를 사용하여 O(1) dequeue 성능을 보장합니다.
353pub struct TokenStream<I>
354where
355    I: Iterator<Item = String>,
356{
357    /// 청크 iterator
358    chunks: I,
359
360    /// 스트리밍 토크나이저
361    streaming: StreamingTokenizer,
362
363    /// 현재 처리 중인 토큰 버퍼 (`VecDeque` for O(1) `pop_front`)
364    token_buffer: VecDeque<Token>,
365
366    /// 스트림 종료 여부
367    finished: bool,
368
369    /// 처리된 총 토큰 수 (`size_hint`용)
370    tokens_yielded: usize,
371}
372
373impl<I> TokenStream<I>
374where
375    I: Iterator<Item = String>,
376{
377    /// 새 토큰 스트림 생성
378    ///
379    /// # Arguments
380    ///
381    /// * `chunks` - 텍스트 청크 iterator
382    /// * `tokenizer` - 토크나이저
383    #[must_use]
384    pub fn new(chunks: I, tokenizer: Tokenizer) -> Self {
385        Self {
386            chunks,
387            streaming: StreamingTokenizer::new(tokenizer),
388            token_buffer: VecDeque::new(),
389            finished: false,
390            tokens_yielded: 0,
391        }
392    }
393
394    /// 청크 크기 설정
395    #[must_use]
396    pub fn with_chunk_size(mut self, size: usize) -> Self {
397        self.streaming = self.streaming.with_chunk_size(size);
398        self
399    }
400
401    /// 처리된 토큰 수 조회
402    #[must_use]
403    pub const fn tokens_yielded(&self) -> usize {
404        self.tokens_yielded
405    }
406}
407
408impl<I> Iterator for TokenStream<I>
409where
410    I: Iterator<Item = String>,
411{
412    type Item = Token;
413
414    fn next(&mut self) -> Option<Self::Item> {
415        // 버퍼에서 토큰 반환 (O(1) pop_front)
416        if let Some(token) = self.token_buffer.pop_front() {
417            self.tokens_yielded += 1;
418            return Some(token);
419        }
420
421        // 스트림이 끝났으면 None
422        if self.finished {
423            return None;
424        }
425
426        // 다음 청크 처리
427        for chunk in self.chunks.by_ref() {
428            let tokens = self.streaming.process_chunk(&chunk);
429
430            if !tokens.is_empty() {
431                self.token_buffer.extend(tokens);
432                if let Some(token) = self.token_buffer.pop_front() {
433                    self.tokens_yielded += 1;
434                    return Some(token);
435                }
436            }
437        }
438
439        // 청크가 더 이상 없으면 flush
440        self.finished = true;
441        let remaining = self.streaming.flush();
442
443        if !remaining.is_empty() {
444            self.token_buffer.extend(remaining);
445            if let Some(token) = self.token_buffer.pop_front() {
446                self.tokens_yielded += 1;
447                return Some(token);
448            }
449        }
450
451        None
452    }
453
454    fn size_hint(&self) -> (usize, Option<usize>) {
455        // 버퍼에 있는 토큰 수를 최소 하한으로 제공
456        (self.token_buffer.len(), None)
457    }
458}
459
/// Progress callback type.
pub type ProgressCallback = Box<dyn Fn(StreamingProgress) + Send>;

/// Snapshot of how far a streaming run has progressed.
#[derive(Debug, Clone)]
pub struct StreamingProgress {
    /// Number of input bytes processed so far.
    pub bytes_processed: usize,
    /// Total number of input bytes, when known.
    pub total_bytes: Option<usize>,
    /// Number of tokens generated so far.
    pub tokens_generated: usize,
    /// Number of chunks processed so far.
    pub chunks_processed: usize,
}

impl StreamingProgress {
    /// Computes the progress as a percentage.
    ///
    /// # Returns
    ///
    /// `None` when the total size is unknown. When the total is zero (for
    /// example an empty file) there is nothing left to do, so `Some(100.0)`
    /// is returned instead of the NaN/infinity a plain division would give.
    #[must_use]
    // usize -> f64 may lose precision for huge values; acceptable for a percentage.
    #[allow(clippy::cast_precision_loss)]
    pub fn percent(&self) -> Option<f64> {
        self.total_bytes.map(|total| {
            if total == 0 {
                100.0
            } else {
                (self.bytes_processed as f64 / total as f64) * 100.0
            }
        })
    }
}
485
486/// 진행률 추적 스트리밍 토크나이저
487///
488/// 대용량 파일 처리 시 진행 상황을 콜백으로 보고합니다.
489pub struct ProgressStreamingTokenizer {
490    /// 내부 스트리밍 토크나이저
491    inner: StreamingTokenizer,
492
493    /// 진행률 콜백
494    callback: Option<ProgressCallback>,
495
496    /// 처리된 바이트 수
497    bytes_processed: usize,
498
499    /// 총 바이트 수
500    total_bytes: Option<usize>,
501
502    /// 생성된 토큰 수
503    tokens_generated: usize,
504
505    /// 처리된 청크 수
506    chunks_processed: usize,
507
508    /// 콜백 호출 간격 (바이트)
509    callback_interval: usize,
510
511    /// 마지막 콜백 호출 시 처리된 바이트
512    last_callback_bytes: usize,
513}
514
515impl ProgressStreamingTokenizer {
516    /// 기본 콜백 간격 (64KB)
517    pub const DEFAULT_CALLBACK_INTERVAL: usize = 65536;
518
519    /// 새 진행률 추적 토크나이저 생성
520    #[must_use]
521    pub fn new(tokenizer: Tokenizer) -> Self {
522        Self {
523            inner: StreamingTokenizer::new(tokenizer),
524            callback: None,
525            bytes_processed: 0,
526            total_bytes: None,
527            tokens_generated: 0,
528            chunks_processed: 0,
529            callback_interval: Self::DEFAULT_CALLBACK_INTERVAL,
530            last_callback_bytes: 0,
531        }
532    }
533
534    /// 진행률 콜백 설정
535    #[must_use]
536    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
537    where
538        F: Fn(StreamingProgress) + Send + 'static,
539    {
540        self.callback = Some(Box::new(callback));
541        self
542    }
543
544    /// 총 바이트 수 설정 (진행률 계산용)
545    #[must_use]
546    pub const fn with_total_bytes(mut self, total: usize) -> Self {
547        self.total_bytes = Some(total);
548        self
549    }
550
551    /// 콜백 간격 설정
552    #[must_use]
553    pub const fn with_callback_interval(mut self, interval: usize) -> Self {
554        self.callback_interval = interval;
555        self
556    }
557
558    /// 청크 크기 설정
559    #[must_use]
560    pub fn with_chunk_size(mut self, size: usize) -> Self {
561        self.inner = self.inner.with_chunk_size(size);
562        self
563    }
564
565    /// 청크 처리
566    pub fn process_chunk(&mut self, chunk: &str) -> Vec<Token> {
567        self.bytes_processed += chunk.len();
568        self.chunks_processed += 1;
569
570        let tokens = self.inner.process_chunk(chunk);
571        self.tokens_generated += tokens.len();
572
573        // 콜백 호출 간격 확인
574        if self.bytes_processed - self.last_callback_bytes >= self.callback_interval {
575            self.report_progress();
576            self.last_callback_bytes = self.bytes_processed;
577        }
578
579        tokens
580    }
581
582    /// 남은 버퍼 처리
583    pub fn flush(&mut self) -> Vec<Token> {
584        let tokens = self.inner.flush();
585        self.tokens_generated += tokens.len();
586
587        // 최종 진행률 보고
588        self.report_progress();
589
590        tokens
591    }
592
593    /// 진행 상황 보고
594    fn report_progress(&self) {
595        if let Some(ref callback) = self.callback {
596            callback(StreamingProgress {
597                bytes_processed: self.bytes_processed,
598                total_bytes: self.total_bytes,
599                tokens_generated: self.tokens_generated,
600                chunks_processed: self.chunks_processed,
601            });
602        }
603    }
604
605    /// Reader에서 스트리밍 처리 (진행률 추적)
606    ///
607    /// # Errors
608    ///
609    /// I/O 에러 발생 시
610    pub fn process_reader<R: Read>(&mut self, reader: R) -> Result<Vec<Token>> {
611        let mut buf_reader = BufReader::with_capacity(self.inner.chunk_size, reader);
612        let mut all_tokens = Vec::new();
613
614        loop {
615            let mut line = String::new();
616            let bytes_read = buf_reader
617                .read_line(&mut line)
618                .map_err(|e| crate::Error::Analysis(format!("Failed to read line: {e}")))?;
619
620            if bytes_read == 0 {
621                break;
622            }
623
624            let tokens = self.process_chunk(&line);
625            all_tokens.extend(tokens);
626        }
627
628        let remaining = self.flush();
629        all_tokens.extend(remaining);
630
631        Ok(all_tokens)
632    }
633
634    /// 파일에서 스트리밍 처리 (자동 크기 감지)
635    ///
636    /// # Errors
637    ///
638    /// 파일을 열 수 없거나 읽기 실패 시
639    #[allow(clippy::cast_possible_truncation)]
640    pub fn process_file<P: AsRef<std::path::Path>>(&mut self, path: P) -> Result<Vec<Token>> {
641        let metadata = std::fs::metadata(path.as_ref())
642            .map_err(|e| crate::Error::Analysis(format!("Failed to read metadata: {e}")))?;
643
644        self.total_bytes = Some(metadata.len() as usize);
645
646        let file = std::fs::File::open(path)
647            .map_err(|e| crate::Error::Analysis(format!("Failed to open file: {e}")))?;
648
649        self.process_reader(file)
650    }
651
652    /// 현재 진행 상황 조회
653    #[must_use]
654    pub const fn progress(&self) -> StreamingProgress {
655        StreamingProgress {
656            bytes_processed: self.bytes_processed,
657            total_bytes: self.total_bytes,
658            tokens_generated: self.tokens_generated,
659            chunks_processed: self.chunks_processed,
660        }
661    }
662
663    /// 리셋
664    pub fn reset(&mut self) {
665        self.inner.reset();
666        self.bytes_processed = 0;
667        self.tokens_generated = 0;
668        self.chunks_processed = 0;
669        self.last_callback_bytes = 0;
670    }
671}
672
673/// 청크별 토큰 이터레이터
674///
675/// 토큰을 개별로 반환하지 않고 청크 단위로 반환하여 메모리 효율성 향상
676pub struct ChunkedTokenIterator<I>
677where
678    I: Iterator<Item = String>,
679{
680    /// 청크 iterator
681    chunks: I,
682
683    /// 스트리밍 토크나이저
684    streaming: StreamingTokenizer,
685
686    /// 스트림 종료 여부
687    finished: bool,
688}
689
690impl<I> ChunkedTokenIterator<I>
691where
692    I: Iterator<Item = String>,
693{
694    /// 새 청크 토큰 이터레이터 생성
695    #[must_use]
696    pub fn new(chunks: I, tokenizer: Tokenizer) -> Self {
697        Self {
698            chunks,
699            streaming: StreamingTokenizer::new(tokenizer),
700            finished: false,
701        }
702    }
703
704    /// 청크 크기 설정
705    #[must_use]
706    pub fn with_chunk_size(mut self, size: usize) -> Self {
707        self.streaming = self.streaming.with_chunk_size(size);
708        self
709    }
710}
711
712impl<I> Iterator for ChunkedTokenIterator<I>
713where
714    I: Iterator<Item = String>,
715{
716    type Item = Vec<Token>;
717
718    fn next(&mut self) -> Option<Self::Item> {
719        if self.finished {
720            return None;
721        }
722
723        // 다음 청크에서 토큰 생성
724        for chunk in self.chunks.by_ref() {
725            let tokens = self.streaming.process_chunk(&chunk);
726            if !tokens.is_empty() {
727                return Some(tokens);
728            }
729        }
730
731        // 청크가 더 이상 없으면 flush
732        self.finished = true;
733        let remaining = self.streaming.flush();
734
735        if remaining.is_empty() {
736            None
737        } else {
738            Some(remaining)
739        }
740    }
741}
742
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use super::*;

    /// Builds the tokenizer shared by all tests.
    fn create_test_tokenizer() -> Tokenizer {
        Tokenizer::new().expect("should create tokenizer")
    }

    #[test]
    fn test_streaming_tokenizer_creation() {
        let tokenizer = create_test_tokenizer();
        let stream = StreamingTokenizer::new(tokenizer);

        assert_eq!(stream.buffer_len(), 0);
        assert_eq!(stream.total_chars_processed(), 0);
    }

    #[test]
    fn test_process_chunk_with_delimiter() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        let input = "안녕\n";
        let tokens = stream.process_chunk(input);

        // The chunk ends with a delimiter, so all of it is consumed at once.
        assert!(!tokens.is_empty());
        assert_eq!(stream.buffer_len(), 0);
        assert_eq!(stream.total_bytes_processed(), input.len());
        assert_eq!(stream.total_chars_processed(), input.chars().count());

        // Nothing is left for the final flush.
        assert!(stream.flush().is_empty());
    }

    #[test]
    fn test_process_chunk_without_delimiter() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        let input = "안녕하세요";
        let tokens = stream.process_chunk(input);

        // Without a delimiter the text is only buffered.
        assert!(tokens.is_empty());
        assert_eq!(stream.buffer_len(), input.len());
        assert_eq!(stream.total_bytes_processed(), 0);
    }

    #[test]
    fn test_flush() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        stream.process_chunk("안녕하세요");
        let tokens = stream.flush();

        assert!(!tokens.is_empty());
        assert_eq!(stream.buffer_len(), 0);
    }

    #[test]
    fn test_multiple_chunks() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        let first = "안녕하세요.\n";
        let second = "감사합니다.\n";
        let _tokens1 = stream.process_chunk(first);
        let _tokens2 = stream.process_chunk(second);
        let _remaining = stream.flush();

        assert_eq!(
            stream.total_chars_processed(),
            first.chars().count() + second.chars().count()
        );
        assert_eq!(stream.total_bytes_processed(), first.len() + second.len());
    }

    #[test]
    fn test_reset() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        stream.process_chunk("안녕하세요");
        stream.reset();

        assert_eq!(stream.buffer_len(), 0);
        assert_eq!(stream.total_chars_processed(), 0);
        assert_eq!(stream.total_bytes_processed(), 0);
    }

    #[test]
    fn test_byte_positions_accumulate_across_chunks() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        let tokens1 = stream.process_chunk("안녕.\n");
        let chunk1_bytes = "안녕.\n".len();

        let tokens2 = stream.process_chunk("감사.\n");

        for token in &tokens1 {
            assert!(
                token.start_byte < chunk1_bytes,
                "first chunk token '{}' start_byte {} must be < chunk1 bytes {}",
                token.surface,
                token.start_byte,
                chunk1_bytes
            );
        }

        for token in &tokens2 {
            assert!(
                token.start_byte >= chunk1_bytes,
                "second chunk token '{}' start_byte {} must be >= chunk1 bytes {}",
                token.surface,
                token.start_byte,
                chunk1_bytes
            );
        }
    }

    #[test]
    fn test_custom_chunk_size() {
        let tokenizer = create_test_tokenizer();
        let stream = StreamingTokenizer::new(tokenizer).with_chunk_size(1024);

        assert_eq!(stream.chunk_size, 1024);
    }

    #[test]
    fn test_custom_delimiters() {
        let tokenizer = create_test_tokenizer();
        let stream =
            StreamingTokenizer::new(tokenizer).with_sentence_delimiters(vec!['.', '!', '?']);

        assert_eq!(stream.sentence_delimiters.len(), 3);
    }

    #[test]
    fn test_token_stream_creation() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕하세요.\n".to_string(), "감사합니다.\n".to_string()];
        let stream = TokenStream::new(chunks.into_iter(), tokenizer);

        assert!(!stream.finished);
    }

    #[test]
    fn test_token_stream_iteration() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕\n".to_string(), "감사\n".to_string()];
        let stream = TokenStream::new(chunks.into_iter(), tokenizer);

        let tokens: Vec<_> = stream.collect();
        assert!(!tokens.is_empty());
    }

    #[test]
    fn test_token_stream_tokens_yielded() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕하세요.\n".to_string()];
        let mut stream = TokenStream::new(chunks.into_iter(), tokenizer);

        // Drain the stream, counting the tokens by hand.
        let mut count = 0;
        while stream.next().is_some() {
            count += 1;
        }

        // The internal counter must agree with the manual count.
        assert_eq!(stream.tokens_yielded(), count);
    }

    #[test]
    fn test_token_stream_size_hint() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕하세요.\n".to_string()];
        let stream = TokenStream::new(chunks.into_iter(), tokenizer);

        let (lower, upper) = stream.size_hint();
        // Nothing has been tokenized yet, so the token buffer is empty.
        assert_eq!(lower, 0);
        assert_eq!(upper, None);
    }

    #[test]
    fn test_progress_streaming_tokenizer() {
        let tokenizer = create_test_tokenizer();
        let mut stream = ProgressStreamingTokenizer::new(tokenizer);

        let input = "안녕하세요.\n";
        let _tokens = stream.process_chunk(input);
        let progress = stream.progress();

        assert_eq!(progress.bytes_processed, input.len());
        assert_eq!(progress.chunks_processed, 1);
        assert_eq!(progress.total_bytes, None);
    }

    #[test]
    fn test_progress_callback() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;

        let tokenizer = create_test_tokenizer();
        let callback_count = Arc::new(AtomicUsize::new(0));
        let callback_count_clone = Arc::clone(&callback_count);

        let mut stream = ProgressStreamingTokenizer::new(tokenizer)
            .with_callback_interval(1) // report after every chunk
            .with_progress_callback(move |_progress| {
                callback_count_clone.fetch_add(1, Ordering::SeqCst);
            });

        stream.process_chunk("안녕하세요. 오늘 날씨가 좋네요.\n");
        let _remaining = stream.flush();

        // The callback must have fired at least once.
        assert!(callback_count.load(Ordering::SeqCst) > 0);
    }

    #[test]
    fn test_progress_percent() {
        let progress = StreamingProgress {
            bytes_processed: 50,
            total_bytes: Some(100),
            tokens_generated: 10,
            chunks_processed: 2,
        };

        assert_eq!(progress.percent(), Some(50.0));
    }

    #[test]
    fn test_chunked_token_iterator() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕하세요.\n".to_string(), "감사합니다.\n".to_string()];
        let iter = ChunkedTokenIterator::new(chunks.into_iter(), tokenizer);

        let token_chunks: Vec<_> = iter.collect();

        // The number of tokens depends on the dictionary in use, but the
        // iterator must never yield an empty batch.
        assert!(token_chunks.iter().all(|batch| !batch.is_empty()));
    }

    #[test]
    fn test_multibyte_delimiter_no_panic() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer)
            .with_sentence_delimiters(vec!['.', '!', '?', '。', '.', '\n']);

        // 。 is U+3002 (3 bytes). The first chunk ends right after a char that
        // follows the delimiter, so the split happens at the multibyte
        // delimiter itself and must land on a char boundary.
        let first = "テスト。次";
        let second = "の文。\n";

        let _tokens = stream.process_chunk(first);
        assert_eq!(stream.buffer_len(), "次".len());

        let _tokens = stream.process_chunk(second);
        assert_eq!(stream.buffer_len(), 0);
        assert_eq!(stream.total_bytes_processed(), first.len() + second.len());

        assert!(stream.flush().is_empty());
    }

    #[test]
    fn test_decimal_number_not_split() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        // The only '.' so far sits between two digits, so it must not be
        // treated as a sentence boundary.
        let input = "값은 3.14입니다";
        let tokens = stream.process_chunk(input);
        assert!(tokens.is_empty(), "decimal point must not end a sentence");
        assert_eq!(stream.buffer_len(), input.len());

        let mut count = tokens.len();
        count += stream.process_chunk(".\n").len();
        count += stream.flush().len();
        // With a full system dictionary, "3.14" would be kept intact.
        // With the mini-dict, numbers and decimals are UNKNOWN and may be
        // split by the tokenizer — verify we at least produce tokens.
        assert!(count > 0, "Decimal input should produce at least one token");
    }

    #[test]
    fn test_buffer_limit_forces_flush() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);
        // Set a tiny max buffer to trigger forced flush
        stream.max_buffer_size = 32;

        // No delimiter — would grow unbounded without the limit
        let input = "가".repeat(100);
        let tokens = stream.process_chunk(&input);
        assert!(!tokens.is_empty(), "Buffer limit should force a flush");
        // With no whitespace available the split falls on the middle of the
        // buffer, which is a char boundary for this input.
        assert_eq!(stream.buffer_len(), input.len() / 2);
    }

    #[test]
    fn test_safe_split_point() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        // The split lands right after the last whitespace before the target.
        stream.buffer = "안녕하세요 감사합니다".to_string();
        let after_space = "안녕하세요 ".len();
        assert_eq!(
            stream.find_safe_split_point(stream.buffer.len()),
            after_space
        );
        // A target inside a multibyte char is snapped down to a char boundary.
        assert_eq!(stream.find_safe_split_point(after_space + 1), after_space);

        // Without whitespace or delimiters the snapped target itself is used.
        stream.buffer = "가나다".to_string();
        assert_eq!(stream.find_safe_split_point(4), "가".len());
        assert_eq!(stream.find_safe_split_point(0), 0);
        assert_eq!(stream.find_safe_split_point(100), stream.buffer.len());

        // When ' ' is configured as a sentence delimiter, process_chunk splits
        // right after it and keeps the rest buffered.
        let mut stream = StreamingTokenizer::new(create_test_tokenizer())
            .with_sentence_delimiters(vec!['.', '!', '?', '\n', ' ']);
        let _tokens = stream.process_chunk("안녕하세요 감사합니다");
        assert_eq!(stream.buffer_len(), "감사합니다".len());
    }
}
1031
1032// ============================================================
1033// SentenceReader — BufRead 기반 문장 단위 이터레이터
1034// ============================================================
1035
/// Sentence-level iterator over a [`BufRead`] source.
///
/// Sentence boundaries are recognised as follows:
/// - A newline (`\n`) always ends a sentence.
/// - `.`, `?` or `!` ends a sentence when followed by whitespace or EOF,
///   **unless** the `.` sits between two ASCII digits (a decimal number such
///   as `3.14`).
///
/// Segments that are empty or consist only of whitespace are skipped silently.
///
/// The Viterbi algorithm needs the whole sentence as context, so a sentence
/// is the smallest unit that can be tokenized independently when streaming
/// large inputs.
///
/// # Examples
///
/// ```rust
/// use mecab_ko_core::streaming::SentenceReader;
/// use std::io::Cursor;
///
/// let input = "첫 번째 문장입니다. 두 번째 문장입니다.\n";
/// let reader = SentenceReader::new(Cursor::new(input));
/// let sentences: Vec<String> = reader.map(|r| r.unwrap()).collect();
/// assert_eq!(sentences.len(), 2);
/// ```
pub struct SentenceReader<R: BufRead> {
    /// Underlying input source.
    reader: R,
    /// Working buffer holding text read from `reader` that has not yet been
    /// split into sentences.
    buffer: String,
    /// Finished sentences that `next()` has not handed out yet.
    queue: VecDeque<String>,
    /// Becomes `true` when the underlying reader has reported EOF.
    eof: bool,
    /// Upper bound for `buffer` in bytes; going past it forces a flush.
    max_buffer_size: usize,
}
1071
1072impl<R: BufRead> SentenceReader<R> {
1073    /// Default maximum buffer size (16 MB).
1074    pub const DEFAULT_MAX_BUFFER_SIZE: usize = 16 * 1024 * 1024;
1075
1076    /// Creates a new `SentenceReader` wrapping `reader`.
1077    #[must_use]
1078    pub const fn new(reader: R) -> Self {
1079        Self {
1080            reader,
1081            buffer: String::new(),
1082            queue: std::collections::VecDeque::new(),
1083            eof: false,
1084            max_buffer_size: Self::DEFAULT_MAX_BUFFER_SIZE,
1085        }
1086    }
1087
1088    /// Sets the maximum buffer size (bytes). If input accumulates
1089    /// beyond this limit without a sentence boundary, the buffer is
1090    /// force-flushed as a single sentence to prevent OOM.
1091    #[must_use]
1092    pub const fn with_max_buffer_size(mut self, size: usize) -> Self {
1093        self.max_buffer_size = size;
1094        self
1095    }
1096
1097    /// Drain all complete sentences currently visible in `self.buffer` into
1098    /// `self.queue`.  A sentence ends at:
1099    ///   1. A `\n` character (stripped from the yielded sentence).
1100    ///   2. A `.`, `?`, or `!` that is **not** a decimal point, followed
1101    ///      immediately by ASCII whitespace or at the end of the buffer when
1102    ///      `eof` is `true`.
1103    fn drain_sentences(&mut self) {
1104        // Work with byte indices directly to avoid allocating Vec<char>.
1105        let buf = self.buffer.as_str();
1106        let indices: Vec<(usize, char)> = buf.char_indices().collect();
1107        let len = indices.len();
1108        let mut start_char = 0; // char-level index into `indices`
1109
1110        let mut i = 0;
1111        while i < len {
1112            let (_, ch) = indices[i];
1113
1114            if ch == '\n' {
1115                let start_byte = indices[start_char].0;
1116                let end_byte = indices[i].0;
1117                let trimmed = buf[start_byte..end_byte].trim();
1118                if !trimmed.is_empty() {
1119                    self.queue.push_back(trimmed.to_string());
1120                }
1121                start_char = i + 1;
1122                i += 1;
1123                continue;
1124            }
1125
1126            if matches!(ch, '.' | '?' | '!') {
1127                if ch == '.' {
1128                    let prev_is_digit = i > 0 && indices[i - 1].1.is_ascii_digit();
1129                    let next_is_digit = i + 1 < len && indices[i + 1].1.is_ascii_digit();
1130                    if prev_is_digit && next_is_digit {
1131                        i += 1;
1132                        continue;
1133                    }
1134                }
1135
1136                let punct_byte_end = indices[i].0 + ch.len_utf8();
1137
1138                let mut j = i + 1;
1139                while j < len && matches!(indices[j].1, ')' | ']' | '"' | '\'') {
1140                    j += 1;
1141                }
1142
1143                let followed_by_whitespace = j < len && indices[j].1.is_whitespace();
1144                let followed_by_eof = j >= len && self.eof;
1145
1146                if followed_by_whitespace || followed_by_eof {
1147                    let start_byte = indices[start_char].0;
1148                    let trimmed = buf[start_byte..punct_byte_end].trim();
1149                    if !trimmed.is_empty() {
1150                        self.queue.push_back(trimmed.to_string());
1151                    }
1152                    start_char = j;
1153                    if j < len && indices[j].1.is_whitespace() && indices[j].1 != '\n' {
1154                        start_char = j + 1;
1155                        i = j + 1;
1156                    } else {
1157                        i = j;
1158                    }
1159                    continue;
1160                }
1161            }
1162
1163            i += 1;
1164        }
1165
1166        if self.eof && start_char < len {
1167            let start_byte = indices[start_char].0;
1168            let trimmed = buf[start_byte..].trim();
1169            if !trimmed.is_empty() {
1170                self.queue.push_back(trimmed.to_string());
1171            }
1172            self.buffer.clear();
1173        } else if start_char > 0 && start_char < len {
1174            let byte_offset = indices[start_char].0;
1175            self.buffer.drain(..byte_offset);
1176        } else if start_char >= len && !self.eof {
1177            self.buffer.clear();
1178        }
1179    }
1180
1181    /// Read one more line from the underlying reader.
1182    ///
1183    /// Returns `Ok(true)` if bytes were read, `Ok(false)` on EOF, and
1184    /// `Err(_)` on an I/O error.
1185    fn fill_buffer(&mut self) -> io::Result<bool> {
1186        if self.buffer.len() >= self.max_buffer_size {
1187            // Force-flush the entire buffer as a single sentence to prevent OOM.
1188            let trimmed = self.buffer.trim().to_string();
1189            if !trimmed.is_empty() {
1190                self.queue.push_back(trimmed);
1191            }
1192            self.buffer.clear();
1193        }
1194
1195        let mut line = String::new();
1196        let n = self.reader.read_line(&mut line)?;
1197        if n == 0 {
1198            self.eof = true;
1199            Ok(false)
1200        } else {
1201            self.buffer.push_str(&line);
1202            Ok(true)
1203        }
1204    }
1205}
1206
1207impl<R: BufRead> Iterator for SentenceReader<R> {
1208    type Item = io::Result<String>;
1209
1210    fn next(&mut self) -> Option<Self::Item> {
1211        loop {
1212            // If we already have a sentence ready, return it immediately.
1213            if let Some(sentence) = self.queue.pop_front() {
1214                return Some(Ok(sentence));
1215            }
1216
1217            // Nothing in the queue and EOF consumed — we are done.
1218            if self.eof {
1219                return None;
1220            }
1221
1222            // Try to read more data from the reader.
1223            if let Err(e) = self.fill_buffer() {
1224                return Some(Err(e));
1225            }
1226
1227            // Parse whatever is now in the buffer.
1228            self.drain_sentences();
1229        }
1230    }
1231}
1232
#[cfg(test)]
mod sentence_reader_tests {
    #![allow(clippy::expect_used, clippy::unwrap_used, clippy::needless_collect)]

    use super::*;
    use std::io::Cursor;

    /// Runs a default `SentenceReader` over `input` and returns every
    /// sentence it yields, panicking if any item is an error.
    fn sentences_in(input: &str) -> Vec<String> {
        SentenceReader::new(Cursor::new(input))
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap()
    }

    #[test]
    fn test_single_sentence() {
        assert_eq!(sentences_in("안녕하세요.\n"), ["안녕하세요."]);
    }

    #[test]
    fn test_multiple_sentences() {
        let found = sentences_in("첫 번째 문장입니다. 두 번째 문장입니다.\n");
        assert_eq!(found, ["첫 번째 문장입니다.", "두 번째 문장입니다."]);
    }

    #[test]
    fn test_newline_boundary() {
        assert_eq!(sentences_in("줄 하나\n줄 둘\n"), ["줄 하나", "줄 둘"]);
    }

    #[test]
    fn test_decimal_not_boundary() {
        // The '.' inside "3.14" sits between two digits and must not split.
        assert_eq!(sentences_in("값은 3.14입니다.\n"), ["값은 3.14입니다."]);
    }

    #[test]
    fn test_question_mark() {
        let found = sentences_in("이것은 무엇인가요? 네, 맞습니다.\n");
        assert_eq!(found.len(), 2);
    }

    #[test]
    fn test_empty_input() {
        assert!(sentences_in("").is_empty());
    }

    #[test]
    fn test_no_trailing_newline() {
        // Text without any terminator is still yielded at EOF.
        assert_eq!(sentences_in("마지막 문장"), ["마지막 문장"]);
    }

    #[test]
    fn test_multiple_newlines() {
        // Empty lines are skipped.
        assert_eq!(sentences_in("첫째\n\n둘째\n"), ["첫째", "둘째"]);
    }

    #[test]
    fn test_exclamation() {
        let found = sentences_in("대단합니다! 정말요?\n");
        assert_eq!(found.len(), 2);
    }

    #[test]
    fn test_sentence_reader_is_send() {
        // Compile-time check only: the call is valid just when the type is `Send`.
        fn assert_send<T: Send>() {}
        assert_send::<SentenceReader<std::io::Cursor<&[u8]>>>();
    }

    #[test]
    fn test_closing_paren_before_whitespace() {
        // Punctuation followed by closing bracket then space should still split.
        let found = sentences_in("문장입니다.) 다음 문장.\n");
        assert_eq!(found.len(), 2);
    }

    #[test]
    fn test_no_trailing_newline_punctuation() {
        // Final sentence with punctuation but no newline should still be yielded.
        assert_eq!(sentences_in("첫째. 둘째."), ["첫째.", "둘째."]);
    }

    #[test]
    fn test_buffer_limit_prevents_oom() {
        // A line with no sentence boundary should eventually be flushed
        // when buffer exceeds max_buffer_size.
        let long_line = "가".repeat(200);
        let found: Vec<String> = SentenceReader::new(Cursor::new(long_line.as_str()))
            .with_max_buffer_size(64)
            .map(|item| item.unwrap())
            .collect();
        // Should produce at least one sentence without hanging or OOM.
        assert!(!found.is_empty());
    }
}