Chunker API Reference

Chunker

Chunk documents.

Potential functions to override if implementing a custom Chunker class:

  • chunk_docs(): the logic for how a document is chunked.
  • get_splitter(): the logic for which splitter to use.
Source code in src/easy_ingest_text/chunk_text.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
class Chunker:
    """Chunk documents.

    Potential functions to override if implementing a custom Chunker class:

    - `chunk_docs()`: the logic for how a document is chunked.
    - `get_splitter()`: the logic for which splitter to use.
    """

    # Splitter names this class knows how to construct.
    ALLOWED_SPLITTERS = {"custom", "recursive"}

    def __init__(
        self,
        splitter: str = "recursive",
        splitter_config: Dict = DEFAULT_SPLITTERS_CONFIG,
        num_workers: int = 10,
    ) -> None:
        """
        Initializes the Chunker instance with specified splitter configuration
        and multiprocessing settings.

        Args:
            splitter (str): The name of the splitter to use. Currently supports
                'custom' or 'recursive'.
            splitter_config (Dict): Configuration dictionary for the chosen
                splitter.
            num_workers (int): Number of worker processes to use for chunking.

        Raises:
            ValueError: If the specified splitter is not supported.
        """
        # NOTE(review): the default dict is stored as-is, so mutating
        # self.splitter_config mutates the shared DEFAULT_SPLITTERS_CONFIG.
        self.splitter_config = splitter_config
        self.num_workers = num_workers

        if splitter not in self.ALLOWED_SPLITTERS:
            raise ValueError(
                f"{splitter} is not a valid splitter."
                f" Choose from: {self.ALLOWED_SPLITTERS}"
            )
        self.splitter = self.get_splitter(splitter)

    def chunk_dataset(
        self,
        input_dir: str,
        detailed_progress: bool = False,
        output_dir: Optional[str] = None,
        num_workers: Optional[int] = None,
    ) -> None:
        """
        Processes and chunks all documents in a specified directory.

        Chunked files are always saved, so `output_dir` is required.

        Args:
            input_dir (str): Directory containing the documents to be chunked.
            detailed_progress (bool): Whether to show detailed progress during
                the chunking process (requires an upfront file-count pass).
            output_dir (str, optional): Directory to save the chunked
                documents. Must not be None.
            num_workers (int, optional): Number of worker processes to use;
                defaults to the instance's num_workers if not provided.

        Raises:
            ValueError: If `output_dir` is not provided.
        """
        if output_dir is None:
            # Fail fast: chunk_file() would raise this same error inside each
            # worker; validating here avoids spawning the pool needlessly.
            raise ValueError(
                "Must provide an output directory when saving documents."
            )
        if num_workers is None:
            num_workers = self.num_workers

        # Counting files costs an extra directory walk, so it is opt-in.
        num_files = None
        if detailed_progress:
            num_files = len(list(get_files_from_dir(input_dir)))

        partial_func = partial(
            self.chunk_file, save_chunks=True, output_dir=output_dir
        )

        with tqdm(
            total=num_files, desc="Chunking files", unit=" files", smoothing=0
        ) as pbar:
            with multiprocessing.Pool(num_workers) as pool:
                # imap_unordered lets the progress bar tick as soon as any
                # worker finishes, regardless of input order.
                for _ in pool.imap_unordered(
                    partial_func,
                    get_files_from_dir(input_dir),
                ):
                    pbar.update(1)

    def chunk_file(
        self, save_chunks: bool, output_dir: Optional[str], file_path: str
    ) -> List[EnhancedDocument]:
        """
        Chunks a single file into smaller EnhancedDocuments.

        Args:
            save_chunks (bool): Whether to save the chunked documents.
            output_dir (str): Directory to save the documents if save_chunks
                is True.
            file_path (str): Path to the file to be chunked.

        Returns:
            List[EnhancedDocument]: A list of chunked documents.

        Raises:
            ValueError: If `save_chunks` is True but `output_dir` is None.
        """
        logging.debug("Chunking file: %s", file_path)
        raw_docs = load_docs_from_jsonl(file_path)
        chunked_docs = self.chunk_docs(raw_docs)
        if save_chunks:
            if output_dir is None:
                raise ValueError(
                    "Must provide an output directory when saving documents."
                )
            save_docs_to_file(chunked_docs, file_path, output_dir)
        logging.debug("Chunked file: %s", file_path)
        return chunked_docs

    def chunk_docs(
        self, raw_docs: List[EnhancedDocument]
    ) -> List[EnhancedDocument]:
        """
        Splits a list of EnhancedDocuments into smaller, chunked
        EnhancedDocuments.

        Args:
            raw_docs (List[EnhancedDocument]): List of documents to be chunked.

        Returns:
            List[EnhancedDocument]: A list of chunked documents.
        """

        chunked_docs = self.splitter.split_documents(raw_docs)
        # NOTE(STP): We need to remove the hashes here since the page content
        # for the chunk differs from the parent document.
        docs = [EnhancedDocument.remove_hashes(doc) for doc in chunked_docs]
        docs = [EnhancedDocument.from_document(doc) for doc in docs]
        return docs

    def get_splitter(self, splitter: str) -> TextSplitter:
        """
        Retrieves the appropriate document splitter based on the specified type.

        Args:
            splitter (str): The name of the splitter to use.

        Returns:
            TextSplitter: An instance of a TextSplitter.

        Raises:
            NotImplementedError: If a 'custom' splitter is specified but
                not implemented.
            ValueError: If the specified splitter type is not recognized.
        """
        if splitter == "custom":
            # Subclasses opt into 'custom' by overriding this method.
            raise NotImplementedError(
                "If using a custom splitter, the Chunker.get_splitter()"
                " method must be overridden."
            )

        if splitter == "recursive":
            kwargs = self.splitter_config["recursive"]
            return RecursiveCharacterTextSplitter(**kwargs)
        raise ValueError(f"Splitter not recognized: {splitter}")

