photo by Seachaos @ Switzerland

PyTorch with multi process training and get loss history cross process (running on multi cpu core at the same time)

Seachaos
Published in
4 min readJun 19, 2021

--

Why need PyTorch running on multiprocess?

Normally PyTorch will run a computation on each CPU with Tensor automatically.
But at least here are some scenarios we can using distributions :

  1. you have Real-Time data (datasets or data loaders need to take data from somewhere and have pending)
  2. Reinforcement Learning. It usually costs a lot of time to get data from environments.
  3. CPU is not running on high usage ( mean you can use CPU more efficiently).

We can use multi-process to speed up the training progress, especially with Reinforcement Deep Learning.

In this tutorial, we will be using torch.multiprocessing :

from torch import multiprocessing as mp

Start with an example

Assume you already have data ( in case of Reinforcement Learning is get from the environment ), or we can use the below example, we put time.sleep to pretend this “getData” is slow ( this is not a good example, but just for demonstration ):

from sklearn.datasets import make_classificationdef getData(size=800):
time.sleep(0.3)
X, y = make_classification(n_samples=10000, n_features=8, n_classes=2, random_state=42)
np.random.seed(int((time.time() * 1E6) % (2**32 - 1)))
idx = np.random.choice(np.arange(10000), size)
return torch.from_numpy(X[idx]).float(), torch.from_numpy(y[idx])

then here is a super simple model: ( again, it just for demonstration )

model = torch.nn.Sequential(
torch.nn.Linear(8, 32),
torch.nn.ReLU(),
torch.nn.Linear(32, 16),
torch.nn.ReLU(),
torch.nn.Linear(16, 2),
torch.nn.Softmax(dim=1),
)
model.share_memory()

Note that “model.share_memory()” is needed. that is to help the “fork” process method work.

Let’s define how many epochs(training times) and processes ( CPU cores ) we want to use.

EPOCHS = 50
PROCESSES_NUM = 24

and then is train function define:

def train(args):
(pid, model, loss_history) = args
loss_func = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
for i in trange(EPOCHS):
X, y = getData()
y_p = model(X)
optimizer.zero_grad()
loss = loss_func(y_p, y)
loss_history[pid, i] = loss.item()
loss.backward()
optimizer.step()

should be looks like a typical train function, but here are some different:

  1. We put loss_func and optimizer in here because this will be running in a new process. The best practice is to isolate variables as much as possible to avoid deadlocks. ( Thinking of it as functional programming )
  2. Wrap the args in a tuple because that’s easier to pass parameters in the function.
  3. The loss report here is a trick with tensor. We will talk about it below. ( losses[pid, i] = loss.item() )

About Loss history (Losses value)

Here is a trick we have, usually the python variable is not easy to cross-process. And each process is running in a different time sequence. It will be hard to collect loss history.

Since we know PyTorch Tensor can cross-process, we use this feature to do it. We allocate a zero Tensor as a buffer then place each epoch and process-id (PID) loss value one by one.

loss_history[pid, i] = loss.item()

Start training

Here two simple ways can distribute training :

first way is using the pool

loss_history = torch.zeros(PROCESSES_NUM, EPOCHS)
with mp.Pool(PROCESSES_NUM) as pool:
pool.map(train, [(i, model, loss_history) for i in range(PROCESSES_NUM)])
  • That is much simple; you use pool.map to distribute each train function to each process (CPU).
  • As we mentioned before, our loss_history is a zero Tensor.

BTW, pool.map can also collect return value from the function, but It’s not guaranteed the return value is sequential. ( that’s why we use `loss_history` here )

Second way is using the process

loss_history = torch.zeros(PROCESSES_NUM, EPOCHS)
loss_history.share_memory_()
processes = []
for pid in range(PROCESSES_NUM):
p = mp.Process(target=train, args=((pid, model, loss_history),))
processes.append(p)
p.start()
[p.join() for p in processes]
  • This way is to create each process instance and join them, waiting for them to end. It’s more like an async task so you can do something between the all process done.
  • Because above our train function is using tuple as arg, same thing here. But if you are sure you will use the process way, you can change parameters to not using tuple
  • There need to set a loss_history.share_memory_() to share the loss history of each process.

The result

We can plot the loss history.

cross-process loss history
plt.figure(figsize=(21, 10))# show the each process loss history
plt.subplot(1, 2, 1)
for i in range(PROCESSES_NUM):
plt.plot(np.arange(EPOCHS), loss_history[i,:].numpy(), label=f'pid{i}')
plt.legend()
# show the all loss history
plt.subplot(1, 2, 2)
loss_history_1d = loss_history.T.reshape(-1)
plt.plot(np.arange(len(loss_history_1d)), loss_history_1d, label='loss')
plt.legend()

Fig left: we show each process loss history; each process (PID) with epoch is easy to plot.
Fig right: Transpose and change shape we can get 1D Tensor, that is what we need.

As you can see, the model’s loss value is convergent.
We also can compare it with single-process training.

The multi-process is running around 65sec, and the single-process is 369sec.

Both of the final results are clear close, but saving more time on multiprocess.

Conclusion

Both the multiprocess and the single-process final results are clear close, but saving more time on multiprocess.

But the multiprocess is not for all scenarios because PyTorch is doing well on cross-process computation.

Keep an eye on your CPU usage; if it’s already too busy, it is unnecessary to use the multiprocess train. And keep minds on deadlocks with the multiprocess.

Reference:
https://pytorch.org/docs/stable/notes/multiprocessing.html

--

--