Skip to content

Extraction Functions

Kreuzberg provides both async and sync functions for text extraction. All functions accept an optional ExtractionConfig parameter for configuring the extraction process.

Asynchronous Functions

These functions return awaitable coroutines that must be awaited or run in an asyncio event loop.

extract_file

Extract text from a file path:

kreuzberg.extract_file(file_path: PathLike[str] | str, mime_type: str | None = None, config: ExtractionConfig = DEFAULT_CONFIG) -> ExtractionResult async

Extract the textual content from a given file.

PARAMETER DESCRIPTION
file_path

The path to the file.

TYPE: PathLike[str] | str

mime_type

The mime type of the content.

TYPE: str | None DEFAULT: None

config

Extraction options object, defaults to the default object.

TYPE: ExtractionConfig DEFAULT: DEFAULT_CONFIG

RETURNS DESCRIPTION
ExtractionResult

The extracted content and the mime type of the content.

RAISES DESCRIPTION
ValidationError

If the file path or configuration is invalid.

Source code in kreuzberg/extraction.py
async def extract_file(
    file_path: PathLike[str] | str, mime_type: str | None = None, config: ExtractionConfig = DEFAULT_CONFIG
) -> ExtractionResult:
    """Extract the textual content from a given file.

    Args:
        file_path: The path to the file.
        mime_type: The mime type of the content.
        config: Extraction options object, defaults to the default object.

    Returns:
        The extracted content and the mime type of the content.

    Raises:
        ValidationError: If the file path or configuration is invalid.
    """
    cache = get_document_cache()
    path = Path(file_path)
    cached_result = cache.get(path, config)
    if cached_result is not None:
        return cached_result

    if cache.is_processing(path, config):
        event = cache.mark_processing(path, config)
        await anyio.to_thread.run_sync(event.wait)  # pragma: no cover

        # Try cache again after waiting for other process to complete  # ~keep
        cached_result = cache.get(path, config)  # pragma: no cover
        if cached_result is not None:  # pragma: no cover
            return cached_result

    cache.mark_processing(path, config)

    try:
        if not path.exists():
            raise ValidationError("The file does not exist", context={"file_path": str(path)})

        mime_type = validate_mime_type(file_path=file_path, mime_type=mime_type)
        if extractor := ExtractorRegistry.get_extractor(mime_type=mime_type, config=config):
            result = await extractor.extract_path_async(Path(file_path))
        else:
            result = ExtractionResult(
                content=safe_decode(await anyio.Path(file_path).read_bytes()),
                chunks=[],
                mime_type=mime_type,
                metadata={},
            )

        result = await _validate_and_post_process_async(result=result, config=config, file_path=path)

        cache.set(path, config, result)

        return result
    finally:
        cache.mark_complete(path, config)

extract_bytes

Extract text from raw bytes:

kreuzberg.extract_bytes(content: bytes, mime_type: str, config: ExtractionConfig = DEFAULT_CONFIG) -> ExtractionResult async

Extract the textual content from a given byte string representing a file's contents.

PARAMETER DESCRIPTION
content

The content to extract.

TYPE: bytes

mime_type

The mime type of the content.

TYPE: str

config

Extraction options object, defaults to the default object.

TYPE: ExtractionConfig DEFAULT: DEFAULT_CONFIG

RETURNS DESCRIPTION
ExtractionResult

The extracted content and the mime type of the content.

Source code in kreuzberg/extraction.py
async def extract_bytes(content: bytes, mime_type: str, config: ExtractionConfig = DEFAULT_CONFIG) -> ExtractionResult:
    """Extract the textual content from a given byte string representing a file's contents.

    Args:
        content: The content to extract.
        mime_type: The mime type of the content.
        config: Extraction options object, defaults to the default object.


    Returns:
        The extracted content and the mime type of the content.
    """
    mime_type = validate_mime_type(mime_type=mime_type)
    if extractor := ExtractorRegistry.get_extractor(mime_type=mime_type, config=config):
        result = await extractor.extract_bytes_async(content)
    else:
        result = ExtractionResult(
            content=safe_decode(content),
            chunks=[],
            mime_type=mime_type,
            metadata={},
        )

    return await _validate_and_post_process_async(result=result, config=config)

batch_extract_file

Process multiple files concurrently:

kreuzberg.batch_extract_file(file_paths: Sequence[PathLike[str] | str], config: ExtractionConfig = DEFAULT_CONFIG) -> list[ExtractionResult] async

