python使用多线程加速行情数据下载更新
  AlphaSmith 2天前 43 1

1.概述

平时大家搭建自己的因子库,肯定要会涉及到行情数据的下载,因子库的计算入库等工作,股票数据相对来说数量比较大,更新一次需要不少时间,本文将分享如何通过多线程的方式加快数据的下载,以此为例,也可以扩展到其他大数据任务的计算中。
本文使用 Tushare作为数据源,下载 A 股市场所有股票的日线数据(open, high, low, close, vol),我们将分析串行跟并行两种方法在时间效率上的表现。

2. 串行下载

串行下载是最直观的实现方式,按顺序逐个处理每只股票的数据下载请求。注册好tushare账号之后,获取到token,配置到config.json中,或者直接传入也行。
image.png
先获取所有的股票,去掉停牌以及ST的,再调用daily接口获取高开低收成交量数据,最后将数据保存到csv中,以下是完整的代码:

import tushare as ts import pandas as pd import os import datetime import time import json from tqdm import tqdm # 读取配置 def load_tushare_token(config_file='config.json'): with open(config_file, 'r') as f: config = json.load(f) return config['tushare_token'] # 简单速率控制 def rate_limit_wait(interval=0.12): """每次请求间隔0.12秒,确保不超过500次/分钟""" time.sleep(interval) # 初始化 Tushare tushare_token = load_tushare_token() ts.set_token(tushare_token) pro = ts.pro_api() # 获取股票列表 def get_all_stocks(): stock_list = pro.stock_basic(exchange='', list_status='L', fields='ts_code') # 过滤ST股票 stock_list = stock_list[~stock_list['ts_code'].str.contains('ST')] return stock_list['ts_code'].tolist() # 单股票数据获取 def fetch_single_stock_data(ts_code, start_date, end_date): rate_limit_wait() try: df = pro.daily( ts_code=ts_code, start_date=start_date, end_date=end_date, fields='ts_code,trade_date,open,high,low,close,vol' ) return df except Exception as e: print(f"获取 {ts_code} 数据失败: {e}") return pd.DataFrame() # 保存数据到CSV def save_stock_data(df, ts_code, save_dir='data/serial_download'): os.makedirs(save_dir, exist_ok=True) if not df.empty: file_path = os.path.join(save_dir, f"{ts_code}.csv") df.to_csv(file_path, index=False) # 串行下载主函数 def serial_download(): start_time = time.time() target_end_date = datetime.datetime.today().strftime('%Y%m%d') start_date = '20240101' stock_list = get_all_stocks() print(f"📊 开始串行下载 {len(stock_list)} 只股票数据...") success_count = 0 for ts_code in tqdm(stock_list, desc="串行下载进度"): df = fetch_single_stock_data(ts_code, start_date, target_end_date) if not df.empty: save_stock_data(df, ts_code) success_count += 1 end_time = time.time() total_time = end_time - start_time print(f"✅ 串行下载完成!") print(f"📈 成功下载: {success_count}/{len(stock_list)} 只股票") print(f"⏱️ 总耗时: {total_time:.2f} 秒 ({total_time / 60:.2f} 分钟)") print(f"🚀 平均每只股票: {total_time / len(stock_list):.2f} 秒") return total_time if __name__ == "__main__": serial_download()

整体运行完耗时如下:
fb3ad66df1a370615c8a16bbd39994f.png
大概花了半个小时多,当然跟机器性能也有很大的关系,笔者电脑是i5+16G内存的配置。

3. 并发下载

接下来我们尝试使用多线程来改善下载速度,使用python的线程池来优化。在Python里,线程池能够高效管理和复用线程,避免频繁创建和销毁线程带来的开销。线程池预先创建一定数量的线程,当有任务提交时,会从线程池中获取空闲线程来执行任务。任务执行完毕后,线程不会被销毁,而是继续等待新任务,这样就提高了程序的性能。
以下是核心代码:

from concurrent.futures import ThreadPoolExecutor # 创建一个包含3个线程的线程池 with ThreadPoolExecutor(max_workers=3) as executor: # 提交任务到线程池 future = executor.submit(func, arg1, arg2) # func是任务函数,arg1,arg2是函数参数 # 获取任务结果(会阻塞直到任务完成) result = future.result()

以下是完整的使用线程池来完成下载的例子:

