{ "cells": [ { "cell_type": "code", "execution_count": null, "id": "initial_id", "metadata": { "collapsed": true, "jupyter": { "outputs_hidden": true } }, "outputs": [], "source": [ "# PyTorch for implementing LLM (No GPU)\n", "import torch\n", "\n", "# Neural network modules and functions from PyTorch\n", "from torch import nn\n", "from torch.nn import functional as F\n", "\n", "# NumPy for numerical operations\n", "import numpy as np\n", "\n", "# Matplotlib for plotting Loss etc.\n", "from matplotlib import pyplot as plt\n", "\n", "# Time module for tracking execution time\n", "import time\n", "\n", "# Pandas for data manipulation and analysis\n", "import pandas as pd\n", "\n", "# urllib for handling URL requests (Downloading Dataset)\n", "import urllib.request" ] }, { "cell_type": "code", "execution_count": null, "id": "ded604c1a4e80155", "metadata": {}, "outputs": [], "source": [ "torch.set_default_device('cuda')" ] }, { "cell_type": "code", "execution_count": null, "id": "97afd4d11e5d5391", "metadata": {}, "outputs": [], "source": [ "# Configuration object for model parameters\n", "MASTER_CONFIG = {\n", " # Adding parameters later\n", "}" ] }, { "cell_type": "code", "execution_count": null, "id": "e4c23d3282151461", "metadata": {}, "outputs": [], "source": [ "# The URL of the raw text file on GitHub\n", "url = \"https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt\"\n", "\n", "# The file name for local storage\n", "file_name = \"tinyshakespeare.txt\"\n", "\n", "# Execute the download\n", "urllib.request.urlretrieve(url, file_name)" ] }, { "cell_type": "code", "execution_count": null, "id": "a1f50a516659363", "metadata": {}, "outputs": [], "source": [ "# Read the content of the dataset\n", "lines = open(\"tinyshakespeare.txt\", 'r').read()\n", "\n", "# Create a sorted list of unique characters in the dataset\n", "vocab = sorted(list(set(lines)))\n", "\n", "# Display the first 10 characters in the vocabulary list\n", "print('Printing the first 10 characters of the vocab list:', vocab[:10])\n", "\n", "# Output the total number of characters in our dataset (Vocabulary Size)\n", "print('Total number of characters in our dataset (Vocabulary Size):', len(vocab))" ] }, { "cell_type": "code", "execution_count": null, "id": "5e898838024633e3", "metadata": {}, "outputs": [], "source": [ "# Mapping integers to characters (itos)\n", "itos = {i: ch for i, ch in enumerate(vocab)}\n", "\n", "# Mapping characters to integers (stoi)\n", "stoi = {ch: i for i, ch in enumerate(vocab)}" ] }, { "cell_type": "code", "execution_count": null, "id": "3a75bf8384dae76a", "metadata": {}, "outputs": [], "source": [ "# Encode function: Converts a string to a list of integers using the mapping stoi\n", "def encode(s):\n", " return [stoi[ch] for ch in s]\n", "\n", "# Decode function: Converts a list of integers back to a string using the mapping itos\n", "def decode(l):\n", " return ''.join([itos[i] for i in l])\n", "\n", "# Example: Encode the string \"hello\" and then decode the result\n", "decode(encode(\"morning\"))" ] }, { "cell_type": "code", "execution_count": null, "id": "7179c91e90c32ec6", "metadata": {}, "outputs": [], "source": [ "# Convert the dataset into a torch tensor with specified data type (dtype)\n", "dataset = torch.tensor(encode(lines), dtype=torch.int8)\n", "\n", "# adding the vocab size\n", "MASTER_CONFIG = {\n", " \"vocab_size\": len(vocab),\n", "}\n", "\n", "# Display the shape of the resulting tensor\n", "print(dataset.shape)" ] }, { "cell_type": "code", 
"execution_count": null, "id": "2d6c3c9d114ed620", "metadata": {}, "outputs": [], "source": [ "# Function to get batches for training, validation, or testing\n", "def get_batches(data, split, batch_size, context_window, config=MASTER_CONFIG):\n", " # Split the dataset into training, validation, and test sets\n", " train = data[:int(.8 * len(data))]\n", " val = data[int(.8 * len(data)): int(.9 * len(data))]\n", " test = data[int(.9 * len(data)):]\n", "\n", " # Determine which split to use\n", " batch_data = train\n", " if split == 'val':\n", " batch_data = val\n", " if split == 'test':\n", " batch_data = test\n", "\n", " # Pick random starting points within the data\n", " ix = torch.randint(0, batch_data.size(0) - context_window - 1, (batch_size,))\n", "\n", " # Create input sequences (x) and corresponding target sequences (y)\n", " x = torch.stack([batch_data[i:i+context_window] for i in ix]).long()\n", " y = torch.stack([batch_data[i+1:i+context_window+1] for i in ix]).long()\n", "\n", " return x, y" ] }, { "cell_type": "code", "execution_count": null, "id": "a61c3a0dd3339a42", "metadata": {}, "outputs": [], "source": [ "# Update the MASTER_CONFIG with batch_size and context_window parameters\n", "MASTER_CONFIG.update({\n", " 'batch_size': 8, # Number of batches to be processed at each random split\n", " 'context_window': 16 # Number of characters in each input (x) and target (y) sequence of each batch\n", "})" ] }, { "cell_type": "code", "execution_count": null, "id": "bf9bdacdfc815807", "metadata": {}, "outputs": [], "source": [ "# Obtain batches for training using the specified batch size and context window\n", "xs, ys = get_batches(dataset, 'train', MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'])\n", "\n", "# Decode the sequences to obtain the corresponding text representations\n", "decoded_samples = [(decode(xs[i].tolist()), decode(ys[i].tolist())) for i in range(len(xs))]\n", "\n", "# Print the random sample\n", "print(decoded_samples)" ] }, { "cell_type": "code", "execution_count": null, "id": "bfbe5a734e783ac6", "metadata": {}, "outputs": [], "source": [ "@torch.no_grad() # Don't compute gradients for this function\n", "def evaluate_loss(model, config=MASTER_CONFIG):\n", " # Placeholder for the evaluation results\n", " out = {}\n", "\n", " # Set the model to evaluation mode\n", " model.eval()\n", "\n", " # Iterate through training and validation splits\n", " for split in [\"train\", \"val\"]:\n", " # Placeholder for individual losses\n", " losses = []\n", "\n", " # Generate 10 batches for evaluation\n", " for _ in range(10):\n", " # Get input sequences (xb) and target sequences (yb)\n", " xb, yb = get_batches(dataset, split, config['batch_size'], config['context_window'])\n", "\n", " # Perform model inference and calculate the loss\n", " _, loss = model(xb, yb)\n", "\n", " # Append the loss to the list\n", " losses.append(loss.item())\n", "\n", " # Calculate the mean loss for the split and store it in the output dictionary\n", " out[split] = np.mean(losses)\n", "\n", " # Set the model back to training mode\n", " model.train()\n", "\n", " return out" ] }, { "cell_type": "code", "execution_count": null, "id": "a606f9ba323d4389", "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "id": "5a8930d6083f2930", "metadata": {}, "outputs": [], "source": [ "# Function to perform training\n", "def train(model, optimizer, scheduler=None, config=MASTER_CONFIG, print_logs=False):\n", " # Placeholder for storing losses\n", " losses = []\n", 
"\n", " # Start tracking time\n", " start_time = time.time()\n", "\n", " # Iterate through epochs\n", " for epoch in range(config['epochs']):\n", " # Zero out gradients\n", " optimizer.zero_grad()\n", "\n", " # Obtain batches for training\n", " xs, ys = get_batches(dataset, 'train', config['batch_size'], config['context_window'])\n", "\n", " # Forward pass through the model to calculate logits and loss\n", " logits, loss = model(xs, targets=ys)\n", "\n", " # Backward pass and optimization step\n", " loss.backward()\n", " optimizer.step()\n", "\n", " # If a learning rate scheduler is provided, adjust the learning rate\n", " if scheduler:\n", " scheduler.step()\n", "\n", " # Log progress every specified interval\n", " if epoch % config['log_interval'] == 0:\n", " # Calculate batch time\n", " batch_time = time.time() - start_time\n", "\n", " # Evaluate loss on validation set\n", " x = evaluate_loss(model)\n", "\n", " # Store the validation loss\n", " losses += [x]\n", "\n", " # Print progress logs if specified\n", " if print_logs:\n", " print(f\"Epoch {epoch} | val loss {x['val']:.3f} | Time {batch_time:.3f} | ETA in seconds {batch_time * (config['epochs'] - epoch)/config['log_interval'] :.3f}\")\n", "\n", " # Reset the timer\n", " start_time = time.time()\n", "\n", " # Print learning rate if a scheduler is provided\n", " if scheduler:\n", " print(\"lr: \", scheduler.get_lr())\n", "\n", " # Print the final validation loss\n", " print(\"Validation loss: \", losses[-1]['val'])\n", "\n", " # Plot the training and validation loss curves\n", " return pd.DataFrame(losses).plot()\n", "\n", "# Execute the training process\n", "# train(model, optimizer)" ] }, { "cell_type": "code", "execution_count": null, "id": "3889f433a82e96e5", "metadata": {}, "outputs": [], "source": [ "class RMSNorm(nn.Module):\n", " def __init__(self, layer_shape, eps=1e-8, bias=False):\n", " super(RMSNorm, self).__init__()\n", "\n", " # Registering a learnable parameter 'scale' as a parameter of the module\n", " self.register_parameter(\"scale\", nn.Parameter(torch.ones(layer_shape)))\n", "\n", " def forward(self, x):\n", " \"\"\"\n", " Assumes shape is (batch, seq_len, d_model)\n", " \"\"\"\n", " # Calculating the Frobenius norm, RMS = 1/sqrt(N) * Frobenius norm\n", " ff_rms = torch.linalg.norm(x, dim=(1,2)) * x[0].numel() ** -.5\n", "\n", " # Normalizing the input tensor 'x' with respect to RMS\n", " raw = x / ff_rms.unsqueeze(-1).unsqueeze(-1)\n", "\n", " # Scaling the normalized tensor using the learnable parameter 'scale'\n", " return self.scale[:x.shape[1], :].unsqueeze(0) * raw" ] }, { "cell_type": "code", "execution_count": null, "id": "a232f8fb217d8a09", "metadata": {}, "outputs": [], "source": [ "def get_rotary_matrix(context_window, embedding_dim):\n", " # Initialize a tensor for the rotary matrix with zeros\n", " R = torch.zeros((context_window, embedding_dim, embedding_dim), requires_grad=False)\n", "\n", " # Loop through each position in the context window\n", " for position in range(context_window):\n", " # Loop through each dimension in the embedding\n", " for i in range(embedding_dim // 2):\n", " # Calculate the rotation angle (theta) based on the position and embedding dimension\n", " theta = 10000. ** (-2. 
* (i - 1) / embedding_dim)\n", " # Calculate the rotated matrix elements using sine and cosine functions\n", " m_theta = position * theta\n", " R[position, 2 * i, 2 * i] = np.cos(m_theta)\n", " R[position, 2 * i, 2 * i + 1] = -np.sin(m_theta)\n", " R[position, 2 * i + 1, 2 * i] = np.sin(m_theta)\n", " R[position, 2 * i + 1, 2 * i + 1] = np.cos(m_theta)\n", " return R" ] }, { "cell_type": "code", "execution_count": null, "id": "10aedc140006fa95", "metadata": {}, "outputs": [], "source": [ "class RoPEMaskedAttentionHead(nn.Module):\n", " def __init__(self, config):\n", " super().__init__()\n", " self.config = config\n", " # Linear transformation for query\n", " self.w_q = nn.Linear(config['d_model'], config['d_model'], bias=False)\n", " # Linear transformation for key\n", " self.w_k = nn.Linear(config['d_model'], config['d_model'], bias=False)\n", " # Linear transformation for value\n", " self.w_v = nn.Linear(config['d_model'], config['d_model'], bias=False)\n", " # Obtain rotary matrix for positional embeddings\n", " self.R = get_rotary_matrix(config['context_window'], config['d_model'])\n", "\n", " def get_rotary_matrix(context_window, embedding_dim):\n", " # Initialize a tensor for the rotary matrix with zeros\n", " R = torch.zeros((context_window, embedding_dim, embedding_dim), requires_grad=False)\n", "\n", " # Loop through each position in the context window\n", " for position in range(context_window):\n", " # Loop through each dimension in the embedding\n", " for i in range(embedding_dim // 2):\n", " # Calculate the rotation angle (theta) based on the position and embedding dimension\n", " theta = 10000. ** (-2. * (i - 1) / embedding_dim)\n", " # Calculate the rotated matrix elements using sine and cosine functions\n", " m_theta = position * theta\n", " R[position, 2 * i, 2 * i] = np.cos(m_theta)\n", " R[position, 2 * i, 2 * i + 1] = -np.sin(m_theta)\n", " R[position, 2 * i + 1, 2 * i] = np.sin(m_theta)\n", " R[position, 2 * i + 1, 2 * i + 1] = np.cos(m_theta)\n", " return R\n", "\n", " def forward(self, x, return_attn_weights=False):\n", " # x: input tensor of shape (batch, sequence length, dimension)\n", "\n", " b, m, d = x.shape # batch size, sequence length, dimension\n", "\n", " # Linear transformations for Q, K, and V\n", " q = self.w_q(x)\n", " k = self.w_k(x)\n", " v = self.w_v(x)\n", "\n", " # Rotate Q and K using the RoPE matrix\n", " q_rotated = (torch.bmm(q.transpose(0, 1), self.R[:m])).transpose(0, 1)\n", " k_rotated = (torch.bmm(k.transpose(0, 1), self.R[:m])).transpose(0, 1)\n", "\n", " # Perform scaled dot-product attention\n", " activations = F.scaled_dot_product_attention(\n", " q_rotated, k_rotated, v, dropout_p=0.1, is_causal=True\n", " )\n", "\n", " if return_attn_weights:\n", " # Create a causal attention mask\n", " attn_mask = torch.tril(torch.ones((m, m)), diagonal=0)\n", " # Calculate attention weights and add causal mask\n", " attn_weights = torch.bmm(q_rotated, k_rotated.transpose(1, 2)) / np.sqrt(d) + attn_mask\n", " attn_weights = F.softmax(attn_weights, dim=-1)\n", " return activations, attn_weights\n", "\n", " return activations" ] }, { "cell_type": "code", "execution_count": null, "id": "1e919117e21d94e3", "metadata": {}, "outputs": [], "source": [ "class RoPEMaskedAttentionHead(nn.Module):\n", " def __init__(self, config):\n", " super().__init__()\n", " self.config = config\n", " # Linear transformation for query\n", " self.w_q = nn.Linear(config['d_model'], config['d_model'], bias=False)\n", " # Linear transformation for key\n", " self.w_k = 
nn.Linear(config['d_model'], config['d_model'], bias=False)\n", " # Linear transformation for value\n", " self.w_v = nn.Linear(config['d_model'], config['d_model'], bias=False)\n", " # Obtain rotary matrix for positional embeddings\n", " self.R = get_rotary_matrix(config['context_window'], config['d_model'])\n", "\n", " def get_rotary_matrix(context_window, embedding_dim):\n", " # Initialize a tensor for the rotary matrix with zeros\n", " R = torch.zeros((context_window, embedding_dim, embedding_dim), requires_grad=False)\n", "\n", " # Loop through each position in the context window\n", " for position in range(context_window):\n", " # Loop through each dimension in the embedding\n", " for i in range(embedding_dim // 2):\n", " # Calculate the rotation angle (theta) based on the position and embedding dimension\n", " theta = 10000. ** (-2. * (i - 1) / embedding_dim)\n", " # Calculate the rotated matrix elements using sine and cosine functions\n", " m_theta = position * theta\n", " R[position, 2 * i, 2 * i] = np.cos(m_theta)\n", " R[position, 2 * i, 2 * i + 1] = -np.sin(m_theta)\n", " R[position, 2 * i + 1, 2 * i] = np.sin(m_theta)\n", " R[position, 2 * i + 1, 2 * i + 1] = np.cos(m_theta)\n", " return R\n", "\n", " def forward(self, x, return_attn_weights=False):\n", " # x: input tensor of shape (batch, sequence length, dimension)\n", "\n", " b, m, d = x.shape # batch size, sequence length, dimension\n", "\n", " # Linear transformations for Q, K, and V\n", " q = self.w_q(x)\n", " k = self.w_k(x)\n", " v = self.w_v(x)\n", "\n", " # Rotate Q and K using the RoPE matrix\n", " q_rotated = (torch.bmm(q.transpose(0, 1), self.R[:m])).transpose(0, 1)\n", " k_rotated = (torch.bmm(k.transpose(0, 1), self.R[:m])).transpose(0, 1)\n", "\n", " # Perform scaled dot-product attention\n", " activations = F.scaled_dot_product_attention(\n", " q_rotated, k_rotated, v, dropout_p=0.1, is_causal=True\n", " )\n", "\n", " if return_attn_weights:\n", " # Create a causal attention mask\n", " attn_mask = torch.tril(torch.ones((m, m)), diagonal=0)\n", " # Calculate attention weights and add causal mask\n", " attn_weights = torch.bmm(q_rotated, k_rotated.transpose(1, 2)) / np.sqrt(d) + attn_mask\n", " attn_weights = F.softmax(attn_weights, dim=-1)\n", " return activations, attn_weights\n", "\n", " return activations" ] }, { "cell_type": "code", "execution_count": null, "id": "f62b1d235695764c", "metadata": {}, "outputs": [], "source": [ "class RoPEMaskedMultiheadAttention(nn.Module):\n", " def __init__(self, config):\n", " super().__init__()\n", " self.config = config\n", " # Create a list of RoPEMaskedAttentionHead instances as attention heads\n", " self.heads = nn.ModuleList([\n", " RoPEMaskedAttentionHead(config) for _ in range(config['n_heads'])\n", " ])\n", " self.linear = nn.Linear(config['n_heads'] * config['d_model'], config['d_model']) # Linear layer after concatenating heads\n", " self.dropout = nn.Dropout(.1) # Dropout layer\n", "\n", " def forward(self, x):\n", " # x: input tensor of shape (batch, sequence length, dimension)\n", "\n", " # Process each attention head and concatenate the results\n", " heads = [h(x) for h in self.heads]\n", " x = torch.cat(heads, dim=-1)\n", "\n", " # Apply linear transformation to the concatenated output\n", " x = self.linear(x)\n", "\n", " # Apply dropout\n", " x = self.dropout(x)\n", " return x" ] }, { "cell_type": "code", "execution_count": null, "id": "e9236f037781b069", "metadata": {}, "outputs": [], "source": [ "# Update the master configuration with the number of 
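{ "cell_type": "code", "execution_count": null, "id": "rope_sanity_check", "metadata": {}, "outputs": [], "source": [ "# Optional sanity check (a minimal sketch, not part of the original pipeline):\n", "# each 2x2 block of the rotary matrix is a pure rotation, so applying R at any\n", "# position should leave vector norms unchanged. The embedding size 4 is an\n", "# arbitrary small value chosen only for illustration.\n", "R_demo = get_rotary_matrix(MASTER_CONFIG['context_window'], 4)\n", "v = torch.randn(4)\n", "print('norm before rotation:', torch.norm(v).item())\n", "print('norm after rotation: ', torch.norm(R_demo[3] @ v).item())" ] },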
attention heads\n", "MASTER_CONFIG.update({\n", " 'n_heads': 8,\n", "})" ] }, { "cell_type": "code", "execution_count": null, "id": "ec0ca4d23bdd9fd4", "metadata": {}, "outputs": [], "source": [ "class RopeModel(nn.Module):\n", " def __init__(self, config):\n", " super().__init__()\n", " self.config = config\n", "\n", " # Embedding layer for input tokens\n", " self.embedding = nn.Embedding(config['vocab_size'], config['d_model'])\n", "\n", " # RMSNorm layer for pre-normalization\n", " self.rms = RMSNorm((config['context_window'], config['d_model']))\n", "\n", " # RoPEMaskedMultiheadAttention layer\n", " self.rope_attention = RoPEMaskedMultiheadAttention(config)\n", "\n", " # Linear layer followed by ReLU activation\n", " self.linear = nn.Sequential(\n", " nn.Linear(config['d_model'], config['d_model']),\n", " nn.ReLU(),\n", " )\n", "\n", " # Final linear layer for prediction\n", " self.last_linear = nn.Linear(config['d_model'], config['vocab_size'])\n", "\n", " print(\"model params:\", sum([m.numel() for m in self.parameters()]))\n", "\n", " def forward(self, idx, targets=None):\n", " # idx: input indices\n", " x = self.embedding(idx)\n", "\n", " # One block of attention\n", " x = self.rms(x) # RMS pre-normalization\n", " x = x + self.rope_attention(x)\n", "\n", " x = self.rms(x) # RMS pre-normalization\n", " x = x + self.linear(x)\n", "\n", " logits = self.last_linear(x)\n", "\n", " if targets is not None:\n", " loss = F.cross_entropy(logits.view(-1, self.config['vocab_size']), targets.view(-1))\n", " return logits, loss\n", "\n", " else:\n", " return logits" ] }, { "cell_type": "code", "execution_count": null, "id": "95271a5b0b29b43c", "metadata": {}, "outputs": [], "source": [ "# Create an instance of RopeModel (RMSNorm, RoPE, Multi-Head)\n", "model = RopeModel(MASTER_CONFIG)\n", "\n", "# Obtain batches for training\n", "xs, ys = get_batches(dataset, 'train', MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'])\n", "\n", "# Calculate logits and loss using the model\n", "logits, loss = model(xs, ys)\n", "\n", "# Define the Adam optimizer for model parameters\n", "optimizer = torch.optim.Adam(model.parameters())\n", "\n", "# Train the model\n", "train(model, optimizer)" ] }, { "cell_type": "code", "execution_count": null, "id": "5b1afff55705a505", "metadata": {}, "outputs": [], "source": [ "# Updating training configuration with more epochs and a logging interval\n", "MASTER_CONFIG.update({\n", " \"epochs\": 5000,\n", " \"log_interval\": 10,\n", "})\n", "\n", "# Training the model with the updated configuration\n", "train(model, optimizer)" ] }, { "cell_type": "code", "execution_count": null, "id": "44e3b30f92614023", "metadata": {}, "outputs": [], "source": [ "class SwiGLU(nn.Module):\n", " \"\"\"\n", " Swish-Gated Linear Unit\n", " https://arxiv.org/pdf/2002.05202v1.pdf\n", " \"\"\"\n", " def __init__(self, size):\n", " super().__init__()\n", " self.linear_gate = nn.Linear(size, size)\n", " self.linear = nn.Linear(size, size)\n", " self.beta = torch.randn(1, requires_grad=True)\n", "\n", " self.beta = nn.Parameter(torch.ones(1))\n", " self.register_parameter(\"beta\", self.beta)\n", "\n", " def forward(self, x):\n", " swish_gate = self.linear_gate(x) * torch.sigmoid(self.beta * self.linear_gate(x))\n", " out = swish_gate * self.linear(x)\n", " return out" ] }, { "cell_type": "code", "execution_count": null, "id": "3b46c7a9b8e6da8e", "metadata": {}, "outputs": [], "source": [ "class RopeModel(nn.Module):\n", " def __init__(self, config):\n", " super().__init__()\n", " 
self.config = config\n", "\n", " # Embedding layer for input tokens\n", " self.embedding = nn.Embedding(config['vocab_size'], config['d_model'])\n", "\n", " # RMSNorm layer for pre-normalization\n", " self.rms = RMSNorm((config['context_window'], config['d_model']))\n", "\n", " # Multi-head attention layer with RoPE (Rotary Positional Embeddings)\n", " self.rope_attention = RoPEMaskedMultiheadAttention(config)\n", "\n", " # Linear layer followed by SwiGLU activation\n", " self.linear = nn.Sequential(\n", " nn.Linear(config['d_model'], config['d_model']),\n", " SwiGLU(config['d_model']), # Adding SwiGLU activation\n", " )\n", "\n", " # Output linear layer\n", " self.last_linear = nn.Linear(config['d_model'], config['vocab_size'])\n", "\n", " # Printing total model parameters\n", " print(\"model params:\", sum([m.numel() for m in self.parameters()]))\n", "\n", " def forward(self, idx, targets=None):\n", " x = self.embedding(idx)\n", "\n", " # One block of attention\n", " x = self.rms(x) # RMS pre-normalization\n", " x = x + self.rope_attention(x)\n", "\n", " x = self.rms(x) # RMS pre-normalization\n", " x = x + self.linear(x) # Applying SwiGLU activation\n", "\n", " logits = self.last_linear(x)\n", "\n", " if targets is not None:\n", " # Calculate cross-entropy loss if targets are provided\n", " loss = F.cross_entropy(logits.view(-1, self.config['vocab_size']), targets.view(-1))\n", " return logits, loss\n", "\n", " else:\n", " return logits" ] }, { "cell_type": "code", "execution_count": null, "id": "f8548a11ec0c40be", "metadata": {}, "outputs": [], "source": [ "# Create an instance of RopeModel (RMSNorm, RoPE, Multi-Head, SwiGLU)\n", "model = RopeModel(MASTER_CONFIG)\n", "\n", "# Obtain batches for training\n", "xs, ys = get_batches(dataset, 'train', MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'])\n", "\n", "# Calculate logits and loss using the model\n", "logits, loss = model(xs, ys)\n", "\n", "# Define the Adam optimizer for model parameters\n", "optimizer = torch.optim.Adam(model.parameters())\n", "\n", "# Train the model\n", "train(model, optimizer)" ] }, { "cell_type": "code", "execution_count": null, "id": "2622420706bbc513", "metadata": {}, "outputs": [], "source": [ "# Update model configurations for the number of layers\n", "MASTER_CONFIG.update({\n", " 'n_layers': 4, # Set the number of layers to 4\n", "})" ] }, { "cell_type": "code", "execution_count": null, "id": "e7b49bf56c2f8b4b", "metadata": {}, "outputs": [], "source": [ "# add RMSNorm and residual conncection\n", "class LlamaBlock(nn.Module):\n", " def __init__(self, config):\n", " super().__init__()\n", " self.config = config\n", "\n", " self.rms = RMSNorm((config['context_window'], config['d_model']))\n", "\n", " self.attention = RoPEMaskedMultiheadAttention(config)\n", " self.feedforward = nn.Sequential(\n", " nn.Linear(config['d_model'], config['d_model']),\n", " SwiGLU(config['d_model']),\n", " )\n", "\n", " def forward(self, x):\n", " x = self.rms(x) # rms pre-normalization\n", " x = x + self.attention(x)\n", "\n", " x = self.rms(x) # rms pre-normalization\n", " x = x + self.feedforward(x)\n", " return x" ] }, { "cell_type": "code", "execution_count": null, "id": "431beed0a02fc772", "metadata": {}, "outputs": [], "source": [ "MASTER_CONFIG" ] }, { "cell_type": "code", "execution_count": null, "id": "aa931e6781403fcc", "metadata": {}, "outputs": [], "source": [ "block = LlamaBlock(MASTER_CONFIG)\n", "block(torch.randn(MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'], 
{ "cell_type": "code", "execution_count": null, "id": "e26360bd91af5bae", "metadata": {}, "outputs": [], "source": [ "# Create an instance of the LlamaBlock class with the provided configuration\n", "block = LlamaBlock(MASTER_CONFIG)\n", "\n", "# Generate a random tensor with the specified batch size, context window, and model dimension\n", "random_input = torch.randn(MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'], MASTER_CONFIG['d_model'])\n", "\n", "# Apply the LlamaBlock to the random input tensor (the output has the same shape as the input)\n", "output = block(random_input)" ] },
{ "cell_type": "code", "execution_count": null, "id": "f84fb6ee09aadea5", "metadata": {}, "outputs": [], "source": [ "from collections import OrderedDict\n", "\n", "class Llama(nn.Module):\n", "    def __init__(self, config):\n", "        super().__init__()\n", "        self.config = config\n", "        # Embedding layer for token representations\n", "        self.embeddings = nn.Embedding(config['vocab_size'], config['d_model'])\n", "        # Sequential block of LlamaBlocks based on the specified number of layers\n", "        self.llama_blocks = nn.Sequential(\n", "            OrderedDict([(f\"llama_{i}\", LlamaBlock(config)) for i in range(config['n_layers'])])\n", "        )\n", "        # Feedforward network (FFN) for the final output\n", "        self.ffn = nn.Sequential(\n", "            nn.Linear(config['d_model'], config['d_model']),\n", "            SwiGLU(config['d_model']),\n", "            nn.Linear(config['d_model'], config['vocab_size']),\n", "        )\n", "\n", "        # Print the total number of parameters in the model\n", "        print(\"model params:\", sum([m.numel() for m in self.parameters()]))\n", "\n", "    def forward(self, idx, targets=None):\n", "        # Input token indices are passed through the embedding layer\n", "        x = self.embeddings(idx)\n", "        # Process the input through the LlamaBlocks\n", "        x = self.llama_blocks(x)\n", "        # Pass the processed input through the final FFN for output logits\n", "        logits = self.ffn(x)\n", "\n", "        # If targets are not provided, return only the logits\n", "        if targets is None:\n", "            return logits\n", "        # If targets are provided, compute and return the cross-entropy loss\n", "        else:\n", "            loss = F.cross_entropy(logits.view(-1, self.config['vocab_size']), targets.view(-1))\n", "            return logits, loss" ] },
{ "cell_type": "code", "execution_count": null, "id": "a0ff7f505eddaefd", "metadata": {}, "outputs": [], "source": [ "# Create an instance of Llama (RMSNorm, RoPE, Multi-Head, SwiGLU, n_layers blocks)\n", "llama = Llama(MASTER_CONFIG)\n", "\n", "# Obtain batches for training\n", "xs, ys = get_batches(dataset, 'train', MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'])\n", "\n", "# Calculate logits and loss using the model\n", "logits, loss = llama(xs, ys)\n", "\n", "# Define the Adam optimizer for model parameters\n", "optimizer = torch.optim.Adam(llama.parameters())\n", "\n", "# Train the model\n", "train(llama, optimizer)" ] },
{ "cell_type": "code", "execution_count": null, "id": "ca7605a6b36f613d", "metadata": {}, "outputs": [], "source": [ "# Update the number of epochs in the configuration\n", "MASTER_CONFIG.update({\n", "    'epochs': 10000,\n", "})\n", "# Train the LLaMA model for the specified number of epochs\n", "train(llama, optimizer, scheduler=None, config=MASTER_CONFIG)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": 
"text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.12" } }, "nbformat": 4, "nbformat_minor": 5 }