Skip to content

Commit be0b303

Browse files
committed
[IMP] orm: add optional parallelism to iter_browse.create()
As with the support added to `__attr__` in the parent commit, this can only be used by callers when it is known that the database modifications will be distinct, causing neither concurrency issues nor side effects on the results. `create` returns an `iter_browse` object for the caller to browse the created records. To support vast amounts of created records with the multiprocessing strategy, we process the values in a generator and initialize the returned `iter_browse` object with it. As this requires the caller of `create` to always consume/iterate the result (otherwise the records will not be created), it is not applied to the other strategies, as it would break the existing API.
1 parent b8ca99d commit be0b303

File tree

1 file changed

+46
-13
lines changed

1 file changed

+46
-13
lines changed

src/util/orm.py

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,12 @@ def _mp_iter_browse_cb(ids_or_values, params):
367367
getattr(
368368
me.env[params["model_name"]].with_context(params["context"]).browse(ids_or_values), params["attr_name"]
369369
)(*params["args"], **params["kwargs"])
370+
if params["mode"] == "create":
371+
new_ids = me.env[params["model_name"]].with_context(params["context"]).create(ids_or_values).ids
370372
me.env.cr.commit()
373+
if params["mode"] == "create":
374+
return new_ids
375+
return None
371376

372377

373378
class iter_browse(object):
@@ -437,14 +442,14 @@ def __init__(self, model, *args, **kw):
437442
self._ids = args[-1]
438443
self._size = kw.pop("size", None)
439444
self._chunk_size = kw.pop("chunk_size", 200) # keyword-only argument
445+
self._task_size = self._chunk_size
440446
self._logger = kw.pop("logger", _logger)
441447
self._strategy = kw.pop("strategy", "flush")
442448
assert self._strategy in {"flush", "commit", "multiprocessing"}
443449
if self._strategy == "multiprocessing":
444450
if not ProcessPoolExecutor:
445451
raise ValueError("multiprocessing strategy can not be used in scripts run by python2")
446452
if UPG_PARALLEL_ITER_BROWSE:
447-
self._task_size = self._chunk_size
448453
self._chunk_size = min(get_max_workers() * 10 * self._task_size, 1000000)
449454
else:
450455
self._strategy = "commit" # downgrade
@@ -593,30 +598,58 @@ def create(self, values, **kw):
593598
if self._size:
594599
raise ValueError("`create` can only called on empty `browse_record` objects.")
595600

596-
ids = []
601+
if self._strategy == "multiprocessing" and not multi:
602+
raise ValueError("The multiprocessing strategy only supports the multi version of `create`")
603+
597604
size = len(values)
598605
it = chunks(values, self._chunk_size, fmt=list)
599606
if self._logger:
600607
sz = (size + self._chunk_size - 1) // self._chunk_size
601608
qualifier = "env[%r].create([:%d])" % (self._model._name, self._chunk_size)
602609
it = log_progress(it, self._logger, qualifier=qualifier, size=sz)
603610

604-
self._patch = no_selection_cache_validation()
605-
for sub_values in it:
611+
def mp_create():
612+
params = {
613+
"dbname": self._model.env.cr.dbname,
614+
"model_name": self._model._name,
615+
# convert to dict for pickle. Will still break if any value in the context is not pickleable
616+
"context": dict(self._model.env.context),
617+
"mode": "create",
618+
}
619+
self._model.env.cr.commit()
606620
self._patch.start()
621+
extrakwargs = {"mp_context": multiprocessing.get_context("fork")} if sys.version_info >= (3, 7) else {}
622+
with ProcessPoolExecutor(max_workers=get_max_workers(), **extrakwargs) as executor:
623+
for sub_values in it:
624+
for task_result in executor.map(
625+
_mp_iter_browse_cb, chunks(sub_values, self._task_size, fmt=tuple), repeat(params)
626+
):
627+
self._model.env.cr.commit() # make task_result visible on main cursor before yielding ids
628+
for new_id in task_result:
629+
yield new_id
630+
next(self._end(), None)
607631

608-
if multi:
609-
ids += self._model.create(sub_values).ids
610-
elif not self._cr_uid:
611-
ids += [self._model.create(sub_value).id for sub_value in sub_values]
612-
else:
613-
# old API, `create` directly return the id
614-
ids += [self._model.create(*(self._cr_uid + (sub_value,))) for sub_value in sub_values]
632+
self._patch = no_selection_cache_validation()
633+
if self._strategy == "multiprocessing":
634+
ids = mp_create()
635+
else:
636+
ids = []
637+
for sub_values in it:
638+
self._patch.start()
639+
640+
if multi:
641+
ids += self._model.create(sub_values).ids
642+
elif not self._cr_uid:
643+
ids += [self._model.create(sub_value).id for sub_value in sub_values]
644+
else:
645+
# old API, `create` directly return the id
646+
ids += [self._model.create(*(self._cr_uid + (sub_value,))) for sub_value in sub_values]
647+
648+
next(self._end(), None)
615649

616-
next(self._end(), None)
617650
args = self._cr_uid + (ids,)
618651
return iter_browse(
619-
self._model, *args, chunk_size=self._chunk_size, logger=self._logger, strategy=self._strategy
652+
self._model, *args, size=size, chunk_size=self._task_size, logger=self._logger, strategy=self._strategy
620653
)
621654

622655

0 commit comments

Comments
 (0)