forked from jaymody/picoGPT
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gpt2.py
126 lines (80 loc) · 4.72 KB
/
gpt2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import math
import np_replace
# sqrt(2 / pi) from the tanh approximation of GELU; computed once instead of
# once per element.
_GELU_TANH_SCALE = math.sqrt(2 / math.pi)


def _gelu_scalar(value):
    """Apply the tanh approximation of GELU to a single number."""
    return 0.5 * value * (1 + math.tanh(_GELU_TANH_SCALE * (value + 0.044715 * value**3)))


def gelu(array):
    """Apply the tanh-approximated GELU activation element-wise.

    Args:
        array: A number, a (possibly nested) list or tuple of numbers, or any
            object exposing ``tolist()`` that converts to one of those.

    Returns:
        The activated value for scalar input; otherwise nested lists that
        mirror the nesting of the input (tuples come back as lists).
    """
    # Anything that is not already a plain Python number/sequence is expected
    # to be array-like and is converted through its ``tolist()`` method.
    if not isinstance(array, (int, float, list, tuple)):
        array = array.tolist()
    if isinstance(array, (list, tuple)):
        # Recurse so arbitrarily nested sequences are handled.
        return [gelu(item) for item in array]
    return _gelu_scalar(array)
def softmax(x):
    """Exponentiate ``x`` after subtracting ``matmax(x)`` and normalise by ``matsum``.

    NOTE(review): the reduction axis of ``np_replace.matmax`` / ``matsum`` is
    not visible here; presumably both reduce over the last axis so every row
    sums to 1 -- confirm against np_replace.
    """
    # Shift before exponentiating (the usual overflow guard for softmax).
    shifted = np_replace.matsub(x, np_replace.matmax(x))
    exponentials = np_replace.matexp(shifted)
    normaliser = np_replace.matsum(exponentials)
    return np_replace.matdiv(exponentials, normaliser)
def layer_norm(x, g, b, eps: float = 1e-5):
    """Normalise ``x`` with its mean/variance, then scale by ``g`` and shift by ``b``.

    Args:
        x: Input matrix handed to the ``np_replace`` helpers.
        g: Scale (gamma); must expose ``tolist()``, which is called before the
            element-wise multiply.
        b: Offset (beta); passed to ``np_replace.matadd`` unchanged.
        eps: Added to each variance before the square root.

    Returns:
        Whatever ``np_replace.matadd`` returns for the scaled, shifted input.
    """
    mu = np_replace.matmean(x)
    var = np_replace.matvar(x)
    centered = np_replace.matsub(x, mu)
    # matvar yields rows whose first entry is the variance; build the matching
    # single-column matrix of standard deviations.
    std_column = []
    for var_row in var:
        std_column.append([math.sqrt(var_row[0] + eps)])
    # normalize x to have mean=0 and var=1 over last axis
    normalized = np_replace.matdiv(centered, std_column)
    # scale and offset with gamma/beta params
    scaled = np_replace.matmul_elem(g.tolist(), normalized)
    return np_replace.matadd(scaled, b)
def linear(x, w, b):  # [m, in], [in, out], [out] -> [m, out]
    """Affine projection: matrix-multiply ``x`` by ``w`` and add ``b``."""
    projected = np_replace.matmul(x, w)
    return np_replace.matadd(projected, b)
def ffn(x, c_fc, c_proj):  # [n_seq, n_embd] -> [n_seq, n_embd]
    """Position-wise feed-forward network: linear -> GELU -> linear.

    ``c_fc`` and ``c_proj`` are keyword mappings unpacked into ``linear``
    (so they must supply its ``w`` and ``b`` arguments).
    """
    # project up: [n_seq, n_embd] -> [n_seq, 4*n_embd]
    hidden = linear(x, **c_fc)
    activated = gelu(hidden)
    # project back down: [n_seq, 4*n_embd] -> [n_seq, n_embd]
    return linear(activated, **c_proj)
def attention(q, k, v, mask):  # [n_q, d_k], [n_k, d_k], [n_k, d_v], [n_q, n_k] -> [n_q, d_v]
    """Scaled dot-product attention: ``softmax(q @ k^T / sqrt(d_k) + mask) @ v``.

    ``d_k`` is taken as ``len(q[0])``; ``mask`` is added to the scaled scores
    before the softmax.
    """
    scores = np_replace.matmul(q, np_replace.mattranspose(k))
    # 1x1 matrix holding sqrt(d_k), used as the divisor for every score.
    scale = [[math.sqrt(len(q[0]))]]
    scaled_scores = np_replace.matdiv(scores, scale)
    masked_scores = np_replace.matadd(scaled_scores, mask)
    weights = softmax(masked_scores)
    return np_replace.matmul(weights, v)
