Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@ run.out
clickhouse/etc_sudoers.bak
workdir/
timeout-exit-codes.out
*/target
*.lock
14 changes: 14 additions & 0 deletions datafusion/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Manifest for the DataFusion solution of the benchmark (entry point: src/main.rs).
[package]
name = "db-benchmark"
version = "0.1.0"
edition = "2018"

[dependencies]
# Git dependency with no rev/tag/branch given, so it is not pinned to a fixed revision.
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", features = ["simd"]}
# Async runtime; `macros` provides `#[tokio::main]`, which src/main.rs uses.
tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread"] }
# Installed as the `#[global_allocator]` in src/main.rs.
snmalloc-rs = {version = "0.2", features= ["cache-friendly"]}
# Used in src/main.rs to choose the number of in-memory partitions.
num_cpus = "1.0"

# Release-profile settings: link-time optimization on, single codegen unit.
[profile.release]
lto = true
codegen-units = 1
4 changes: 4 additions & 0 deletions datafusion/exec.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash
# Builds and runs the DataFusion benchmark binary in release mode.
set -e

# Run from this script's own directory: cargo needs the Cargo.toml that lives
# next to it, and the program opens its input via the relative path
# `../data/<SRC_DATANAME>.csv`, so the working directory must not depend on
# where the caller happened to invoke the script from.
cd "$(dirname "${BASH_SOURCE[0]}")"

# target-cpu=native lets the compiler use every instruction set of the host CPU.
RUSTFLAGS='-C target-cpu=native' cargo +nightly run --release
5 changes: 5 additions & 0 deletions datafusion/setup-datafusion.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash
# Installs the Rust toolchains needed to build the DataFusion benchmark.
set -e
# Without pipefail a failed download would go unnoticed, because only the exit
# status of the final `sh` in the pipeline would be checked.
set -o pipefail

# Install rustup non-interactively (-y accepts the defaults).
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y

# Put cargo/rustup on PATH for the remainder of this script.
source "${CARGO_HOME:-$HOME/.cargo}/env"

# exec.sh builds with `cargo +nightly`, so the nightly toolchain has to be
# present in addition to the default one installed above.
rustup toolchain install nightly

86 changes: 86 additions & 0 deletions datafusion/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use datafusion::{arrow::datatypes::{DataType, Field, Schema}, datasource::{CsvFile, MemTable}};
use datafusion::error::Result;
use datafusion::prelude::*;
use std::{env, sync::Arc};
use std::time::Instant;

// Register snmalloc as the allocator for every heap allocation in this binary.
#[global_allocator]
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;

