1use std::collections::VecDeque;
34use std::io::{self, BufRead, BufReader, Read};
35
36use crate::tokenizer::{Token, Tokenizer};
37use crate::Result;
38
39pub struct StreamingTokenizer {
44 tokenizer: Tokenizer,
46
47 buffer: String,
49
50 chunk_size: usize,
52
53 sentence_delimiters: Vec<char>,
55
56 total_chars_processed: usize,
58
59 total_bytes_processed: usize,
61
62 max_buffer_size: usize,
64}
65
66impl StreamingTokenizer {
67 pub const DEFAULT_CHUNK_SIZE: usize = 8192;
69
70 pub const DEFAULT_MAX_BUFFER_SIZE: usize = 16 * 1024 * 1024;
72
73 #[must_use]
89 pub fn new(tokenizer: Tokenizer) -> Self {
90 Self {
91 tokenizer,
92 buffer: String::with_capacity(Self::DEFAULT_CHUNK_SIZE),
93 chunk_size: Self::DEFAULT_CHUNK_SIZE,
94 sentence_delimiters: vec!['.', '!', '?', '。', '.', '\n'],
95 total_chars_processed: 0,
96 total_bytes_processed: 0,
97 max_buffer_size: Self::DEFAULT_MAX_BUFFER_SIZE,
98 }
99 }
100
101 #[must_use]
107 pub fn with_chunk_size(mut self, size: usize) -> Self {
108 self.chunk_size = size;
109 self.buffer = String::with_capacity(size);
110 self
111 }
112
113 #[must_use]
119 pub fn with_sentence_delimiters(mut self, delimiters: Vec<char>) -> Self {
120 self.sentence_delimiters = delimiters;
121 self
122 }
123
124 pub fn process_chunk(&mut self, chunk: &str) -> Vec<Token> {
136 self.buffer.push_str(chunk);
137
138 let split_pos = self.find_last_sentence_boundary();
139
140 if let Some(pos) = split_pos {
141 let to_process = self.buffer[..=pos].to_string();
142 let remaining = self.buffer[pos + 1..].to_string();
143
144 let mut tokens = self.tokenizer.tokenize(&to_process);
145
146 for token in &mut tokens {
147 token.start_pos += self.total_chars_processed;
148 token.end_pos += self.total_chars_processed;
149 token.start_byte += self.total_bytes_processed;
150 token.end_byte += self.total_bytes_processed;
151 }
152
153 self.total_chars_processed += to_process.chars().count();
154 self.total_bytes_processed += to_process.len();
155 self.buffer = remaining;
156
157 tokens
158 } else if self.buffer.len() > self.max_buffer_size {
159 self.force_flush_partial()
160 } else {
161 Vec::new()
162 }
163 }
164
165 fn find_last_sentence_boundary(&self) -> Option<usize> {
171 let bytes = self.buffer.as_bytes();
172 for (i, ch) in self.buffer.char_indices().rev() {
173 if self.sentence_delimiters.contains(&ch) {
174 if ch == '.' && i > 0 && i + ch.len_utf8() < bytes.len() {
176 let prev_byte = bytes[i - 1];
177 let next_byte = bytes[i + ch.len_utf8()];
178 if prev_byte.is_ascii_digit() && next_byte.is_ascii_digit() {
179 continue;
180 }
181 }
182 return Some(i + ch.len_utf8() - 1);
183 }
184 }
185 None
186 }
187
188 fn find_safe_split_point(&self, target_pos: usize) -> usize {
190 let mut pos = target_pos.min(self.buffer.len());
192 while pos > 0 && !self.buffer.is_char_boundary(pos) {
193 pos -= 1;
194 }
195
196 for (byte_idx, ch) in self.buffer[..pos].char_indices().rev() {
199 if ch.is_whitespace() || self.sentence_delimiters.contains(&ch) {
200 return byte_idx + ch.len_utf8();
201 }
202 }
203
204 pos
206 }
207
208 fn force_flush_partial(&mut self) -> Vec<Token> {
210 let target_pos = self.buffer.len() / 2;
212 let split_pos = self.find_safe_split_point(target_pos);
213
214 if split_pos == 0 {
215 return self.flush();
217 }
218
219 let to_process = self.buffer[..split_pos].to_string();
220 let remaining = self.buffer[split_pos..].to_string();
221
222 let mut tokens = self.tokenizer.tokenize(&to_process);
223
224 for token in &mut tokens {
225 token.start_pos += self.total_chars_processed;
226 token.end_pos += self.total_chars_processed;
227 token.start_byte += self.total_bytes_processed;
228 token.end_byte += self.total_bytes_processed;
229 }
230
231 self.total_chars_processed += to_process.chars().count();
232 self.total_bytes_processed += to_process.len();
233 self.buffer = remaining;
234
235 tokens
236 }
237
238 pub fn flush(&mut self) -> Vec<Token> {
246 if self.buffer.is_empty() {
247 return Vec::new();
248 }
249
250 let to_process = std::mem::take(&mut self.buffer);
251 let mut tokens = self.tokenizer.tokenize(&to_process);
252
253 for token in &mut tokens {
254 token.start_pos += self.total_chars_processed;
255 token.end_pos += self.total_chars_processed;
256 token.start_byte += self.total_bytes_processed;
257 token.end_byte += self.total_bytes_processed;
258 }
259
260 self.total_chars_processed += to_process.chars().count();
261 self.total_bytes_processed += to_process.len();
262
263 tokens
264 }
265
266 pub fn process_reader<R: Read>(&mut self, reader: R) -> Result<Vec<Token>> {
280 let mut buf_reader = BufReader::with_capacity(self.chunk_size, reader);
281 let mut all_tokens = Vec::new();
282
283 loop {
284 let mut line = String::new();
285 let bytes_read = buf_reader
286 .read_line(&mut line)
287 .map_err(|e| crate::Error::Analysis(format!("Failed to read line: {e}")))?;
288
289 if bytes_read == 0 {
290 break; }
292
293 let tokens = self.process_chunk(&line);
294 all_tokens.extend(tokens);
295 }
296
297 let remaining = self.flush();
299 all_tokens.extend(remaining);
300
301 Ok(all_tokens)
302 }
303
304 pub fn process_file<P: AsRef<std::path::Path>>(&mut self, path: P) -> Result<Vec<Token>> {
318 let file = std::fs::File::open(path)
319 .map_err(|e| crate::Error::Analysis(format!("Failed to open file: {e}")))?;
320 self.process_reader(file)
321 }
322
323 #[must_use]
325 pub fn buffer_len(&self) -> usize {
326 self.buffer.len()
327 }
328
329 #[must_use]
331 pub const fn total_chars_processed(&self) -> usize {
332 self.total_chars_processed
333 }
334
335 #[must_use]
337 pub const fn total_bytes_processed(&self) -> usize {
338 self.total_bytes_processed
339 }
340
341 pub fn reset(&mut self) {
343 self.buffer.clear();
344 self.total_chars_processed = 0;
345 self.total_bytes_processed = 0;
346 }
347}
348
349pub struct TokenStream<I>
354where
355 I: Iterator<Item = String>,
356{
357 chunks: I,
359
360 streaming: StreamingTokenizer,
362
363 token_buffer: VecDeque<Token>,
365
366 finished: bool,
368
369 tokens_yielded: usize,
371}
372
373impl<I> TokenStream<I>
374where
375 I: Iterator<Item = String>,
376{
377 #[must_use]
384 pub fn new(chunks: I, tokenizer: Tokenizer) -> Self {
385 Self {
386 chunks,
387 streaming: StreamingTokenizer::new(tokenizer),
388 token_buffer: VecDeque::new(),
389 finished: false,
390 tokens_yielded: 0,
391 }
392 }
393
394 #[must_use]
396 pub fn with_chunk_size(mut self, size: usize) -> Self {
397 self.streaming = self.streaming.with_chunk_size(size);
398 self
399 }
400
401 #[must_use]
403 pub const fn tokens_yielded(&self) -> usize {
404 self.tokens_yielded
405 }
406}
407
408impl<I> Iterator for TokenStream<I>
409where
410 I: Iterator<Item = String>,
411{
412 type Item = Token;
413
414 fn next(&mut self) -> Option<Self::Item> {
415 if let Some(token) = self.token_buffer.pop_front() {
417 self.tokens_yielded += 1;
418 return Some(token);
419 }
420
421 if self.finished {
423 return None;
424 }
425
426 for chunk in self.chunks.by_ref() {
428 let tokens = self.streaming.process_chunk(&chunk);
429
430 if !tokens.is_empty() {
431 self.token_buffer.extend(tokens);
432 if let Some(token) = self.token_buffer.pop_front() {
433 self.tokens_yielded += 1;
434 return Some(token);
435 }
436 }
437 }
438
439 self.finished = true;
441 let remaining = self.streaming.flush();
442
443 if !remaining.is_empty() {
444 self.token_buffer.extend(remaining);
445 if let Some(token) = self.token_buffer.pop_front() {
446 self.tokens_yielded += 1;
447 return Some(token);
448 }
449 }
450
451 None
452 }
453
454 fn size_hint(&self) -> (usize, Option<usize>) {
455 (self.token_buffer.len(), None)
457 }
458}
459
/// Boxed, sendable callback that receives a [`StreamingProgress`] snapshot.
pub type ProgressCallback = Box<dyn Fn(StreamingProgress) + Send>;
462
/// Snapshot of how far a streaming tokenization run has progressed.
#[derive(Debug, Clone)]
pub struct StreamingProgress {
    /// Number of input bytes consumed so far.
    pub bytes_processed: usize,
    /// Total number of input bytes, when known.
    pub total_bytes: Option<usize>,
    /// Number of tokens produced so far.
    pub tokens_generated: usize,
    /// Number of chunks consumed so far.
    pub chunks_processed: usize,
}

