Kreuzberg provides both async and sync functions for text extraction. All functions accept an optional ExtractionConfig
parameter for configuring the extraction process.
Asynchronous Functions
These functions return awaitable coroutines that must be awaited or run in an asyncio event loop.
Extract text from a file path:
Extract the textual content from a given file.
PARAMETER | DESCRIPTION |
file_path | The path to the file. TYPE: PathLike[str] | str |
mime_type | The mime type of the content. TYPE: str | None DEFAULT: None |
config | Extraction options object, defaults to the default object. TYPE: ExtractionConfig DEFAULT: DEFAULT_CONFIG |
RETURNS | DESCRIPTION |
ExtractionResult | The extracted content and the mime type of the content. |
RAISES | DESCRIPTION |
ValidationError | If the file path or configuration is invalid. |
Source code in kreuzberg/extraction.py
async def extract_file(
    file_path: PathLike[str] | str, mime_type: str | None = None, config: ExtractionConfig = DEFAULT_CONFIG
) -> ExtractionResult:
    """Extract the text of the file at ``file_path``, consulting the document cache first.

    Args:
        file_path: Location of the file to extract.
        mime_type: Mime type of the file; handed to ``validate_mime_type`` together with the path.
        config: Extraction options; defaults to ``DEFAULT_CONFIG``.

    Returns:
        The post-processed extraction result, served from the cache when an entry exists.

    Raises:
        ValidationError: If the file does not exist. The validation helpers called here may
            presumably raise it for an invalid path or configuration as well.
    """
    document_cache = get_document_cache()
    source = Path(file_path)

    if (hit := document_cache.get(source, config)) is not None:
        return hit

    if document_cache.is_processing(source, config):
        # The cache reports this (path, config) pair as in flight: wait on its event off the
        # event loop thread instead of extracting the same file twice.
        in_flight = document_cache.mark_processing(source, config)
        await anyio.to_thread.run_sync(in_flight.wait)  # pragma: no cover

        # Try cache again after waiting for other process to complete  # ~keep
        if (hit := document_cache.get(source, config)) is not None:  # pragma: no cover
            return hit

    document_cache.mark_processing(source, config)
    try:
        if not source.exists():
            raise ValidationError("The file does not exist", context={"file_path": str(source)})

        mime_type = validate_mime_type(file_path=file_path, mime_type=mime_type)
        handler = ExtractorRegistry.get_extractor(mime_type=mime_type, config=config)

        if not handler:
            # No extractor was returned for this mime type: fall back to decoding the raw bytes.
            raw = await anyio.Path(file_path).read_bytes()
            extracted = ExtractionResult(
                content=safe_decode(raw),
                chunks=[],
                mime_type=mime_type,
                metadata={},
            )
        else:
            extracted = await handler.extract_path_async(Path(file_path))

        extracted = await _validate_and_post_process_async(result=extracted, config=config, file_path=source)
        document_cache.set(source, config, extracted)
        return extracted
    finally:
        # Runs on success and on failure, so the processing marker is always cleared.
        document_cache.mark_complete(source, config)
Extract text from raw bytes:
Extract the textual content from a given byte string representing a file's contents.
PARAMETER | DESCRIPTION |
content | The content to extract. TYPE: bytes |
mime_type | The mime type of the content. TYPE: str |
config | Extraction options object, defaults to the default object. TYPE: ExtractionConfig DEFAULT: DEFAULT_CONFIG |
RETURNS | DESCRIPTION |
ExtractionResult | The extracted content and the mime type of the content. |
Source code in kreuzberg/extraction.py
async def extract_bytes(content: bytes, mime_type: str, config: ExtractionConfig = DEFAULT_CONFIG) -> ExtractionResult:
    """Extract the text of an in-memory document.

    Args:
        content: Raw bytes of the document.
        mime_type: Mime type describing ``content``; it is passed through ``validate_mime_type`` first.
        config: Extraction options; defaults to ``DEFAULT_CONFIG``.

    Returns:
        The post-processed extraction result.
    """
    mime_type = validate_mime_type(mime_type=mime_type)
    handler = ExtractorRegistry.get_extractor(mime_type=mime_type, config=config)

    if not handler:
        # No extractor was returned for this mime type: fall back to decoding the raw bytes.
        extracted = ExtractionResult(
            content=safe_decode(content),
            chunks=[],
            mime_type=mime_type,
            metadata={},
        )
    else:
        extracted = await handler.extract_bytes_async(content)

    return await _validate_and_post_process_async(result=extracted, config=config)
