forked from jromang/picochess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
utilities.py
283 lines (233 loc) · 8.92 KB
/
utilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# Copyright (C) 2013-2014 Jean-Francois Romang ([email protected])
# Shivkumar Shivaji ()
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import queue
import os
import platform
import random
import subprocess
import urllib.request
import socket
from xml.dom.minidom import parseString
try:
import enum
except ImportError:
import enum34 as enum
# picochess version
version = '032'
# Module-wide queue; Observable.fire() puts every fired event on it.
event_queue = queue.Queue()
# Registry of display devices: Display.__init__() appends each new instance
# here and Display.show() sends its message to every entry.
display_devices = []
class AutoNumber(enum.Enum):
    """Enum base class whose members are numbered automatically.

    Subclasses declare each member as ``NAME = ()``; the member's value
    becomes its 1-based position in declaration order.
    """

    def __new__(cls):
        # Members created so far are already registered in __members__,
        # so the next free number is simply the current count plus one.
        member = object.__new__(cls)
        member._value_ = len(cls.__members__) + 1
        return member
class Event(AutoNumber):
    """Events raised by the user or by the engine.

    Values are assigned by AutoNumber in declaration order, so the order of
    the members below must not be changed.
    """

    # --- User events ---
    FEN = ()  # One or more pieces were moved; a new FEN position is available
    LEVEL = ()  # Engine level chosen by the user (1 to 20)
    NEW_GAME = ()  # A new game is started by the user
    USER_MOVE = ()  # A move is sent by the user
    OPENING_BOOK = ()  # An opening book is selected by the user
    SET_MODE = ()  # The interaction mode changes
    SETUP_POSITION = ()  # A custom position is set up

    # --- Engine events ---
    BEST_MOVE = ()  # The engine found a move
    SET_TIME_CONTROL = ()  # Time control chosen by the user
    OUT_OF_TIME = ()
    UCI_OPTION_SET = ()  # A UCI option is set; carries 'name' and 'value' (strings)
    SHUTDOWN = ()
class Message(AutoNumber):
    """Messages sent to the display devices.

    Values are assigned by AutoNumber in declaration order, so the order of
    the members below must not be changed.
    """

    COMPUTER_MOVE = ()  # Display the computer's move
    BOOK_MOVE = ()  # Display a move taken from the book
    REVIEW_MODE_MOVE = ()  # The player is reviewing the game
    INTERACTION_MODE = ()  # The interaction mode
    START_NEW_GAME = ()
    COMPUTER_MOVE_DONE_ON_BOARD = ()  # The user played the computer's move on the board
    SEARCH_STARTED = ()  # The engine began searching
    USER_TAKE_BACK = ()  # The user took back a move while the engine was searching
    UPDATE_CLOCK = ()  # Sent every second while the clock runs; carries white_time and black_time
    RUN_CLOCK = ()  # Ask for the autonomous clock to run; carries time_control
    USER_MOVE = ()  # The player made a move on the board
    UCI_OPTION_LIST = ()  # Carries 'options', a dict of the current engine's UCI options
    GAME_ENDS = ()  # The game is over; carries a 'result' (GameResult) and a list of 'moves'
    SYSTEM_INFO = ()  # Information about picochess, such as the version
class Menu(AutoNumber):
    """Top-level menus; values follow declaration order (see AutoNumber)."""

    GAME_MENU = ()  # The default menu
    SETUP_POSITION_MENU = ()  # Menu for setting up a position
    ENGINE_MENU = ()  # Menu for the engine
    SETTINGS_MENU = ()  # Menu for settings
class SetupPositionMenu(AutoNumber):
    """Entries of the setup-position menu, numbered in declaration order."""

    TO_MOVE_TOGGLE = ()
    REVERSE_ORIENTATION = ()
    SCAN_POSITION = ()
    SPACER = ()
    SWITCH_MENU = ()  # Switch to another menu
class EngineMenu(AutoNumber):
    """Entries of the engine menu, numbered in declaration order."""

    LEVEL = ()
    BOOK = ()
    TIME = ()
    ENG_INFO = ()
    SWITCH_MENU = ()  # Switch to another menu
class GameMenu(AutoNumber):
    """Entries of the game menu, numbered in declaration order."""

    LAST_MOVE = ()  # Display the last move
    HINT = ()  # Display a hint
    EVAL = ()  # Display the evaluation of the position
    ADD_TIME = ()  # Give the clock one extra minute
    SWITCH_MENU = ()  # Switch to another menu
