0基础学量化:基于Akshare和Mongodb构建数据获取和因子【第一期】
  无名的人 14天前 79 0

一、 引言

在A股量化投资中,构建完善的因子库对于策略研发和回测效率至关重要。传统基于CSV文件存储因子数据存在冗余、跨周期计算效率低和扩展性差等问题,而通过建立数据库式的因子库,可以显著提升数据管理和检索效率。本项目旨在基于AkShare和MongoDB构建一个A股的价格-成交量因子库,将常见的技术指标和量价指标按日保存,为选股和策略开发提供数据支持。借助开源工具,我们可以批量获取数据、自动清洗和计算因子,并方便地存入数据库,为后续的回测与分析打下基础。

二、 技术架构与依赖工具

2.1该项目采用Python编写,主要依赖以下工具和库:

2.1.1 AkShare:一个开源的Python金融数据接口库,支持获取包括A股在内的各种市场的历史和实时数据。通过统一API(如stock_zh_a_hist),可以方便地下载指定股票的日线行情(后复权或前复权)。

2.1.2 MongoDB:文档型NoSQL数据库,使用灵活的JSON/BSON模式存储数据,适合海量时序数据。我们使用MongoDB来管理行情和因子数据,支持动态扩展字段和构建索引,以加速查询。许多量化团队已采用MongoDB存储因子和行情数据。

2.1.3 Pandas:用于数据处理和分析的Python库。获取到的行情数据会以Pandas DataFrame格式处理,便于进行清洗和因子计算。### 2.1.4 Pandas提供了rolling、pct_change等函数,可轻松计算移动平均、涨跌幅等指标。

2.1.5 PyMongo:Python的MongoDB驱动,用于在代码中连接和操作MongoDB。通过PyMongo可以创建数据库连接、集合(collection),并执行写入和查询操作。

其他辅助库:如numpy用于数值计算,tqdm用于显示循环进度,datetime用于时间处理等。
以上工具协同工作:AkShare负责数据采集,Pandas负责清洗与指标计算,MongoDB负责数据存储与索引优化。

三、 数据获取与清洗

3.1 数据采集采用VolumePriceCollector类(见文章末尾volume_price_collector.py)自动化完成,主要步骤如下:

3.1.1获取A股股票列表:通过ak.stock_info_a_code_name()获取所有A股的代码列表。

3.1.2下载历史日线行情:对于每个股票代码,调用 ak.stock_zh_a_hist(symbol, period=“daily”, adjust=“hfq”) 下载后复权的日线数据。返回结果为Pandas DataFrame,包含日期、开盘、收盘、最高、最低、成交量等字段。

3.1.3字段筛选与重命名:从获取的数据中提取常用字段:日期(日期)、开盘价(开盘)、最高价(最高)、最低价(最低)、收盘价(收盘)、成交量(成交量),并重命名为trade_date、open、high、low、close、volume等通用字段。

3.1.4日期格式修复:将trade_date列转换为日期类型,确保只保留日期部分并转为datetime对象。这一步可避免后续处理中的时间格式问题。

3.1.5添加元数据和类型转换:在DataFrame中加入symbol字段记录股票代码,以及update_time字段记录更新时间。同时,将数值字段转换为正确类型(如将open转为浮点型、volume转为整数)以保证数据一致性。

3.2写入MongoDB:

3.2.1 使用PyMongo连接本地MongoDB,在quant数据库下建立volume_price集合。对每条清洗后的记录,使用update_one操作按(symbol, trade_date)条件插入或更新数据(upsert),以避免重复写入。在代码初始化时,我们对(symbol, trade_date)建立了复合索引,加速了后续的查询。批量处理时使用update_one(…, upsert=True)逐条更新,可以确保历史数据持续迭代更新且不覆盖已有记录。

上述过程在循环中使用tqdm显示进度,并对可能出现的异常进行捕获和跳过。完成后,可查询MongoDB确认数据已正确存储。通过该流程,我们实现了从AkShare批量抓取A股历史数据并清洗、入库,为因子计算提供了基础数据。

四、因子计算方法

4.1因子计算由FactorCalculator类(见文章末尾 写入新因子.py)实现,其流程包括:

读取历史数据:对每个股票,从MongoDB中按日期升序读取该股票的全部行情记录并转换为Pandas DataFrame。
计算价格类因子:利用收盘价序列计算常见的价格因子,包括:

4.1.1 MA5:5日移动平均线,即过去5个交易日收盘价的平均值。

4.1.2 MA20:20日移动平均线,但注意我们使用 .shift(1) 技术将20日均线向前平移一天,避免未来函数(即避免在当日用到当日及以后的价格信息)。

4.1.3 Price_Change_3D:3日动量因子,定义为当前收盘价与3个交易日前收盘价的百分比变化(即pct_change(periods=3))。

