Skip to content

Commit

Permalink
导入hdf增加保护
Browse files Browse the repository at this point in the history
  • Loading branch information
fasiondog committed Jan 3, 2022
1 parent 804108d commit 50797b1
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 113 deletions.
47 changes: 33 additions & 14 deletions hikyuu/data/common_h5.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,27 @@
import datetime
import tables as tb

from hikyuu.util import hku_catch
from hikyuu.util.mylog import hku_error

HDF5_COMPRESS_LEVEL = 9


class H5Record(tb.IsDescription):
    """Basic HDF5 K-line record layout (daily, 1-minute and 5-minute bars).

    Each column is declared exactly once. The trailing ``#IGNORE:E1101``
    markers suppress the linter's "no member" warning for the dynamically
    generated PyTables column classes.
    """
    datetime = tb.UInt64Col()  #IGNORE:E1101
    openPrice = tb.UInt32Col()  #IGNORE:E1101
    highPrice = tb.UInt32Col()  #IGNORE:E1101
    lowPrice = tb.UInt32Col()  #IGNORE:E1101
    closePrice = tb.UInt32Col()  #IGNORE:E1101
    transAmount = tb.UInt64Col()  #IGNORE:E1101
    transCount = tb.UInt64Col()  #IGNORE:E1101


class H5Index(tb.IsDescription):
    """Extended HDF5 K-line index layout.

    Used for the derived periods: weekly, monthly, quarterly, half-yearly,
    yearly, and 15-, 30- and 60-minute bars.
    """
    datetime = tb.UInt64Col()  #IGNORE:E1101
    # NOTE(review): presumably the row position in the base table at which
    # this period starts -- confirm against the index-building code.
    start = tb.UInt64Col()  #IGNORE:E1101


class H5Transaction(tb.IsDescription):
Expand Down Expand Up @@ -69,12 +73,16 @@ class H5MinuteTime(tb.IsDescription):
# K线数据
#------------------------------------------------------------------------------


def open_h5file(dest_dir, market, ktype):
    """Open the K-line HDF5 file for one market and K-line type.

    The file path is ``<dest_dir>/<market>_<ktype>.h5`` with both ``market``
    and ``ktype`` lower-cased.

    :param dest_dir: directory that holds the HDF5 files
    :param market: market identifier string (lower-cased for the file name)
    :param ktype: K-line type string (lower-cased for the file name)
    :return: the PyTables file handle, opened in append mode ("a") with zlib
             compression at HDF5_COMPRESS_LEVEL and shuffle enabled; this
             function does not close it
    """
    filename = "{}/{}_{}.h5".format(dest_dir, market.lower(), ktype.lower())
    # Open the file exactly once: a second open on the same path would
    # overwrite (and leak) the first handle.
    h5file = tb.open_file(
        filename, "a", filters=tb.Filters(complevel=HDF5_COMPRESS_LEVEL, complib='zlib', shuffle=True)
    )
    return h5file
@hku_catch(None, trace=True)
def get_h5table(h5file, market, code):
try:
group = h5file.get_node("/", "data")
Expand Down Expand Up @@ -228,7 +236,8 @@ def getNewDate(index_type, olddate):
try:
table = h5file.get_node("/data", tablename)
except:
except Exception as e:
hku_error("{}".format(e))
return
for index_type in index_list:
Expand Down Expand Up @@ -275,11 +284,16 @@ def getNewDate(index_type, olddate):
# 分笔数据
#------------------------------------------------------------------------------
def open_trans_file(dest_dir, market):
    """Open the transaction (tick) HDF5 file for one market.

    The file path is ``<dest_dir>/<market>_trans.h5`` with ``market``
    lower-cased.

    :param dest_dir: directory that holds the HDF5 files
    :param market: market identifier string (lower-cased for the file name)
    :return: the PyTables file handle, opened in append mode ("a") with zlib
             compression at HDF5_COMPRESS_LEVEL and shuffle enabled; this
             function does not close it
    """
    filename = "{}/{}_trans.h5".format(dest_dir, market.lower())
    # Open the file exactly once: a second open on the same path would
    # overwrite (and leak) the first handle.
    h5file = tb.open_file(
        filename, "a", filters=tb.Filters(complevel=HDF5_COMPRESS_LEVEL, complib='zlib', shuffle=True)
    )
    return h5file