__init__(splitter='recursive', splitter_config=DEFAULT_SPLITTERS_CONFIG, num_workers=10)

Initializes the Chunker instance with specified splitter configuration and multiprocessing settings.

Parameters:
  • splitter (str, default: 'recursive' ) –

    The name of the splitter to use. Currently supports 'custom' or 'recursive'.

  • splitter_config (Dict, default: DEFAULT_SPLITTERS_CONFIG ) –

    Configuration dictionary for the chosen splitter.

  • num_workers (int, default: 10 ) –

    Number of worker processes to use for chunking.

Raises:
  • ValueError

    If the specified splitter is not supported.

Source code in src/easy_ingest_text/chunk_text.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def __init__(
    self,
    splitter: str = "recursive",
    splitter_config: Dict = DEFAULT_SPLITTERS_CONFIG,
    num_workers: int = 10,
) -> None:
    """
    Sets up a Chunker with the given splitter choice, splitter
    configuration, and multiprocessing worker count.

    Args:
        splitter (str): Name of the splitter to use ('custom' or
            'recursive').
        splitter_config (Dict): Configuration mapping for the chosen
            splitter.
        num_workers (int): Number of worker processes used when chunking.

    Raises:
        ValueError: If `splitter` is not one of the supported names.
    """
    self.splitter_config = splitter_config
    self.num_workers = num_workers

    if splitter in self.ALLOWED_SPLITTERS:
        self.splitter = self.get_splitter(splitter)
    else:
        raise ValueError(
            f"{splitter} is not a valid splitter."
            f" Choose from: {self.ALLOWED_SPLITTERS}"
        )

chunk_dataset(input_dir, detailed_progress=False, output_dir=None, num_workers=None)

Processes and chunks all documents in a specified directory.

Parameters:
  • input_dir (str) –

    Directory containing the documents to be chunked.

  • output_dir (str, default: None ) –

    Directory to save the chunked documents if save_chunks is True.

  • detailed_progress (bool, default: False ) –

    Whether to show detailed progress during the chunking process.

  • num_workers (int, default: None ) –

    Number of worker processes to use; defaults to the instance's num_workers if not provided.

Raises:
  • ValueError

    If save_chunks is True but output_dir is not provided.

Source code in src/easy_ingest_text/chunk_text.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def chunk_dataset(
    self,
    input_dir: str,
    detailed_progress: bool = False,
    output_dir: Optional[str] = None,
    num_workers: Optional[int] = None,
) -> None:
    """
    Processes and chunks all documents in a specified directory.

    Chunked files are always saved, so `output_dir` is required.

    Args:
        input_dir (str): Directory containing the documents to be chunked.
        detailed_progress (bool): Whether to show detailed progress during
            the chunking process (requires an upfront file-count pass).
        output_dir (str, optional): Directory to save the chunked
            documents. Must not be None.
        num_workers (int, optional): Number of worker processes to use;
            defaults to the instance's num_workers if not provided.

    Raises:
        ValueError: If `output_dir` is not provided.
    """
    if output_dir is None:
        # Fail fast: chunk_file() would raise this same error inside each
        # worker; validating here avoids spawning the pool needlessly.
        raise ValueError(
            "Must provide an output directory when saving documents."
        )
    if num_workers is None:
        num_workers = self.num_workers

    # Counting files costs an extra directory walk, so it is opt-in.
    num_files = None
    if detailed_progress:
        num_files = len(list(get_files_from_dir(input_dir)))

    partial_func = partial(
        self.chunk_file, save_chunks=True, output_dir=output_dir
    )

    with tqdm(
        total=num_files, desc="Chunking files", unit=" files", smoothing=0
    ) as pbar:
        with multiprocessing.Pool(num_workers) as pool:
            # imap_unordered lets the progress bar tick as soon as any
            # worker finishes, regardless of input order.
            for _ in pool.imap_unordered(
                partial_func,
                get_files_from_dir(input_dir),
            ):
                pbar.update(1)

