Plan Nodes and Operators¶
A Velox query plan is a tree of PlanNodes. Each PlanNode has zero or more child PlanNodes. To execute a query plan, Velox converts it into a set of pipelines. Each pipeline is made of a linear sequence of operators that corresponds to a linear sub-tree of the plan. The plan tree is broken down into a set of linear sub-trees by disconnecting all but one child node from each node that has two or more children.

The conversion of plan nodes to operators is mostly one-to-one. Some exceptions are:
- A FilterNode followed by a ProjectNode is converted into a single operator: FilterProject.
- Nodes with two or more child nodes are converted into multiple operators, e.g. a HashJoinNode is converted into a pair of operators: HashProbe and HashBuild.
Operators corresponding to leaf nodes are called source operators. Only a subset of plan nodes can be located at the leaves of the plan tree. These are:
- TableScanNode
- ValuesNode
- ExchangeNode
- MergeExchangeNode
- ArrowStreamNode
Here is a list of supported plan nodes and corresponding operators.
| Plan Node | Operator(s) | Leaf Node / Source Operator |
|---|---|---|
| TableScanNode | TableScan | Y |
| ArrowStreamNode | ArrowStream | Y |
| FilterNode | FilterProject | |
| ProjectNode | FilterProject | |
| AggregationNode | HashAggregation or StreamingAggregation | |
| GroupIdNode | GroupId | |
| HashJoinNode | HashProbe and HashBuild | |
| MergeJoinNode | MergeJoin | |
| NestedLoopJoinNode | NestedLoopJoinProbe and NestedLoopJoinBuild | |
| OrderByNode | OrderBy | |
| TopNNode | TopN | |
| LimitNode | Limit | |
| UnnestNode | Unnest | |
| TableWriteNode | TableWrite | |
| PartitionedOutputNode | PartitionedOutput | |
| ExchangeNode | Exchange | Y |
| MergeExchangeNode | MergeExchange | Y |
| ValuesNode | Values | Y |
| LocalMergeNode | LocalMerge | |
| LocalPartitionNode | LocalPartition and LocalExchange | |
| EnforceSingleRowNode | EnforceSingleRow | |
| AssignUniqueIdNode | AssignUniqueId | |
| WindowNode | Window | |
Plan Nodes¶
TableScanNode¶
The table scan operation reads data from a connector. For example, when used with HiveConnector, table scan reads data from ORC or Parquet files.
| Property | Description |
|---|---|
| outputType | A list of output columns. This is a subset of columns available in the underlying table. The order of columns may not match the schema of the table. |
| tableHandle | Connector-specific description of the table. May include a pushed down filter. |
| assignments | Connector-specific mapping from the table schema to output columns. |
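As a sketch, a TableScanNode can be constructed with the PlanBuilder utility from Velox's test helpers (velox/exec/tests/utils/PlanBuilder.h); the column names and types below are illustrative assumptions, not part of the node's definition:

```cpp
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;

core::PlanNodePtr makeScanPlan() {
  // outputType: the subset of table columns the scan should produce.
  auto outputType = ROW({"a", "b"}, {BIGINT(), VARCHAR()});
  // tableScan() fills in a default connector-specific tableHandle and
  // assignments; both can be overridden for non-default connectors.
  return exec::test::PlanBuilder().tableScan(outputType).planNode();
}
```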
ArrowStreamNode¶
The Arrow stream operation reads data from an Arrow array stream. The ArrowArrayStream structure is defined in the Arrow ABI and provides the required callbacks to interact with a streaming source of Arrow arrays.
| Property | Description |
|---|---|
| arrowStream | The constructed Arrow array stream. This is a streaming source of data chunks, each with the same schema. |
FilterNode¶
The filter operation eliminates one or more records from the input data based on a boolean filter expression.
| Property | Description |
|---|---|
| filter | Boolean filter expression. |
ProjectNode¶
The project operation produces one or more additional columns by evaluating expressions over the input columns. The project operation may also drop one or more of the input columns.
| Property | Description |
|---|---|
| names | Column names for the output expressions. |
| expressions | Expressions for the output columns. |
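Since a FilterNode followed by a ProjectNode is fused into a single FilterProject operator, the two nodes are typically built together. A minimal sketch using the PlanBuilder test utility, with illustrative column names:

```cpp
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;

// Builds FilterNode -> ProjectNode; at execution time the adjacent pair
// is fused into one FilterProject operator.
core::PlanNodePtr makeFilterProjectPlan(
    const std::vector<RowVectorPtr>& input) {
  return exec::test::PlanBuilder()
      .values(input)
      .filter("a > 10")         // FilterNode: boolean filter expression.
      .project({"a + b AS c"})  // ProjectNode: named output expressions.
      .planNode();
}
```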
AggregationNode¶
The aggregate operation groups input data on a set of grouping keys, calculating each measure for each combination of the grouping keys.
| Property | Description |
|---|---|
| step | Aggregation step: partial, final, intermediate, single. |
| groupingKeys | Zero or more grouping keys. |
| preGroupedKeys | A subset of the grouping keys on which the input is known to be pre-grouped, i.e. all rows with a given combination of values of the pre-grouped keys appear together one after another. The input is not assumed to be sorted on the pre-grouped keys. If input is pre-grouped on all grouping keys the execution will use the StreamingAggregation operator. |
| aggregateNames | Names for the output columns for the measures. |
| aggregates | Expressions for computing the measures, e.g. count(1), sum(a), avg(b). Expressions must be in the form of aggregate function calls over input columns directly, e.g. sum(c) is ok, but sum(c + d) is not. |
| aggregationMasks | For each measure, an optional boolean input column that is used to mask out rows for this particular measure. |
| ignoreNullKeys | A boolean flag indicating whether the aggregation should drop rows with nulls in any of the grouping keys. Used to avoid unnecessary processing for an aggregation followed by an inner join on the grouping keys. |
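A minimal sketch of a two-phase aggregation built with the PlanBuilder test utility; the grouping key and measures are illustrative:

```cpp
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;

// Two-phase aggregation: the partial step can run multi-threaded; the
// final step combines partial results. Note that the aggregates are
// function calls over input columns directly, as required above.
core::PlanNodePtr makeAggPlan(const std::vector<RowVectorPtr>& input) {
  return exec::test::PlanBuilder()
      .values(input)
      .partialAggregation({"a"}, {"sum(b)", "count(1)"})
      .finalAggregation()
      .planNode();
}
```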
GroupIdNode¶
Duplicates the input for each of the specified grouping key sets. Used to implement aggregations over grouping sets.
The output consists of grouping keys, followed by aggregation inputs, followed by the group ID column. The type of group ID column is BIGINT.
| Property | Description |
|---|---|
| groupingSets | List of grouping key sets. Keys within each set must be unique, but keys can repeat across the sets. |
| groupingKeyInfos | The names and order of the grouping key columns in the output. |
| aggregationInputs | Input columns to duplicate. |
| groupIdName | The name for the group-id column that identifies the grouping set. The value is a zero-based integer corresponding to the position of the grouping set in the ‘groupingSets’ list. |
HashJoinNode and MergeJoinNode¶
The join operation combines two separate inputs into a single output, based on a join expression. A common subtype of joins is an equality join where the join expression is constrained to a list of equality (or equality + null equality) conditions between the two inputs of the join.
HashJoinNode represents an implementation that starts by loading all rows from the right side of the join into a hash table, then streams the left side of the join, probing the hash table for matching rows and emitting results.
MergeJoinNode represents an implementation that assumes that both inputs are sorted on the join keys and streams both join sides looking for matching rows and emitting results.
| Property | Description |
|---|---|
| joinType | Join type: inner, left, right, full, left semi filter, left semi project, right semi filter, right semi project, anti. You can read about different join types in this blog post. |
| nullAware | Applies to anti and semi project joins only. Indicates whether the join semantic is IN (nullAware = true) or EXISTS (nullAware = false). |
| leftKeys | Columns from the left hand side input that are part of the equality condition. At least one must be specified. |
| rightKeys | Columns from the right hand side input that are part of the equality condition. At least one must be specified. The number and order of the rightKeys must match the number and order of the leftKeys. |
| filter | Optional non-equality filter expression that may reference columns from both inputs. |
| outputType | A list of output columns. This is a subset of columns available in the left and right inputs of the join. The columns may appear in a different order than in the input. |
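A sketch of a hash join plan built with the PlanBuilder test utility. The key and column names are illustrative; a shared PlanNodeIdGenerator keeps plan node ids unique across the build-side and probe-side builders:

```cpp
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;

// The build side becomes the HashBuild pipeline; the probe side becomes
// the HashProbe pipeline.
core::PlanNodePtr makeJoinPlan(
    const std::vector<RowVectorPtr>& probeInput,
    const std::vector<RowVectorPtr>& buildInput) {
  auto idGenerator = std::make_shared<core::PlanNodeIdGenerator>();

  auto buildSide =
      exec::test::PlanBuilder(idGenerator).values(buildInput).planNode();

  return exec::test::PlanBuilder(idGenerator)
      .values(probeInput)
      .hashJoin(
          {"t_key"},           // leftKeys (probe side).
          {"u_key"},           // rightKeys (build side).
          buildSide,
          "",                  // optional extra filter expression (none).
          {"t_key", "u_col"})  // outputType: columns from both sides.
      .planNode();
}
```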
NestedLoopJoinNode¶
The cross join operation combines two separate inputs into a single output by combining each row of the left hand side input with each row of the right hand side input. If there are N rows in the left input and M rows in the right input, the output of the cross join will contain N * M rows.
| Property | Description |
|---|---|
| outputType | A list of output columns. This is a subset of columns available in the left and right inputs of the join. The columns may appear in a different order than in the input. |
OrderByNode¶
The sort or order by operation reorders a dataset based on one or more identified sort fields as well as a sorting order.
| Property | Description |
|---|---|
| sortingKeys | List of one or more input columns to sort by. |
| sortingOrders | Sorting order for each of the sorting keys. The supported orders are: ascending nulls first, ascending nulls last, descending nulls first, descending nulls last. |
| isPartial | Boolean indicating whether the sort operation processes only a portion of the dataset. |
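A minimal sketch using the PlanBuilder test utility; sorting keys and orders are given as SQL-style strings, and the column name is illustrative:

```cpp
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;

// OrderByNode: fully sorts the input by "a", descending, nulls last.
core::PlanNodePtr makeOrderByPlan(const std::vector<RowVectorPtr>& input) {
  return exec::test::PlanBuilder()
      .values(input)
      .orderBy({"a DESC NULLS LAST"}, /*isPartial=*/false)
      .planNode();
}
```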
TopNNode¶
The top-n operation reorders a dataset based on one or more identified sort fields as well as a sorting order. Rather than sort the entire dataset, the top-n will only maintain the total number of records required to ensure a limited output. A top-n is a combination of a logical sort and logical limit operations.
| Property | Description |
|---|---|
| sortingKeys | List of one or more input columns to sort by. |
| sortingOrders | Sorting order for each of the sorting keys. See OrderBy for the list of supported orders. |
| count | Maximum number of rows to return. |
| isPartial | Boolean indicating whether the operation processes only a portion of the dataset. |
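A sketch with the PlanBuilder test utility, using an illustrative column name:

```cpp
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;

// TopNNode: keeps only the 10 highest values of "a" instead of sorting
// the whole input.
core::PlanNodePtr makeTopNPlan(const std::vector<RowVectorPtr>& input) {
  return exec::test::PlanBuilder()
      .values(input)
      .topN({"a DESC"}, /*count=*/10, /*isPartial=*/false)
      .planNode();
}
```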
LimitNode¶
The limit operation skips a specified number of input rows and then keeps up to a specified number of rows and drops the rest.
| Property | Description |
|---|---|
| offset | Number of rows of input to skip. |
| count | Maximum number of rows to return. |
| isPartial | Boolean indicating whether the operation processes only a portion of the dataset. |
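A sketch with the PlanBuilder test utility illustrating the offset/count semantics:

```cpp
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;

// LimitNode: skips the first 5 input rows, then returns up to 10 rows
// (rows 6 through 15 of the input) and drops the rest.
core::PlanNodePtr makeLimitPlan(const std::vector<RowVectorPtr>& input) {
  return exec::test::PlanBuilder()
      .values(input)
      .limit(/*offset=*/5, /*count=*/10, /*isPartial=*/false)
      .planNode();
}
```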
UnnestNode¶
The unnest operation expands arrays and maps into separate columns. Arrays are expanded into a single column, and maps are expanded into two columns (key, value). It can be used to expand multiple columns; in this case, it produces as many rows as the highest-cardinality array or map (the other columns are padded with nulls). Optionally, it can produce an ordinality column that specifies the row number starting with 1.
| Property | Description |
|---|---|
| replicateVariables | Input columns that are returned unmodified. |
| unnestVariables | Input columns of type array or map to expand. |
| unnestNames | Names to use for expanded columns. One name per array column. Two names per map column. |
| ordinalityName | Optional name for the ordinality column. |
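A sketch assuming the PlanBuilder unnest() helper that takes the replicated columns, the columns to unnest, and an optional ordinality column name; all column names are illustrative:

```cpp
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;

// UnnestNode: replicates "id" for each element of the array column "arr"
// and emits a 1-based "ordinal" column.
core::PlanNodePtr makeUnnestPlan(const std::vector<RowVectorPtr>& input) {
  return exec::test::PlanBuilder()
      .values(input)
      .unnest({"id"}, {"arr"}, "ordinal")
      .planNode();
}
```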
TableWriteNode¶
The table write operation consumes one input and writes it to storage via a connector. An example would be writing ORC or Parquet files. The table write operation returns a single row with a single column containing the number of rows written to storage.
| Property | Description |
|---|---|
| columns | A list of input columns to write to storage. This may be a subset of the input columns in a different order. |
| columnNames | Column names to use when writing to storage. These can be different from the input column names. |
| insertTableHandle | Connector-specific description of the destination table. |
| outputType | An output column containing the number of rows written to storage. |
PartitionedOutputNode¶
The partitioned output operation redistributes data based on zero or more distribution fields.
| Property | Description |
|---|---|
| keys | Zero or more input fields to use for calculating a partition for each row. |
| numPartitions | Number of partitions to split the data into. |
| broadcast | Boolean flag indicating whether all rows should be sent to all partitions. |
| replicateNullsAndAny | Boolean flag indicating whether rows with nulls in the keys should be sent to all partitions and, in case there are no such rows, whether a single arbitrarily chosen row should be sent to all partitions. Used to provide global-scope information necessary to implement anti join semantics on a single node. |
| partitionFunctionFactory | Factory to make partition functions to use when calculating partitions for input rows. |
| outputType | A list of output columns. This is a subset of input columns possibly in a different order. |
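A minimal sketch with the PlanBuilder test utility; the key name and partition count are illustrative:

```cpp
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;

// PartitionedOutputNode: hash-partitions rows on key "k" into 4
// partitions for a remote shuffle.
core::PlanNodePtr makeShufflePlan(const std::vector<RowVectorPtr>& input) {
  return exec::test::PlanBuilder()
      .values(input)
      .partitionedOutput({"k"}, /*numPartitions=*/4)
      .planNode();
}
```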
ValuesNode¶
The values operation returns specified data.
| Property | Description |
|---|---|
| values | Set of rows to return. |
| parallelizable | Boolean indicating whether the same input should be produced by each thread (one per driver). |
| repeatTimes | How many times each vector should be produced as input. |
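A sketch assuming the PlanBuilder values() overload with parallelizable and repeatTimes arguments; the input vectors are pre-built elsewhere:

```cpp
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;

// ValuesNode: `data` holds pre-built in-memory RowVectors; with
// repeatTimes = 2 each vector is produced twice as input.
core::PlanNodePtr makeValuesPlan(const std::vector<RowVectorPtr>& data) {
  return exec::test::PlanBuilder()
      .values(data, /*parallelizable=*/false, /*repeatTimes=*/2)
      .planNode();
}
```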
ExchangeNode¶
A receiving operation that merges multiple streams in an arbitrary order. Input streams are coming from remote exchange or shuffle.
| Property | Description |
|---|---|
| type | A list of columns in the input streams. |
MergeExchangeNode¶
A receiving operation that merges multiple ordered streams to maintain orderedness. Input streams are coming from remote exchange or shuffle.
| Property | Description |
|---|---|
| type | A list of columns in the input streams. |
| sortingKeys | List of one or more input columns to sort by. |
| sortingOrders | Sorting order for each of the sorting keys. See OrderBy for the list of supported orders. |
LocalMergeNode¶
An operation that merges multiple ordered streams to maintain orderedness. Input streams are coming from local exchange.
| Property | Description |
|---|---|
| sortingKeys | List of one or more input columns to sort by. |
| sortingOrders | Sorting order for each of the sorting keys. See OrderBy for the list of supported orders. |
LocalPartitionNode¶
A local exchange operation that partitions input data into multiple streams or combines data from multiple streams into a single stream.
| Property | Description |
|---|---|
| Type | Type of the exchange: gather or repartition. |
| partitionFunctionFactory | Factory to make partition functions to use when calculating partitions for input rows. |
| outputType | A list of output columns. This is a subset of input columns possibly in a different order. |
EnforceSingleRowNode¶
The enforce single row operation checks that input contains at most one row and returns that row unmodified. If input is empty, it returns a single row with all values set to null. If input contains more than one row, it raises an exception.
Used for queries with non-correlated sub-queries.
AssignUniqueIdNode¶
The assign unique id operation adds one column at the end of the input columns with a unique value per row. This unique value marks each output row as unique among all output rows of this operator.
The 64-bit unique id is built in the following way:
- first 24 bits - task unique id
- next 40 bits - operator counter value
The task unique id is added to ensure the generated id is unique across all the nodes executing the same query stage in a distributed query execution.
| Property | Description |
|---|---|
| idName | Column name for the generated unique id column. |
| taskUniqueId | A 24-bit integer to uniquely identify the task id across all the nodes. |
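The bit layout can be illustrated with a small helper; this is a reconstruction of the arithmetic described above for illustration, not Velox source:

```cpp
#include <cstdint>

// Assembles a 64-bit unique id from a 24-bit task id and a 40-bit
// per-operator counter: top 24 bits hold the task unique id, the low
// 40 bits hold the operator counter value.
int64_t makeUniqueId(int32_t taskUniqueId, int64_t counter) {
  return (static_cast<int64_t>(taskUniqueId) << 40) |
         (counter & ((1LL << 40) - 1));
}
```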
WindowNode¶
The Window operator is used to evaluate window functions. The operator adds columns for the window functions output at the end of the input columns.
The window operator groups the input data into partitions based on the values of the partition columns. If no partition columns are specified, then all the input rows are considered to be in the same partition. Within each partition, rows are ordered by the values of the sorting columns. Window functions are computed one row at a time in this order. If no sorting columns are specified, then the order of the results is unspecified.
| Property | Description |
|---|---|
| partitionKeys | Partition by columns for the window functions. |
| sortingKeys | Order by columns for the window functions. |
| sortingOrders | Sorting order for each sorting key above. The supported sort orders are asc nulls first, asc nulls last, desc nulls first and desc nulls last. |
| windowColumnNames | Output column names for each window function invocation in the windowFunctions list below. |
| windowFunctions | Window function calls with the frame clause, e.g. row_number(), first_value(name) between range 10 preceding and current row. The default frame is between range unbounded preceding and current row. |
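A minimal sketch with the PlanBuilder test utility; window function calls are given as SQL strings, and the column names are illustrative:

```cpp
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;

// WindowNode: one window function invocation; the frame clause is
// omitted here, so the default frame described above applies.
core::PlanNodePtr makeWindowPlan(const std::vector<RowVectorPtr>& input) {
  return exec::test::PlanBuilder()
      .values(input)
      .window({"row_number() over (partition by a order by b) as rn"})
      .planNode();
}
```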
Examples¶
Join¶
A query plan with a join includes a HashJoinNode. Such a plan is translated into two pipelines: build and probe. The build pipeline processes input from the build side of the join and uses the HashBuild operator to build a hash table. The probe pipeline processes input from the probe side of the join, probes the hash table and produces rows that match the join criteria. The build pipeline provides the hash table to the probe pipeline via a special mechanism called a JoinBridge. The JoinBridge is like a future, where the HashBuild operator completes the future with a HashTable as a result and the HashProbe operator receives the HashTable when the future completes.
Each pipeline can run with a different level of parallelism. In the example below, the probe pipeline runs on 2 threads, while the build pipeline runs on 3 threads. When the build pipeline runs multi-threaded, each thread processes a portion of the build-side input. The last thread to finish processing is responsible for combining the hash tables from the other threads and publishing the final table to the JoinBridge. When the probe pipeline for a right outer join runs multi-threaded, the last thread to finish processing is responsible for emitting rows from the build side that didn’t match the join condition.

Local Exchange¶
A local exchange operation has multiple uses. It is used to change the parallelism of the data processing from multi-threaded to single-threaded or vice versa. For example, local exchange can be used in a sort operation where a partial sort runs multi-threaded and then results are merged on a single thread. A local exchange operation is also used to combine results of multiple pipelines, for example, to combine multiple inputs of a UNION or UNION ALL.
Here are some examples.
N-to-1 local exchange that could be used for combining partially sorted results for final merge sort.

1-to-N local exchange to increase parallelism after an operation that must run single-threaded.

Local exchange used to combine data from multiple pipelines, e.g. for UNION ALL.

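As a sketch, the UNION ALL case can be expressed with the PlanBuilder test utility, assuming the localPartition() overload that takes partition keys plus a list of source plans; the inputs are illustrative:

```cpp
#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;

// Combines the outputs of two pipelines into one stream, as in UNION ALL.
// With no partition keys, rows are forwarded without hash partitioning.
core::PlanNodePtr makeUnionAllPlan(
    const std::vector<RowVectorPtr>& input1,
    const std::vector<RowVectorPtr>& input2) {
  auto idGenerator = std::make_shared<core::PlanNodeIdGenerator>();

  auto source1 =
      exec::test::PlanBuilder(idGenerator).values(input1).planNode();
  auto source2 =
      exec::test::PlanBuilder(idGenerator).values(input2).planNode();

  return exec::test::PlanBuilder(idGenerator)
      .localPartition({}, {source1, source2})
      .planNode();
}
```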