diff --git a/ctrl_c_nn.py b/ctrl_c_nn.py index 639b9e4..ec86e25 100644 --- a/ctrl_c_nn.py +++ b/ctrl_c_nn.py @@ -26,7 +26,7 @@ class LLOps: """ @staticmethod - def fill(shape: tuple, value): + def fill(shape: tuple, value: (int, float)) -> list: # Make a new list (of lists) filled with value if len(shape) == 1: return [value for _ in range(shape[0])] @@ -34,7 +34,7 @@ def fill(shape: tuple, value): return [LLOps.fill(shape[1:], value) for _ in range(shape[0])] @staticmethod - def fill_callable(shape: tuple, gen): + def fill_callable(shape: tuple, gen: callable) -> list: # Make a new list (of lists) filled with values generated from the callable gen if len(shape) == 1: return [gen() for _ in range(shape[0])] @@ -42,7 +42,7 @@ def fill_callable(shape: tuple, gen): return [LLOps.fill(shape[1:], gen()) for _ in range(shape[0])] @staticmethod - def f_unary_op(a: list, f): + def f_unary_op(a: list, f: callable) -> list: # input tensor output list of list if not isinstance(a, list): return f(a) @@ -52,7 +52,7 @@ def f_unary_op(a: list, f): return [LLOps.f_unary_op(a_i, f) for a_i in a] @staticmethod - def f_binary_op_scalar(a: list, b: (int, float), op): + def f_binary_op_scalar(a: list, b: (int, float), op: callable) -> list: # Add a scalar to a list (of lists). Other operations than add are supported too if isinstance(a, (int, float)): return op(a, b) @@ -62,7 +62,7 @@ def f_binary_op_scalar(a: list, b: (int, float), op): return [LLOps.f_binary_op_scalar(a_i, b, op) for a_i in a] @staticmethod - def f_binary_op_same_size(a: list, b: list, op): + def f_binary_op_same_size(a: list, b: list, op: callable) -> list: # Add two list (of lists). Other operations than add are supported too if isinstance(a[0], (int, float)): return [op(a_i, b_i) for a_i, b_i in zip(a, b)] @@ -70,7 +70,7 @@ def f_binary_op_same_size(a: list, b: list, op): return [LLOps.f_binary_op_same_size(a_i, b_i, op) for a_i, b_i in zip(a, b)] @staticmethod - def f_add_same_size_performance(a: list, b: list): + def f_add_same_size_performance(a: list, b: list) -> list: # Add two list (of lists). Only used to test the performance of different implementations if isinstance(a[0], (int, float)): return [a_i + b_i for a_i, b_i in zip(a, b)] @@ -78,14 +78,14 @@ def f_add_same_size_performance(a: list, b: list): return [LLOps.f_add_same_size_performance(a_i, b_i) for a_i, b_i in zip(a, b)] @staticmethod - def f_transpose_2d(a: list): + def f_transpose_2d(a: list) -> list: # Transpose a 2-dimensional list # (I,J) -> (J,I) I, J = len(a), len(a[0]) return [[a[i][j] for i in range(I)] for j in range(J)] @staticmethod - def f_matmul_2d(a: list, b: list): + def f_matmul_2d(a: list, b: list) -> list: # perform matrix multiplication on two 2-dimensional lists # (I,K) @ (K, J) -> (I,J) I, K, K2, J = len(a), len(a[0]), len(b), len(b[0]) @@ -94,13 +94,13 @@ def f_matmul_2d(a: list, b: list): return [[sumprod(a_i, b_T_j) for b_T_j in b_T] for a_i in a] @staticmethod - def f_matmul_transposed_2d(a: list, bT: list): + def f_matmul_transposed_2d(a: list, bT: list) -> list: # perform matrix multiplication on two 2-dimensional lists # (I,K) @ (J, K).T -> (I,J) return [[sumprod(a_i, bT_j) for bT_j in bT] for a_i in a] @staticmethod - def f_matmul_2d_multiprocess(a: list, b: list): + def f_matmul_2d_multiprocess(a: list, b: list) -> list: # perform matrix multiplication on two 2-dimensional lists # (I,K) @ (K, J) -> (I,J) I, K, K2, J = len(a), len(a[0]), len(b), len(b[0]) @@ -109,7 +109,7 @@ def f_matmul_2d_multiprocess(a: list, b: list): return p.starmap(LLOps.f_vec_times_mat, ((a_i, b) for a_i in a), chunksize=max(1, I//8)) @staticmethod - def f_matmul_2d_old(a: list, b: list): + def f_matmul_2d_old(a: list, b: list) -> list: # perform matrix multiplication on two 2-dimensional lists # (I,K) @ (K, J) -> (I,J) I, K, K2, J = len(a), len(a[0]), len(b), len(b[0]) @@ -128,23 +128,23 @@ def f_matmul_2d_old(a: list, b: list): return result @staticmethod - def f_vec_times_vec(a: list, b: list): - # perform vector times matrix multiplication on a 1-dimensional list and another 2-dimensional lists + def f_vec_times_vec(a: list, b: list) -> (int, float): + # perform vector times matrix multiplication on a 1-dimensional list and another 1-dimensional lists assert len(a) == len(b) return sumprod(a, b) @staticmethod - def f_mat_times_vec(a: list, b: list): + def f_mat_times_vec(a: list, b: list) -> list: # perform matrix times vector multiplication on a 2-dimensional list and another 1-dimensional lists return [LLOps.f_vec_times_vec(row, b) for row in a] @staticmethod - def f_vec_times_mat(a: list, b: list): + def f_vec_times_mat(a: list, b: list) -> list: # perform vector times vector multiplication on two 1-dimensional lists return [LLOps.f_vec_times_vec(a, row) for row in LLOps.f_transpose_2d(b)] @staticmethod - def f_squeeze(a: list, dim: int): + def f_squeeze(a: list, dim: int) -> (list, int, float): # remove one dimension from a list of lists if dim == 0: return a[0] @@ -154,7 +154,7 @@ def f_squeeze(a: list, dim: int): return [LLOps.f_squeeze(a_i, dim-1) for a_i in a] @staticmethod - def f_unsqueeze(a: list, dim: int): + def f_unsqueeze(a: list, dim: int) -> list: # add one dimension to a list of lists if dim == 0: return [a] @@ -164,7 +164,7 @@ def f_unsqueeze(a: list, dim: int): return [LLOps.f_unsqueeze(a_i, dim-1) for a_i in a] @staticmethod - def f_slice(a: list, item: tuple): + def f_slice(a: list, item: tuple) -> list: # Return a slice of the list of lists (e.g. a[0] or a[:, 3:2]) if len(item) == 1: return a[item[0]] @@ -176,7 +176,7 @@ def f_slice(a: list, item: tuple): return LLOps.f_slice(a[index], item[1:]) @staticmethod - def f_flatten(a: list, dim=0): + def f_flatten(a: list, dim: int = 0) -> list: # Flatten a list of lists into a single list starting at dim if dim == 0: if isinstance(a[0], list): @@ -190,7 +190,7 @@ def f_flatten(a: list, dim=0): return [LLOps.f_flatten(a_i, dim-1) for a_i in a] @staticmethod - def f_calc_shape(a: list): + def f_calc_shape(a: list) -> list: assert isinstance(a, list) if not isinstance(a[0], list): return [len(a)] @@ -200,7 +200,7 @@ def f_calc_shape(a: list): return l @staticmethod - def f_setitem(a: list, key: tuple, value): + def f_setitem(a: list, key: tuple, value) -> list: # set the item at position key of list a to a value. Value can be scalar or list. (a[key] = value) if len(key) == 1: a[key[0]] = value @@ -220,7 +220,7 @@ def f_setitem(a: list, key: tuple, value): return m @staticmethod - def f_reshape_flattened(a: list, shape: tuple): + def f_reshape_flattened(a: list, shape: tuple) -> list: # reshape a one-dimensional array (flattened) into a target format if len(shape) == 1: return a @@ -229,7 +229,7 @@ def f_reshape_flattened(a: list, shape: tuple): return [LLOps.f_reshape_flattened(a[i*n:(i+1)*n], shape[1:]) for i in range(shape[0])] @staticmethod - def f_permute_201(a): + def f_permute_201(a: list) -> list: # permute shape (H, W, C) to (C, H, W) I, J, K = len(a), len(a[0]), len(a[0][0]) @@ -244,7 +244,7 @@ def f_advanced_indexing_1d(a: (list, tuple), b: (list, tuple)): return tuple([a[b_i] for b_i in b]) @staticmethod - def f_reduction_sum(a, reduction_dim, a_shape): + def f_reduction_sum(a: list, reduction_dim: int, a_shape: tuple) -> (list, int, float): # sum up the list (of lists) along the dimensions specified in shape if reduction_dim == 0: if len(a_shape) == 1: @@ -258,7 +258,7 @@ def f_reduction_sum(a, reduction_dim, a_shape): return [LLOps.f_reduction_sum(a_i, reduction_dim - 1, a_shape[1:]) for a_i in a] @staticmethod - def f_reduction_max(a, reduction_dim, a_shape): + def f_reduction_max(a: list, reduction_dim: int, a_shape: tuple) -> (list, int, float): # find the max of the list (of lists) along the dimensions specified in shape if reduction_dim == 0: if len(a_shape) == 1: @@ -272,7 +272,7 @@ def f_reduction_max(a, reduction_dim, a_shape): return [LLOps.f_reduction_max(a_i, reduction_dim - 1, a_shape[1:]) for a_i in a] @staticmethod - def f_reduction_prod(a, reduction_dim, a_shape): + def f_reduction_prod(a: list, reduction_dim: int, a_shape: tuple) -> (list, int, float): # calculate the product of the list (of lists) along the dimensions specified in shape if reduction_dim == 0: if len(a_shape) == 1: @@ -286,7 +286,7 @@ def f_reduction_prod(a, reduction_dim, a_shape): return [LLOps.f_reduction_prod(a_i, reduction_dim - 1, a_shape[1:]) for a_i in a] @staticmethod - def f_stack(iterable, dim): + def f_stack(iterable: list, dim: int) -> list: # stack iterables of lists along a certain dim. NOT working yet if dim == 0: return [a_i for a_i in iterable] @@ -298,7 +298,7 @@ def f_stack(iterable, dim): raise NotImplementedError @staticmethod - def f_cat(iterable, dim): + def f_cat(iterable: list, dim: int) -> list: # concatenate iterables of lists along a certain dim. ONLY working for dim 0 and 1 if dim == 0: return [a_i_j for a_i in iterable for a_i_j in a_i] @@ -312,7 +312,7 @@ def f_cat(iterable, dim): class Tensor: # Wrapper to use linalg operations on lists (of lists) (e.g. matmuls) in a nicer way - def __init__(self, elems): + def __init__(self, elems: (list, int)): self.elems = elems # calculate number of dimensions @@ -516,7 +516,7 @@ def sqrt(self): def exp(self): return Tensor(LLOps.f_unary_op(self.elems, math.exp)) - def __pow__(self, num): + def pow(self, num): if num == 2: return Tensor(LLOps.f_binary_op_same_size(self.elems, self.elems, operator.mul)) elif isinstance(num, int): @@ -524,6 +524,9 @@ def __pow__(self, num): else: return self.apply(lambda x: math.pow(x, num)) + def __pow__(self, num): + return self.pow(num) + def clamp(self, low, high): if low is None: return Tensor(LLOps.f_unary_op(self.elems, lambda x: min(x, high))) @@ -1080,29 +1083,30 @@ def png_decompress(data_bytes, width, height, n_channels): @staticmethod def read_png(path, resize=None, dimorder="HWC", num_channels=3, to_float=True, mean=(0.0, 0.0, 0.0), std=(1.0, 1.0, 1.0)): with open(path, "br") as f: - data = b'' - width, height, bit_depth, color_type_str, color_type_bytes = None, None, None, None, None image_bytes = f.read() - # header = image_bytes[:8] - start = 8 - while start is not None: - chunk_length = int.from_bytes(image_bytes[start:start+4], byteorder="big") - chunk_type = image_bytes[start+4:start+8] - chunk_data = image_bytes[start+8:start+8+chunk_length] - if chunk_type == b'IHDR': - width, height = int.from_bytes(chunk_data[:4], "big"), int.from_bytes(chunk_data[4:8], "big") - bit_depth, color_type = int.from_bytes(chunk_data[8:9], "big"), int.from_bytes(chunk_data[9:10], "big") - color_type_str = "RGB" if color_type == 2 else "GRAY" if color_type == 0 else "RGBA" if color_type == 6 else "OTHER" - color_type_bytes = 3 if color_type == 2 else 1 if color_type == 0 else 4 if color_type == 6 else None - assert bit_depth == 8 - - if chunk_type == b'IDAT': - data += chunk_data - - start = start + 12 + chunk_length - - if chunk_type == b'IEND': - start = None + data = b'' + width, height, bit_depth, color_type_str, color_type_bytes = None, None, None, None, None + + # header = image_bytes[:8] + start = 8 + while start is not None: + chunk_length = int.from_bytes(image_bytes[start:start+4], byteorder="big") + chunk_type = image_bytes[start+4:start+8] + chunk_data = image_bytes[start+8:start+8+chunk_length] + if chunk_type == b'IHDR': + width, height = int.from_bytes(chunk_data[:4], "big"), int.from_bytes(chunk_data[4:8], "big") + bit_depth, color_type = int.from_bytes(chunk_data[8:9], "big"), int.from_bytes(chunk_data[9:10], "big") + color_type_str = "RGB" if color_type == 2 else "GRAY" if color_type == 0 else "RGBA" if color_type == 6 else "OTHER" + color_type_bytes = 3 if color_type == 2 else 1 if color_type == 0 else 4 if color_type == 6 else None + assert bit_depth == 8 + + if chunk_type == b'IDAT': + data += chunk_data + + start = start + 12 + chunk_length + + if chunk_type == b'IEND': + start = None lines = ImageIO.png_decompress(data, width, height, color_type_bytes) t = Tensor(lines)