chunk_docs(raw_docs)

Splits a list of EnhancedDocuments into smaller, chunked EnhancedDocuments.

Parameters:
  • raw_docs (List[EnhancedDocument]) –

    List of documents to be chunked.

Returns:
  • List[EnhancedDocument]

    List[EnhancedDocument]: A list of chunked documents.

Source code in src/easy_ingest_text/chunk_text.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def chunk_docs(
    self, raw_docs: List[EnhancedDocument]
) -> List[EnhancedDocument]:
    """
    Splits EnhancedDocuments into smaller, chunked EnhancedDocuments.

    Args:
        raw_docs (List[EnhancedDocument]): Documents to split into chunks.

    Returns:
        List[EnhancedDocument]: The resulting chunked documents.
    """
    split_docs = self.splitter.split_documents(raw_docs)
    # NOTE(STP): The parent document's hashes no longer match the chunk's
    # page content, so drop them before rebuilding each EnhancedDocument.
    return [
        EnhancedDocument.from_document(EnhancedDocument.remove_hashes(doc))
        for doc in split_docs
    ]

chunk_file(save_chunks, output_dir, file_path)

Chunks a single file into smaller EnhancedDocuments.

Parameters:
  • save_chunks (bool) –

    Whether to save the chunked documents.

  • output_dir (str) –

    Directory to save the documents if save_chunks is True.

  • file_path (str) –

    Path to the file to be chunked.

Returns:
  • List[EnhancedDocument]

    List[EnhancedDocument]: A list of chunked documents.

Source code in src/easy_ingest_text/chunk_text.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def chunk_file(
    self, save_chunks: bool, output_dir: Optional[str], file_path: str
) -> List[EnhancedDocument]:
    """
    Loads a single JSONL file and chunks its documents.

    Args:
        save_chunks (bool): Whether to persist the chunked documents.
        output_dir (str): Destination directory when save_chunks is True.
        file_path (str): Path of the file to chunk.

    Returns:
        List[EnhancedDocument]: The chunked documents.

    Raises:
        ValueError: If `save_chunks` is True but `output_dir` is None.
    """
    logging.debug("Chunking file: %s", file_path)
    docs = self.chunk_docs(load_docs_from_jsonl(file_path))
    if save_chunks and output_dir is None:
        raise ValueError(
            "Must provide an output directory when saving documents."
        )
    if save_chunks:
        save_docs_to_file(docs, file_path, output_dir)
    logging.debug("Chunked file: %s", file_path)
    return docs

get_splitter(splitter)

Retrieves the appropriate document splitter based on the specified type.

Parameters:
  • splitter (str) –

    The name of the splitter to use.

Returns:
  • TextSplitter –

    An instance of a TextSplitter.

Raises:
  • NotImplementedError

    If a 'custom' splitter is specified but not implemented.

  • ValueError

    If the specified splitter type is not recognized.

Source code in src/easy_ingest_text/chunk_text.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def get_splitter(self, splitter: str) -> TextSplitter:
    """
    Retrieves the appropriate document splitter based on the specified type.

    Args:
        splitter (str): The name of the splitter to use.

    Returns:
        TextSplitter: An instance of a TextSplitter.

    Raises:
        NotImplementedError: If a 'custom' splitter is specified but
            not implemented.
        ValueError: If the specified splitter type is not recognized.
    """
    if splitter == "custom":
        # Subclasses opt into 'custom' by overriding this method.
        # (Original message was a copy-paste from the Embedder class.)
        raise NotImplementedError(
            "If using a custom splitter, the Chunker.get_splitter()"
            " method must be overridden."
        )

    if splitter == "recursive":
        kwargs = self.splitter_config["recursive"]
        return RecursiveCharacterTextSplitter(**kwargs)
    # f-string, not logging-style %s args: ValueError does no formatting.
    raise ValueError(f"Splitter not recognized: {splitter}")