-
Notifications
You must be signed in to change notification settings - Fork 1
/
abstract_simulator.py
339 lines (301 loc) · 15.5 KB
/
abstract_simulator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
import re
from aux_data_structures import Stack, Queue, Tape, InputTape
class AbstractMachineSimulator:
    """Step-wise simulator for an abstract machine described by a state table.

    ``machine_definition`` is a dict with two keys:

    * ``"logic"`` maps each state name to a dict holding an ``"instruction"``
      string (optionally suffixed with ``(name)`` to reference an auxiliary
      storage) and an ``"arguments"`` list of ``"(symbol,next_state)"``
      strings.
    * ``"aux_data"`` maps each auxiliary storage name to a dict whose
      ``"type"`` is ``"STACK"``, ``"QUEUE"`` or ``"TAPE"``.

    The first state declared in ``"logic"`` is the start state.  The state
    names ``accept``, ``reject`` and ``halt`` (compared case-insensitively)
    stop the machine and need not appear in ``"logic"``.
    """

    # Matches the "(name)" suffix that ties an instruction to an aux storage.
    _AUX_DATA_PATTERN = re.compile(r"\([a-zA-Z0-9_]+\)")

    # Lower-cased terminal state name -> (marks the input accepted?, log message).
    _TERMINAL_STATES = {
        "accept": (True, "Accepted and halted."),
        "reject": (False, "Rejected and halted."),
        "halt": (False, "Halted."),
    }

    def __init__(self, machine_definition) -> None:
        """Build the auxiliary memory and the state map from the definition.

        Raises:
            Exception: if an aux data entry declares an unknown ``"type"``.
        """
        self.input_tape = None
        self.logic = machine_definition["logic"]
        self.aux_data = machine_definition["aux_data"]

        # Lifecycle
        self.accepted = False
        self.halted = False
        self.current_state = None
        # Not read anywhere in this class; kept because it is a public attribute.
        self.input_tapehead_idx = 0
        self.output = []

        # Memory: one storage object per declared aux data entry.
        self.memory = {}
        for name, spec in self.aux_data.items():
            self.memory[name] = self._create_storage(spec["type"])

        # State map: parsed instruction, associated storage and transitions.
        self.state_map = {}
        for state_name, spec in self.logic.items():
            self.state_map[state_name] = self._parse_state(spec)

    @staticmethod
    def _create_storage(kind):
        """Return a fresh storage object for the aux data type name ``kind``."""
        if kind == "STACK":
            return Stack()
        if kind == "QUEUE":
            return Queue()
        if kind == "TAPE":
            return Tape()
        raise Exception("Unknown aux data type: " + kind)

    @classmethod
    def _parse_state(cls, spec):
        """Turn one ``"logic"`` entry into its state-map record."""
        transitions = {}
        for argument in spec["arguments"]:
            fields = argument.split(",")
            # "(symbol,state)": drop the "(" of the first field and the ")"
            # of the second one.
            transitions[fields[0][1:]] = fields[1][:-1]

        instruction = spec["instruction"]
        associated_data = None
        match = cls._AUX_DATA_PATTERN.search(instruction)
        if match is not None:
            # Keep the bare name and strip "(name)" from the instruction.
            associated_data = match.group()[1:-1]
            instruction = instruction.replace("(" + associated_data + ")", "")

        return {
            "instruction": instruction,
            "associated_data": associated_data,
            "transitions": transitions,
        }

    def set_input_tape(self, input_tape, is_turing_machine=False):
        """Load the input string the machine will scan.

        Args:
            input_tape: string that starts and ends with ``#``.
            is_turing_machine: when true, the first aux data entry declared
                with type ``"TAPE"`` is replaced by the input tape itself.

        Raises:
            Exception: if ``input_tape`` is not a string or is not delimited
                by ``#`` on both ends (this includes the empty string).
        """
        if not isinstance(input_tape, str):
            raise Exception("Input tape must be a string")
        # startswith/endswith also reject "" instead of failing on indexing.
        if not (input_tape.startswith("#") and input_tape.endswith("#")):
            raise Exception("Input tape must start and end with #")

        self.input_tape = InputTape(input_tape)

        if is_turing_machine:
            for name, spec in self.aux_data.items():
                if spec["type"] == "TAPE":
                    self.memory[name] = self.input_tape
                    break

    def step(self, verbose=False, logger=None) -> bool:
        """Execute at most one instruction.

        Args:
            verbose: when true, every event message is printed.
            logger: optional callable invoked with every event message.

        Returns:
            True if an instruction was executed by this call (even if it left
            the machine halted because no transition matched); False if the
            machine was already halted or the current state is a terminal
            ``accept``/``reject``/``halt`` state.

        Raises:
            Exception: if the input tape is not set, the machine has no
                states, the instruction is unsupported, or the instruction's
                requirements (associated data, transition count, matching
                tape transition) are not met.
        """
        if self.input_tape is None:
            raise Exception("Input tape not set")
        if self.halted:
            return False

        def log(message):
            if verbose:
                print(message)
            if logger:
                logger(message)

        # Lazily enter the first declared state on the first step.
        if self.current_state is None:
            if not self.state_map:
                raise Exception("Machine definition has no states")
            self.current_state = next(iter(self.state_map))
            log("Starting at state " + self.current_state)

        terminal = self._TERMINAL_STATES.get(self.current_state.lower())
        if terminal is not None:
            accepts, message = terminal
            if accepts:
                self.accepted = True
            self.halted = True
            log(message)
            return False

        state = self.state_map[self.current_state]
        instruction = state["instruction"]
        associated_data = state["associated_data"]
        transitions = state["transitions"]

        if instruction == "SCAN":
            self._scan("R", "", transitions, log)
        elif instruction == "WRITE":
            self._write(associated_data, transitions, log)
        elif instruction == "READ":
            self._read(associated_data, transitions, log)
        elif instruction == "PRINT":
            self._print(transitions, log)
        elif instruction == "SCAN RIGHT":
            self._scan("R", " from the right", transitions, log)
        elif instruction == "SCAN LEFT":
            self._scan("L", " from the left", transitions, log)
        elif instruction == "RIGHT":
            self._move_on_tape("R", "right", associated_data, transitions, log)
        elif instruction == "LEFT":
            self._move_on_tape("L", "left", associated_data, transitions, log)
        else:
            raise Exception("Instruction not supported.")
        return True

    def _follow_transition(self, symbol, transitions, log):
        """Move to the state mapped to ``symbol``, or halt if there is none."""
        if symbol in transitions:
            self.current_state = transitions[symbol]
            log("Transitioned to state " + self.current_state)
        else:
            self.halted = True
            log("No transitions found for this symbol. Halted.")

    @staticmethod
    def _single_transition_symbol(transitions, instruction_name):
        """Return the only transition symbol of a WRITE/PRINT state."""
        if len(transitions) > 1:
            raise Exception(instruction_name + " instruction can only have one transition.")
        if not transitions:
            raise Exception(instruction_name + " instruction requires a transition.")
        return next(iter(transitions))

    def _scan(self, direction, log_suffix, transitions, log):
        """Move the input tape head, read a symbol and transition on it."""
        self.input_tape.move(direction)
        symbol = self.input_tape.read()
        log("Read symbol " + symbol + log_suffix)
        self._follow_transition(symbol, transitions, log)

    def _write(self, associated_data, transitions, log):
        """Push/enqueue the state's single transition symbol, then transition."""
        if associated_data is None:
            raise Exception("WRITE instruction requires associated data")
        symbol = self._single_transition_symbol(transitions, "WRITE")

        storage = self.memory[associated_data]
        if isinstance(storage, Stack):
            storage.push(symbol)
        elif isinstance(storage, Queue):
            storage.enqueue(symbol)
        elif isinstance(storage, Tape):
            raise Exception("Tape is not a valid data type for WRITE instruction")
        log("Wrote symbol " + symbol + " to " + associated_data)

        self.current_state = transitions[symbol]
        log("Transitioned to state " + self.current_state)

    def _read(self, associated_data, transitions, log):
        """Pop/dequeue a symbol from the associated storage and transition on it."""
        if associated_data is None:
            raise Exception("READ instruction requires associated data")

        storage = self.memory[associated_data]
        symbol = ""
        # NOTE(review): assumes pop()/dequeue() return a string; what they do
        # on an empty container is defined in aux_data_structures - confirm.
        if isinstance(storage, Stack):
            symbol = storage.pop()
        elif isinstance(storage, Queue):
            symbol = storage.dequeue()
        elif isinstance(storage, Tape):
            raise Exception("Tape is not a valid data type for READ instruction")
        log("Read symbol " + symbol + " from " + associated_data)

        self._follow_transition(symbol, transitions, log)

    def _print(self, transitions, log):
        """Append the state's single transition symbol to the output, then transition."""
        symbol = self._single_transition_symbol(transitions, "PRINT")
        self.output.append(symbol)
        log("Printed symbol " + symbol)

        self.current_state = transitions[symbol]
        log("Transitioned to state " + self.current_state)

    def _move_on_tape(self, direction, side, associated_data, transitions, log):
        """Move on an aux tape, read a symbol, optionally overwrite it, transition.

        Transition keys are either ``"read/write"`` (the read symbol is
        replaced by ``write``) or a bare ``"read"`` (the tape is left as is).
        """
        # .get() so a missing or unknown storage name yields the intended
        # error rather than a KeyError.
        tape = self.memory.get(associated_data)
        if not isinstance(tape, Tape):
            raise Exception("Associated data must be a tape")

        tape.move(direction)
        symbol = tape.read()
        log("Read symbol " + symbol + " from the " + side)

        # Collect (original key, replacement or None) for keys reading `symbol`.
        candidates = []
        for key in transitions:
            parts = key.split("/")
            if parts[0] == symbol:
                candidates.append((key, parts[1] if len(parts) > 1 else None))

        if len(candidates) > 1:
            raise Exception("More than one transition for a symbol. Non-determinism is not supported.")
        if not candidates:
            self.halted = True
            raise Exception("No transitions found for this symbol. Halted.")

        key, replacement = candidates[0]
        self.current_state = transitions[key]
        if replacement is not None:
            tape.write(replacement)
            log("Replaced symbol " + symbol + " with " + replacement)
        log("Transitioning to state " + self.current_state)

    def run(self, verbose=False, logger=None, max_steps=None):
        """Run the machine until it halts.

        Args:
            verbose: forwarded to :meth:`step`.
            logger: forwarded to :meth:`step`.
            max_steps: optional cap on the number of ``step`` calls made by
                this invocation; ``None`` (the default) means no limit, so a
                machine that never halts loops forever.
        """
        steps_taken = 0
        while not self.halted:
            if max_steps is not None and steps_taken >= max_steps:
                break
            self.step(verbose=verbose, logger=logger)
            steps_taken += 1