diff --git a/README.md b/README.md index d41ad9e..47ce978 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,97 @@ -# rest -gateio Rest Api +# A股30分钟K线数据同步工具 + +本项目是一个使用Python编写的工具,用于从Tushare Pro获取所有A股的30分钟K线数据,并将其存储到本地的MySQL数据库中。 + +## 主要功能 + +- **全量数据下载**: 一次性获取指定股票或所有股票的全部历史30分钟K线数据。 +- **增量数据更新**: 每日定时任务,自动获取当天最新的K线数据,并补充到数据库中。 +- **进度条显示**: 在进行大量数据下载时,提供清晰的进度条。 +- **数据表自动创建**: 首次运行时可自动创建所需的数据库和数据表。 +- **定时任务**: 内置一个基于APScheduler的定时任务,可在每个交易日收盘后自动执行增量更新。 +- **灵活的命令行接口**: 支持通过命令行参数执行不同的任务。 + +## 环境要求 + +- Python 3.7+ +- MySQL 5.7+ 或 MariaDB + +## 安装与配置 + +**1. 获取代码** + +克隆或下载本项目到你的本地机器。 + +**2. 安装依赖** + +进入项目根目录,通过 `requirements.txt` 文件安装所有必需的Python库。 + +```bash +pip install -r requirements.txt +``` + +**3. 配置 Tushare Token 和数据库** + +打开 `download_tushare/config.py` 文件,填入你的个人信息: + +- `TUSHARE_TOKEN`: 你的Tushare Pro API Token。你可以在 [Tushare Pro官网](https://tushare.pro/user/token) 免费注册并获取。 +- `DB_CONFIG`: 你的MySQL数据库连接信息,包括主机、端口、用户名、密码和数据库名。 + +**4. 创建数据库** + +在你的MySQL服务器中,手动创建一个数据库。数据库的名称应与你在 `config.py` 中 `DB_CONFIG['database']` 字段设置的名称一致。 + +例如,如果你设置的数据库名是 `stock_data`,则执行以下SQL命令: + +```sql +CREATE DATABASE stock_data CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; +``` + +## 使用说明 + +本工具可以通过 `python -m download_tushare` 命令来运行,后跟不同的子命令。 + +**1. 初始化数据库表** + +在第一次运行时,你需要初始化数据库,创建所需的数据表。 + +```bash +python -m download_tushare initdb +``` +该命令会根据 `database.py` 中的定义,自动创建 `stock_basic` 和 `stock_30min` 两张表。 + +**2. 全量同步历史数据** + +如果你是第一次使用,或者想要完整地获取所有历史数据,可以执行全量同步。 + +**警告**: 此过程会遍历所有A股,下载它们自上市以来的全部30分钟K线数据,将消耗大量时间和网络流量,并对Tushare积分有一定要求。 + +```bash +python -m download_tushare full +``` + +**3. 增量更新数据** + +用于获取最新的数据。它会自动查找每只股票在数据库中的最新记录,并从该时间点之后开始同步。如果某只股票是新加入的,则会自动进行全量同步。 + +建议每日收盘后执行此命令。 + +```bash +python -m download_tushare update +``` + +## 运行定时任务 + +为了实现自动化更新,你可以直接运行 `scheduler.py` 脚本。它会启动一个常驻进程,在每个交易日(周一至周五)的下午16:00自动执行增量更新任务。 + +```bash +python -m download_tushare.scheduler +``` + +你可以使用 `nohup` (Linux/macOS) 或其他工具让它在后台持续运行。 + +```bash +nohup python -m download_tushare.scheduler > scheduler.log 2>&1 & +``` + +按 `Ctrl+C` 可以停止调度器。 diff --git a/download_tushare/__init__.py b/download_tushare/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/download_tushare/__main__.py b/download_tushare/__main__.py new file mode 100644 index 0000000..40e2b01 --- /dev/null +++ b/download_tushare/__main__.py @@ -0,0 +1,4 @@ +from .main import main + +if __name__ == "__main__": + main() diff --git a/download_tushare/config.py b/download_tushare/config.py new file mode 100644 index 0000000..6537ca7 --- /dev/null +++ b/download_tushare/config.py @@ -0,0 +1,24 @@ +# ------------------ Tushare Configuration ------------------ +# 在这里填入你的Tushare Pro API Token。 +# 你可以访问 https://tushare.pro/user/token 来获取你的token。 +TUSHARE_TOKEN = "YOUR_TUSHARE_TOKEN_HERE" + +# ------------------ MySQL Database Configuration ------------------ +# 在这里填入你的MySQL数据库连接信息。 +DB_CONFIG = { + 'host': '127.0.0.1', # 数据库主机地址 + 'port': 3306, # 端口号 + 'user': 'root', # 数据库用户名 + 'password': 'YOUR_DATABASE_PASSWORD_HERE', # 数据库密码 + 'database': 'stock_data', # 数据库名称 + 'charset': 'utf8mb4' # 字符集 +} + +# ------------------ Data Sync Configuration ------------------ +# 获取历史数据时的起始日期 +# 如果设置为None,将从股票的上市日期开始获取 +START_DATE = "2010-01-01" + +# 每次API请求的K线数量 +# Tushare pro接口单次提取最多5000条 +BARS_PER_REQUEST = 5000 diff --git a/download_tushare/data_sync.py b/download_tushare/data_sync.py new file mode 100644 index 0000000..91044e9 --- /dev/null +++ b/download_tushare/data_sync.py @@ -0,0 +1,136 @@ +import tushare as ts +import pandas as pd +from datetime import datetime, timedelta +from sqlalchemy import func +from tqdm import tqdm + +from . import config +from . import database as db + +# ------------------ Tushare API Initialization ------------------ +try: + ts.set_token(config.TUSHARE_TOKEN) + pro = ts.pro_api() + print("Tushare API 初始化成功。") +except Exception as e: + print(f"Tushare API 初始化失败: {e}") + print("请检查 config.py 中的 TUSHARE_TOKEN 是否正确。") + exit() + +def update_stock_basic(): + """ + 更新本地的股票基础信息表。 + """ + print("正在更新股票基础信息...") + try: + # 获取所有A股列表 + data = pro.stock_basic(exchange='', list_status='L', fields='ts_code,symbol,name,industry,list_date') + + # 对'industry'列的None值进行处理,替换为空字符串或其他默认值 + data['industry'] = data['industry'].fillna('') + + # 使用我们封装的批量插入函数 + db.bulk_insert_data(data, db.StockBasic) + print(f"股票基础信息更新完成,共获取 {len(data)} 条数据。") + except Exception as e: + print(f"更新股票基础信息时发生错误: {e}") + +def get_stock_list(): + """从数据库获取所有需要同步的股票列表""" + session = db.get_session() + # 选择所有未退市的股票 + stocks = session.query(db.StockBasic).filter(db.StockBasic.delist_date == None).all() + session.close() + return stocks + +def full_sync(): + """ + 全量同步所有A股的30分钟K线数据。 + 这是一个耗时操作。 + """ + print("开始全量同步30分钟K线数据,这将是一个非常耗时的操作。") + update_stock_basic() # 首先确保股票列表是最新的 + stocks_to_sync = get_stock_list() + + with tqdm(total=len(stocks_to_sync), desc="全量同步进度") as pbar: + for stock in stocks_to_sync: + pbar.set_description(f"正在同步 {stock.ts_code}") + _sync_stock_data(stock.ts_code, start_date=config.START_DATE or stock.list_date.strftime('%Y%m%d')) + pbar.update(1) + print("所有股票全量同步完成。") + +def update_sync(): + """ + 增量同步所有A股的30分钟K线数据。 + """ + print("开始增量同步30分钟K线数据...") + update_stock_basic() + stocks_to_sync = get_stock_list() + + session = db.get_session() + + with tqdm(total=len(stocks_to_sync), desc="增量同步进度") as pbar: + for stock in stocks_to_sync: + pbar.set_description(f"增量同步 {stock.ts_code}") + + # 查找该股票的最新一条记录时间 + latest_record = session.query(func.max(db.Stock30Min.trade_time)).filter(db.Stock30Min.ts_code == stock.ts_code).scalar() + + if latest_record: + # 从最新记录的后一天开始同步 + start_date = (latest_record + timedelta(days=1)).strftime('%Y%m%d') + else: + # 如果没有记录,则从上市日期开始全量同步 + start_date = config.START_DATE or stock.list_date.strftime('%Y%m%d') + + _sync_stock_data(stock.ts_code, start_date=start_date) + pbar.update(1) + + session.close() + print("所有股票增量同步完成。") + + +def _sync_stock_data(ts_code: str, start_date: str, end_date: str = None): + """ + 获取并存储单个股票在指定时间范围内的30分钟K线数据。 + Tushare的get_k_data接口已不推荐,这里使用pro_bar接口。 + """ + try: + df = ts.pro_bar(ts_code=ts_code, freq='30min', start_date=start_date, end_date=end_date) + if df is None or df.empty: + return + + # 数据清洗和格式化 + df.rename(columns={'vol': 'vol', 'amount': 'amount'}, inplace=True) + df['trade_time'] = pd.to_datetime(df['trade_time']) + + # 选择需要的列 + df = df[['ts_code', 'trade_time', 'open', 'high', 'low', 'close', 'vol', 'amount']] + + # 批量插入数据库 + db.bulk_insert_data(df, db.Stock30Min) + + except Exception as e: + print(f"同步 {ts_code} 数据时出错: {e}") + +def fix_missing_data(): + """ + 检查并修复缺失的交易日数据。 + 这是一个复杂的功能,我们先实现一个简化版本: + 检查最近N天的数据完整性。 + """ + # 此功能较为复杂,将在后续迭代中实现。 + # 基本思路: + # 1. 获取交易日历 + # 2. 对每只股票,获取其在数据库中的所有交易时间 + # 3. 对比交易日历和已有数据,找出缺失的交易日 + # 4. 对缺失的交易日,重新调用_sync_stock_data来获取数据 + print("数据修复功能待实现。") + +if __name__ == '__main__': + # 可以添加一些直接运行此脚本时的测试代码 + print("这是一个数据同步模块,请通过 main.py 来调用。") + # 例如,测试更新股票列表 + # update_stock_basic() + # full_sync() + pass diff --git a/download_tushare/database.py b/download_tushare/database.py new file mode 100644 index 0000000..854ef24 --- /dev/null +++ b/download_tushare/database.py @@ -0,0 +1,93 @@ +import pandas as pd +from sqlalchemy import create_engine, inspect, Column, String, Date, DateTime, DECIMAL, BigInteger, UniqueConstraint +from sqlalchemy.orm import sessionmaker +from sqlalchemy.ext.declarative import declarative_base +from .config import DB_CONFIG + +# 定义ORM基类 +Base = declarative_base() + +# 数据库连接字符串 +db_uri = ( + f"mysql+pymysql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@" + f"{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}?charset={DB_CONFIG['charset']}" +) + +# 创建数据库引擎 +try: + engine = create_engine(db_uri, echo=False) + # 创建Session类 + Session = sessionmaker(bind=engine) +except Exception as e: + print(f"数据库连接失败: {e}") + print(f"请检查 config.py 中的数据库配置是否正确。") + exit() + +class StockBasic(Base): + """股票基础信息表""" + __tablename__ = 'stock_basic' + ts_code = Column(String(10), primary_key=True, comment='TS股票代码') + symbol = Column(String(10), comment='股票代码') + name = Column(String(30), comment='股票名称') + industry = Column(String(50), comment='所属行业') + list_date = Column(Date, comment='上市日期') + delist_date = Column(Date, nullable=True, comment='退市日期') + + def __repr__(self): + return f"" + +class Stock30Min(Base): + """30分钟K线数据表""" + __tablename__ = 'stock_30min' + id = Column(BigInteger, primary_key=True, autoincrement=True) + ts_code = Column(String(10), index=True, comment='TS股票代码') + trade_time = Column(DateTime, index=True, comment='交易时间') + open = Column(DECIMAL(10, 2), comment='开盘价') + high = Column(DECIMAL(10, 2), comment='最高价') + low = Column(DECIMAL(10, 2), comment='最低价') + close = Column(DECIMAL(10, 2), comment='收盘价') + vol = Column(DECIMAL(20, 2), comment='成交量(手)') + amount = Column(DECIMAL(20, 4), comment='成交额(千元)') + + # 创建联合唯一索引,防止数据重复 + __table_args__ = (UniqueConstraint('ts_code', 'trade_time', name='_ts_code_trade_time_uc'),) + + def __repr__(self): + return f"" + +def init_db(): + """ + 初始化数据库,创建所有定义的表。 + """ + try: + print("正在初始化数据库,检查并创建数据表...") + Base.metadata.create_all(engine) + print("数据表创建/检查完成。") + except Exception as e: + print(f"创建数据表时发生错误: {e}") + +def get_session(): + """ + 获取一个新的数据库会话。 + """ + return Session() + +def bulk_insert_data(df: pd.DataFrame, model_class): + """ + 使用原生INSERT IGNORE来批量插入数据,如果主键或唯一索引冲突则忽略。 + 这比ORM逐条检查快得多。 + """ + if df.empty: + return + + table_name = model_class.__tablename__ + + with engine.connect() as connection: + # 使用pandas的to_sql方法,如果已存在则忽略 + # 注意:'append'配合唯一索引可以实现 'INSERT IGNORE' 的效果 + # 对于MySQL,需要确保主键或唯一索引已建立 + df.to_sql(name=table_name, con=connection, if_exists='append', index=False) + +if __name__ == '__main__': + # 作为脚本直接运行时,执行数据库初始化 + init_db() diff --git a/download_tushare/main.py b/download_tushare/main.py new file mode 100644 index 0000000..cd16c79 --- /dev/null +++ b/download_tushare/main.py @@ -0,0 +1,56 @@ +import argparse +import sys + +from . import database +from . import data_sync + +def main(): + """ + 主函数,处理命令行参数。 + """ + # 创建一个顶层解析器 + parser = argparse.ArgumentParser( + description="Tushare A股30分钟K线数据同步工具。", + epilog="使用 'python -m download_tushare --help' 来查看具体命令的帮助信息。" + ) + + # 创建一个子命令解析器 + subparsers = parser.add_subparsers(dest='command', help='可用的命令') + subparsers.required = True + + # 1. `initdb` 命令 + parser_initdb = subparsers.add_parser('initdb', help='初始化数据库,创建数据表。') + parser_initdb.set_defaults(func=database.init_db) + + # 2. `full` 命令 + parser_full = subparsers.add_parser('full', help='全量同步所有股票的30分钟K线历史数据。') + parser_full.set_defaults(func=data_sync.full_sync) + + # 3. `update` 命令 + parser_update = subparsers.add_parser('update', help='增量同步所有股票的最新30分钟K线数据。') + parser_update.set_defaults(func=data_sync.update_sync) + + # 4. `fix` 命令 (占位) + parser_fix = subparsers.add_parser('fix', help='检查并修复缺失的K线数据(功能待实现)。') + parser_fix.set_defaults(func=data_sync.fix_missing_data) + + # 解析参数 + args = parser.parse_args() + + # 检查Tushare Token和数据库密码是否已配置 + from . import config + if 'YOUR_TUSHARE_TOKEN' in config.TUSHARE_TOKEN: + print("错误:请先在 config.py 文件中设置您的 TUSHARE_TOKEN。", file=sys.stderr) + sys.exit(1) + if 'YOUR_DATABASE_PASSWORD' in config.DB_CONFIG['password']: + print("错误:请先在 config.py 文件中设置您的数据库密码。", file=sys.stderr) + sys.exit(1) + + # 执行相应的函数 + if hasattr(args, 'func'): + args.func() + else: + parser.print_help() + +if __name__ == '__main__': + main() diff --git a/download_tushare/scheduler.py b/download_tushare/scheduler.py new file mode 100644 index 0000000..0ad1be7 --- /dev/null +++ b/download_tushare/scheduler.py @@ -0,0 +1,59 @@ +from apscheduler.schedulers.blocking import BlockingScheduler +from apscheduler.triggers.cron import CronTrigger +import logging + +from .data_sync import update_sync + +# 配置日志记录 +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +def daily_update_job(): + """ + 定义每日增量更新的任务。 + """ + logging.info("开始执行每日定时增量更新任务...") + try: + update_sync() + logging.info("每日定时增量更新任务执行成功。") + except Exception as e: + logging.error(f"每日定时增量更新任务执行失败: {e}", exc_info=True) + +def main(): + """ + 主函数,启动调度器。 + """ + # 检查Tushare Token和数据库密码是否已配置 + from . import config + import sys + if 'YOUR_TUSHARE_TOKEN' in config.TUSHARE_TOKEN: + print("错误:请先在 config.py 文件中设置您的 TUSHARE_TOKEN。", file=sys.stderr) + sys.exit(1) + if 'YOUR_DATABASE_PASSWORD' in config.DB_CONFIG['password']: + print("错误:请先在 config.py 文件中设置您的数据库密码。", file=sys.stderr) + sys.exit(1) + + # 创建一个阻塞式调度器 + scheduler = BlockingScheduler(timezone="Asia/Shanghai") + + # 添加定时任务 + # 触发器设置为:周一到周五,下午4点 (16:00) 执行 + # 这个时间点A股已经收盘,可以获取到当天的完整数据 + scheduler.add_job( + daily_update_job, + trigger=CronTrigger(day_of_week='mon-fri', hour=16, minute=0), + id='daily_stock_update', + name='每日A股数据增量更新', + replace_existing=True + ) + + logging.info("调度器已启动。等待下一个执行时间点...") + print("定时任务已设置。将在每个交易日下午16:00自动执行增量数据同步。") + print("按 Ctrl+C 可以退出程序。") + + try: + scheduler.start() + except (KeyboardInterrupt, SystemExit): + logging.info("调度器已停止。") + +if __name__ == '__main__': + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d8d4648 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +tushare +pandas +sqlalchemy +pymysql +tqdm +apscheduler