impl StreamingProgress {
    /// Completion percentage, or `None` when the total size is unknown.
    ///
    /// A known total of zero bytes (e.g. an empty file) reports `100.0`
    /// instead of dividing by zero and producing NaN or infinity.
    #[must_use]
    #[allow(clippy::cast_precision_loss)]
    pub fn percent(&self) -> Option<f64> {
        self.total_bytes.map(|total| {
            if total == 0 {
                // Nothing to process means the work is complete.
                100.0
            } else {
                (self.bytes_processed as f64 / total as f64) * 100.0
            }
        })
    }
}
485
486pub struct ProgressStreamingTokenizer {
490 inner: StreamingTokenizer,
492
493 callback: Option<ProgressCallback>,
495
496 bytes_processed: usize,
498
499 total_bytes: Option<usize>,
501
502 tokens_generated: usize,
504
505 chunks_processed: usize,
507
508 callback_interval: usize,
510
511 last_callback_bytes: usize,
513}
514
515impl ProgressStreamingTokenizer {
516 pub const DEFAULT_CALLBACK_INTERVAL: usize = 65536;
518
519 #[must_use]
521 pub fn new(tokenizer: Tokenizer) -> Self {
522 Self {
523 inner: StreamingTokenizer::new(tokenizer),
524 callback: None,
525 bytes_processed: 0,
526 total_bytes: None,
527 tokens_generated: 0,
528 chunks_processed: 0,
529 callback_interval: Self::DEFAULT_CALLBACK_INTERVAL,
530 last_callback_bytes: 0,
531 }
532 }
533
534 #[must_use]
536 pub fn with_progress_callback<F>(mut self, callback: F) -> Self
537 where
538 F: Fn(StreamingProgress) + Send + 'static,
539 {
540 self.callback = Some(Box::new(callback));
541 self
542 }
543
544 #[must_use]
546 pub const fn with_total_bytes(mut self, total: usize) -> Self {
547 self.total_bytes = Some(total);
548 self
549 }
550
551 #[must_use]
553 pub const fn with_callback_interval(mut self, interval: usize) -> Self {
554 self.callback_interval = interval;
555 self
556 }
557
558 #[must_use]
560 pub fn with_chunk_size(mut self, size: usize) -> Self {
561 self.inner = self.inner.with_chunk_size(size);
562 self
563 }
564
565 pub fn process_chunk(&mut self, chunk: &str) -> Vec<Token> {
567 self.bytes_processed += chunk.len();
568 self.chunks_processed += 1;
569
570 let tokens = self.inner.process_chunk(chunk);
571 self.tokens_generated += tokens.len();
572
573 if self.bytes_processed - self.last_callback_bytes >= self.callback_interval {
575 self.report_progress();
576 self.last_callback_bytes = self.bytes_processed;
577 }
578
579 tokens
580 }
581
582 pub fn flush(&mut self) -> Vec<Token> {
584 let tokens = self.inner.flush();
585 self.tokens_generated += tokens.len();
586
587 self.report_progress();
589
590 tokens
591 }
592
593 fn report_progress(&self) {
595 if let Some(ref callback) = self.callback {
596 callback(StreamingProgress {
597 bytes_processed: self.bytes_processed,
598 total_bytes: self.total_bytes,
599 tokens_generated: self.tokens_generated,
600 chunks_processed: self.chunks_processed,
601 });
602 }
603 }
604
605 pub fn process_reader<R: Read>(&mut self, reader: R) -> Result<Vec<Token>> {
611 let mut buf_reader = BufReader::with_capacity(self.inner.chunk_size, reader);
612 let mut all_tokens = Vec::new();
613
614 loop {
615 let mut line = String::new();
616 let bytes_read = buf_reader
617 .read_line(&mut line)
618 .map_err(|e| crate::Error::Analysis(format!("Failed to read line: {e}")))?;
619
620 if bytes_read == 0 {
621 break;
622 }
623
624 let tokens = self.process_chunk(&line);
625 all_tokens.extend(tokens);
626 }
627
628 let remaining = self.flush();
629 all_tokens.extend(remaining);
630
631 Ok(all_tokens)
632 }
633
634 #[allow(clippy::cast_possible_truncation)]
640 pub fn process_file<P: AsRef<std::path::Path>>(&mut self, path: P) -> Result<Vec<Token>> {
641 let metadata = std::fs::metadata(path.as_ref())
642 .map_err(|e| crate::Error::Analysis(format!("Failed to read metadata: {e}")))?;
643
644 self.total_bytes = Some(metadata.len() as usize);
645
646 let file = std::fs::File::open(path)
647 .map_err(|e| crate::Error::Analysis(format!("Failed to open file: {e}")))?;
648
649 self.process_reader(file)
650 }
651
652 #[must_use]
654 pub const fn progress(&self) -> StreamingProgress {
655 StreamingProgress {
656 bytes_processed: self.bytes_processed,
657 total_bytes: self.total_bytes,
658 tokens_generated: self.tokens_generated,
659 chunks_processed: self.chunks_processed,
660 }
661 }
662
663 pub fn reset(&mut self) {
665 self.inner.reset();
666 self.bytes_processed = 0;
667 self.tokens_generated = 0;
668 self.chunks_processed = 0;
669 self.last_callback_bytes = 0;
670 }
671}
672
673pub struct ChunkedTokenIterator<I>
677where
678 I: Iterator<Item = String>,
679{
680 chunks: I,
682
683 streaming: StreamingTokenizer,
685
686 finished: bool,
688}
689
690impl<I> ChunkedTokenIterator<I>
691where
692 I: Iterator<Item = String>,
693{
694 #[must_use]
696 pub fn new(chunks: I, tokenizer: Tokenizer) -> Self {
697 Self {
698 chunks,
699 streaming: StreamingTokenizer::new(tokenizer),
700 finished: false,
701 }
702 }
703
704 #[must_use]
706 pub fn with_chunk_size(mut self, size: usize) -> Self {
707 self.streaming = self.streaming.with_chunk_size(size);
708 self
709 }
710}
711
712impl<I> Iterator for ChunkedTokenIterator<I>
713where
714 I: Iterator<Item = String>,
715{
716 type Item = Vec<Token>;
717
718 fn next(&mut self) -> Option<Self::Item> {
719 if self.finished {
720 return None;
721 }
722
723 for chunk in self.chunks.by_ref() {
725 let tokens = self.streaming.process_chunk(&chunk);
726 if !tokens.is_empty() {
727 return Some(tokens);
728 }
729 }
730
731 self.finished = true;
733 let remaining = self.streaming.flush();
734
735 if remaining.is_empty() {
736 None
737 } else {
738 Some(remaining)
739 }
740 }
741}
742
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use super::*;

    /// Builds the tokenizer shared by all tests in this module.
    fn create_test_tokenizer() -> Tokenizer {
        Tokenizer::new().expect("should create tokenizer")
    }

    /// Builds a streaming tokenizer with default settings.
    fn new_stream() -> StreamingTokenizer {
        StreamingTokenizer::new(create_test_tokenizer())
    }

    /// Owned copies of the given chunk literals.
    fn owned_chunks(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| (*part).to_string()).collect()
    }

    /// A fresh streaming tokenizer starts with an empty buffer and zero counters.
    #[test]
    fn test_streaming_tokenizer_creation() {
        let streaming = new_stream();

        assert_eq!(streaming.buffer_len(), 0);
        assert_eq!(streaming.total_chars_processed(), 0);
    }

    /// A chunk ending in a delimiter yields tokens (directly or after flush).
    #[test]
    fn test_process_chunk_with_delimiter() {
        let mut streaming = new_stream();

        let first = streaming.process_chunk("안녕\n");
        assert!(!first.is_empty() || streaming.buffer_len() > 0);

        let rest = streaming.flush();
        assert!(first.len() + rest.len() > 0);
    }

    /// A chunk without a delimiter is either buffered or yields nothing.
    #[test]
    fn test_process_chunk_without_delimiter() {
        let mut streaming = new_stream();

        let produced = streaming.process_chunk("안녕하세요");
        assert!(produced.is_empty() || streaming.buffer_len() > 0);
    }

    /// `flush` emits the buffered text and leaves the buffer empty.
    #[test]
    fn test_flush() {
        let mut streaming = new_stream();

        streaming.process_chunk("안녕하세요");
        let flushed = streaming.flush();

        assert!(!flushed.is_empty());
        assert_eq!(streaming.buffer_len(), 0);
    }

    /// Processing several chunks advances the character counter.
    #[test]
    fn test_multiple_chunks() {
        let mut streaming = new_stream();

        let _first = streaming.process_chunk("안녕하세요.\n");
        let _second = streaming.process_chunk("감사합니다.\n");
        let _tail = streaming.flush();

        assert!(streaming.total_chars_processed() > 0);
    }

    /// `reset` clears the buffer and both position counters.
    #[test]
    fn test_reset() {
        let mut streaming = new_stream();

        streaming.process_chunk("안녕하세요");
        streaming.reset();

        assert_eq!(streaming.buffer_len(), 0);
        assert_eq!(streaming.total_chars_processed(), 0);
        assert_eq!(streaming.total_bytes_processed(), 0);
    }

    /// Byte offsets of later chunks continue where the earlier chunk ended.
    #[test]
    fn test_byte_positions_accumulate_across_chunks() {
        let mut streaming = new_stream();

        let first_tokens = streaming.process_chunk("안녕.\n");
        let chunk1_bytes = "안녕.\n".len();

        let second_tokens = streaming.process_chunk("감사.\n");

        first_tokens.iter().for_each(|token| {
            assert!(
                token.start_byte < chunk1_bytes,
                "first chunk token '{}' start_byte {} must be < chunk1 bytes {}",
                token.surface,
                token.start_byte,
                chunk1_bytes
            );
        });

        second_tokens.iter().for_each(|token| {
            assert!(
                token.start_byte >= chunk1_bytes,
                "second chunk token '{}' start_byte {} must be >= chunk1 bytes {}",
                token.surface,
                token.start_byte,
                chunk1_bytes
            );
        });
    }

    /// `with_chunk_size` stores the requested size.
    #[test]
    fn test_custom_chunk_size() {
        let streaming = new_stream().with_chunk_size(1024);

        assert_eq!(streaming.chunk_size, 1024);
    }

    /// `with_sentence_delimiters` replaces the delimiter set.
    #[test]
    fn test_custom_delimiters() {
        let streaming = new_stream().with_sentence_delimiters(vec!['.', '!', '?']);

        assert_eq!(streaming.sentence_delimiters.len(), 3);
    }

    /// A new token stream is not finished.
    #[test]
    fn test_token_stream_creation() {
        let parts = owned_chunks(&["안녕하세요.\n", "감사합니다.\n"]);
        let token_stream = TokenStream::new(parts.into_iter(), create_test_tokenizer());

        assert!(!token_stream.finished);
    }

    /// Iterating a token stream produces tokens.
    #[test]
    fn test_token_stream_iteration() {
        let parts = owned_chunks(&["안녕\n", "감사\n"]);
        let token_stream = TokenStream::new(parts.into_iter(), create_test_tokenizer());

        let collected: Vec<_> = token_stream.collect();
        assert!(!collected.is_empty());
    }

    /// `tokens_yielded` matches the number of items returned by `next`.
    #[test]
    fn test_token_stream_tokens_yielded() {
        let parts = owned_chunks(&["안녕하세요.\n"]);
        let mut token_stream = TokenStream::new(parts.into_iter(), create_test_tokenizer());

        let mut seen = 0;
        while token_stream.next().is_some() {
            seen += 1;
        }

        assert_eq!(token_stream.tokens_yielded(), seen);
    }

    /// Before iteration starts nothing is buffered, so the lower bound is zero.
    #[test]
    fn test_token_stream_size_hint() {
        let parts = owned_chunks(&["안녕하세요.\n"]);
        let token_stream = TokenStream::new(parts.into_iter(), create_test_tokenizer());

        let (lower, _upper) = token_stream.size_hint();
        assert_eq!(lower, 0);
    }

    /// Processing a chunk advances the byte and chunk counters.
    #[test]
    fn test_progress_streaming_tokenizer() {
        let mut tracked = ProgressStreamingTokenizer::new(create_test_tokenizer());

        let _produced = tracked.process_chunk("안녕하세요.\n");
        let snapshot = tracked.progress();

        assert!(snapshot.bytes_processed > 0);
        assert!(snapshot.chunks_processed > 0);
    }

    /// The progress callback fires at least once with a one-byte interval.
    #[test]
    fn test_progress_callback() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;

        let calls = Arc::new(AtomicUsize::new(0));
        let calls_in_callback = Arc::clone(&calls);

        let mut tracked = ProgressStreamingTokenizer::new(create_test_tokenizer())
            .with_callback_interval(1)
            .with_progress_callback(move |_progress| {
                calls_in_callback.fetch_add(1, Ordering::SeqCst);
            });

        tracked.process_chunk("안녕하세요. 오늘 날씨가 좋네요.\n");
        let _tail = tracked.flush();

        assert!(calls.load(Ordering::SeqCst) > 0);
    }

    /// Half of the bytes processed reports fifty percent.
    #[test]
    fn test_progress_percent() {
        let snapshot = StreamingProgress {
            bytes_processed: 50,
            total_bytes: Some(100),
            tokens_generated: 10,
            chunks_processed: 2,
        };

        assert_eq!(snapshot.percent(), Some(50.0));
    }

    /// The chunked iterator can be driven to completion without panicking.
    #[test]
    fn test_chunked_token_iterator() {
        let parts = owned_chunks(&["안녕하세요.\n", "감사합니다.\n"]);
        let batches = ChunkedTokenIterator::new(parts.into_iter(), create_test_tokenizer());

        let collected: Vec<_> = batches.collect();
        let total_tokens: usize = collected.iter().map(std::vec::Vec::len).sum();

        let _ = total_tokens;
    }

    /// Multi-byte delimiters must not cause a slicing panic.
    #[test]
    fn test_multibyte_delimiter_no_panic() {
        let mut streaming = new_stream()
            .with_sentence_delimiters(vec!['.', '!', '?', '。', '.', '\n']);

        let produced = streaming.process_chunk("テスト。次の文。\n");
        let tail = streaming.flush();
        let total = produced.len() + tail.len();
        assert!(total > 0 || streaming.buffer_len() == 0);
    }

    /// Input containing a decimal number still produces tokens.
    #[test]
    fn test_decimal_number_not_split() {
        let mut streaming = new_stream();

        let produced = streaming.process_chunk("값은 3.14입니다.\n");
        let tail = streaming.flush();
        let count = produced.len() + tail.len();
        assert!(count > 0, "Decimal input should produce at least one token");
    }

    /// Exceeding the buffer limit without a boundary forces a partial flush.
    #[test]
    fn test_buffer_limit_forces_flush() {
        let mut streaming = new_stream();
        streaming.max_buffer_size = 32;

        let produced = streaming.process_chunk(&"가".repeat(100));
        assert!(!produced.is_empty(), "Buffer limit should force a flush");
    }

    /// With a space delimiter, the text after the last space stays buffered.
    #[test]
    fn test_safe_split_point() {
        let mut streaming =
            new_stream().with_sentence_delimiters(vec!['.', '!', '?', '\n', ' ']);

        let _produced = streaming.process_chunk("안녕하세요 감사합니다");

        assert!(streaming.buffer_len() > 0);
    }
}
1031
/// Iterator that reads text from a [`BufRead`] source and yields it one
/// sentence at a time.
///
/// Sentences end at a newline, or at `.`, `?` or `!` (optionally followed by
/// closing `)`, `]`, `"` or `'`) when followed by whitespace or end of input.
/// A `.` between two ASCII digits is treated as a decimal point. Yielded
/// sentences are trimmed and never empty.
pub struct SentenceReader<R: BufRead> {
    /// Underlying line source.
    reader: R,
    /// Text read so far that does not yet form a complete sentence.
    buffer: String,
    /// Complete sentences waiting to be yielded.
    queue: std::collections::VecDeque<String>,
    /// Set once the reader reported end of input.
    eof: bool,
    /// Buffer length (in bytes) at which pending text is emitted as-is.
    max_buffer_size: usize,
}