import tushare as ts import pandas as pd import os import datetime import time import json import threading from concurrent.futures import ThreadPoolExecutor, as_completed from tqdm import tqdm # 读取配置 def load_tushare_token(config_file='config.json'): with open(config_file, 'r') as f: config = json.load(f) return config['tushare_token'] # 高级速率控制器 class RateLimiter: def __init__(self, max_calls_per_minute=500): self.max_calls = max_calls_per_minute self.interval = 60.0 / self.max_calls # 每次调用间隔 self.lock = threading.Lock() # 线程锁 self.last_call = time.time() # 上次调用时间 def wait(self): """确保调用频率不超过限制""" with self.lock: now = time.time() elapsed = now - self.last_call wait_time = self.interval - elapsed if wait_time > 0: time.sleep(wait_time) self.last_call = time.time() # 初始化 Tushare tushare_token = load_tushare_token() ts.set_token(tushare_token) pro = ts.pro_api() # 全局速率控制器 rate_limiter = RateLimiter(max_calls_per_minute=500) # 获取股票列表 def get_all_stocks(): stock_list = pro.stock_basic(exchange='', list_status='L', fields='ts_code') # 过滤ST股票 stock_list = stock_list[~stock_list['ts_code'].str.contains('ST')] return stock_list['ts_code'].tolist() # 批量获取多股票数据 def fetch_batch_data(ts_codes, start_date, end_date): """一次API调用获取多只股票数据""" all_df = pd.DataFrame() # daily接口支持批量查询,但建议分批处理 for ts_code in ts_codes: rate_limiter.wait() try: df = pro.daily( ts_code=ts_code, start_date=start_date, end_date=end_date, fields='ts_code,trade_date,open,high,low,close,vol' ) if not df.empty: all_df = pd.concat([all_df, df]) except Exception as e: print(f"获取 {ts_code} 数据失败: {e}") continue return all_df # 保存批量数据到CSV文件 def save_batch_to_csv(df, save_dir='data/concurrent_download'): """将批量数据按股票代码分别保存""" os.makedirs(save_dir, exist_ok=True) for ts_code, group in df.groupby('ts_code'): file_path = os.path.join(save_dir, f"{ts_code}.csv") # 如果文件已存在,合并去重 if os.path.exists(file_path): old_df = pd.read_csv(file_path, dtype={'ts_code': str, 'trade_date': str}) group = pd.concat([old_df, group]).drop_duplicates( subset=['trade_date'] ).sort_values('trade_date') group.to_csv(file_path, index=False) # 获取本地数据最新日期 def get_local_latest_dates(ts_codes, save_dir='data/concurrent_download'): """检查本地已有数据的最新日期,用于增量更新""" latest_dates = {} for ts_code in ts_codes: file_path = os.path.join(save_dir, f"{ts_code}.csv") if os.path.exists(file_path): df = pd.read_csv(file_path, dtype={'ts_code': str, 'trade_date': str}) latest_dates[ts_code] = df['trade_date'].max() else: latest_dates[ts_code] = None return latest_dates # 更新一批股票数据 def update_batch(ts_codes, target_end_date, save_dir='data/concurrent_download'): """处理一批股票的数据更新""" latest_dates = get_local_latest_dates(ts_codes, save_dir) # 计算每只股票需要更新的起始日期 start_dates = {} for ts_code in ts_codes: local_date = latest_dates[ts_code] if local_date: start_date = (pd.to_datetime(local_date) + pd.Timedelta(days=1)).strftime('%Y%m%d') else: start_date = '20240101' # 如果本地无数据,从年初开始 if start_date <= target_end_date: start_dates[ts_code] = start_date if not start_dates: return # 无需更新 # 使用最早的起始日期进行批量查询 batch_start_date = min(start_dates.values()) df = fetch_batch_data(ts_codes, batch_start_date, target_end_date) if not df.empty: save_batch_to_csv(df, save_dir) # 并发下载主函数 def concurrent_download(batch_size=40, max_workers=10): start_time = time.time() target_end_date = datetime.datetime.today().strftime('%Y%m%d') stock_list = get_all_stocks() print(f"🚀 开始并发下载 {len(stock_list)} 只股票数据...") print(f"📦 批处理大小: {batch_size},并发线程数: {max_workers}") # 将股票列表分批 batches = [stock_list[i:i + batch_size] for i in range(0, len(stock_list), batch_size)] print(f"📊 总批次数: {len(batches)}") # 使用线程池并发处理 success_batches = 0 with ThreadPoolExecutor(max_workers=max_workers) as executor: # 提交所有批次任务 futures = [ executor.submit(update_batch, batch, target_end_date) for batch in batches ] # 等待任务完成并显示进度 for future in tqdm(as_completed(futures), total=len(futures), desc="并发下载进度"): try: future.result() # 获取结果,如果有异常会抛出 success_batches += 1 except Exception as e: print(f"批次处理失败: {e}") end_time = time.time() total_time = end_time - start_time print(f"✅ 并发下载完成!") print(f"📈 成功处理批次: {success_batches}/{len(batches)}") print(f"⏱️ 总耗时: {total_time:.2f} 秒 ({total_time / 60:.2f} 分钟)") print(f"🚀 平均每只股票: {total_time / len(stock_list):.3f} 秒") print(f"⚡ 并发效率提升: {(len(stock_list) * 0.2) / total_time:.2f}x") return total_time if __name__ == "__main__": concurrent_download(batch_size=40, max_workers=10) # 调整参数适配daily接口

运行以上程序,得到耗时如下:
image.png
从结果上看,采用多线程的方式,速度提高了3倍,当然还可以进一步优化,这些感兴趣的同学可以下去再试试看。

4. 总结

通过对比分析,并发下载方案在处理大规模股票数据时具有压倒性优势,可以大幅缩短数据处理的耗时,为大家节省宝贵的时间。本期就介绍到这,欢迎交流,有感性的话题也可以评论区留言,能力范围内的一定不吝分享,下期见!

最后一次编辑于 2天前 2

普洱咖啡

厉害

2025-06-06 13:14:19      回复