📦 p0n1 / epub_to_audiobook

📄 utils.py · 276 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
import logging
from typing import List
import tempfile
import os
import io
from pydub import AudioSegment
from mutagen.id3._frames import TIT2, TPE1, TALB, TRCK
from mutagen.id3 import ID3, ID3NoHeaderError
from typing import List
from sentencex import segment
import os

logger = logging.getLogger(__name__)


def split_text(text: str, max_chars: int, language: str) -> List[str]:
    """
    Break text into sentence-aligned chunks of at most max_chars characters.

    Sentences produced by sentencex are packed greedily: each one is appended
    to the chunk being built (joined by a single space) as long as the result
    still fits. A sentence that is itself longer than max_chars is handed to
    split_long_sentence, and its last fragment seeds the next chunk.

    Args:
        text: The text to split
        max_chars: The maximum number of characters per chunk
        language: The language of the text, passed through to sentencex

    Returns:
        A list of text chunks (empty when text is empty)

    Raises:
        ValueError: If text is non-empty and max_chars is not positive
    """
    # Empty input wins over the max_chars check, so "" never raises.
    if not text:
        return []
    if max_chars <= 0:
        raise ValueError("max_chars must be positive")

    pieces: List[str] = []
    pending = ""  # chunk currently being filled

    for sentence in segment(language, text):
        joiner = " " if pending else ""

        # Common case: the sentence still fits into the pending chunk.
        if len(pending) + len(joiner) + len(sentence) <= max_chars:
            pending = f"{pending}{joiner}{sentence}"
            continue

        # It does not fit, so whatever has been collected so far is complete.
        if pending:
            pieces.append(pending)

        if len(sentence) > max_chars:
            # Oversized sentence: every fragment but the last is a finished
            # chunk; the last one may still absorb following sentences.
            fragments = split_long_sentence(sentence, max_chars)
            pieces += fragments[:-1]
            pending = fragments[-1]
        else:
            pending = sentence

    # Flush the trailing chunk.
    if pending:
        pieces.append(pending)

    return pieces

# Split markers in priority order: the first marker that occurs anywhere in
# the current window is used, even if a lower-priority one sits further right.
_SPLIT_MARKERS = (
    '。', '！', '？',  # Chinese end-of-sentence
    '. ', '! ', '? ',  # English end-of-sentence with space
    '；', ';',  # Semicolons
    '，', ',',  # Commas
    '：', ':',  # Colons
    '）', ')', ']', '】', '}', '」', '』',  # Closing parentheses and brackets
    '、',  # Chinese enumeration comma
    '—', '-', '–',  # Dashes
    ' ',  # Spaces as last resort
)


def split_long_sentence(sentence: str, max_chars: int) -> List[str]:
    """
    Split a long sentence into smaller parts based on punctuation and spaces.

    The sentence is consumed left to right. For each window of max_chars
    characters, the highest-priority marker present in the window is chosen
    and the cut is made right after its last occurrence, so the marker
    (including a space) stays at the end of the left-hand part. If the window
    contains no marker at all, it is cut hard at max_chars. Concatenating the
    returned parts reproduces the original sentence.

    Args:
        sentence: The sentence to split
        max_chars: The maximum number of characters per part

    Returns:
        A list of sentence parts, each at most max_chars long
        (empty when sentence is empty)

    Raises:
        ValueError: If max_chars is not positive
    """
    if max_chars <= 0:
        raise ValueError("max_chars must be positive")

    # If max_chars is extremely small, punctuation-aware splitting is
    # pointless; split by character count only.
    if max_chars < 5:
        return [sentence[i:i + max_chars] for i in range(0, len(sentence), max_chars)]

    parts = []
    remaining = sentence

    while len(remaining) > max_chars:
        window = remaining[:max_chars]
        cut = max_chars  # fallback: hard cut when no marker is in the window

        for marker in _SPLIT_MARKERS:
            # Rightmost occurrence of this marker inside the window.
            idx = window.rfind(marker)
            if idx != -1:
                # Keep the marker with the left-hand part.
                cut = idx + len(marker)
                break

        parts.append(remaining[:cut])
        remaining = remaining[cut:]

    if remaining:
        parts.append(remaining)

    return parts


def set_audio_tags(output_file, audio_tags):
    """
    Write title, author, album and track-number ID3 tags to an audio file.

    Existing ID3 tags are loaded from output_file; when the file has no ID3
    header a fresh, empty tag container is used instead. Any error is logged
    and then re-raised to the caller.

    Args:
        output_file: Path of the audio file to tag
        audio_tags: Object exposing title, author, book_title and idx attributes
    """
    try:
        try:
            id3 = ID3(output_file)
            logger.debug(f"tags: {id3}")
        except ID3NoHeaderError:
            # No tags yet: start from an empty container.
            logger.debug(f"handling ID3NoHeaderError: {output_file}")
            id3 = ID3()

        # (frame class, text) pairs, added in this order.
        frame_specs = (
            (TIT2, audio_tags.title),
            (TPE1, audio_tags.author),
            (TALB, audio_tags.book_title),
            (TRCK, str(audio_tags.idx)),
        )
        for frame_cls, value in frame_specs:
            # encoding=3 is UTF-8 in mutagen's ID3 text-encoding scheme.
            id3.add(frame_cls(encoding=3, text=value))

        id3.save(output_file)
    except Exception as e:
        logger.error(f"Error while setting audio tags: {e}, {output_file}")
        raise e  # TODO: use this raise to catch unknown errors for now


