5 changes: 3 additions & 2 deletions Dockerfile
@@ -1,4 +1,4 @@
-FROM compss/compss-tutorial:latest
+FROM compss/compss-tutorial:3.3.3
MAINTAINER COMPSs Support <[email protected]>

COPY . dislib/
@@ -14,7 +14,8 @@ RUN apt-get -o Acquire::Check-Valid-Until=false -o Acquire::Check-Date=false upd
git clone https://github.com/Blosc/python-blosc2/ /python-blosc2 && cd /python-blosc2 && git checkout v2.5.1 && \
python3 -m pip install --trusted-host pypi.org --trusted-host files.pythonhosted.org --upgrade -r /python-blosc2/requirements-build.txt && \
python3 -m pip install --trusted-host pypi.org --trusted-host files.pythonhosted.org --upgrade -r /python-blosc2/requirements-runtime.txt && \
-cd /python-blosc2 && git submodule update --init --recursive && python3 setup.py build_ext --inplace -- -DDEACTIVATE_AVX2:STRING=ON && \
+python3 -m pip install "cmake==3.31.2" && \
+cd /python-blosc2 && git submodule update --init --recursive && export CXXFLAGS="-Wno-error=maybe-uninitialized" && python3 setup.py build_ext --inplace -- -DDEACTIVATE_AVX2:STRING=ON && \
git clone --recurse-submodules https://github.com/deephealthproject/pyeddl.git /pyeddl && cd /pyeddl && \
cd third_party/eddl && mkdir build && cd build && cmake .. -D CMAKE_INSTALL_PREFIX=/pyeddl/third_party/eddl -D BUILD_TARGET=CPU -D BUILD_SHARED_LIB=ON -D BUILD_SUPERBUILD=ON -D BUILD_PROTOBUF=ON -D BUILD_TESTS=OFF && \
make && make install && cd ../.. && \
87 changes: 86 additions & 1 deletion dislib/pytorch/encapsulated_functions_distributed.py
@@ -84,7 +84,7 @@ def __init__(self, num_workers=10):
self.num_workers = num_workers

def build(self, net, optimizer, loss, optimizer_parameters,
-num_gpu=0, num_nodes=0):
+scheduler=None, T_max=1, eta_min=0.0, num_gpu=0, num_nodes=0):
"""
Builds the model to obtain the initial parameters of the training
and it also builds the model in each worker in order to be ready
@@ -114,9 +114,17 @@ def build(self, net, optimizer, loss, optimizer_parameters,
copy.deepcopy(optimizer),
optimizer_parameters)

self.optimizer_parameters = optimizer_parameters
self.num_gpu = num_gpu
self.num_gpus_per_worker = int(num_nodes*num_gpu/self.num_workers)
self.model_parameters = net
self.optimizer = optimizer(self.model_parameters.parameters(),
**optimizer_parameters)
if scheduler is not None:
self.scheduler = scheduler(self.optimizer,
T_max=T_max, eta_min=eta_min)
else:
self.scheduler = None

def get_parameters(self):
"""
@@ -181,6 +189,17 @@ def fit_synchronous_shuffle_every_n_epochs_with_GPU(self, x_train,
parameters_for_workers = [copy.deepcopy(self.model_parameters)
for _ in
range(len(parameters_for_workers))]
if self.scheduler is not None:
self.scheduler.step()
self.optimizer_parameters = {}
self.optimizer_parameters["lr"] = \
self.optimizer.param_groups[0]["lr"]
if "momentum" in self.optimizer.param_groups[0]:
self.optimizer_parameters["momentum"] = \
self.optimizer.param_groups[0]["momentum"]
if "weight_decay" in self.optimizer.param_groups[0]:
self.optimizer_parameters["weight_decay"] = \
self.optimizer.param_groups[0]["weight_decay"]
parameters_for_workers = compss_wait_on(parameters_for_workers)
self.model_parameters = pt_aggregateParameters(
parameters_for_workers)
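For context, the scheduler block added above relies on torch.optim schedulers mutating the learning rate stored in optimizer.param_groups; the value read back into optimizer_parameters is what gets shipped to the workers for the next epoch. A minimal standalone sketch of that mechanism (not dislib code; the model and hyperparameters are illustrative):

import torch

# Stepping a scheduler updates optimizer.param_groups in place; the per-epoch
# learning rate is then available as param_groups[0]["lr"], which is the key
# the synchronization code above copies into optimizer_parameters.
model = torch.nn.Linear(4, 2)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1,
                            momentum=0.9, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10,
                                                       eta_min=0.001)

for epoch in range(3):
    scheduler.step()
    print(optimizer.param_groups[0]["lr"],
          optimizer.param_groups[0]["momentum"],
          optimizer.param_groups[0]["weight_decay"])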
@@ -249,6 +268,17 @@ def fit_synchronous_every_n_epochs_with_GPU(self, x_train, y_train,
parameters_for_workers = [
copy.deepcopy(self.model_parameters) for _
in range(len(parameters_for_workers))]
if self.scheduler is not None:
self.scheduler.step()
self.optimizer_parameters = {}
self.optimizer_parameters["lr"] = \
self.optimizer.param_groups[0]["lr"]
if "momentum" in self.optimizer.param_groups[0]:
self.optimizer_parameters["momentum"] = \
self.optimizer.param_groups[0]["momentum"]
if "weight_decay" in self.optimizer.param_groups[0]:
self.optimizer_parameters["weight_decay"] = \
self.optimizer.param_groups[0]["weight_decay"]
parameters_for_workers = compss_wait_on(parameters_for_workers)
self.model_parameters = pt_aggregateParameters(
parameters_for_workers)
@@ -305,6 +335,17 @@ def fit_synchronous_with_GPU(self, x_train, y_train,
j = j + 1
if j == self.num_workers:
j = 0
if self.scheduler is not None:
self.scheduler.step()
self.optimizer_parameters = {}
self.optimizer_parameters["lr"] = \
self.optimizer.param_groups[0]["lr"]
if "momentum" in self.optimizer.param_groups[0]:
self.optimizer_parameters["momentum"] = \
self.optimizer.param_groups[0]["momentum"]
if "weight_decay" in self.optimizer.param_groups[0]:
self.optimizer_parameters["weight_decay"] = \
self.optimizer.param_groups[0]["weight_decay"]
parameters_for_workers = compss_wait_on(parameters_for_workers)
self.model_parameters = pt_aggregateParameters(
parameters_for_workers)
@@ -356,6 +397,17 @@ def fit_synchronous_shuffle_with_GPU(self, x_train, y_train,
j = j + 1
if j == self.num_workers:
j = 0
if self.scheduler is not None:
self.scheduler.step()
self.optimizer_parameters = {}
self.optimizer_parameters["lr"] = \
self.optimizer.param_groups[0]["lr"]
if "momentum" in self.optimizer.param_groups[0]:
self.optimizer_parameters["momentum"] = \
self.optimizer.param_groups[0]["momentum"]
if "weight_decay" in self.optimizer.param_groups[0]:
self.optimizer_parameters["weight_decay"] = \
self.optimizer.param_groups[0]["weight_decay"]
parameters_for_workers = compss_wait_on(parameters_for_workers)
self.model_parameters = \
pt_aggregateParameters(parameters_for_workers)
@@ -417,6 +469,17 @@ def fit_asynchronous_with_GPU(self, x_train, y_train,
j = j + 1
if j == self.num_workers:
j = 0
if self.scheduler is not None:
self.scheduler.step()
self.optimizer_parameters = {}
self.optimizer_parameters["lr"] = \
self.optimizer.param_groups[0]["lr"]
if "momentum" in self.optimizer.param_groups[0]:
self.optimizer_parameters["momentum"] = \
self.optimizer.param_groups[0]["momentum"]
if "weight_decay" in self.optimizer.param_groups[0]:
self.optimizer_parameters["weight_decay"] = \
self.optimizer.param_groups[0]["weight_decay"]
for j in range(self.num_workers):
parameters_for_workers[j] = \
self.compss_object[j].aggregate_parameters_async(
@@ -468,6 +531,17 @@ def fit_asynchronous_shuffle_with_GPU(self, x_train, y_train,
j = j + 1
if j == self.num_workers:
j = 0
if self.scheduler is not None:
self.scheduler.step()
self.optimizer_parameters = {}
self.optimizer_parameters["lr"] = \
self.optimizer.param_groups[0]["lr"]
if "momentum" in self.optimizer.param_groups[0]:
self.optimizer_parameters["momentum"] = \
self.optimizer.param_groups[0]["momentum"]
if "weight_decay" in self.optimizer.param_groups[0]:
self.optimizer_parameters["weight_decay"] = \
self.optimizer.param_groups[0]["weight_decay"]
for j in range(self.num_workers):
parameters_for_workers[j] = \
self.compss_object[j].aggregate_parameters_async(
@@ -532,6 +606,17 @@ def fit_asynchronous_n_epochs_with_GPU(self, x_train, y_train,
j = j + 1
if j == self.num_workers:
j = 0
if self.scheduler is not None:
self.scheduler.step()
self.optimizer_parameters = {}
self.optimizer_parameters["lr"] = \
self.optimizer.param_groups[0]["lr"]
if "momentum" in self.optimizer.param_groups[0]:
self.optimizer_parameters["momentum"] = \
self.optimizer.param_groups[0]["momentum"]
if "weight_decay" in self.optimizer.param_groups[0]:
self.optimizer_parameters["weight_decay"] = \
self.optimizer.param_groups[0]["weight_decay"]
if (i + 1) % n_epocs_sync == 0:
for j in range(self.num_workers):
parameters_for_workers[j] = \
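A hedged usage sketch of the extended build() signature (the wrapper class name and import path are assumptions, not taken from this diff): the optimizer and scheduler are passed as classes, since build() instantiates them internally with optimizer_parameters and T_max/eta_min, while the loss is passed as an instance.

import torch
import torch.nn as nn
# Hypothetical import; the actual class name exposed by this module may differ.
from dislib.pytorch.encapsulated_functions_distributed import \
    EncapsulatedFunctionsDistributedPytorch

net = nn.Sequential(nn.Linear(10, 32), nn.ReLU(), nn.Linear(32, 2))
encaps = EncapsulatedFunctionsDistributedPytorch(num_workers=4)
encaps.build(net,
             torch.optim.SGD,                # optimizer class, instantiated inside build()
             nn.CrossEntropyLoss(),          # loss instance
             {"lr": 0.1, "momentum": 0.9},   # optimizer_parameters
             scheduler=torch.optim.lr_scheduler.CosineAnnealingLR,
             T_max=50, eta_min=1e-4,
             num_gpu=1, num_nodes=2)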
64 changes: 59 additions & 5 deletions dislib/pytorch/pytorch_distributed.py
@@ -75,11 +75,30 @@ def train_cnn_batch_GPU(self, model_parameters, x_train,
{'processorType': 'GPU', 'computingUnits': '${ComputingUnitsGPUs}'}])
@task()
def _train_cnn_batch_GPU(self, model_parameters, x_train,
-y_train, num_batches, shuffle_block_data):
+y_train, num_batches, shuffle_block_data,
+transformations=None,
+gradient_clipping=None):
if shuffle_block_data:
idx = torch.randperm(x_train.shape[0])
-x_train = x_train[idx].view(x_train.size())
-y_train = y_train[idx].view(y_train.size())
+if not isinstance(x_train.size, int):
+x_train = x_train[idx].view(x_train.size())
+else:
+if not torch.is_tensor(x_train):
+x_train = x_train[idx]
+else:
+x_train = torch.from_numpy(x_train)
+x_train = x_train[idx].view(x_train.size())
+if not isinstance(y_train.size, int):
+if len(y_train.size()) > 1:
+y_train = y_train[idx].view(y_train.size())
+else:
+y_train = y_train[idx]
+else:
+if not torch.is_tensor(y_train):
+y_train = y_train[idx]
+else:
+y_train = torch.from_numpy(y_train)
+y_train = y_train[idx].view(y_train.size())
if hasattr(self.model, 'neural_network_layers'):
len_nn = len(self.model.neural_network_layers)
for i in range(len_nn):
@@ -89,10 +108,17 @@ def _train_cnn_batch_GPU(self, model_parameters, x_train,
nn.Parameter(
model_parameters.neural_network_layers[i].
weight.float())
if hasattr(model_parameters.neural_network_layers[i],
'bias'):
self.model.neural_network_layers[i].bias = \
nn.Parameter(
model_parameters.neural_network_layers[i].bias.
float())
if hasattr(self.model.neural_network_layers[i],
'alpha'):
self.model.neural_network_layers[i].alpha = \
nn.Parameter(model_parameters.
neural_network_layers[i].alpha)
if hasattr(self.model, 'dense_neural_network_layers'):
len_nn = len(model_parameters.dense_neural_network_layers)
for i in range(len_nn):
@@ -103,23 +129,37 @@ def _train_cnn_batch_GPU(self, model_parameters, x_train,
nn.Parameter(
model_parameters.dense_neural_network_layers[i].
weight.float())
if hasattr(model_parameters.dense_neural_network_layers[i],
'bias'):
self.model.dense_neural_network_layers[i].bias = \
nn.Parameter(
model_parameters.dense_neural_network_layers[i].
bias.float())
if hasattr(self.model.dense_neural_network_layers[i],
'alpha'):
self.model.dense_neural_network_layers[i].alpha = \
nn.Parameter(model_parameters.
dense_neural_network_layers[i].alpha)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model = self.model.to(device)
optimizer = self.optimizer(self.model.parameters(),
**self.optimizer_parameters)
x_train = x_train.float().to(device)
-true_labels = y_train.to(device)
indexes = int(x_train.shape[0] / num_batches)
+if isinstance(self.loss, nn.CrossEntropyLoss):
+y_train = y_train.long()
+true_labels = y_train.to(device)
for idx in range(num_batches):
optimizer.zero_grad()
-outputs = self.model(x_train[idx*indexes:(idx+1)*indexes])
+inputs = x_train[idx*indexes:(idx + 1)*indexes]
+if transformations is not None:
+inputs = transformations(inputs)
+outputs = self.model(inputs)
loss = self.loss(outputs,
true_labels[idx*indexes:(idx+1)*indexes])
loss.backward()
if gradient_clipping is not None:
gradient_clipping(self.model)
optimizer.step()
self.model = self.model.to("cpu")
return self.model
@@ -181,9 +221,16 @@ def _aggregate_parameters_async(self, model_params,
model_params.neural_network_layers[i].weight = \
nn.Parameter(final_added_parameters[j].float())
j += 1
if hasattr(model_params.neural_network_layers[i],
'bias'):
model_params.neural_network_layers[i].bias = \
nn.Parameter(final_added_parameters[j].float())
j += 1
if hasattr(self.model.neural_network_layers[i],
'alpha'):
self.model.neural_network_layers[i].alpha = \
nn.Parameter(model_params.
neural_network_layers[i].alpha)
if hasattr(model_params, 'dense_neural_network_layers'):
len_nn = len(model_params.dense_neural_network_layers)
aux_j = 0
@@ -193,7 +240,14 @@ def _aggregate_parameters_async(self, model_params,
model_params.dense_neural_network_layers[i].weight = \
nn.Parameter(final_added_parameters[aux_j + j].float())
aux_j += 1
if hasattr(model_params.dense_neural_network_layers[i],
'bias'):
model_params.dense_neural_network_layers[i].bias = \
nn.Parameter(final_added_parameters[aux_j + j].float())
aux_j += 1
if hasattr(self.model.dense_neural_network_layers[i],
'alpha'):
self.model.dense_neural_network_layers[i].alpha = \
nn.Parameter(model_params.
dense_neural_network_layers[i].alpha)
return model_params
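The two new hooks in _train_cnn_batch_GPU are plain callables: transformations receives the batch tensor just before the forward pass, and gradient_clipping receives the model between loss.backward() and optimizer.step(). A minimal sketch of compatible callables (names and values are illustrative, not part of dislib):

import torch

def transformations(batch):
    # On-the-fly augmentation applied to the batch tensor, e.g. light Gaussian noise.
    return batch + 0.01 * torch.randn_like(batch)

def gradient_clipping(model):
    # Clip the global gradient norm after backward(); max_norm=1.0 is an arbitrary choice.
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)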