@hku_catch(trace=True)
def get_trans_table(h5file, market, code):
try:
group = h5file.get_node("/", "data")
Expand Down Expand Up @@ -320,7 +334,7 @@ def update_hdf5_trans_index(h5file, tablename):
index_row = index_table.row
if index_total:
index_last_date = int(index_table[-1]['datetime'])
last_date = int(table[-1]['datetime']//1000000 * 10000)
last_date = int(table[-1]['datetime'] // 1000000 * 10000)
if index_last_date == last_date:
return
startix = int(index_table[-1]['start'])
Expand All @@ -345,14 +359,19 @@ def update_hdf5_trans_index(h5file, tablename):
index += 1
index_table.flush()
#------------------------------------------------------------------------------
# 分时数据
#------------------------------------------------------------------------------
def open_time_file(dest_dir, market):
    """Open the time-sharing (intraday) HDF5 file for one market.

    The file path is ``<dest_dir>/<market>_time.h5`` with ``market``
    lower-cased.

    :param dest_dir: directory that holds the HDF5 files
    :param market: market identifier string (lower-cased for the file name)
    :return: the PyTables file handle, opened in append mode ("a") with zlib
             compression at HDF5_COMPRESS_LEVEL and shuffle enabled; this
             function does not close it
    """
    filename = "{}/{}_time.h5".format(dest_dir, market.lower())
    # Open the file exactly once: a second open on the same path would
    # overwrite (and leak) the first handle.
    h5file = tb.open_file(
        filename, "a", filters=tb.Filters(complevel=HDF5_COMPRESS_LEVEL, complib='zlib', shuffle=True)
    )
    return h5file
@hku_catch(None, trace=True)
def get_time_table(h5file, market, code):
try:
group = h5file.get_node("/", "data")
Expand Down
125 changes: 65 additions & 60 deletions hikyuu/data/pytdx_to_h5.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
import sqlite3
from pytdx.hq import TDXParams

from hikyuu.util.mylog import hku_error

from .common import MARKETID, STOCKTYPE, get_stktype_list
from .common_sqlite3 import (
get_codepre_list, create_database, get_marketid, get_last_date, get_stock_list, update_last_date
)
from .common_h5 import (
H5Record, H5Index, open_h5file, get_h5table, update_hdf5_extern_data, open_trans_file,
get_trans_table, update_hdf5_trans_index, open_time_file, get_time_table
H5Record, H5Index, open_h5file, get_h5table, update_hdf5_extern_data, open_trans_file, get_trans_table,
update_hdf5_trans_index, open_time_file, get_time_table
)
from .weight_to_sqlite import qianlong_import_weight

Expand Down Expand Up @@ -78,16 +80,12 @@ def import_stock_name(connect, api, market, quotations=None):
stktype_list = get_stktype_list(quotations)
a = cur.execute(
"select stockid, code, name, valid from stock where marketid={} and type in {}".format(
marketid, stktype_list
)
"select stockid, code, name, valid from stock where marketid={} and type in {}".format(marketid, stktype_list)
)
a = a.fetchall()
oldStockDict = {}
for oldstock in a:
oldstockid, oldcode, oldname, oldvalid = oldstock[0], oldstock[1], oldstock[2], int(
oldstock[3]
)
oldstockid, oldcode, oldname, oldvalid = oldstock[0], oldstock[1], oldstock[2], int(oldstock[3])
oldStockDict[oldcode] = oldstockid
# 新的代码表中无此股票,则置为无效
Expand All @@ -97,14 +95,9 @@ def import_stock_name(connect, api, market, quotations=None):
# 股票名称发生变化,更新股票名称;如果原无效,则置为有效
if oldcode in newStockDict:
if oldname != newStockDict[oldcode]:
cur.execute(
"update stock set name='%s' where stockid=%i" %
(newStockDict[oldcode], oldstockid)
)
cur.execute("update stock set name='%s' where stockid=%i" % (newStockDict[oldcode], oldstockid))
if oldvalid == 0:
cur.execute(
"update stock set valid=1, endDate=99999999 where stockid=%i" % oldstockid
)
cur.execute("update stock set valid=1, endDate=99999999 where stockid=%i" % oldstockid)
# 处理新出现的股票
codepre_list = get_codepre_list(connect, marketid, quotations)
Expand Down Expand Up @@ -183,16 +176,18 @@ def guess_5min_n_step(last_datetime):
return (n, step)
def import_one_stock_data(
connect, api, h5file, market, ktype, stock_record, startDate=199012191500
):
def import_one_stock_data(connect, api, h5file, market, ktype, stock_record, startDate=199012191500):
market = market.upper()
pytdx_market = to_pytdx_market(market)
stockid, marketid, code, valid, stktype = stock_record[0], stock_record[1], stock_record[2], stock_record[3], \
stock_record[4]
table = get_h5table(h5file, market, code)
if table is None:
hku_error("Can't get table({}{})".format(market, code))
return 0

last_datetime = table[-1]['datetime'] if table.nrows > 0 else startDate

today = datetime.date.today()
Expand Down Expand Up @@ -234,7 +229,8 @@ def import_one_stock_data(
bar_datetime = (tmp.year * 10000 + tmp.month * 100 + tmp.day) * 10000
if ktype != 'DAY':
bar_datetime += bar['hour'] * 100 + bar['minute']
except:
except Exception as e:
hku_error("Failed translate datetime: {}! {}".format(bar, e))
continue

if today_datetime >= bar_datetime > last_datetime \
Expand All @@ -251,8 +247,8 @@ def import_one_stock_data(
row['transCount'] = round(bar['vol'])
row.append()
add_record_count += 1
except:
print("Can't trans record:", bar)
except Exception as e:
hku_error("Can't trans record: {}! {}".format(bar, e))
last_datetime = bar_datetime

if add_record_count > 0:
Expand All @@ -278,22 +274,12 @@ def import_one_stock_data(
elif table.nrows == 0:
#print(market, stock_record)
table.remove()
pass

#table.close()
return add_record_count


def import_data(
connect,
market,
ktype,
quotations,
api,
dest_dir,
startDate=199012190000,
progress=ProgressBar
):
def import_data(connect, market, ktype, quotations, api, dest_dir, startDate=199012190000, progress=ProgressBar):
"""导入通达信指定盘后数据路径中的K线数据。注:只导入基础信息数据库中存在的股票。

:param connect : sqlit3链接
Expand Down Expand Up @@ -351,14 +337,23 @@ def import_on_stock_trans(connect, api, h5file, market, stock_record, max_days):
stockid, marketid, code, valid, stktype = stock_record[0], stock_record[1], stock_record[2], stock_record[3], \
stock_record[4]
table = get_trans_table(h5file, market, code)
if table is None:
hku_error("Failed get_trans_table({}{})!".format(market, code))
return 0

today = datetime.date.today()
if table.nrows > 0:
last_datetime = int(table[-1]['datetime'] // 1000000)
last_y = int(last_datetime // 10000)
last_m = int(last_datetime // 100 - last_y * 100)
last_d = int(last_datetime - (last_y * 10000 + last_m * 100))
last_date = datetime.date(last_y, last_m, last_d)
need_days = (today - last_date).days
try:
last_datetime = int(table[-1]['datetime'] // 1000000)
last_y = int(last_datetime // 10000)
last_m = int(last_datetime // 100 - last_y * 100)
last_d = int(last_datetime - (last_y * 10000 + last_m * 100))
last_date = datetime.date(last_y, last_m, last_d)
need_days = (today - last_date).days
except Exception as e:
hku_error("Failed get last date from hdf5({}{}), remove this table! {}".format(market, code, e))
table.remove()
return 0
else:
need_days = max_days

Expand All @@ -381,25 +376,28 @@ def import_on_stock_trans(connect, api, h5file, market, stock_record, max_days):
pre_minute = 900

for record in buf:
minute = int(record['time'][0:2]) * 100 + int(record['time'][3:])
if minute != pre_minute:
second = 0 if minute == 1500 else 2
pre_minute = minute
else:
second += 3
if second > 59:
continue
row['datetime'] = cur_date * 1000000 + minute * 100 + second
row['price'] = int(record['price'] * 1000)
row['vol'] = record['vol']
row['buyorsell'] = record['buyorsell']
row.append()
add_record_count += 1
try:
minute = int(record['time'][0:2]) * 100 + int(record['time'][3:])
if minute != pre_minute:
second = 0 if minute == 1500 else 2
pre_minute = minute
else:
second += 3
if second > 59:
continue
row['datetime'] = cur_date * 1000000 + minute * 100 + second
row['price'] = int(record['price'] * 1000)
row['vol'] = record['vol']
row['buyorsell'] = record['buyorsell']
row.append()
add_record_count += 1
except Exception as e:
hku_error("Failed trans to record! {}", e)

if add_record_count > 0:
table.flush()
elif table.nrows == 0:
table.remove
table.remove()

return add_record_count

Expand Down Expand Up @@ -437,6 +435,10 @@ def import_on_stock_time(connect, api, h5file, market, stock_record, max_days):
stockid, marketid, code, valid, stktype = stock_record[0], stock_record[1], stock_record[2], stock_record[3], \
stock_record[4]
table = get_time_table(h5file, market, code)
if table is None:
hku_error("Can't get table({}{})!".format(market, code))
return 0

today = datetime.date.today()
if table.nrows > 0:
last_datetime = int(table[-1]['datetime'] // 10000)
Expand Down Expand Up @@ -474,17 +476,20 @@ def import_on_stock_time(connect, api, h5file, market, stock_record, max_days):
time = 1300
elif time == 1360:
time = 1400
row['datetime'] = this_date + time
row['price'] = int(record['price'] * 1000)
row['vol'] = record['vol']
row.append()
time += 1
add_record_count += 1
try:
row['datetime'] = this_date + time
row['price'] = int(record['price'] * 1000)
row['vol'] = record['vol']
row.append()
time += 1
add_record_count += 1
except Exception as e:
hku_error("Failed trans record {}! {}".format(record, e))

if add_record_count > 0:
table.flush()
elif table.nrows == 0:
table.remove
table.remove()

return add_record_count

Expand Down
Loading

0 comments on commit 50797b1

Please sign in to comment.