一、 引言
在A股量化投资中,构建完善的因子库对于策略研发和回测效率至关重要。传统基于CSV文件存储因子数据存在冗余、跨周期计算效率低和扩展性差等问题,而通过建立数据库式的因子库,可以显著提升数据管理和检索效率。本项目旨在基于AkShare和MongoDB构建一个A股的价格-成交量因子库,将常见的技术指标和量价指标按日保存,为选股和策略开发提供数据支持。借助开源工具,我们可以批量获取数据、自动清洗和计算因子,并方便地存入数据库,为后续的回测与分析打下基础。
二、 技术架构与依赖工具
2.1该项目采用Python编写,主要依赖以下工具和库:
2.1.1 AkShare:一个开源的Python金融数据接口库,支持获取包括A股在内的各种市场的历史和实时数据。通过统一API(如stock_zh_a_hist),可以方便地下载指定股票的日线行情(后复权或前复权)。
2.1.2 MongoDB:文档型NoSQL数据库,使用灵活的JSON/BSON模式存储数据,适合海量时序数据。我们使用MongoDB来管理行情和因子数据,支持动态扩展字段和构建索引,以加速查询。许多量化团队已采用MongoDB存储因子和行情数据。
2.1.3 Pandas:用于数据处理和分析的Python库。获取到的行情数据会以Pandas DataFrame格式处理,便于进行清洗和因子计算。### 2.1.4 Pandas提供了rolling、pct_change等函数,可轻松计算移动平均、涨跌幅等指标。
2.1.5 PyMongo:Python的MongoDB驱动,用于在代码中连接和操作MongoDB。通过PyMongo可以创建数据库连接、集合(collection),并执行写入和查询操作。
其他辅助库:如numpy用于数值计算,tqdm用于显示循环进度,datetime用于时间处理等。
以上工具协同工作:AkShare负责数据采集,Pandas负责清洗与指标计算,MongoDB负责数据存储与索引优化。
三、 数据获取与清洗
3.1 数据采集采用VolumePriceCollector类(见文章末尾volume_price_collector.py)自动化完成,主要步骤如下:
3.1.1获取A股股票列表:通过ak.stock_info_a_code_name()获取所有A股的代码列表。
3.1.2下载历史日线行情:对于每个股票代码,调用 ak.stock_zh_a_hist(symbol, period=“daily”, adjust=“hfq”) 下载后复权的日线数据。返回结果为Pandas DataFrame,包含日期、开盘、收盘、最高、最低、成交量等字段。
3.1.3字段筛选与重命名:从获取的数据中提取常用字段:日期(日期)、开盘价(开盘)、最高价(最高)、最低价(最低)、收盘价(收盘)、成交量(成交量),并重命名为trade_date、open、high、low、close、volume等通用字段。
3.1.4日期格式修复:将trade_date列转换为日期类型,确保只保留日期部分并转为datetime对象。这一步可避免后续处理中的时间格式问题。
3.1.5添加元数据和类型转换:在DataFrame中加入symbol字段记录股票代码,以及update_time字段记录更新时间。同时,将数值字段转换为正确类型(如将open转为浮点型、volume转为整数)以保证数据一致性。
3.2写入MongoDB:
3.2.1 使用PyMongo连接本地MongoDB,在quant数据库下建立volume_price集合。对每条清洗后的记录,使用update_one操作按(symbol, trade_date)条件插入或更新数据(upsert),以避免重复写入。在代码初始化时,我们对(symbol, trade_date)建立了复合索引,加速了后续的查询。批量处理时使用update_one(…, upsert=True)逐条更新,可以确保历史数据持续迭代更新且不覆盖已有记录。
上述过程在循环中使用tqdm显示进度,并对可能出现的异常进行捕获和跳过。完成后,可查询MongoDB确认数据已正确存储。通过该流程,我们实现了从AkShare批量抓取A股历史数据并清洗、入库,为因子计算提供了基础数据。
四、因子计算方法
4.1因子计算由FactorCalculator类(见文章末尾 写入新因子.py)实现,其流程包括:
读取历史数据:对每个股票,从MongoDB中按日期升序读取该股票的全部行情记录并转换为Pandas DataFrame。
计算价格类因子:利用收盘价序列计算常见的价格因子,包括:
4.1.1 MA5:5日移动平均线,即过去5个交易日收盘价的平均值。
4.1.2 MA20:20日移动平均线,但注意我们使用 .shift(1) 技术将20日均线向前平移一天,避免未来函数(即避免在当日用到当日及以后的价格信息)。
4.1.3 Price_Change_3D:3日动量因子,定义为当前收盘价与3个交易日前收盘价的百分比变化(即pct_change(periods=3))。
4.1.4 Price_Volatility_20D:20日收益率波动率,即过去20日收益率的标准差。
计算成交量类因子:利用成交量序列计算典型的量价因子,包括:
4.2.1 Volume_Spike(量比):当日成交量与过去5日均量之比减1,表示成交量放大的程度。例如df[‘Volume_Spike’] = df[‘volume’] / df[‘volume’].rolling(5).mean() - 1。当值显著大于0时,说明当日成交量高于近期均量。
4.2.2(可选)Volume_MA5:5日成交量均值,用于上面量比的计算。
4.2.3 Price_Up & Price_Volume_Corr:一个量价相关性因子,用于衡量连续上涨天数与成交量的相关性。这里Price_Up为价格上涨的二值指标(收盘价高于开盘价时取1,否则取0),Price_Volume_Corr为其与成交量的滚动相关系数。
避免未来函数:在计算诸如MA20时,使用.shift(1)确保因子仅使用过去的数据,不会泄露未来信息。这样处理后,因子的计算是可用于实际回测的日末状态。
4.2 批量写入MongoDB:计算完成后,将因子值写回数据库。在DataFrame中遍历每行,构建UpdateOne操作(以MongoDB文档的_id为键)批量更新对应文档的因子字段。因子列如“MA5”、“MA20”、“Price_Change_3D”、“Price_Volatility_20D”、“Volume_Spike”等被添加到原始文档中。使用bulk_write可高效地一次性提交多条更新,提高写入性能。
通过上述步骤,我们将计算得到的所有因子值存储到了MongoDB中,与原始行情数据一起构成了完整的因子库。
五、数据组织结构与索引优化
5.1 在MongoDB中,我们使用一个集合(volume_price)来同时存放基础行情数据和计算出的因子。每个文档对应一只股票在某个交易日的记录,主要字段包括:股票代码(symbol)、交易日期(trade_date)、开盘价、收盘价、成交量等以及计算后的各项因子。为提高查询和分析效率,我们对关键字段建立了索引:
5.1.1 复合主索引:已对(symbol, trade_date)建立联合索引。这使得检索某只股票在特定日期区间内的所有数据非常快速,也能保证不会插入重复的同一天记录。
5.1.2 因子字段索引:对于一些常用因子字段(如MA5、MA20等),也可以根据需求建立索引。这样,如果需要按因子值筛选股票(例如查找MA5最高的股票),查询效率会更高。
5.1.3集合设计:在一般设计中,可以将行情数据和因子数据分为两个集合(例如daily_data和factor_data),但本项目简化为一个集合存储,以便对同一条记录同时访问行情与因子字段。MongoDB灵活的模式允许我们随时新增因子字段而无需修改表结构。
通过合理的集合设计和索引策略,可以让后续回测模块快速读取单只股票的因子序列,也方便按照日期或因子值进行扫描,满足选股和回测的需求。
示例展示
以下以示例说明如何获取某支股票的因子数据并绘制图表:
获取因子数据:可使用FactorCalculator类提供的get_factor_data(symbol, start_date, end_date)方法。此方法从数据库中查询指定股票和日期范围内的文档,并返回一个包含symbol, trade_date, open, close, MA5, MA20, Price_Change_3D, Volume_Spike等列的Pandas DataFrame。例如:
from factor_calculator import FactorCalculator
factor_calc = FactorCalculator(db_name=‘quant’, collection_name=‘volume_price’)
df = factor_calc.get_factor_data(“000001”, “20250101”, “20250609”)
这将获取平安银行(代码为000001)在2025年1月到6月期间的因子数据。得到的df中,trade_date为索引或者列,其他列为因子数值。
绘制移动平均线:利用Matplotlib或Pandas内置绘图功能,可以将收盘价与其均线画在同一张折线图上。
示例代码:
import matplotlib.pyplot as plt
df.plot(x=‘trade_date’, y=[‘close’,‘MA5’,‘MA20’], figsize=(8,4))
plt.title(“收盘价与MA5/MA20走势图”)
plt.show()
图中可以看到收盘价(黑线)与MA5(橙线)、MA20(红线)的走势关系。这种图表有助于观察短期均线与长期均线的交叉情况。
绘制成交量与量比:可以绘制一个双轴图。左轴使用柱状图显示每日成交量,右轴用折线图显示量比Volume_Spike。例如:
fig, ax1 = plt.subplots()
ax1.bar(df[‘trade_date’], df[‘volume’], color=‘skyblue’, label=‘成交量’)
ax2 = ax1.twinx()
ax2.plot(df[‘trade_date’], df[‘Volume_Spike’], color=‘orange’, label=‘量比’)
ax1.set_ylabel(“成交量”)
ax2.set_ylabel(“量比”)
plt.title(“成交量与量比双轴图”)
plt.show()
如图所示,当某日成交量显著放大时,其对应的量比(橙线)也会明显上升。这种双轴图直观地表现了量比与成交量的关联。
以上示例仅为演示生成方法,实际中可根据需要自定义各种因子图表以辅助分析。
四、小结与展望
本文介绍了一个基于AkShare和MongoDB的A股因子库的构建流程:从数据采集、清洗,到因子计算和存储,覆盖了从头到尾的主要环节。我们使用MongoDB高效管理历史行情和因子时间序列,并利用AkShare批量获取数据,Pandas完成因子计算,从而极大地提高了因子数据的获取和处理效率。第一期实现了常见的量价技术因子(如MA5、MA20、价格动量、量比等)的计算和存储,为量化选股和回测提供了基础。
未来的工作可以在此基础上做更多扩展:例如引入基本面财务因子、对因子进行行业中性化处理、添加高频(分钟级)指标、整合回测与交易执行模块等。此外,还可以结合更多数据源(如新闻舆情、宏观指标等)和机器学习方法优化因子组合。通过不断丰富和完善,因子库将为量化研究和交易提供更加可靠的决策支持。
参考来源: AkShare官方文档及示例;MongoDB相关介绍;项目提供的源码文件。
附录 A:volume_price_collector.py
###volume_price_collector.py
import akshare as ak
import pandas as pd
from pymongo import MongoClient
from datetime import datetime, date
import time
from tqdm import tqdm
class VolumePriceCollector:
def init(self):
# MongoDB连接配置
self.client = MongoClient(‘mongodb://localhost:27017/’)
self.db = self.client[‘quant’]
self.collection = self.db[‘volume_price’]
# 创建复合索引(提升查询效率)
self.collection.create_index([
("symbol", 1),
("trade_date", 1)
])
print("MongoDB连接成功!数据库索引已创建")
def close(self):
self.client.close()
print("MongoDB连接已安全关闭")
def get_clean_data(self, symbol):
"""获取清洗后的量价数据(修复日期类型问题)"""
try:
# 获取原始数据(后复权)
df = ak.stock_zh_a_hist(
symbol=symbol,
period="daily",
adjust="hfq"
)
# 字段筛选与重命名
df = df[['日期', '开盘', '最高', '最低', '收盘', '成交量']].copy()
df.columns = ['trade_date', 'open', 'high', 'low', 'close', 'volume']
# 🔧 关键修复:转换日期格式
df['trade_date'] = pd.to_datetime(df['trade_date']).dt.date # 保持日期部分
df['trade_date'] = pd.to_datetime(df['trade_date']) # 转换为datetime对象
# 添加元数据
df['symbol'] = symbol
df['update_time'] = datetime.now()
# 确保数值类型正确(网页9)
df['open'] = df['open'].astype(float)
df['volume'] = df['volume'].astype(int)
return df
except Exception as e:
print(f"⚠️ 获取 {symbol} 数据失败: {str(e)}")
return pd.DataFrame()
def batch_update(self):
"""批量更新全市场数据(增强错误处理)"""
try:
# 获取全A股代码列表
stock_df = ak.stock_info_a_code_name()
total_stocks = len(stock_df)
print(f"获取到 {total_stocks} 只A股代码")
success_count = 0
# 使用tqdm进度条
for symbol in tqdm(stock_df['code'], desc="📊 数据采集进度"):
try:
clean_df = self.get_clean_data(symbol)
if not clean_df.empty:
# 转换为字典格式
records = clean_df.to_dict('records')
# 批量更新(避免重复)
for record in records:
# 🔧 确保日期是datetime类型(网页1)
if isinstance(record['trade_date'], date):
record['trade_date'] = datetime.combine(
record['trade_date'],
datetime.min.time()
)
self.collection.update_one(
{
"symbol": record['symbol'],
"trade_date": record['trade_date']
},
{"$set": record},
upsert=True
)
success_count += 1
time.sleep(0.2) # 控制请求频率
except Exception as e:
print(f"⚠️ 处理 {symbol} 时出错: {str(e)}")
continue
print(f"\n✅ 更新完成!成功处理 {success_count}/{total_stocks} 只股票")
return success_count
except Exception as e:
print(f"❌ 批量更新失败: {str(e)}")
return 0
if name == “main”:
collector = VolumePriceCollector()
try:
collector.batch_update()
finally:
collector.close()
附录 B:写入新因子.py
from pymongo import MongoClient, UpdateMany,UpdateOne
import pandas as pd
import numpy as np
from tqdm import tqdm
import time
class FactorCalculator:
def init(self, db_name=‘quant’, collection_name=‘volume_price’):
self.client = MongoClient(‘mongodb://localhost:27017/’)
self.db = self.client[db_name]
self.collection = self.db[collection_name]
# 创建索引(提升查询效率)
self.collection.create_index([
("symbol", 1),
("trade_date", 1),
("MA5", 1),
("MA20", 1)
])
print(f"已连接MongoDB: {db_name}.{collection_name}")
def close_connection(self):
self.client.close()
print("MongoDB连接已关闭")
def calculate_factors(self):
"""计算因子并批量写入MongoDB"""
print("开始计算技术指标因子...")
start_time = time.time()
# 获取所有股票代码
stock_codes = self.collection.distinct("symbol")
processed = 0
failed = 0
for symbol in tqdm(stock_codes, desc="因子计算进度"):
try:
# 获取单只股票全部历史数据
cursor = self.collection.find({"symbol": symbol}).sort("trade_date", 1)
data = list(cursor)
df = pd.DataFrame(data)
# 跳过数据不足的股票
if len(df) < 21: # 至少需要20个交易日计算MA20
continue
# 计算技术指标因子
df = self._compute_price_factors(df)
df = self._compute_volume_factors(df)
# 准备批量更新操作
updates = []
for _, row in df.iterrows():
update = UpdateOne(
{"_id": row['_id']},
{"$set": {
"MA5": row.get('MA5'),
"MA20": row.get('MA20'),
"Price_Change_3D": row.get('Price_Change_3D'),
"Price_Volatility_20D": row.get('Price_Volatility_20D'),
"Volume_Spike": row.get('Volume_Spike')
}}
)
updates.append(update)
# 批量写入数据库
if updates:
self.collection.bulk_write(updates, ordered=False)
processed += 1
except Exception as e:
print(f"❌ {symbol}因子计算失败: {str(e)}")
failed += 1
continue
elapsed = time.time() - start_time
print(f"\n✅ 因子计算完成! 成功:{processed} 失败:{failed} 耗时:{elapsed:.2f}秒")
def _compute_price_factors(self, df):
"""计算价格类因子"""
close = pd.to_numeric(df['close'])
# 移动平均线
df['MA5'] = close.rolling(5).mean() # 5日均线
df['MA20'] = close.rolling(20).mean().shift(1) # 昨日20日均线(避免未来函数)
# 动量因子
df['Price_Change_3D'] = close.pct_change(periods=3) # 3日涨跌幅
df['Price_Volatility_20D'] = close.rolling(20).std() # 20日波动率
return df
def _compute_volume_factors(self, df):
"""计算成交量类因子"""
volume = pd.to_numeric(df['volume'])
close = pd.to_numeric(df['close'])
# 成交量异动
df['Volume_MA5'] = volume.rolling(5).mean()
df['Volume_Spike'] = (volume / df['Volume_MA5'] - 1) # 量比
# 量价相关性因子
df['Price_Up'] = np.where(df['close'] > df['open'], 1, 0)
df['Price_Volume_Corr'] = df['Price_Up'].rolling(10).corr(volume)
return df
def get_factor_data(self, symbol, start_date, end_date):
"""获取带因子的股票数据"""
start_date = pd.to_datetime(start_date)
end_date = pd.to_datetime(end_date)
query = {
"symbol": symbol,
"trade_date": {"$gte": start_date, "$lte": end_date}
}
projection = {
"_id": 0,
"symbol": 1,
"trade_date": 1,
"open": 1,
"close": 1,
"MA5": 1,
"MA20": 1,
"Price_Change_3D": 1,
"Volume_Spike": 1
}
cursor = self.collection.find(query, projection).sort("trade_date", 1)
return pd.DataFrame(list(cursor))
if name == “main”:
# 初始化因子计算器
factor_calc = FactorCalculator()
# 计算并写入因子(首次运行使用)
factor_calc.calculate_factors()
# 示例:获取贵州茅台2025年因子数据
df_600519 = factor_calc.get_factor_data("000001", "20250101", "20250609")
print(f"因子数据:\n{df_600519.head()}")
# 关闭连接
factor_calc.close_connection()