printPlanWithStats¶
Velox collects a number of valuable statistics during query execution. These counters are exposed via the Task::taskStats() API for programmatic access and can be printed in a human-friendly format for manual inspection. We use them to reason about query execution dynamics and to troubleshoot performance issues.
If you are familiar with Presto, the tools described below will look very similar to the PrestoQueryLookup tool available via bunnylol presto <query-id>.
PlanNode::toString()¶
The PlanNode::toString() method prints a query plan as a tree of plan nodes. This API can be used before or after executing a query.
PlanNode::toString() takes two optional flags: detailed and recursive. When detailed is true, the output includes extra details about each plan node. When recursive is true, the output includes the whole plan tree; otherwise, only a single plan node is shown.
In “detailed” mode, the Project node shows its projection expressions, the Filter node shows its filter expression, the Join node shows the join type and join keys, the Aggregation node shows grouping keys and aggregate functions, the OrderBy node shows sorting keys and sort orders, etc.
Let’s use a simple join query as an example:
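A plan like this one can be assembled with the PlanBuilder utility from velox/exec/tests/utils/PlanBuilder.h. The sketch below shows one way to build it; the rowType of the scanned table (c0 INTEGER, c1 BIGINT) and the rightData vectors for the build side are assumed to be defined elsewhere.
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
auto plan =
    exec::test::PlanBuilder(planNodeIdGenerator)
        .tableScan(rowType) // Assumed schema: c0 INTEGER, c1 BIGINT.
        .hashJoin(
            {"c0"}, // Probe-side join key.
            {"u_c0"}, // Build-side join key.
            exec::test::PlanBuilder(planNodeIdGenerator)
                .values({rightData}) // Assumed in-memory build-side data.
                .project({"c0 AS u_c0", "c1 AS u_c1"})
                .planNode(),
            "" /*filter*/,
            {"c0", "c1", "u_c1"})
        .project({"c0", "c1 + 1 AS p1", "c1 + u_c1 AS p2"})
        .planNode();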
plan->toString(false /*detailed*/, true /*recursive*/) prints a plan tree using plan node names:
-> Project
-> HashJoin
-> TableScan
-> Project
-> Values
plan->toString(true /*detailed*/, true /*recursive*/) adds details to each plan node.
-> Project[expressions: (c0:INTEGER, ROW["c0"]), (p1:BIGINT, plus(ROW["c1"],1)), (p2:BIGINT, plus(ROW["c1"],ROW["u_c1"])), ]
-> HashJoin[INNER c0=u_c0]
-> TableScan[]
-> Project[expressions: (u_c0:INTEGER, ROW["c0"]), (u_c1:BIGINT, ROW["c1"]), ]
-> Values[100 rows in 1 vectors]
Let’s also look at an aggregation query:
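This plan can be built along the same lines; a sketch, assuming rowType describes a table with columns c0 through c5:
auto plan =
    exec::test::PlanBuilder()
        .tableScan(rowType) // Assumed schema: numeric columns c0..c4, grouping key c5.
        .partialAggregation(
            {"c5"}, {"max(c0)", "sum(c1)", "sum(c2)", "sum(c3)", "sum(c4)"})
        .planNode();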
plan->toString(false /*detailed*/, true /*recursive*/)
-> Aggregation
-> TableScan
plan->toString(true /*detailed*/, true /*recursive*/)
-> Aggregation[PARTIAL [c5] a0 := max(ROW["c0"]), a1 := sum(ROW["c1"]), a2 := sum(ROW["c2"]), a3 := sum(ROW["c3"]), a4 := sum(ROW["c4"])]
-> TableScan[]
printPlanWithStats()¶
The printPlanWithStats() function prints a query plan annotated with runtime statistics. This function can be used after the query has finished processing. It takes a root plan node and a TaskStats struct.
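A typical flow is to run the query to completion and then print the plan with the collected statistics. Here is a sketch using the TaskCursor utility from velox/exec/tests/utils/Cursor.h; note that plans with TableScan nodes also need splits added to the task.
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/tests/utils/Cursor.h"

exec::test::CursorParameters params;
params.planNode = plan;
auto cursor = exec::test::TaskCursor::create(params);
while (cursor->moveNext()) {
  // Consume or verify the results.
}
std::cout << exec::printPlanWithStats(*plan, cursor->task()->taskStats())
          << std::endl;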
By default, printPlanWithStats shows the number of output rows, CPU time, number of threads used, and peak memory usage for each plan node.
printPlanWithStats(*plan, task->taskStats())
-> Project[expressions: (c0:INTEGER, ROW["c0"]), (p1:BIGINT, plus(ROW["c1"],1)), (p2:BIGINT, plus(ROW["c1"],ROW["u_c1"]))]
Output: 2000 rows (154.98KB), Cpu time: 907.80us, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (27.24us/872.82us/7.74us)
-> HashJoin[INNER c0=u_c0]
Output: 2000 rows (136.88KB), Cpu time: 508.74us, Blocked wall time: 117.00us, Peak memory: 2.00MB, CPU breakdown: I/O/F (177.87us/329.20us/1.66us)
HashBuild: Input: 100 rows (1.31KB), Output: 0 rows (0B), Cpu time: 41.77us, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (40.18us/1.59us/0ns)
HashProbe: Input: 2000 rows (118.12KB), Output: 2000 rows (136.88KB), Cpu time: 466.97us, Blocked wall time: 117.00us, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (137.69us/327.61us/1.66us)
-> TableScan[Table: Orders]
Input: 2000 rows (118.12KB), Raw Input: 20480 rows (72.31KB), Output: 2000 rows (118.12KB), Cpu time: 8.89ms, Blocked wall time: 5.00us, Peak memory: 1.00MB, Threads: 1, Splits: 20, CPU breakdown: I/O/F (0ns/8.88ms/4.93us)
-> Project[expressions: (u_c0:INTEGER, ROW["c0"]), (u_c1:BIGINT, ROW["c1"])]
Output: 100 rows (1.31KB), Cpu time: 43.22us, Blocked wall time: 0ns, Peak memory: 0B, Threads: 1, CPU breakdown: I/O/F (691ns/5.54us/36.98us)
-> Values[100 rows in 1 vectors]
Input: 0 rows (0B), Output: 100 rows (1.31KB), Cpu time: 3.05us, Blocked wall time: 0ns, Peak memory: 0B, Threads: 1, CPU breakdown: I/O/F (0ns/2.48us/568ns)
With the includeCustomStats flag enabled, printPlanWithStats adds operator-specific statistics to each plan node, e.g. the number of distinct values for the join key, the number of row groups skipped in table scan, the amount of data read from cache and storage in table scan, the number of rows processed via aggregation pushdown into scan, etc.
Here is the output for the join query from above.
printPlanWithStats(*plan, task->taskStats(), true) shows custom operator statistics.
-> Project[expressions: (c0:INTEGER, ROW["c0"]), (p1:BIGINT, plus(ROW["c1"],1)), (p2:BIGINT, plus(ROW["c1"],ROW["u_c1"]))]
Output: 2000 rows (154.98KB), Cpu time: 907.80us, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (27.24us/872.82us/7.74us)
dataSourceLazyWallNanos sum: 473.00us, count: 20, min: 11.00us, max: 96.00us
-> HashJoin[INNER c0=u_c0]
Output: 2000 rows (136.88KB), Cpu time: 508.74us, Blocked wall time: 223.00us, Peak memory: 2.00MB, CPU breakdown: I/O/F (177.87us/329.20us/1.66us)
HashBuild: Input: 100 rows (1.31KB), Output: 0 rows (0B), Cpu time: 41.77us, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (40.18us/1.59us/0ns)
distinctKey0 sum: 101, count: 1, min: 101, max: 101
queuedWallNanos sum: 125.00us, count: 1, min: 125.00us, max: 125.00us
rangeKey0 sum: 200, count: 1, min: 200, max: 200
HashProbe: Input: 2000 rows (118.12KB), Output: 2000 rows (136.88KB), Cpu time: 466.97us, Blocked wall time: 223.00us, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (137.69us/327.61us/1.66us)
dynamicFiltersProduced sum: 1, count: 1, min: 1, max: 1
queuedWallNanos sum: 24.00us, count: 1, min: 24.00us, max: 24.00us
-> TableScan[Table: Orders]
Input: 2000 rows (118.12KB), Raw Input: 20480 rows (72.31KB), Output: 2000 rows (118.12KB), Cpu time: 8.89ms, Blocked wall time: 10.00us, Peak memory: 1.00MB, Threads: 1, Splits: 20, CPU breakdown: I/O/F (0ns/8.88ms/4.93us)
dataSourceWallNanos sum: 2.52ms, count: 40, min: 12.00us, max: 250.00us
dynamicFiltersAccepted sum: 1, count: 1, min: 1, max: 1
localReadBytes sum: 0B, count: 1, min: 0B, max: 0B
numLocalRead sum: 0, count: 1, min: 0, max: 0
numPrefetch sum: 28, count: 1, min: 28, max: 28
numRamRead sum: 0, count: 1, min: 0, max: 0
numStorageRead sum: 140, count: 1, min: 140, max: 140
prefetchBytes sum: 29.51KB, count: 1, min: 29.51KB, max: 29.51KB
queuedWallNanos sum: 29.00us, count: 1, min: 29.00us, max: 29.00us
ramReadBytes sum: 0B, count: 1, min: 0B, max: 0B
skippedSplitBytes sum: 0B, count: 1, min: 0B, max: 0B
skippedSplits sum: 0, count: 1, min: 0, max: 0
skippedStrides sum: 0, count: 1, min: 0, max: 0
storageReadBytes sum: 150.25KB, count: 1, min: 150.25KB, max: 150.25KB
totalScanTime sum: 0ns, count: 1, min: 0ns, max: 0ns
totalRemainingFilterTime sum: 0ns, count: 1, min: 0ns, max: 0ns
queryThreadIoLatency sum: 0, count: 1, min: 0, max: 0
-> Project[expressions: (u_c0:INTEGER, ROW["c0"]), (u_c1:BIGINT, ROW["c1"])]
Output: 100 rows (1.31KB), Cpu time: 43.22us, Blocked wall time: 0ns, Peak memory: 0B, Threads: 1, CPU breakdown: I/O/F (691ns/5.54us/36.98us)
-> Values[100 rows in 1 vectors]
Input: 0 rows (0B), Output: 100 rows (1.31KB), Cpu time: 3.05us, Blocked wall time: 0ns, Peak memory: 0B, Threads: 1, CPU breakdown: I/O/F (0ns/2.48us/568ns)
And this is the output for the aggregation query from above.
printPlanWithStats(*plan, task->taskStats()) shows basic statistics:
-> Aggregation[PARTIAL [c5] a0 := max(ROW["c0"]), a1 := sum(ROW["c1"]), a2 := sum(ROW["c2"]), a3 := sum(ROW["c3"]), a4 := sum(ROW["c4"])]
Output: 849 rows (84.38KB), Cpu time: 1.96ms, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (1.38ms/579.12us/6.82us)
-> TableScan[Table: hive_table]
Input: 10000 rows (0B), Output: 10000 rows (0B), Cpu time: 2.89ms, Blocked wall time: 25.00us, Peak memory: 1.00MB, Threads: 1, Splits: 1, CPU breakdown: I/O/F (0ns/2.89ms/3.35us)
printPlanWithStats(*plan, task->taskStats(), true) includes custom statistics:
-> Aggregation[PARTIAL [c5] a0 := max(ROW["c0"]), a1 := sum(ROW["c1"]), a2 := sum(ROW["c2"]), a3 := sum(ROW["c3"]), a4 := sum(ROW["c4"])]
Output: 849 rows (84.38KB), Cpu time: 1.96ms, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (1.38ms/579.12us/6.82us)
-> TableScan[Table: hive_table]
Input: 10000 rows (0B), Output: 10000 rows (0B), Cpu time: 2.89ms, Blocked wall time: 30.00us, Peak memory: 1.00MB, Threads: 1, Splits: 1, CPU breakdown: I/O/F (0ns/2.89ms/3.35us)
dataSourceLazyWallNanos sum: 1.07ms, count: 7, min: 92.00us, max: 232.00us
dataSourceWallNanos sum: 329.00us, count: 2, min: 48.00us, max: 281.00us
loadedToValueHook sum: 50000, count: 5, min: 10000, max: 10000
localReadBytes sum: 0B, count: 1, min: 0B, max: 0B
numLocalRead sum: 0, count: 1, min: 0, max: 0
numPrefetch sum: 2, count: 1, min: 2, max: 2
numRamRead sum: 0, count: 1, min: 0, max: 0
numStorageRead sum: 7, count: 1, min: 7, max: 7
prefetchBytes sum: 31.13KB, count: 1, min: 31.13KB, max: 31.13KB
queuedWallNanos sum: 101.00us, count: 1, min: 101.00us, max: 101.00us
ramReadBytes sum: 0B, count: 1, min: 0B, max: 0B
skippedSplitBytes sum: 0B, count: 1, min: 0B, max: 0B
skippedSplits sum: 0, count: 1, min: 0, max: 0
skippedStrides sum: 0, count: 1, min: 0, max: 0
storageReadBytes sum: 61.53KB, count: 1, min: 61.53KB, max: 61.53KB
totalScanTime sum: 0ns, count: 1, min: 0ns, max: 0ns
totalRemainingFilterTime sum: 0ns, count: 1, min: 0ns, max: 0ns
queryThreadIoLatency sum: 0, count: 1, min: 0, max: 0
Common operator statistics¶
Let’s take a closer look at the statistics that are collected for all operators.
For each operator, Velox tracks the total number of input and output rows, their estimated sizes, CPU time, blocked wall time, and the number of threads used to run the operator.
-> TableScan[Table: Orders]
Input: 2000 rows (118.12KB), Raw Input: 20480 rows (72.31KB), Output: 2000 rows (118.12KB), Cpu time: 8.89ms, Blocked wall time: 10.00us, Peak memory: 1.00MB, Threads: 1, Splits: 20, CPU breakdown: I/O/F (0ns/8.88ms/4.93us)
printPlanWithStats shows output rows and sizes for each plan node, but input rows and sizes only for leaf nodes and for nodes that expand into multiple operators. Showing input rows for other nodes would be redundant, since the number of input rows equals the number of output rows of the immediate child plan node.
Input: 2000 rows (118.12KB), Output: 2000 rows (118.12KB)
When a TableScan with filters prunes rows, Velox reports the number of raw input rows and their total size. These are the rows processed before the pushed-down filters were applied. TableScan also reports the number of splits it was assigned.
Raw Input: 20480 rows (72.31KB), Splits: 20
Velox also measures CPU time and peak memory usage for each operator. This information is shown for all plan nodes.
Cpu time: 8.89ms, Peak memory: 1.00MB
A breakdown of CPU time into the addInput, getOutput, and finish stages of the operator is also available. I/O/F below is shorthand for addInput/getOutput/finish.
CPU breakdown: I/O/F (0ns/8.88ms/4.93us)
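These numbers come from per-operator timings. A sketch of reading them from the raw OperatorStats, assuming the addInputTiming, getOutputTiming, and finishTiming fields:
// 'stats' is one exec::OperatorStats entry taken from
// task->taskStats().pipelineStats[i].operatorStats[j].
LOG(INFO) << "I/O/F CPU nanos: " << stats.addInputTiming.cpuNanos << "/"
          << stats.getOutputTiming.cpuNanos << "/"
          << stats.finishTiming.cpuNanos;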
Some operators, like TableScan and HashProbe, may be blocked waiting for splits or hash tables. Velox records the total wall time an operator was blocked, and printPlanWithStats shows this information as “Blocked wall time”.
Blocked wall time: 10.00us
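The common counters are also available programmatically. A minimal sketch, assuming the toPlanStats helper from velox/exec/PlanNodeStats.h, which aggregates operator statistics by plan node ID, and a scanNodeId captured at plan-building time via PlanBuilder::capturePlanNodeId:
#include "velox/exec/PlanNodeStats.h"

auto planStats = exec::toPlanStats(task->taskStats());
const auto& scanStats = planStats.at(scanNodeId);
LOG(INFO) << "TableScan output: " << scanStats.outputRows << " rows ("
          << scanStats.outputBytes << " bytes), peak memory: "
          << scanStats.peakMemoryBytes << " bytes";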
Custom operator statistics¶
Operators also collect and report operator-specific statistics.
The TableScan operator reports statistics that show how much data was read from cache vs. durable storage, how much data was prefetched, and how many splits and strides (row groups) were skipped via stats-based pruning.
-> TableScan[Table: Orders]
localReadBytes sum: 0B, count: 1, min: 0B, max: 0B
numLocalRead sum: 0, count: 1, min: 0, max: 0
numPrefetch sum: 28, count: 1, min: 28, max: 28
numRamRead sum: 0, count: 1, min: 0, max: 0
numStorageRead sum: 140, count: 1, min: 140, max: 140
prefetchBytes sum: 29.51KB, count: 1, min: 29.51KB, max: 29.51KB
ramReadBytes sum: 0B, count: 1, min: 0B, max: 0B
skippedSplitBytes sum: 0B, count: 1, min: 0B, max: 0B
skippedSplits sum: 0, count: 1, min: 0, max: 0
skippedStrides sum: 0, count: 1, min: 0, max: 0
storageReadBytes sum: 150.25KB, count: 1, min: 150.25KB, max: 150.25KB
totalScanTime sum: 0ns, count: 1, min: 0ns, max: 0ns
totalRemainingFilterTime sum: 0ns, count: 1, min: 0ns, max: 0ns
queryThreadIoLatency sum: 0, count: 1, min: 0, max: 0
The HashBuild operator reports the range and the number of distinct values for the join keys.
-> HashJoin[INNER c0=u_c0]
HashBuild:
rangeKey0 sum: 200, count: 1, min: 200, max: 200
distinctKey0 sum: 101, count: 1, min: 101, max: 101
The HashProbe operator reports whether it generated a dynamic filter, and the TableScan operator reports whether it received a dynamic filter pushed down from the join.
-> HashJoin[INNER c0=u_c0]
HashProbe:
dynamicFiltersProduced sum: 1, count: 1, min: 1, max: 1
-> TableScan[]
dynamicFiltersAccepted sum: 1, count: 1, min: 1, max: 1
The TableScan operator shows how many rows were processed via aggregation pushdown into the scan.
loadedToValueHook sum: 50000, count: 5, min: 10000, max: 10000
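Custom counters can be read programmatically as well; a sketch, assuming each per-node entry returned by exec::toPlanStats exposes them in a customStats map keyed by counter name:
auto planStats = exec::toPlanStats(task->taskStats());
const auto& customStats = planStats.at(scanNodeId).customStats;
auto it = customStats.find("skippedSplits");
if (it != customStats.end()) {
  LOG(INFO) << "Skipped " << it->second.sum << " splits via stats-based pruning.";
}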