Extract text from multiple files concurrently with optimizations.

PARAMETER DESCRIPTION
file_paths

A sequence of paths to files to extract text from.

TYPE: Sequence[PathLike[str] | str]

config

Extraction options object, defaults to the default object.

TYPE: ExtractionConfig DEFAULT: DEFAULT_CONFIG

RETURNS DESCRIPTION
list[ExtractionResult]

A list of extraction results in the same order as the input paths.

Source code in kreuzberg/extraction.py
async def batch_extract_file(
    file_paths: Sequence[PathLike[str] | str], config: ExtractionConfig = DEFAULT_CONFIG
) -> list[ExtractionResult]:
    """Extract text from multiple files concurrently with optimizations.

    Args:
        file_paths: A sequence of paths to files to extract text from.
        config: Extraction options object, defaults to the default object.

    Returns:
        A list of extraction results in the same order as the input paths.
    """
    if not file_paths:
        return []

    max_concurrency = min(len(file_paths), mp.cpu_count() * 2)
    semaphore = anyio.Semaphore(max_concurrency)

    results = cast("list[ExtractionResult]", ([None] * len(file_paths)))

    async def _extract_file(path: PathLike[str] | str, index: int) -> None:
        async with semaphore:
            try:
                result = await extract_file(
                    path,
                    None,
                    config,
                )
                results[index] = result
            except Exception as e:  # noqa: BLE001
                error_result = ExtractionResult(
                    content=f"Error: {type(e).__name__}: {e!s}",
                    mime_type="text/plain",
                    metadata={  # type: ignore[typeddict-unknown-key]
                        "error": True,
                        "error_context": create_error_context(
                            operation="batch_extract_file",
                            file_path=path,
                            error=e,
                            index=index,
                        ),
                    },
                    chunks=[],
                )
                results[index] = error_result

    async with anyio.create_task_group() as tg:
        for i, path in enumerate(file_paths):
            tg.start_soon(_extract_file, path, i)

    return results

batch_extract_bytes

Process multiple byte contents concurrently:

kreuzberg.batch_extract_bytes(contents: Sequence[tuple[bytes, str]], config: ExtractionConfig = DEFAULT_CONFIG) -> list[ExtractionResult] async

Extract text from multiple byte contents concurrently with optimizations.

PARAMETER DESCRIPTION
contents

A sequence of tuples containing (content, mime_type) pairs.

TYPE: Sequence[tuple[bytes, str]]

config

Extraction options object, defaults to the default object.

TYPE: ExtractionConfig DEFAULT: DEFAULT_CONFIG

RETURNS DESCRIPTION
list[ExtractionResult]

A list of extraction results in the same order as the input contents.

Source code in kreuzberg/extraction.py
async def batch_extract_bytes(
    contents: Sequence[tuple[bytes, str]], config: ExtractionConfig = DEFAULT_CONFIG
) -> list[ExtractionResult]:
    """Extract text from multiple byte contents concurrently with optimizations.

    Args:
        contents: A sequence of tuples containing (content, mime_type) pairs.
        config: Extraction options object, defaults to the default object.

    Returns:
        A list of extraction results in the same order as the input contents.
    """
    if not contents:
        return []

    max_concurrency = min(len(contents), mp.cpu_count() * 2)
    semaphore = anyio.Semaphore(max_concurrency)

    results = cast("list[ExtractionResult]", [None] * len(contents))

    async def _extract_bytes(content: bytes, mime_type: str, index: int) -> None:
        async with semaphore:
            try:
                result = await extract_bytes(content, mime_type, config)
                results[index] = result
            except Exception as e:  # noqa: BLE001
                error_result = ExtractionResult(
                    content=f"Error: {type(e).__name__}: {e!s}",
                    mime_type="text/plain",
                    metadata={  # type: ignore[typeddict-unknown-key]
                        "error": True,
                        "error_context": create_error_context(
                            operation="batch_extract_bytes",
                            error=e,
                            index=index,
                            mime_type=mime_type,
                            content_size=len(content),
                        ),
                    },
                    chunks=[],
                )
                results[index] = error_result

    async with anyio.create_task_group() as tg:
        for i, (content, mime_type) in enumerate(contents):
            tg.start_soon(_extract_bytes, content, mime_type, i)

    return results

Synchronous Functions

These functions block until extraction is complete and are suitable for non-async contexts.

extract_file_sync