impl<R: BufRead> SentenceReader<R> {
    /// Default upper bound for the pending-text buffer (bytes).
    pub const DEFAULT_MAX_BUFFER_SIZE: usize = 16 * 1024 * 1024;

    /// Creates a sentence reader over `reader` with the default buffer limit.
    #[must_use]
    pub const fn new(reader: R) -> Self {
        Self {
            reader,
            buffer: String::new(),
            queue: std::collections::VecDeque::new(),
            eof: false,
            max_buffer_size: Self::DEFAULT_MAX_BUFFER_SIZE,
        }
    }

    /// Sets the buffer length (bytes) at which pending text is emitted even
    /// without a sentence boundary.
    #[must_use]
    pub const fn with_max_buffer_size(mut self, size: usize) -> Self {
        self.max_buffer_size = size;
        self
    }

    /// Moves every complete sentence from the buffer into the queue and keeps
    /// the unfinished remainder buffered (or emits it at end of input).
    fn drain_sentences(&mut self) {
        let buf = self.buffer.as_str();
        // Char-indexed view so neighbours can be inspected without re-decoding.
        let indices: Vec<(usize, char)> = buf.char_indices().collect();
        let len = indices.len();
        // Char index where the sentence currently being scanned starts.
        let mut start_char = 0;
        let mut i = 0;

        while i < len {
            let (byte_pos, ch) = indices[i];

            if ch == '\n' {
                let start_byte = indices[start_char].0;
                let trimmed = buf[start_byte..byte_pos].trim();
                if !trimmed.is_empty() {
                    self.queue.push_back(trimmed.to_string());
                }
                start_char = i + 1;
                i += 1;
                continue;
            }

            if matches!(ch, '.' | '?' | '!') {
                if ch == '.' {
                    // "3.14": a dot between digits is a decimal point.
                    let prev_is_digit = i > 0 && indices[i - 1].1.is_ascii_digit();
                    let next_is_digit = i + 1 < len && indices[i + 1].1.is_ascii_digit();
                    if prev_is_digit && next_is_digit {
                        i += 1;
                        continue;
                    }
                }

                // Skip closing brackets/quotes that belong to this sentence.
                let mut j = i + 1;
                while j < len && matches!(indices[j].1, ')' | ']' | '"' | '\'') {
                    j += 1;
                }

                let followed_by_whitespace = j < len && indices[j].1.is_whitespace();
                let followed_by_eof = j >= len && self.eof;

                if followed_by_whitespace || followed_by_eof {
                    let start_byte = indices[start_char].0;
                    // End after the closers so they stay with the sentence they
                    // close instead of being dropped from the output.
                    let sentence_end = indices.get(j).map_or(buf.len(), |&(byte, _)| byte);
                    let trimmed = buf[start_byte..sentence_end].trim();
                    if !trimmed.is_empty() {
                        self.queue.push_back(trimmed.to_string());
                    }
                    start_char = j;
                    if followed_by_whitespace && indices[j].1 != '\n' {
                        // Consume the separating whitespace; a newline is left
                        // for the newline branch above.
                        start_char = j + 1;
                        i = j + 1;
                    } else {
                        i = j;
                    }
                    continue;
                }
            }

            i += 1;
        }

        if self.eof && start_char < len {
            // End of input: whatever is left is the final sentence.
            let start_byte = indices[start_char].0;
            let trimmed = buf[start_byte..].trim();
            if !trimmed.is_empty() {
                self.queue.push_back(trimmed.to_string());
            }
            self.buffer.clear();
        } else if start_char > 0 && start_char < len {
            // Keep only the unfinished tail.
            let byte_offset = indices[start_char].0;
            self.buffer.drain(..byte_offset);
        } else if start_char >= len && !self.eof {
            self.buffer.clear();
        }
    }

