Creating Plugins¶
Kreuzberg's plugin system allows you to extend functionality by creating custom extractors, post-processors, OCR backends, and validators. Plugins can be written in Rust or Python.
Plugin Types¶
Kreuzberg supports four types of plugins:
| Plugin Type | Purpose | Use Cases |
|---|---|---|
| DocumentExtractor | Extract content from file formats | Add support for new formats, override built-in extractors |
| PostProcessor | Transform extraction results | Add metadata, enrich content, apply custom processing |
| OcrBackend | Perform OCR on images | Integrate cloud OCR services, custom OCR engines |
| Validator | Validate extraction quality | Enforce minimum quality, check completeness |
Plugin Architecture¶
All plugins must implement the base Plugin trait and a type-specific trait. Plugins are:
- Thread-safe: All plugins must be `Send + Sync` (Rust) or thread-safe (Python)
- Lifecycle-managed: Plugins have `initialize()` and `shutdown()` methods
- Registered globally: Use registry functions to register your plugins
Document Extractors¶
Extract content from custom file formats or override built-in extractors.
Rust Implementation¶
use kreuzberg::plugins::{Plugin, DocumentExtractor};
use kreuzberg::{Result, ExtractionResult, ExtractionConfig, Metadata};
use async_trait::async_trait;
use std::path::Path;
/// Stateless example extractor for JSON documents.
struct CustomJsonExtractor;

impl Plugin for CustomJsonExtractor {
    /// Name this plugin reports.
    fn name(&self) -> &str {
        "custom-json-extractor"
    }

    /// Version string this plugin reports.
    fn version(&self) -> String {
        String::from("1.0.0")
    }

    /// No setup is required for this extractor.
    fn initialize(&self) -> Result<()> {
        Ok(())
    }

    /// No teardown is required for this extractor.
    fn shutdown(&self) -> Result<()> {
        Ok(())
    }
}
#[async_trait]
impl DocumentExtractor for CustomJsonExtractor {
    /// Parses `content` as JSON and returns the text gathered from it by
    /// `extract_text_from_json`.
    ///
    /// # Errors
    /// Propagates the parse error when `content` is not valid JSON.
    async fn extract_bytes(
        &self,
        content: &[u8],
        _mime_type: &str,
        _config: &ExtractionConfig,
    ) -> Result<ExtractionResult> {
        let parsed = serde_json::from_slice::<serde_json::Value>(content)?;
        let extraction = ExtractionResult {
            content: extract_text_from_json(&parsed),
            mime_type: String::from("application/json"),
            metadata: Metadata::default(),
            tables: vec![],
            detected_languages: None,
            chunks: None,
            images: None,
        };
        Ok(extraction)
    }

    /// MIME types this extractor declares support for.
    fn supported_mime_types(&self) -> &[&str] {
        const MIME_TYPES: &[&str] = &["application/json", "text/json"];
        MIME_TYPES
    }

    /// Priority value reported for this extractor.
    fn priority(&self) -> i32 {
        50
    }
}
fn extract_text_from_json(value: &serde_json::Value) -> String {
match value {
serde_json::Value::String(s) => format!("{}\n", s),
serde_json::Value::Array(arr) => {
arr.iter().map(extract_text_from_json).collect()
}
serde_json::Value::Object(obj) => {
obj.values().map(extract_text_from_json).collect()
}
_ => String::new(),
}
}
Python Implementation¶
from kreuzberg import register_document_extractor, ExtractionResult
import json
class CustomJsonExtractor:
    """Extractor that gathers every string value found in a JSON document."""

    def name(self) -> str:
        return "custom-json-extractor"

    def version(self) -> str:
        return "1.0.0"

    def supported_mime_types(self) -> list[str]:
        return ["application/json", "text/json"]

    def priority(self) -> int:
        return 50

    def extract_bytes(
        self,
        content: bytes,
        mime_type: str,
        config: dict,
    ) -> ExtractionResult:
        """Parse ``content`` as JSON and return its string values as text."""
        parsed = json.loads(content)
        return {
            "content": self._extract_text(parsed),
            "mime_type": "application/json",
            "metadata": {},
            "tables": [],
        }

    def _extract_text(self, obj) -> str:
        """Return all strings nested in ``obj``, each followed by a newline."""
        if isinstance(obj, str):
            return f"{obj}\n"
        if isinstance(obj, list):
            children = obj
        elif isinstance(obj, dict):
            children = obj.values()
        else:
            # Numbers, booleans and None carry no text.
            return ""
        return "".join(map(self._extract_text, children))

    def initialize(self) -> None:
        pass

    def shutdown(self) -> None:
        pass