Synchronous version of extract_file:

kreuzberg.extract_file_sync(file_path: Path | str, mime_type: str | None = None, config: ExtractionConfig = DEFAULT_CONFIG) -> ExtractionResult

Synchronous version of extract_file.

PARAMETER DESCRIPTION
file_path

The path to the file.

TYPE: Path | str

mime_type

The mime type of the content.

TYPE: str | None DEFAULT: None

config

Extraction options object, defaults to the default object.

TYPE: ExtractionConfig DEFAULT: DEFAULT_CONFIG

RETURNS DESCRIPTION
ExtractionResult

The extracted content and the mime type of the content.

RAISES DESCRIPTION
ValidationError

If the file path or configuration is invalid.

Source code in kreuzberg/extraction.py
def extract_file_sync(
    file_path: Path | str, mime_type: str | None = None, config: ExtractionConfig = DEFAULT_CONFIG
) -> ExtractionResult:
    """Synchronous version of extract_file.

    Args:
        file_path: The path to the file.
        mime_type: The mime type of the content.
        config: Extraction options object, defaults to the default object.

    Returns:
        The extracted content and the mime type of the content.

    Raises:
        ValidationError: If the file path or configuration is invalid.
    """
    cache = get_document_cache()
    path = Path(file_path)
    cached_result = cache.get(path, config)
    if cached_result is not None:
        return cached_result

    if cache.is_processing(path, config):
        event = cache.mark_processing(path, config)
        event.wait()  # pragma: no cover

        # Try cache again after waiting for other process to complete  # ~keep
        cached_result = cache.get(path, config)  # pragma: no cover
        if cached_result is not None:  # pragma: no cover
            return cached_result

    cache.mark_processing(path, config)

    try:
        if not path.exists():
            raise ValidationError("The file does not exist", context={"file_path": str(path)})

        mime_type = validate_mime_type(file_path=file_path, mime_type=mime_type)
        if extractor := ExtractorRegistry.get_extractor(mime_type=mime_type, config=config):
            result = extractor.extract_path_sync(Path(file_path))
        else:
            result = ExtractionResult(
                content=Path(file_path).read_text(),
                chunks=[],
                mime_type=mime_type,
                metadata={},
            )

        result = _validate_and_post_process_sync(result=result, config=config, file_path=path)

        cache.set(path, config, result)

        return result
    finally:
        cache.mark_complete(path, config)

extract_bytes_sync

Synchronous version of extract_bytes:

kreuzberg.extract_bytes_sync(content: bytes, mime_type: str, config: ExtractionConfig = DEFAULT_CONFIG) -> ExtractionResult

Synchronous version of extract_bytes.

PARAMETER DESCRIPTION
content

The content to extract.

TYPE: bytes

mime_type

The mime type of the content.

TYPE: str

config

Extraction options object, defaults to the default object.

TYPE: ExtractionConfig DEFAULT: DEFAULT_CONFIG

RETURNS DESCRIPTION
ExtractionResult

The extracted content and the mime type of the content.

Source code in kreuzberg/extraction.py
def extract_bytes_sync(content: bytes, mime_type: str, config: ExtractionConfig = DEFAULT_CONFIG) -> ExtractionResult:
    """Synchronous version of extract_bytes.

    Args:
        content: The content to extract.
        mime_type: The mime type of the content.
        config: Extraction options object, defaults to the default object.

    Returns:
        The extracted content and the mime type of the content.
    """
    mime_type = validate_mime_type(mime_type=mime_type)
    if extractor := ExtractorRegistry.get_extractor(mime_type=mime_type, config=config):
        result = extractor.extract_bytes_sync(content)
    else:
        result = ExtractionResult(
            content=safe_decode(content),
            chunks=[],
            mime_type=mime_type,
            metadata={},
        )

    return _validate_and_post_process_sync(result=result, config=config)

batch_extract_file_sync

Synchronous version of batch_extract_file:

kreuzberg.batch_extract_file_sync(file_paths: Sequence[PathLike[str] | str], config: ExtractionConfig = DEFAULT_CONFIG) -> list[ExtractionResult]

Synchronous version of batch_extract_file with parallel processing.

PARAMETER DESCRIPTION
file_paths

A sequence of paths to files to extract text from.

TYPE: Sequence[PathLike[str] | str]

config

Extraction options object, defaults to the default object.

TYPE: ExtractionConfig DEFAULT: DEFAULT_CONFIG