/// Runs `query` on `ctx`, collects the complete result, and prints how long it took.
///
/// The printed line has the form `<name> took <millis> ms`. The measured span
/// covers both `ctx.sql(..)` and the awaited `collect()`.
///
/// # Errors
///
/// Returns the error produced by `ctx.sql` or by `collect` if either fails; in
/// that case nothing is printed.
async fn exec_query(ctx: &mut ExecutionContext, query: &str, name: &str) -> Result<()> {
    let start = Instant::now();

    // The result is not inspected yet, so the binding is underscore-prefixed to
    // avoid an unused-variable warning. It is still a named binding (not `_`),
    // which keeps the collected result alive until the function returns rather
    // than dropping it inside the timed region.
    let _batches = ctx.sql(query)?.collect().await?;
    let elapsed_ms = start.elapsed().as_millis();

    // TODO: print details

    println!("{} took {} ms", name, elapsed_ms);

    Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
let mut ctx = ExecutionContext::new();
let data = format!("../data/{}.csv", env::var("SRC_DATANAME").unwrap());

let schema = Schema::new(vec![
Field::new("id1", DataType::Utf8, false),
Field::new("id2", DataType::Utf8, false),
Field::new("id3", DataType::Utf8, false),
Field::new("id4", DataType::Int32, false),
Field::new("id5", DataType::Int32, false),
Field::new("id6", DataType::Int32, false),
Field::new("v1", DataType::Int32, false),
Field::new("v2", DataType::Int32, false),
Field::new("v3", DataType::Float64, false),
]);
let options = CsvReadOptions::new().schema(&schema).has_header(true);

let csv = CsvFile::try_new(&data, options).unwrap();
let batch_size = 65536;
let partition_size = num_cpus::get();

let memtable = MemTable::load(Arc::new(csv), batch_size, Some(partition_size)).await?;
ctx.register_table("tbl", Arc::new(memtable));

exec_query(
&mut ctx,
"SELECT id1, SUM(v1) AS v1 FROM tbl GROUP BY id1",
"q1",
)
.await?;
exec_query(
&mut ctx,
"SELECT id1, id2, SUM(v1) AS v1 FROM tbl GROUP BY id1, id2",
"q2",
)
.await?;
exec_query(
&mut ctx,
"SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM tbl GROUP BY id3",
"q3",
)
.await?;
exec_query(
&mut ctx,
"SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM tbl GROUP BY id4",
"q4",
)
.await?;
exec_query(
&mut ctx,
"SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM tbl GROUP BY id6",
"q5",
)
.await?;
exec_query(
&mut ctx,
"SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM tbl GROUP BY id3",
"q7",
)
.await?;

exec_query(&mut ctx, "SELECT id1, id2, id3, id4, id5, id6, SUM(v3) as v3, COUNT(*) AS cnt FROM tbl GROUP BY id1, id2, id3, id4, id5, id6", "q10").await?;

Ok(())
}
8 changes: 8 additions & 0 deletions datafusion/upg-datafusion.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash
# Refreshes the locked dependency versions of the DataFusion benchmark crate.
set -e

# The subshell confines the directory change, so the caller's working
# directory is untouched once the update finishes.
(
  cd datafusion &&
    cargo update
)
1 change: 1 addition & 0 deletions datafusion/ver-datafusion.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#!/bin/bash
# Prints the version of the datafusion dependency as reported by `cargo tree`.
set -e

# `cargo tree` needs the crate's Cargo.toml, which sits next to this script;
# run.sh invokes the script from the repository root, so switch directory first.
cd "$(dirname "${BASH_SOURCE[0]}")"

# Take the line for the direct `datafusion` dependency and keep its third
# space-separated field, i.e. the token that follows the crate name.
cargo tree | grep "├── datafusion" | cut -d ' ' -f3
2 changes: 1 addition & 1 deletion run.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# task, used in init-setup-iteration.R
export RUN_TASKS="groupby join"
# solution, used in init-setup-iteration.R
export RUN_SOLUTIONS="data.table pydatatable dplyr pandas spark dask juliadf cudf clickhouse polars"
export RUN_SOLUTIONS="data.table pydatatable dplyr pandas spark dask juliadf cudf clickhouse polars datafusion"

# flag to upgrade tools, used in run.sh on init
export DO_UPGRADE=true
Expand Down
2 changes: 2 additions & 0 deletions run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "h2o" ]]; then ./h2o/upg-h2o.
if [[ "$RUN_SOLUTIONS" =~ "h2o" ]]; then ./h2o/ver-h2o.sh; fi;
if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "polars" ]]; then ./polars/upg-polars.sh; fi;
if [[ "$RUN_SOLUTIONS" =~ "polars" ]]; then ./polars/ver-polars.sh; fi;
if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "datafusion" ]]; then ./datafusion/upg-datafusion.sh; fi;
if [[ "$RUN_SOLUTIONS" =~ "datafusion" ]]; then ./datafusion/ver-datafusion.sh; fi;

# run
if [[ -f ./stop ]]; then echo "# Benchmark run $BATCH has been interrupted after $(($(date +%s)-$BATCH))s due to 'stop' file" && rm -f ./stop && rm -f ./run.lock && exit; fi;
Expand Down