kshama/million-labs.ipynb

{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "initial_id",
"metadata": {
"collapsed": true,
"jupyter": {
"outputs_hidden": true
}
},
"outputs": [],
"source": [
"# PyTorch for implementing LLM (No GPU)\n",
"import torch\n",
"\n",
"# Neural network modules and functions from PyTorch\n",
"from torch import nn\n",
"from torch.nn import functional as F\n",
"\n",
"# NumPy for numerical operations\n",
"import numpy as np\n",
"\n",
"# Matplotlib for plotting Loss etc.\n",
"from matplotlib import pyplot as plt\n",
"\n",
"# Time module for tracking execution time\n",
"import time\n",
"\n",
"# Pandas for data manipulation and analysis\n",
"import pandas as pd\n",
"\n",
"# urllib for handling URL requests (Downloading Dataset)\n",
"import urllib.request"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ded604c1a4e80155",
"metadata": {},
"outputs": [],
"source": [
"torch.set_default_device('cuda')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "97afd4d11e5d5391",
"metadata": {},
"outputs": [],
"source": [
"# Configuration object for model parameters\n",
"MASTER_CONFIG = {\n",
" # Adding parameters later\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e4c23d3282151461",
"metadata": {},
"outputs": [],
"source": [
"# The URL of the raw text file on GitHub\n",
"url = \"https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt\"\n",
"\n",
"# The file name for local storage\n",
"file_name = \"tinyshakespeare.txt\"\n",
"\n",
"# Execute the download\n",
"urllib.request.urlretrieve(url, file_name)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a1f50a516659363",
"metadata": {},
"outputs": [],
"source": [
"# Read the content of the dataset\n",
"lines = open(\"tinyshakespeare.txt\", 'r').read()\n",
"\n",
"# Create a sorted list of unique characters in the dataset\n",
"vocab = sorted(list(set(lines)))\n",
"\n",
"# Display the first 10 characters in the vocabulary list\n",
"print('Printing the first 10 characters of the vocab list:', vocab[:10])\n",
"\n",
"# Output the total number of characters in our dataset (Vocabulary Size)\n",
"print('Total number of characters in our dataset (Vocabulary Size):', len(vocab))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5e898838024633e3",
"metadata": {},
"outputs": [],
"source": [
"# Mapping integers to characters (itos)\n",
"itos = {i: ch for i, ch in enumerate(vocab)}\n",
"\n",
"# Mapping characters to integers (stoi)\n",
"stoi = {ch: i for i, ch in enumerate(vocab)}"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3a75bf8384dae76a",
"metadata": {},
"outputs": [],
"source": [
"# Encode function: Converts a string to a list of integers using the mapping stoi\n",
"def encode(s):\n",
" return [stoi[ch] for ch in s]\n",
"\n",
"# Decode function: Converts a list of integers back to a string using the mapping itos\n",
"def decode(l):\n",
" return ''.join([itos[i] for i in l])\n",
"\n",
"# Example: Encode the string \"hello\" and then decode the result\n",
"decode(encode(\"morning\"))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7179c91e90c32ec6",
"metadata": {},
"outputs": [],
"source": [
"# Convert the dataset into a torch tensor with specified data type (dtype)\n",
"dataset = torch.tensor(encode(lines), dtype=torch.int8)\n",
"\n",
"# adding the vocab size\n",
"MASTER_CONFIG = {\n",
" \"vocab_size\": len(vocab),\n",
"}\n",
"\n",
"# Display the shape of the resulting tensor\n",
"print(dataset.shape)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2d6c3c9d114ed620",
"metadata": {},
"outputs": [],
"source": [
"# Function to get batches for training, validation, or testing\n",
"def get_batches(data, split, batch_size, context_window, config=MASTER_CONFIG):\n",
" # Split the dataset into training, validation, and test sets\n",
" train = data[:int(.8 * len(data))]\n",
" val = data[int(.8 * len(data)): int(.9 * len(data))]\n",
" test = data[int(.9 * len(data)):]\n",
"\n",
" # Determine which split to use\n",
" batch_data = train\n",
" if split == 'val':\n",
" batch_data = val\n",
" if split == 'test':\n",
" batch_data = test\n",
"\n",
" # Pick random starting points within the data\n",
" ix = torch.randint(0, batch_data.size(0) - context_window - 1, (batch_size,))\n",
"\n",
" # Create input sequences (x) and corresponding target sequences (y)\n",
" x = torch.stack([batch_data[i:i+context_window] for i in ix]).long()\n",
" y = torch.stack([batch_data[i+1:i+context_window+1] for i in ix]).long()\n",
"\n",
" return x, y"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a61c3a0dd3339a42",
"metadata": {},
"outputs": [],
"source": [
"# Update the MASTER_CONFIG with batch_size and context_window parameters\n",
"MASTER_CONFIG.update({\n",
" 'batch_size': 8, # Number of batches to be processed at each random split\n",
" 'context_window': 16 # Number of characters in each input (x) and target (y) sequence of each batch\n",
"})"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bf9bdacdfc815807",
"metadata": {},
"outputs": [],
"source": [
"# Obtain batches for training using the specified batch size and context window\n",
"xs, ys = get_batches(dataset, 'train', MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'])\n",
"\n",
"# Decode the sequences to obtain the corresponding text representations\n",
"decoded_samples = [(decode(xs[i].tolist()), decode(ys[i].tolist())) for i in range(len(xs))]\n",
"\n",
"# Print the random sample\n",
"print(decoded_samples)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bfbe5a734e783ac6",
"metadata": {},
"outputs": [],
"source": [
"@torch.no_grad() # Don't compute gradients for this function\n",
"def evaluate_loss(model, config=MASTER_CONFIG):\n",
" # Placeholder for the evaluation results\n",
" out = {}\n",
"\n",
" # Set the model to evaluation mode\n",
" model.eval()\n",
"\n",
" # Iterate through training and validation splits\n",
" for split in [\"train\", \"val\"]:\n",
" # Placeholder for individual losses\n",
" losses = []\n",
"\n",
" # Generate 10 batches for evaluation\n",
" for _ in range(10):\n",
" # Get input sequences (xb) and target sequences (yb)\n",
" xb, yb = get_batches(dataset, split, config['batch_size'], config['context_window'])\n",
"\n",
" # Perform model inference and calculate the loss\n",
" _, loss = model(xb, yb)\n",
"\n",
" # Append the loss to the list\n",
" losses.append(loss.item())\n",
"\n",
" # Calculate the mean loss for the split and store it in the output dictionary\n",
" out[split] = np.mean(losses)\n",
"\n",
" # Set the model back to training mode\n",
" model.train()\n",
"\n",
" return out"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a606f9ba323d4389",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "5a8930d6083f2930",
"metadata": {},
"outputs": [],
"source": [
"# Function to perform training\n",
"def train(model, optimizer, scheduler=None, config=MASTER_CONFIG, print_logs=False):\n",
" # Placeholder for storing losses\n",
" losses = []\n",
"\n",
" # Start tracking time\n",
" start_time = time.time()\n",
"\n",
" # Iterate through epochs\n",
" for epoch in range(config['epochs']):\n",
" # Zero out gradients\n",
" optimizer.zero_grad()\n",
"\n",
" # Obtain batches for training\n",
" xs, ys = get_batches(dataset, 'train', config['batch_size'], config['context_window'])\n",
"\n",
" # Forward pass through the model to calculate logits and loss\n",
" logits, loss = model(xs, targets=ys)\n",
"\n",
" # Backward pass and optimization step\n",
" loss.backward()\n",
" optimizer.step()\n",
"\n",
" # If a learning rate scheduler is provided, adjust the learning rate\n",
" if scheduler:\n",
" scheduler.step()\n",
"\n",
" # Log progress every specified interval\n",
" if epoch % config['log_interval'] == 0:\n",
" # Calculate batch time\n",
" batch_time = time.time() - start_time\n",
"\n",
" # Evaluate loss on validation set\n",
" x = evaluate_loss(model)\n",
"\n",
" # Store the validation loss\n",
" losses += [x]\n",
"\n",
" # Print progress logs if specified\n",
" if print_logs:\n",
" print(f\"Epoch {epoch} | val loss {x['val']:.3f} | Time {batch_time:.3f} | ETA in seconds {batch_time * (config['epochs'] - epoch)/config['log_interval'] :.3f}\")\n",
"\n",
" # Reset the timer\n",
" start_time = time.time()\n",
"\n",
" # Print learning rate if a scheduler is provided\n",
" if scheduler:\n",
" print(\"lr: \", scheduler.get_lr())\n",
"\n",
" # Print the final validation loss\n",
" print(\"Validation loss: \", losses[-1]['val'])\n",
"\n",
" # Plot the training and validation loss curves\n",
" return pd.DataFrame(losses).plot()\n",
"\n",
"# Execute the training process\n",
"# train(model, optimizer)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3889f433a82e96e5",
"metadata": {},
"outputs": [],
"source": [
"class RMSNorm(nn.Module):\n",
" def __init__(self, layer_shape, eps=1e-8, bias=False):\n",
" super(RMSNorm, self).__init__()\n",
"\n",
" # Registering a learnable parameter 'scale' as a parameter of the module\n",
" self.register_parameter(\"scale\", nn.Parameter(torch.ones(layer_shape)))\n",
"\n",
" def forward(self, x):\n",
" \"\"\"\n",
" Assumes shape is (batch, seq_len, d_model)\n",
" \"\"\"\n",
" # Calculating the Frobenius norm, RMS = 1/sqrt(N) * Frobenius norm\n",
" ff_rms = torch.linalg.norm(x, dim=(1,2)) * x[0].numel() ** -.5\n",
"\n",
" # Normalizing the input tensor 'x' with respect to RMS\n",
" raw = x / ff_rms.unsqueeze(-1).unsqueeze(-1)\n",
"\n",
" # Scaling the normalized tensor using the learnable parameter 'scale'\n",
" return self.scale[:x.shape[1], :].unsqueeze(0) * raw"
]
},
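{
"cell_type": "code",
"execution_count": null,
"id": "rmsnorm_sanity_check",
"metadata": {},
"outputs": [],
"source": [
"# A quick sanity check of RMSNorm (a sketch, not part of the original pipeline),\n",
"# using small toy sizes: with the learnable 'scale' still at its initial value of\n",
"# ones, RMSNorm should rescale every sample to a root-mean-square of roughly 1.\n",
"rms_test = RMSNorm((16, 64))\n",
"x_test = torch.randn(2, 16, 64)\n",
"x_normed = rms_test(x_test)\n",
"\n",
"# Root-mean-square of each sample after normalization; both values should be ~1.0\n",
"print(x_normed.pow(2).mean(dim=(1, 2)).sqrt())"
]
},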
{
"cell_type": "code",
"execution_count": null,
"id": "a232f8fb217d8a09",
"metadata": {},
"outputs": [],
"source": [
"def get_rotary_matrix(context_window, embedding_dim):\n",
" # Initialize a tensor for the rotary matrix with zeros\n",
" R = torch.zeros((context_window, embedding_dim, embedding_dim), requires_grad=False)\n",
"\n",
" # Loop through each position in the context window\n",
" for position in range(context_window):\n",
" # Loop through each dimension in the embedding\n",
" for i in range(embedding_dim // 2):\n",
" # Calculate the rotation angle (theta) based on the position and embedding dimension\n",
" theta = 10000. ** (-2. * (i - 1) / embedding_dim)\n",
" # Calculate the rotated matrix elements using sine and cosine functions\n",
" m_theta = position * theta\n",
" R[position, 2 * i, 2 * i] = np.cos(m_theta)\n",
" R[position, 2 * i, 2 * i + 1] = -np.sin(m_theta)\n",
" R[position, 2 * i + 1, 2 * i] = np.sin(m_theta)\n",
" R[position, 2 * i + 1, 2 * i + 1] = np.cos(m_theta)\n",
" return R"
]
},
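{
"cell_type": "code",
"execution_count": null,
"id": "rope_matrix_check",
"metadata": {},
"outputs": [],
"source": [
"# A quick check of the rotary matrix (a sketch, using small toy sizes): each\n",
"# R[position] is an orthogonal rotation, and R[m].T @ R[n] equals R[n - m],\n",
"# which is the relative-position property that RoPE relies on.\n",
"R_test = get_rotary_matrix(context_window=16, embedding_dim=8)\n",
"\n",
"print(\"orthogonal:\", torch.allclose(R_test[3] @ R_test[3].T, torch.eye(8), atol=1e-5))\n",
"print(\"relative:\", torch.allclose(R_test[2].T @ R_test[5], R_test[3], atol=1e-5))"
]
},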
{
"cell_type": "code",
"execution_count": null,
"id": "10aedc140006fa95",
"metadata": {},
"outputs": [],
"source": [
"class RoPEMaskedAttentionHead(nn.Module):\n",
" def __init__(self, config):\n",
" super().__init__()\n",
" self.config = config\n",
" # Linear transformation for query\n",
" self.w_q = nn.Linear(config['d_model'], config['d_model'], bias=False)\n",
" # Linear transformation for key\n",
" self.w_k = nn.Linear(config['d_model'], config['d_model'], bias=False)\n",
" # Linear transformation for value\n",
" self.w_v = nn.Linear(config['d_model'], config['d_model'], bias=False)\n",
" # Obtain rotary matrix for positional embeddings\n",
" self.R = get_rotary_matrix(config['context_window'], config['d_model'])\n",
"\n",
" def get_rotary_matrix(context_window, embedding_dim):\n",
" # Initialize a tensor for the rotary matrix with zeros\n",
" R = torch.zeros((context_window, embedding_dim, embedding_dim), requires_grad=False)\n",
"\n",
" # Loop through each position in the context window\n",
" for position in range(context_window):\n",
" # Loop through each dimension in the embedding\n",
" for i in range(embedding_dim // 2):\n",
" # Calculate the rotation angle (theta) based on the position and embedding dimension\n",
" theta = 10000. ** (-2. * (i - 1) / embedding_dim)\n",
" # Calculate the rotated matrix elements using sine and cosine functions\n",
" m_theta = position * theta\n",
" R[position, 2 * i, 2 * i] = np.cos(m_theta)\n",
" R[position, 2 * i, 2 * i + 1] = -np.sin(m_theta)\n",
" R[position, 2 * i + 1, 2 * i] = np.sin(m_theta)\n",
" R[position, 2 * i + 1, 2 * i + 1] = np.cos(m_theta)\n",
" return R\n",
"\n",
" def forward(self, x, return_attn_weights=False):\n",
" # x: input tensor of shape (batch, sequence length, dimension)\n",
"\n",
" b, m, d = x.shape # batch size, sequence length, dimension\n",
"\n",
" # Linear transformations for Q, K, and V\n",
" q = self.w_q(x)\n",
" k = self.w_k(x)\n",
" v = self.w_v(x)\n",
"\n",
" # Rotate Q and K using the RoPE matrix\n",
" q_rotated = (torch.bmm(q.transpose(0, 1), self.R[:m])).transpose(0, 1)\n",
" k_rotated = (torch.bmm(k.transpose(0, 1), self.R[:m])).transpose(0, 1)\n",
"\n",
" # Perform scaled dot-product attention\n",
" activations = F.scaled_dot_product_attention(\n",
" q_rotated, k_rotated, v, dropout_p=0.1, is_causal=True\n",
" )\n",
"\n",
" if return_attn_weights:\n",
" # Create a causal attention mask\n",
" attn_mask = torch.tril(torch.ones((m, m)), diagonal=0)\n",
" # Calculate attention weights and add causal mask\n",
" attn_weights = torch.bmm(q_rotated, k_rotated.transpose(1, 2)) / np.sqrt(d) + attn_mask\n",
" attn_weights = F.softmax(attn_weights, dim=-1)\n",
" return activations, attn_weights\n",
"\n",
" return activations"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f62b1d235695764c",
"metadata": {},
"outputs": [],
"source": [
"class RoPEMaskedMultiheadAttention(nn.Module):\n",
" def __init__(self, config):\n",
" super().__init__()\n",
" self.config = config\n",
" # Create a list of RoPEMaskedAttentionHead instances as attention heads\n",
" self.heads = nn.ModuleList([\n",
" RoPEMaskedAttentionHead(config) for _ in range(config['n_heads'])\n",
" ])\n",
" self.linear = nn.Linear(config['n_heads'] * config['d_model'], config['d_model']) # Linear layer after concatenating heads\n",
" self.dropout = nn.Dropout(.1) # Dropout layer\n",
"\n",
" def forward(self, x):\n",
" # x: input tensor of shape (batch, sequence length, dimension)\n",
"\n",
" # Process each attention head and concatenate the results\n",
" heads = [h(x) for h in self.heads]\n",
" x = torch.cat(heads, dim=-1)\n",
"\n",
" # Apply linear transformation to the concatenated output\n",
" x = self.linear(x)\n",
"\n",
" # Apply dropout\n",
" x = self.dropout(x)\n",
" return x"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e9236f037781b069",
"metadata": {},
"outputs": [],
"source": [
"# Update the master configuration with the number of attention heads\n",
"MASTER_CONFIG.update({\n",
" 'n_heads': 8,\n",
"})"
]
},
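{
"cell_type": "code",
"execution_count": null,
"id": "multihead_shape_check",
"metadata": {},
"outputs": [],
"source": [
"# A quick shape check (a sketch, using a small toy configuration rather than\n",
"# MASTER_CONFIG): multi-head attention should map (batch, seq_len, d_model)\n",
"# back to a tensor of the same shape.\n",
"toy_config = {'d_model': 64, 'context_window': 16, 'n_heads': 4}\n",
"toy_attn = RoPEMaskedMultiheadAttention(toy_config)\n",
"toy_x = torch.randn(2, toy_config['context_window'], toy_config['d_model'])\n",
"\n",
"print(toy_attn(toy_x).shape)  # expected: torch.Size([2, 16, 64])"
]
},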
{
"cell_type": "code",
"execution_count": null,
"id": "ec0ca4d23bdd9fd4",
"metadata": {},
"outputs": [],
"source": [
"class RopeModel(nn.Module):\n",
" def __init__(self, config):\n",
" super().__init__()\n",
" self.config = config\n",
"\n",
" # Embedding layer for input tokens\n",
" self.embedding = nn.Embedding(config['vocab_size'], config['d_model'])\n",
"\n",
" # RMSNorm layer for pre-normalization\n",
" self.rms = RMSNorm((config['context_window'], config['d_model']))\n",
"\n",
" # RoPEMaskedMultiheadAttention layer\n",
" self.rope_attention = RoPEMaskedMultiheadAttention(config)\n",
"\n",
" # Linear layer followed by ReLU activation\n",
" self.linear = nn.Sequential(\n",
" nn.Linear(config['d_model'], config['d_model']),\n",
" nn.ReLU(),\n",
" )\n",
"\n",
" # Final linear layer for prediction\n",
" self.last_linear = nn.Linear(config['d_model'], config['vocab_size'])\n",
"\n",
" print(\"model params:\", sum([m.numel() for m in self.parameters()]))\n",
"\n",
" def forward(self, idx, targets=None):\n",
" # idx: input indices\n",
" x = self.embedding(idx)\n",
"\n",
" # One block of attention\n",
" x = self.rms(x) # RMS pre-normalization\n",
" x = x + self.rope_attention(x)\n",
"\n",
" x = self.rms(x) # RMS pre-normalization\n",
" x = x + self.linear(x)\n",
"\n",
" logits = self.last_linear(x)\n",
"\n",
" if targets is not None:\n",
" loss = F.cross_entropy(logits.view(-1, self.config['vocab_size']), targets.view(-1))\n",
" return logits, loss\n",
"\n",
" else:\n",
" return logits"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "95271a5b0b29b43c",
"metadata": {},
"outputs": [],
"source": [
"# Create an instance of RopeModel (RMSNorm, RoPE, Multi-Head)\n",
"model = RopeModel(MASTER_CONFIG)\n",
"\n",
"# Obtain batches for training\n",
"xs, ys = get_batches(dataset, 'train', MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'])\n",
"\n",
"# Calculate logits and loss using the model\n",
"logits, loss = model(xs, ys)\n",
"\n",
"# Define the Adam optimizer for model parameters\n",
"optimizer = torch.optim.Adam(model.parameters())\n",
"\n",
"# Train the model\n",
"train(model, optimizer)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5b1afff55705a505",
"metadata": {},
"outputs": [],
"source": [
"# Updating training configuration with more epochs and a logging interval\n",
"MASTER_CONFIG.update({\n",
" \"epochs\": 5000,\n",
" \"log_interval\": 10,\n",
"})\n",
"\n",
"# Training the model with the updated configuration\n",
"train(model, optimizer)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "44e3b30f92614023",
"metadata": {},
"outputs": [],
"source": [
"class SwiGLU(nn.Module):\n",
" \"\"\"\n",
" Swish-Gated Linear Unit\n",
" https://arxiv.org/pdf/2002.05202v1.pdf\n",
" \"\"\"\n",
" def __init__(self, size):\n",
" super().__init__()\n",
" self.linear_gate = nn.Linear(size, size)\n",
" self.linear = nn.Linear(size, size)\n",
" self.beta = torch.randn(1, requires_grad=True)\n",
"\n",
" self.beta = nn.Parameter(torch.ones(1))\n",
" self.register_parameter(\"beta\", self.beta)\n",
"\n",
" def forward(self, x):\n",
" swish_gate = self.linear_gate(x) * torch.sigmoid(self.beta * self.linear_gate(x))\n",
" out = swish_gate * self.linear(x)\n",
" return out"
]
},
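{
"cell_type": "code",
"execution_count": null,
"id": "swiglu_silu_check",
"metadata": {},
"outputs": [],
"source": [
"# A small check of SwiGLU (a sketch): with beta at its initial value of 1, the\n",
"# swish gate reduces to SiLU, so SwiGLU(x) should match F.silu(W_gate(x)) * W(x)\n",
"# computed with the same weights.\n",
"glu_test = SwiGLU(8)\n",
"x_glu = torch.randn(4, 8)\n",
"\n",
"with torch.no_grad():\n",
"    reference = F.silu(glu_test.linear_gate(x_glu)) * glu_test.linear(x_glu)\n",
"    print(torch.allclose(glu_test(x_glu), reference, atol=1e-6))"
]
},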
{
"cell_type": "code",
"execution_count": null,
"id": "3b46c7a9b8e6da8e",
"metadata": {},
"outputs": [],
"source": [
"class RopeModel(nn.Module):\n",
" def __init__(self, config):\n",
" super().__init__()\n",
" self.config = config\n",
"\n",
" # Embedding layer for input tokens\n",
" self.embedding = nn.Embedding(config['vocab_size'], config['d_model'])\n",
"\n",
" # RMSNorm layer for pre-normalization\n",
" self.rms = RMSNorm((config['context_window'], config['d_model']))\n",
"\n",
" # Multi-head attention layer with RoPE (Rotary Positional Embeddings)\n",
" self.rope_attention = RoPEMaskedMultiheadAttention(config)\n",
"\n",
" # Linear layer followed by SwiGLU activation\n",
" self.linear = nn.Sequential(\n",
" nn.Linear(config['d_model'], config['d_model']),\n",
" SwiGLU(config['d_model']), # Adding SwiGLU activation\n",
" )\n",
"\n",
" # Output linear layer\n",
" self.last_linear = nn.Linear(config['d_model'], config['vocab_size'])\n",
"\n",
" # Printing total model parameters\n",
" print(\"model params:\", sum([m.numel() for m in self.parameters()]))\n",
"\n",
" def forward(self, idx, targets=None):\n",
" x = self.embedding(idx)\n",
"\n",
" # One block of attention\n",
" x = self.rms(x) # RMS pre-normalization\n",
" x = x + self.rope_attention(x)\n",
"\n",
" x = self.rms(x) # RMS pre-normalization\n",
" x = x + self.linear(x) # Applying SwiGLU activation\n",
"\n",
" logits = self.last_linear(x)\n",
"\n",
" if targets is not None:\n",
" # Calculate cross-entropy loss if targets are provided\n",
" loss = F.cross_entropy(logits.view(-1, self.config['vocab_size']), targets.view(-1))\n",
" return logits, loss\n",
"\n",
" else:\n",
" return logits"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f8548a11ec0c40be",
"metadata": {},
"outputs": [],
"source": [
"# Create an instance of RopeModel (RMSNorm, RoPE, Multi-Head, SwiGLU)\n",
"model = RopeModel(MASTER_CONFIG)\n",
"\n",
"# Obtain batches for training\n",
"xs, ys = get_batches(dataset, 'train', MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'])\n",
"\n",
"# Calculate logits and loss using the model\n",
"logits, loss = model(xs, ys)\n",
"\n",
"# Define the Adam optimizer for model parameters\n",
"optimizer = torch.optim.Adam(model.parameters())\n",
"\n",
"# Train the model\n",
"train(model, optimizer)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2622420706bbc513",
"metadata": {},
"outputs": [],
"source": [
"# Update model configurations for the number of layers\n",
"MASTER_CONFIG.update({\n",
" 'n_layers': 4, # Set the number of layers to 4\n",
"})"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e7b49bf56c2f8b4b",
"metadata": {},
"outputs": [],
"source": [
"# add RMSNorm and residual conncection\n",
"class LlamaBlock(nn.Module):\n",
" def __init__(self, config):\n",
" super().__init__()\n",
" self.config = config\n",
"\n",
" self.rms = RMSNorm((config['context_window'], config['d_model']))\n",
"\n",
" self.attention = RoPEMaskedMultiheadAttention(config)\n",
" self.feedforward = nn.Sequential(\n",
" nn.Linear(config['d_model'], config['d_model']),\n",
" SwiGLU(config['d_model']),\n",
" )\n",
"\n",
" def forward(self, x):\n",
" x = self.rms(x) # rms pre-normalization\n",
" x = x + self.attention(x)\n",
"\n",
" x = self.rms(x) # rms pre-normalization\n",
" x = x + self.feedforward(x)\n",
" return x"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "431beed0a02fc772",
"metadata": {},
"outputs": [],
"source": [
"MASTER_CONFIG"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "aa931e6781403fcc",
"metadata": {},
"outputs": [],
"source": [
"block = LlamaBlock(MASTER_CONFIG)\n",
"block(torch.randn(MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'], MASTER_CONFIG['d_model']));"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e26360bd91af5bae",
"metadata": {},
"outputs": [],
"source": [
"# Create an instance of the LlamaBlock class with the provided configuration\n",
"block = LlamaBlock(MASTER_CONFIG)\n",
"\n",
"# Generate a random tensor with the specified batch size, context window, and model dimension\n",
"random_input = torch.randn(MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'], MASTER_CONFIG['d_model'])\n",
"\n",
"# Apply the LlamaBlock to the random input tensor\n",
"output = block(random_input)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f84fb6ee09aadea5",
"metadata": {},
"outputs": [],
"source": [
"from collections import OrderedDict\n",
"\n",
"MASTER_CONFIG.update({\n",
" 'n_layers': 4,\n",
"})\n",
"\n",
"class Llama(nn.Module):\n",
" def __init__(self, config):\n",
" super().__init__()\n",
" self.config = config\n",
" # Embedding layer for token representations\n",
" self.embeddings = nn.Embedding(config['vocab_size'], config['d_model'])\n",
" # Sequential block of LlamaBlocks based on the specified number of layers\n",
" self.llama_blocks = nn.Sequential(\n",
" OrderedDict([(f\"llama_{i}\", LlamaBlock(config)) for i in range(config['n_layers'])])\n",
" )\n",
" # Feedforward network (FFN) for final output\n",
" self.ffn = nn.Sequential(\n",
" nn.Linear(config['d_model'], config['d_model']),\n",
" SwiGLU(config['d_model']),\n",
" nn.Linear(config['d_model'], config['vocab_size']),\n",
" )\n",
"\n",
" # Print total number of parameters in the model\n",
" print(\"model params:\", sum([m.numel() for m in self.parameters()]))\n",
"\n",
" def forward(self, idx, targets=None):\n",
" # Input token indices are passed through the embedding layer\n",
" x = self.embeddings(idx)\n",
" # Process the input through the LlamaBlocks\n",
" x = self.llama_blocks(x)\n",
" # Pass the processed input through the final FFN for output logits\n",
" logits = self.ffn(x)\n",
"\n",
" # If targets are not provided, return only the logits\n",
" if targets is None:\n",
" return logits\n",
" # If targets are provided, compute and return the cross-entropy loss\n",
" else:\n",
" loss = F.cross_entropy(logits.view(-1, self.config['vocab_size']), targets.view(-1))\n",
" return logits, loss"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a0ff7f505eddaefd",
"metadata": {},
"outputs": [],
"source": [
"# Create an instance of RopeModel (RMSNorm, RoPE, Multi-Head, SwiGLU, N_layers)\n",
"llama = Llama(MASTER_CONFIG)\n",
"\n",
"# Obtain batches for training\n",
"xs, ys = get_batches(dataset, 'train', MASTER_CONFIG['batch_size'], MASTER_CONFIG['context_window'])\n",
"\n",
"# Calculate logits and loss using the model\n",
"logits, loss = llama(xs, ys)\n",
"\n",
"# Define the Adam optimizer for model parameters\n",
"optimizer = torch.optim.Adam(llama.parameters())\n",
"\n",
"# Train the model\n",
"train(llama, optimizer)"
]
},
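{
"cell_type": "code",
"execution_count": null,
"id": "generation_sketch",
"metadata": {},
"outputs": [],
"source": [
"# A minimal text-generation sketch (not part of the original training code): sample\n",
"# characters autoregressively from the trained model, feeding each prediction back\n",
"# in. The helper name `generate`, the temperature argument, and starting from\n",
"# token index 0 are assumptions, not something defined earlier in the notebook.\n",
"@torch.no_grad()\n",
"def generate(model, config=MASTER_CONFIG, max_new_tokens=200, temperature=1.0):\n",
"    # Start from a single token (index 0 of the character vocabulary)\n",
"    idx = torch.zeros((1, 1), dtype=torch.long)\n",
"    for _ in range(max_new_tokens):\n",
"        # Crop the running sequence to the model's context window\n",
"        idx_cond = idx[:, -config['context_window']:]\n",
"        logits = model(idx_cond)\n",
"        # Sample the next character from the distribution at the last position\n",
"        probs = F.softmax(logits[:, -1, :] / temperature, dim=-1)\n",
"        next_id = torch.multinomial(probs, num_samples=1)\n",
"        idx = torch.cat([idx, next_id], dim=-1)\n",
"    return decode(idx[0].tolist())\n",
"\n",
"# Sample from the model trained above; re-run after the longer training run below\n",
"print(generate(llama))"
]
},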
{
"cell_type": "code",
"execution_count": null,
"id": "ca7605a6b36f613d",
"metadata": {},
"outputs": [],
"source": [
"# Update the number of epochs in the configuration\n",
"MASTER_CONFIG.update({\n",
" 'epochs': 10000,\n",
"})\n",
"# Train the LLaMA model for the specified number of epochs\n",
"train(llama, optimizer, scheduler=None, config=MASTER_CONFIG)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}