Coding a Recurrent Neural Network (RNN) from scratch using PyTorch



In this blog I will show you how to create a Recurrent Neural Network (RNN) layer from scratch using PyTorch.

💡 You can find the code of this blog on this link

We won’t be using the native RNN layer; instead, we will create our own. This helps to really understand how RNNs work and which computations they perform internally.

For this I will assume you already know what RNNs are, why they are used, and what their limitations are (the ones LSTM and GRU try to address).

Topics we will cover:

  1. Creation of RNN layer
  2. Shapes and tensor dimensions
  3. Training process with batches

RNN vs Feedforward Architecture

Personally, I find RNNs easier to understand when I compare them to feedforward networks. Feedforward is a known concept, so I only have to add new ideas on top of previous knowledge. I will be comparing the two often.

Unlike a feedforward network, the machinery of an RNN is a bit more complex. Inside a single Recurrent Neural Network layer we have 3 weight matrices, 2 input tensors and 2 output tensors.

Fig 1. Feedforward vs Recurrent Neural Network number of components
Fig 2. Top: Feedforward Layer architecture. Bottom: Recurrent Neural Network Layer architecture

People often say “RNNs are simple feedforward with an internal state”; however, this simple diagram shows it’s not that simple. The components are way more complex in a Recurrent Net, but don’t worry, I will try to explain how this works, and hopefully seeing the code will help you understand it.

Recurrent Neural Network (RNN) Layer Architecture

Recurrent Nets introduce a new concept called “hidden state”, which is simply another input based on what the layer computed at the previous step. But wait, if this is based on the previous step, how do I get it for the first run? Simple, just start it with zeros.

RNNs are fed in a different way than feedforward networks. Because we are working with sequences, the order in which we input the data matters; this is why, each time we feed the net, we input a single item of the sequence. For example, if it’s a stock price, we input the stock price for each day. If it’s text, we enter a single letter or word each time.

We enter one step at a time because we need to compute the hidden state on each iteration. The hidden state holds information from previous steps, so the next item we input is combined with data from previous runs by summing the two matrix products (see Fig 2 above).
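To make the idea of carrying the hidden state from one step to the next more concrete, here is a minimal sketch. It is not the layer we will build later: the weights `W_in` and `W_hid` are random, untrained tensors I made up for illustration, and the sizes are arbitrary.

```python
import torch

torch.manual_seed(0)

input_size, hidden_size, seq_len = 1, 4, 5

# Hypothetical weights, for illustration only (random and untrained)
W_in = torch.randn(input_size, hidden_size)
W_hid = torch.randn(hidden_size, hidden_size)

# A toy sequence 0, 1, 2, 3, 4 with shape (seq_len, batch=1, input_size)
sequence = torch.arange(seq_len, dtype=torch.float32).reshape(seq_len, 1, input_size)

# First step of the sequence: the hidden state starts as zeros
hidden = torch.zeros(1, hidden_size)

hidden_history = []
for x_t in sequence:  # one item of the sequence at a time, in order
    # The new hidden state mixes the current input with the previous hidden state
    hidden = torch.tanh(x_t @ W_in + hidden @ W_hid)
    hidden_history.append(hidden)

print(len(hidden_history), hidden.shape)  # 5 torch.Size([1, 4])
```

Notice that `hidden` is overwritten on every iteration, so each step only ever sees the current input and a summary of everything that came before it.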

Inputs

Input tensor: This tensor should be a single step in the sequence. If your total sequence is, for example, 100 characters of text, then the input will be a single character of text.

Hidden state tensor: This tensor is the hidden state. Remember that for the first step of each sequence this tensor is filled with zeros. Following the example above, if you have 10 sequences of 100 characters each (a text of 1000 characters in total), then for each sequence you will generate a hidden state filled with zeros.

Weight Matrices

Input Dense: Dense matrix used to compute inputs (just like feedforward).

Hidden Dense: Dense matrix used to compute hidden state input.

Output Dense: Dense matrix applied to the result of activation(input_dense + hidden_dense), that is, to the new hidden state.

Outputs

New hidden state: New hidden state tensor which is activation(input_dense + hidden_dense). You will use this as input on the next iteration in the sequence.

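Output: activation(output_dense). This is your prediction vector, just like the output prediction vector of a feedforward network. In the code below the layer returns the linear output without the activation, because nn.CrossEntropyLoss applies the softmax itself.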

Recurrent Neural Network (RNN) Layer Code

class RNN(nn.Module):
    """
    Basic RNN block. This represents a single layer of RNN
    """
    def __init__(self, input_size: int, hidden_size: int, output_size: int, batch_size: int = 64) -> None:
        """
        input_size: Number of features of your input vector
        hidden_size: Number of hidden neurons
        output_size: Number of features of your output vector
        batch_size: Batch size the training loop expects (defaults to 64, the value used in this post)
        """
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.batch_size = batch_size
        self.i2h = nn.Linear(input_size, hidden_size, bias=False)
        self.h2h = nn.Linear(hidden_size, hidden_size)
        self.h2o = nn.Linear(hidden_size, output_size)
    
    def forward(self, x, hidden_state) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Returns computed output and tanh(i2h + h2h)
        Inputs
        ------
        x: Input vector
        hidden_state: Previous hidden state
        Outputs
        -------
        out: Linear output (no softmax here, nn.CrossEntropyLoss applies it during training)
        hidden_state: New hidden state matrix
        """
        x = self.i2h(x)
        hidden_state = self.h2h(hidden_state)
        hidden_state = torch.tanh(x + hidden_state)
        out = self.h2o(hidden_state)
        return out, hidden_state
        
    def init_zero_hidden(self, batch_size=1) -> torch.Tensor:
        """
        Helper function.
        Returns a hidden state with specified batch size. Defaults to 1
        """
        return torch.zeros(batch_size, self.hidden_size, requires_grad=False)

Shapes and Dimensions

Fig 3. Inputs, weights and outputs shapes

Dimensions resulting from each matrix product (yellow indicators in Fig 3):

  1. batch_size x hidden_size
  2. batch_size x hidden_size
  3. batch_size x output_size
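If you want to check these shapes yourself, a quick sketch like the one below can help. It rebuilds the three dense layers outside of the class and prints the shape after each product. The sizes are example values; in particular the output size of 27 is an assumed vocabulary size, not something taken from the dataset.

```python
import torch
import torch.nn as nn

# Example sizes; output_size=27 is an assumed vocabulary size
batch_size, input_size, hidden_size, output_size = 64, 1, 256, 27

i2h = nn.Linear(input_size, hidden_size, bias=False)  # input dense
h2h = nn.Linear(hidden_size, hidden_size)             # hidden dense
h2o = nn.Linear(hidden_size, output_size)             # output dense

x = torch.zeros(batch_size, input_size)        # one step of the sequence per batch row
hidden = torch.zeros(batch_size, hidden_size)  # initial hidden state

input_part = i2h(x)
hidden_part = h2h(hidden)
new_hidden = torch.tanh(input_part + hidden_part)
out = h2o(new_hidden)

print(input_part.shape)   # torch.Size([64, 256])
print(hidden_part.shape)  # torch.Size([64, 256])
print(new_hidden.shape)   # torch.Size([64, 256])
print(out.shape)          # torch.Size([64, 27])
```

The two products that feed the hidden state must have the same shape, otherwise they could not be summed before the tanh.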

Training with batches

Feeding a neural network in batches usually computes way faster (easily 10x faster), and Recurrent Neural Networks are no exception. Training with batches will not make a model learn better by itself, though: if your NN doesn’t work with a single training example at a time, it won’t work with 10 or 100.
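One way to convince yourself that batching only changes the speed and not the result is to run the same step on a whole batch and on each row separately. The sketch below does that with small, arbitrary sizes and a standalone `step` function that I wrote just for this comparison.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

# Small arbitrary sizes, for illustration only
input_size, hidden_size, output_size, batch_size = 1, 8, 5, 4

i2h = nn.Linear(input_size, hidden_size, bias=False)
h2h = nn.Linear(hidden_size, hidden_size)
h2o = nn.Linear(hidden_size, output_size)

def step(x, hidden):
    """One RNN step: returns the linear output and the new hidden state."""
    hidden = torch.tanh(i2h(x) + h2h(hidden))
    return h2o(hidden), hidden

x = torch.randn(batch_size, input_size)
hidden = torch.zeros(batch_size, hidden_size)

with torch.no_grad():
    # All rows in a single call
    batched_out, _ = step(x, hidden)
    # The same rows, one call per row
    single_outs = [step(x[i:i + 1], hidden[i:i + 1])[0] for i in range(batch_size)]
    stacked_out = torch.cat(single_outs, dim=0)

# Both paths should agree up to floating point error
print(torch.allclose(batched_out, stacked_out, atol=1e-5))
```

Each row of the batch is processed independently, which is why a model that cannot learn from single examples will not be rescued by a bigger batch.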

The Recurrent Neural Network I show as an example is trained on text, one character at a time, so the training function has to feed one character of the text per step. Doing this with batches saves a ton of time, because every step processes one character from each sequence in the batch at once, and the batch size can be any number.
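To give an idea of what "one character at a time" looks like as data, here is a simplified sketch of how a text can be cut into input and target sequences, where the target is the input shifted one character to the left. It uses plain Python lists and a toy string; the variable names are mine and the chunking is a simplification, not the exact dataset class from the complete code.

```python
text = "hello world"
seq_length = 5

# Map every distinct character to an integer index and back
chars = sorted(set(text))
char_to_idx = {ch: i for i, ch in enumerate(chars)}
idx_to_char = {i: ch for i, ch in enumerate(chars)}

encoded = [char_to_idx[ch] for ch in text]

pairs = []
# Stop early enough that every target still has a "next" character
for start in range(0, len(encoded) - seq_length, seq_length):
    x = encoded[start:start + seq_length]
    y = encoded[start + 1:start + seq_length + 1]  # same window shifted by one
    pairs.append((x, y))

for x, y in pairs:
    x_text = "".join(idx_to_char[i] for i in x)
    y_text = "".join(idx_to_char[i] for i in y)
    print(f"{x_text!r} -> {y_text!r}")
# 'hello' -> 'ello '
# ' worl' -> 'world'
```

At every position the network sees one character of `x` and is asked to predict the character at the same position in `y`.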

Each epoch of the training goes through the whole text, and for each batch I iterate through every character of the sequence. The loss is accumulated at each character; after iterating over the whole sequence I compute the gradients and then update the parameters with optimizer.step(). It is worth mentioning that gradient clipping is useful on this type of RNN.
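If you have never used gradient clipping, the sketch below shows what `nn.utils.clip_grad_norm_` does on a toy layer. The layer, the inflated loss and the `max_norm` of 3 are only there to force a large gradient; this is not part of the training function.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

# Toy layer and a deliberately inflated loss to produce large gradients
layer = nn.Linear(10, 10)
loss = (layer(torch.randn(32, 10)) * 100).pow(2).sum()
loss.backward()

max_norm = 3.0
# clip_grad_norm_ rescales the gradients in place and returns
# the total gradient norm measured *before* clipping
norm_before = nn.utils.clip_grad_norm_(layer.parameters(), max_norm)

# Recompute the total norm from the (now clipped) gradients
norm_after = torch.norm(torch.stack([p.grad.norm() for p in layer.parameters()]))

print(f"before: {norm_before.item():.2f}, after: {norm_after.item():.2f}")
```

Clipping limits how large an update can get, so it helps against exploding gradients. It does nothing for gradients that are already too small.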

After each epoch I also generate text to see how the network is doing and improving.
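Generating text comes down to turning the output vector into a probability distribution and picking the next character from it. Below is a small sketch of the two usual ways to pick, sampling and greedy. The logits are made-up numbers standing in for the output of the network.

```python
import torch

torch.manual_seed(0)

# Made-up logits for a vocabulary of 3 characters (batch of 1)
logits = torch.tensor([[2.0, 1.0, 0.1]])

# Softmax turns the linear output into probabilities that sum to 1
probs = torch.softmax(logits, dim=1)

# Sampling: pick an index at random, weighted by its probability
sampled = torch.multinomial(probs, num_samples=1).item()

# Greedy: always pick the most likely index
greedy = logits.argmax(dim=1).item()

print(probs)
print(sampled, greedy)
```

Sampling tends to give more varied text, while the greedy choice repeats the same most likely continuation every time.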

def train(model: RNN, data: DataLoader, epochs: int, optimizer: optim.Optimizer, loss_fn: nn.Module) -> None:
    """
    Trains the model for the specified number of epochs
    Inputs
    ------
    model: RNN model to train
    data: Iterable DataLoader
    epochs: Number of epochs to train the model
    optimizer: Optimizer to use for each epoch
    loss_fn: Function to calculate loss
    """
    train_losses = {}
    model.to(device)
    
    model.train()
    print("=> Starting training")
    for epoch in range(epochs):
        epoch_losses = list()
        for X, Y in data:
            # skip the last batch if it doesn't match the batch_size
            if X.shape[0] != model.batch_size:
                continue
            # every batch of sequences starts from a zeroed hidden state
            hidden = model.init_zero_hidden(batch_size=model.batch_size)
            # send tensors to device
            X, Y, hidden = X.to(device), Y.to(device), hidden.to(device)
            # clear gradients
            model.zero_grad()
            # feed the sequence one character at a time, accumulating the loss
            loss = 0
            for c in range(X.shape[1]):
                out, hidden = model(X[:, c].reshape(X.shape[0],1), hidden)
                l = loss_fn(out, Y[:, c].long())
                loss += l
            # compute gradients
            loss.backward()
            # clip gradients to avoid exploding gradients, then adjust learnable parameters
            nn.utils.clip_grad_norm_(model.parameters(), 3)
            optimizer.step()
        
            epoch_losses.append(loss.detach().item() / X.shape[1])
        train_losses[epoch] = torch.tensor(epoch_losses).mean()
        print(f'=> epoch: {epoch + 1}, loss: {train_losses[epoch]}')
        # after each epoch generate text
        print(generate_text(model, data.dataset))

Again, I recommend checking the complete code and playing with it. You can feed it any text file, and you will see it improve with each iteration. Play with the parameters and see how it behaves. Also set the batch_size to 1 so you can see how slow it is compared to 64.

"""
RNN character generator
RNN implementation with Dense layers
There is an RNN layer in pytorch, but in this case we will be using
normal Dense layers to demonstrate the difference between
RNN and Normal feedforward networks.
This is a character level generator, which means it will create character by character
You can input any text file and it will generate characters based on that text
"""
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import random

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using '{device}' device")

"""
Data preparation
"""
class TextDataset(Dataset):
    """
    Text Dataset Class
    
    This class is in charge of managing text data as vectors
    Data is saved as vectors (not as text)
    Attributes
    ----------
    seq_length - int: Sequence length
    chars - list(str): List of characters
    char_to_idx - dict: dictionary from character to index
    idx_to_char - dict: dictionary from index to character
    vocab_size - int: Vocabulary size
    data_size - int: total length of the text
    """
    def __init__(self, text_data: str, seq_length: int = 25) -> None:
        """
        Inputs
        ------
        text_data: Full text data as string
        seq_length: sequence length. How many characters per index of the dataset.
        """
        self.chars = sorted(list(set(text_data)))
        self.data_size, self.vocab_size = len(text_data), len(self.chars)
        # useful way to fetch characters either by index or char
        self.idx_to_char = {i:ch for i, ch in enumerate(self.chars)}
        self.char_to_idx = {ch:i for i, ch in enumerate(self.chars)}
        self.seq_length = seq_length
        self.X = self.string_to_vector(text_data)
    
    @property
    def X_string(self) -> str:
        """
        Returns X in string form
        """
        return self.vector_to_string(self.X)
        
    def __len__(self) -> int:
        """
        We remove the last sequence to avoid conflicts with Y being shifted to the left
        This causes our model to never see the last sequence of text
        which is not a huge deal, but its something to be aware of
        """
        return int(len(self.X) / self.seq_length -1)

    def __getitem__(self, index) -> tuple[torch.Tensor, torch.Tensor]:
        """
        X and Y have the same shape, but Y is shifted left 1 position
        """
        start_idx = index * self.seq_length
        end_idx = (index + 1) * self.seq_length

        X = torch.tensor(self.X[start_idx:end_idx]).float()
        y = torch.tensor(self.X[start_idx+1:end_idx+1]).float()
        return X, y
    
    def string_to_vector(self, name: str) -> list[int]:
        """
        Converts a string into a 1D vector with values from char_to_idx dictionary
        Inputs
        name: Name as string
        Outputs
        name_tensor: name represented as list of integers (1D vector)
        sample:
        >>> string_to_vector('test')
        [20, 5, 19, 20]
        """
        vector = list()
        for s in name:
            vector.append(self.char_to_idx[s])
        return vector

    def vector_to_string(self, vector: list[int]) -> str:
        """
        Converts a 1D vector into a string with values from idx_to_char dictionary
        Inputs
        vector: 1D vector with values in the range of idx_to_char
        Outputs
        vector_string: Vector converted to string
        sample:
        >>> vector_to_string([20, 5, 19, 20])
        'test'
        """
        vector_string = ""
        for i in vector:
            vector_string += self.idx_to_char[i]
        return vector_string

"""
Model definition
"""
class RNN(nn.Module):
    """
    Basic RNN block. This represents a single layer of RNN
    """
    def __init__(self, input_size: int, hidden_size: int, output_size: int, batch_size: int = 64) -> None:
        """
        input_size: Number of features of your input vector
        hidden_size: Number of hidden neurons
        output_size: Number of features of your output vector
        batch_size: Batch size the training loop expects (defaults to 64, the value used in this post)
        """
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.batch_size = batch_size

        self.i2h = nn.Linear(input_size, hidden_size, bias=False)
        self.h2h = nn.Linear(hidden_size, hidden_size)
        self.h2o = nn.Linear(hidden_size, output_size)

    
    def forward(self, x, hidden_state) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Returns the linear output and tanh(i2h + h2h)
        Inputs
        ------
        x: Input tensor with shape (batch_size, input_size)
        hidden_state: Previous hidden state with shape (batch_size, hidden_size)
        Outputs
        -------
        out: Linear output (no softmax here, nn.CrossEntropyLoss applies it during training)
        hidden_state: New hidden state matrix
        """
        x = self.i2h(x)
        hidden_state = self.h2h(hidden_state)
        hidden_state = torch.tanh(x + hidden_state)
        return self.h2o(hidden_state), hidden_state
        

    def init_zero_hidden(self, batch_size=1) -> torch.Tensor:
        """
        Returns a hidden state with specified batch size. Defaults to 1
        """
        return torch.zeros(batch_size, self.hidden_size, requires_grad=False)


def generate_text(model: RNN, dataset: TextDataset, prediction_length: int = 100) -> str:
    """
    Generate text up to prediction_length characters
    This function requires the dataset as argument in order to properly
    generate the text and return the output as strings
    """
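    model.eval()
    predicted = dataset.vector_to_string([random.randint(0, len(dataset.chars) -1)])
    hidden = model.init_zero_hidden().to(device)

    # no gradients are needed while generating text
    with torch.no_grad():
        for i in range(prediction_length - 1):
            # shape (1, 1): a batch with a single character index
            last_char = torch.tensor([[dataset.char_to_idx[predicted[-1]]]], dtype=torch.float32)
            out, hidden = model(last_char.to(device), hidden)
            # sample the next character from the predicted distribution
            result = torch.multinomial(nn.functional.softmax(out, 1), 1).item()
            #result = out.argmax().item()
            predicted += dataset.idx_to_char[result]

    # back to training mode, since train() calls this after every epoch
    model.train()
    return predicted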



def train(model: RNN, data: DataLoader, epochs: int, optimizer: optim.Optimizer, loss_fn: nn.Module) -> None:
    """
    Trains the model for the specified number of epochs
    Inputs
    ------
    model: RNN model to train
    data: Iterable DataLoader
    epochs: Number of epochs to train the model
    optimizer: Optimizer to use for each epoch
    loss_fn: Function to calculate loss
    """
    train_losses = {}
    model.to(device)
    
    model.train()
    print("=> Starting training")
    for epoch in range(epochs):
        epoch_losses = list()
        for X, Y in data:
            # skip the batch if it doesn't match the batch_size
            if X.shape[0] != model.batch_size:
                continue
            # every batch of sequences starts from a zeroed hidden state
            hidden = model.init_zero_hidden(batch_size=model.batch_size)

            # send tensors to device
            X, Y, hidden = X.to(device), Y.to(device), hidden.to(device)

            # clear gradients
            model.zero_grad()

            # feed the sequence one character at a time, accumulating the loss
            loss = 0
            for c in range(X.shape[1]):
                out, hidden = model(X[:, c].reshape(X.shape[0],1), hidden)
                l = loss_fn(out, Y[:, c].long())
                loss += l

            # compute gradients
            loss.backward()

            # clip gradients to avoid exploding gradients, then adjust learnable parameters
            nn.utils.clip_grad_norm_(model.parameters(), 3)
            optimizer.step()
        
            epoch_losses.append(loss.detach().item() / X.shape[1])

        train_losses[epoch] = torch.tensor(epoch_losses).mean()
        print(f'=> epoch: {epoch + 1}, loss: {train_losses[epoch]}')
        print(generate_text(model, data.dataset))


if __name__ == "__main__":
    # use any text file you want to learn
    with open('datasets/Dinos/dinos.txt', 'r') as f:
        data = f.read().lower()

    # Data size variables
    seq_length = 25
    batch_size = 64
    hidden_size = 256

    text_dataset = TextDataset(data, seq_length=seq_length)
    text_dataloader = DataLoader(text_dataset, batch_size)

    # Model
    rnnModel = RNN(1, hidden_size, len(text_dataset.chars)) # 1 because we enter a single number/letter per step.

    # Train variables
    epochs = 1000
    loss = nn.CrossEntropyLoss()
    optimizer = optim.RMSprop(rnnModel.parameters(), lr = 0.001)

    train(rnnModel, text_dataloader, epochs, optimizer, loss)

Manually coding this really helps to understand the underlying operations and workflow. It is also very satisfying to see how the Recurrent Neural Network learns from the text and generates cool text.