4.1.4 Price_Volatility_20D:20日收益率波动率,即过去20日收益率的标准差。

计算成交量类因子:利用成交量序列计算典型的量价因子,包括:

4.2.1 Volume_Spike(量比):当日成交量与过去5日均量之比减1,表示成交量放大的程度。例如df[‘Volume_Spike’] = df[‘volume’] / df[‘volume’].rolling(5).mean() - 1。当值显著大于0时,说明当日成交量高于近期均量。

4.2.2(可选)Volume_MA5:5日成交量均值,用于上面量比的计算。

4.2.3 Price_Up & Price_Volume_Corr:一个量价相关性因子,用于衡量连续上涨天数与成交量的相关性。这里Price_Up为价格上涨的二值指标(收盘价高于开盘价时取1,否则取0),Price_Volume_Corr为其与成交量的滚动相关系数。

避免未来函数:在计算诸如MA20时,使用.shift(1)确保因子仅使用过去的数据,不会泄露未来信息。这样处理后,因子的计算是可用于实际回测的日末状态。

4.2 批量写入MongoDB:计算完成后,将因子值写回数据库。在DataFrame中遍历每行,构建UpdateOne操作(以MongoDB文档的_id为键)批量更新对应文档的因子字段。因子列如“MA5”、“MA20”、“Price_Change_3D”、“Price_Volatility_20D”、“Volume_Spike”等被添加到原始文档中。使用bulk_write可高效地一次性提交多条更新,提高写入性能。

通过上述步骤,我们将计算得到的所有因子值存储到了MongoDB中,与原始行情数据一起构成了完整的因子库。

五、数据组织结构与索引优化

5.1 在MongoDB中,我们使用一个集合(volume_price)来同时存放基础行情数据和计算出的因子。每个文档对应一只股票在某个交易日的记录,主要字段包括:股票代码(symbol)、交易日期(trade_date)、开盘价、收盘价、成交量等以及计算后的各项因子。为提高查询和分析效率,我们对关键字段建立了索引:

5.1.1 复合主索引:已对(symbol, trade_date)建立联合索引。这使得检索某只股票在特定日期区间内的所有数据非常快速,也能保证不会插入重复的同一天记录。

5.1.2 因子字段索引:对于一些常用因子字段(如MA5、MA20等),也可以根据需求建立索引。这样,如果需要按因子值筛选股票(例如查找MA5最高的股票),查询效率会更高。

5.1.3集合设计:在一般设计中,可以将行情数据和因子数据分为两个集合(例如daily_data和factor_data),但本项目简化为一个集合存储,以便对同一条记录同时访问行情与因子字段。MongoDB灵活的模式允许我们随时新增因子字段而无需修改表结构。

通过合理的集合设计和索引策略,可以让后续回测模块快速读取单只股票的因子序列,也方便按照日期或因子值进行扫描,满足选股和回测的需求。

示例展示

以下以示例说明如何获取某支股票的因子数据并绘制图表:

获取因子数据:可使用FactorCalculator类提供的get_factor_data(symbol, start_date, end_date)方法。此方法从数据库中查询指定股票和日期范围内的文档,并返回一个包含symbol, trade_date, open, close, MA5, MA20, Price_Change_3D, Volume_Spike等列的Pandas DataFrame。例如:
from factor_calculator import FactorCalculator
factor_calc = FactorCalculator(db_name=‘quant’, collection_name=‘volume_price’)
df = factor_calc.get_factor_data(“000001”, “20250101”, “20250609”)
这将获取平安银行(代码为000001)在2025年1月到6月期间的因子数据。得到的df中,trade_date为索引或者列,其他列为因子数值。
示例查询数据.png
绘制移动平均线:利用Matplotlib或Pandas内置绘图功能,可以将收盘价与其均线画在同一张折线图上。
示例代码:
import matplotlib.pyplot as plt
df.plot(x=‘trade_date’, y=[‘close’,‘MA5’,‘MA20’], figsize=(8,4))
plt.title(“收盘价与MA5/MA20走势图”)
plt.show()
图中可以看到收盘价(黑线)与MA5(橙线)、MA20(红线)的走势关系。这种图表有助于观察短期均线与长期均线的交叉情况。
结果走势.png
绘制成交量与量比:可以绘制一个双轴图。左轴使用柱状图显示每日成交量,右轴用折线图显示量比Volume_Spike。例如:
fig, ax1 = plt.subplots()
ax1.bar(df[‘trade_date’], df[‘volume’], color=‘skyblue’, label=‘成交量’)
ax2 = ax1.twinx()
ax2.plot(df[‘trade_date’], df[‘Volume_Spike’], color=‘orange’, label=‘量比’)
ax1.set_ylabel(“成交量”)
ax2.set_ylabel(“量比”)
plt.title(“成交量与量比双轴图”)
plt.show()
如图所示,当某日成交量显著放大时,其对应的量比(橙线)也会明显上升。这种双轴图直观地表现了量比与成交量的关联。
量比.png

