Skip to content

Commit

Permalink
Type annotations for LLOps
Browse files Browse the repository at this point in the history
  • Loading branch information
manu12121999 authored Dec 16, 2024
1 parent d712079 commit 8ab8ad2
Showing 1 changed file with 56 additions and 52 deletions.
108 changes: 56 additions & 52 deletions ctrl_c_nn.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,23 @@ class LLOps:
"""

@staticmethod
def fill(shape: tuple, value):
def fill(shape: tuple, value: (int, float)) -> list:
# Make a new list (of lists) filled with value
if len(shape) == 1:
return [value for _ in range(shape[0])]
else:
return [LLOps.fill(shape[1:], value) for _ in range(shape[0])]

@staticmethod
def fill_callable(shape: tuple, gen):
def fill_callable(shape: tuple, gen: callable) -> list:
# Make a new list (of lists) filled with values generated from the callable gen
if len(shape) == 1:
return [gen() for _ in range(shape[0])]
else:
return [LLOps.fill(shape[1:], gen()) for _ in range(shape[0])]

@staticmethod
def f_unary_op(a: list, f):
def f_unary_op(a: list, f: callable) -> list:
# input tensor output list of list
if not isinstance(a, list):
return f(a)
Expand All @@ -52,7 +52,7 @@ def f_unary_op(a: list, f):
return [LLOps.f_unary_op(a_i, f) for a_i in a]

@staticmethod
def f_binary_op_scalar(a: list, b: (int, float), op):
def f_binary_op_scalar(a: list, b: (int, float), op: callable) -> list:
# Add a scalar to a list (of lists). Other operations than add are supported too
if isinstance(a, (int, float)):
return op(a, b)
Expand All @@ -62,30 +62,30 @@ def f_binary_op_scalar(a: list, b: (int, float), op):
return [LLOps.f_binary_op_scalar(a_i, b, op) for a_i in a]

@staticmethod
def f_binary_op_same_size(a: list, b: list, op):
def f_binary_op_same_size(a: list, b: list, op: callable) -> list:
# Add two list (of lists). Other operations than add are supported too
if isinstance(a[0], (int, float)):
return [op(a_i, b_i) for a_i, b_i in zip(a, b)]
else:
return [LLOps.f_binary_op_same_size(a_i, b_i, op) for a_i, b_i in zip(a, b)]

@staticmethod
def f_add_same_size_performance(a: list, b: list):
def f_add_same_size_performance(a: list, b: list) -> list:
# Add two list (of lists). Only used to test the performance of different implementations
if isinstance(a[0], (int, float)):
return [a_i + b_i for a_i, b_i in zip(a, b)]
else:
return [LLOps.f_add_same_size_performance(a_i, b_i) for a_i, b_i in zip(a, b)]

@staticmethod
def f_transpose_2d(a: list):
def f_transpose_2d(a: list) -> list:
# Transpose a 2-dimensional list
# (I,J) -> (J,I)
I, J = len(a), len(a[0])
return [[a[i][j] for i in range(I)] for j in range(J)]

@staticmethod
def f_matmul_2d(a: list, b: list):
def f_matmul_2d(a: list, b: list) -> list:
# perform matrix multiplication on two 2-dimensional lists
# (I,K) @ (K, J) -> (I,J)
I, K, K2, J = len(a), len(a[0]), len(b), len(b[0])
Expand All @@ -94,13 +94,13 @@ def f_matmul_2d(a: list, b: list):
return [[sumprod(a_i, b_T_j) for b_T_j in b_T] for a_i in a]

@staticmethod
def f_matmul_transposed_2d(a: list, bT: list):
def f_matmul_transposed_2d(a: list, bT: list) -> list:
# perform matrix multiplication on two 2-dimensional lists
# (I,K) @ (J, K).T -> (I,J)
return [[sumprod(a_i, bT_j) for bT_j in bT] for a_i in a]

@staticmethod
def f_matmul_2d_multiprocess(a: list, b: list):
def f_matmul_2d_multiprocess(a: list, b: list) -> list:
# perform matrix multiplication on two 2-dimensional lists
# (I,K) @ (K, J) -> (I,J)
I, K, K2, J = len(a), len(a[0]), len(b), len(b[0])
Expand All @@ -109,7 +109,7 @@ def f_matmul_2d_multiprocess(a: list, b: list):
return p.starmap(LLOps.f_vec_times_mat, ((a_i, b) for a_i in a), chunksize=max(1, I//8))

@staticmethod
def f_matmul_2d_old(a: list, b: list):
def f_matmul_2d_old(a: list, b: list) -> list:
# perform matrix multiplication on two 2-dimensional lists
# (I,K) @ (K, J) -> (I,J)
I, K, K2, J = len(a), len(a[0]), len(b), len(b[0])
Expand All @@ -128,23 +128,23 @@ def f_matmul_2d_old(a: list, b: list):
return result

@staticmethod
def f_vec_times_vec(a: list, b: list):
# perform vector times matrix multiplication on a 1-dimensional list and another 2-dimensional lists
def f_vec_times_vec(a: list, b: list) -> (int, float):
# perform vector times matrix multiplication on a 1-dimensional list and another 1-dimensional lists
assert len(a) == len(b)
return sumprod(a, b)

@staticmethod
def f_mat_times_vec(a: list, b: list):
def f_mat_times_vec(a: list, b: list) -> list:
# perform matrix times vector multiplication on a 2-dimensional list and another 1-dimensional lists
return [LLOps.f_vec_times_vec(row, b) for row in a]

@staticmethod
def f_vec_times_mat(a: list, b: list):
def f_vec_times_mat(a: list, b: list) -> list:
# perform vector times vector multiplication on two 1-dimensional lists
return [LLOps.f_vec_times_vec(a, row) for row in LLOps.f_transpose_2d(b)]

@staticmethod
def f_squeeze(a: list, dim: int):
def f_squeeze(a: list, dim: int) -> (list, int, float):
# remove one dimension from a list of lists
if dim == 0:
return a[0]
Expand All @@ -154,7 +154,7 @@ def f_squeeze(a: list, dim: int):
return [LLOps.f_squeeze(a_i, dim-1) for a_i in a]

@staticmethod
def f_unsqueeze(a: list, dim: int):
def f_unsqueeze(a: list, dim: int) -> list:
# add one dimension to a list of lists
if dim == 0:
return [a]
Expand All @@ -164,7 +164,7 @@ def f_unsqueeze(a: list, dim: int):
return [LLOps.f_unsqueeze(a_i, dim-1) for a_i in a]

@staticmethod
def f_slice(a: list, item: tuple):
def f_slice(a: list, item: tuple) -> list:
# Return a slice of the list of lists (e.g. a[0] or a[:, 3:2])
if len(item) == 1:
return a[item[0]]
Expand All @@ -176,7 +176,7 @@ def f_slice(a: list, item: tuple):
return LLOps.f_slice(a[index], item[1:])

@staticmethod
def f_flatten(a: list, dim=0):
def f_flatten(a: list, dim: int = 0) -> list:
# Flatten a list of lists into a single list starting at dim
if dim == 0:
if isinstance(a[0], list):
Expand All @@ -190,7 +190,7 @@ def f_flatten(a: list, dim=0):
return [LLOps.f_flatten(a_i, dim-1) for a_i in a]

@staticmethod
def f_calc_shape(a: list):
def f_calc_shape(a: list) -> list:
assert isinstance(a, list)
if not isinstance(a[0], list):
return [len(a)]
Expand All @@ -200,7 +200,7 @@ def f_calc_shape(a: list):
return l

@staticmethod
def f_setitem(a: list, key: tuple, value):
def f_setitem(a: list, key: tuple, value) -> list:
# set the item at position key of list a to a value. Value can be scalar or list. (a[key] = value)
if len(key) == 1:
a[key[0]] = value
Expand All @@ -220,7 +220,7 @@ def f_setitem(a: list, key: tuple, value):
return m

@staticmethod
def f_reshape_flattened(a: list, shape: tuple):
def f_reshape_flattened(a: list, shape: tuple) -> list:
# reshape a one-dimensional array (flattened) into a target format
if len(shape) == 1:
return a
Expand All @@ -229,7 +229,7 @@ def f_reshape_flattened(a: list, shape: tuple):
return [LLOps.f_reshape_flattened(a[i*n:(i+1)*n], shape[1:]) for i in range(shape[0])]

@staticmethod
def f_permute_201(a):
def f_permute_201(a: list) -> list:
# permute shape (H, W, C) to (C, H, W)
I, J, K = len(a), len(a[0]), len(a[0][0])

Expand All @@ -244,7 +244,7 @@ def f_advanced_indexing_1d(a: (list, tuple), b: (list, tuple)):
return tuple([a[b_i] for b_i in b])

@staticmethod
def f_reduction_sum(a, reduction_dim, a_shape):
def f_reduction_sum(a: list, reduction_dim: int, a_shape: tuple) -> (list, int, float):
# sum up the list (of lists) along the dimensions specified in shape
if reduction_dim == 0:
if len(a_shape) == 1:
Expand All @@ -258,7 +258,7 @@ def f_reduction_sum(a, reduction_dim, a_shape):
return [LLOps.f_reduction_sum(a_i, reduction_dim - 1, a_shape[1:]) for a_i in a]

@staticmethod
def f_reduction_max(a, reduction_dim, a_shape):
def f_reduction_max(a: list, reduction_dim: int, a_shape: tuple) -> (list, int, float):
# find the max of the list (of lists) along the dimensions specified in shape
if reduction_dim == 0:
if len(a_shape) == 1:
Expand All @@ -272,7 +272,7 @@ def f_reduction_max(a, reduction_dim, a_shape):
return [LLOps.f_reduction_max(a_i, reduction_dim - 1, a_shape[1:]) for a_i in a]

@staticmethod
def f_reduction_prod(a, reduction_dim, a_shape):
def f_reduction_prod(a: list, reduction_dim: int, a_shape: tuple) -> (list, int, float):
# calculate the product of the list (of lists) along the dimensions specified in shape
if reduction_dim == 0:
if len(a_shape) == 1:
Expand All @@ -286,7 +286,7 @@ def f_reduction_prod(a, reduction_dim, a_shape):
return [LLOps.f_reduction_prod(a_i, reduction_dim - 1, a_shape[1:]) for a_i in a]

@staticmethod
def f_stack(iterable, dim):
def f_stack(iterable: list, dim: int) -> list:
# stack iterables of lists along a certain dim. NOT working yet
if dim == 0:
return [a_i for a_i in iterable]
Expand All @@ -298,7 +298,7 @@ def f_stack(iterable, dim):
raise NotImplementedError

@staticmethod
def f_cat(iterable, dim):
def f_cat(iterable: list, dim: int) -> list:
# concatenate iterables of lists along a certain dim. ONLY working for dim 0 and 1
if dim == 0:
return [a_i_j for a_i in iterable for a_i_j in a_i]
Expand All @@ -312,7 +312,7 @@ def f_cat(iterable, dim):
class Tensor:
# Wrapper to use linalg operations on lists (of lists) (e.g. matmuls) in a nicer way

def __init__(self, elems):
def __init__(self, elems: (list, int)):
self.elems = elems

# calculate number of dimensions
Expand Down Expand Up @@ -516,14 +516,17 @@ def sqrt(self):
def exp(self):
return Tensor(LLOps.f_unary_op(self.elems, math.exp))

def __pow__(self, num):
def pow(self, num):
if num == 2:
return Tensor(LLOps.f_binary_op_same_size(self.elems, self.elems, operator.mul))
elif isinstance(num, int):
return self.apply(lambda x: x**num)
else:
return self.apply(lambda x: math.pow(x, num))

def __pow__(self, num):
return self.pow(num)

def clamp(self, low, high):
if low is None:
return Tensor(LLOps.f_unary_op(self.elems, lambda x: min(x, high)))
Expand Down Expand Up @@ -1080,29 +1083,30 @@ def png_decompress(data_bytes, width, height, n_channels):
@staticmethod
def read_png(path, resize=None, dimorder="HWC", num_channels=3, to_float=True, mean=(0.0, 0.0, 0.0), std=(1.0, 1.0, 1.0)):
with open(path, "br") as f:
data = b''
width, height, bit_depth, color_type_str, color_type_bytes = None, None, None, None, None
image_bytes = f.read()
# header = image_bytes[:8]
start = 8
while start is not None:
chunk_length = int.from_bytes(image_bytes[start:start+4], byteorder="big")
chunk_type = image_bytes[start+4:start+8]
chunk_data = image_bytes[start+8:start+8+chunk_length]
if chunk_type == b'IHDR':
width, height = int.from_bytes(chunk_data[:4], "big"), int.from_bytes(chunk_data[4:8], "big")
bit_depth, color_type = int.from_bytes(chunk_data[8:9], "big"), int.from_bytes(chunk_data[9:10], "big")
color_type_str = "RGB" if color_type == 2 else "GRAY" if color_type == 0 else "RGBA" if color_type == 6 else "OTHER"
color_type_bytes = 3 if color_type == 2 else 1 if color_type == 0 else 4 if color_type == 6 else None
assert bit_depth == 8

if chunk_type == b'IDAT':
data += chunk_data

start = start + 12 + chunk_length

if chunk_type == b'IEND':
start = None
data = b''
width, height, bit_depth, color_type_str, color_type_bytes = None, None, None, None, None

# header = image_bytes[:8]
start = 8
while start is not None:
chunk_length = int.from_bytes(image_bytes[start:start+4], byteorder="big")
chunk_type = image_bytes[start+4:start+8]
chunk_data = image_bytes[start+8:start+8+chunk_length]
if chunk_type == b'IHDR':
width, height = int.from_bytes(chunk_data[:4], "big"), int.from_bytes(chunk_data[4:8], "big")
bit_depth, color_type = int.from_bytes(chunk_data[8:9], "big"), int.from_bytes(chunk_data[9:10], "big")
color_type_str = "RGB" if color_type == 2 else "GRAY" if color_type == 0 else "RGBA" if color_type == 6 else "OTHER"
color_type_bytes = 3 if color_type == 2 else 1 if color_type == 0 else 4 if color_type == 6 else None
assert bit_depth == 8

if chunk_type == b'IDAT':
data += chunk_data

start = start + 12 + chunk_length

if chunk_type == b'IEND':
start = None
lines = ImageIO.png_decompress(data, width, height, color_type_bytes)
t = Tensor(lines)

Expand Down

0 comments on commit 8ab8ad2

Please sign in to comment.