@enum.unique
class Mode(enum.Enum):
    """Interaction modes.

    Values are explicit integers; ``enum.unique`` rejects any accidental
    duplicate value at class-creation time.
    """

    GAME = 0
    ANALYSIS = 1
    PLAY_WHITE = 2
    KIBITZ = 3
    OBSERVE = 4
    PLAY_BLACK = 5
class ClockMode(AutoNumber):
    """Clock modes, numbered in declaration order."""

    FIXED_TIME = ()  # A fixed number of seconds for each move
    BLITZ = ()  # A fixed amount of time for the whole game
    FISCHER = ()  # Fischer increment
class GameResult(enum.Enum):
    """Ways a game can end; each member's value is a short string tag."""

    MATE = 'mate'
    STALEMATE = 'stalemate'
    TIME_CONTROL = 'time'
    INSUFFICIENT_MATERIAL = 'material'
    ABORT = 'abort'
class Observable(object):
    """Base class for input devices, which can be observed through events."""

    def __init__(self):
        super().__init__()

    @staticmethod
    def fire(event, **attrs):
        """Attach ``attrs`` to ``event`` and put it on the module event queue.

        :param event: the event object to enqueue
        :param attrs: extra data, set as attributes directly on ``event``
        """
        # The attributes are written onto the object that was passed in,
        # not onto a copy.
        for attr_name, attr_value in attrs.items():
            setattr(event, attr_name, attr_value)
        event_queue.put(event)
class Display(object):
    """Base class for display devices (DGT XL clock, Piface LCD, pgn file...).

    Every instance owns a ``message_queue`` and registers itself in the
    module-level ``display_devices`` list when constructed.
    """

    def __init__(self):
        super().__init__()
        self.message_queue = queue.Queue()
        display_devices.append(self)

    @staticmethod
    def show(message, **attrs):
        """Attach ``attrs`` to ``message`` and queue it on every display device.

        :param message: the message object to broadcast
        :param attrs: extra data, set as attributes directly on ``message``
        """
        for attr_name, attr_value in attrs.items():
            setattr(message, attr_name, attr_value)
        # Every registered device receives the very same message object.
        for device in display_devices:
            device.message_queue.put(message)
# switch/case instruction in python
class switch(object):
    """Emulate a switch/case statement.

    Usage::

        for case in switch(value):
            if case('a'):
                ...
                break
            if case('b', 'c'):
                ...
                break
            if case():  # default
                ...

    Once a case has matched, every later ``case(...)`` call also returns
    True (fall-through) until the loop is left with ``break``.
    """

    def __init__(self, value):
        self.value = value
        self.fall = False  # Becomes True after the first matching case

    def __iter__(self):
        """Yield the match method once, then stop."""
        yield self.match
        # The generator must end by returning. An explicit
        # ``raise StopIteration`` here is turned into RuntimeError by
        # PEP 479 (Python 3.7+) whenever the loop runs to completion
        # without hitting ``break``.

    def match(self, *args):
        """Tell whether the case suite guarded by ``args`` should be entered.

        :param args: candidate values; no arguments means "default case"
        :return: True if falling through, if called without arguments, or
            if the switched value is one of ``args``; False otherwise
        """
        if self.fall or not args:
            return True
        if self.value in args:
            self.fall = True
            return True
        return False
def get_opening_books():
    """List the opening books found in the ``books`` directory next to this file.

    The display name of a book is its file name without the first two
    characters and without everything from the first dot onwards.
    Directory entries that contain no dot at all (sub-directories, README
    files, ...) are skipped instead of aborting the whole listing with a
    ValueError.

    :return: list of ``(name, path)`` tuples sorted by file name, always
        starting with ``('nobook', None)``; ``path`` is relative
        (``books/<file name>``)
    """
    books_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'books')
    library = [('nobook', None)]
    for book in sorted(os.listdir(books_dir)):
        stem, dot, _ = book.partition('.')
        if not dot:
            # No extension: not a book file, and there is no dot to cut at.
            continue
        library.append((stem[2:], 'books' + os.sep + book))
    return library
def weighted_choice(book, game):
    """Pick a book move at random, with probability proportional to its weight.

    The book is queried only once for the position, so the total weight and
    the selection loop are guaranteed to work on the same entries.

    :param book: object providing ``get_entries_for_position(game)``, whose
        entries expose a ``weight`` attribute and a ``move()`` method
    :param game: the position to look up, passed through to the book
    :return: the ``move()`` of the chosen entry, or None when the book has
        no entry with a positive weight for this position
    """
    entries = list(book.get_entries_for_position(game))
    total = sum(entry.weight for entry in entries)
    threshold = random.uniform(0, total)
    cumulative = 0
    for entry in entries:
        cumulative += entry.weight
        # The first entry whose cumulative weight passes the threshold wins.
        if cumulative > threshold:
            return entry.move()
    return None