# Register the extractor
register_document_extractor(CustomJsonExtractor())
Registration¶
Priority System¶
When multiple extractors support the same MIME type, the highest priority wins:
- 0-25: Fallback/low-quality extractors
- 26-49: Alternative implementations
- 50: Default (built-in extractors)
- 51-75: Enhanced/premium extractors
- 76-100: Specialized/high-priority extractors
Post-Processors¶
Transform and enrich extraction results after initial extraction.
Processing Stages¶
Post-processors execute in three stages:
- Early: Run first, use for foundational operations like language detection, quality scoring, or text normalization that other processors may depend on
- Middle: Run second, use for content transformation like keyword extraction, token reduction, or summarization
- Late: Run last, use for final enrichment like custom metadata, analytics tracking, or output formatting
Rust Implementation¶
use kreuzberg::plugins::{Plugin, PostProcessor, ProcessingStage};
use kreuzberg::{Result, ExtractionResult, ExtractionConfig};
use async_trait::async_trait;
/// Stateless example post-processor that records a word count.
struct WordCountProcessor;

impl Plugin for WordCountProcessor {
    /// Name this plugin reports.
    fn name(&self) -> &str {
        "word-count"
    }

    /// Version string this plugin reports.
    fn version(&self) -> String {
        String::from("1.0.0")
    }

    /// No setup is required.
    fn initialize(&self) -> Result<()> {
        Ok(())
    }

    /// No teardown is required.
    fn shutdown(&self) -> Result<()> {
        Ok(())
    }
}
#[async_trait]
impl PostProcessor for WordCountProcessor {
    /// Stores the number of whitespace-separated words of `result.content`
    /// under the `"word_count"` key of `result.metadata.additional`.
    async fn process(
        &self,
        result: &mut ExtractionResult,
        _config: &ExtractionConfig,
    ) -> Result<()> {
        let words = result.content.split_whitespace().count();
        let key = "word_count".to_string();
        result.metadata.additional.insert(key, serde_json::json!(words));
        Ok(())
    }

    /// This processor declares the `Early` stage.
    fn processing_stage(&self) -> ProcessingStage {
        ProcessingStage::Early
    }

    /// Returns `true` only when there is content to count.
    fn should_process(
        &self,
        result: &ExtractionResult,
        _config: &ExtractionConfig,
    ) -> bool {
        let has_content = !result.content.is_empty();
        has_content
    }
}
Python Implementation¶
from kreuzberg import register_post_processor, ExtractionResult
class WordCountProcessor:
    """Post-processor that records the word count of the extracted content."""

    def name(self) -> str:
        return "word_count"

    def version(self) -> str:
        return "1.0.0"

    def processing_stage(self) -> str:
        # One of "early", "middle" or "late".
        return "early"

    def process(self, result: ExtractionResult) -> ExtractionResult:
        """Store the whitespace-separated word count in the result metadata."""
        words = result["content"].split()
        result["metadata"]["word_count"] = len(words)
        return result

    def should_process(self, result: ExtractionResult) -> bool:
        """Process only results whose content is non-empty."""
        content = result["content"]
        return bool(content)

    def initialize(self) -> None:
        pass

    def shutdown(self) -> None:
        pass
