Configuration properties¶
Generic Configuration¶
Property Name |
Type |
Default Value |
Description |
---|---|---|---|
preferred_output_batch_bytes |
integer |
10MB |
Preferred size of batches in bytes to be returned by operators from Operator::getOutput. It is used when an estimate of average row size is known. Otherwise preferred_output_batch_rows is used. |
preferred_output_batch_rows |
integer |
1024 |
Preferred number of rows to be returned by operators from Operator::getOutput. It is used when an estimate of average row size is not known. When the estimate of average row size is known, preferred_output_batch_bytes is used. |
max_output_batch_rows |
integer |
10000 |
Max number of rows that could be return by operators from Operator::getOutput. It is used when an estimate of average row size is known and preferred_output_batch_bytes is used to compute the number of output rows. |
table_scan_getoutput_time_limit_ms |
integer |
5000 |
TableScan operator will exit getOutput() method after this many milliseconds even if it has no data to return yet. Zero means ‘no time limit’. |
abandon_partial_aggregation_min_rows |
integer |
100,000 |
Number of input rows to receive before starting to check whether to abandon partial aggregation. |
abandon_partial_aggregation_min_pct |
integer |
80 |
Abandons partial aggregation if number of groups equals or exceeds this percentage of the number of input rows. |
abandon_partial_topn_row_number_min_rows |
integer |
100,000 |
Number of input rows to receive before starting to check whether to abandon partial TopNRowNumber. |
abandon_partial_topn_row_number_min_pct |
integer |
80 |
Abandons partial TopNRowNumber if number of output rows equals or exceeds this percentage of the number of input rows. |
session_timezone |
string |
User provided session timezone. Stores a string with the actual timezone name, e.g: “America/Los_Angeles”. |
|
adjust_timestamp_to_session_timezone |
bool |
false |
If true, timezone-less timestamp conversions (e.g. string to timestamp, when the string does not specify a timezone) will be adjusted to the user provided session_timezone (if any). For instance: if this option is true and user supplied “America/Los_Angeles”, then “1970-01-01” will be converted to -28800 instead of 0. Similarly, timestamp to date conversions will adhere to user ‘session_timezone’, e.g: Timestamp(0) to Date will be -1 (number of days since epoch) for “America/Los_Angeles”. |
track_operator_cpu_usage |
bool |
true |
Whether to track CPU usage for stages of individual operators. Can be expensive when processing small batches, e.g. < 10K rows. |
hash_adaptivity_enabled |
bool |
true |
If false, the ‘group by’ code is forced to use generic hash mode hashtable. |
adaptive_filter_reordering_enabled |
bool |
true |
If true, the conjunction expression can reorder inputs based on the time taken to calculate them. |
max_local_exchange_buffer_size |
integer |
32MB |
Used for backpressure to block local exchange producers when the local exchange buffer reaches or exceeds this size. |
max_local_exchange_partition_count |
integer |
2^32 |
Limits the number of partitions created by a local exchange. Partitioning data too granularly can lead to poor performance. This setting allows increasing the task concurrency for all pipelines except the ones that require a local partitioning. Affects the number of drivers for pipelines containing LocalPartitionNode and cannot exceed the maximum number of pipeline drivers configured for the task. |
exchange.max_buffer_size |
integer |
32MB |
Size of buffer in the exchange client that holds data fetched from other nodes before it is processed. A larger buffer can increase network throughput for larger clusters and thus decrease query processing time at the expense of reducing the amount of memory available for other usage. |
merge_exchange.max_buffer_size |
integer |
128MB |
The aggregate buffer size (in bytes) across all exchange clients generated by the merge exchange operator, responsible for storing data retrieved from various nodes prior to processing. It is divided equally among all clients and has an upper and lower limit of 32MB and 1MB, respectively, per client. Enforced approximately, not strictly. A larger size can increase network throughput for larger clusters and thus decrease query processing time at the expense of reducing the amount of memory available for other usage. |
max_page_partitioning_buffer_size |
integer |
32MB |
The maximum size in bytes for the task’s buffered output when output is partitioned using hash of partitioning keys. See PartitionedOutputNode::Kind::kPartitioned. The producer Drivers are blocked when the buffered size exceeds this. The Drivers are resumed when the buffered size goes below OutputBufferManager::kContinuePct (90)% of this. |
max_output_buffer_size |
integer |
32MB |
The maximum size in bytes for the task’s buffered output. The producer Drivers are blocked when the buffered size exceeds this. The Drivers are resumed when the buffered size goes below OutputBufferManager::kContinuePct (90)% of this. |
min_table_rows_for_parallel_join_build |
integer |
1000 |
The minimum number of table rows that can trigger the parallel hash join table build. |
debug.validate_output_from_operators |
bool |
false |
If set to true, then during execution of tasks, the output vectors of every operator are validated for consistency. This is an expensive check so should only be used for debugging. It can help debug issues where malformed vector cause failures or crashes by helping identify which operator is generating them. |
enable_expression_evaluation_cache |
bool |
true |
Whether to enable caches in expression evaluation. If set to true, optimizations including vector pools and evalWithMemo are enabled. |
max_shared_subexpr_results_cached |
integer |
10 |
For a given shared subexpression, the maximum distinct sets of inputs we cache results for. Lambdas can call the same expression with different inputs many times, causing the results we cache to explode in size. Putting a limit contains the memory usage. |
driver_cpu_time_slice_limit_ms |
integer |
0 |
If it is not zero, specifies the time limit that a driver can continuously run on a thread before yield. If it is zero, then it no limit. |
prefixsort_normalized_key_max_bytes |
integer |
128 |
Maximum number of bytes to use for the normalized key in prefix-sort. Use 0 to disable prefix-sort. |
prefixsort_min_rows |
integer |
130 |
Minimum number of rows to use prefix-sort. The default value has been derived using micro-benchmarking. |
Expression Evaluation Configuration¶
Property Name |
Type |
Default Value |
Description |
---|---|---|---|
expression.eval_simplified |
boolean |
false |
Whether to use the simplified expression evaluation path. |
expression.track_cpu_usage |
boolean |
false |
Whether to track CPU usage for individual expressions (supported by call and cast expressions). Can be expensive when processing small batches, e.g. < 10K rows. |
legacy_cast |
bool |
false |
Enables legacy CAST semantics if set to true. CAST(timestamp AS varchar) uses ‘T’ as separator between date and time (instead of a space), and the year part is not padded. |
cast_match_struct_by_name |
bool |
false |
This flag makes the Row conversion to by applied in a way that the casting row field are matched by name instead of position. |
expression.max_array_size_in_reduce |
integer |
100000 |
|
debug_disable_expression_with_peeling |
bool |
false |
Disable optimization in expression evaluation to peel common dictionary layer from inputs. Should only be used for debugging. |
debug_disable_common_sub_expressions |
bool |
false |
Disable optimization in expression evaluation to re-use cached results for common sub-expressions. Should only be used for debugging. |
debug_disable_expression_with_memoization |
bool |
false |
Disable optimization in expression evaluation to re-use cached results between subsequent input batches that are dictionary encoded and have the same alphabet(underlying flat vector). Should only be used for debugging. |
debug_disable_expression_with_lazy_inputs |
bool |
false |
Disable optimization in expression evaluation to delay loading of lazy inputs unless required. Should only be used for debugging. |
Memory Management¶
Property Name |
Type |
Default Value |
Description |
---|---|---|---|
max_partial_aggregation_memory |
integer |
16MB |
Maximum amount of memory in bytes for partial aggregation results. Increasing this value can result in less network transfer and lower CPU utilization by allowing more groups to be kept locally before being flushed, at the cost of additional memory usage. |
max_extended_partial_aggregation_memory |
integer |
16MB |
Maximum amount of memory in bytes for partial aggregation results if cardinality reduction is below partial_aggregation_reduction_ratio_threshold. Every time partial aggregate results size reaches max_partial_aggregation_memory bytes, the results are flushed. If cardinality reduction is below partial_aggregation_reduction_ratio_threshold, i.e. number of result rows / number of input rows > partial_aggregation_reduction_ratio_threshold, memory limit for partial aggregation is automatically doubled up to max_extended_partial_aggregation_memory. This adaptation is disabled by default, since the value of max_extended_partial_aggregation_memory equals the value of max_partial_aggregation_memory. Specify higher value for max_extended_partial_aggregation_memory to enable. |
Spilling¶
Property Name |
Type |
Default Value |
Description |
---|---|---|---|
spill_enabled |
boolean |
false |
Spill memory to disk to avoid exceeding memory limits for the query. |
aggregation_spill_enabled |
boolean |
true |
When spill_enabled is true, determines whether HashAggregation operator can spill to disk under memory pressure. |
join_spill_enabled |
boolean |
true |
When spill_enabled is true, determines whether HashBuild and HashProbe operators can spill to disk under memory pressure. |
order_by_spill_enabled |
boolean |
true |
When spill_enabled is true, determines whether OrderBy operator can spill to disk under memory pressure. |
window_spill_enabled |
boolean |
true |
When spill_enabled is true, determines whether Window operator can spill to disk under memory pressure. |
row_number_spill_enabled |
boolean |
true |
When spill_enabled is true, determines whether RowNumber operator can spill to disk under memory pressure. |
topn_row_number_spill_enabled |
boolean |
true |
When spill_enabled is true, determines whether TopNRowNumber operator can spill to disk under memory pressure. |
writer_spill_enabled |
boolean |
true |
When writer_spill_enabled is true, determines whether TableWriter operator can flush the buffered data to disk under memory pressure. |
aggregation_spill_memory_threshold |
integer |
0 |
Maximum amount of memory in bytes that a final aggregation can use before spilling. 0 means unlimited. |
join_spill_memory_threshold |
integer |
0 |
Maximum amount of memory in bytes that a hash join build side can use before spilling. 0 means unlimited. |
order_by_spill_memory_threshold |
integer |
0 |
Maximum amount of memory in bytes that an order by can use before spilling. 0 means unlimited. |
writer_flush_threshold_bytes |
integer |
96MB |
Minimum memory footprint size required to reclaim memory from a file writer by flushing its buffered data to disk. |
min_spillable_reservation_pct |
integer |
5 |
The minimal available spillable memory reservation in percentage of the current memory usage. Suppose the current memory usage size of M, available memory reservation size of N and min reservation percentage of P, if M * P / 100 > N, then spiller operator needs to grow the memory reservation with percentage of ‘spillable_reservation_growth_pct’ (see below). This ensures we have sufficient amount of memory reservation to process the large input outlier. |
spillable_reservation_growth_pct |
integer |
10 |
The spillable memory reservation growth percentage of the current memory usage. Suppose a growth percentage of N and the current memory usage size of M, the next memory reservation size will be M * (1 + N / 100). After growing the memory reservation K times, the memory reservation size will be M * (1 + N / 100) ^ K. Hence the memory reservation grows along a series of powers of (1 + N / 100). If the memory reservation fails, it starts spilling. |
max_spill_level |
integer |
1 |
The maximum allowed spilling level with zero being the initial spilling level. Applies to hash join build spilling which might use recursive spilling when the build table is very large. -1 means unlimited. In this case an extremely large query might run out of spilling partition bits. The max spill level can be used to prevent a query from using too much io and cpu resources. |
max_spill_run_rows |
integer |
12582912 |
The max number of rows to fill and spill for each spill run. This is used to cap the memory used for spilling. If it is zero, then there is no limit and spilling might run out of memory. Based on offline test results, the default value is set to 12 million rows which uses ~128MB memory when to fill a spill run. Relation between spill rows and memory usage are as follows:
|
max_spill_file_size |
integer |
0 |
The maximum allowed spill file size. Zero means unlimited. |
max_spill_bytes |
integer |
107374182400 |
The max spill bytes limit set for each query. This is used to cap the storage used for spilling. If it is zero, then there is no limit and spilling might exhaust the storage or takes too long to run. The default value is set to 100 GB. |
spill_write_buffer_size |
integer |
4MB |
The maximum size in bytes to buffer the serialized spill data before write to disk for IO efficiency. If set to zero, buffering is disabled. |
spill_read_buffer_size |
integer |
1MB |
The buffer size in bytes to read from one spilled file. If the underlying filesystem supports async read, we do read-ahead with double buffering, which doubles the buffer used to read from each spill file. |
min_spill_run_size |
integer |
256MB |
The minimum spill run size (bytes) limit used to select partitions for spilling. The spiller tries to spill a previously spilled partitions if its data size exceeds this limit, otherwise it spills the partition with most data. If the limit is zero, then the spiller always spills a previously spilled partition if it has any data. This is to avoid spill from a partition with a small amount of data which might result in generating too many small spilled files. |
spill_compression_codec |
string |
none |
Specifies the compression algorithm type to compress the spilled data before write to disk to trade CPU for IO efficiency. The supported compression codecs are: ZLIB, SNAPPY, LZO, ZSTD, LZ4 and GZIP. NONE means no compression. |
spill_prefixsort_enabled |
bool |
false |
Enable the prefix sort or fallback to timsort in spill. The prefix sort is faster than std::sort but requires the memory to build normalized prefix keys, which might have potential risk of running out of server memory. |
spiller_start_partition_bit |
integer |
29 |
The start partition bit which is used with spiller_partition_bits together to calculate the spilling partition number. |
spiller_num_partition_bits |
integer |
3 |
The number of bits (N) used to calculate the spilling partition number for hash join and RowNumber: 2 ^ N. At the moment the maximum value is 3, meaning we only support up to 8-way spill partitioning.ing. |
testing.spill_pct |
integer |
0 |
Percentage of aggregation or join input batches that will be forced to spill for testing. 0 means no extra spilling. |
Table Scan¶
Property Name |
Type |
Default Value |
Description |
---|---|---|---|
max_split_preload_per_driver |
integer |
2 |
Maximum number of splits to preload per driver. Set to 0 to disable preloading. |
Table Writer¶
Property Name |
Type |
Default Value |
Description |
---|---|---|---|
task_writer_count |
integer |
1 |
The number of parallel table writer threads per task. |
task_partitioned_writer_count |
integer |
task_writer_count |
The number of parallel table writer threads per task for partitioned table writes. If not set, use ‘task_writer_count’ as default. |
Hive Connector¶
Hive Connector config is initialized on velox runtime startup and is shared among queries as the default config. Each query can override the config by setting corresponding query session properties such as in Prestissimo.
Configuration Property Name |
Session Property Name |
Type |
Default Value |
Description |
---|---|---|---|---|
hive.max-partitions-per-writers |
integer |
100 |
Maximum number of (bucketed) partitions per a single table writer instance. |
|
insert-existing-partitions-behavior |
insert_existing_partitions_behavior |
string |
ERROR |
Allowed values: |
hive.immutable-partitions |
bool |
false |
True if appending data to an existing unpartitioned table is allowed. Currently this configuration does not support appending to existing partitions. |
|
file-column-names-read-as-lower-case |
bool |
false |
True if reading the source file column names as lower case, and planner should guarantee the input column name and filter is also lower case to achive case-insensitive read. |
|
partition_path_as_lower_case |
bool |
true |
If true, the partition directory will be converted to lowercase when executing a table write operation. |
|
allow-null-partition-keys |
allow_null_partition_keys |
bool |
true |
Determines whether null values for partition keys are allowed or not. If not, fails with “Partition key must not be null” error message when writing data with null partition key. Null check for partitioning key should be used only when partitions are generated dynamically during query execution. For queries that write to fixed partitions, this check should happen much earlier before the Velox execution even starts. |
ignore_missing_files |
bool |
false |
If true, splits that refer to missing files don’t generate errors and are processed as empty splits. |
|
max-coalesced-bytes |
integer |
128MB |
Maximum size in bytes to coalesce requests to be fetched in a single request. |
|
max-coalesced-distance-bytes |
integer |
512KB |
Maximum distance in bytes between chunks to be fetched that may be coalesced into a single request. |
|
load-quantum |
integer |
8MB |
Define the size of each coalesce load request. E.g. in Parquet scan, if it’s bigger than rowgroup size then the whole row group can be fetched together. Otherwise, the row group will be fetched column chunk by column chunk |
|
num-cached-file-handles |
integer |
20000 |
Maximum number of entries in the file handle cache. The value must be non-negative. Zero value indicates infinite cache capacity. |
|
file-handle-cache-enabled |
bool |
true |
Enables caching of file handles if true. Disables caching if false. File handle cache should be disabled if files are not immutable, i.e. file content may change while file path stays the same. |
|
sort-writer-max-output-rows |
sort_writer_max_output_rows |
integer |
1024 |
Maximum number of rows for sort writer in one batch of output. This is to limit the memory usage of sort writer. |
sort-writer-max-output-bytes |
sort_writer_max_output_bytes |
string |
10MB |
Maximum bytes for sort writer in one batch of output. This is to limit the memory usage of sort writer. |
file-preload-threshold |
integer |
8MB |
Usually Velox fetches the meta data firstly then fetch the rest of file. But if the file is very small, Velox can fetch the whole file directly to avoid multiple IO requests. The parameter controls the threshold when whole file is fetched. |
|
footer-estimated-size |
integer |
1MB |
Define the estimation of footer size in ORC and Parquet format. The footer data includes version, schema, and meta data for every columns which may or may not need to be fetched later. The parameter controls the size when footer is fetched each time. Bigger value can decrease the IO requests but may fetch more useless meta data. |
|
hive.orc.writer.stripe-max-size |
orc_optimized_writer_max_stripe_size |
string |
64M |
Maximum stripe size in orc writer. |
hive.orc.writer.dictionary-max-memory |
orc_optimized_writer_max_dictionary_memory |
string |
16M |
Maximum dictionary memory that can be used in orc writer. |
hive.orc.writer.integer-dictionary-encoding-enabled |
orc_optimized_writer_integer_dictionary_encoding_enabled |
bool |
true |
Whether or not dictionary encoding of integer types should be used by the ORC writer. |
hive.orc.writer.string-dictionary-encoding-enabled |
orc_optimized_writer_string_dictionary_encoding_enabled |
bool |
true |
Whether or not dictionary encoding of string types should be used by the ORC writer. |
hive.parquet.writer.timestamp-unit |
hive.parquet.writer.timestamp_unit |
tinyint |
9 |
Timestamp unit used when writing timestamps into Parquet through Arrow bridge. Valid values are 0 (second), 3 (millisecond), 6 (microsecond), 9 (nanosecond). |
hive.orc.writer.linear-stripe-size-heuristics |
orc_writer_linear_stripe_size_heuristics |
bool |
true |
Enables historical based stripe size estimation after compression. |
hive.orc.writer.min-compression-size |
orc_writer_min_compression_size |
integer |
1024 |
Minimal number of items in an encoded stream. |
hive.orc.writer.compression-level |
orc_optimized_writer_compression_level |
tinyint |
3 for ZSTD and 4 for ZLIB |
The compression level to use with ZLIB and ZSTD. |
cache.no_retention |
cache.no_retention |
bool |
false |
If true, evict out a query scanned data out of in-memory cache right after the access, and also skip staging to the ssd cache. This helps to prevent the cache space pollution from the one-time table scan by large batch query when mixed running with interactive query which has high data locality. |
Amazon S3 Configuration
¶
Property Name |
Type |
Default Value |
Description |
---|---|---|---|
hive.s3.use-instance-credentials |
bool |
false |
Use the EC2 metadata service to retrieve API credentials. This works with IAM roles in EC2. |
hive.s3.aws-access-key |
string |
Default AWS access key to use. |
|
hive.s3.aws-secret-key |
string |
Default AWS secret key to use. |
|
hive.s3.endpoint |
string |
us-east-1 |
The S3 storage endpoint server. This can be used to connect to an S3-compatible storage system instead of AWS. |
hive.s3.path-style-access |
bool |
false |
Use path-style access for all requests to the S3-compatible storage. This is for S3-compatible storage that doesn’t support virtual-hosted-style access. |
hive.s3.ssl.enabled |
bool |
true |
Use HTTPS to communicate with the S3 API. |
hive.s3.log-level |
string |
FATAL |
Allowed values: “OFF”, “FATAL”, “ERROR”, “WARN”, “INFO”, “DEBUG”, “TRACE” Granularity of logging generated by the AWS C++ SDK library. |
hive.s3.iam-role |
string |
IAM role to assume. |
|
hive.s3.iam-role-session-name |
string |
velox-session |
Session name associated with the IAM role. |
hive.s3.use-proxy-from-env |
bool |
false |
Utilize the configuration of the environment variables http_proxy, https_proxy, and no_proxy for use with the S3 API. |
hive.s3.connect-timeout |
string |
Socket connect timeout. |
|
hive.s3.socket-timeout |
string |
Socket read timeout. |
|
hive.s3.max-connections |
integer |
Maximum concurrent TCP connections for a single http client. |
|
hive.s3.max-attempts |
integer |
Maximum attempts for connections to a single http client, work together with retry-mode. By default, it’s 3 for standard/adaptive mode and 10 for legacy mode. |
|
hive.s3.retry-mode |
string |
Allowed values: “standard”, “adaptive”, “legacy”. By default it’s empty, S3 client will be created with RetryStrategy. Legacy mode only enables throttled retry for transient errors. Standard mode is built on top of legacy mode and has throttled retry enabled for throttling errors apart from transient errors. Adaptive retry mode dynamically limits the rate of AWS requests to maximize success rate. |
Bucket Level Configuration¶
All “hive.s3.*” config (except “hive.s3.log-level”) can be set on a per-bucket basis. The bucket-specific option is set by replacing the “hive.s3.” prefix on a config with “hive.s3.bucket.BUCKETNAME.”, where BUCKETNAME is the name of the bucket. e.g. the endpoint for a bucket named “velox” can be specified by the config “hive.s3.bucket.velox.endpoint”. When connecting to a bucket, all options explicitly set will override the base “hive.s3.” values. These semantics are similar to the Apache Hadoop-Aws module.
Google Cloud Storage Configuration
¶
Property Name |
Type |
Default Value |
Description |
---|---|---|---|
hive.gcs.endpoint |
string |
The GCS storage URI. |
|
hive.gcs.json-key-file-path |
string |
The GCS service account configuration JSON key file. |
|
hive.gcs.max-retry-count |
integer |
The GCS maximum retry counter of transient errors. |
|
hive.gcs.max-retry-time |
string |
The GCS maximum time allowed to retry transient errors. |
Azure Blob Storage Configuration
¶
Property Name |
Type |
Default Value |
Description |
---|---|---|---|
fs.azure.account.key.<storage-account>.dfs.core.windows.net |
string |
The credentials to access the specific Azure Blob Storage account, replace <storage-account> with the name of your Azure Storage account. This property aligns with how Spark configures Azure account key credentials for accessing Azure storage, by setting this property multiple times with different storage account names, you can access multiple Azure storage accounts. |
Presto-specific Configuration¶
Property Name |
Type |
Default Value |
Description |
---|---|---|---|
presto.array_agg.ignore_nulls |
bool |
false |
If true, |
Spark-specific Configuration¶
Property Name |
Type |
Default Value |
Description |
---|---|---|---|
spark.legacy_size_of_null |
bool |
true |
If false, |
spark.bloom_filter.expected_num_items |
integer |
1000000 |
The default number of expected items for the bloom filter in |
spark.bloom_filter.num_bits |
integer |
8388608 |
The default number of bits to use for the bloom filter in |
spark.bloom_filter.max_num_bits |
integer |
4194304 |
The maximum number of bits to use for the bloom filter in |
spark.partition_id |
integer |
The current task’s Spark partition ID. It’s set by the query engine (Spark) prior to task execution. |
|
spark.legacy_date_formatter |
bool |
false |
If true, Simple Date Format is used for time formatting and parsing. Joda date formatter is used by default. Joda date formatter performs strict checking of its input and uses different pattern string. For example, the 2015-07-22 10:00:00 timestamp cannot be parsed if pattern is yyyy-MM-dd because the parser does not consume whole input. Another example is that the ‘W’ pattern, which means week in month, is not supported. For more differences, see #10354. |
Tracing¶
Property Name |
Type |
Default Value |
Description |
---|---|---|---|
query_trace_enabled |
bool |
true |
If true, enable query tracing. |
query_trace_dir |
string |
The root directory to store the tracing data and metadata for a query. |
|
query_trace_node_ids |
string |
A comma-separated list of plan node ids whose input data will be trace. If it is empty, then we only trace the query metadata which includes the query plan and configs etc. |
|
query_trace_task_reg_exp |
string |
The regexp of traced task id. We only enable trace on a task if its id matches. |
|
query_trace_max_bytes |
integer |
0 |
The max trace bytes limit. Tracing is disabled if zero. |