def main():
    """Fetch Wind financial data for one sample stock and store it in MySQL.

    Database settings default to the local development values and can be
    overridden with ``FINTECH_DB_HOST``, ``FINTECH_DB_NAME``,
    ``FINTECH_DB_USER``, ``FINTECH_DB_PASSWORD`` and ``FINTECH_DB_PORT`` so
    that real credentials do not have to live in the source file.
    """
    import os  # local import: only this entry point reads the environment

    # Database configuration; each value falls back to the original default.
    db_config = {
        'host': os.environ.get('FINTECH_DB_HOST', '127.0.0.1'),
        'database': os.environ.get('FINTECH_DB_NAME', 'fintech'),
        'user': os.environ.get('FINTECH_DB_USER', 'root'),
        'password': os.environ.get('FINTECH_DB_PASSWORD', 'secret'),
        'port': int(os.environ.get('FINTECH_DB_PORT', '3306')),
    }

    # Build the fetcher that pulls from Wind and writes to the database.
    wind_fetcher = WindDataFetcher(db_config)

    # Single-stock example.
    print("=== 处理单个股票 ===")
    success = wind_fetcher.process_and_save_data("000001.SZ", "2024-12-31")
    print(f"处理结果: {'成功' if success else '失败'}")

    # For several stocks at once, call
    # wind_fetcher.batch_fetch_stocks(stock_list, "2024-12-31"); it returns a
    # dict mapping each stock code to a success flag.
def connect(self) -> bool:
    """Open a connection to the MySQL server configured on this instance.

    Connects with ``self.host``, ``self.user``, ``self.password``,
    ``self.database`` and ``self.port``, using the ``utf8mb4`` charset and
    autocommit disabled, and stores the handle in ``self.connection``.

    Returns:
        True when the connection is open, False otherwise. The result is
        always a bool, including when the driver hands back a handle that
        is not open.
    """
    try:
        self.connection = MySQLdb.connect(
            host=self.host,
            user=self.user,
            passwd=self.password,
            db=self.database,
            port=self.port,
            charset='utf8mb4',
            autocommit=False
        )
    except Error as e:
        self.logger.error(f"数据库连接失败: {e}")
        return False

    if self.connection.open:
        self.logger.info("成功连接到MySQL数据库")
        return True

    # A handle that is not open used to fall through and return None.
    self.logger.error("数据库连接失败: 连接未打开")
    return False
def batch_insert_stocks(self, stocks_list: List[Dict[str, Any]]) -> bool:
    """Insert every stock record in ``stocks_list`` via ``insert_stock``.

    Each record is attempted in order; a failure does not stop the
    remaining records from being processed.

    Args:
        stocks_list: Stock dictionaries accepted by ``insert_stock``.

    Returns:
        True when every record was stored (also for an empty list),
        False when at least one failed.
    """
    total = len(stocks_list)
    # One flag per record, evaluated in input order.
    outcomes = [bool(self.insert_stock(entry)) for entry in stocks_list]
    inserted = sum(outcomes)

    self.logger.info(f"批量插入完成: {inserted}/{total} 成功")
    return inserted == total
def insert_financial_data(self, financial_data: Dict[str, Any]) -> bool:
    """Insert one financial data point, updating it when it already exists.

    Args:
        financial_data: Must contain ``stock_id``, ``indicator_id``,
            ``report_date`` and ``data_value``. Optional keys and their
            fallbacks: ``fiscal_year`` (None), ``period_type`` ('Y'),
            ``currency`` ('CNY'), ``data_unit`` (None),
            ``data_status`` ('valid'), ``source_system`` ('Wind').

    Returns:
        The result of ``_execute_query``; False when a required key is
        missing (nothing is executed in that case).
    """
    required_fields = ['stock_id', 'indicator_id', 'report_date', 'data_value']
    for field in required_fields:
        if field not in financial_data:
            self.logger.error(f"缺少必需字段: {required_fields}")
            return False

    # On a duplicate key only the value, unit, status, source and
    # timestamp columns are refreshed.
    query = """
    INSERT INTO financial_data
    (stock_id, indicator_id, report_date, fiscal_year, period_type,
    currency, data_value, data_unit, data_status, source_system)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
    data_value = VALUES(data_value),
    data_unit = VALUES(data_unit),
    data_status = VALUES(data_status),
    source_system = VALUES(source_system),
    last_updated = CURRENT_TIMESTAMP
    """

    stock_id = financial_data['stock_id']
    indicator_id = financial_data['indicator_id']
    report_date = financial_data['report_date']
    params = (
        stock_id,
        indicator_id,
        report_date,
        financial_data.get('fiscal_year'),
        financial_data.get('period_type', 'Y'),
        financial_data.get('currency', 'CNY'),
        financial_data['data_value'],
        financial_data.get('data_unit'),
        financial_data.get('data_status', 'valid'),
        financial_data.get('source_system', 'Wind'),
    )

    saved = self._execute_query(query, params)
    if saved:
        self.logger.info(
            f"成功插入财务数据: {stock_id} - {indicator_id} - {report_date}"
        )
    return saved