    /// Reads one more line into the buffer.
    ///
    /// Returns `Ok(false)` and marks end of input when the reader is
    /// exhausted. When the buffer has reached the size limit, its trimmed
    /// content is queued as a sentence first so it cannot grow without bound
    /// across lines.
    fn fill_buffer(&mut self) -> io::Result<bool> {
        if self.buffer.len() >= self.max_buffer_size {
            let trimmed = self.buffer.trim().to_string();
            if !trimmed.is_empty() {
                self.queue.push_back(trimmed);
            }
            self.buffer.clear();
        }

        let mut line = String::new();
        let n = self.reader.read_line(&mut line)?;
        if n == 0 {
            self.eof = true;
            Ok(false)
        } else {
            self.buffer.push_str(&line);
            Ok(true)
        }
    }
}

impl<R: BufRead> Iterator for SentenceReader<R> {
    type Item = io::Result<String>;

    /// Yields the next sentence, reading more input as needed; read errors are
    /// yielded as `Err` items.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(sentence) = self.queue.pop_front() {
                return Some(Ok(sentence));
            }

            if self.eof {
                return None;
            }

            if let Err(e) = self.fill_buffer() {
                return Some(Err(e));
            }

            self.drain_sentences();
        }
    }
}
1232
#[cfg(test)]
mod sentence_reader_tests {
    #![allow(clippy::expect_used, clippy::unwrap_used, clippy::needless_collect)]

