Skip to content

Test: Validate memory limit for sort queries to extended test #14142

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jan 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion .github/workflows/extended.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,42 @@ on:
- main

jobs:
# Check crate compiles and base cargo check passes
linux-build-lib:
name: linux build test
runs-on: ubuntu-latest
container:
image: amd64/rust
steps:
- uses: actions/checkout@v4
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
rust-version: stable
- name: Prepare cargo build
run: cargo check --profile ci --all-targets

# Run extended tests (with feature 'extended_tests')
linux-test-extended:
name: cargo test (amd64)
needs: linux-build-lib
runs-on: ubuntu-latest
container:
image: amd64/rust
steps:
- uses: actions/checkout@v4
with:
submodules: true
fetch-depth: 1
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
rust-version: stable
- name: Run tests (excluding doctests)
run: cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --workspace --lib --tests --bins --features avro,json,backtrace,extended_tests
- name: Verify Working Directory Clean
run: git diff --exit-code

# Check answers are correct when hash values collide
hash-collisions:
name: cargo test hash collisions (amd64)
Expand All @@ -51,7 +87,8 @@ jobs:
- name: Run tests
run: |
cd datafusion
cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --exclude datafusion-sqllogictest --workspace --lib --tests --features=force_hash_collisions,avro
cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --exclude datafusion-sqllogictest --workspace --lib --tests --features=force_hash_collisions,avro,extended_tests

sqllogictest-sqlite:
name: "Run sqllogictests with the sqlite test suite"
runs-on: ubuntu-latest
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ unicode_expressions = [
"datafusion-sql/unicode_expressions",
"datafusion-functions/unicode_expressions",
]
extended_tests = []

[dependencies]
apache-avro = { version = "0.17", optional = true }
Expand Down Expand Up @@ -150,6 +151,7 @@ rand_distr = "0.4.3"
regex = { workspace = true }
rstest = { workspace = true }
serde_json = { workspace = true }
sysinfo = "0.33.1"
test-utils = { path = "../../test-utils" }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot", "fs"] }

Expand Down
22 changes: 22 additions & 0 deletions datafusion/core/tests/memory_limit/memory_limit_validation/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Validates query's actual memory usage is consistent with the specified memory
//! limit.

mod sort_mem_validation;
mod utils;
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Memory limit validation tests for the sort queries
//!
//! These tests must run in separate processes to accurately measure memory usage.
//! This file is organized as:
//! - Test runners that spawn individual test processes
//! - Test cases that contain the actual validation logic
use std::{process::Command, str};

use log::info;

use crate::memory_limit::memory_limit_validation::utils;

// ===========================================================================
// Test runners:
// Runners are splitted into multiple tests to run in parallel
// ===========================================================================

#[test]
fn memory_limit_validation_runner_works_runner() {
spawn_test_process("memory_limit_validation_runner_works");
}

#[test]
fn sort_no_mem_limit_runner() {
spawn_test_process("sort_no_mem_limit");
}

#[test]
fn sort_with_mem_limit_1_runner() {
spawn_test_process("sort_with_mem_limit_1");
}

#[test]
fn sort_with_mem_limit_2_runner() {
spawn_test_process("sort_with_mem_limit_2");
}

#[test]
fn sort_with_mem_limit_3_runner() {
spawn_test_process("sort_with_mem_limit_3");
}

#[test]
fn sort_with_mem_limit_2_cols_1_runner() {
spawn_test_process("sort_with_mem_limit_2_cols_1");
}

#[test]
fn sort_with_mem_limit_2_cols_2_runner() {
spawn_test_process("sort_with_mem_limit_2_cols_2");
}

/// Helper function that executes a test in a separate process with the required environment
/// variable set. Memory limit validation tasks need to measure memory resident set
/// size (RSS), so they must run in a separate process.
fn spawn_test_process(test: &str) {
let test_path = format!(
"memory_limit::memory_limit_validation::sort_mem_validation::{}",
test
);
info!("Running test: {}", test_path);

// Run the test command
let output = Command::new("cargo")
.arg("test")
.arg("--package")
.arg("datafusion")
.arg("--test")
.arg("core_integration")
.arg("--features")
.arg("extended_tests")
.arg("--")
.arg(&test_path)
.arg("--exact")
.arg("--nocapture")
.env("DATAFUSION_TEST_MEM_LIMIT_VALIDATION", "1")
.output()
.expect("Failed to execute test command");

// Convert output to strings
let stdout = str::from_utf8(&output.stdout).unwrap_or("");
let stderr = str::from_utf8(&output.stderr).unwrap_or("");

info!("{}", stdout);

assert!(
output.status.success(),
"Test '{}' failed with status: {}\nstdout:\n{}\nstderr:\n{}",
test,
output.status,
stdout,
stderr
);
}

