1.概述
平时大家搭建自己的因子库,肯定要会涉及到行情数据的下载,因子库的计算入库等工作,股票数据相对来说数量比较大,更新一次需要不少时间,本文将分享如何通过多线程的方式加快数据的下载,以此为例,也可以扩展到其他大数据任务的计算中。
本文使用 Tushare作为数据源,下载 A 股市场所有股票的日线数据(open, high, low, close, vol),我们将分析串行跟并行两种方法在时间效率上的表现。
2. 串行下载
串行下载是最直观的实现方式,按顺序逐个处理每只股票的数据下载请求。注册好tushare账号之后,获取到token,配置到config.json中,或者直接传入也行。
先获取所有的股票,去掉停牌以及ST的,再调用daily接口获取高开低收成交量数据,最后将数据保存到csv中,以下是完整的代码:
import tushare as ts
import pandas as pd
import os
import datetime
import time
import json
from tqdm import tqdm
# 读取配置
def load_tushare_token(config_file='config.json'):
with open(config_file, 'r') as f:
config = json.load(f)
return config['tushare_token']
# 简单速率控制
def rate_limit_wait(interval=0.12):
"""每次请求间隔0.12秒,确保不超过500次/分钟"""
time.sleep(interval)
# 初始化 Tushare
tushare_token = load_tushare_token()
ts.set_token(tushare_token)
pro = ts.pro_api()
# 获取股票列表
def get_all_stocks():
stock_list = pro.stock_basic(exchange='', list_status='L', fields='ts_code')
# 过滤ST股票
stock_list = stock_list[~stock_list['ts_code'].str.contains('ST')]
return stock_list['ts_code'].tolist()
# 单股票数据获取
def fetch_single_stock_data(ts_code, start_date, end_date):
rate_limit_wait()
try:
df = pro.daily(
ts_code=ts_code,
start_date=start_date,
end_date=end_date,
fields='ts_code,trade_date,open,high,low,close,vol'
)
return df
except Exception as e:
print(f"获取 {ts_code} 数据失败: {e}")
return pd.DataFrame()
# 保存数据到CSV
def save_stock_data(df, ts_code, save_dir='data/serial_download'):
os.makedirs(save_dir, exist_ok=True)
if not df.empty:
file_path = os.path.join(save_dir, f"{ts_code}.csv")
df.to_csv(file_path, index=False)
# 串行下载主函数
def serial_download():
start_time = time.time()
target_end_date = datetime.datetime.today().strftime('%Y%m%d')
start_date = '20240101'
stock_list = get_all_stocks()
print(f"📊 开始串行下载 {len(stock_list)} 只股票数据...")
success_count = 0
for ts_code in tqdm(stock_list, desc="串行下载进度"):
df = fetch_single_stock_data(ts_code, start_date, target_end_date)
if not df.empty:
save_stock_data(df, ts_code)
success_count += 1
end_time = time.time()
total_time = end_time - start_time
print(f"✅ 串行下载完成!")
print(f"📈 成功下载: {success_count}/{len(stock_list)} 只股票")
print(f"⏱️ 总耗时: {total_time:.2f} 秒 ({total_time / 60:.2f} 分钟)")
print(f"🚀 平均每只股票: {total_time / len(stock_list):.2f} 秒")
return total_time
if __name__ == "__main__":
serial_download()
整体运行完耗时如下:
大概花了半个小时多,当然跟机器性能也有很大的关系,笔者电脑是i5+16G内存的配置。
3. 并发下载
接下来我们尝试使用多线程来改善下载速度,使用python的线程池来优化。在Python里,线程池能够高效管理和复用线程,避免频繁创建和销毁线程带来的开销。线程池预先创建一定数量的线程,当有任务提交时,会从线程池中获取空闲线程来执行任务。任务执行完毕后,线程不会被销毁,而是继续等待新任务,这样就提高了程序的性能。
以下是核心代码:
from concurrent.futures import ThreadPoolExecutor
# 创建一个包含3个线程的线程池
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交任务到线程池
future = executor.submit(func, arg1, arg2) # func是任务函数,arg1,arg2是函数参数
# 获取任务结果(会阻塞直到任务完成)
result = future.result()
以下是完整的使用线程池来完成下载的例子:
import tushare as ts
import pandas as pd
import os
import datetime
import time
import json
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
# 读取配置
def load_tushare_token(config_file='config.json'):
with open(config_file, 'r') as f:
config = json.load(f)
return config['tushare_token']
# 高级速率控制器
class RateLimiter:
def __init__(self, max_calls_per_minute=500):
self.max_calls = max_calls_per_minute
self.interval = 60.0 / self.max_calls # 每次调用间隔
self.lock = threading.Lock() # 线程锁
self.last_call = time.time() # 上次调用时间
def wait(self):
"""确保调用频率不超过限制"""
with self.lock:
now = time.time()
elapsed = now - self.last_call
wait_time = self.interval - elapsed
if wait_time > 0:
time.sleep(wait_time)
self.last_call = time.time()
# 初始化 Tushare
tushare_token = load_tushare_token()
ts.set_token(tushare_token)
pro = ts.pro_api()
# 全局速率控制器
rate_limiter = RateLimiter(max_calls_per_minute=500)
# 获取股票列表
def get_all_stocks():
stock_list = pro.stock_basic(exchange='', list_status='L', fields='ts_code')
# 过滤ST股票
stock_list = stock_list[~stock_list['ts_code'].str.contains('ST')]
return stock_list['ts_code'].tolist()
# 批量获取多股票数据
def fetch_batch_data(ts_codes, start_date, end_date):
"""一次API调用获取多只股票数据"""
all_df = pd.DataFrame()
# daily接口支持批量查询,但建议分批处理
for ts_code in ts_codes:
rate_limiter.wait()
try:
df = pro.daily(
ts_code=ts_code,
start_date=start_date,
end_date=end_date,
fields='ts_code,trade_date,open,high,low,close,vol'
)
if not df.empty:
all_df = pd.concat([all_df, df])
except Exception as e:
print(f"获取 {ts_code} 数据失败: {e}")
continue
return all_df
# 保存批量数据到CSV文件
def save_batch_to_csv(df, save_dir='data/concurrent_download'):
"""将批量数据按股票代码分别保存"""
os.makedirs(save_dir, exist_ok=True)
for ts_code, group in df.groupby('ts_code'):
file_path = os.path.join(save_dir, f"{ts_code}.csv")
# 如果文件已存在,合并去重
if os.path.exists(file_path):
old_df = pd.read_csv(file_path, dtype={'ts_code': str, 'trade_date': str})
group = pd.concat([old_df, group]).drop_duplicates(
subset=['trade_date']
).sort_values('trade_date')
group.to_csv(file_path, index=False)
# 获取本地数据最新日期
def get_local_latest_dates(ts_codes, save_dir='data/concurrent_download'):
"""检查本地已有数据的最新日期,用于增量更新"""
latest_dates = {}
for ts_code in ts_codes:
file_path = os.path.join(save_dir, f"{ts_code}.csv")
if os.path.exists(file_path):
df = pd.read_csv(file_path, dtype={'ts_code': str, 'trade_date': str})
latest_dates[ts_code] = df['trade_date'].max()
else:
latest_dates[ts_code] = None
return latest_dates
# 更新一批股票数据
def update_batch(ts_codes, target_end_date, save_dir='data/concurrent_download'):
"""处理一批股票的数据更新"""
latest_dates = get_local_latest_dates(ts_codes, save_dir)
# 计算每只股票需要更新的起始日期
start_dates = {}
for ts_code in ts_codes:
local_date = latest_dates[ts_code]
if local_date:
start_date = (pd.to_datetime(local_date) + pd.Timedelta(days=1)).strftime('%Y%m%d')
else:
start_date = '20240101' # 如果本地无数据,从年初开始
if start_date <= target_end_date:
start_dates[ts_code] = start_date
if not start_dates:
return # 无需更新
# 使用最早的起始日期进行批量查询
batch_start_date = min(start_dates.values())
df = fetch_batch_data(ts_codes, batch_start_date, target_end_date)
if not df.empty:
save_batch_to_csv(df, save_dir)
# 并发下载主函数
def concurrent_download(batch_size=40, max_workers=10):
start_time = time.time()
target_end_date = datetime.datetime.today().strftime('%Y%m%d')
stock_list = get_all_stocks()
print(f"🚀 开始并发下载 {len(stock_list)} 只股票数据...")
print(f"📦 批处理大小: {batch_size},并发线程数: {max_workers}")
# 将股票列表分批
batches = [stock_list[i:i + batch_size] for i in range(0, len(stock_list), batch_size)]
print(f"📊 总批次数: {len(batches)}")
# 使用线程池并发处理
success_batches = 0
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有批次任务
futures = [
executor.submit(update_batch, batch, target_end_date)
for batch in batches
]
# 等待任务完成并显示进度
for future in tqdm(as_completed(futures), total=len(futures), desc="并发下载进度"):
try:
future.result() # 获取结果,如果有异常会抛出
success_batches += 1
except Exception as e:
print(f"批次处理失败: {e}")
end_time = time.time()
total_time = end_time - start_time
print(f"✅ 并发下载完成!")
print(f"📈 成功处理批次: {success_batches}/{len(batches)}")
print(f"⏱️ 总耗时: {total_time:.2f} 秒 ({total_time / 60:.2f} 分钟)")
print(f"🚀 平均每只股票: {total_time / len(stock_list):.3f} 秒")
print(f"⚡ 并发效率提升: {(len(stock_list) * 0.2) / total_time:.2f}x")
return total_time
if __name__ == "__main__":
concurrent_download(batch_size=40, max_workers=10) # 调整参数适配daily接口
运行以上程序,得到耗时如下:
从结果上看,采用多线程的方式,速度提高了3倍,当然还可以进一步优化,这些感兴趣的同学可以下去再试试看。
4. 总结
通过对比分析,并发下载方案在处理大规模股票数据时具有压倒性优势,可以大幅缩短数据处理的耗时,为大家节省宝贵的时间。本期就介绍到这,欢迎交流,有感性的话题也可以评论区留言,能力范围内的一定不吝分享,下期见!