Process multiple files concurrently:
Extract text from multiple files concurrently with optimizations.
PARAMETER | DESCRIPTION |
file_paths | A sequence of paths to files to extract text from. TYPE: Sequence[PathLike[str] | str] |
config | Extraction options object, defaults to the default object. TYPE: ExtractionConfig DEFAULT: DEFAULT_CONFIG |
RETURNS | DESCRIPTION |
list[ExtractionResult] | A list of extraction results in the same order as the input paths. |
Source code in kreuzberg/extraction.py
async def batch_extract_file(
    file_paths: Sequence[PathLike[str] | str], config: ExtractionConfig = DEFAULT_CONFIG
) -> list[ExtractionResult]:
    """Run ``extract_file`` over several paths concurrently.

    A failure on one path does not abort the batch: the exception is converted into an
    ``ExtractionResult`` whose metadata carries ``"error": True`` plus an error context.

    Args:
        file_paths: Paths of the files to extract.
        config: Extraction options applied to every file; defaults to ``DEFAULT_CONFIG``.

    Returns:
        One result per input path, positioned to match the order of ``file_paths``.
    """
    if not file_paths:
        return []

    total = len(file_paths)
    # Never run more tasks at once than there are files, nor more than twice the CPU count.
    limiter = anyio.Semaphore(min(total, mp.cpu_count() * 2))
    # Pre-sized so that each task can write its result at its own input position.
    outcomes = cast("list[ExtractionResult]", [None] * total)

    async def _run_one(target: PathLike[str] | str, position: int) -> None:
        async with limiter:
            try:
                outcome = await extract_file(target, None, config)
            except Exception as e:  # noqa: BLE001
                outcome = ExtractionResult(
                    content=f"Error: {type(e).__name__}: {e!s}",
                    mime_type="text/plain",
                    metadata={  # type: ignore[typeddict-unknown-key]
                        "error": True,
                        "error_context": create_error_context(
                            operation="batch_extract_file",
                            file_path=target,
                            error=e,
                            index=position,
                        ),
                    },
                    chunks=[],
                )
            outcomes[position] = outcome

    async with anyio.create_task_group() as task_group:
        for position, target in enumerate(file_paths):
            task_group.start_soon(_run_one, target, position)

    return outcomes
Process multiple byte contents concurrently:
Extract text from multiple byte contents concurrently with optimizations.
PARAMETER | DESCRIPTION |
contents | A sequence of tuples containing (content, mime_type) pairs. TYPE: Sequence[tuple[bytes, str]] |
config | Extraction options object, defaults to the default object. TYPE: ExtractionConfig DEFAULT: DEFAULT_CONFIG |
RETURNS | DESCRIPTION |
list[ExtractionResult] | A list of extraction results in the same order as the input contents. |
Source code in kreuzberg/extraction.py
async def batch_extract_bytes(
    contents: Sequence[tuple[bytes, str]], config: ExtractionConfig = DEFAULT_CONFIG
) -> list[ExtractionResult]:
    """Run ``extract_bytes`` over several in-memory documents concurrently.

    A failure on one item does not abort the batch: the exception is converted into an
    ``ExtractionResult`` whose metadata carries ``"error": True`` plus an error context.

    Args:
        contents: ``(content, mime_type)`` pairs to extract.
        config: Extraction options applied to every item; defaults to ``DEFAULT_CONFIG``.

    Returns:
        One result per input pair, positioned to match the order of ``contents``.
    """
    if not contents:
        return []

    total = len(contents)
    # Never run more tasks at once than there are items, nor more than twice the CPU count.
    limiter = anyio.Semaphore(min(total, mp.cpu_count() * 2))
    # Pre-sized so that each task can write its result at its own input position.
    outcomes = cast("list[ExtractionResult]", [None] * total)

    async def _run_one(content: bytes, mime_type: str, position: int) -> None:
        async with limiter:
            try:
                outcome = await extract_bytes(content, mime_type, config)
            except Exception as e:  # noqa: BLE001
                outcome = ExtractionResult(
                    content=f"Error: {type(e).__name__}: {e!s}",
                    mime_type="text/plain",
                    metadata={  # type: ignore[typeddict-unknown-key]
                        "error": True,
                        "error_context": create_error_context(
                            operation="batch_extract_bytes",
                            error=e,
                            index=position,
                            mime_type=mime_type,
                            content_size=len(content),
                        ),
                    },
                    chunks=[],
                )
            outcomes[position] = outcome

    async with anyio.create_task_group() as task_group:
        for position, (content, mime_type) in enumerate(contents):
            task_group.start_soon(_run_one, content, mime_type, position)

    return outcomes
