Hash Table Caching¶
Background¶
In materialized execution engines such as Spark and Presto on Spark, broadcast joins replicate the build-side splits to every join task because splits are planned upfront. Upfront split planning is what lets these engines provide task-level fault tolerance: the input splits of each task are tracked and a task's output can be discarded, enabling task-level retries.
The downside is that each task independently builds an identical hash table from the same data. For large build sides this is wasteful: every task spends CPU and memory constructing the same hash table that another task in the same query has already built.
The Build I/O Tax¶
The broadcast data also follows a write-once-read-many I/O pattern: each task re-reads the build-side data independently. When the number of tasks is large — O(100k) tasks across 10k+ workers — these concurrent reads overwhelm the I/O service layer, leading to throttling.
Throttling causes tasks to stall for seconds to minutes waiting on I/O. When queries are charged for reserved workers, these stalls mean reserved resources sit idle, increasing query cost. Beyond the fetch delays, when the hash table is large (in the gigabyte range), the CPU cost of rebuilding it in every task is also significant and wasteful.
Hash table caching eliminates this redundant work: the first task builds the hash table and makes it available to all subsequent tasks in the same Velox instance. This is a build-once, reuse-many paradigm. In Sapphire-Velox it amounts to a once-per-worker model that yields more than an order of magnitude of savings, since the number of tasks far exceeds the number of workers.
Enabling Hash Table Caching¶
Hash table caching is enabled by setting the useHashTableCache flag to
true on the HashJoinNode:
auto joinNode =
    core::HashJoinNode::Builder()
        .id(planNodeIdGenerator->next())
        .joinType(core::JoinType::kInner)
        .nullAware(false)
        .leftKeys({leftKeyField})
        .rightKeys({rightKeyField})
        .left(probeNode)
        .right(buildNode)
        .outputType(outputType)
        .useHashTableCache(true)
        .build();
When useHashTableCache is false (the default), the hash join behaves
exactly as before. The flag is only intended for broadcast joins and is
currently used by Presto-on-Spark.
Overall Design¶
Hash table caching introduces a global singleton HashTableCache that stores
built hash tables keyed by queryId:planNodeId. The cache coordinates
between tasks so that exactly one task builds the hash table while other tasks
wait and then reuse the result.
The HashTableCache is a process-wide singleton in the Velox instance,
alongside the AsyncDataCache and MemoryManager. The cache and its
methods provide building blocks for drivers within a task and tasks within a
worker to coordinate hash table construction and reuse.
The design has three main components:
HashTableCache - A process-wide singleton that stores and manages cached hash table entries.
HashTableCacheEntry - A cache entry that holds the hash table, build coordination state, and a dedicated memory pool.
HashBuild operator integration - Logic in the HashBuild operator to check the cache, build or wait, and store the result.
Cache Structure¶
HashTableCache is a thread-safe singleton that maps cache keys to cache
entries:
class HashTableCache {
  std::mutex lock_;
  std::unordered_map<std::string, std::shared_ptr<HashTableCacheEntry>>
      tables_;
};
Each HashTableCacheEntry contains:
| Field | Description |
|---|---|
| cacheKey | The key used to look up this entry (queryId:planNodeId). |
| builderTaskId | The task ID of the task that is responsible for building the table. |
| tablePool | A leaf memory pool under the query pool used for table allocations. |
| table | The built BaseHashTable. |
| hasNullKeys | Whether the build side contained null join keys. |
| buildComplete | Atomic flag indicating whether the table has been fully built. |
| buildPromises | Promises used to notify waiting tasks when the build completes. |
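Taken together, the entry can be sketched as the following struct. The field names follow those used elsewhere in this document; the exact declaration in the implementation may differ.
struct HashTableCacheEntry {
  // Lookup key of the form "queryId:planNodeId".
  std::string cacheKey;
  // ID of the task responsible for building the table.
  std::string builderTaskId;
  // Leaf memory pool under the query pool used for table allocations.
  std::shared_ptr<memory::MemoryPool> tablePool;
  // The built hash table, shared with waiter tasks and probe operators.
  std::shared_ptr<BaseHashTable> table;
  // Whether the build side contained null join keys.
  bool hasNullKeys{false};
  // Set once the table has been fully built and published.
  std::atomic<bool> buildComplete{false};
  // Promises fulfilled by put() to wake waiting tasks.
  std::vector<ContinuePromise> buildPromises;
};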
Cache API¶
The HashTableCache exposes three methods. All decisions are made under a
single std::mutex. The lock is held only for map lookups, inserts, and
promise creation — never during table building or memory allocation.
get()¶
The get() method is the central coordination point. It is called by every
HashBuild operator during initialize() and determines the caller’s
role under the mutex:
get(key, taskId, queryCtx, *future):
lock(lock_)
Case 1 – No entry: create entry, set builderTaskId → return entry (Builder)
Case 2 – Same task: return entry (Builder, coordinate via JoinBridge)
Case 3 – Diff task, not complete: push promise → return entry + future (Waiter)
Case 4 – Complete: return entry (Late Arrival)
When creating a new entry, get() allocates a tablePool as a leaf child
of the query pool and registers a QueryCtx release callback that calls
drop() on query destruction.
Key design decisions in get():
- Memory pool ownership: The tablePool is a leaf child of the first caller's QueryCtx root pool. All drivers in the builder task share this pool for partial table allocations (via HashBuild::tableMemoryPool()), tying the cached table's memory accounting to the originating query.
- Cleanup callback: QueryCtx::addReleaseCallback ensures the cache entry is dropped when the query finishes. drop() resets the shared_ptr<BaseHashTable> outside the lock to free memory before the entry is destroyed.
- Lock scope: All decisions are made under a single std::mutex. The lock is held only for map lookups/inserts and promise creation — never during table building or memory allocation.
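A minimal C++ sketch of get(), assuming the entry layout above and Velox's ContinuePromise/ContinueFuture types; pool creation and release-callback registration are elided here (see Memory Management below):
std::shared_ptr<HashTableCacheEntry> HashTableCache::get(
    const std::string& key,
    const std::string& taskId,
    const std::shared_ptr<core::QueryCtx>& queryCtx,
    ContinueFuture* future) {
  std::lock_guard<std::mutex> l(lock_);
  auto it = tables_.find(key);
  if (it == tables_.end()) {
    // Case 1 - no entry yet: this caller's task becomes the builder.
    auto entry = std::make_shared<HashTableCacheEntry>();
    entry->cacheKey = key;
    entry->builderTaskId = taskId;
    // tablePool creation and QueryCtx release-callback registration elided.
    tables_.emplace(key, entry);
    return entry;
  }
  auto entry = it->second;
  if (entry->builderTaskId == taskId || entry->buildComplete) {
    // Case 2 - another driver of the builder task, or
    // Case 4 - late arrival after the table was published.
    return entry;
  }
  // Case 3 - a different task while the build is in progress: register a
  // promise and hand back a future for the caller to wait on.
  entry->buildPromises.emplace_back("HashTableCache::get");
  *future = entry->buildPromises.back().getSemiFuture();
  return entry;
}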
put()¶
Called by the last driver of the builder task after merging all partial tables. Publishes the table and wakes all waiters:
put(key, table, hasNullKeys):
lock(lock_)
entry.table = table
entry.buildComplete = true
promises = move(entry.buildPromises)
unlock(lock_)
for each promise: promise.setValue() // wake waiters outside lock
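In C++ this is roughly the following sketch; the promises are moved out and fulfilled only after the mutex is released, so waiters are never woken while the lock is held:
void HashTableCache::put(
    const std::string& key,
    std::shared_ptr<BaseHashTable> table,
    bool hasNullKeys) {
  std::vector<ContinuePromise> promises;
  {
    std::lock_guard<std::mutex> l(lock_);
    auto& entry = tables_[key];
    entry->table = std::move(table);
    entry->hasNullKeys = hasNullKeys;
    entry->buildComplete = true;
    // Take ownership of the promises so they can be fulfilled outside the lock.
    promises = std::move(entry->buildPromises);
  }
  for (auto& promise : promises) {
    promise.setValue(); // wake a waiting task
  }
}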
drop()¶
Removes a cache entry and frees the table memory. Called by the QueryCtx
cleanup callback when the query is destroyed:
drop(key):
lock(lock_)
entry = move(tables_[key])
tables_.erase(key)
unlock(lock_)
entry.table.reset() // free memory outside lock
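A corresponding sketch of drop(); the shared_ptr is reset after the lock is released so that freeing a large table does not block concurrent cache calls:
void HashTableCache::drop(const std::string& key) {
  std::shared_ptr<HashTableCacheEntry> entry;
  {
    std::lock_guard<std::mutex> l(lock_);
    auto it = tables_.find(key);
    if (it == tables_.end()) {
      return;
    }
    entry = std::move(it->second);
    tables_.erase(it);
  }
  // Free the hash table memory outside the lock.
  entry->table.reset();
}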
Build Coordination¶
When hash table caching is enabled, the HashBuild operator calls
HashTableCache::get() during initialization. The cache uses the first
caller’s task as the builder and makes subsequent callers wait.
Builder Task¶
The first task to call get() for a given key creates the cache entry and
becomes the builder. This task proceeds through the normal HashBuild flow:
all its drivers build partial hash tables, the last driver merges them, and
the merged table is stored in the cache via HashTableCache::put().
Drivers within the builder task coordinate with each other through the
existing HashJoinBridge mechanism. The cache does not interfere with
intra-task driver synchronization.
Waiter Tasks¶
When a task calls get() and finds that another task is already building the
table (builderTaskId differs from its own task ID and buildComplete is
false), it receives a ContinueFuture and transitions to the
kWaitForBuild state. The task is suspended until the builder task calls
put(), which fulfills all waiting promises.
Once notified, the waiter task calls noMoreInput(), which finds the table in the cache and passes it directly to the HashJoinBridge without building anything. The probe side then runs normally against the cached table.
Task 1 (Builder) Task 2 (Waiter) Task 3 (Waiter)
──────────────── ─────────────── ───────────────
get() → creates entry get() → sees builder get() → sees builder
builds hash table receives future receives future
put() → sets table (suspended) (suspended)
notifies waiters ──────────→ wakes up wakes up
uses cached table uses cached table
Cache Hit¶
If a task calls get() and finds buildComplete is already true, the
cached table is returned immediately. The HashBuild operator skips all build
logic and passes the table to the HashJoinBridge.
The HashBuild operator reports cache hits and misses via runtime statistics:
- hashtable.cacheHit - Table was found in the cache and reused.
- hashtable.cacheMiss - Table was not in the cache; this task built it.
Usage by HashBuild¶
The HashBuild operator uses the cache in a three-phase protocol: build, synchronize, and probe.
Step 1: Build Phase (Producer)¶
When a HashBuild operator is initialized, it checks the cache via
setupCachedHashTable().
- Cache Miss (Builder): The first task to find a miss becomes the Builder. It creates a cache entry, pulls data from storage, builds the BaseHashTable, and calls put() to publish it. Within the Builder task, subsequent drivers also call get() and receive the same entry (since builderTaskId == taskId). Each driver calls setupTable() to allocate its own partial BaseHashTable using cacheEntry->tablePool, receives its subset of input via addInput(), and builds a partial table. Intra-task coordination between these drivers uses the standard allPeersFinished()/JoinBridge mechanism, not the cache.
- The Wait (Waiters): If other tasks arrive while the Builder is building, they encounter the pending state and transition to kWaitForBuild, waiting on a ContinueFuture provided by the cache.
- Short-circuiting Upstream: Once the Builder publishes the table, waiters are unblocked. Upon receiving the cached table, waiter tasks set their no-more-input flags. This short-circuits their source operators (e.g., TableScan), immediately stopping further data retrieval.
Step 2: Synchronization (JoinBridge)¶
The HashJoinBridge acts as the hand-off point between the build and probe
sides. Even if the table was retrieved from the cache rather than built locally,
the bridge ensures the probe side is notified that the data is ready for
processing. Both builder and waiter tasks call
joinBridge.setHashTable() to publish the table (or cached table) to the
probe operators.
Step 3: Probe Phase (Consumer)¶
The HashProbe operator takes the cached table from the bridge and executes
as usual. Because the table is held as a shared_ptr, the probe operator’s
reference prevents the cache from freeing the table while a join is actively
scanning it. Once the probe finishes, the reference count is decremented. The
table is ultimately freed when the QueryCtx release callback calls
drop().
HashBuild Lifecycle¶
The following pseudocode shows the complete lifecycle of a HashBuild
operator when hash table caching is enabled. Only the key function calls are
shown.
Initialization¶
initialize():
  cacheKey = "queryId:planNodeId"
  cacheEntry = HashTableCache::instance()->get(cacheKey, taskId, queryCtx, &future_)
  if cacheEntry.buildComplete:   // Late Arrival
    noMoreInput()                // → finishHashBuild() → getHashTableFromCache()
    return
  if future_.valid():            // Waiter
    state = kWaitForBuild
    return
  // Builder: proceed with normal table setup
  setupTable()     // allocate BaseHashTable using cacheEntry.tablePool
  setupSpiller()   // no-op: canSpill() returns false with cache
Build and Publish¶
noMoreInput() → finishHashBuild():
  if not allPeersFinished():    // wait for peer drivers in same task
    state = kWaitForBuild
    return
  if getHashTableFromCache():   // Waiter or Late Arrival: cache has table
    joinBridge.setHashTable(cacheEntry.table, hasNullKeys)
    return
  // Builder (last driver): merge and publish
  table_.prepareJoinTable(otherTables)
  HashTableCache::instance()->put(cacheKey, table_, hasNullKeys)
  joinBridge.setHashTable(table_, hasNullKeys)
Waiter Wake-up¶
isBlocked():
  case kWaitForBuild:
    if receivedCachedHashTable():   // future_ fulfilled, buildComplete == true
      setRunning()
      noMoreInput()                 // → finishHashBuild() → getHashTableFromCache()
Skipping Source Reads¶
Waiter tasks never read any data from storage. No splits are fetched, no exchanges are initiated.
In Velox, a build-side pipeline is a chain of operators ending with
HashBuild as the sink:
[TableScan / Exchange] → ... → [HashBuild]
operators_[0] operators_[last]
The Driver::runInternal() loop iterates through operator pairs (op, nextOp) and, for each pair, follows this sequence (sketched below):
1. Check op->isBlocked() — if blocked, suspend the Driver.
2. Check nextOp->isBlocked() — if blocked, suspend the Driver.
3. Check nextOp->needsInput() — if false, skip pulling from op.
4. Call op->getOutput() and feed the result to nextOp->addInput().
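A heavily simplified sketch of that loop follows. The real Driver::runInternal() handles many more cases, and suspend() here is shorthand for parking the Driver on the blocking future:
for (size_t i = 0; i < operators_.size() - 1; ++i) {
  Operator* op = operators_[i].get();
  Operator* nextOp = operators_[i + 1].get();

  ContinueFuture future = ContinueFuture::makeEmpty();
  if (op->isBlocked(&future) != BlockingReason::kNotBlocked) {
    return suspend(std::move(future)); // wait for 'op' to unblock
  }
  // A HashBuild in kWaitForBuild reports blocked here, so the upstream
  // TableScan/Exchange getOutput() below is never reached.
  if (nextOp->isBlocked(&future) != BlockingReason::kNotBlocked) {
    return suspend(std::move(future));
  }
  if (!nextOp->needsInput()) {
    continue; // downstream cannot accept input; skip pulling from 'op'
  }
  if (auto output = op->getOutput()) {
    nextOp->addInput(std::move(output));
  }
}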
The critical point is that nextOp->isBlocked() is checked before
op->getOutput() is ever called. When the HashBuild operator is in the
kWaitForBuild state, it returns a blocked status, which prevents the
driver from pulling data from any upstream operator (e.g., TableScan or
Exchange). Once the cached table arrives and the waiter calls
noMoreInput(), source operators are short-circuited immediately —
they never execute at all.
This is a key benefit of the caching design: waiter tasks incur zero I/O cost.
Memory Management¶
Cached hash tables must outlive the task that built them because waiter tasks from the same query need to access the table after the builder task has finished. To support this, cached hash tables use a dedicated leaf memory pool created under the query memory pool rather than the operator’s task-level pool.
Pool Hierarchy¶
Query Pool
├── Task 1 Pool (builder - may finish first)
│ └── Operator Pool
└── cached_table_<key> Pool ← hash table lives here
(created by HashTableCache)
The tablePool is created by the first call to get() as a leaf child of
the caller’s QueryCtx root pool. All drivers in the builder task share this
pool for their partial table allocations via HashBuild::tableMemoryPool().
This ties the cached table’s memory accounting to the originating query rather
than to any individual task, allowing the table to survive task completion.
Cleanup Callback¶
When a cache entry is created, HashTableCache::get() registers a release
callback on the QueryCtx. When the query context is destroyed, this
callback calls HashTableCache::drop() to remove the entry and free the
table’s memory before the query pool is torn down. drop() resets the
shared_ptr<BaseHashTable> outside the lock to free memory before the entry
itself is destroyed. This ensures there are no dangling references to
destroyed memory pools.
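For illustration, the entry-creation path could be wired roughly as below. The pool name and the addLeafChild/addReleaseCallback calls are sketched from the descriptions above, so the exact signatures may differ:
// Sketch only: inside HashTableCache::get() when a new entry is created.
entry->tablePool = queryCtx->pool()->addLeafChild("cached_table_" + key);
queryCtx->addReleaseCallback([key]() {
  // Runs when the QueryCtx is destroyed: remove the entry and free the table
  // before the query pool is torn down.
  HashTableCache::instance()->drop(key);
});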
HashBuild::tableMemoryPool() returns the cache entry’s tablePool when
caching is enabled, or the operator’s own pool() for regular joins.
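The selection itself is a one-line decision; a sketch, with the cacheEntry_ member name being illustrative:
memory::MemoryPool* HashBuild::tableMemoryPool() {
  // Cached tables are allocated from the query-level leaf pool so they can
  // outlive this task; regular joins use the operator's own pool.
  return useHashTableCache() ? cacheEntry_->tablePool.get() : pool();
}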
Spilling¶
Spilling is not supported when hash table caching is enabled. Both
HashBuild::canSpill() and HashBuild::canReclaim() return false when
useHashTableCache is true:
bool HashBuild::canSpill() const {
  // ...
  if (useHashTableCache()) {
    return false;
  }
  // ...
}
This is because spilling clears the hash table from memory and rebuilds it later, which would corrupt the cached table that other tasks may be using. Specifically:
- Builder task: Cannot spill because the table is shared via the cache. Spilling would invalidate the shared_ptr held by waiter tasks.
- Waiter tasks: Cannot spill because they use the cached table directly and never build their own.
- Coordination complexity: Rebuild-after-spill would require re-coordinating across all tasks sharing the cached table.
Broadcast joins (the primary use case for this cache) are generally expected to fit in memory. If a build-side relation is large enough to require spilling, it should bypass the cache and use a standard partitioned hash join with spilling enabled.
Eviction¶
Cache eviction is not currently supported. Entries remain in the cache until the query context is destroyed, at which point the release callback removes them.
Future memory pressure-based eviction would need to address:
- Tracking total memory: Summing the memory held by all cached tables.
- Eviction policy: Deciding which entries to evict (e.g., LRU, by size).
- Reference invalidation: Safely handling eviction while probe operators hold references via shared_ptr.
- Rebuild fallback: Allowing tasks to rebuild the table if it was evicted.
The drop() method already provides the mechanism for removing individual
entries and could be extended to support eviction driven by the memory manager
or arbitration framework.
Limitations and Future Work¶
- No spilling: Cached tables must reside entirely in memory. See Spilling above.
- No eviction: Cached entries live for the full query lifetime. Memory pressure-based eviction is planned.
- Single-query scope: The cache key includes the queryId, so tables are not shared across different queries even if the build-side data is identical. Cross-query sharing is a potential future optimization.
- No sanity checks on table sharing during probe: For right joins, we rely on the planner not to plan a broadcast join and therefore not to use cached tables. Velox as a library does not verify during probe that the join it is running leaves the hash table unmodified; mutating a cached hash table can produce incorrect execution results. This check should be added.
queryId, so tables are not shared across different queries even if the build side data is identical. Cross-query sharing is a potential future optimization.No sanity checks on table sharing during probe: For right joins, we rely on the planner to not do a broadcast join and skip using cached tables. But velox as a library does not do checks during probe that it is in fact running a join that does not mutate the hash table. Mutating the cached hash table can cause incorrect execution results. We should add this check