diff --git a/.gitignore b/.gitignore
index 3f072cfb..c77f2bc9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,3 +30,5 @@ run.out
 clickhouse/etc_sudoers.bak
 workdir/
 timeout-exit-codes.out
+*/target
+*.lock
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
new file mode 100644
index 00000000..16e3632c
--- /dev/null
+++ b/datafusion/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "db-benchmark"
+version = "0.1.0"
+edition = "2018"
+
+[dependencies]
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", features = ["simd"]}
+tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread"] }
+snmalloc-rs = {version = "0.2", features= ["cache-friendly"]}
+num_cpus = "1.0"
+
+[profile.release]
+lto = true
+codegen-units = 1
diff --git a/datafusion/exec.sh b/datafusion/exec.sh
new file mode 100755
index 00000000..649afefa
--- /dev/null
+++ b/datafusion/exec.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+set -e
+
+RUSTFLAGS='-C target-cpu=native' cargo +nightly run --release
\ No newline at end of file
diff --git a/datafusion/setup-datafusion.sh b/datafusion/setup-datafusion.sh
new file mode 100755
index 00000000..c8422bc4
--- /dev/null
+++ b/datafusion/setup-datafusion.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+set -e
+
+curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
+
diff --git a/datafusion/src/main.rs b/datafusion/src/main.rs
new file mode 100644
index 00000000..949fddee
--- /dev/null
+++ b/datafusion/src/main.rs
@@ -0,0 +1,86 @@
+use datafusion::{arrow::datatypes::{DataType, Field, Schema}, datasource::{CsvFile, MemTable}};
+use datafusion::error::Result;
+use datafusion::prelude::*;
+use std::{env, sync::Arc};
+use std::time::Instant;
+
+#[global_allocator]
+static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
+
+async fn exec_query(ctx: &mut ExecutionContext, query: &str, name: &str) -> Result<()> {
+    let start = Instant::now();
+
+    let ans = ctx.sql(query)?.collect().await?;
+
+    // TODO: print details
+
+    println!("{} took {} ms", name, start.elapsed().as_millis());
+
+    Ok(())
+}
+#[tokio::main]
+async fn main() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    let data = format!("../data/{}.csv", env::var("SRC_DATANAME").unwrap());
+
+    let schema = Schema::new(vec![
+        Field::new("id1", DataType::Utf8, false),
+        Field::new("id2", DataType::Utf8, false),
+        Field::new("id3", DataType::Utf8, false),
+        Field::new("id4", DataType::Int32, false),
+        Field::new("id5", DataType::Int32, false),
+        Field::new("id6", DataType::Int32, false),
+        Field::new("v1", DataType::Int32, false),
+        Field::new("v2", DataType::Int32, false),
+        Field::new("v3", DataType::Float64, false),
+    ]);
+    let options = CsvReadOptions::new().schema(&schema).has_header(true);
+
+    let csv = CsvFile::try_new(&data, options).unwrap();
+    let batch_size = 65536;
+    let partition_size = num_cpus::get();
+
+    let memtable = MemTable::load(Arc::new(csv), batch_size, Some(partition_size)).await?;
+    ctx.register_table("tbl", Arc::new(memtable));
+
+    exec_query(
+        &mut ctx,
+        "SELECT id1, SUM(v1) AS v1 FROM tbl GROUP BY id1",
+        "q1",
+    )
+    .await?;
+    exec_query(
+        &mut ctx,
+        "SELECT id1, id2, SUM(v1) AS v1 FROM tbl GROUP BY id1, id2",
+        "q2",
+    )
+    .await?;
+    exec_query(
+        &mut ctx,
+        "SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM tbl GROUP BY id3",
+        "q3",
+    )
+    .await?;
+    exec_query(
+        &mut ctx,
+        "SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM tbl GROUP BY id4",
+        "q4",
+    )
+    .await?;
+    exec_query(
+        &mut ctx,
+        "SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM tbl GROUP BY id6",
+        "q5",
+    )
+    .await?;
+    exec_query(
+        &mut ctx,
+        "SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM tbl GROUP BY id3",
+        "q7",
+    )
+    .await?;
+
+    exec_query(&mut ctx, "SELECT id1, id2, id3, id4, id5, id6, SUM(v3) as v3, COUNT(*) AS cnt FROM tbl GROUP BY id1, id2, id3, id4, id5, id6", "q10").await?;
+
+    Ok(())
+}
diff --git a/datafusion/upg-datafusion.sh b/datafusion/upg-datafusion.sh
new file mode 100755
index 00000000..c0bb8469
--- /dev/null
+++ b/datafusion/upg-datafusion.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+set -e
+
+cd datafusion
+
+cargo update
+
+cd ../
\ No newline at end of file
diff --git a/datafusion/ver-datafusion.sh b/datafusion/ver-datafusion.sh
new file mode 100755
index 00000000..05e4f20c
--- /dev/null
+++ b/datafusion/ver-datafusion.sh
@@ -0,0 +1 @@
+cargo tree | grep "├── datafusion" | cut -d ' ' -f3
\ No newline at end of file
diff --git a/run.conf b/run.conf
index de58a433..37d08736 100644
--- a/run.conf
+++ b/run.conf
@@ -1,7 +1,7 @@
 # task, used in init-setup-iteration.R
 export RUN_TASKS="groupby join"
 # solution, used in init-setup-iteration.R
-export RUN_SOLUTIONS="data.table pydatatable dplyr pandas spark dask juliadf cudf clickhouse polars"
+export RUN_SOLUTIONS="data.table pydatatable dplyr pandas spark dask juliadf cudf clickhouse polars datafusion"
 
 # flag to upgrade tools, used in run.sh on init
 export DO_UPGRADE=true
diff --git a/run.sh b/run.sh
index c1c45e07..e7c30c02 100755
--- a/run.sh
+++ b/run.sh
@@ -62,6 +62,8 @@ if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "h2o" ]]; then ./h2o/upg-h2o.
 if [[ "$RUN_SOLUTIONS" =~ "h2o" ]]; then ./h2o/ver-h2o.sh; fi;
 if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "polars" ]]; then ./polars/upg-polars.sh; fi;
 if [[ "$RUN_SOLUTIONS" =~ "polars" ]]; then ./polars/ver-polars.sh; fi;
+if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "datafusion" ]]; then ./datafusion/upg-datafusion.sh; fi;
+if [[ "$RUN_SOLUTIONS" =~ "datafusion" ]]; then ./datafusion/ver-datafusion.sh; fi;
 
 # run
 if [[ -f ./stop ]]; then echo "# Benchmark run $BATCH has been interrupted after $(($(date +%s)-$BATCH))s due to 'stop' file" && rm -f ./stop && rm -f ./run.lock && exit; fi;