# Full-width / CJK punctuation, plus the ratio sign (U+2236), that counts as
# "special" in addition to printable ASCII.
_SPECIAL_PUNCTUATION = frozenset(
    "。，、？！：；“”‘’（）《》【】…—～·「」『』〈〉〖〗〔〕"
    "∶"
)


def is_special_char(char: str) -> bool:
    """
    Tell whether a character must never be used as a split point.

    A character is special when it is a printable, non-space ASCII character
    (code points 33-126: English letters, digits and punctuation) or one of
    the Chinese / special Unicode punctuation marks in _SPECIAL_PUNCTUATION.

    Args:
        char: A single character (ord() raises TypeError otherwise)

    Returns:
        True if the character is special, False otherwise
    """
    ord_char = ord(char)
    result = 33 <= ord_char <= 126 or char in _SPECIAL_PUNCTUATION
    # Lazy %-formatting: this runs per character, so skip building the
    # message when DEBUG logging is disabled.
    logger.debug("is_special_char> char=%s, ord=%s, result=%s", char, ord_char, result)
    return result


def save_segment_tmp(segment: io.BytesIO, output_format: str, prefix: str = None) -> str:
    """
    Dump an in-memory audio segment into a persistent temporary file.

    The file is created with delete=False, so it outlives this call and the
    caller is responsible for removing it.

    Args:
        segment: Audio segment (io.BytesIO); it is rewound before being read
        output_format: Audio file format, used as the file extension
        prefix: Optional prefix for the temporary filename ("_" is appended)

    Returns:
        Path to the temporary file
    """
    options = {"delete": False, "suffix": f".{output_format}"}
    if prefix:
        options["prefix"] = f"{prefix}_"

    with tempfile.NamedTemporaryFile(**options) as handle:
        tmp_path = handle.name
        # Rewind first: the buffer position may be anywhere after earlier use.
        segment.seek(0)
        payload = segment.read()
        handle.write(payload)
        logger.debug(f"Audio segment written to temporary file: {tmp_path}")

    return tmp_path


def pydub_merge_audio_segments(tmp_files: List[str], output_file: str, output_format: str) -> None:
    """
    Merge temporary audio chunk files into one output file using pydub.

    The chunks are loaded in list order, concatenated, and exported to
    output_file. The temporary files are removed afterwards, whether or not
    loading/exporting succeeded, so a failed merge does not leave them behind.
    This function does not write any audio tags.

    Args:
        tmp_files: List of temporary file paths
        output_file: Path to the final output file
        output_format: Audio file format passed to pydub's export
    """
    if not tmp_files:
        logger.warning("No temporary files to merge")
        return

    try:
        combined = AudioSegment.empty()
        for tmp_file in tmp_files:
            logger.debug(f"Loading chunk from temporary file: {tmp_file}")
            combined += AudioSegment.from_file(tmp_file)

        # Export to final output file
        logger.debug(f"Exporting to final output file: {output_file}")
        combined.export(output_file, format=output_format)
        logger.debug(f"Final output file exported: {output_file}")
    finally:
        # Always clean up. A file that cannot be removed is only logged, so
        # cleanup never masks the real merge error or fails a finished merge.
        for tmp_file in tmp_files:
            try:
                os.remove(tmp_file)
            except OSError as err:
                logger.warning(f"Could not delete temporary file {tmp_file}: {err}")
        logger.debug(f"Temporary files deleted: {tmp_files}")


def direct_merge_audio_segments(audio_segments: List[io.BytesIO], output_file: str) -> None:
    """
    Concatenate the raw bytes of in-memory audio segments into a single file.

    No decoding or re-encoding takes place (pydub is not involved): each
    buffer is rewound and its bytes are appended to output_file in list order.
    With no segments, a warning is logged and no file is created.

    Args:
        audio_segments: List of audio segments in memory
        output_file: Path to the final output file
    """
    if not audio_segments:
        logger.warning("No audio segments to write")
        return

    logger.debug(f"Writing audio segments directly to file: {output_file}")
    with open(output_file, "wb") as sink:
        for buffer in audio_segments:
            # Rewind so the whole buffer is copied regardless of prior reads.
            buffer.seek(0)
            payload = buffer.read()
            sink.write(payload)
    logger.debug(f"Direct writing completed: {output_file}")


def merge_audio_segments(audio_segments: List[io.BytesIO], output_file: str, output_format: str,
                          chunk_ids: List[str], use_pydub_merge: bool) -> None:
    """
    Combine audio segments into output_file with the configured strategy.

    With use_pydub_merge, every segment is first written to its own temporary
    file (named after the matching chunk id) and the files are then merged by
    pydub_merge_audio_segments. Otherwise the segments are written to the
    output file directly by direct_merge_audio_segments.

    Args:
        audio_segments: List of audio segments (BytesIO objects)
        output_file: Path to the final output file
        output_format: Audio file format
        chunk_ids: List of IDs for each audio chunk, indexed like audio_segments
        use_pydub_merge: Whether to use pydub for merging (True) or direct write (False)
    """
    if not use_pydub_merge:
        logger.info(f"Using direct write to merge audio segments: {chunk_ids}")
        # Direct write audio segments to output file
        direct_merge_audio_segments(audio_segments, output_file)
        return

    logger.info(f"Using pydub to merge audio segments: {chunk_ids}")
    # Stage each segment on disk, tagged with its chunk id.
    staged_paths = [
        save_segment_tmp(audio, output_format, chunk_ids[position])
        for position, audio in enumerate(audio_segments)
    ]

    # Merge all audio chunks with pydub and create the final output file
    pydub_merge_audio_segments(staged_paths, output_file, output_format)