diff --git a/.gitignore b/.gitignore index 9f99ac9..06d8fdc 100644 --- a/.gitignore +++ b/.gitignore @@ -131,6 +131,9 @@ dmypy.json # Visual Studio .vs +# Visual Studio Code +.vscode + # swy-bot saved constant.py diff --git a/README.md b/README.md index 93094a1..9fcffa7 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,11 @@ # 食物语自动挂机脚本 ## 简介 -食物语自动挂机脚本, 主要用于客潮~~和活动小游戏~~(太难做砍掉了绝对不是因为我懒啊啊啊啊啊啊) +食物语自动挂机脚本, 主要用于客潮和活动小游戏(目前仅包括千人千面自动消除) -现已加入千人千面自动挂机, 但由于某些原因仅提供算法, 不提供配置, 请在task.py中自行修改配置 +由于某些原因小游戏自动挂机仅提供算法, 不提供配置, 请在tasks/minigame.py中自行修改配置 + +目前已支持Android系统原生运行脚本, 但有一些性能问题, 详情请见android分支, 若想体验可到releases中下载 现已加入食物语线性规划做菜计算器, 但处于早期测试阶段, 如遇bug请提交issue @@ -17,24 +19,31 @@ ## 食用方式 详见程序内指引 -**注意:** 若使用ADB(Scrcpy模式), 请确保系统环境变量中有ADB, 并且ADB已连接至手机 +**注意:** 若使用ADB和Scrcpy模式, 请确保系统环境变量中有ADB, 并且ADB已连接至手机 --- ## 食材 -- Python3 +必需依赖: +- Python 3.8+ +- numpy - opencv-python -- PyAV -- pywin32(仅Windows上需要) -- PyAutoGUI(仅Windows以外的系统需要) -- pure-python-adb -- PuLP -- pyinstaller(如需要打包) -详见requirements.txt +平台特定(现已引入动态加载模式, 未安装库仅会禁用对应模式, 脚本仍可正常运行): +- Windows原生模式: + - pywin32 +- linux和mac原生模式: + - PyAutoGUI +- 任意系统ADB模式: + - pure-python-adb +- 任意系统Scrcpy模式: + - av + +线性规划做菜计算器: +- PuLP --- ## 菜谱&改良 -本项目可在Windows, Linux和MacOS上运行, 但目前仅在Windows上测试过 +本项目可在Windows, Linux,MacOS和Android系统上运行, Android系统需要专用的用于加载脚本的APP才能运行, 详见android分支 另外本挂机脚本实际上提供了一个框架, 经过简单修改应该也能用于其他游戏, 甚至是用于训练人工智能玩游戏(逃 @@ -44,19 +53,32 @@ - libs/ 脚本所需的库, 目前只有Scrcpy的服务端 - data/ 存储游戏数据和挂机所需资源的目录 - saved/ 截图会保存在这个目录 -- main.py 主文件 -- player.py 负责与游戏交互(通过原生窗口/ADB/Scrcpy) -- task.py 执行挂机任务 +- algorithms/ 算法 + - detect.py 图像识别相关算法 + - matching.py 自动连连看算法(原作者TheThreeDog) +- players/ 模式 + - player.py 模式基类 + - native.py 通过原生窗口与游戏交互 + - adb.py 通过ADB与游戏交互 + - scrcpy.py 通过Scrcpy与游戏交互 + - debug.py 调试用 +- tasks/ 挂机任务 + - task.py 挂机任务基类 + - kechao.py 客潮挂机任务 + - minigame.py 小游戏挂机任务 - platforms/ 工具模块的平台相关实现 + - console.py 具有命令行的系统上的实现 - windows.py Windows系统相关实现 - linux.py Linux系统相关实现 + - android.py Android系统相关实现 +- main.py 主模块 +- profit.py 线性规划做菜计算器(可单独运行) +- swy.py 食物语游戏数据(单独运行为输出游戏数据) +- scrcpy.py Python实现的Scrcpy客户端 - utils.py 工具模块 -- swy.py 负责加载游戏数据 -- profit.py 线性规划做菜计算器 -- matching.py 自动连连看算法(原作者TheThreeDog) ### 自动挂机 -详见player.py和task.py +详见players和tasks目录 ### 线性规划做菜计算器 线性规划算法详见profit.py中的注释 @@ -68,7 +90,7 @@ 另外烹饪时间出现小数的话只有入没有舍, 是ceil -### 自动连连看 +### 自动小游戏 由于某些原因, 本项目仅提供用于学习的算法, 不包含配置, 如需使用请自行配置 --- @@ -76,7 +98,7 @@ 1. 拿走菜谱(克隆仓库到本地) 1. 准备好食材(安装第三方库) 1. 开始做菜(运行main.py) -1. 装盘(如需构建, 请使用pyinstaller) +1. 装盘(目前仍无法打包运行, 请考虑使用Android原生版本) 1. 随心所欲地修改菜谱(请随意修改代码) --- @@ -96,6 +118,7 @@ 2021/9/13 加入自动连连看算法, 初步支持Linux 2021/9/14 已加入对Linux和MacOS的支持, 但尚未测试, 更新版本号至V1.6 2021/9/17 已加入千人千面自动挂机功能, 更新版本号至V1.7 +2023/2/2 代码重写, 用协程替代状态机, 实现动态加载模式, 移植到Android系统, 由于更新内容较多, 更新版本号为V2.0 diff --git a/algorithms/detect.py b/algorithms/detect.py new file mode 100644 index 0000000..b2da245 --- /dev/null +++ b/algorithms/detect.py @@ -0,0 +1,72 @@ +import numpy +import cv2 + +sift = cv2.SIFT_create() + +def siftcompute(image, draw=False): + # 转灰度图 + gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) + # 计算关键点和描述符 + # key_points: 关键点信息, 包括位置, 尺度, 方向信息 + # descriptors: 关键点描述符, 每个关键点对应128个梯度信息的特征向量 + kp, des = sift.detectAndCompute(gray, None) + if draw: + cv2.drawKeypoints(image, kp, image, (0, 255, 0), + cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS) + return des + +def siftsimilarity(img1, img2, threshold=0.7): + # 计算两张图片的特征点 + des1 = siftcompute(img1) + des2 = siftcompute(img2) + # 构建 FLANN + FLANN_INDEX_KDTREE = 1 + index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5) + search_params = dict(checks=50) + flann = cv2.FlannBasedMatcher(index_params, search_params) + # 比较两张图片的特征点 + matches = flann.knnMatch(des1, des2, k=2) + if len(matches) == 0: + return 0.0 + # 计算匹配点数并计算相似度 + good = [m for (m, n) in matches if m.distance < n.distance * threshold] + return len(good) / len(matches) + +def isimagesame(img1, img2): + # return siftsimilarity(img1, img2, 0.7) > 0.8 + result = cv2.matchTemplate(img1, img2, cv2.TM_CCOEFF_NORMED) + location = numpy.where(result >= 0.5) + for pt in zip(*location[::-1]): + return True + return False + +def findtemplate(image, template, threshold=0.75, outline=False): + theight, twidth = template.shape[:2] + result = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED) + # result = cv2.normalize(result, None, 0, 1, cv2.NORM_MINMAX) + location = numpy.where(result >= threshold) + lx, ly = 0, 0 + points = [] + for pt in zip(*location[::-1]): + x, y = pt[0] + int(twidth / 2), pt[1] + int(theight / 2) + # 去掉重复点 + if x - lx < twidth or y - ly < theight: + continue + points.append((x, y)) + lx, ly = x, y + if outline: + cv2.rectangle(image, pt, (pt[0] + twidth, pt[1] + theight), (0, 255, 0), 1) + return points + +def findcircle(image, r, outline=False): + gimg = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) + gimg = cv2.medianBlur(gimg, 5) + result = cv2.HoughCircles(gimg, cv2.HOUGH_GRADIENT, 1, int(r / 2), None, 100, 40, r - 10, r + 10) + points = [] + if result is not None: + result = numpy.uint16(numpy.around(result)) + for p in result[0, :]: + points.append((p[0], p[1])) + if outline: + cv2.circle(image, (p[0], p[1]), p[2], (0, 255, 0), 1) + return points diff --git a/matching.py b/algorithms/matching.py similarity index 99% rename from matching.py rename to algorithms/matching.py index 8e3d875..1676273 100644 --- a/matching.py +++ b/algorithms/matching.py @@ -1,5 +1,3 @@ -# coding=utf-8 - ''' Copyright 2018 TheThreeDog diff --git a/main.py b/main.py index 3b252ed..6b6b374 100644 --- a/main.py +++ b/main.py @@ -1,157 +1,137 @@ #!/usr/bin/env python -# coding=utf-8 import sys import time import atexit -import cv2 +from players import players +from tasks import tasks +try: + import profit +except ModuleNotFoundError: + print("线性规划做菜计算器目前尚不支持Android平台") import utils -from player import Player, PlayerADB, PlayerScrcpy, PlayerTest -from task import Phases, Results, getTasks -import profit - -WINDOW_NAME = "Preview Window" -FPS = 5 +FPS = 10 player = None -task = None -def main(): - utils.setnodpi() +def mainmenu(): utils.settitle("欢迎使用食物语挂机脚本") print('''============================================= -食物语挂机脚本 V1.7 作者: WC +食物语挂机脚本 V2.0 作者: WC 本脚本仅供个人代肝使用, 严禁用于商业用途 使用本脚本造成的一切法律纠纷由使用者自行承担 项目地址: https://github.com/DawningW/swy-bot 欢迎提交问题或者直接PR =============================================''') + items = [ + (), + ("执行挂机任务", taskmenu), + ("宏命令", macromenu), + ("线性规划做菜计算器", profit.run), + ("打开食物语wiki", lambda: utils.openurl("https://wiki.biligame.com/swy")), + ("打开截图文件夹", lambda: utils.openurl("saved")), + ("退出", lambda: sys.exit(0)) + ] while True: - print('''>>>----------< 主 菜 单 >----------<<< -1. 原生模式(需先启动安卓虚拟机并打开食物语) -2. ADB模式(需手机连接电脑开启调试模式并打开食物语) -3. 混合模式(使用scrcpy快速获取手机截屏并模拟点击)(*推荐*) -4. 调试模式(读取程序目录下的test.png并进行图像识别) -7. 线性规划做菜计算器 -8. 用默认浏览器打开食物语wiki -9. 打开截图文件夹 -0. 退出''') - str = input("请选择: ") - global player - if str == "0": - break - if str == "1": - player = Player() - elif str == "2": - player = PlayerADB() - elif str == "3": - player = PlayerScrcpy() - elif str == "4": - player = PlayerTest() - elif str == "7": - profit.run() - continue - elif str == "8": - utils.openurl("https://wiki.biligame.com/swy"); - continue - elif str == "9": - utils.openurl("saved"); - continue - else: - continue - if (player.init()): - select() - player.end() - player = None - return + items[0] = ("连接至设备" if player is None else "设备已连接", playermenu) + utils.showmenu("主菜单", items) + +def androidmenu(): + def _run(Task): + return lambda: utils.reqpermission(lambda: run(Task())) + items = [] + for Task in tasks(): + items.append((f"{Task.name}({Task.description})", _run(Task))) + items.append(("退出", lambda: None)) + utils.showmenu("主菜单", items) -def select(): +def playermenu(): + global player + if player is not None: + return + items = [] + disabled = [] + for (name, desc, Player) in players(): + if Player is not None: + items.append((f"{name}: {desc}", Player)) + else: + disabled.append((f"{name}\n \033[1;33m- 由于未安装\033[1;33m{desc}\033[1;33m, 该模式无法使用\033[0m", + lambda: utils.throw(f"未安装{desc}"))) + items.extend(disabled) + items.append(("返回", lambda: None)) while True: - print(">>>----------< 挂 机 菜 单 >----------<<<") - for i in range(len(getTasks())): - print("{}. {}({})".format(i + 1, getTasks()[i].name, getTasks()[i].description)) - print("PS: 输入其他数字退出") try: - num = int(input("请输入序号: ")) - if num <= 0: break - global task - task = getTasks()[num - 1]() - run() - task = None - except (ValueError, IndexError): + player = utils.showmenu("选择模式", items) break - return + except Exception as e: + print(f"初始化错误: {str(e)}, 请重新选择模式") -def run(): - utils.settitle("食物语挂机脚本运行中 - 按 Ctrl+C 退出") - print("开始运行挂机脚本") - cv2.namedWindow(WINDOW_NAME) - cv2.setMouseCallback(WINDOW_NAME, onclicked) - times = 0 - canceled = False - while not canceled: - times += 1 - print("第 {} 次运行脚本: {}".format(times, task.name)) - task.init() - origin = time.perf_counter() - phase = Phases.BEGIN - while True: - t = time.perf_counter() - result = None - if phase == Phases.BEGIN: - result = task.begin(player, t - origin) - elif phase == Phases.RUNNING: - result = task.run(player, t - origin) - elif phase == Phases.END: - result = task.end(player, t - origin) - else: - print("无效的阶段, 请向作者报告这个问题") - if result is None or result == Results.FAIL: - canceled = True - break - if result == Results.SUCCESS: - value = phase.value + 1 - if value > Phases.END.value: break - else: phase = Phases(value) - if task.getImageCache() is not None: - showimage(task.getImageCache()) - wait = 1 / FPS - (time.perf_counter() - t) - if wait < 0: - print("严重滞后, 处理时间超出 {} ms, 发生了什么呢?".format(-int(wait * 1000))) - wait = 0 - time.sleep(wait) - cv2.destroyAllWindows() - utils.settitle("食物语挂机脚本已运行完毕 - 准备就绪") - print("挂机脚本已运行完毕") - return +def taskmenu(): + items = [] + for Task in tasks(): + items.append((f"{Task.name}({Task.description})", Task)) + items.append(("返回", lambda: None)) + while True: + task = utils.showmenu("挂机菜单", items) + if task is None: + break + run(task) + +def macromenu(): + print("宏命令录制与执行正在开发中!") + input("按回车键返回...") -def onclicked(event, x, y, flags, param): - if event == cv2.EVENT_LBUTTONDOWN: - print("点击 X: {} Y: {}".format(x, y)) - player.click(x, y) - return +def run(task): + def onclicked(x, y, is_preview): + task.clicked = (x, y) + if is_preview: + player.click(x, y) -def showimage(image, wait = 1): - cv2.imshow(WINDOW_NAME, image) - cv2.setMouseCallback(WINDOW_NAME, onclicked) - cv2.waitKey(wait) - return + if player is None: + playermenu() + if player is None: + print("尚未连接至设备, 无法执行挂机任务") + return + utils.settitle("食物语挂机脚本运行中 - 按 Ctrl+C 退出") + print(f"开始运行挂机任务: {task.name}") + utils.createpreview(onclicked) + co = task.run(player) + while True: + t = time.perf_counter() + task.frame = player.screenshot() + try: + co.send(None) + except StopIteration: + break + utils.showpreview(task.frame) + wait = 1 / FPS - (time.perf_counter() - t) + if wait < 0: + print(f"严重滞后, 处理时间超出 {-int(wait * 1000)} ms, 发生了什么呢?") + wait = 0 + time.sleep(wait) + utils.destroypreview() + print("挂机任务已运行完毕") + utils.settitle("食物语挂机脚本已运行完毕 - 准备就绪") -@atexit.register +@atexit.register def onexit(): - utils.settitle("食物语挂机脚本已结束") - if player is not None: player.end() + if player is not None: + player.release() + utils.settitle("食物语挂机脚本已退出") print(''' ============================================= 食物语挂机脚本已停止运行, 感谢您的使用, 再见! =============================================''') - return +# Android +if hasattr(utils, "Build"): + from players import NativePlayer + player = NativePlayer() # 入口 if __name__ == "__main__": if utils.isadmin(): try: - main() + mainmenu() except KeyboardInterrupt: sys.exit(0) else: diff --git a/platforms/__init__.py b/platforms/__init__.py new file mode 100644 index 0000000..c12505d --- /dev/null +++ b/platforms/__init__.py @@ -0,0 +1,12 @@ +import sys +if sys.platform.startswith("win32") or sys.platform.startswith("cygwin"): + from .windows import * + from .console import * +else: + try: + from android.os import Build + except ImportError: + from .linux import * + from .console import * + else: + from .android import * diff --git a/platforms/android.py b/platforms/android.py new file mode 100644 index 0000000..59311e0 --- /dev/null +++ b/platforms/android.py @@ -0,0 +1,74 @@ +import os +import time +import numpy +import cv2 +from java.lang import Thread, InterruptedException +from kongsang.swybot import ScriptBridge + +def openurl(url): + ScriptBridge.openUrl(url) + +def isadmin(): + return True + +def runasadmin(executable, argument=""): + pass + +def settitle(title): + pass + +def selectwindow(): + return ScriptBridge.getService() + +def getsize(service): + return service.getScreenSize() + +def getdpi(service): + return service.getScreenDpi() + +def screenshot(service): + width, height = getsize(service) + while True: + if Thread.interrupted(): + raise InterruptedException() + buffer = service.captureScreen() + if buffer is not None: + break + image = numpy.array(buffer, dtype="uint8") + image.shape = (height, width, 4) + return image + +def click(service, x, y): + service.tap(x, y) + +def reqpermission(callback): + ScriptBridge.requestRecord(callback) + +def showmenu(title, items): + texts = [item[0] for item in items] + ScriptBridge.showMenu(title, texts, lambda select: items[select][1]()) + +_onclicked = None + +def _callback(x, y): + print(f"点击 X: {x} Y: {y}") + if _onclicked is not None: + _onclicked(x, y, False) + +def createpreview(onclicked): + global _onclicked + _onclicked = onclicked + +def showpreview(image, wait=1): + pass + +def destroypreview(): + pass + +def readimage(name): + filepath = os.path.join(os.path.dirname(__file__), "../data/" + name + ".png") + return cv2.imread(filepath, cv2.IMREAD_UNCHANGED) + +def writeimage(name, image): + filepath = ScriptBridge.getStoragePath() + "/" + name + ".png" + return cv2.imwrite(filepath, image, [int(cv2.IMWRITE_PNG_COMPRESSION), 3]) diff --git a/platforms/console.py b/platforms/console.py new file mode 100644 index 0000000..dabf1ed --- /dev/null +++ b/platforms/console.py @@ -0,0 +1,49 @@ +import cv2 + +def showmenu(title, items): + print(f">>>----------< {title} >----------<<<") + for i in range(len(items)): + print(f"{i + 1}. {items[i][0]}") + while True: + select = -1 + try: + s = input("请选择: ") + if s == "": + continue + select = int(s) - 1 + except ValueError: + pass + if select >= 0 and select < len(items): + return items[select][1]() + print("输入的选项无效, 请重新输入!") + +_WINDOW_NAME = "Preview Window" +_onclicked = None + +def _callback(event, x, y, flags, param): + if event == cv2.EVENT_LBUTTONDOWN: + print(f"点击 X: {x} Y: {y}") + if _onclicked is not None: + _onclicked(x, y, True) + +def createpreview(onclicked): + global _onclicked + _onclicked = onclicked + cv2.namedWindow(_WINDOW_NAME) + cv2.setMouseCallback(_WINDOW_NAME, _callback) + +def showpreview(image, wait=1): + cv2.imshow(_WINDOW_NAME, image) + cv2.setMouseCallback(_WINDOW_NAME, _callback) + cv2.waitKey(wait) + +def destroypreview(): + global _onclicked + cv2.destroyWindow(_WINDOW_NAME) + _onclicked = None + +def readimage(name): + return cv2.imread("./data/" + name + ".png", cv2.IMREAD_UNCHANGED) + +def writeimage(name, image): + return cv2.imwrite("./saved/" + name + ".png", image, [int(cv2.IMWRITE_PNG_COMPRESSION), 3]) diff --git a/platforms/linux.py b/platforms/linux.py index 679c3b8..19b8594 100644 --- a/platforms/linux.py +++ b/platforms/linux.py @@ -1,5 +1,3 @@ -# coding=utf-8 - import os import pyautogui import numpy @@ -7,55 +5,47 @@ def openurl(url): os.system("xdg-open " + url) - return def isadmin(): return True -def runasadmin(executable, argument = ""): +def runasadmin(executable, argument=""): print("请以管理员身份运行此程序") - return def settitle(title): - os.system(r'echo -ne "\033]0;{}\007"'.format(title)) - return - -def getdpi(window): - return - -def setnodpi(): - return + os.system(rf'echo -ne "\033]0;{title}\007"') def selectwindow(): + def callback(event, x, y, flags, param): + if event == cv2.EVENT_LBUTTONDOWN: + param.append((x, y)) + print(f"已选择点 {len(param)} X: {x} Y: {y}") + if len(param) == 2: + cv2.destroyAllWindows() print("注意: 挂机时窗口不能移动, 不能被遮挡也不能最小化!!!") - window = [] print("请在接下来打开的截图窗口中选择两个点以选择窗口") title = "Click two points to select window" points = [] image = numpy.asarray(pyautogui.screenshot()) image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) cv2.namedWindow(title) - cv2.setMouseCallback(title, _onclicked, points) + cv2.setMouseCallback(title, callback, points) cv2.imshow(title, image) cv2.waitKey(0) cv2.destroyAllWindows() if len(points) < 2: print("错误: 未选中两个点, 无法获取窗口") - return -1 + return None return points -def _onclicked(event, x, y, flags, param): - if event == cv2.EVENT_LBUTTONDOWN: - param.append((x, y)) - print("已选择点 {} X: {} Y: {}".format(len(param), x, y)) - if len(param) == 2: cv2.destroyAllWindows() - return - def getsize(window): x1, y1 = window[0] x2, y2 = window[1] return x2 - x1, y2 - y1 +def getdpi(window): + return 96 + def screenshot(window): left, top = window[0] width, height = getsize(window) @@ -67,4 +57,3 @@ def screenshot(window): def click(window, x, y): x1, y1 = window[0] pyautogui.click(x1 + x, y1 + y) - return diff --git a/platforms/windows.py b/platforms/windows.py index e5ca81a..4b7d6ea 100644 --- a/platforms/windows.py +++ b/platforms/windows.py @@ -1,5 +1,3 @@ -# coding=utf-8 - import os import ctypes import win32api, win32gui, win32ui, win32con @@ -7,130 +5,123 @@ import cv2 def openurl(url): - "打开文件/文件夹/链接" + """打开文件/文件夹/链接""" os.system("start " + url) - return def isadmin(): - "检查管理员权限" - return True # TODO Windows下似乎不需要管理员权限 + """检查管理员权限""" + return True # Windows下似乎不需要管理员权限 try: return ctypes.windll.shell32.IsUserAnAdmin() except: return False -def runasadmin(executable, argument = ""): - "以管理员身份运行" +def runasadmin(executable, argument=""): + """以管理员身份运行""" ctypes.windll.shell32.ShellExecuteW(None, "runas", executable, argument, None, 1) - return def settitle(title): - "设置控制台窗口标题" + """设置控制台窗口标题""" ctypes.windll.kernel32.SetConsoleTitleW(title) - return - -def getdpi(hWnd): - "获取屏幕DPI" - hDC = win32gui.GetDC(hWnd) - dpi = (win32ui.GetDeviceCaps(hDC, win32con.LOGPIXELSX), win32ui.GetDeviceCaps(hDC, win32con.LOGPIXELSY)) - win32gui.ReleaseDC(hWnd, hDC) - return dpi - -def setnodpi(): - "禁用DPI" - try: # >= windows 8.1 - ctypes.windll.shcore.SetProcessDpiAwareness(2) - except: # <= windows 8.0 - ctypes.windll.user32.SetProcessDPIAware() - return -def _toscreenpos(hWnd, x, y): - return win32gui.ClientToScreen(hWnd, (x, y)) +_WINDOWS_LIST = [ + # Scrcpy后台挂机可用(已经提供对Scrcpy的原生支持, 建议使用混合模式) + ("SDL_app", None, "**现已提供对Scrcpy的原生支持, 无需打开Scrcpy, 请使用混合模式**"), + # 腾讯手游助手后台点击可用, 并且开放ADB端口5555, 然而获取截图时失败 + ("TXGuiFoundation", "腾讯手游助手【极速傲引擎-7.1】", None), + # 华为多屏协同疑似直接获取光标位置, 而非从消息里读取, 所以需要激活才行, 无法后台挂机 + ("StartupDui", "多屏协同", None) +] def _findwindow(parent, classname, windowname): hWnd = 0 - if parent == None: + if parent is None: hWnd = win32gui.FindWindow(classname, windowname) else: hWnd = win32gui.FindWindowEx(parent, 0, classname, windowname) return hWnd def _getwindow(parent, x, y): - if parent == None: + if parent is None: return win32gui.WindowFromPoint((x, y)) else: return win32gui.ChildWindowFromPoint(parent, (x, y)) -_WINDOWS_LIST = [ - # 腾讯手游助手后台点击可用, 并且开放ADB端口5555, 然而获取截图时失败 - ("TXGuiFoundation", "腾讯手游助手【极速傲引擎-7.1】"), - # 华为多屏协同疑似直接获取光标位置, 而非从消息里读取, 所以需要激活才行, 无法后台挂机 - ("StartupDui", "多屏协同"), - # Scrcpy后台挂机可用(已经提供对Scrcpy的原生支持, 建议使用混合模式) - ("SDL_app", None) -] - def selectwindow(): - "选择要挂机的窗口" + """选择窗口""" + def callback(event, x, y, flags, param): + if event == cv2.EVENT_LBUTTONDOWN: + param[1] = _getwindow(param[0], x, y) + print(f"已点击 X: {x} Y: {y} 窗口句柄: {hex(param[1])}") print("注意: 挂机时窗口可以被遮挡, 但不能最小化!!!") window = 0 child = 0 - for classname, windowname in _WINDOWS_LIST: + for classname, windowname, message in _WINDOWS_LIST: window = _findwindow(None, classname, windowname) if window != 0: - if classname == _WINDOWS_LIST[-1][0]: - print("**现已提供对Scrcpy的原生支持, 无需打开Scrcpy, 详见主菜单中的混合模式**") + if message is not None: + print(message) break if window == 0: print("无法自动获取游戏窗口, 请手动获取(可以用VS的SPY++工具获取)") - str = input("请输入窗口类名: ") - classname = str if str != "" else None - str = input("请输入窗口标题: ") - windowname = str if str != "" else None + name = input("请输入窗口类名: ") + classname = name if name != "" else None + name = input("请输入窗口标题: ") + windowname = name if name != "" else None window = _findwindow(None, classname, windowname) if window == 0: print("错误: 无法获取窗口句柄") - return -1 - print("已成功获取窗口句柄: {}".format(hex(window))) + return None + print(f"已成功获取窗口句柄: {hex(window)}") print("请在接下来打开的截图窗口中选择一个点以获取子窗口然后按任意键退出") print("若通过这种方式无法选中子窗口, 请直接在截图窗口按任意键退出并手动输入子窗口句柄") title = "Click a point to select child window" - hwnds = [window, child] + hWnds = [window, child] width, height = getsize(window) buffer = screenshot(window) - image = numpy.frombuffer(buffer, dtype = "uint8") + image = numpy.frombuffer(buffer, dtype="uint8") image.shape = (height, width, 4) cv2.namedWindow(title) - cv2.setMouseCallback(title, _onclicked, hwnds) + cv2.setMouseCallback(title, callback, hWnds) cv2.imshow(title, image) cv2.waitKey(0) cv2.destroyAllWindows() - child = hwnds[1] + child = hWnds[1] if child == 0: - print("遍历获取子窗口尚未编写, 请直接输入子窗口类名") classname = input("请输入子窗口类名: ") - if classname != '': child = _findwindow(window, classname, None) + if classname != '': + child = _findwindow(window, classname, None) if child == 0: print("还是失败的话请直接输入句柄吧...") - str = input("请输入子窗口句柄(16进制): ") - if str == '': child = window - else: child = int(str, 16) - print("已成功获取子窗口句柄: {}".format(hex(child))) + handle = input("请输入子窗口句柄(16进制): ") + if handle == '': + child = window + else: + child = int(handle, 16) + print(f"已成功获取子窗口句柄: {hex(child)}") return child -def _onclicked(event, x, y, flags, param): - if event == cv2.EVENT_LBUTTONDOWN: - param[1] = _getwindow(param[0], x, y) - print("已点击 X: {} Y: {} 窗口句柄: {}".format(x, y, hex(param[1]))) - return - def getsize(hWnd): - "获取窗口尺寸" + """获取窗口尺寸""" left, top, right, bottom = win32gui.GetClientRect(hWnd) return (right - left, bottom - top) +def getdpi(hWnd): + """获取屏幕DPI""" + hDC = win32gui.GetDC(hWnd) + dpi = (win32ui.GetDeviceCaps(hDC, win32con.LOGPIXELSX), win32ui.GetDeviceCaps(hDC, win32con.LOGPIXELSY)) + win32gui.ReleaseDC(hWnd, hDC) + return dpi + +def setnodpi(): + """禁用DPI""" + try: # >= windows 8.1 + ctypes.windll.shcore.SetProcessDpiAwareness(2) + except: # <= windows 8.0 + ctypes.windll.user32.SetProcessDPIAware() + def screenshot(hWnd): - "截图" + """截图""" width, height = getsize(hWnd) # 返回句柄窗口的设备环境,仅包括客户区 hDC = win32gui.GetDC(hWnd) @@ -154,10 +145,13 @@ def screenshot(hWnd): # 获取位图信息 buffer = bitmap.GetBitmapBits(True) # 转换为图片 - image = numpy.frombuffer(buffer, dtype = "uint8") - image.shape = (self.height, self.width, 4) + image = numpy.frombuffer(buffer, dtype="uint8") + image.shape = (height, width, 4) return image +def _toscreenpos(hWnd, x, y): + return win32gui.ClientToScreen(hWnd, (x, y)) + def _getcursorpos(): return win32api.GetCursorPos() @@ -169,22 +163,22 @@ def _setcursorpos(x, y): def _storecursorpos(): global _cursorpos _cursorpos = _getcursorpos() - return def _restorecursorpos(): _setcursorpos(_cursorpos[0], _cursorpos[1]) - return -def click(hWnd, x, y, activate = True): - "模拟鼠标点击" +def click(hWnd, x, y, activate=True): + """模拟鼠标点击""" _storecursorpos() scrpos = _toscreenpos(hWnd, x, y) _setcursorpos(scrpos[0], scrpos[1]) pos = win32api.MAKELONG(x, y) - if activate: win32gui.SetForegroundWindow(hWnd) # 无焦点也能点击, 为啥呢 + if activate: # 无焦点也能点击, 为啥呢 + win32gui.SetForegroundWindow(hWnd) win32api.SendMessage(hWnd, win32con.WM_MOUSEMOVE, 0, pos) win32api.SendMessage(hWnd, win32con.WM_MOUSEACTIVATE, hWnd, win32api.MAKELONG(win32con.HTCLIENT, win32con.WM_LBUTTONDOWN)) win32api.SendMessage(hWnd, win32con.WM_LBUTTONDOWN, win32con.MK_LBUTTON, pos) win32api.SendMessage(hWnd, win32con.WM_LBUTTONUP, 0, pos) _restorecursorpos() - return + +setnodpi() diff --git a/player.py b/player.py deleted file mode 100644 index dd17009..0000000 --- a/player.py +++ /dev/null @@ -1,223 +0,0 @@ -# coding=utf-8 - -import time -import random -import math -import numpy -import cv2 -from ppadb.client import Client as ADBClient -from scrcpy import ScrcpyClient -import utils - -class PlayerBase(object): - """模拟玩家操作的基类""" - # 游戏实际的宽度 - width = 0 - # 游戏实际的高度 - height = 0 - # 预览窗口高度 - wheight = 0 - # 预览窗口高度 / 游戏实际高度 - factor = 1.0 - - def __init__(self): - return - - def init(self): - """执行初始化操作""" - print("正在初始化, 请稍候") - try: - self.wheight = int(input("请输入预览窗口的高(默认480, 0为关闭预览): ")) - except ValueError: - self.wheight = 480 - return True - - def end(self): - """执行释放操作""" - cv2.destroyAllWindows() - return - - def calcFactor(self): - self.factor = self.wheight / self.height - return - - def screenshotraw(self): - """需要子类重写""" - return - - def screenshot(self): - image = self.screenshotraw() - image = cv2.resize(image, None, fx = self.factor, fy = self.factor, interpolation = cv2.INTER_AREA) - return image - - def clickraw(self, x, y): - """需要子类重写""" - return - - def click(self, x, y): - self.clickraw(x / self.factor, y / self.factor) - return - - def clickaround(self, x, y): - self.clickcircle(x, y, 4) - return - - def clickcircle(self, x, y, radius): - # 虽然不均匀, 但正是我想要的 - theta = 2 * math.pi * random.random() - rho = radius * random.random() - dx = rho * math.cos(theta) - dy = rho * math.sin(theta) - self.click(x + dx, y + dy) - return - - def clickbetween(self, x1, y1, x2, y2): - x, y = (x1 + x2) / 2, (y1 + y2) /2 - self.clickrect(x, y, abs(x2 - x1), abs(y2 - y1)) - return - - def clicksquare(self, x, y, length): - dl = random.randint(-length / 2, length / 2) - self.click(x + dl, y + dl) - return - - def clickrect(self, x, y, width, height): - dx = random.randint(-width / 2, width / 2) - dy = random.randint(-height / 2, height / 2) - self.click(x + dx, y + dy) - return - -class Player(PlayerBase): - """模拟鼠标点击窗口""" - window = 0 - - def init(self): - super().init() - self.window = utils.selectwindow() - if self.window == -1: return False - self.width, self.height = utils.getsize(self.window) - print("已获得窗口大小: {} X {}".format(self.width, self.height)) - self.calcFactor() - print("已计算缩放因子: {}".format(self.factor)) - return True - - def calcFactor(self): - # TODO DPI适配 - # 算了我写不出来, 那就别适配了= = - # dpi = utils.getdpi(self.window) - # self.width = int(self.width * dpi['x'] / 96) - # self.height = int(self.height * dpi['y'] / 96) - super().calcFactor() - return - - def screenshotraw(self): - return utils.screenshot(self.window) - - def clickraw(self, x, y): - utils.click(self.window, int(x), int(y)) - return - -class PlayerADB(PlayerBase): - """通过ADB控制手机""" - client = None - device = None - - def init(self): - super().init() - # os.system("adb start-server") - self.client = ADBClient(host="127.0.0.1", port=5037) - devices = [] - try: - print("正在检测设备...") - devices = client.devices() - except: - print("无法连接至ADB Server, 请先启动ADB. ") - return False - size = len(devices) - if size == 1: - print("已自动选择设备: {}".format(devices[0].serial)) - self.device = devices[0] - elif size > 1: - for i in range(size): - print("{}. {}".format(i + 1, device[i].serial)) - select = 0 - while select < 1 or select > size: - select = int(input("请选择设备序号: ")) - self.device = devices[select - 1] - else: - print("未检测到设备, 请手动连接设备. ") - return False - print("已成功连接至设备 {}".format(self.device.serial)) - self.height, self.width = self.device.wm_size() - print("已获得设备屏幕尺寸: {} X {}".format(self.width, self.height)) - self.calcFactor() - print("已计算缩放因子: {}".format(self.factor)) - return True - - def screenshotraw(self): - buffer = self.device.screencap() - image = numpy.frombuffer(buffer, dtype = "uint8") - image = cv2.imdecode(image, cv2.IMREAD_COLOR) - return image - - def clickraw(self, x, y): - self.device.input_tap(int(x), int(y)) - return - -class PlayerScrcpy(PlayerBase): - """使用Scrcpy获取截屏并模拟点击""" - - def init(self): - super().init() - self.client = ScrcpyClient(max_fps=30, queue_length=2) - if not self.client.start(): - print("连接失败") - return False - print("已成功连接至设备 {}".format(self.client.device_name)) - self.width, self.height = self.client.resolution - print("已获得设备屏幕尺寸: {} X {}".format(self.width, self.height)) - self.calcFactor() - print("已计算缩放因子: {}".format(self.factor)) - return True - - def end(self): - self.client.stop() - super().end() - return - - def screenshotraw(self): - image = self.client.get_next_frame(True) - if image is not None: - image = cv2.cvtColor(image, cv2.COLOR_RGB2RGBA) - self.lastimage = image - return self.lastimage - - def clickraw(self, x, y): - self.client.tap(x, y) - return - -class PlayerTest(PlayerBase): - """图像识别测试""" - path = None - image = None - - def init(self): - super().init() - self.path = input("请输入要测试的数据集路径: ") - if self.path == "": self.path = "test" - print("已成功读取数据集 {}".format(self.path)) - self.image = utils.readimage(self.path) - self.height, self.width = self.image.shape[:2] - print("已获得截图尺寸: {} X {}".format(self.width, self.height)) - self.calcFactor() - print("已计算缩放因子: {}".format(self.factor)) - return True - - def screenshotraw(self): - return numpy.copy(self.image) - - def clickraw(self, x, y): - x, y = int(x), int(y) - print("自动点击 X: {} Y: {}".format(x, y)) - time.sleep(0.01) - return diff --git a/players/__init__.py b/players/__init__.py new file mode 100644 index 0000000..f0cf6f0 --- /dev/null +++ b/players/__init__.py @@ -0,0 +1,18 @@ +from .player import addplayer, players +try: + from .native import NativePlayer + addplayer("原生模式", "需先启动安卓虚拟机并打开食物语", NativePlayer) +except ModuleNotFoundError as e: + addplayer("原生模式", e.name, None) +try: + from .adb import ADBPlayer + addplayer("ADB模式", "需手机连接电脑开启调试模式并打开食物语", ADBPlayer) +except ModuleNotFoundError as e: + addplayer("ADB模式", e.name, None) +try: + from .scrcpy import ScrcpyPlayer + addplayer("混合模式(*推荐*)", "使用scrcpy快速获取手机截屏并模拟点击", ScrcpyPlayer) +except ModuleNotFoundError as e: + addplayer("混合模式", e.name, None) +from .debug import DebugPlayer +addplayer("调试模式", "读取程序目录下的test.png并进行图像识别", DebugPlayer) diff --git a/players/adb.py b/players/adb.py new file mode 100644 index 0000000..9d47381 --- /dev/null +++ b/players/adb.py @@ -0,0 +1,47 @@ +from ppadb.client import Client as ADBClient +import numpy +import cv2 +from .player import Player + +class ADBPlayer(Player): + """通过ADB控制手机""" + client = None + device = None + + def __init__(self): + # os.system("adb start-server") + self.client = ADBClient(host="127.0.0.1", port=5037) + devices = [] + try: + print("正在检测设备...") + devices = self.client.devices() + except: + print("无法连接至ADB Server, 请先启动ADB") + raise + size = len(devices) + if size == 1: + print(f"已自动选择设备: {devices[0].serial}") + self.device = devices[0] + elif size > 1: + for i in range(size): + print("{i + 1}. {devices[i].serial}") + select = 0 + while select < 1 or select > size: + select = int(input("请选择设备序号: ")) + self.device = devices[select - 1] + else: + print("未检测到设备, 请手动连接设备") + raise Exception("No devices detected") + print(f"已成功连接至设备 {self.device.serial}") + self.height, self.width = self.device.wm_size() + print(f"已获得设备屏幕尺寸: {self.width} X {self.height}") + super().__init__() + + def _screenshot(self): + buffer = self.device.screencap() + image = numpy.frombuffer(buffer, dtype="uint8") + image = cv2.imdecode(image, cv2.IMREAD_COLOR) + return image + + def _click(self, x, y): + self.device.input_tap(int(x), int(y)) diff --git a/players/debug.py b/players/debug.py new file mode 100644 index 0000000..7a67afd --- /dev/null +++ b/players/debug.py @@ -0,0 +1,27 @@ +import time +import numpy +from .player import Player +import utils + +class DebugPlayer(Player): + """图像识别测试""" + path = None + image = None + + def __init__(self): + self.path = input("请输入要测试的数据集路径: ") + if self.path == "": + self.path = "test" + print(f"已成功读取数据集 {self.path}") + self.image = utils.readimage(self.path) + self.height, self.width = self.image.shape[:2] + print(f"已获得图像尺寸: {self.width} X {self.height}") + super().__init__() + + def _screenshot(self): + return numpy.copy(self.image) + + def _click(self, x, y): + x, y = int(x), int(y) + print(f"自动点击 X: {x} Y: {y}") + time.sleep(0.01) diff --git a/players/native.py b/players/native.py new file mode 100644 index 0000000..51809fc --- /dev/null +++ b/players/native.py @@ -0,0 +1,19 @@ +from .player import Player +import utils + +class NativePlayer(Player): + """模拟鼠标点击窗口""" + window = None + + def __init__(self): + self.window = utils.selectwindow() + assert self.window is not None + self.width, self.height = utils.getsize(self.window) + print(f"已获得窗口大小: {self.width} X {self.height}") + super().__init__() + + def _screenshot(self): + return utils.screenshot(self.window) + + def _click(self, x, y): + utils.click(self.window, int(x), int(y)) diff --git a/players/player.py b/players/player.py new file mode 100644 index 0000000..0257cbe --- /dev/null +++ b/players/player.py @@ -0,0 +1,74 @@ +import random +import math +import cv2 + +DESIGN_LENGTH = 480 + +class Player(object): + """模拟玩家操作的基类""" + + width: int + "游戏实际宽度" + height: int + "游戏实际高度" + factor: int + "设计宽高 / 游戏实际宽高" + + def __init__(self): + """初始化""" + self.factor = DESIGN_LENGTH / min(self.width, self.height) + print(f"已计算缩放比率: {self.factor}") + + def release(self): + """释放资源""" + pass + + def _screenshot(self): + """需要子类重写""" + return None + + def screenshot(self): + image = self._screenshot() + image = cv2.resize(image, None, fx=self.factor, fy=self.factor, interpolation=cv2.INTER_AREA) + return image + + def _click(self, x, y): + """需要子类重写""" + pass + + def click(self, x, y): + self._click(x / self.factor, y / self.factor) + + def clickaround(self, x, y): + self.clickcircle(x, y, 4) + + def clickcircle(self, x, y, radius): + # 虽然不均匀, 但正是我想要的 + theta = 2 * math.pi * random.random() + rho = radius * random.random() + dx = rho * math.cos(theta) + dy = rho * math.sin(theta) + self.click(x + dx, y + dy) + + def clickbetween(self, x1, y1, x2, y2): + x, y = (x1 + x2) / 2, (y1 + y2) / 2 + self.clickrect(x, y, abs(x2 - x1), abs(y2 - y1)) + + def clicksquare(self, x, y, length): + dl = random.randint(-length / 2, length / 2) + self.click(x + dl, y + dl) + + def clickrect(self, x, y, width, height): + dx = random.randint(-width / 2, width / 2) + dy = random.randint(-height / 2, height / 2) + self.click(x + dx, y + dy) + +_players = [] + +def addplayer(name, desc, cls): + """添加玩家操作模拟类""" + _players.append((name, desc, cls)) + +def players(): + """获取玩家操作模拟类""" + return _players diff --git a/players/scrcpy.py b/players/scrcpy.py new file mode 100644 index 0000000..8f6e616 --- /dev/null +++ b/players/scrcpy.py @@ -0,0 +1,31 @@ +import time +import cv2 +from .player import Player +from scrcpy import ScrcpyClient + +class ScrcpyPlayer(Player): + """使用Scrcpy获取截屏并模拟点击""" + client = None + + def __init__(self): + self.client = ScrcpyClient(max_fps=30, queue_length=3) + self.client.start() + print(f"已成功连接至设备 {self.client.device_name}") + self.width, self.height = self.client.resolution + print(f"已获得设备屏幕尺寸: {self.width} X {self.height}") + super().__init__() + + def release(self): + self.client.stop() + + def _screenshot(self): + while True: + image = self.client.get_next_frame(True) + if image is None: + time.sleep(0.01 / self.client.max_fps) + continue + image = cv2.cvtColor(image, cv2.COLOR_RGB2RGBA) + return image + + def _click(self, x, y): + self.client.tap(x, y) diff --git a/profit.py b/profit.py index 65e12f0..e4d3ee3 100644 --- a/profit.py +++ b/profit.py @@ -1,5 +1,3 @@ -# coding=utf-8 - from enum import Enum, auto import math import time @@ -10,16 +8,15 @@ class Goals(Enum): """ 指定线性规划的目标函数 - - MAX_MONEY 总利润最大 - MAX_TIME 总烹饪时间最长 - MAX_CONSUMPTION 消耗的食材最多 - ALL 小孩纸才做选择, 大人全都要 """ MAX_MONEY = 1 - MAX_TIME = auto() + """总利润最大""" + MAX_TIME = auto() + """总烹饪时间最长""" MAX_CONSUMPTION = auto() + """消耗的食材最多""" ALL = auto() + """小孩纸才做选择, 大人全都要""" def calculate(foods, materials, cooks, goal=Goals.MAX_MONEY): MIN_PRODUCTION = 0 # 菜肴最小烹饪量 @@ -35,7 +32,7 @@ def calculate(foods, materials, cooks, goal=Goals.MAX_MONEY): constraints = [] # 每种食材消耗量不超过总量 V(x, y, z, ...) <= V总, M(x, y, z, ...) <= M总, ... for ingredient in swy.Ingredients: - consumption = pulp.lpSum([foods[i].getConsumption(ingredient) * variables[i] for i in range(VAR_NUM)]) + consumption = pulp.lpSum([foods[i].consumption(ingredient) * variables[i] for i in range(VAR_NUM)]) consumptions.append(consumption) constraints.append(consumption <= materials[ingredient]) # 确保菜肴的烹饪状态为1时烹饪数量才大于0 0 * x' <= x <= 30 * x' @@ -52,8 +49,8 @@ def calculate(foods, materials, cooks, goal=Goals.MAX_MONEY): # 总消耗食材h(x, y, z, ...)=各菜肴烹饪数量乘以消耗食材量的总和 obj_consumption = pulp.lpSum(consumptions) # 加权 TODO 尝试其他将多目标线性规划转为单目标的方法? - objective = obj_money / 15_0000 + obj_consumption / sum(materials) - + obj_all = obj_money / 15_0000 + obj_consumption / sum(materials) + problem = pulp.LpProblem("swy_problem", pulp.LpMaximize) problem.addVariables(variables) problem.addVariables(cook_status) @@ -66,7 +63,7 @@ def calculate(foods, materials, cooks, goal=Goals.MAX_MONEY): elif goal == Goals.MAX_CONSUMPTION: problem.setObjective(obj_consumption) elif goal == Goals.ALL: - problem.setObjective(objective) + problem.setObjective(obj_all) print("目前混合模式采用加权把多目标转化为单目标函数, 效果不好") problem.solve() # pulp.LpStatus[problem.status] @@ -96,15 +93,15 @@ def run(): # pulp.LpSolverDefault = pulp.COIN_CMD(path="./libs/cbc.exe") print("欢迎使用食物语线性规划做菜计算器") print("原作者ic30rs, 现由WC维护") - foods = swy.readFoods() + foods = swy.readfoods() materials = [] cooks = [] print("请输入想要烹饪的菜肴的品质, 用空格分割, 不输入则默认使用全部菜肴") str = input("") if str != "": strs = str.split(' ') - dishtypes = set([swy.Rarities(r) for r in strs]) - foods = [food for food in foods if food.rarity in dishtypes] + dish_types = set([swy.Rarities(r) for r in strs]) + foods = [food for food in foods if food.rarity in dish_types] print("请输入食材数量, 格式为: 菜 肉 谷 蛋 鱼 虾") str = input("") strs = str.split(' ') @@ -124,19 +121,18 @@ def run(): type = Goals(int(str)) print("正在计算中, 请稍候...") t = time.time() - result, totalmoney, restmaterials = calculate(foods, materials, cooks, type) + result, total_money, rest_materials = calculate(foods, materials, cooks, type) print("计算完成, 耗时 %f 秒" % (time.time() - t)) if result is None: print("很抱歉, 该问题无解或有无穷多个最优解") return print("结果为(按照输入的顺序):") for index, item in enumerate(result): - cooktime = secondtotime(item["time"]) + cook_time = secondtotime(item["time"]) money = item["food"].price * item["count"] - print("%d. %sX%d 耗时: %s 盈利: %d贝币" % (index + 1, item["food"].name, item["count"], cooktime, money)) - print("总利润: %d贝币" % totalmoney) - print("剩余食材: 菜%d 肉%d 谷%d 蛋%d 鱼%d 虾%d" % tuple(restmaterials)) - return + print("%d. %sX%d 耗时: %s 盈利: %d贝币" % (index + 1, item["food"].name, item["count"], cook_time, money)) + print("总利润: %d贝币" % (total_money)) + print("剩余食材: 菜%d 肉%d 谷%d 蛋%d 鱼%d 虾%d" % tuple(rest_materials)) # 测试 if __name__ == "__main__": diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index c35af49..0000000 --- a/requirements.txt +++ /dev/null @@ -1,20 +0,0 @@ -altgraph==0.17.2 -av==8.0.3 -future==0.18.2 -MouseInfo==0.1.3 -numpy==1.21.4 -opencv-python==4.5.4.60 -pefile==2021.9.3 -PuLP==2.5.1 -pure-python-adb==0.3.0.dev0 -PyAutoGUI==0.9.53 -PyGetWindow==0.0.9 -pyinstaller==4.7 -pyinstaller-hooks-contrib==2021.3 -PyMsgBox==1.0.9 -pyperclip==1.8.2 -PyRect==0.1.4 -PyScreeze==0.1.28 -pytweening==1.0.4 -pywin32==302 -pywin32-ctypes==0.2.0 diff --git a/scrcpy.py b/scrcpy.py index 2dfb96e..62f9f4e 100644 --- a/scrcpy.py +++ b/scrcpy.py @@ -1,5 +1,3 @@ -# coding=utf-8 - import time import subprocess import socket @@ -49,7 +47,6 @@ def __init__(self, max_size=0, bit_rate=8000000, max_fps=0, crop='-', self.ip = ip self.port = port self.video_data_queue = Queue(queue_length) - return def connect_and_forward_scrcpy(self): try: @@ -96,7 +93,6 @@ def connect_and_forward_scrcpy(self): time.sleep(1) except FileNotFoundError: raise FileNotFoundError("Could not find ADB at path: " + self.adb_path) - return def disable_forward(self): subprocess.call([self.adb_path, 'forward', '--remove', 'tcp:%d' % self.port]) @@ -125,7 +121,6 @@ def connect(self): print("Screen resolution: %dX%d" % (self.resolution[0], self.resolution[1])) self.video_socket.setblocking(False) - return def start(self): if self.is_running: return False @@ -138,7 +133,7 @@ def start(self): if self.decode_thread is None: self.decode_thread = Thread(target=self.loop, daemon=True) self.decode_thread.start() - except: + except Exception: self.stop() raise return False @@ -165,7 +160,6 @@ def loop(self): if self.video_data_queue.full(): self.video_data_queue.get() self.video_data_queue.put(frame.to_ndarray(format="bgr24")) - return def get_next_frame(self, latest_image=False): if self.video_data_queue and not self.video_data_queue.empty(): @@ -239,4 +233,3 @@ def stop(self): self.control_socket.close() self.control_socket = None self.disable_forward() - return diff --git a/swy-bot.pyproj b/swy-bot.pyproj index 02699b4..2859d96 100644 --- a/swy-bot.pyproj +++ b/swy-bot.pyproj @@ -24,43 +24,37 @@ 10.0 - - - Code - - - Code - - - Code - - - Code - - - Code - - - Code - - - Code - - - Code - - - Code - + + + + + + + + + + + + + + + + + + + + + + - + + + + - - - \ No newline at end of file diff --git a/swy.py b/swy.py index f00962c..8376e5b 100644 --- a/swy.py +++ b/swy.py @@ -1,6 +1,6 @@ -# coding=utf-8 - from enum import Enum, IntEnum, auto +from dataclasses import dataclass +from typing import List from utils import timetosecond, secondtotime class Rarities(Enum): @@ -21,22 +21,23 @@ class Ingredients(IntEnum): FISH = auto() SHRIMP = auto() +@dataclass class Food: - def __init__(self, name, rarity, price, time, consumptions): - self.name = name - self.rarity = rarity - self.price = price - self.time = time - self.consumptions = consumptions - return - - def getConsumption(self, ingredient: Ingredients) -> int: + """菜肴数据类""" + name: str + rarity: Rarities + price: int + time: int + consumptions: List[int] + + def consumption(self, ingredient: Ingredients) -> int: """获取该菜肴所需的某种食材的数量""" return self.consumptions[ingredient] foods = None -def readFoods() -> list: +def readfoods() -> List[Food]: + """读取菜肴数据""" global foods if foods is None: foods = list() @@ -47,13 +48,13 @@ def readFoods() -> list: rarity = Rarities(values[1]) price = int(values[2]) time = timetosecond(values[3]) - consumptions = [int(values[i + 4]) for i in range(len(Ingredients))] + consumptions = [int(values[4 + i]) for i in range(len(Ingredients))] foods.append(Food(name, rarity, price, time, consumptions)) return foods # 测试 if __name__ == "__main__": - print("读取菜单") - readFoods() - for food in foods: - print("%s(%s) \t售价: %d \t烹饪时间: %s(%d秒) \t消耗食材: %s" % (food.name, food.rarity.value, food.price, secondtotime(food.time), food.time, str(food.consumptions))) + print("加载菜肴数据") + for food in readfoods(): + print("%s(%s) \t售价: %d \t烹饪时间: %s(%d秒) \t消耗食材: %s" % + (food.name, food.rarity.value, food.price, secondtotime(food.time), food.time, str(food.consumptions))) diff --git a/task.py b/task.py deleted file mode 100644 index 2191d5c..0000000 --- a/task.py +++ /dev/null @@ -1,388 +0,0 @@ -# coding=utf-8 - -from enum import Enum, IntEnum, auto -import time -import random -import math -import numpy -import cv2 -from utils import readimage, writeimage -import matching - -class Phases(IntEnum): - """任务执行的阶段""" - BEGIN = 0 - RUNNING = 1 - END = 2 - -class Results(Enum): - """任务执行的结果""" - PASS = auto() - SUCCESS = auto() - FAIL = auto() - -tasks = [] - -def getTasks(): - """获取任务列表""" - return tasks - -def registerTask(task, name, desc): - """注册任务(已弃用,请使用装饰器注册任务)""" - task.name = name - task.description = desc - tasks.append(task) - return - -def Task(name, desc): - """用于自动注册任务的装饰器""" - def decorator(cls): - registerTask(cls, name, desc) - return cls - return decorator - -class TaskBase(object): - """自动挂机任务的基类""" - name = "" - description = "" - - def __init__(self): - """初始化""" - return - - def init(self): - self.image = None - return - - def begin(self, player, t): - """开始任务""" - return Results.FAIL - - def run(self, player, t): - """执行任务""" - return Results.FAIL - - def end(self, player, t): - """结束任务""" - return Results.FAIL - - def getImageCache(self): - """获取用于预览的图像,如果不想显示请返回None""" - return self.image - -@Task("自动客潮", "请将界面停留在餐厅") -class TaskKeChao(TaskBase): - """客潮自动化""" - - def __init__(self): - super().__init__() - self.templateButton = readimage("kechao_btn") - self.templateDish = readimage("kechao_dish_part") - self.templateTitle = readimage("kechao_title_part") - return - - def init(self): - super().init() - self.lastTime = 0 - # 0:餐厅界面 1:客潮对话框 2:客潮进行中 3:客潮结束结算界面 - self.step = 0 - self.pointCache = [] - return - - def begin(self, player, t): - """需要玩家位于餐厅界面""" - self.image = player.screenshot() - if self.step == 0: - points = findcircle(self.image, 25) - for x, y in points: - if x > (self.image.shape[1] * 0.9) and y < (self.image.shape[0] * 0.2): - # 找到客潮按钮 - player.clickaround(x, y) - self.lastTime = t - self.step = 1 - return Results.PASS - if t - self.lastTime > 3: - # 没找到客潮按钮且超时 - print("未找到客潮按钮, 请确认您正位于餐厅界面") - return Results.FAIL - elif self.step == 1 or self.step == 2: - if t - self.lastTime > 2: - points = findtemplate(self.image, self.templateButton) - for x, y in points: - # 找到开始按钮 - if self.step == 2: - print("点击按钮后没有开始, 可能是客潮开启次数不足") - return Results.FAIL - player.clickaround(x, y) - self.lastTime = t - self.step = 2 - return Results.PASS - # 找不到开始按钮 - if self.step == 2: - # 点过开始按钮了, 进入客潮 - self.lastTime = t - print("进入客潮") - return Results.SUCCESS - # 还没点过开始按钮, 回退到第0步 - self.lastTime = t - self.step = 0 - return Results.PASS - return Results.PASS - - def run(self, player, t): - """客潮挂机中""" - self.image = player.screenshot() - # 处理点的缓存 - self.pointCache = [(x, y, time - 1) for x, y, time in self.pointCache if time > 1] - # 识别圆来寻找菜(旧版本用模版匹配, 效果不好) - points = findcircle(self.image, 25) - points2 = [] - for x, y in points: - if x > (self.image.shape[1] * 0.9): continue - if y > (self.image.shape[0] * 0.8): - # 客潮结束回到餐厅 - self.lastTime = t - self.step = 3 - print("客潮结束") - return Results.SUCCESS - cv2.circle(self.image, (x, y), 25, (0, 0, 255), 3) - if not self.containpoint(x, y): - points2.append((x, y)) - if len(points2) > 0: - x, y = random.choice(points2) - player.clickaround(x, y) - self.pointCache.append((x, y, 10)) - self.lastTime = t - return Results.PASS - if t - self.lastTime > 15: - # 没人点菜, 停止挂机? - print("超过15秒钟没有客人点菜了, 停止挂机") - return Results.FAIL - return Results.PASS - - def end(self, player, t): - """客潮结束""" - self.image = player.screenshot() - if self.step == 3: - if t - self.lastTime > 2: - points = findtemplate(self.image, self.templateTitle) - for x, y in points: - # 正位于客潮结算界面 - filename = "KeChao_" + time.strftime("%Y-%m-%d-%H-%M-%S") - writeimage(filename, self.image) - print("已将客潮结算界面截图保存至: saved/%s.png" % filename) - player.clickaround(x, y) - self.lastTime = t - return Results.PASS - # 结算完了 - self.lastTime = t - return Results.SUCCESS - return Results.PASS - - def containpoint(self, x, y): - for cx, cy, time in self.pointCache: - if math.sqrt(math.pow(int(x) - int(cx), 2) + math.pow(int(y) - int(cy), 2)) < 5: - return True - return False - -class TaskMiniGame(TaskBase): - """活动小游戏挂机任务的基类""" - - def __init__(self): - super().__init__() - self.templateButton = readimage("minigame_btn") - return - - def init(self): - self.lastTime = 0 - # 是否点击过开始按钮了 - self.started = False - return - - def begin(self, player, t): - """需要玩家位于小游戏界面""" - self.image = player.screenshot() - if not self.started: - points = findtemplate(self.image, self.templateButton) - for x, y in points: - cv2.circle(self.image, (x, y), 40, (0, 0, 255), 2) - player.click(x, y) - self.lastTime = t - self.started = True - return Results.PASS - if t - self.lastTime > 3: - # 没找到开始按钮且超时 - print("未找到开始按钮, 请确认您正位于小游戏界面") - return Results.FAIL - elif t - self.lastTime > 1: - self.lastTime = t - return Results.SUCCESS - return Results.PASS - -try: - from constant import * -except: - # **若想使用请自行修改以下数据** - # 消除的时间间隔 - TIME_INTERVAL = 0.5 - # 游戏区域距离屏幕左方的距离 - MARGIN_LEFT = 0 - # 游戏区域距离屏幕顶部的距离 - MARGIN_TOP = 0 - # 横向方块数量 - HORIZONTAL_NUM = 10 - # 纵向方块数量 - VERTICAL_NUM = 10 - # 方块宽度 - SQUARE_WIDTH = 100 - # 方块高度 - SQUARE_HEIGHT = 100 - # 切片处理时的左上和右下坐标 - SUB_LT_X = 20 - SUB_LT_Y = 20 - SUB_RB_X = 80 - SUB_RB_Y = 80 - -@Task("自动小游戏-千人千面", "需自行修改代码进行配置") -class TaskQianRenQianMian(TaskMiniGame): - """千人千面自动连连看""" - - def init(self): - super().init() - self.result = None - self.pair = None - return - - def run(self, player, t): - """小游戏挂机中""" - self.image = player.screenshot() - for j in range(VERTICAL_NUM): - for i in range(HORIZONTAL_NUM): - x = MARGIN_LEFT + i * SQUARE_WIDTH - y = MARGIN_TOP + j * SQUARE_HEIGHT - cv2.rectangle(self.image, (x, y), (x + SQUARE_WIDTH, y + SQUARE_HEIGHT), (0, 255, 0), 1) - if self.result is None: - # 图像切片并保存在数组中 - squares = [] - for j in range(VERTICAL_NUM): - for i in range(HORIZONTAL_NUM): - x = MARGIN_LEFT + i * SQUARE_WIDTH - y = MARGIN_TOP + j * SQUARE_HEIGHT - square = self.image[y : y + SQUARE_HEIGHT, x : x + SQUARE_WIDTH] - # 每个方块向内缩小一部分防止边缘不一致造成干扰 - square = square[SUB_LT_Y : SUB_RB_Y, SUB_LT_X : SUB_RB_X] - squares.append(square) - # 相同的方块作为一种类型放在数组中 - types = [] - for square in squares: - if self.isbackground(square): - continue - if not self.isimageexist(square, types): - types.append(square) - # 将切片处理后的图片数组转换成相对应的数字矩阵 - self.result = [] - num = 0 - for j in range(VERTICAL_NUM): - line = [] - for i in range(HORIZONTAL_NUM): - if self.isbackground(squares[num]): - line.append(0) - else: - for t in range(len(types)): - if isimagesame(squares[num], types[t]): - line.append(t + 1) - break - num += 1 - self.result.append(line) - return Results.PASS - # 执行自动消除 - if t - self.lastTime >= TIME_INTERVAL: - self.lastTime = t - # 第二次选择 - if self.pair is not None: - player.click(self.pair[0] + SQUARE_WIDTH / 2, self.pair[1] + SQUARE_HEIGHT / 2) - self.pair = None - return Results.PASS - # 定位第一个选中点 - for i in range(len(self.result)): - for j in range(len(self.result[0])): - if self.result[i][j] != 0: - # 定位第二个选中点 - for m in range(len(self.result)): - for n in range(len(self.result[0])): - if self.result[m][n] != 0: - if matching.canConnect(i, j, m, n, self.result): - # 执行消除算法并进行第一次选择 - self.result[i][j] = 0 - self.result[m][n] = 0 - x1 = MARGIN_LEFT + j * SQUARE_WIDTH - y1 = MARGIN_TOP + i * SQUARE_HEIGHT - x2 = MARGIN_LEFT + n * SQUARE_WIDTH - y2 = MARGIN_TOP + m * SQUARE_HEIGHT - player.click(x1 + SQUARE_WIDTH / 2, y1 + SQUARE_HEIGHT / 2) - self.pair = (x2, y2) - return Results.PASS - # TODO 判断一下出现结束画面才算完毕, 否则等待一会后重新规划 - print("自动消除运行完毕") - return Results.SUCCESS - return Results.PASS - - def isbackground(self, img): - # TODO 是否有更好的算法? - # OpenCV的顺序是BGR不是RGB... - return abs(img[:, :, 0].mean() - 54) <= 10 and abs(img[:, :, 1].mean() - 70) <= 20 and abs(img[:, :, 2].mean() - 105) <= 15 - - def isimageexist(self, img, img_list): - for existed_img in img_list: - if isimagesame(img, existed_img): - return True - return False - -@Task("自动小游戏", "更多自动小游戏敬请期待...") -class TaskMoreMiniGames(TaskBase): - """算了放弃了, 毁灭吧赶紧的""" - - def begin(self, player, t): - print("我不想做了, 如果您需要的话可以自行编写挂机任务, 然后提交pr") - return Results.FAIL - -def isimagesame(img1, img2, threshold = 0.5): - # TODO 是否有更好的算法? - # b = numpy.subtract(existed_img, img) - # return not numpy.any(b) - result = cv2.matchTemplate(img1, img2, cv2.TM_CCOEFF_NORMED) - location = numpy.where(result >= threshold) - for pt in zip(*location[::-1]): - return True - return False - -def findtemplate(image, template, threshold = 0.75, outline = False): - theight, twidth = template.shape[:2] - result = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED) - # result = cv2.normalize(result, None, 0, 1, cv2.NORM_MINMAX) - location = numpy.where(result >= threshold) - lx, ly = 0, 0 - points = [] - for pt in zip(*location[::-1]): - x, y = pt[0] + int(twidth / 2), pt[1] + int(theight / 2) - # 去掉重复点 - if x - lx < twidth or y - ly < theight: continue - points.append((x, y)) - lx, ly = x, y - if outline: - cv2.rectangle(image, pt, (pt[0] + twidth, pt[1] + theight), (0, 255, 0), 1) - return points - -def findcircle(image, r, outline = False): - gimg = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) - gimg = cv2.medianBlur(gimg, 5) - result = cv2.HoughCircles(gimg, cv2.HOUGH_GRADIENT, 1, int(r / 2), None, 100, 40, r - 10, r + 10) - points = [] - # 寻找不到会返回None - if result is not None: - result = numpy.uint16(numpy.around(result)) - for p in result[0,:]: - points.append((p[0], p[1])) - if outline: - cv2.circle(image, (p[0], p[1]), p[2], (0, 255, 0), 1) - return points diff --git a/tasks/__init__.py b/tasks/__init__.py new file mode 100644 index 0000000..87ea11e --- /dev/null +++ b/tasks/__init__.py @@ -0,0 +1,3 @@ +from .task import tasks +from .kechao import KeChaoTask +from .minigame import QianRenQianMianTask diff --git a/tasks/kechao.py b/tasks/kechao.py new file mode 100644 index 0000000..e3cf9fc --- /dev/null +++ b/tasks/kechao.py @@ -0,0 +1,127 @@ +import random +import math +import time +import cv2 +from .task import RepeatTask, task +from algorithms.detect import findtemplate, findcircle +from utils import readimage, writeimage + +@task("自动客潮", "请将界面停留在餐厅") +class KeChaoTask(RepeatTask): + """客潮自动化""" + + def __init__(self): + self.template_button = readimage("kechao_btn") + self.template_dish = readimage("kechao_dish_part") + self.template_title = readimage("kechao_title_part") + + async def runonce(self, player): + while True: + restart = False + # 餐厅界面 + timer = self.timer(3) + while True: + # 寻找客潮按钮 + target = next(((x, y) for x, y in findcircle(self.frame, 25) + if x > (self.frame.shape[1] * 0.9) and y < (self.frame.shape[0] * 0.2)), None) + if target is not None: + # 找到客潮按钮 + x, y = target + player.clickaround(x, y) + await self.wait(1) + break + elif timer.timeout(): + # 未找到客潮按钮且超时 + print("未找到客潮按钮, 请确认您正位于餐厅界面") + return False + await self.next() + # 客潮对话框 + clicked = False + timer = self.timer(2) + while True: + # 寻找开始按钮 + targets = findtemplate(self.frame, self.template_button) + if not clicked: + # 未点过开始按钮 + if targets: + # 找到开始按钮 + x, y = targets[0] + player.clickaround(x, y) + await self.wait(2) + clicked = True + timer = self.timer(2) + continue + elif timer.timeout(): + # 2秒后还未找到开始按钮, 回退到餐厅界面 + print("未找到客潮对话框, 回退到餐厅界面重试") + restart = True + break + else: + # 已点过开始按钮 + if not targets: + # 未找到开始按钮, 进入客潮 + print("进入客潮") + await self.wait(2) + break + elif timer.timeout(): + # 2秒后还能找到开始按钮, 结束任务 + print("点击按钮后没有开始, 可能是客潮开启次数不足") + return False + await self.next() + if not restart: + break + # 客潮挂机中 + point_cache = [] + timer = self.timer(15) + while True: + # 处理点的缓存 + point_cache = [(x, y, t - 1) for x, y, t in point_cache if t > 1] + # 识别圆来寻找菜(旧版本用模版匹配, 效果不好) + end = False + targets = [] + for x, y in findcircle(self.frame, 25): + if x > (self.frame.shape[1] * 0.9): + continue + if y > (self.frame.shape[0] * 0.8): + end = True + break + cv2.circle(self.frame, (x, y), 25, (0, 0, 255), 3) + if next(((cx, cy) for cx, cy, _ in point_cache + if math.dist((x, y), (cx, cy)) < 5), None) is None: + targets.append((x, y)) + if end: + # 客潮结束回到餐厅 + print("客潮结束") + await self.wait(2) + break + elif targets: + # 随机挑选一个客人上菜! + x, y = random.choice(targets) + player.clickaround(x, y) + point_cache.append((x, y, 10)) + timer = self.timer(15) + elif timer.timeout(): + # 没人点菜, 停止挂机? + print("超过15秒钟没有客人点菜了, 停止挂机") + return False + await self.next() + # 客潮结算界面 + timer = self.timer(3) + while True: + # 寻找客潮结算界面 + targets = findtemplate(self.frame, self.template_title) + if targets: + # 正位于客潮结算界面 + x, y = targets[0] + filename = "KeChao_" + time.strftime("%Y-%m-%d-%H-%M-%S") + writeimage(filename, self.frame) + print(f"已将客潮结算界面截图保存至: saved/{filename}.png") + player.clickaround(x, y) + await self.wait(2) + break + elif timer.timeout(): + # 未找到客潮结算界面 + print("未在3秒内进入客潮结算, 停止挂机") + return False + await self.next() + return True diff --git a/tasks/minigame.py b/tasks/minigame.py new file mode 100644 index 0000000..14ea62b --- /dev/null +++ b/tasks/minigame.py @@ -0,0 +1,159 @@ +import cv2 +from .task import Task, task +from algorithms.detect import isimagesame, findtemplate +from algorithms.matching import canConnect +from utils import readimage +try: + from constant import * +except: + # **若想使用请自行修改以下数据** + # 消除的时间间隔 + TIME_INTERVAL = 1.0 + # 游戏区域距离屏幕左方的距离 + MARGIN_LEFT = 0 + # 游戏区域距离屏幕顶部的距离 + MARGIN_TOP = 0 + # 横向方块数量 + HORIZONTAL_NUM = 10 + # 纵向方块数量 + VERTICAL_NUM = 10 + # 方块宽度 + SQUARE_WIDTH = 100 + # 方块高度 + SQUARE_HEIGHT = 100 + # 切片处理时的左上和右下坐标 + SUB_LT_X = 20 + SUB_LT_Y = 20 + SUB_RB_X = 80 + SUB_RB_Y = 80 + +class MiniGameTask(Task): + """活动小游戏挂机任务的基类""" + + def __init__(self, button_name): + super().__init__() + self.template_button = readimage(button_name) + + async def run(self, player): + started = False + timer = self.timer(3) + while True: + targets = findtemplate(self.frame, self.template_button) + if not started: + # 未点过开始按钮 + if targets: + # 找到开始按钮 + x, y = targets[0] + player.click(x, y) + await self.wait(1) + started = True + timer = self.timer(2) + continue + elif timer.timeout(): + # 未找到开始按钮且超时 + print("未找到开始按钮, 请确认您正位于小游戏界面") + return False + else: + # 已点过开始按钮 + if not targets: + # 未找到开始按钮 + print("开始小游戏") + return True + elif timer.timeout(): + # 1秒后还能找到开始按钮 + print("点击按钮后小游戏没有开始, 1秒后重试") + await self.wait(1) + started = False + timer = self.timer(3) + continue + await self.next() + +@task("自动小游戏-千人千面", "需自行修改代码进行配置") +class QianRenQianMianTask(MiniGameTask): + """千人千面自动连连看""" + + def __init__(self): + super().__init__("minigame_btn") + + async def run(self, player): + if not await super().run(player): + return + # 图像切片并保存在数组中 + squares = [] + for j in range(VERTICAL_NUM): + for i in range(HORIZONTAL_NUM): + x = MARGIN_LEFT + i * SQUARE_WIDTH + y = MARGIN_TOP + j * SQUARE_HEIGHT + square = self.frame[y:(y + SQUARE_HEIGHT), x:(x + SQUARE_WIDTH)] + # 每个方块向内缩小一部分防止边缘不一致造成干扰 + square = square[SUB_LT_Y:SUB_RB_Y, SUB_LT_X:SUB_RB_X] + squares.append(square) + # 相同的方块作为一种类型放在数组中 + types = [] + for square in squares: + if self.isbackground(square): + continue + if next((type for type in types + if isimagesame(square, type)), None) is None: + types.append(square) + # 将切片处理后的图片数组转换成相对应的数字矩阵 + result = [] + num = 0 + for j in range(VERTICAL_NUM): + line = [] + for i in range(HORIZONTAL_NUM): + if self.isbackground(squares[num]): + line.append(0) + else: + for t in range(len(types)): + if isimagesame(squares[num], types[t]): + line.append(t + 1) + break + num += 1 + result.append(line) + # 执行自动消除 + await self.next() + while True: + # 绘制网格 + # for j in range(VERTICAL_NUM): + # for i in range(HORIZONTAL_NUM): + # x = MARGIN_LEFT + i * SQUARE_WIDTH + # y = MARGIN_TOP + j * SQUARE_HEIGHT + # cv2.rectangle(self.frame, (x, y), (x + SQUARE_WIDTH, y + SQUARE_HEIGHT), (0, 255, 0), 1) + # 执行消除算法 + pair = self.matchpair(result) + if pair: + ((x1, y1), (x2, y2)) = pair + player.click(x1, y1) + await self.wait(TIME_INTERVAL) + player.click(x2, y2) + await self.wait(TIME_INTERVAL) + else: + # TODO 判断一下出现结束画面才算完毕, 否则等待一会后重新规划 + print("自动消除运行完毕") + break + + def matchpair(self, grid): + # 定位第一个选中点 + for i in range(len(grid)): + for j in range(len(grid[0])): + if grid[i][j] != 0: + # 定位第二个选中点 + for m in range(len(grid)): + for n in range(len(grid[0])): + if grid[m][n] != 0: + if canConnect(i, j, m, n, grid): + # 消除成功 + grid[i][j] = 0 + grid[m][n] = 0 + x1 = MARGIN_LEFT + j * SQUARE_WIDTH + y1 = MARGIN_TOP + i * SQUARE_HEIGHT + x2 = MARGIN_LEFT + n * SQUARE_WIDTH + y2 = MARGIN_TOP + m * SQUARE_HEIGHT + return ((x1 + SQUARE_WIDTH / 2, y1 + SQUARE_HEIGHT / 2), + (x2 + SQUARE_WIDTH / 2, y2 + SQUARE_HEIGHT / 2)) + return () + + def isbackground(self, img): + # TODO 是否有更好的算法? + return abs(img[:, :, 0].mean() - 54) <= 10 and abs(img[:, :, 1].mean() - 70) <= 20 and abs(img[:, :, 2].mean() - 105) <= 15 diff --git a/tasks/task.py b/tasks/task.py new file mode 100644 index 0000000..1d5e15f --- /dev/null +++ b/tasks/task.py @@ -0,0 +1,69 @@ +import types +import cv2 +from utils import Timer + +class Task(object): + """自动挂机任务的基类""" + name: str + description: str + + frame = None # : cv2.Mat + + async def run(self, player): + """执行任务""" + return True + + def timer(self, interval: float): + """创建一个在指定秒后到期的定时器""" + return Timer(interval) + + @types.coroutine + def next(self): + """等待下一帧""" + yield + + @types.coroutine + def wait(self, s: float): + """等待指定秒""" + timer = self.timer(s) + while not timer.timeout(): + yield + + @types.coroutine + def inputpoint(self): + """获取点""" + self.clicked = None + while self.clicked is None: + yield + point, self.clicked = self.clicked, None + return point + +class RepeatTask(Task): + """可重复执行的自动挂机任务""" + + async def runonce(self, player): + """执行一次任务""" + return True + + async def run(self, player): + times = 0 + while True: + times += 1 + print(f"第 {times} 次运行任务") + if not await self.runonce(player): + break + +_tasks = [] + +def task(name, desc): + """用于自动注册任务的装饰器""" + def decorator(cls): + cls.name = name + cls.description = desc + _tasks.append(cls) + return cls + return decorator + +def tasks(): + """获取任务列表""" + return _tasks diff --git a/utils.py b/utils.py index 7af1698..0337af4 100644 --- a/utils.py +++ b/utils.py @@ -1,14 +1,22 @@ -# coding=utf-8 - import sys -import cv2 -if sys.platform.startswith("win32") or sys.platform.startswith("cygwin"): - from platforms.windows import * -else: - from platforms.linux import * +import time +from platforms import * + +class Timer(object): + """定时器类""" + end_time: float + + def __init__(self, interval: float): + self.end_time = time.perf_counter() + interval + + def timeout(self) -> bool: + return time.perf_counter() >= self.end_time -def timetosecond(time: str) -> int: - strs = time.split(':') +def throw(msg: str): + raise Exception(msg) + +def timetosecond(time_str: str) -> int: + strs = time_str.split(':') second = int(strs[-1]) minute = int(strs[-2]) if len(strs) > 1 else 0 hour = int(strs[-3]) if len(strs) > 2 else 0 @@ -21,12 +29,5 @@ def secondtotime(second: int) -> str: minute %= 60 return "%02d:%02d:%02d" % (hour, minute, second) -def ispacked(): +def ispacked() -> bool: return hasattr(sys, "frozen") - -def readimage(name): - return cv2.imread("./data/" + name + ".png", cv2.IMREAD_UNCHANGED) - -def writeimage(name, image): - cv2.imwrite("./saved/" + name + ".png", image, [int(cv2.IMWRITE_PNG_COMPRESSION), 3]) - return