    use super::*;
    use std::io::Cursor;

    /// Runs a `SentenceReader` over `input` and collects every sentence,
    /// panicking on the first read error.
    fn read_all(input: &str) -> Vec<String> {
        SentenceReader::new(Cursor::new(input))
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap()
    }

    /// One terminated line yields one sentence.
    #[test]
    fn test_single_sentence() {
        let found = read_all("안녕하세요.\n");
        assert_eq!(found, vec!["안녕하세요."]);
    }

    /// Two sentences on one line are split at the period followed by a space.
    #[test]
    fn test_multiple_sentences() {
        let found = read_all("첫 번째 문장입니다. 두 번째 문장입니다.\n");
        assert_eq!(found.len(), 2);
        assert_eq!(found[0], "첫 번째 문장입니다.");
        assert_eq!(found[1], "두 번째 문장입니다.");
    }

    /// Newlines end a sentence even without terminal punctuation.
    #[test]
    fn test_newline_boundary() {
        let found = read_all("줄 하나\n줄 둘\n");
        assert_eq!(found, vec!["줄 하나", "줄 둘"]);
    }

    /// A period between digits does not end a sentence.
    #[test]
    fn test_decimal_not_boundary() {
        let found = read_all("값은 3.14입니다.\n");
        assert_eq!(found, vec!["값은 3.14입니다."]);
    }