def hours_minutes_seconds(seconds):
    """Split a duration given in seconds into hours, minutes and seconds.

    :param seconds: total number of seconds
    :return: tuple ``(hours, minutes, seconds)``
    """
    total_minutes, remaining_seconds = divmod(seconds, 60)
    # divmod() yields (hours, minutes); append the seconds to complete the triple.
    return divmod(total_minutes, 60) + (remaining_seconds,)
def which(program):
    """Locate an executable file.

    A ``program`` that contains a directory part is checked as given.
    A bare name is looked up in every ``PATH`` directory, then in the
    directory of this file, then in its ``engines`` sub-directory.

    :param program: name or full path of the executable file
    :return: path of the executable file, or None (after logging a
        warning) if it was not found
    """
    def is_executable(candidate):
        return os.path.isfile(candidate) and os.access(candidate, os.X_OK)

    directory, _ = os.path.split(program)
    if directory:
        candidates = [program]
    else:
        here = os.path.dirname(os.path.realpath(__file__))
        search_dirs = os.environ["PATH"].split(os.pathsep) + [here, here + os.sep + 'engines']
        # PATH entries may be wrapped in double quotes; drop them before joining.
        candidates = [os.path.join(folder.strip('"'), program) for folder in search_dirs]

    for candidate in candidates:
        if is_executable(candidate):
            return candidate

    logging.warning("Engine executable [%s] not found", program)
    return None
def _git_output(git, *args):
    """Run ``git <args>`` and return its standard output decoded as UTF-8."""
    process = subprocess.Popen([git] + list(args), stdout=subprocess.PIPE)
    return process.communicate()[0].decode(encoding='UTF-8')


def update_picochess(auto_reboot=False):
    """Pull the latest ``stable`` branch if the checkout is behind its remote.

    Nothing happens when git cannot be found or when the current branch is
    not ``stable``.

    :param auto_reboot: when True, run ``reboot`` after an update was pulled
    """
    git = which('git.exe' if platform.system() == 'Windows' else 'git')
    if not git:
        return

    branch = _git_output(git, "rev-parse", "--abbrev-ref", "HEAD").rstrip()
    if branch != 'stable':
        return

    # Fetch remote repo
    logging.debug(_git_output(git, "remote", "update"))

    # Check if update is needed
    status = _git_output(git, "status", "-uno")
    logging.debug(status)
    # Older git prints "up-to-date", git 2.15+ prints "up to date". Matching
    # only the old wording makes every call pull (and reboot) on newer git.
    if 'up-to-date' in status or 'up to date' in status:
        return

    # Update
    logging.debug('Updating')
    logging.debug(_git_output(git, "pull", "origin", "stable"))
    if auto_reboot:
        os.system('reboot')
def shutdown():
    """Log the request, then power the machine off with the OS shutdown command."""
    logging.debug('Shutting down system')
    on_windows = platform.system() == 'Windows'
    os.system('shutdown /s' if on_windows else 'shutdown -h now')
def get_ip():
    """Return the local address of a datagram socket connected to google.com:80.

    :return: the IP address as a string, or None (after logging an error)
        when the connect fails with ``socket.error``
    """
    # The socket is closed on every exit path by the with-statement.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        try:
            sock.connect(("google.com", 80))
            return sock.getsockname()[0]
        # TODO: Better handling of exceptions of socket connect
        except socket.error:
            logging.error("No Internet Connection!")
    return None
def get_location():
    """Look up the current location through the freegeoip.net XML service.

    This is a best-effort lookup: any failure (no network, malformed XML,
    missing or empty element) is logged at debug level and yields ``'?'``.

    :return: ``'<city>, <country name> <country code>'``, or ``'?'`` when the
        location could not be determined
    """
    try:
        # The with-statement closes the HTTP response even if reading fails.
        with urllib.request.urlopen('http://freegeoip.net/xml/') as response:
            dom = parseString(response.read())

        def text_of(tag):
            # Text content of the first element named ``tag``.
            return dom.getElementsByTagName(tag)[0].childNodes[0].data

        country = text_of('CountryName')
        city = text_of('City')
        country_code = text_of('CountryCode')
        return city + ', ' + country + ' ' + country_code
    except Exception:
        # Deliberately broad, but no longer a bare ``except:``, so
        # KeyboardInterrupt and SystemExit still propagate.
        logging.debug('Could not determine location', exc_info=True)
        return '?'