以上示例仅为演示生成方法,实际中可根据需要自定义各种因子图表以辅助分析。

四、小结与展望

本文介绍了一个基于AkShare和MongoDB的A股因子库的构建流程:从数据采集、清洗,到因子计算和存储,覆盖了从头到尾的主要环节。我们使用MongoDB高效管理历史行情和因子时间序列,并利用AkShare批量获取数据,Pandas完成因子计算,从而极大地提高了因子数据的获取和处理效率。第一期实现了常见的量价技术因子(如MA5、MA20、价格动量、量比等)的计算和存储,为量化选股和回测提供了基础。
未来的工作可以在此基础上做更多扩展:例如引入基本面财务因子、对因子进行行业中性化处理、添加高频(分钟级)指标、整合回测与交易执行模块等。此外,还可以结合更多数据源(如新闻舆情、宏观指标等)和机器学习方法优化因子组合。通过不断丰富和完善,因子库将为量化研究和交易提供更加可靠的决策支持。
参考来源: AkShare官方文档及示例;MongoDB相关介绍;项目提供的源码文件。

附录 A:volume_price_collector.py

###volume_price_collector.py
import akshare as ak
import pandas as pd
from pymongo import MongoClient
from datetime import datetime, date
import time
from tqdm import tqdm

class VolumePriceCollector:
def init(self):
# MongoDB连接配置
self.client = MongoClient(‘mongodb://localhost:27017/’)
self.db = self.client[‘quant’]
self.collection = self.db[‘volume_price’]

    # 创建复合索引(提升查询效率)
    self.collection.create_index([
        ("symbol", 1), 
        ("trade_date", 1)
    ])
    print("MongoDB连接成功!数据库索引已创建")

def close(self):
    self.client.close()
    print("MongoDB连接已安全关闭")

def get_clean_data(self, symbol):
    """获取清洗后的量价数据(修复日期类型问题)"""
    try:
        # 获取原始数据(后复权)
        df = ak.stock_zh_a_hist(
            symbol=symbol,
            period="daily",
            adjust="hfq"
        )
        
        # 字段筛选与重命名
        df = df[['日期', '开盘', '最高', '最低', '收盘', '成交量']].copy()
        df.columns = ['trade_date', 'open', 'high', 'low', 'close', 'volume']
        
        # 🔧 关键修复:转换日期格式
        df['trade_date'] = pd.to_datetime(df['trade_date']).dt.date  # 保持日期部分
        df['trade_date'] = pd.to_datetime(df['trade_date'])  # 转换为datetime对象
        
        # 添加元数据
        df['symbol'] = symbol
        df['update_time'] = datetime.now()
        
        # 确保数值类型正确(网页9)
        df['open'] = df['open'].astype(float)
        df['volume'] = df['volume'].astype(int)
        
        return df
    except Exception as e:
        print(f"⚠️ 获取 {symbol} 数据失败: {str(e)}")
        return pd.DataFrame()

def batch_update(self):
    """批量更新全市场数据(增强错误处理)"""
    try:
        # 获取全A股代码列表
        stock_df = ak.stock_info_a_code_name()
        total_stocks = len(stock_df)
        print(f"获取到 {total_stocks} 只A股代码")
        
        success_count = 0
        # 使用tqdm进度条
        for symbol in tqdm(stock_df['code'], desc="📊 数据采集进度"):
            try:
                clean_df = self.get_clean_data(symbol)
                
                if not clean_df.empty:
                    # 转换为字典格式
                    records = clean_df.to_dict('records')
                    
                    # 批量更新(避免重复)
                    for record in records:
                        # 🔧 确保日期是datetime类型(网页1)
                        if isinstance(record['trade_date'], date):
                            record['trade_date'] = datetime.combine(
                                record['trade_date'], 
                                datetime.min.time()
                            )
                        
                        self.collection.update_one(
                            {
                                "symbol": record['symbol'],
                                "trade_date": record['trade_date']
                            },
                            {"$set": record},
                            upsert=True
                        )
                    success_count += 1
                time.sleep(0.2)  # 控制请求频率
            except Exception as e:
                print(f"⚠️ 处理 {symbol} 时出错: {str(e)}")
                continue
        
        print(f"\n✅ 更新完成!成功处理 {success_count}/{total_stocks} 只股票")
        return success_count
    except Exception as e:
        print(f"❌ 批量更新失败: {str(e)}")
        return 0

if name == “main”:
collector = VolumePriceCollector()
try:
collector.batch_update()
finally:
collector.close()

附录 B:写入新因子.py

from pymongo import MongoClient, UpdateMany,UpdateOne
import pandas as pd
import numpy as np
from tqdm import tqdm
import time

class FactorCalculator:
def init(self, db_name=‘quant’, collection_name=‘volume_price’):
self.client = MongoClient(‘mongodb://localhost:27017/’)
self.db = self.client[db_name]
self.collection = self.db[collection_name]

    # 创建索引(提升查询效率)
    self.collection.create_index([
        ("symbol", 1), 
        ("trade_date", 1),
        ("MA5", 1),
        ("MA20", 1)
    ])
    print(f"已连接MongoDB: {db_name}.{collection_name}")

def close_connection(self):
    self.client.close()
    print("MongoDB连接已关闭")

def calculate_factors(self):
    """计算因子并批量写入MongoDB"""
    print("开始计算技术指标因子...")
    start_time = time.time()
    
    # 获取所有股票代码
    stock_codes = self.collection.distinct("symbol")
    processed = 0
    failed = 0
    
    for symbol in tqdm(stock_codes, desc="因子计算进度"):
        try:
            # 获取单只股票全部历史数据
            cursor = self.collection.find({"symbol": symbol}).sort("trade_date", 1)
            data = list(cursor)
            df = pd.DataFrame(data)
            
            # 跳过数据不足的股票
            if len(df) < 21:  # 至少需要20个交易日计算MA20
                continue
                
            # 计算技术指标因子
            df = self._compute_price_factors(df)
            df = self._compute_volume_factors(df)
            
            # 准备批量更新操作
            updates = []
            for _, row in df.iterrows():
                update = UpdateOne(
                    {"_id": row['_id']},
                    {"$set": {
                        "MA5": row.get('MA5'),
                        "MA20": row.get('MA20'),
                        "Price_Change_3D": row.get('Price_Change_3D'),
                        "Price_Volatility_20D": row.get('Price_Volatility_20D'),
                        "Volume_Spike": row.get('Volume_Spike')
                    }}
                )
                updates.append(update)
            
            # 批量写入数据库
            if updates:
                self.collection.bulk_write(updates, ordered=False)
                processed += 1
        except Exception as e:
            print(f"❌ {symbol}因子计算失败: {str(e)}")
            failed += 1
            continue
    
    elapsed = time.time() - start_time
    print(f"\n✅ 因子计算完成! 成功:{processed} 失败:{failed} 耗时:{elapsed:.2f}秒")

def _compute_price_factors(self, df):
    """计算价格类因子"""
    close = pd.to_numeric(df['close'])
    
    # 移动平均线
    df['MA5'] = close.rolling(5).mean()  # 5日均线
    df['MA20'] = close.rolling(20).mean().shift(1)  # 昨日20日均线(避免未来函数)
    
    # 动量因子
    df['Price_Change_3D'] = close.pct_change(periods=3)  # 3日涨跌幅
    df['Price_Volatility_20D'] = close.rolling(20).std()  # 20日波动率
    
    return df

def _compute_volume_factors(self, df):
    """计算成交量类因子"""
    volume = pd.to_numeric(df['volume'])
    close = pd.to_numeric(df['close'])
    
    # 成交量异动
    df['Volume_MA5'] = volume.rolling(5).mean()
    df['Volume_Spike'] = (volume / df['Volume_MA5'] - 1)  # 量比
    
    # 量价相关性因子
    df['Price_Up'] = np.where(df['close'] > df['open'], 1, 0)
    df['Price_Volume_Corr'] = df['Price_Up'].rolling(10).corr(volume)
    
    return df

def get_factor_data(self, symbol, start_date, end_date):
    """获取带因子的股票数据"""
    start_date = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)
    
    query = {
        "symbol": symbol,
        "trade_date": {"$gte": start_date, "$lte": end_date}
    }
    projection = {
        "_id": 0,
        "symbol": 1,
        "trade_date": 1,
        "open": 1,
        "close": 1,
        "MA5": 1,
        "MA20": 1,
        "Price_Change_3D": 1,
        "Volume_Spike": 1
    }
    
    cursor = self.collection.find(query, projection).sort("trade_date", 1)
    return pd.DataFrame(list(cursor))

if name == “main”:
# 初始化因子计算器
factor_calc = FactorCalculator()

# 计算并写入因子(首次运行使用)
factor_calc.calculate_factors()

# 示例:获取贵州茅台2025年因子数据
df_600519 = factor_calc.get_factor_data("000001", "20250101", "20250609")
print(f"因子数据:\n{df_600519.head()}")

# 关闭连接
factor_calc.close_connection()
最后一次编辑于 14天前 1

暂无评论

推荐阅读