
TidesDB Kafka Streams Plugin Reference



Overview

The TidesDB Kafka Streams plugin is a drop-in replacement for the default RocksDB state stores in Apache Kafka Streams. It provides a KeyValueStore implementation backed by TidesDB, exposing TidesDB's capabilities through the standard Kafka Streams state store interface: ACID transactions, MVCC concurrency, LSM-tree storage, configurable compression, bloom filters, block indexes, a B+tree klog format, TTL-based expiration, commit hooks for change data capture, online backups, and lightweight checkpoints.

Switching from RocksDB to TidesDB requires no changes to your stream topology; you replace the store supplier or builder, and the plugin handles the rest. The underlying TidesDB database is managed automatically: the plugin creates and opens the database in Kafka Streams' state directory, manages column families, handles transactions, and closes the database on shutdown.

Getting Started

Prerequisites

  • Java 11 or higher
  • Maven 3.6+
  • TidesDB native C library installed on the system (see Building)
  • TidesDB Java bindings installed (com.tidesdb:tidesdb-java)

Adding to Your Project

Maven

<dependency>
  <groupId>com.tidesdb</groupId>
  <artifactId>tidesdb-kafka</artifactId>
  <version>0.3.0</version>
</dependency>

You must also ensure the TidesDB JNI shared library is on the Java library path at runtime:

-Djava.library.path=/usr/local/lib

Usage

Basic Usage with Materialized

The simplest way to use TidesDB as a state store is through Materialized.as() with the store supplier:

import com.tidesdb.kafka.store.TidesDBStoreSupplier;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input = builder.stream("input-topic");

KTable<String, Long> counts = input
    .groupByKey()
    .count(Materialized.as(new TidesDBStoreSupplier("my-counts")));

counts.toStream().to("output-topic");

This creates a TidesDB-backed state store with default configuration: LZ4 compression, bloom filters enabled at 1% FPR, block indexes enabled, SYNC_NONE durability mode, 64 MB write buffer, and 64 MB block cache.

Usage with StoreBuilder

For topology-level state stores, use the builder:

import com.tidesdb.kafka.store.TidesDBStoreBuilder;
import java.util.Collections;

StoreBuilder<TidesDBStore> storeBuilder = new TidesDBStoreBuilder("my-store")
    .withLoggingEnabled(Collections.emptyMap());
builder.addStateStore(storeBuilder);

builder.stream("input-topic")
    .process(() -> new MyProcessor(), "my-store");

Custom Configuration

Both the supplier and builder accept a TidesDBStoreConfig for fine-grained control over every TidesDB database and column family parameter:

import com.tidesdb.kafka.store.TidesDBStoreConfig;
import com.tidesdb.kafka.store.TidesDBStoreSupplier;
import com.tidesdb.*;

TidesDBStoreConfig config = TidesDBStoreConfig.builder()
    .compressionAlgorithm(CompressionAlgorithm.ZSTD_COMPRESSION)
    .enableBloomFilter(true)
    .bloomFPR(0.001)
    .writeBufferSize(128 * 1024 * 1024)  // 128 MB
    .blockCacheSize(128 * 1024 * 1024)   // 128 MB
    .syncMode(SyncMode.SYNC_NONE)
    .enableBlockIndexes(true)
    .numFlushThreads(4)
    .numCompactionThreads(4)
    .build();

KTable<String, Long> counts = input
    .groupByKey()
    .count(Materialized.as(new TidesDBStoreSupplier("my-counts", config)));

Or with the builder:

TidesDBStoreBuilder storeBuilder = TidesDBStoreBuilder.create("my-store", config)
    .withCachingEnabled()
    .withLoggingEnabled(Collections.emptyMap());

Configuration Reference

Database Configuration

These settings control the TidesDB database instance that backs the state store. Each Kafka Streams state store opens its own independent TidesDB database.

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| numFlushThreads | int | 2 | Number of background threads flushing memtables to SSTables |
| numCompactionThreads | int | 2 | Number of background threads performing LSM compaction |
| logLevel | LogLevel | INFO | TidesDB internal log level (DEBUG, INFO, WARN, ERROR, FATAL, NONE) |
| blockCacheSize | long | 64 MB | Size of the block cache shared across all column families |
| maxOpenSSTables | long | 256 | Maximum number of SSTable file handles cached in the LRU |
| maxMemoryUsage | long | 0 (auto) | Global memory limit in bytes; 0 lets TidesDB auto-detect (50% of system RAM) |
| logToFile | boolean | false | Write TidesDB logs to a file instead of stderr |
| logTruncationAt | long | 24 MB | Log file truncation size; 0 disables truncation |

Column Family Configuration

Each state store maps to a single TidesDB column family. These settings control the column family’s storage behavior.

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| columnFamilyName | String | "default" | Name of the column family |
| writeBufferSize | long | 64 MB | Memtable flush threshold |
| compressionAlgorithm | CompressionAlgorithm | LZ4_COMPRESSION | Compression algorithm for SSTables |
| enableBloomFilter | boolean | true | Enable bloom filters for point lookups |
| bloomFPR | double | 0.01 | Bloom filter false positive rate (1%) |
| enableBlockIndexes | boolean | true | Enable compact block indexes for efficient seeking |
| indexSampleRatio | int | 1 | Sample every Nth block for the index |
| blockIndexPrefixLen | int | 16 | Block index prefix length in bytes |
| syncMode | SyncMode | SYNC_NONE | Durability mode for WAL writes |
| syncIntervalUs | long | 128000 | Sync interval in microseconds (for SYNC_INTERVAL mode) |
| useBtree | boolean | false | Use B+tree klog format instead of block-based SSTables |
| minLevels | int | 5 | Minimum number of LSM levels |
| levelSizeRatio | long | 10 | Level size multiplier for LSM compaction |
| skipListMaxLevel | int | 12 | Skip list max level for memtables |
| skipListProbability | float | 0.25 | Skip list promotion probability |
| defaultIsolationLevel | IsolationLevel | READ_COMMITTED | Default transaction isolation level |
| klogValueThreshold | long | 512 | Values larger than this go to the value log |
| l0QueueStallThreshold | int | 20 | Number of L0 immutable memtables before stalling writes |
| l1FileCountTrigger | int | 4 | Number of L1 files that triggers compaction |

Store Behavior

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| defaultTtlSeconds | long | -1 (disabled) | Default TTL in seconds applied to all put operations; -1 means no expiration |

Compression Algorithms

| Algorithm | Description |
| --- | --- |
| NO_COMPRESSION | No compression |
| SNAPPY_COMPRESSION | Snappy compression |
| LZ4_COMPRESSION | LZ4 standard compression (default) |
| ZSTD_COMPRESSION | Zstandard compression (best ratio) |
| LZ4_FAST_COMPRESSION | LZ4 fast mode (higher throughput, lower ratio) |

Sync Modes

| Mode | Description |
| --- | --- |
| SYNC_NONE | No explicit sync; relies on OS page cache (fastest, default for Kafka plugin) |
| SYNC_FULL | Fsync on every write (most durable) |
| SYNC_INTERVAL | Periodic background syncing at configurable intervals |

The default sync mode for the Kafka plugin is SYNC_NONE. This is appropriate because Kafka Streams’ changelog topics provide durability, and state can always be rebuilt from the changelog. If your application requires local durability guarantees beyond what Kafka provides, set SYNC_FULL or SYNC_INTERVAL.
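For applications that need a middle ground, interval syncing bounds the data-loss window without paying for an fsync per write. A minimal configuration sketch, assuming the builder methods shown in the configuration reference above (values are illustrative):

```java
// Sketch: sync the WAL roughly every 100 ms as a compromise between
// SYNC_NONE (fastest) and SYNC_FULL (most durable). Values are
// illustrative, not recommendations.
TidesDBStoreConfig config = TidesDBStoreConfig.builder()
    .syncMode(SyncMode.SYNC_INTERVAL)
    .syncIntervalUs(100_000)  // background sync interval in microseconds
    .build();
```

With this setting, at most ~100 ms of acknowledged writes can be lost on a hard crash, and even those are recoverable from the changelog topic.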

Transaction Isolation Levels

| Level | Description |
| --- | --- |
| READ_UNCOMMITTED | Sees all data including uncommitted changes |
| READ_COMMITTED | Sees only committed data (default) |
| REPEATABLE_READ | Consistent snapshot; phantom reads possible |
| SNAPSHOT | Write-write conflict detection |
| SERIALIZABLE | Full read-write conflict detection (SSI) |

For most Kafka Streams workloads, READ_COMMITTED (the default) is sufficient. Higher isolation levels add overhead and are only needed when external threads access the store concurrently with custom logic.
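If external readers do access the store concurrently, the defaultIsolationLevel option from the column family configuration table can raise the default. A sketch, assuming the builder exposes the option under that name:

```java
// Sketch: give every transaction a consistent snapshot by default.
// Only worthwhile when threads outside the stream task read the
// store concurrently; READ_COMMITTED suffices otherwise.
TidesDBStoreConfig config = TidesDBStoreConfig.builder()
    .defaultIsolationLevel(IsolationLevel.REPEATABLE_READ)
    .build();
```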

TTL Support

TidesDB supports time-to-live (TTL) on individual key-value pairs. Expired entries are removed during compaction. The plugin exposes TTL in two ways.

Default TTL

Set a default TTL that applies to all put operations automatically:

TidesDBStoreConfig config = TidesDBStoreConfig.builder()
    .defaultTtlSeconds(3600)  // 1 hour
    .build();

KTable<String, Long> counts = input
    .groupByKey()
    .count(Materialized.as(new TidesDBStoreSupplier("expiring-counts", config)));

Every entry written through put, putIfAbsent, and putAll will expire after the configured duration.

Per-Key TTL

For fine-grained control, use putWithTtl directly on the store:

TidesDBStore store = (TidesDBStore) context.getStateStore("my-store");
store.putWithTtl(key, value, 300); // expires in 5 minutes

A TTL of -1 means no expiration.
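The expiry rule can be illustrated with a small self-contained helper. TtlDemo and isExpired are hypothetical names for illustration only, not part of the plugin API:

```java
// Hypothetical illustration of the TTL semantics described above;
// not the plugin's implementation.
public class TtlDemo {

    /** An entry expires once its age exceeds the TTL; a negative TTL never expires. */
    static boolean isExpired(long writtenAtMillis, long ttlSeconds, long nowMillis) {
        if (ttlSeconds < 0) {
            return false; // -1 means no expiration
        }
        return nowMillis - writtenAtMillis > ttlSeconds * 1000L;
    }

    public static void main(String[] args) {
        System.out.println(isExpired(0L, 300, 299_000L)); // false: still within 5 minutes
        System.out.println(isExpired(0L, 300, 301_000L)); // true: past 5 minutes
        System.out.println(isExpired(0L, -1, 1_000_000L)); // false: no expiration
    }
}
```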

B+tree KLog Format

Column families can optionally use a B+tree structure for the key log instead of the default block-based SSTable format. The B+tree format offers faster point lookups through O(log N) tree traversal at the cost of slightly higher write amplification.

TidesDBStoreConfig config = TidesDBStoreConfig.builder()
    .useBtree(true)
    .build();

When to use B+tree format

  • Read-heavy Kafka Streams workloads with frequent key lookups
  • Interactive queries where read latency matters more than write throughput
  • Large state stores where block scanning becomes expensive

Tradeoffs

  • Slightly higher write amplification during flush
  • Larger metadata overhead per node
  • Block-based format may be faster for full iteration and range scans

The format is set at column family creation time and cannot be changed afterward. If you need to switch formats, delete the state store directory and let Kafka Streams rebuild from the changelog.

Change Data Capture with Commit Hooks

TidesDB supports commit hooks: callbacks that fire synchronously after every transaction commit. This enables real-time change data capture without WAL parsing.

TidesDBStore store = (TidesDBStore) context.getStateStore("my-store");
store.setCommitHook((ops, commitSeq) -> {
    for (CommitOp op : ops) {
        if (op.isDelete()) {
            System.out.println("DELETE key=" + new String(op.getKey()));
        } else {
            System.out.println("PUT key=" + new String(op.getKey()));
        }
    }
    return 0; // 0 = success
});

Each CommitOp contains getKey(), getValue() (null for deletes), getTtl() (-1 for no expiry), and isDelete(). The commitSeq is monotonically increasing and can be used as a replication cursor.

To detach the hook:

store.clearCommitHook();

Hooks execute synchronously on the committing thread. Keep the callback fast to avoid stalling writers.
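A common way to keep the hook cheap is to hand events off to a bounded queue and process them on a separate thread. The sketch below is self-contained; AsyncCdc and ChangeEvent are hypothetical stand-ins (ChangeEvent plays the role of the plugin's CommitOp):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: decouple CDC processing from the committing thread.
// The commit hook only enqueues; a consumer thread drains batches.
public class AsyncCdc {

    public static final class ChangeEvent {
        public final String key;
        public final boolean delete;
        public final long commitSeq;

        public ChangeEvent(String key, boolean delete, long commitSeq) {
            this.key = key;
            this.delete = delete;
            this.commitSeq = commitSeq;
        }
    }

    private final BlockingQueue<ChangeEvent> queue = new ArrayBlockingQueue<>(10_000);

    /** Called from the commit hook: never blocks the writer; drops (or counts) when full. */
    public boolean enqueue(ChangeEvent event) {
        return queue.offer(event);
    }

    /** Called from a dedicated consumer thread: drains whatever has accumulated. */
    public List<ChangeEvent> drain() {
        List<ChangeEvent> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }

    public static void main(String[] args) {
        AsyncCdc cdc = new AsyncCdc();
        cdc.enqueue(new ChangeEvent("user:1", false, 1L));
        cdc.enqueue(new ChangeEvent("user:2", true, 2L));
        System.out.println(cdc.drain().size()); // 2
    }
}
```

Because commitSeq is monotonically increasing, the consumer thread can persist the last sequence it processed and resume from there after a restart.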

Operations and Maintenance

Statistics

The store exposes TidesDB statistics at three levels:

Column family statistics

TidesDBStore store = (TidesDBStore) context.getStateStore("my-store");
Stats stats = store.getStats();
System.out.println("Total keys: " + stats.getTotalKeys());
System.out.println("Data size: " + stats.getTotalDataSize());
System.out.println("Memtable size: " + stats.getMemtableSize());
System.out.println("Read amplification: " + stats.getReadAmp());
System.out.println("Cache hit rate: " + stats.getHitRate());

Database-level statistics

DbStats dbStats = store.getDbStats();
System.out.println("Column families: " + dbStats.getNumColumnFamilies());
System.out.println("Memory pressure: " + dbStats.getMemoryPressureLevel());
System.out.println("Flush queue: " + dbStats.getFlushQueueSize());
System.out.println("Compaction queue: " + dbStats.getCompactionQueueSize());
System.out.println("Total SSTables: " + dbStats.getTotalSstableCount());

Block cache statistics

CacheStats cacheStats = store.getCacheStats();
System.out.println("Cache enabled: " + cacheStats.isEnabled());
System.out.println("Hit rate: " + cacheStats.getHitRate());
System.out.println("Entries: " + cacheStats.getTotalEntries());

Compaction and Flush

// Non-blocking compaction
store.compact();
// Non-blocking flush
store.flush();
// Synchronous flush + aggressive compaction (blocks until complete)
store.purge();
// Purge all column families and drain all queues
store.purgeAll();
// Check background activity
boolean flushing = store.isFlushing();
boolean compacting = store.isCompacting();

Use purge() before backup, after bulk deletes, or during maintenance windows. Use compact() and flush() for non-blocking background work.

WAL Sync

Force an immediate fsync of the write-ahead log:

store.syncWal();

This is useful when running with SYNC_NONE or SYNC_INTERVAL and you need to guarantee durability at a specific point.

Backup and Checkpoint

Online backup · copies all data to a new directory without blocking reads or writes:

store.backup("/path/to/backup");

Lightweight checkpoint · uses hard links for near-instant snapshots (same filesystem only):

store.checkpoint("/path/to/checkpoint");

| | backup() | checkpoint() |
| --- | --- | --- |
| Speed | Copies every SSTable byte-by-byte | Near-instant (hard links) |
| Disk usage | Full independent copy | No extra disk until compaction removes old SSTables |
| Portability | Can be moved to another filesystem or machine | Same filesystem only |
| Use case | Archival, disaster recovery | Fast local snapshots |

Runtime Configuration Updates

Update runtime-safe column family settings without restarting:

ColumnFamilyConfig newConfig = ColumnFamilyConfig.builder()
    .writeBufferSize(256 * 1024 * 1024)
    .bloomFPR(0.001)
    .syncMode(SyncMode.SYNC_INTERVAL)
    .syncIntervalUs(100000)
    .build();

store.updateRuntimeConfig(newConfig, true);

Updatable settings include writeBufferSize, skipListMaxLevel, skipListProbability, bloomFPR, indexSampleRatio, syncMode, and syncIntervalUs.

Range Cost Estimation

Estimate the cost of iterating between two keys without performing any disk I/O:

double costA = store.rangeCost("user:0000".getBytes(), "user:0999".getBytes());
double costB = store.rangeCost("user:1000".getBytes(), "user:1099".getBytes());

This is useful for query planning, load balancing range scan work across threads, and monitoring data distribution changes over time.
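For instance, cost estimates can drive a simple greedy assignment of scan ranges to worker threads. The sketch below is self-contained; ScanPlanner is a hypothetical helper, and in practice the costs would come from store.rangeCost(...):

```java
import java.util.Arrays;

// Sketch: longest-processing-time greedy balancing of range-scan
// work across threads, driven by per-range cost estimates.
public class ScanPlanner {

    /** Assigns each cost (largest first) to the least-loaded bucket; returns bucket totals. */
    static double[] balance(double[] costs, int threads) {
        double[] load = new double[threads];
        double[] sorted = costs.clone();
        Arrays.sort(sorted); // ascending
        for (int i = sorted.length - 1; i >= 0; i--) { // walk largest-first
            int min = 0;
            for (int t = 1; t < threads; t++) {
                if (load[t] < load[min]) {
                    min = t;
                }
            }
            load[min] += sorted[i];
        }
        return load;
    }

    public static void main(String[] args) {
        // Costs as rangeCost might report them for five key ranges.
        double[] loads = balance(new double[] {8.0, 7.0, 6.0, 5.0, 4.0}, 2);
        System.out.println(Arrays.toString(loads));
    }
}
```

Greedy longest-first assignment is not optimal, but it is cheap and guarantees no bucket exceeds the optimum by more than the largest single range.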

Benchmarking

The plugin includes a comprehensive benchmark suite comparing TidesDB against RocksDB across multiple workload types. Benchmarks are fully configurable via system properties.

Running Benchmarks

# Run with default settings
mvn test -Dtest=StateStoreBenchmark \
  -DargLine="-Djava.library.path=/usr/local/lib"

# Run with custom data directory (e.g., fast SSD)
mvn test -Dtest=StateStoreBenchmark \
  -DargLine="-Djava.library.path=/usr/local/lib -Dbenchmark.data.dir=/mnt/ssd/bench"

# Run with custom parameters
mvn test -Dtest=StateStoreBenchmark \
  -DargLine="-Djava.library.path=/usr/local/lib \
    -Dbenchmark.sizes=1000,10000,100000 \
    -Dbenchmark.value.size=256 \
    -Dbenchmark.mixed.ratio=80 \
    -Dbenchmark.percentiles=true"

Or use the included runner script:

./run.sh -b # Run benchmarks
./run.sh -b -d /mnt/fast-ssd/bench # Run on specific directory
./run.sh -a # Run tests, benchmarks, and generate charts

Benchmark Parameters

All parameters are configurable via -D system properties:

| Property | Default | Description |
| --- | --- | --- |
| benchmark.data.dir | (temp) | Data directory for benchmark databases |
| benchmark.sizes | 1000,5000,10000,50000,100000 | Comma-separated operation counts for standard benchmarks |
| benchmark.large.sizes | 100000,…,25000000 | Sizes for large dataset benchmarks |
| benchmark.threads | 1,2,4,8,16 | Thread counts for concurrent access benchmarks |
| benchmark.value.size | 64 | Value size in bytes for standard benchmarks |
| benchmark.large.value.size | 10240 | Value size in bytes for large-value benchmarks |
| benchmark.warmup | 3 | Number of warmup iterations before measurement |
| benchmark.iterations | 5 | Number of measurement iterations (for statistical accuracy) |
| benchmark.compaction.batch | 50000 | Batch size for compaction pressure test |
| benchmark.compaction.batches | 5 | Number of batches for compaction pressure test |
| benchmark.range.data | 50000 | Data size for range scan benchmark |
| benchmark.range.sizes | 10,100,1000,5000,10000 | Comma-separated range sizes |
| benchmark.mixed.ratio | 50 | Read percentage for mixed workload (0–100) |
| benchmark.seed | 42 | Random seed for reproducibility |
| benchmark.percentiles | true | Enable per-operation latency percentile tracking |

Benchmark Workloads

The suite runs the following workloads:

  • Sequential Writes · ordered key insertion
  • Random Writes · random key insertion
  • Sequential Reads · ordered key lookups
  • Random Reads · random key lookups
  • Mixed Workload · configurable read/write ratio
  • Range Scans · iterator-based range queries
  • Bulk Writes · batched putAll operations
  • Update Workload · overwriting existing keys
  • Large Values · configurable large value sizes
  • Full Iteration · complete store scan
  • Delete Workload · sequential key deletion
  • Large Datasets · up to 25M keys with warmup and statistical analysis (mean, stddev)
  • Concurrent Access · multi-threaded mixed workload with throughput and scalability analysis
  • Compaction Pressure · accumulated data over multiple batches to stress compaction
  • Memory/CPU Metrics · write/read performance with heap memory and CPU usage tracking
  • Latency Percentiles · per-operation nanosecond latencies with p50, p90, p95, p99, p99.9, and max

Generating Charts

After running benchmarks, generate visualizations:

./run.sh -c

This creates PNG charts in a timestamped charts_* directory, including performance comparisons, speedup charts, throughput comparisons, error-bar plots for large datasets, concurrent scalability curves, compaction pressure plots, memory usage comparisons, and a summary table.

Chart generation requires Python 3 with pandas, matplotlib, and seaborn (the runner script installs these automatically into a virtual environment).

Running Tests

# Run unit tests
mvn test -Dtest=TidesDBStoreTest \
  -DargLine="-Djava.library.path=/usr/local/lib"

# Run all tests
mvn test -DargLine="-Djava.library.path=/usr/local/lib"

Performance Tuning

Write-Heavy Workloads

For workloads that prioritize write throughput:

TidesDBStoreConfig config = TidesDBStoreConfig.builder()
    .compressionAlgorithm(CompressionAlgorithm.LZ4_FAST_COMPRESSION)
    .syncMode(SyncMode.SYNC_NONE)
    .writeBufferSize(128 * 1024 * 1024)
    .enableBloomFilter(true)
    .bloomFPR(0.01)
    .numFlushThreads(4)
    .numCompactionThreads(4)
    .build();

Read-Heavy Workloads

For workloads that prioritize read latency:

TidesDBStoreConfig config = TidesDBStoreConfig.builder()
    .useBtree(true)
    .blockCacheSize(256 * 1024 * 1024)
    .enableBloomFilter(true)
    .bloomFPR(0.001)
    .enableBlockIndexes(true)
    .compressionAlgorithm(CompressionAlgorithm.LZ4_COMPRESSION)
    .build();

Memory-Constrained Environments

For environments with limited memory:

TidesDBStoreConfig config = TidesDBStoreConfig.builder()
    .blockCacheSize(16 * 1024 * 1024)
    .writeBufferSize(16 * 1024 * 1024)
    .maxMemoryUsage(256 * 1024 * 1024)
    .maxOpenSSTables(64)
    .build();

Fair Benchmarking Against RocksDB

The benchmark suite configures both engines with equivalent settings to ensure a fair comparison:

| Setting | TidesDB | RocksDB |
| --- | --- | --- |
| Compression | LZ4 | LZ4 |
| Bloom filter | 1% FPR | 10 bits/key (~1% FPR) |
| Block cache | 64 MB | 64 MB (LRU) |
| Write buffer | 64 MB | 64 MB |
| Background threads | 2 flush + 2 compaction | 4 (maxBackgroundJobs) |
| Sync / durability | SYNC_NONE | sync=false, WAL enabled |
| Block indexes | Enabled | Binary search |
| LSM levels | 5 | 5 |
| Bulk writes | Single transaction | WriteBatch |
The one structural difference that cannot be eliminated is transaction overhead. TidesDB requires all operations to go through transactions (begin → op → commit), while RocksDB supports direct db.put()/db.get() calls. The plugin mitigates this with transaction reuse via reset(), but there is still a per-operation cost that is inherent to TidesDB’s MVCC architecture. This cost is most visible on small-data read benchmarks; at larger dataset sizes, disk I/O dominates and the gap closes.

Architecture

Store Lifecycle

  1. Creation · TidesDBStoreSupplier.get() or TidesDBStoreBuilder.build() creates a TidesDBStore instance with the provided configuration.
  2. Initialization · init() is called by Kafka Streams with the state directory. The plugin opens a TidesDB database at <stateDir>/<storeName>, creates or opens the configured column family, and initializes a reusable transaction for the hot path.
  3. Operation · get, put, delete, putAll, range, and all map directly to TidesDB transactions. Single-operation calls (get, put) reuse a pooled transaction via reset() to avoid allocation overhead. Multi-operation calls (putAll, putIfAbsent, delete) use fresh transactions for atomicity.
  4. Flush · flush() triggers a non-blocking memtable flush to disk.
  5. Close · close() frees the reusable transaction and closes the TidesDB database.

Transaction Reuse

The plugin maintains a single reusable transaction for the get and put hot path. After each commit, the transaction is reset via reset() instead of being freed and reallocated. This retains internal buffers and avoids repeated allocation overhead, which matters in Kafka Streams where every record processed may trigger a state store read or write.

When the reusable transaction is unavailable (e.g., concurrent access from a punctuator), the plugin falls back to creating a new transaction.
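This reuse-with-fallback pattern can be sketched generically. ReusePool is a hypothetical illustration of the idea, not the plugin's actual internals:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Generic sketch of reuse-with-fallback: one pooled resource for the
// hot path, a fresh one whenever the pooled resource is already in use
// (e.g., taken by a punctuator). Illustrative only.
public class ReusePool<R> {

    private final R pooled;
    private final Supplier<R> factory;
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public ReusePool(Supplier<R> factory) {
        this.factory = factory;
        this.pooled = factory.get();
    }

    /** Returns the pooled resource if free, otherwise falls back to a fresh one. */
    public R acquire() {
        return inUse.compareAndSet(false, true) ? pooled : factory.get();
    }

    /** Marks the pooled resource free again; fresh fallbacks are simply discarded. */
    public void release(R resource) {
        if (resource == pooled) {
            inUse.set(false);
        }
    }

    public static void main(String[] args) {
        ReusePool<StringBuilder> pool = new ReusePool<>(StringBuilder::new);
        StringBuilder a = pool.acquire();        // pooled instance
        StringBuilder b = pool.acquire();        // fallback: fresh instance
        System.out.println(a != b);              // true
        pool.release(a);
        System.out.println(pool.acquire() == a); // true: pooled instance again
    }
}
```

In the plugin the equivalent of release is the post-commit reset(), which returns the transaction to the pool with its internal buffers intact.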

Direct Access

For advanced use cases, the store exposes direct access to the underlying TidesDB objects:

TidesDBStore store = (TidesDBStore) context.getStateStore("my-store");
TidesDB db = store.getDb();
ColumnFamily cf = store.getColumnFamily();

Use with caution · operations on these objects bypass the store’s transaction management.

Examples

Word Count

import com.tidesdb.kafka.store.TidesDBStoreSupplier;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> lines = builder.stream("text-input");

KTable<String, Long> wordCounts = lines
    .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word)
    .count(Materialized.as(new TidesDBStoreSupplier("word-counts")));

wordCounts.toStream()
    .to("word-counts-output", Produced.with(Serdes.String(), Serdes.Long()));

Windowed Aggregation

KStream<String, String> events = builder.stream("events");

events
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count()
    .toStream()
    .to("windowed-output");

Custom Aggregation with Config

TidesDBStoreConfig config = TidesDBStoreConfig.builder()
    .compressionAlgorithm(CompressionAlgorithm.ZSTD_COMPRESSION)
    .defaultTtlSeconds(86400)  // 24-hour expiration
    .useBtree(true)
    .build();

activities
    .groupByKey()
    .aggregate(
        UserStats::new,
        (userId, activity, stats) -> { stats.addActivity(activity); return stats; },
        Materialized.<String, UserStats>as(new TidesDBStoreSupplier("user-stats", config))
            .withKeySerde(Serdes.String())
            .withValueSerde(new UserStatsSerde())
    );

TidesDB Kafka Streams plugin repository: https://github.com/tidesdb/tidesdb-kafka