From 8fcc89c6911c78dca411fe02352c7ad60315b656 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Tue, 13 Oct 2020 14:02:43 -0500 Subject: [PATCH 1/2] Updated Modin join benchmark to current state Signed-off-by: Gregory Shimansky --- modin/join-modin.py | 170 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 145 insertions(+), 25 deletions(-) diff --git a/modin/join-modin.py b/modin/join-modin.py index b7c69650..f4db64ca 100755 --- a/modin/join-modin.py +++ b/modin/join-modin.py @@ -1,69 +1,189 @@ #!/usr/bin/env python -print("# join-modin.py") +print("# join-modin.py", flush=True) + +# Uncomment following two lines if your /tmp is not large enough to +# hold all data and specify an alternative place where to hold plasma storage. +# import ray +# ray.init(_plasma_directory="/path/to/large/free/filesystem") import os import gc import timeit +import modin import modin.pandas as pd -exec(open("./helpers.py").read()) - -src_x = os.environ['SRC_X_LOCAL'] -src_y = os.environ['SRC_Y_LOCAL'] +exec(open("./_helpers/helpers.py").read()) -ver = "" #pd.__version__ +ver = modin.__version__ git = "" task = "join" -question = "inner join" -l = [os.path.basename(src_x), os.path.basename(src_y)] -data_name = '-'.join(l) solution = "modin" fun = "merge" cache = "TRUE" +on_disk = "FALSE" -print("loading datasets...") +data_name = os.environ['SRC_JN_LOCAL'] +src_jn_x = os.path.join("data", data_name + ".csv") +y_data_name = join_to_tbls(data_name) +src_jn_y = [os.path.join("data", y_data_name[0]+".csv"), os.path.join("data", y_data_name[1]+".csv"), os.path.join("data", y_data_name[2]+".csv")] +if len(src_jn_y) != 3: + raise Exception("Something went wrong in preparing files used for join") -x = pd.read_csv(os.path.basename(src_x)) -y = pd.read_csv(os.path.basename(src_y)) +print("loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[1] + ", " + y_data_name[2], flush=True) +from datatable import fread # for loading data only, see #47 +x = pd.DataFrame(fread(src_jn_x).to_pandas()) # convert Pandas DataFrame into Modin DataFrame +x['id4'] = x['id4'].astype('category') # remove after datatable#1691 +x['id5'] = x['id5'].astype('category') +x['id6'] = x['id6'].astype('category') +small = pd.DataFrame(fread(src_jn_y[0]).to_pandas()) # convert Pandas DataFrame into Modin DataFrame +small['id4'] = small['id4'].astype('category') +medium = pd.DataFrame(fread(src_jn_y[1]).to_pandas()) # convert Pandas DataFrame into Modin DataFrame +medium['id4'] = medium['id4'].astype('category') +medium['id5'] = medium['id5'].astype('category') +big = pd.DataFrame(fread(src_jn_y[2]).to_pandas()) # convert Pandas DataFrame into Modin DataFrame +big['id4'] = big['id4'].astype('category') +big['id5'] = big['id5'].astype('category') +big['id6'] = big['id6'].astype('category') +print(len(x.index), flush=True) +print(len(small.index), flush=True) +print(len(medium.index), flush=True) +print(len(big.index), flush=True) + +task_init = timeit.default_timer() print("joining...") -# NotImplementedError: To contribute to Pandas on Ray, please visit github.com/modin-project/modin +question = "small inner on int" # q1 +gc.collect() +t_start = timeit.default_timer() +ans = x.merge(small, on='id1') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans gc.collect() t_start = timeit.default_timer() -ans = x.merge(y, how='inner', on='KEY') -print(ans.shape) +ans = x.merge(small, on='id1') +print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['X2'].sum(), ans['Y2'].sum()] +chk = [ans['v1'].sum(), ans['v2'].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt) +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) del ans +question = "medium inner on int" # q2 +gc.collect() +t_start = timeit.default_timer() +ans = x.merge(medium, on='id2') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans gc.collect() t_start = timeit.default_timer() -ans = x.merge(y, how='inner', on='KEY') -print(ans.shape) +ans = x.merge(medium, on='id2') +print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['X2'].sum(), ans['Y2'].sum()] +chk = [ans['v1'].sum(), ans['v2'].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt) +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) del ans +question = "medium outer on int" # q3 +gc.collect() +t_start = timeit.default_timer() +ans = x.merge(medium, how='left', on='id2') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans gc.collect() t_start = timeit.default_timer() -ans = x.merge(y, how='inner', on='KEY') -print(ans.shape) +ans = x.merge(medium, how='left', on='id2') +print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['X2'].sum(), ans['Y2'].sum()] +chk = [ans['v1'].sum(), ans['v2'].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=3, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt) +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) del ans +question = "medium inner on factor" # q4 +gc.collect() +t_start = timeit.default_timer() +ans = x.merge(medium, on='id5') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.merge(medium, on='id5') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "big inner on int" # q5 +gc.collect() +t_start = timeit.default_timer() +ans = x.merge(big, on='id3') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.merge(big, on='id3') +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v1'].sum(), ans['v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +print("joining finished, took %0.fs" % (timeit.default_timer()-task_init), flush=True) + exit(0) From 5a2df38e73a12966582a7f765257afe16732c6bf Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Tue, 13 Oct 2020 14:20:03 -0500 Subject: [PATCH 2/2] Replaced datatable with Modin's parallel read_csv Signed-off-by: Gregory Shimansky --- modin/join-modin.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/modin/join-modin.py b/modin/join-modin.py index f4db64ca..6da71382 100755 --- a/modin/join-modin.py +++ b/modin/join-modin.py @@ -32,17 +32,16 @@ print("loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[1] + ", " + y_data_name[2], flush=True) -from datatable import fread # for loading data only, see #47 -x = pd.DataFrame(fread(src_jn_x).to_pandas()) # convert Pandas DataFrame into Modin DataFrame -x['id4'] = x['id4'].astype('category') # remove after datatable#1691 +x = pd.read_csv(src_jn_x) +x['id4'] = x['id4'].astype('category') x['id5'] = x['id5'].astype('category') x['id6'] = x['id6'].astype('category') -small = pd.DataFrame(fread(src_jn_y[0]).to_pandas()) # convert Pandas DataFrame into Modin DataFrame +small = pd.read_csv(src_jn_y[0]) small['id4'] = small['id4'].astype('category') -medium = pd.DataFrame(fread(src_jn_y[1]).to_pandas()) # convert Pandas DataFrame into Modin DataFrame +medium = pd.read_csv(src_jn_y[1]) medium['id4'] = medium['id4'].astype('category') medium['id5'] = medium['id5'].astype('category') -big = pd.DataFrame(fread(src_jn_y[2]).to_pandas()) # convert Pandas DataFrame into Modin DataFrame +big = pd.read_csv(src_jn_y[2]) big['id4'] = big['id4'].astype('category') big['id5'] = big['id5'].astype('category') big['id6'] = big['id6'].astype('category')