    /// A question mark ends a sentence.
    #[test]
    fn test_question_mark() {
        let found = read_all("이것은 무엇인가요? 네, 맞습니다.\n");
        assert_eq!(found.len(), 2);
    }

    /// Empty input yields no sentences.
    #[test]
    fn test_empty_input() {
        let found = read_all("");
        assert!(found.is_empty());
    }

    /// Text without a trailing newline is emitted at end of input.
    #[test]
    fn test_no_trailing_newline() {
        let found = read_all("마지막 문장");
        assert_eq!(found, vec!["마지막 문장"]);
    }

    /// Blank lines do not produce empty sentences.
    #[test]
    fn test_multiple_newlines() {
        let found = read_all("첫째\n\n둘째\n");
        assert_eq!(found, vec!["첫째", "둘째"]);
    }

    /// An exclamation mark ends a sentence.
    #[test]
    fn test_exclamation() {
        let found = read_all("대단합니다! 정말요?\n");
        assert_eq!(found.len(), 2);
    }

    /// The reader is `Send` when its source is.
    #[test]
    fn test_sentence_reader_is_send() {
        fn assert_send<T: Send>() {}
        assert_send::<SentenceReader<std::io::Cursor<&[u8]>>>();
    }

    /// A closing parenthesis after the period still allows the split.
    #[test]
    fn test_closing_paren_before_whitespace() {
        let found = read_all("문장입니다.) 다음 문장.\n");
        assert_eq!(found.len(), 2);
    }

    /// Terminal punctuation at end of input (no newline) ends the last sentence.
    #[test]
    fn test_no_trailing_newline_punctuation() {
        let found = read_all("첫째. 둘째.");
        assert_eq!(found.len(), 2);
        assert_eq!(found[0], "첫째.");
        assert_eq!(found[1], "둘째.");
    }

    /// A line longer than the buffer limit is still emitted.
    #[test]
    fn test_buffer_limit_prevents_oom() {
        let long_line = "가".repeat(200);
        let limited =
            SentenceReader::new(Cursor::new(long_line.as_str())).with_max_buffer_size(64);
        let found: Vec<_> = limited.map(|item| item.unwrap()).collect();
        assert!(!found.is_empty());
    }
}