Skip to content

Commit

Permalink
Merge pull request KKKKKKKEM#7 from yintian710/main
Browse files Browse the repository at this point in the history
feat(添加 CSV 种子引擎): 添加 CSV 种子引擎
  • Loading branch information
KKKKKKKEM authored Dec 8, 2023
2 parents 27987f1 + caf266f commit e7f974b
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 13 deletions.
2 changes: 1 addition & 1 deletion bricks/core/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ async def main():
self.loop.run_until_complete(main())
try:
self.loop.close()
except:
except: # noqa
pass

def stop(self):
Expand Down
2 changes: 1 addition & 1 deletion bricks/lib/items.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def drop(self, columns):
for column in columns:
try:
item.__delitem__(column)
except:
except: # noqa
pass

@property
Expand Down
4 changes: 2 additions & 2 deletions bricks/lib/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, __dict: Optional[Union[dict, str]] = ..., **kwargs) -> None:
self.fingerprint = None
try:
super().__init__(__dict, **kwargs)
except:
except: # noqa
self.fingerprint = __dict

fingerprint = property(
Expand Down Expand Up @@ -213,7 +213,7 @@ def remove(self, *values):
try:
for value in values:
self.queue.remove(value)
except:
except: # noqa
pass
else:
count += 1
Expand Down
2 changes: 1 addition & 1 deletion bricks/lib/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ def is_json(self, **kwargs):
try:
self.json(**kwargs)
return True
except:
except: # noqa
return False

def __str__(self):
Expand Down
89 changes: 89 additions & 0 deletions bricks/plugins/make_seeds.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
"""
@File : make_seeds.py
@Date : 2023-12-06 16:39
@Author : yintian
@Desc :
"""
import math
import time
from typing import Optional, Union, List
from urllib.parse import urlencode

from bricks.utils.csv import CsvReader
from bricks.utils.pandora import get_files_by_path


def by_csv(
path: Union[str, List[str]],
fields: list = None,
batch_size: int = 10000,
skip: Union[str, int] = None,
query: str = None,
mapping: Optional[dict] = None,
stop: Optional[int] = math.inf,
reader_kwargs: Optional[dict] = None,
record: Optional[dict] = None,
):
"""
从 CSV 中获取种子
:param path: 文件路径
:param fields: 暂不支持
:param batch_size: 一次投多少种子
:param skip: 跳过初始多少种子
:param query: 查询条件, 为 python 伪代码, 如 "a < 10 and 'bbb' in b.lower()"
:param mapping: 暂不支持
:param stop: 投到多少停止
:param reader_kwargs: 初始化 csv reader 的其他参数
:param record:
:return:
"""
record = record or {}
reader_kwargs = reader_kwargs or {}
mapping = mapping or {}
if mapping or fields:
raise ValueError(f'暂不支持 mapping/fields 参数')
if skip is None:
if record:
skip = 'auto'
else:
skip = 0

raw_skip = skip
total = 0
for file in get_files_by_path(path):
_record = {
"path": path,
"file": file,
"query": query,
}
record_key = urlencode(_record)
if raw_skip == 'auto':
total = int(record.get('total', 0))
skip = int(record.get(record_key, 0))
else:
skip = raw_skip
with CsvReader(file_path=file, **reader_kwargs) as reader:
for row in reader.iter_data(
count=batch_size,
skip=skip,
fields=fields,
query=query
):
total += len(row)
skip += len(row)
if total > stop:
return
record.update({record_key: skip})
yield row


if __name__ == '__main__':
st = time.time()
for __ in range(1000):
with CsvReader(file_path='../../files/e.csv') as r:
for _ in r.iter_data(skip=10, query='int(a) % 3 == 0'):
# print(_)
pass
print(time.time() - st)
8 changes: 4 additions & 4 deletions bricks/spider/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ def make_seeds(self, context: air.Context, **kwargs):

for node in pandora.iterable(self.config.init):
engine = node.func
args = node.args or []
kwargs = node.kwargs or {}
node_args = node.args or []
node_kwargs = node.kwargs or {}

if not callable(engine):
engine = pandora.load_objects(engine)

seeds = pandora.invoke(
func=engine,
args=[context, *args],
kwargs=kwargs,
args=[context, *node_args],
kwargs={**kwargs, **node_kwargs},
annotations={air.Context: context},
namespace={"context": context}
)
Expand Down
111 changes: 111 additions & 0 deletions bricks/utils/csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# -*- coding: utf-8 -*-
"""
@File : csv.py
@Date : 2023-12-08 11:10
@Author : yintian
@Desc :
"""
import csv
import itertools
import os


class CsvReader:
"""CSV reader"""
__cache = {}

def __init__(self, file_path, page_size=10, encoding='utf-8-sig', **kwargs):
self.file_path = file_path
self.page_size = page_size
self.current_page = 1
self.total_rows = 0
self.encoding = encoding
for k, v in kwargs.items():
setattr(self, k, v)

def __enter__(self):
if not os.path.exists(self.file_path):
raise FileExistsError(f'文件 {self.file_path} 不存在')
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if reader := self.__cache.get(self.file_path):
reader['file'].close()
del reader['reader']
self.__cache.pop(self.file_path)

@property
def data(self) -> csv.DictReader:
if self.__cache.get(self.file_path):
return self.__cache[self.file_path]['reader']
file = open(self.file_path, 'r', encoding=self.encoding)
csv_reader = csv.DictReader(file)
self.__cache[self.file_path] = {
'reader': csv_reader,
'file': file
}
return csv_reader

@staticmethod
def build_query(query):
"""
构建过滤方法
:param query: 过滤条件, 伪 Python 代码
:return:
"""
if not query:
return lambda x: True
_query = (query.
replace(' AND ', ' and ').
replace(' OR ', ' or ').
replace(' NULL ', ' None ').
replace(' NOT ', ' not '))

# 将 query 语句包装在一个假的表达式中
_query = f"True if ({_query}) else False"

if ' like ' in _query.lower():
raise ValueError('筛选暂不支持 like 语法')
if ' exists ' in _query.lower():
raise ValueError('筛选暂不支持 exists 语法')
if ' between ' in _query.lower():
raise ValueError('筛选暂不支持 between 语法')

def filter_func(item):
return eval(_query, {**item})

return filter_func

def iter_data(self, count=1000, skip=0, query='', fields=None):
"""
获取数据
:param count: 要获取多少个
:param skip: 跳过多少个
:param query: 查询条件, 伪 Python 代码
:param fields: 暂不支持
:return:
"""
fields = fields or [] # noqa
query_func = self.build_query(query)
try:
if skip:
_ = [next(self.data) for _ in range(skip)]
except StopIteration:
return
while True:
data = []
row_count = itertools.count(1)
for row in self.data:
if not query_func(row):
continue
data.append(row)
if next(row_count) >= count:
break
if not data:
return
yield data


if __name__ == '__main__':
pass
24 changes: 22 additions & 2 deletions bricks/utils/pandora.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def prepare(func, args=None, kwargs: dict = None, annotations: dict = None, name

try:
parameters = inspect.signature(func).parameters
except:
except: # noqa
parameters = {}
new_args = [*args]
kwargs and new_args.append(kwargs)
Expand Down Expand Up @@ -221,7 +221,7 @@ def use_jsonp():
for func in funcs:
try:
return func()
except:
except: # noqa
pass

else:
Expand Down Expand Up @@ -256,6 +256,26 @@ def get_simple_stack(e):
return formatted_trace


def get_files_by_path(path: Union[List[str], str]):
if not path:
return []
if isinstance(path, list):
return [j for i in path for j in get_files_by_path(i)]
if os.path.isdir(path):
dirname = path
else:
return [path]
if os.path.exists(path):
ret = map(
lambda x: os.path.join(dirname, x),
os.listdir(path) if os.path.isdir(path) else [os.path.basename(path)]
)
else:
ret = []

return ret


if __name__ == '__main__':
def fun(a: int, *args, c: int = 1, **kwargs):
print(a, args, c, kwargs)
Expand Down
17 changes: 15 additions & 2 deletions demos/template_spider_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,27 @@
# @Desc :
from bricks import const
from bricks.core import signals
from bricks.lib.queues import RedisQueue
from bricks.plugins.make_seeds import by_csv
from bricks.spider import template
from bricks.spider.template import Config


class Spider(template.Spider):

def __init__(self, **kwargs):

super().__init__(**kwargs)

@property
def config(self) -> Config:
return Config(
init=[
template.Init(
func=lambda: {"page": 1}
func=by_csv,
kwargs={
'path': r'D:\yintian\myProject\bricks\files\e.csv',
}
)
],
events={
Expand Down Expand Up @@ -88,4 +98,7 @@ def is_success(context: template.Context):

if __name__ == '__main__':
spider = Spider()
spider.run()
spider.run(
task_name='init'
)

0 comments on commit e7f974b

Please sign in to comment.