from Bio import SeqIO
from Bio import Entrez
import re
import io
import hashlib
import urllib.request
import urllib.error
import time
from itertools import chain
# Custom exception classes used to distinguish server errors
class ServerException(Exception):
    pass

class DownloadError(Exception):
    """Base class for all download-related errors."""
    pass

class ProteinDownloadError(DownloadError):
    """Error specific to protein sequence downloads."""
    pass

class CDSDownloadError(DownloadError):
    """Error specific to CDS sequence downloads."""
    pass

class GenomicDownloadError(DownloadError):
    """Error specific to genomic sequence downloads."""
    pass
# Retry helper: calls request_func and retries on HTTP 500 responses, returning the
# open handle on success. The timeout argument is accepted by the call sites below
# but is not applied here.
def retry_request(request_func, backoff, timeout, retries=3):
    for _ in range(retries):
        try:
            return request_func()
        except urllib.error.HTTPError as e:
            if e.code == 500:
                time.sleep(backoff)
            else:
                raise ServerException(f"HTTP Error: {e}")
    raise ServerException("Max retries exceeded for HTTP request")
# Search for protein sequences and save them to a FASTA file
def fetch_and_save_protein_sequences(organism, protein_output_filename):
    # Run esearch, retrying on transient server errors, and keep the response handle
    esearch_response = retry_request(
        lambda: Entrez.esearch(db="protein", term=organism, usehistory="y"), backoff=2, timeout=10)
    # Parse the XML response with Entrez.read
    esearch_record = Entrez.read(esearch_response)
    # Close the HTTP response object
    esearch_response.close()
    add_delay(1)
    # Get the IdList directly
    seq_ids = esearch_record["IdList"]
    # Open the output file
    with open(protein_output_filename, "w") as protein_output_file:
        for seq_id in seq_ids:
            try:
                # Fetch the FASTA record, retrying on transient server errors
                efetch_response = retry_request(
                    lambda: Entrez.efetch(db="protein", id=seq_id, rettype="fasta", retmode="text"),
                    backoff=2, timeout=10)
                # Write the FASTA text to the output file
                protein_output_file.write(efetch_response.read())
                # Close the efetch response object
                efetch_response.close()
            except urllib.error.URLError as e:
                print(f"URL error for sequence ID {seq_id}: {e.reason}")
                raise ProteinDownloadError(f"Failed to download protein sequence with ID {seq_id}")
            except Exception as e:
                print(f"An unexpected error occurred for sequence ID {seq_id}: {e}")
                raise DownloadError(f"An unexpected error occurred for sequence ID {seq_id}")
    print(f"All protein sequences for {organism} have been downloaded and saved to {protein_output_filename}.")
# Define a function that fetches and saves genomic sequences
def fetch_and_save_genomic_sequences(organism, genomic_output_filename):
    # Use esearch to look up genomic sequences for the given organism
    esearch_response = retry_request(
        lambda: Entrez.esearch(db="nucleotide", term=f"{organism}[Genomic]", usehistory="y"), backoff=2, timeout=10)
    esearch_result = Entrez.read(esearch_response)  # parse the esearch result
    esearch_response.close()
    add_delay(1)
    # Get the IdList directly
    seq_ids = esearch_result["IdList"]
    # Open the output file for the genomic sequences
    with open(genomic_output_filename, "w") as genomic_output_file:
        for seq_id in seq_ids:
            try:
                # Fetch the GenBank record, retrying on transient server errors
                efetch_response = retry_request(
                    lambda: Entrez.efetch(db="nucleotide", id=seq_id, rettype="gb", retmode="text"),
                    backoff=2, timeout=10)
                # Parse the GenBank record directly from the text handle
                genomic_record = SeqIO.read(efetch_response, "genbank")
                # Write the record in FASTA format so the downstream FASTA parser can read it
                SeqIO.write(genomic_record, genomic_output_file, "fasta")
                efetch_response.close()
            except urllib.error.URLError as e:
                print(f"URL error for sequence ID {seq_id}: {e.reason}")
            except Exception as e:
                print(f"An error occurred for sequence ID {seq_id}: {e}")
    print(f"All genomic sequences for {organism} have been downloaded and saved to {genomic_output_filename}.")
# Function that filters out matching sequences
def find_matching_sequences(input_filename, btw, numc, seen_sequences):
    """
    Scan the given FASTA file, yield the sequences that satisfy the cysteine-spacing
    criteria, and de-duplicate them across calls via seen_sequences.
    """
    with open(input_filename, "r") as input_file:
        for record in SeqIO.parse(input_file, "fasta"):
            sequence = str(record.seq)  # convert the sequence to a string
            c_positions = [m.start() for m in re.finditer('C', sequence)]  # positions of every 'C'
            if len(c_positions) >= numc:
                valid_pairs = []
                for i in range(len(c_positions) - 1):
                    if c_positions[i + 1] - c_positions[i] <= btw:
                        valid_pairs.append((c_positions[i], c_positions[i + 1]))
                consecutive_valid_pairs = 0
                for i in range(len(valid_pairs) - 1):
                    if valid_pairs[i][1] == valid_pairs[i + 1][0]:  # consecutive pairs share the middle cysteine
                        consecutive_valid_pairs += 1
                        if consecutive_valid_pairs >= numc - 1:  # enough consecutive pairs
                            # Hash the sequence with SHA-256 for de-duplication
                            sequence_hash = hashlib.sha256(sequence.encode('utf-8')).hexdigest()
                            if sequence_hash not in seen_sequences:
                                yield record  # yield the matching sequence
                                seen_sequences.add(sequence_hash)  # remember it as seen
                            break  # stop scanning this record once it matches
                    else:
                        consecutive_valid_pairs = 0  # reset the counter
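# A quick, illustrative way to exercise the filter on its own (not part of the
# pipeline; "toy.fasta" is a hypothetical file used only for this sketch):
#
#     test_seen = set()
#     for rec in find_matching_sequences("toy.fasta", btw=20, numc=3, seen_sequences=test_seen):
#         print(rec.id, len(rec.seq))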
# Helper that pauses between NCBI requests
def add_delay(seconds):
    time.sleep(seconds)
# Latin binomials and their NCBI taxonomy IDs
organisms = {
    "Panax ginseng": "txid4054",
    "Hevea brasiliensis": "txid3981",
    "Robinia pseudoacacia": "txid35938",
    "Pueraria lobata": "txid3893",
    "Salvinia natans": "txid42333",
    "Albizia julibrissin": "txid3813",
    "Cercis chinensis": "txid161750",
}
# Output file names
protein_output_filename = "proteins.fasta"
cds_output_filename = "cds.fasta"
genomic_output_filename = "genomic.fasta"
Entrez.email = "[email protected]"
# Set used for de-duplication
seen_sequences = set()
# Search for CDS sequences and save them to a FASTA file
def fetch_and_save_cds_sequences(organism, cds_output_filename):
    # Use esearch to look up mRNA sequences for the given organism
    esearch_response = retry_request(
        lambda: Entrez.esearch(db="nucleotide", term=f"{organism}[mRNA]", usehistory="y"), backoff=2, timeout=10)
    esearch_result = Entrez.read(esearch_response)  # parse the esearch result
    esearch_response.close()
    add_delay(1)
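    # NOTE: the download loop for this function is missing from the excerpt above.
    # The lines below are a minimal sketch under the assumption that it mirrors the
    # protein downloader; the "fasta_cds_na" rettype used here is an assumption about
    # how the CDS FASTA should be requested.
    seq_ids = esearch_result["IdList"]
    with open(cds_output_filename, "w") as cds_output_file:
        for seq_id in seq_ids:
            try:
                efetch_response = retry_request(
                    lambda: Entrez.efetch(db="nucleotide", id=seq_id, rettype="fasta_cds_na", retmode="text"),
                    backoff=2, timeout=10)
                cds_output_file.write(efetch_response.read())
                efetch_response.close()
            except urllib.error.URLError as e:
                print(f"URL error for sequence ID {seq_id}: {e.reason}")
                raise CDSDownloadError(f"Failed to download CDS sequence with ID {seq_id}")
            except Exception as e:
                print(f"An unexpected error occurred for sequence ID {seq_id}: {e}")
                raise DownloadError(f"An unexpected error occurred for sequence ID {seq_id}")
    print(f"All CDS sequences for {organism} have been downloaded and saved to {cds_output_filename}.")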
# Call the functions to fetch and save the protein, CDS and genomic sequences
for organism, taxid in organisms.items():
    fetch_and_save_protein_sequences(organism, protein_output_filename)
    fetch_and_save_cds_sequences(organism, cds_output_filename)
    fetch_and_save_genomic_sequences(organism, genomic_output_filename)
# Function that writes the matching sequences to a file
def write_matching_sequences_to_file(matching_sequences_generator, output_file):
    """
    Write the filtered matching sequences to the given FASTA file.
    Parameters:
        matching_sequences_generator (iterator): generator of matching sequences.
        output_file (str): path of the output file.
    """
    with open(output_file, "w") as output_handle:
        SeqIO.write(matching_sequences_generator, output_handle, "fasta")
# Ranges of the filter parameters
btw_range = range(20, 100)  # values of btw to try
numc_values = [3, 4, 5]  # values of numc to try
# Reset the de-duplication set
seen_sequences.clear()
# Name of the final result file
final_result_filename = "8.5_result.fasta"
# Start with an empty list of generators
all_matching_sequences = []
# For every combination of btw and numc, collect the matching sequences
for btw in btw_range:
    for numc in numc_values:
        # Find the matching sequences and add the generators to the list
        matching_sequences_proteins = find_matching_sequences(protein_output_filename, btw, numc, seen_sequences)
        matching_sequences_cds = find_matching_sequences(cds_output_filename, btw, numc, seen_sequences)
        matching_sequences_genomic = find_matching_sequences(genomic_output_filename, btw, numc, seen_sequences)
        all_matching_sequences.extend(
            [matching_sequences_proteins, matching_sequences_cds, matching_sequences_genomic])
# Call the writer, passing the chained generators rather than a single generator
write_matching_sequences_to_file(chain(*all_matching_sequences), final_result_filename)
# Print the completion message
print(f"All matching sequences have been saved in {final_result_filename}")