def mha(x, c_attn, c_proj, n_head):  # [n_seq, n_embd] -> [n_seq, n_embd]
    """Multi-head causal self-attention.

    Projects ``x`` to fused q/k/v, splits it into ``n_head`` heads, runs
    ``attention`` per head with a shared causal mask, stacks the head outputs
    back together and applies the output projection.

    ``c_attn`` and ``c_proj`` are keyword mappings unpacked into ``linear``.
    """
    # qkv projection: [n_seq, n_embd] -> [n_seq, 3*n_embd]
    fused = linear(x, **c_attn)

    # split into q, k, v: [n_seq, 3*n_embd] -> [3, n_seq, n_embd]
    qkv = np_replace.matsplit(fused, 3)

    # split each of q, k, v into heads:
    # [3, n_seq, n_embd] -> [3, n_head, n_seq, n_embd/n_head]
    per_head = [np_replace.matsplit(part, n_head) for part in qkv]

    # causal mask to hide future inputs from being attended to: [n_seq, n_seq]
    # NOTE(review): the mask values come entirely from np_replace.mattri --
    # confirm it yields an additive mask rather than a plain 0/1 triangle.
    mask = np_replace.mattri(len(fused))

    # attention per head:
    # [3, n_head, n_seq, n_embd/n_head] -> [n_head, n_seq, n_embd/n_head]
    head_outputs = []
    for q, k, v in zip(*per_head):
        head_outputs.append(attention(q, k, v, mask))

    # merge heads: [n_head, n_seq, n_embd/n_head] -> [n_seq, n_embd]
    merged = np_replace.mathstack(head_outputs)

    # out projection: [n_seq, n_embd] -> [n_seq, n_embd]
    return linear(merged, **c_proj)
def transformer_block(x, mlp, attn, ln_1, ln_2, n_head):  # [n_seq, n_embd] -> [n_seq, n_embd]
    """One pre-norm transformer block: attention and FFN, each with a residual add.

    ``ln_1``/``ln_2`` are unpacked into ``layer_norm``, ``attn`` into ``mha``
    and ``mlp`` into ``ffn``.
    """
    # multi-head causal self attention (residual connection around it)
    attn_out = mha(layer_norm(x, **ln_1), **attn, n_head=n_head)
    x = np_replace.matadd(x, attn_out)

    # position-wise feed forward network (residual connection around it)
    ffn_out = ffn(layer_norm(x, **ln_2), **mlp)
    return np_replace.matadd(x, ffn_out)
def gpt2(inputs, wte, wpe, blocks, ln_f, n_head):  # [n_seq] -> [n_seq, n_vocab]
    """Run the GPT-2 forward pass and return per-position vocabulary logits.

    Args:
        inputs: Token ids; used to index ``wte`` and to size the position range.
        wte: Token embedding table; indexed by ``inputs`` and also reused
            (transposed) as the output projection. The indexed result must
            expose ``tolist()`` -- presumably a NumPy array, TODO confirm.
        wpe: Positional embedding table, indexed by ``range(len(inputs))``.
        blocks: Iterable of keyword mappings, each unpacked into
            ``transformer_block``.
        ln_f: Keyword mapping unpacked into the final ``layer_norm``.
        n_head: Number of attention heads, forwarded to every block.
    """
    # token + positional embeddings: [n_seq] -> [n_seq, n_embd]
    token_emb = wte[inputs].tolist()
    position_emb = wpe[range(len(inputs))].tolist()
    hidden = np_replace.matadd(token_emb, position_emb)

    # forward pass through n_layer transformer blocks
    for layer in blocks:
        hidden = transformer_block(hidden, **layer, n_head=n_head)

    # final layer norm, then projection to vocab with the tied embedding matrix
    hidden = layer_norm(hidden, **ln_f)
    return np_replace.matmul(hidden, np_replace.mattranspose(wte))
def generate(inputs, params, n_head, n_tokens_to_generate):
    """Greedily decode ``n_tokens_to_generate`` tokens after ``inputs``.

    Each step runs the full model on the current sequence, picks the index of
    the largest logit at the last position and appends it.

    Note:
        ``inputs`` is extended in place, so the caller's list grows by the
        generated ids.

    Returns:
        Only the newly generated token ids (the tail of ``inputs``).
    """
    from tqdm import tqdm

    # auto-regressive decode loop
    steps = tqdm(range(n_tokens_to_generate), "generating")
    for _ in steps:
        logits = gpt2(inputs, **params, n_head=n_head)  # model forward pass
        last_logits = logits[-1]
        next_id = last_logits.index(max(last_logits))  # greedy sampling
        inputs.append(int(next_id))  # append prediction to input

    # only return generated ids
    first_generated = len(inputs) - n_tokens_to_generate
    return inputs[first_generated:]
def main(
    prompt: str,
    n_tokens_to_generate: int = 40,
    model_size: str = "124M",
    models_dir: str = "models",
):
    """Generate a continuation of ``prompt`` and return it as text.

    Args:
        prompt: Text to continue.
        n_tokens_to_generate: Number of tokens to decode.
        model_size: Forwarded to ``load_encoder_hparams_and_params``.
        models_dir: Forwarded to ``load_encoder_hparams_and_params``.

    Returns:
        The decoded text of the generated tokens only (the prompt is excluded).

    Raises:
        AssertionError: If prompt length plus ``n_tokens_to_generate`` is not
            strictly below ``hparams["n_ctx"]``.
    """
    from utils import load_encoder_hparams_and_params

    # load encoder, hparams, and params from the released open-ai gpt-2 files
    tokenizer, hyperparams, weights = load_encoder_hparams_and_params(model_size, models_dir)

    # encode the input string using the BPE tokenizer
    prompt_ids = tokenizer.encode(prompt)

    # make sure we are not surpassing the max sequence length of our model
    total_length = len(prompt_ids) + n_tokens_to_generate
    assert total_length < hyperparams["n_ctx"]

    # generate output ids, then decode them back into a string
    generated_ids = generate(prompt_ids, weights, hyperparams["n_head"], n_tokens_to_generate)
    return tokenizer.decode(generated_ids)
if __name__ == "__main__":
    # Expose main() as a command-line interface; its parameters become CLI arguments.
    from fire import Fire

    Fire(main)