RETURNS DESCRIPTION
list[ExtractionResult]

A list of extraction results in the same order as the input paths.

Source code in kreuzberg/extraction.py
def batch_extract_file_sync(
    file_paths: Sequence[PathLike[str] | str], config: ExtractionConfig = DEFAULT_CONFIG
) -> list[ExtractionResult]:
    """Synchronous version of batch_extract_file with parallel processing.

    Args:
        file_paths: A sequence of paths to files to extract text from.
        config: Extraction options object, defaults to the default object.

    Returns:
        A list of extraction results in the same order as the input paths.
    """
    if len(file_paths) <= 1:
        return [extract_file_sync(file_path=Path(file_path), mime_type=None, config=config) for file_path in file_paths]

    max_workers = min(len(file_paths), mp.cpu_count())

    def extract_single(file_path: PathLike[str] | str) -> tuple[int, ExtractionResult]:
        """Extract single file with index for ordering."""
        try:
            return (
                file_paths.index(file_path),
                extract_file_sync(file_path=Path(file_path), mime_type=None, config=config),
            )
        except Exception as e:  # noqa: BLE001
            error_result = ExtractionResult(
                content=f"Error: {type(e).__name__}: {e!s}",
                mime_type="text/plain",
                metadata={  # type: ignore[typeddict-unknown-key]
                    "error": True,
                    "error_context": create_error_context(
                        operation="batch_extract_file_sync",
                        file_path=file_path,
                        error=e,
                    ),
                },
                chunks=[],
            )
            return (file_paths.index(file_path), error_result)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_index = {executor.submit(extract_single, fp): i for i, fp in enumerate(file_paths)}

        results: list[ExtractionResult] = [None] * len(file_paths)  # type: ignore[list-item]
        for future in as_completed(future_to_index):
            index, result = future.result()
            results[index] = result

    return results

batch_extract_bytes_sync

Synchronous version of batch_extract_bytes:

kreuzberg.batch_extract_bytes_sync(contents: Sequence[tuple[bytes, str]], config: ExtractionConfig = DEFAULT_CONFIG) -> list[ExtractionResult]

Synchronous version of batch_extract_bytes with parallel processing.

PARAMETER DESCRIPTION
contents

A sequence of tuples containing (content, mime_type) pairs.

TYPE: Sequence[tuple[bytes, str]]

config

Extraction options object, defaults to the default object.

TYPE: ExtractionConfig DEFAULT: DEFAULT_CONFIG

RETURNS DESCRIPTION
list[ExtractionResult]

A list of extraction results in the same order as the input contents.

Source code in kreuzberg/extraction.py
def batch_extract_bytes_sync(
    contents: Sequence[tuple[bytes, str]], config: ExtractionConfig = DEFAULT_CONFIG
) -> list[ExtractionResult]:
    """Synchronous version of batch_extract_bytes with parallel processing.

    Args:
        contents: A sequence of tuples containing (content, mime_type) pairs.
        config: Extraction options object, defaults to the default object.

    Returns:
        A list of extraction results in the same order as the input contents.
    """
    if len(contents) <= 1:
        return [
            extract_bytes_sync(content=content, mime_type=mime_type, config=config) for content, mime_type in contents
        ]

    max_workers = min(len(contents), mp.cpu_count())

    def extract_single(index_and_content: tuple[int, tuple[bytes, str]]) -> tuple[int, ExtractionResult]:
        """Extract single content with index for ordering."""
        index, (content, mime_type) = index_and_content
        try:
            return (index, extract_bytes_sync(content=content, mime_type=mime_type, config=config))
        except Exception as e:  # noqa: BLE001
            error_result = ExtractionResult(
                content=f"Error: {type(e).__name__}: {e!s}",
                mime_type="text/plain",
                metadata={  # type: ignore[typeddict-unknown-key]
                    "error": True,
                    "error_context": create_error_context(
                        operation="batch_extract_bytes_sync",
                        error=e,
                        index=index,
                        mime_type=mime_type,
                        content_size=len(content),
                    ),
                },
                chunks=[],
            )
            return (index, error_result)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Avoid creating intermediate list, use enumerate directly
        future_to_index = {executor.submit(extract_single, (i, content)): i for i, content in enumerate(contents)}

        results: list[ExtractionResult] = [None] * len(contents)  # type: ignore[list-item]
        for future in as_completed(future_to_index):
            index, result = future.result()
            results[index] = result

    return results