Synchronous Functions
These functions block until extraction is complete and are suitable for non-async contexts.
Synchronous version of extract_file:
Synchronous version of extract_file.
PARAMETER | DESCRIPTION |
file_path | The path to the file. TYPE: Path | str |
mime_type | The mime type of the content. TYPE: str | None DEFAULT: None |
config | Extraction options object, defaults to the default object. TYPE: ExtractionConfig DEFAULT: DEFAULT_CONFIG |
RETURNS | DESCRIPTION |
ExtractionResult | The extracted content and the mime type of the content. |
RAISES | DESCRIPTION |
ValidationError | If the file path or configuration is invalid. |
Source code in kreuzberg/extraction.py
def extract_file_sync(
    file_path: Path | str, mime_type: str | None = None, config: ExtractionConfig = DEFAULT_CONFIG
) -> ExtractionResult:
    """Synchronous version of extract_file.

    Args:
        file_path: The path to the file.
        mime_type: The mime type of the content.
        config: Extraction options object, defaults to the default object.

    Returns:
        The extracted content and the mime type of the content.

    Raises:
        ValidationError: If the file path or configuration is invalid.
    """
    cache = get_document_cache()
    path = Path(file_path)

    cached_result = cache.get(path, config)
    if cached_result is not None:
        return cached_result

    if cache.is_processing(path, config):
        # The cache reports this (path, config) pair as in flight: block on its event
        # rather than extracting the same file twice.
        event = cache.mark_processing(path, config)
        event.wait()  # pragma: no cover

        # Try cache again after waiting for other process to complete  # ~keep
        cached_result = cache.get(path, config)  # pragma: no cover
        if cached_result is not None:  # pragma: no cover
            return cached_result

    cache.mark_processing(path, config)
    try:
        if not path.exists():
            raise ValidationError("The file does not exist", context={"file_path": str(path)})

        mime_type = validate_mime_type(file_path=file_path, mime_type=mime_type)
        if extractor := ExtractorRegistry.get_extractor(mime_type=mime_type, config=config):
            result = extractor.extract_path_sync(path)
        else:
            # Decode the raw bytes with safe_decode, as the async variant and extract_bytes_sync do.
            # Path.read_text() would decode with the locale's default encoding and can raise
            # UnicodeDecodeError on files in a different encoding.
            result = ExtractionResult(
                content=safe_decode(path.read_bytes()),
                chunks=[],
                mime_type=mime_type,
                metadata={},
            )

        result = _validate_and_post_process_sync(result=result, config=config, file_path=path)
        cache.set(path, config, result)
        return result
    finally:
        # Runs on success and on failure, so the processing marker is always cleared.
        cache.mark_complete(path, config)
Synchronous version of extract_bytes:
Synchronous version of extract_bytes.
PARAMETER | DESCRIPTION |
content | The content to extract. TYPE: bytes |
mime_type | The mime type of the content. TYPE: str |
config | Extraction options object, defaults to the default object. TYPE: ExtractionConfig DEFAULT: DEFAULT_CONFIG |
RETURNS | DESCRIPTION |
ExtractionResult | The extracted content and the mime type of the content. |
Source code in kreuzberg/extraction.py
def extract_bytes_sync(content: bytes, mime_type: str, config: ExtractionConfig = DEFAULT_CONFIG) -> ExtractionResult:
    """Blocking counterpart of extract_bytes.

    Args:
        content: Raw bytes of the document.
        mime_type: Mime type describing ``content``; it is passed through ``validate_mime_type`` first.
        config: Extraction options; defaults to ``DEFAULT_CONFIG``.

    Returns:
        The post-processed extraction result.
    """
    mime_type = validate_mime_type(mime_type=mime_type)
    handler = ExtractorRegistry.get_extractor(mime_type=mime_type, config=config)

    if not handler:
        # No extractor was returned for this mime type: fall back to decoding the raw bytes.
        extracted = ExtractionResult(
            content=safe_decode(content),
            chunks=[],
            mime_type=mime_type,
            metadata={},
        )
    else:
        extracted = handler.extract_bytes_sync(content)

    return _validate_and_post_process_sync(result=extracted, config=config)
