Skip to content

Commit f07edab

Browse files
committed
cli argument for thread-count
1 parent bba8f97 commit f07edab

File tree

5 files changed

+20
-8
lines changed

5 files changed

+20
-8
lines changed

h5pyd/_apps/hsload.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ def main():
124124
cfg.setitem("ignorefilters", False, flags=["--ignore-filters"], help="ignore any filters used by source dataset")
125125
cfg.setitem("retries", 3, flags=["--retries",], choices=["N",], help="Set number of server retry attempts")
126126
cfg.setitem("no_checks", False, flags=["--no-checks"], help="do not check for existence before creating resources")
127+
cfg.setitem("thread_count", 30, flags=["--thread-count"], choices=["N",] ,help="The number of threads to allocate when making requests in parallel, defaults to 30")
127128
cfg.setitem("help", False, flags=["-h", "--help"], help="this message")
128129

129130
try:
@@ -259,6 +260,7 @@ def main():
259260
"bucket": cfg["hs_bucket"],
260261
"mode": mode,
261262
"retries": int(cfg["retries"]),
263+
"thread_count": int(cfg["thread_count"]),
262264
}
263265

264266
fout = h5pyd.File(tgt, **kwargs)
@@ -325,6 +327,7 @@ def main():
325327
"ignore_error": cfg["ignore_error"],
326328
"no_clobber": no_clobber,
327329
"no_checks": cfg["no_checks"],
330+
"thread_count": int(cfg["thread_count"]),
328331
}
329332
load_file(fin, fout, **kwargs)
330333

h5pyd/_apps/utillib.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,7 +1170,7 @@ def create_dataset(dobj, ctx):
11701170
print(msg)
11711171
fout = ctx["fout"]
11721172

1173-
if not ctx["no-checks"] and dobj.name in fout:
1173+
if not ctx["no_checks"] and dobj.name in fout:
11741174
dset = fout[dobj.name]
11751175
logging.debug(f"{dobj.name} already exists")
11761176
if ctx["no_clobber"]:
@@ -1688,7 +1688,7 @@ def create_group(gobj, ctx):
16881688

16891689
grp = None
16901690

1691-
if not ctx["no-checks"] and gobj.name in fout:
1691+
if not ctx["no_checks"] and gobj.name in fout:
16921692
grp = fout[gobj.name]
16931693
logging.debug(f"{gobj.name} already exists")
16941694
if ctx["no_clobber"]:
@@ -1709,7 +1709,7 @@ def create_group(gobj, ctx):
17091709
if not ctx["ignore_error"]:
17101710
raise IOError(msg)
17111711
else:
1712-
if not ctx["no-checks"] and ctx["verbose"]:
1712+
if not ctx["no_checks"] and ctx["verbose"]:
17131713
print(f"{gobj.name} not found")
17141714

17151715
grp = fout.create_group(gobj.name)
@@ -1800,7 +1800,8 @@ def load_file(
18001800
extend_dim=None,
18011801
extend_offset=0,
18021802
ignore_error=False,
1803-
no_checks=False
1803+
no_checks=False,
1804+
thread_count=30,
18041805
):
18051806

18061807
logging.info(f"input file: {fin.filename}")
@@ -1836,7 +1837,8 @@ def load_file(
18361837
ctx["extend_offset"] = extend_offset
18371838
ctx["srcid_desobj_map"] = {}
18381839
ctx["ignore_error"] = ignore_error
1839-
ctx["no-checks"] = no_checks
1840+
ctx["no_checks"] = no_checks
1841+
ctx["thread_count"] = thread_count
18401842

18411843
def copy_attribute_helper(name, obj):
18421844
logging.info(f"copy attribute - name: {name} obj: {obj.name}")
@@ -1885,7 +1887,7 @@ def _visit_in_parallell(func):
18851887
def _add_to_jobs(name, obj):
18861888
jobs.append((name, obj))
18871889

1888-
with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
1890+
with concurrent.futures.ThreadPoolExecutor(max_workers=thread_count) as executor:
18891891
fin.visititems(_add_to_jobs)
18901892
futures = [executor.submit(func, item[0], item[1]) for item in jobs]
18911893

h5pyd/_hl/files.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ def __init__(
279279
track_order=None,
280280
retries=10,
281281
timeout=180,
282+
thread_count=30,
282283
**kwds,
283284
):
284285
"""Create a new file object.
@@ -419,6 +420,7 @@ def __init__(
419420
logger=logger,
420421
retries=retries,
421422
timeout=timeout,
423+
thread_count=thread_count,
422424
)
423425

424426
root_json = None

h5pyd/_hl/folders.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ def __init__(
8383
owner=None,
8484
batch_size=1000,
8585
retries=3,
86+
thread_count=30,
8687
**kwds,
8788
):
8889
"""Create a new Folders object.
@@ -179,6 +180,7 @@ def __init__(
179180
mode=mode,
180181
logger=logger,
181182
retries=retries,
183+
thread_count=thread_count
182184
)
183185
self.log = self._http_conn.logging
184186

h5pyd/_hl/httpconn.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ def __init__(
166166
logger=None,
167167
retries=3,
168168
timeout=DEFAULT_TIMEOUT,
169+
thread_count=30,
169170
**kwds,
170171
):
171172
self._domain = domain_name
@@ -179,6 +180,8 @@ def __init__(
179180
self._api_key = api_key
180181
self._s = None # Sessions
181182
self._server_info = None
183+
self._thread_count = thread_count
184+
182185
if use_cache:
183186
self._cache = {}
184187
self._objdb = {}
@@ -739,11 +742,11 @@ def session(self):
739742

740743
s.mount(
741744
"http://",
742-
HTTPAdapter(max_retries=retry, pool_connections=30, pool_maxsize=30),
745+
HTTPAdapter(max_retries=retry, pool_connections=self._thread_count, pool_maxsize=self._thread_count),
743746
)
744747
s.mount(
745748
"https://",
746-
HTTPAdapter(max_retries=retry, pool_connections=30, pool_maxsize=30),
749+
HTTPAdapter(max_retries=retry, pool_connections=self._thread_count, pool_maxsize=self._thread_count),
747750
)
748751
self._s = s
749752
else:

0 commit comments

Comments
 (0)