register_post_processor(WordCountProcessor())
Conditional Processing¶
impl PostProcessor for PdfOnlyProcessor {
async fn process(
&self,
result: &mut ExtractionResult,
_config: &ExtractionConfig
) -> Result<()> {
// PDF-specific processing
Ok(())
}
fn processing_stage(&self) -> ProcessingStage {
ProcessingStage::Middle
}
fn should_process(
&self,
result: &ExtractionResult,
_config: &ExtractionConfig
) -> bool {
result.mime_type == "application/pdf"
}
}
OCR Backends¶
Integrate custom OCR engines or cloud services.
Rust Implementation¶
use kreuzberg::plugins::{Plugin, OcrBackend, OcrBackendType};
use kreuzberg::{Result, ExtractionResult, OcrConfig, Metadata};
use async_trait::async_trait;
use std::path::Path;
/// Example OCR backend that forwards images to a cloud service.
struct CloudOcrBackend {
    /// Credential for the cloud service.
    api_key: String,
    /// Language codes this backend reports as supported.
    supported_langs: Vec<String>,
}

impl Plugin for CloudOcrBackend {
    /// Name this plugin reports.
    fn name(&self) -> &str {
        "cloud-ocr"
    }

    /// Version string this plugin reports.
    fn version(&self) -> String {
        String::from("1.0.0")
    }

    /// No setup is required.
    fn initialize(&self) -> Result<()> {
        Ok(())
    }

    /// No teardown is required.
    fn shutdown(&self) -> Result<()> {
        Ok(())
    }
}
#[async_trait]
impl OcrBackend for CloudOcrBackend {
    /// Sends `image_bytes` to the cloud service using `config.language` and
    /// wraps the returned text in a plain-text `ExtractionResult`.
    ///
    /// # Errors
    /// Propagates any error returned by `call_cloud_api`.
    async fn process_image(
        &self,
        image_bytes: &[u8],
        config: &OcrConfig,
    ) -> Result<ExtractionResult> {
        // Send image to cloud OCR service
        let recognized = self.call_cloud_api(image_bytes, &config.language).await?;

        let extraction = ExtractionResult {
            content: recognized,
            mime_type: String::from("text/plain"),
            metadata: Metadata::default(),
            tables: vec![],
            detected_languages: None,
            chunks: None,
            images: None,
        };
        Ok(extraction)
    }

    /// Returns `true` when `lang` is one of the configured language codes.
    fn supports_language(&self, lang: &str) -> bool {
        self.supported_langs
            .iter()
            .any(|known| known.as_str() == lang)
    }

    /// This backend reports itself as `Custom`.
    fn backend_type(&self) -> OcrBackendType {
        OcrBackendType::Custom
    }