def check_data_exists(self, stock_id: str, indicator_id: str,
                      report_date: str, period_type: str = 'Y') -> bool:
    """Report whether a matching row is already stored in ``financial_data``.

    Args:
        stock_id: Stock code to look up.
        indicator_id: Indicator identifier to look up.
        report_date: Report date to look up.
        period_type: Period type to look up; defaults to 'Y'.

    Returns:
        True when at least one row matches all four values. False when no
        row matches, when there is no database connection, or when the
        query fails.
    """
    # Same guard as the other query helpers: without it a missing
    # connection raised AttributeError instead of returning False.
    if not self.connection:
        self.logger.error("数据库未连接")
        return False

    query = """
    SELECT COUNT(*) as count FROM financial_data
    WHERE stock_id = %s AND indicator_id = %s
    AND report_date = %s AND period_type = %s
    """
    cursor = None
    try:
        cursor = self.connection.cursor()
        cursor.execute(query, (stock_id, indicator_id, report_date, period_type))
        result = cursor.fetchone()
        # Treat an unexpectedly empty result as "no matching row".
        return bool(result) and result[0] > 0
    except Error as e:
        self.logger.error(f"检查数据存在性失败: {e}")
        return False
    finally:
        if cursor is not None:
            cursor.close()
def fetch_wind_data(self, wind_code: str, end_date: str = None) -> Optional[pd.DataFrame]:
    """Fetch annual financial indicators for one stock from the Wind API.

    Requests ``wgsd_capex_ff``, ``wgsd_assets_bus_cf`` and
    ``wgsd_net_profit_is`` through ``w.wsd`` with ``"ED-10Y"`` as the begin
    date and returns them as a DataFrame with one row per report date.

    Args:
        wind_code: Stock code, e.g. "000001.SZ".
        end_date: End date formatted "YYYY-MM-DD". Today's date is used
            when it is omitted or empty.

    Returns:
        A DataFrame with the columns ``report_date``,
        ``capital_expenditure``, ``asset_disposal_cash_flow`` and
        ``net_profit``; None when WindPy is not installed, the API reports
        an error code, or any other exception occurs.
    """
    try:
        # Imported lazily so this module loads on machines without WindPy.
        from WindPy import w
        w.start()

        # Default only when the caller gave no date; the argument used to
        # be overwritten unconditionally, so it was never honoured.
        if not end_date:
            end_date = datetime.now().strftime('%Y-%m-%d')

        indicators = "wgsd_capex_ff,wgsd_assets_bus_cf,wgsd_net_profit_is"

        self.logger.info(f"开始从Wind获取数据: {wind_code}, 指标: {indicators}")

        # "ED-10Y" as the begin date means ten years before the end date.
        result = w.wsd(wind_code, indicators, "ED-10Y", end_date,
                       "unit=1;rptType=1;currencyType=;Period=Y;Days=Alldays;Currency=CNY")

        if result.ErrorCode != 0:
            self.logger.error(f"Wind API错误: {result.ErrorCode} - {result.Data[0] if result.Data else 'Unknown error'}")
            return None

        # Data arrives as one list per field; transpose to one row per date.
        df = pd.DataFrame(result.Data, index=result.Fields, columns=result.Times).T
        df.index.name = 'report_date'
        df.reset_index(inplace=True)

        # Lowercase the field labels first so the mapping below applies
        # regardless of the case in which the API reports them.
        df.columns = [str(col).lower() for col in df.columns]
        column_mapping = {
            'wgsd_capex_ff': 'capital_expenditure',
            'wgsd_assets_bus_cf': 'asset_disposal_cash_flow',
            'wgsd_net_profit_is': 'net_profit'
        }
        df.rename(columns=column_mapping, inplace=True)

        self.logger.info(f"成功获取数据: {wind_code}, 共 {len(df)} 条记录")
        return df

    except ImportError:
        self.logger.error("未安装WindPy,请先安装Wind官方Python API")
        return None
    except Exception as e:
        self.logger.error(f"获取Wind数据失败: {e}")
        return None