// ===========================================================================
// Test cases:
// All following tests need to be run through their individual test wrapper.
// When run directly, environment variable `DATAFUSION_TEST_MEM_LIMIT_VALIDATION`
// is not set, test will return with a no-op.
//
// If some tests consistently fail, suppress by setting a larger expected memory
// usage (e.g. 80_000_000 * 3 -> 80_000_000 * 4)
// ===========================================================================

/// Test runner itself: if memory limit violated, test should fail.
#[tokio::test]
async fn memory_limit_validation_runner_works() {
if std::env::var("DATAFUSION_TEST_MEM_LIMIT_VALIDATION").is_err() {
println!("Skipping test because DATAFUSION_TEST_MEM_LIMIT_VALIDATION is not set");

return;
}

let result = std::panic::catch_unwind(|| {
tokio::runtime::Runtime::new().unwrap().block_on(async {
utils::validate_query_with_memory_limits(
20_000_000, // set an impossible limit: query requires at least 80MB
None,
"select * from generate_series(1,10000000) as t1(c1) order by c1",
"select * from generate_series(1,1000000) as t1(c1) order by c1", // Baseline query with ~10% of data
)
.await;
})
});

assert!(
result.is_err(),
"Expected the query to panic due to memory limit"
);
}

#[tokio::test]
async fn sort_no_mem_limit() {
utils::validate_query_with_memory_limits(
80_000_000 * 3,
None,
"select * from generate_series(1,10000000) as t1(c1) order by c1",
"select * from generate_series(1,1000000) as t1(c1) order by c1", // Baseline query with ~10% of data
)
.await;
}

#[tokio::test]
async fn sort_with_mem_limit_1() {
utils::validate_query_with_memory_limits(
40_000_000 * 5,
Some(40_000_000),
"select * from generate_series(1,10000000) as t1(c1) order by c1",
"select * from generate_series(1,1000000) as t1(c1) order by c1", // Baseline query with ~10% of data
)
.await;
}

#[tokio::test]
async fn sort_with_mem_limit_2() {
utils::validate_query_with_memory_limits(
80_000_000 * 3,
Some(80_000_000),
"select * from generate_series(1,10000000) as t1(c1) order by c1",
"select * from generate_series(1,1000000) as t1(c1) order by c1", // Baseline query with ~10% of data
)
.await;
}

#[tokio::test]
async fn sort_with_mem_limit_3() {
utils::validate_query_with_memory_limits(
80_000_000 * 3,
Some(80_000_000 * 10), // mem limit is large enough so that no spill happens
"select * from generate_series(1,10000000) as t1(c1) order by c1",
"select * from generate_series(1,1000000) as t1(c1) order by c1", // Baseline query with ~10% of data
)
.await;
}

#[tokio::test]
async fn sort_with_mem_limit_2_cols_1() {
let memory_usage_in_theory = 80_000_000 * 2; // 2 columns
let expected_max_mem_usage = memory_usage_in_theory * 4;
utils::validate_query_with_memory_limits(
expected_max_mem_usage,
None,
"select c1, c1 as c2 from generate_series(1,10000000) as t1(c1) order by c2 DESC, c1 ASC NULLS LAST",
"select c1, c1 as c2 from generate_series(1,1000000) as t1(c1) order by c2 DESC, c1 ASC NULLS LAST", // Baseline query with ~10% of data
)
.await;
}

// TODO: Query fails, fix it
// Issue: https://github.com/apache/datafusion/issues/14143
#[ignore]
#[tokio::test]
async fn sort_with_mem_limit_2_cols_2() {
let memory_usage_in_theory = 80_000_000 * 2; // 2 columns
let expected_max_mem_usage = memory_usage_in_theory * 3;
let mem_limit = memory_usage_in_theory as f64 * 0.5;

utils::validate_query_with_memory_limits(
expected_max_mem_usage,
Some(mem_limit as i64),
"select c1, c1 as c2 from generate_series(1,10000000) as t1(c1) order by c2 DESC, c1 ASC NULLS LAST",
"select c1, c1 as c2 from generate_series(1,1000000) as t1(c1) order by c2 DESC, c1 ASC NULLS LAST", // Baseline query with ~10% of data
)
.await;
}
Loading
Loading