    /// Returns a copy of the configured language codes.
    fn supported_languages(&self) -> Vec<String> {
        self.supported_langs.to_vec()
    }
}
impl CloudOcrBackend {
    /// Placeholder for the real service call: ignores its arguments and
    /// returns a fixed string.
    async fn call_cloud_api(
        &self,
        image: &[u8],
        language: &str,
    ) -> Result<String> {
        // API call implementation
        let recognized = String::from("Extracted text");
        Ok(recognized)
    }
}
Python Implementation¶
from kreuzberg import register_ocr_backend
import requests
class CloudOcrBackend:
    """OCR backend that delegates text recognition to an HTTP OCR service."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.supported_langs = ["eng", "deu", "fra"]

    def name(self) -> str:
        return "cloud-ocr"

    def version(self) -> str:
        return "1.0.0"

    def backend_type(self) -> str:
        return "custom"

    def supported_languages(self) -> list[str]:
        # Return a copy so callers cannot mutate the backend's own list.
        return list(self.supported_langs)

    def supports_language(self, language: str) -> bool:
        return language in self.supported_langs

    def process_image(self, image_bytes: bytes, config: dict) -> dict:
        """Upload ``image_bytes`` to the OCR service and return its text.

        Raises:
            requests.HTTPError: if the service answers with an error status.
            requests.Timeout: if the service does not answer in time.
        """
        # `requests` ignores `json=` whenever `files=` is supplied, so the
        # language is sent as an ordinary multipart form field via `data=`.
        response = requests.post(
            "https://api.example.com/ocr",
            files={"image": image_bytes},
            data={"language": config.get("language", "eng")},
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=30,  # never block the extraction pipeline indefinitely
        )
        # Surface HTTP failures instead of failing later with a KeyError.
        response.raise_for_status()

        # Decode the body once and reuse it for both fields.
        payload = response.json()
        return {
            "content": payload["text"],
            "mime_type": "text/plain",
            "metadata": {"confidence": payload.get("confidence", 0.0)},
            "tables": [],
        }

    def initialize(self) -> None:
        pass

    def shutdown(self) -> None:
        pass
# Register the backend
register_ocr_backend(CloudOcrBackend(api_key="your-api-key"))
Validators¶
Enforce quality requirements on extraction results.
Validators are Fatal
Validation errors cause extraction to fail. Use validators for critical quality checks only.
Rust Implementation¶
use kreuzberg::plugins::{Plugin, Validator};
use kreuzberg::{Result, ExtractionResult, ExtractionConfig, KreuzbergError};
use async_trait::async_trait;
/// Example validator that rejects results whose content is too short.
struct MinLengthValidator {
    /// Minimum accepted content length.
    min_length: usize,
}

impl Plugin for MinLengthValidator {
    /// Name this plugin reports.
    fn name(&self) -> &str {
        "min-length-validator"
    }

    /// Version string this plugin reports.
    fn version(&self) -> String {
        String::from("1.0.0")
    }

    /// No setup is required.
    fn initialize(&self) -> Result<()> {
        Ok(())
    }

    /// No teardown is required.
    fn shutdown(&self) -> Result<()> {
        Ok(())
    }
}
#[async_trait]
impl Validator for MinLengthValidator {
    /// Fails validation when the content has fewer than `min_length`
    /// characters.
    ///
    /// # Errors
    /// Returns a validation error naming the actual and required lengths.
    async fn validate(
        &self,
        result: &ExtractionResult,
        _config: &ExtractionConfig,
    ) -> Result<()> {
        // `str::len` counts UTF-8 bytes, which over-counts non-ASCII text;
        // count `char`s so the check matches the "characters" in the message
        // (and the Python validator, whose `len()` counts code points).
        let char_count = result.content.chars().count();
        if char_count < self.min_length {
            return Err(KreuzbergError::validation(format!(
                "Content too short: {} < {} characters",
                char_count,
                self.min_length
            )));
        }
        Ok(())
    }

    /// Priority value reported for this validator.
    fn priority(&self) -> i32 {
        100 // Run early - fast check
    }
}
Python Implementation¶
from kreuzberg import register_validator
from kreuzberg.exceptions import ValidationError
class MinLengthValidator:
    """Validator that rejects results whose content is too short."""

    def __init__(self, min_length: int = 100):
        self.min_length = min_length

    def name(self) -> str:
        return "min_length_validator"

    def version(self) -> str:
        return "1.0.0"

    def priority(self) -> int:
        # Run early
        return 100

    def validate(self, result: dict) -> None:
        """Raise ``ValidationError`` when the content is below the minimum."""
        actual_length = len(result["content"])
        if actual_length >= self.min_length:
            return
        raise ValidationError(
            f"Content too short: {actual_length} < {self.min_length}"
        )

    def should_validate(self, result: dict) -> bool:
        # Always validate
        return True

    def initialize(self) -> None:
        pass

    def shutdown(self) -> None:
        pass
register_validator(MinLengthValidator(min_length=100))
Quality Score Validator¶
#[async_trait]
impl Validator for QualityValidator {
    /// Fails validation when the `"quality_score"` entry of
    /// `result.metadata.additional` is below 0.5.
    ///
    /// A missing or non-numeric entry is treated as a score of 0.0 and
    /// therefore fails.
    async fn validate(
        &self,
        result: &ExtractionResult,
        _config: &ExtractionConfig,
    ) -> Result<()> {
        let score = match result.metadata.additional.get("quality_score") {
            Some(raw) => raw.as_f64().unwrap_or(0.0),
            None => 0.0,
        };

        if score < 0.5 {
            let message = format!("Quality score too low: {:.2} < 0.50", score);
            return Err(KreuzbergError::validation(message));
        }
        Ok(())
    }
}
Plugin Management¶
Listing Plugins¶
use kreuzberg::plugins::registry::*;
// Document extractors
let extractor_registry = get_document_extractor_registry();
let extractor_names = extractor_registry.list()?;
println!("Registered extractors: {:?}", extractor_names);

// Post-processors
let processor_registry = get_post_processor_registry();
let processor_names = processor_registry.list()?;
println!("Registered processors: {:?}", processor_names);

// OCR backends
let ocr_registry = get_ocr_backend_registry();
let ocr_backend_names = ocr_registry.list()?;
println!("Registered OCR backends: {:?}", ocr_backend_names);

// Validators
let validator_registry = get_validator_registry();
let validator_names = validator_registry.list()?;
println!("Registered validators: {:?}", validator_names);
Unregistering Plugins¶
from kreuzberg import (
unregister_document_extractor,
unregister_post_processor,
unregister_ocr_backend,
unregister_validator,
)
# Each argument is the string returned by the `name()` method of the
# corresponding Python example plugin on this page.
unregister_document_extractor("custom-json-extractor")
unregister_post_processor("word_count")
unregister_ocr_backend("cloud-ocr")
unregister_validator("min_length_validator")
Clearing All Plugins¶
Thread Safety¶
All plugins must be thread-safe:
Rust Thread Safety¶
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use kreuzberg::KreuzbergError;
/// Example plugin holding shared mutable state behind thread-safe wrappers.
struct StatefulPlugin {
    // Use atomic types for simple counters
    call_count: AtomicUsize,
    // Use Mutex for complex state.
    // `HashMap` is spelled with its full path because this snippet's `use`
    // lines do not import `std::collections::HashMap`.
    cache: Mutex<std::collections::HashMap<String, String>>,
}
impl Plugin for StatefulPlugin {
    /// Name this plugin reports.
    fn name(&self) -> &str {
        "stateful-plugin"
    }

    /// Version string this plugin reports.
    fn version(&self) -> String {
        String::from("1.0.0")
    }

    /// Resets the call counter to zero.
    fn initialize(&self) -> Result<()> {
        self.call_count.store(0, Ordering::Release);
        Ok(())
    }

    /// Prints how many times the counter was incremented.
    fn shutdown(&self) -> Result<()> {
        let total_calls = self.call_count.load(Ordering::Acquire);
        println!("Plugin called {} times", total_calls);
        Ok(())
    }
}
#[async_trait]
impl PostProcessor for StatefulPlugin {
    /// Counts the call and remembers the MIME type of `result` in the cache
    /// under the `"last_mime"` key.
    ///
    /// # Errors
    /// Returns a plugin error when the cache mutex is poisoned.
    async fn process(
        &self,
        result: &mut ExtractionResult,
        _config: &ExtractionConfig,
    ) -> Result<()> {
        // Increment counter atomically
        self.call_count.fetch_add(1, Ordering::AcqRel);

        // Access cache with proper error handling; the guard is a temporary
        // and is released at the end of the statement.
        self.cache
            .lock()
            .map_err(|_| KreuzbergError::plugin("Cache lock poisoned"))?
            .insert(String::from("last_mime"), result.mime_type.clone());
        Ok(())
    }

    /// This processor declares the `Middle` stage.
    fn processing_stage(&self) -> ProcessingStage {
        ProcessingStage::Middle
    }
}
Python Thread Safety¶
Best Practices¶
Naming¶
- Use kebab-case for plugin names: `my-custom-plugin`
- Use lowercase only, no spaces or special characters
- Be descriptive but concise
Error Handling¶
// Error-handling excerpt: rejects empty input with a validation error, then
// parses the content and wraps any parse failure in a parsing error that
// names the MIME type.
// NOTE(review): `result` is not defined in this excerpt — presumably it is
// built from `parsed` in code omitted from the example; confirm before copying.
async fn extract_bytes(
&self,
content: &[u8],
mime_type: &str,
config: &ExtractionConfig,
) -> Result<ExtractionResult> {
// Validate inputs
if content.is_empty() {
return Err(KreuzbergError::validation("Empty content"));
}
// Handle errors with context
let parsed = parse_content(content)
.map_err(|e| KreuzbergError::parsing(
format!("Failed to parse {}: {}", mime_type, e)
))?;
Ok(result)
}
Logging¶
use log::{info, warn, error};
impl Plugin for MyPlugin {
    /// Logs the plugin name at `info` level when the plugin starts.
    fn initialize(&self) -> Result<()> {
        let plugin_name = self.name();
        info!("Initializing plugin: {}", plugin_name);
        Ok(())
    }

    /// Logs the plugin name at `info` level when the plugin stops.
    fn shutdown(&self) -> Result<()> {
        let plugin_name = self.name();
        info!("Shutting down plugin: {}", plugin_name);
        Ok(())
    }
}
// Logging excerpt: records the MIME type and input size before extracting,
// and warns when the extraction produced no content.
// NOTE(review): `result` is not defined in this excerpt — presumably it is
// produced by the step elided at "Processing..."; confirm before copying.
#[async_trait]
impl DocumentExtractor for MyPlugin {
async fn extract_bytes(
&self,
content: &[u8],
mime_type: &str,
_config: &ExtractionConfig,
) -> Result<ExtractionResult> {
info!("Extracting {} ({} bytes)", mime_type, content.len());
// Processing...
if result.content.is_empty() {
warn!("Extraction resulted in empty content");
}
Ok(result)
}
}
import logging
logger = logging.getLogger(__name__)
class MyPlugin:
    """Plugin skeleton showing where to log lifecycle and extraction events."""

    def initialize(self) -> None:
        # Pass values as logging arguments instead of pre-formatting with an
        # f-string: the message is then only built when INFO is enabled.
        logger.info("Initializing plugin: %s", self.name())

    def shutdown(self) -> None:
        logger.info("Shutting down plugin: %s", self.name())

    def extract_bytes(
        self,
        content: bytes,
        mime_type: str,
        config: dict,
    ) -> dict:
        """Log the request, run the extraction, and warn on empty output."""
        logger.info("Extracting %s (%d bytes)", mime_type, len(content))
        # Processing...
        # NOTE(review): `result` is not defined in this excerpt — presumably
        # it is produced by the processing step elided above; confirm.
        if not result["content"]:
            logger.warning("Extraction resulted in empty content")
        return result
Testing¶
#[cfg(test)]
mod tests {
    use super::*;

    /// Extracting a one-field JSON object yields its string value and the
    /// JSON MIME type.
    #[tokio::test]
    async fn test_custom_extractor() {
        let input: &[u8] = br#"{"message": "Hello, world!"}"#;
        let default_config = ExtractionConfig::default();
        let plugin = CustomJsonExtractor;

        let extraction = plugin
            .extract_bytes(input, "application/json", &default_config)
            .await
            .expect("Extraction failed");

        assert!(extraction.content.contains("Hello, world!"));
        assert_eq!(extraction.mime_type, "application/json");
    }
}
Complete Example: PDF Metadata Extractor¶
use kreuzberg::plugins::{Plugin, PostProcessor, ProcessingStage};
use kreuzberg::{Result, ExtractionResult, ExtractionConfig};
use async_trait::async_trait;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Example post-processor that tags PDF results and counts how many it saw.
struct PdfMetadataExtractor {
    /// Number of results passed through `process`.
    processed_count: AtomicUsize,
}

impl PdfMetadataExtractor {
    /// Creates an extractor whose counter starts at zero.
    fn new() -> Self {
        let processed_count = AtomicUsize::new(0);
        Self { processed_count }
    }
}
impl Plugin for PdfMetadataExtractor {
    /// Name this plugin reports.
    fn name(&self) -> &str {
        "pdf-metadata-extractor"
    }

    /// Version string this plugin reports.
    fn version(&self) -> String {
        String::from("1.0.0")
    }

    /// Human-readable summary of the plugin.
    fn description(&self) -> &str {
        "Extracts and enriches PDF metadata"
    }

    /// Logs that the plugin has been initialized.
    fn initialize(&self) -> Result<()> {
        log::info!("PDF metadata extractor initialized");
        Ok(())
    }

    /// Logs how many results were processed.
    fn shutdown(&self) -> Result<()> {
        let total_processed = self.processed_count.load(Ordering::Acquire);
        log::info!("Processed {} PDFs", total_processed);
        Ok(())
    }
}
#[async_trait]
impl PostProcessor for PdfMetadataExtractor {
    /// Counts the call and sets `"pdf_processed": true` in
    /// `result.metadata.additional`.
    async fn process(
        &self,
        result: &mut ExtractionResult,
        _config: &ExtractionConfig,
    ) -> Result<()> {
        self.processed_count.fetch_add(1, Ordering::AcqRel);

        // Extract PDF-specific metadata
        let key = "pdf_processed".to_string();
        result.metadata.additional.insert(key, serde_json::json!(true));
        Ok(())
    }

    /// This processor declares the `Early` stage.
    fn processing_stage(&self) -> ProcessingStage {
        ProcessingStage::Early
    }

    /// Restricts this processor to results whose MIME type is
    /// `application/pdf`.
    fn should_process(
        &self,
        result: &ExtractionResult,
        _config: &ExtractionConfig,
    ) -> bool {
        let is_pdf = result.mime_type == "application/pdf";
        is_pdf
    }

    /// Reported duration estimate, in milliseconds.
    fn estimated_duration_ms(&self, _result: &ExtractionResult) -> u64 {
        // Fast operation
        10
    }
}
// Registration
use kreuzberg::plugins::registry::get_post_processor_registry;
use std::sync::Arc;
/// Registers a fresh `PdfMetadataExtractor` with the post-processor registry.
///
/// # Errors
/// Propagates the error returned by the registry's `register` call.
fn register() -> Result<()> {
    let plugin = Arc::new(PdfMetadataExtractor::new());
    let post_processors = get_post_processor_registry();
    // 50 is the default priority.
    post_processors.register(plugin, 50)?;
    Ok(())
}
from kreuzberg import register_post_processor, ExtractionResult
import logging
logger = logging.getLogger(__name__)
class PdfMetadataExtractor:
    """Post-processor that tags PDF results and counts how many it handled."""

    def __init__(self):
        # Imported locally so the class does not depend on the snippet's
        # module-level imports.
        import threading

        self.processed_count = 0
        # `+=` on an attribute is a read-modify-write and is not atomic, so
        # the counter is guarded by a lock to keep the plugin thread-safe.
        self._count_lock = threading.Lock()

    def name(self) -> str:
        return "pdf_metadata_extractor"

    def version(self) -> str:
        return "1.0.0"

    def description(self) -> str:
        return "Extracts and enriches PDF metadata"

    def processing_stage(self) -> str:
        return "early"

    def should_process(self, result: ExtractionResult) -> bool:
        """Process only results whose MIME type is ``application/pdf``."""
        return result["mime_type"] == "application/pdf"

    def process(self, result: ExtractionResult) -> ExtractionResult:
        """Count the call and mark the result as processed."""
        with self._count_lock:
            self.processed_count += 1
        # Extract PDF-specific metadata
        result["metadata"]["pdf_processed"] = True
        return result

    def initialize(self) -> None:
        logger.info("PDF metadata extractor initialized")

    def shutdown(self) -> None:
        with self._count_lock:
            processed = self.processed_count
        logger.info(f"Processed {processed} PDFs")
# Register the processor
register_post_processor(PdfMetadataExtractor())