def batch_fetch_stocks(self, stock_list: List[str], end_date: str = None) -> Dict[str, bool]:
    """Fetch and store data for several stocks, one after another.

    Each code is passed to ``process_and_save_data``. A one-second pause
    separates consecutive requests to avoid calling too frequently; no
    pause follows the final stock.

    Args:
        stock_list: Stock codes to process.
        end_date: End date forwarded unchanged to ``process_and_save_data``.

    Returns:
        Dict mapping each stock code to the success flag returned for it.
    """
    # Imported once per call rather than on every loop iteration.
    import time

    results = {}
    last_index = len(stock_list) - 1

    for index, stock_code in enumerate(stock_list):
        self.logger.info(f"开始处理股票: {stock_code}")
        results[stock_code] = self.process_and_save_data(stock_code, end_date)

        # Throttle only between requests; waiting after the last is wasted.
        if index < last_index:
            time.sleep(1)

    success_count = sum(results.values())
    self.logger.info(f"批量处理完成: 成功 {success_count}/{len(stock_list)}")

    return results
index 0000000..e534e88 --- /dev/null +++ b/fintech_2025-11-15_214603.sql @@ -0,0 +1,106 @@ +-- MySQL dump 10.13 Distrib 8.0.33, for Win64 (x86_64) +-- +-- Host: 127.0.0.1 Database: fintech +-- ------------------------------------------------------ +-- Server version 8.0.42 + +/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */; +/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */; +/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */; +/*!50503 SET NAMES utf8mb4 */; +/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */; +/*!40103 SET TIME_ZONE='+00:00' */; +/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */; +/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */; +/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */; +/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */; + +-- +-- Table structure for table `financial_data` +-- + +DROP TABLE IF EXISTS `financial_data`; +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!50503 SET character_set_client = utf8mb4 */; +CREATE TABLE `financial_data` ( + `data_id` bigint NOT NULL AUTO_INCREMENT, + `stock_id` varchar(20) NOT NULL, + `indicator_id` varchar(50) NOT NULL, + `report_date` date NOT NULL, + `fiscal_year` year DEFAULT NULL, + `period_type` varchar(10) DEFAULT 'Y', + `currency` varchar(10) DEFAULT 'CNY', + `data_value` decimal(20,4) DEFAULT NULL, + `data_unit` varchar(20) DEFAULT NULL, + `data_status` varchar(20) DEFAULT 'valid', + `source_system` varchar(20) DEFAULT 'Wind', + `last_updated` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + `created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (`data_id`), + UNIQUE KEY `uk_stock_indicator_date` (`stock_id`,`indicator_id`,`report_date`,`period_type`), + KEY `idx_stock_id` (`stock_id`), + KEY `idx_indicator_id` (`indicator_id`), + KEY `idx_report_date` (`report_date`), + KEY `idx_fiscal_year` (`fiscal_year`), + KEY 
`idx_stock_date` (`stock_id`,`report_date`), + KEY `idx_indicator_date` (`indicator_id`,`report_date`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +/*!40101 SET character_set_client = @saved_cs_client */; + +-- +-- Table structure for table `financial_indicators` +-- + +DROP TABLE IF EXISTS `financial_indicators`; +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!50503 SET character_set_client = utf8mb4 */; +CREATE TABLE `financial_indicators` ( + `indicator_id` varchar(50) NOT NULL, + `indicator_name` varchar(100) NOT NULL, + `indicator_desc` text, + `category` varchar(20) DEFAULT NULL, + `unit` varchar(20) DEFAULT NULL, + `data_source` varchar(20) DEFAULT 'Wind', + `is_active` tinyint(1) DEFAULT '1', + `created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (`indicator_id`), + KEY `idx_category` (`category`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +/*!40101 SET character_set_client = @saved_cs_client */; + +-- +-- Table structure for table `stocks` +-- + +DROP TABLE IF EXISTS `stocks`; +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!50503 SET character_set_client = utf8mb4 */; +CREATE TABLE `stocks` ( + `stock_id` varchar(20) NOT NULL, + `stock_name` varchar(100) NOT NULL, + `exchange` varchar(10) DEFAULT NULL, + `industry` varchar(50) DEFAULT NULL, + `market_cap` decimal(18,2) DEFAULT NULL, + `listing_date` date DEFAULT NULL, + `created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP, + `updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`stock_id`), + KEY `idx_exchange` (`exchange`), + KEY `idx_industry` (`industry`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +/*!40101 SET character_set_client = @saved_cs_client */; + +-- +-- Dumping routines for database 'fintech' +-- +/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */; + +/*!40101 SET SQL_MODE=@OLD_SQL_MODE */; +/*!40014 SET 
FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */; +/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */; +/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */; +/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */; +/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */; +/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */; + +-- Dump completed on 2025-11-15 21:46:16