Synchronous version of batch_extract_file:
Synchronous version of batch_extract_file with parallel processing.
PARAMETER | DESCRIPTION |
file_paths | A sequence of paths to files to extract text from. TYPE: Sequence[PathLike[str] | str] |
config | Extraction options object, defaults to the default object. TYPE: ExtractionConfig DEFAULT: DEFAULT_CONFIG |
RETURNS | DESCRIPTION |
list[ExtractionResult] | A list of extraction results in the same order as the input paths. |
Source code in kreuzberg/extraction.py
def batch_extract_file_sync(
    file_paths: Sequence[PathLike[str] | str], config: ExtractionConfig = DEFAULT_CONFIG
) -> list[ExtractionResult]:
    """Synchronous version of batch_extract_file with parallel processing.

    With two or more paths, each file is extracted on a thread pool and a failure is converted
    into an ``ExtractionResult`` whose metadata carries ``"error": True`` plus an error context.
    With zero or one path, ``extract_file_sync`` is called directly and its exceptions propagate.

    Args:
        file_paths: A sequence of paths to files to extract text from.
        config: Extraction options object, defaults to the default object.

    Returns:
        A list of extraction results in the same order as the input paths.
    """
    if len(file_paths) <= 1:
        return [extract_file_sync(file_path=Path(file_path), mime_type=None, config=config) for file_path in file_paths]

    max_workers = min(len(file_paths), mp.cpu_count())

    def extract_single(index: int, file_path: PathLike[str] | str) -> tuple[int, ExtractionResult]:
        """Extract single file, returning it with its input position for ordering."""
        try:
            return (
                index,
                extract_file_sync(file_path=Path(file_path), mime_type=None, config=config),
            )
        except Exception as e:  # noqa: BLE001
            error_result = ExtractionResult(
                content=f"Error: {type(e).__name__}: {e!s}",
                mime_type="text/plain",
                metadata={  # type: ignore[typeddict-unknown-key]
                    "error": True,
                    "error_context": create_error_context(
                        operation="batch_extract_file_sync",
                        file_path=file_path,
                        error=e,
                        index=index,
                    ),
                },
                chunks=[],
            )
            return (index, error_result)

    results: list[ExtractionResult] = [None] * len(file_paths)  # type: ignore[list-item]
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # The position comes from enumerate rather than a lookup by value, so a path that
        # appears more than once fills each of its own slots and no per-file scan is needed.
        futures = [executor.submit(extract_single, i, fp) for i, fp in enumerate(file_paths)]
        for future in as_completed(futures):
            index, result = future.result()
            results[index] = result

    return results
Synchronous version of batch_extract_bytes:
Synchronous version of batch_extract_bytes with parallel processing.
PARAMETER | DESCRIPTION |
contents | A sequence of tuples containing (content, mime_type) pairs. TYPE: Sequence[tuple[bytes, str]] |
config | Extraction options object, defaults to the default object. TYPE: ExtractionConfig DEFAULT: DEFAULT_CONFIG |
RETURNS | DESCRIPTION |
list[ExtractionResult] | A list of extraction results in the same order as the input contents. |
Source code in kreuzberg/extraction.py
def batch_extract_bytes_sync(
    contents: Sequence[tuple[bytes, str]], config: ExtractionConfig = DEFAULT_CONFIG
) -> list[ExtractionResult]:
    """Blocking counterpart of batch_extract_bytes, run on a thread pool.

    With two or more items, a failure is converted into an ``ExtractionResult`` whose metadata
    carries ``"error": True`` plus an error context. With zero or one item, ``extract_bytes_sync``
    is called directly and its exceptions propagate.

    Args:
        contents: ``(content, mime_type)`` pairs to extract.
        config: Extraction options applied to every item; defaults to ``DEFAULT_CONFIG``.

    Returns:
        One result per input pair, positioned to match the order of ``contents``.
    """
    total = len(contents)
    if total <= 1:
        return [
            extract_bytes_sync(content=content, mime_type=mime_type, config=config) for content, mime_type in contents
        ]

    def extract_single(numbered_item: tuple[int, tuple[bytes, str]]) -> tuple[int, ExtractionResult]:
        """Extract one item and return it with its input position."""
        position, (content, mime_type) = numbered_item
        try:
            outcome = extract_bytes_sync(content=content, mime_type=mime_type, config=config)
        except Exception as e:  # noqa: BLE001
            outcome = ExtractionResult(
                content=f"Error: {type(e).__name__}: {e!s}",
                mime_type="text/plain",
                metadata={  # type: ignore[typeddict-unknown-key]
                    "error": True,
                    "error_context": create_error_context(
                        operation="batch_extract_bytes_sync",
                        error=e,
                        index=position,
                        mime_type=mime_type,
                        content_size=len(content),
                    ),
                },
                chunks=[],
            )
        return (position, outcome)

    with ThreadPoolExecutor(max_workers=min(total, mp.cpu_count())) as executor:
        # Each submission carries its position, so results can be placed as they complete.
        pending = [executor.submit(extract_single, numbered_item) for numbered_item in enumerate(contents)]
        outcomes: list[ExtractionResult] = [None] * total  # type: ignore[list-item]
        for finished in as_completed(pending):
            position, outcome = finished.result()
            outcomes[position] = outcome

    return outcomes