初步完成数据库代码(插入数据失败)

This commit is contained in:
2025-11-16 07:29:28 +08:00
parent 825300fd85
commit d9d907e673
4 changed files with 699 additions and 0 deletions

43
finance/download_data.py Normal file
View File

@ -0,0 +1,43 @@
import logging
from wind_data_fetcher import WindDataFetcher
# 设置日志级别
logging.basicConfig(level=logging.INFO)
def main():
    """Fetch Wind data for one sample stock and store it in MySQL.

    Connection settings default to the local development database. Each one
    can be overridden with a ``FINTECH_DB_*`` environment variable so that
    real credentials do not have to be committed to the repository.
    """
    import os  # local import: only this entry point reads the environment

    # Database configuration (environment overrides, development defaults).
    db_config = {
        'host': os.environ.get('FINTECH_DB_HOST', '127.0.0.1'),
        'database': os.environ.get('FINTECH_DB_NAME', 'fintech'),
        'user': os.environ.get('FINTECH_DB_USER', 'root'),
        'password': os.environ.get('FINTECH_DB_PASSWORD', 'secret'),
        'port': int(os.environ.get('FINTECH_DB_PORT', '3306')),
    }

    # Build the Wind fetcher on top of that configuration.
    wind_fetcher = WindDataFetcher(db_config)

    # Single-stock example.
    print("=== 处理单个股票 ===")
    success = wind_fetcher.process_and_save_data("000001.SZ", "2024-12-31")
    print(f"处理结果: {'成功' if success else '失败'}")

    # Batch example, kept disabled. Note that Kweichow Moutai is listed in
    # Shanghai, so its Wind code ends in ".SH".
    # print("\n=== 批量处理多个股票 ===")
    # stock_list = [
    #     "600519.SH",  # Kweichow Moutai
    #     "000002.SZ",  # Vanke A
    #     "600036.SH",  # China Merchants Bank
    #     "601318.SH",  # Ping An Insurance
    # ]
    # results = wind_fetcher.batch_fetch_stocks(stock_list, "2024-12-31")
    # # Print a per-stock summary.
    # print("\n=== 处理结果摘要 ===")
    # for stock, success in results.items():
    #     status = "成功" if success else "失败"
    #     print(f"{stock}: {status}")


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,318 @@
import logging
from datetime import datetime
from typing import List, Dict, Optional, Any
try:
import MySQLdb
from MySQLdb import Error
MYSQL_AVAILABLE = True
except ImportError:
MYSQL_AVAILABLE = False
logging.warning("未安装MySQLdb请安装: pip install mysqlclient")
class FinancialDataManager:
    """Manage financial data stored in MySQL through the MySQLdb driver.

    Wraps a single connection and offers upsert helpers for the ``stocks``,
    ``financial_indicators`` and ``financial_data`` tables, plus a few read
    queries. It can be used as a context manager: the connection is opened on
    entry and closed on exit.

    Write helpers log the problem and return ``False`` instead of raising,
    and read helpers return an empty result, so callers must check the
    return values.
    """

    def __init__(self, host: str, database: str, user: str, password: str, port: int = 3306):
        """Store the connection settings; no connection is opened yet.

        Args:
            host: Database host.
            database: Database (schema) name.
            user: User name.
            password: Password.
            port: TCP port, 3306 by default.

        Raises:
            ImportError: If the MySQLdb driver could not be imported.
        """
        if not MYSQL_AVAILABLE:
            raise ImportError("未安装MySQLdb请安装: pip install mysqlclient")
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.port = port
        self.connection = None
        self.logger = self._setup_logger()

    def _setup_logger(self) -> logging.Logger:
        """Return the class logger, attaching a stream handler only once."""
        logger = logging.getLogger('FinancialDataManager')
        logger.setLevel(logging.INFO)
        # Guard against stacking a new handler on every instantiation.
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger

    def connect(self) -> bool:
        """Open the database connection.

        Returns:
            True when the connection is open, False otherwise (including the
            case where the driver returns a connection that is not open).
        """
        try:
            self.connection = MySQLdb.connect(
                host=self.host,
                user=self.user,
                passwd=self.password,
                db=self.database,
                port=self.port,
                charset='utf8mb4',
                autocommit=False
            )
        except Error as e:
            self.logger.error(f"数据库连接失败: {e}")
            return False
        if self.connection.open:
            self.logger.info("成功连接到MySQL数据库")
            return True
        return False

    def disconnect(self):
        """Close the connection, if any, and forget it.

        Resetting ``self.connection`` makes later calls hit the
        "not connected" guard instead of failing on a closed handle.
        """
        if self.connection is None:
            return
        if self.connection.open:
            self.connection.close()
            self.logger.info("数据库连接已关闭")
        self.connection = None

    def __enter__(self):
        """Open the connection and return the manager.

        A failed connection is only logged by ``connect``; every operation
        then returns its failure value, so this method does not raise.
        """
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection; exceptions from the ``with`` body propagate."""
        self.disconnect()

    def _has_required_fields(self, data: Dict[str, Any], required_fields: List[str]) -> bool:
        """Return True if every required key is present, logging otherwise."""
        if all(field in data for field in required_fields):
            return True
        self.logger.error(f"缺少必需字段: {required_fields}")
        return False

    def _execute_query(self, query: str, params: Optional[tuple] = None) -> bool:
        """Execute one write statement and commit it.

        Args:
            query: SQL text with ``%s`` placeholders.
            params: Values bound to the placeholders.

        Returns:
            True after a successful commit; False when not connected or when
            the driver raised an error (the transaction is rolled back).
        """
        if not self.connection:
            self.logger.error("数据库未连接")
            return False
        cursor = None
        try:
            cursor = self.connection.cursor()
            cursor.execute(query, params or ())
            self.connection.commit()
            return True
        except Error as e:
            self.logger.error(f"执行查询失败: {e}")
            self.connection.rollback()
            return False
        finally:
            if cursor is not None:
                cursor.close()

    def _fetch_all(self, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
        """Run a read query and return every row as a dict.

        Returns:
            A list of column-name -> value dicts; empty when not connected or
            when the query failed.
        """
        if not self.connection:
            self.logger.error("数据库未连接")
            return []
        cursor = None
        try:
            cursor = self.connection.cursor(MySQLdb.cursors.DictCursor)
            cursor.execute(query, params or ())
            # fetchall() yields a tuple; convert so the result matches the
            # annotated list type.
            return list(cursor.fetchall())
        except Error as e:
            self.logger.error(f"查询失败: {e}")
            return []
        finally:
            if cursor is not None:
                cursor.close()

    # ==================== Stock operations ====================
    def insert_stock(self, stock_data: Dict[str, Any]) -> bool:
        """Insert a stock row, or update it when ``stock_id`` already exists.

        Args:
            stock_data: Must contain ``stock_id`` and ``stock_name``;
                ``exchange``, ``industry``, ``market_cap`` and
                ``listing_date`` are optional and stored as NULL if absent.

        Returns:
            True on success, False on missing fields or database failure.
        """
        if not self._has_required_fields(stock_data, ['stock_id', 'stock_name']):
            return False
        query = """
            INSERT INTO stocks (stock_id, stock_name, exchange, industry, market_cap, listing_date)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
            stock_name = VALUES(stock_name),
            exchange = VALUES(exchange),
            industry = VALUES(industry),
            market_cap = VALUES(market_cap),
            listing_date = VALUES(listing_date),
            updated_at = CURRENT_TIMESTAMP
        """
        success = self._execute_query(query, (
            stock_data['stock_id'],
            stock_data['stock_name'],
            stock_data.get('exchange'),
            stock_data.get('industry'),
            stock_data.get('market_cap'),
            stock_data.get('listing_date')
        ))
        if success:
            self.logger.info(f"成功处理股票: {stock_data['stock_id']}")
        return success

    def batch_insert_stocks(self, stocks_list: List[Dict[str, Any]]) -> bool:
        """Insert stocks one by one; return True only if every row succeeded.

        Each row is committed separately, so a failure does not undo the rows
        that were already written.
        """
        success_count = sum(1 for stock_data in stocks_list if self.insert_stock(stock_data))
        self.logger.info(f"批量插入完成: {success_count}/{len(stocks_list)} 成功")
        return success_count == len(stocks_list)

    # ==================== Indicator definition operations ====================
    def insert_indicator(self, indicator_data: Dict[str, Any]) -> bool:
        """Insert an indicator definition, or update it when it already exists.

        Args:
            indicator_data: Must contain ``indicator_id`` and
                ``indicator_name``; ``indicator_desc``, ``category`` and
                ``unit`` are optional, ``data_source`` defaults to ``'Wind'``.

        Returns:
            True on success, False on missing fields or database failure.
        """
        if not self._has_required_fields(indicator_data, ['indicator_id', 'indicator_name']):
            return False
        query = """
            INSERT INTO financial_indicators
            (indicator_id, indicator_name, indicator_desc, category, unit, data_source)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
            indicator_name = VALUES(indicator_name),
            indicator_desc = VALUES(indicator_desc),
            category = VALUES(category),
            unit = VALUES(unit),
            data_source = VALUES(data_source)
        """
        success = self._execute_query(query, (
            indicator_data['indicator_id'],
            indicator_data['indicator_name'],
            indicator_data.get('indicator_desc'),
            indicator_data.get('category'),
            indicator_data.get('unit'),
            indicator_data.get('data_source', 'Wind')
        ))
        if success:
            self.logger.info(f"成功处理指标: {indicator_data['indicator_id']}")
        return success

    # ==================== Financial data operations ====================
    def insert_financial_data(self, financial_data: Dict[str, Any]) -> bool:
        """Insert one financial data point, updating it if it already exists.

        Args:
            financial_data: Must contain ``stock_id``, ``indicator_id``,
                ``report_date`` and ``data_value``. Optional keys and their
                defaults: ``fiscal_year`` (NULL), ``period_type`` (``'Y'``),
                ``currency`` (``'CNY'``), ``data_unit`` (NULL),
                ``data_status`` (``'valid'``), ``source_system`` (``'Wind'``).

        Returns:
            True on success, False on missing fields or database failure.
        """
        required_fields = ['stock_id', 'indicator_id', 'report_date', 'data_value']
        if not self._has_required_fields(financial_data, required_fields):
            return False
        query = """
            INSERT INTO financial_data
            (stock_id, indicator_id, report_date, fiscal_year, period_type,
            currency, data_value, data_unit, data_status, source_system)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
            data_value = VALUES(data_value),
            data_unit = VALUES(data_unit),
            data_status = VALUES(data_status),
            source_system = VALUES(source_system),
            last_updated = CURRENT_TIMESTAMP
        """
        success = self._execute_query(query, (
            financial_data['stock_id'],
            financial_data['indicator_id'],
            financial_data['report_date'],
            financial_data.get('fiscal_year'),
            financial_data.get('period_type', 'Y'),
            financial_data.get('currency', 'CNY'),
            financial_data['data_value'],
            financial_data.get('data_unit'),
            financial_data.get('data_status', 'valid'),
            financial_data.get('source_system', 'Wind')
        ))
        if success:
            self.logger.info(
                f"成功插入财务数据: {financial_data['stock_id']} - "
                f"{financial_data['indicator_id']} - {financial_data['report_date']}"
            )
        return success

    def batch_insert_financial_data(self, data_list: List[Dict[str, Any]]) -> bool:
        """Insert data points one by one; True only if every row succeeded.

        Each row is committed separately, so a failure does not undo the rows
        that were already written.
        """
        success_count = sum(1 for data in data_list if self.insert_financial_data(data))
        self.logger.info(f"批量插入财务数据完成: {success_count}/{len(data_list)} 成功")
        return success_count == len(data_list)

    # ==================== Query methods ====================
    def get_stock_info(self, stock_id: str) -> Optional[Dict[str, Any]]:
        """Return the ``stocks`` row for ``stock_id``, or None if not found."""
        query = "SELECT * FROM stocks WHERE stock_id = %s"
        results = self._fetch_all(query, (stock_id,))
        return results[0] if results else None

    def get_financial_data(self, stock_id: str, indicator_id: str = None,
                           start_date: str = None, end_date: str = None) -> List[Dict[str, Any]]:
        """Query financial data for one stock, joined with names.

        Args:
            stock_id: Stock code to filter on (required).
            indicator_id: Optional indicator filter.
            start_date: Optional inclusive lower bound on ``report_date``.
            end_date: Optional inclusive upper bound on ``report_date``.

        Returns:
            Rows ordered by report date (newest first) and indicator id;
            empty on failure.
        """
        query = """
            SELECT fd.*, fi.indicator_name, fi.category, s.stock_name
            FROM financial_data fd
            LEFT JOIN financial_indicators fi ON fd.indicator_id = fi.indicator_id
            LEFT JOIN stocks s ON fd.stock_id = s.stock_id
            WHERE fd.stock_id = %s
        """
        params = [stock_id]
        # Optional filters are appended as placeholders, never interpolated.
        if indicator_id:
            query += " AND fd.indicator_id = %s"
            params.append(indicator_id)
        if start_date:
            query += " AND fd.report_date >= %s"
            params.append(start_date)
        if end_date:
            query += " AND fd.report_date <= %s"
            params.append(end_date)
        query += " ORDER BY fd.report_date DESC, fd.indicator_id"
        return self._fetch_all(query, tuple(params))

    def check_data_exists(self, stock_id: str, indicator_id: str,
                          report_date: str, period_type: str = 'Y') -> bool:
        """Return True if a matching ``financial_data`` row exists.

        Returns False when not connected or when the query fails, matching
        the other helpers instead of raising ``AttributeError``.
        """
        if not self.connection:
            self.logger.error("数据库未连接")
            return False
        query = """
            SELECT COUNT(*) as count FROM financial_data
            WHERE stock_id = %s AND indicator_id = %s
            AND report_date = %s AND period_type = %s
        """
        cursor = None
        try:
            cursor = self.connection.cursor()
            cursor.execute(query, (stock_id, indicator_id, report_date, period_type))
            result = cursor.fetchone()
            return result[0] > 0
        except Error as e:
            self.logger.error(f"检查数据存在性失败: {e}")
            return False
        finally:
            if cursor is not None:
                cursor.close()

View File

@ -0,0 +1,232 @@
import pandas as pd
from datetime import datetime, timedelta
import logging
from typing import List, Dict, Any, Optional
import sys
import os
# 添加当前目录到路径,以便导入自定义模块
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from financial_data_manager import FinancialDataManager
class WindDataFetcher:
    """Fetch financial data from the Wind API and store it in the database.

    One call to ``process_and_save_data`` downloads ten years of annual
    values for the indicators in ``_FIELD_COLUMNS`` and upserts one row per
    (stock, indicator, report date) through ``FinancialDataManager``.
    """

    # Wind indicator id -> column name used in the fetched DataFrame.
    # This is the single source for the requested indicator list and for the
    # rename and lookup steps, so the three cannot drift apart.
    _FIELD_COLUMNS = {
        'wgsd_capex_ff': 'capital_expenditure',
        'wgsd_assets_bus_cf': 'asset_disposal_cash_flow',
        'wgsd_net_profit_is': 'net_profit',
    }

    def __init__(self, db_config: Dict[str, Any]):
        """Store the database settings and the indicator definitions.

        Args:
            db_config: Keyword arguments for ``FinancialDataManager``
                (host, database, user, password, port).
        """
        self.db_config = db_config
        self.logger = self._setup_logger()
        # Indicator definitions written to the financial_indicators table.
        self.indicator_definitions = {
            'wgsd_capex_ff': {
                'indicator_id': 'wgsd_capex_ff',
                'indicator_name': '资本支出_现金流量表',
                'category': '现金流量表',
                'unit': '',
                'indicator_desc': '资本性支出,反映公司用于购置固定资产、无形资产和其他长期资产支付的现金'
            },
            'wgsd_assets_bus_cf': {
                'indicator_id': 'wgsd_assets_bus_cf',
                'indicator_name': '资产处置收益_现金流量表',
                'category': '现金流量表',
                'unit': '',
                'indicator_desc': '资产处置产生的现金流量,反映公司处置固定资产、无形资产和其他长期资产收回的现金净额'
            },
            'wgsd_net_profit_is': {
                'indicator_id': 'wgsd_net_profit_is',
                'indicator_name': '净利润_利润表',
                'category': '利润表',
                'unit': '',
                'indicator_desc': '净利润,反映公司在一定会计期间内实现的税后利润'
            }
        }

    def _setup_logger(self) -> logging.Logger:
        """Return the class logger, attaching a stream handler only once."""
        logger = logging.getLogger('WindDataFetcher')
        logger.setLevel(logging.INFO)
        # Guard against stacking a new handler on every instantiation.
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger

    def ensure_indicators_defined(self, db_manager: "FinancialDataManager") -> bool:
        """Upsert every indicator definition into the database.

        Args:
            db_manager: Connected database manager.

        Returns:
            True only if every definition was written; False if any insert
            reported failure or raised.
        """
        try:
            failed = [
                definition['indicator_id']
                for definition in self.indicator_definitions.values()
                if not db_manager.insert_indicator(definition)
            ]
        except Exception as e:
            self.logger.error(f"确保指标定义失败: {e}")
            return False
        if failed:
            self.logger.error(f"确保指标定义失败: {failed}")
            return False
        self.logger.info("财务指标定义已确保存在")
        return True

    def _result_to_frame(self, result) -> pd.DataFrame:
        """Convert a Wind ``wsd`` result into a tidy DataFrame.

        The result must expose ``Data`` (one list per field), ``Fields`` and
        ``Times``. The frame has one row per report date, a ``report_date``
        column, and one column per indicator named via ``_FIELD_COLUMNS``.
        """
        df = pd.DataFrame(result.Data, index=result.Fields, columns=result.Times).T
        # Wind typically reports field names upper-cased; normalise them so
        # the lower-case keys of _FIELD_COLUMNS actually match.
        df.columns = [str(name).lower() for name in df.columns]
        df.index.name = 'report_date'
        df.reset_index(inplace=True)
        df.rename(columns=self._FIELD_COLUMNS, inplace=True)
        return df

    def _build_records(self, wind_code: str, df: pd.DataFrame) -> List[Dict[str, Any]]:
        """Flatten the fetched frame into one insert record per valid value.

        Missing values (None/NaN) are skipped.
        """
        records = []
        for _, row in df.iterrows():
            report_date = row['report_date']
            for wind_field, column in self._FIELD_COLUMNS.items():
                value = row.get(column)
                if pd.isna(value):
                    continue
                records.append({
                    'stock_id': wind_code,
                    'indicator_id': wind_field,
                    'report_date': report_date.strftime('%Y-%m-%d'),
                    'data_value': float(value),
                    'fiscal_year': report_date.year,
                    'period_type': 'Y',
                    'currency': 'CNY',
                    'data_unit': '',
                    'source_system': 'Wind'
                })
        return records

    def fetch_wind_data(self, wind_code: str, end_date: str = None) -> Optional[pd.DataFrame]:
        """Download ten years of annual indicator values from Wind.

        Args:
            wind_code: Stock code, e.g. ``"000001.SZ"``.
            end_date: End date as ``"YYYY-MM-DD"``; today when omitted.

        Returns:
            The DataFrame built by ``_result_to_frame``, or None when WindPy
            is missing, Wind reports an error, or anything else fails.
        """
        try:
            # Imported lazily: WindPy is only installed on Wind terminals.
            from WindPy import w
            w.start()
            # Only fall back to today when the caller gave no end date.
            if not end_date:
                end_date = datetime.now().strftime('%Y-%m-%d')
            indicators = ",".join(self._FIELD_COLUMNS)
            self.logger.info(f"开始从Wind获取数据: {wind_code}, 指标: {indicators}")
            # "ED-10Y" means ten years before the end date.
            result = w.wsd(wind_code, indicators, "ED-10Y", end_date,
                           "unit=1;rptType=1;currencyType=;Period=Y;Days=Alldays;Currency=CNY")
            if result.ErrorCode != 0:
                self.logger.error(f"Wind API错误: {result.ErrorCode} - {result.Data[0] if result.Data else 'Unknown error'}")
                return None
            df = self._result_to_frame(result)
            self.logger.info(f"成功获取数据: {wind_code}, 共 {len(df)} 条记录")
            return df
        except ImportError:
            self.logger.error("未安装WindPy请先安装Wind官方Python API")
            return None
        except Exception as e:
            self.logger.error(f"获取Wind数据失败: {e}")
            return None

    def process_and_save_data(self, wind_code: str, end_date: str = None) -> bool:
        """Fetch one stock's data and save it to the database.

        Args:
            wind_code: Stock code.
            end_date: End date as ``"YYYY-MM-DD"``; today when omitted.

        Returns:
            True if data was fetched and every record was saved, else False.
        """
        try:
            df = self.fetch_wind_data(wind_code, end_date)
            if df is None or df.empty:
                self.logger.warning(f"未获取到数据: {wind_code}")
                return False
            with FinancialDataManager(**self.db_config) as db_manager:
                # Entering the manager does not raise on a failed connection,
                # so stop here rather than logging one error per record.
                if not db_manager.connection:
                    self.logger.error(f"保存数据到数据库失败: {wind_code}")
                    return False
                # Best effort: a failure is logged by the call itself and the
                # data rows are still attempted.
                self.ensure_indicators_defined(db_manager)
                data_to_insert = self._build_records(wind_code, df)
                if not data_to_insert:
                    self.logger.warning(f"没有有效数据需要保存: {wind_code}")
                    return False
                success = db_manager.batch_insert_financial_data(data_to_insert)
                if success:
                    self.logger.info(f"成功保存 {len(data_to_insert)} 条数据到数据库: {wind_code}")
                else:
                    self.logger.error(f"保存数据到数据库失败: {wind_code}")
                return success
        except Exception as e:
            self.logger.error(f"处理保存数据失败: {e}")
            return False

    def batch_fetch_stocks(self, stock_list: List[str], end_date: str = None) -> Dict[str, bool]:
        """Fetch and save several stocks in sequence.

        Args:
            stock_list: Stock codes to process.
            end_date: End date passed through to each fetch.

        Returns:
            Mapping of stock code to whether that stock succeeded.
        """
        import time

        results = {}
        for position, stock_code in enumerate(stock_list):
            # Pause between requests (not after the last one) so the Wind
            # API is not hit too frequently.
            if position:
                time.sleep(1)
            self.logger.info(f"开始处理股票: {stock_code}")
            results[stock_code] = self.process_and_save_data(stock_code, end_date)
        success_count = sum(results.values())
        self.logger.info(f"批量处理完成: 成功 {success_count}/{len(stock_list)}")
        return results

View File

@ -0,0 +1,106 @@
-- MySQL dump 10.13 Distrib 8.0.33, for Win64 (x86_64)
--
-- Host: 127.0.0.1 Database: fintech
-- ------------------------------------------------------
-- Server version 8.0.42
-- Session setup for loading this dump. Each /*!NNNNN ... */ block is a
-- version-conditional comment: MySQL servers at or above the encoded version
-- execute its contents. The current session values are saved in @OLD_* user
-- variables so the statements at the end of the dump can restore them.
/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
-- Read the rest of the dump as utf8mb4.
/*!50503 SET NAMES utf8mb4 */;
-- Use UTC for the session while the dump is loaded.
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
-- Turn off unique and foreign key checks during the load.
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
-- Suppress note-level diagnostics while loading.
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
--
-- Table structure for table `financial_data`
--
-- One row per (stock, indicator, report date, period type) value.
-- WARNING: the DROP below destroys any existing data in this table.
-- No FOREIGN KEY constraints are declared, so `stock_id` and `indicator_id`
-- are not enforced against `stocks` / `financial_indicators`.
DROP TABLE IF EXISTS `financial_data`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!50503 SET character_set_client = utf8mb4 */;
CREATE TABLE `financial_data` (
-- Surrogate primary key.
`data_id` bigint NOT NULL AUTO_INCREMENT,
`stock_id` varchar(20) NOT NULL,
`indicator_id` varchar(50) NOT NULL,
`report_date` date NOT NULL,
`fiscal_year` year DEFAULT NULL,
-- NOTE(review): nullable column inside the unique key below; rows with a
-- NULL period_type would not collide with each other -- consider NOT NULL.
`period_type` varchar(10) DEFAULT 'Y',
`currency` varchar(10) DEFAULT 'CNY',
`data_value` decimal(20,4) DEFAULT NULL,
`data_unit` varchar(20) DEFAULT NULL,
`data_status` varchar(20) DEFAULT 'valid',
`source_system` varchar(20) DEFAULT 'Wind',
-- Refreshed automatically on every row update.
`last_updated` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`data_id`),
-- Natural key; this is the key an INSERT ... ON DUPLICATE KEY UPDATE against
-- this table resolves duplicates on.
UNIQUE KEY `uk_stock_indicator_date` (`stock_id`,`indicator_id`,`report_date`,`period_type`),
-- NOTE(review): `idx_stock_id` is a leftmost prefix of both the unique key
-- and `idx_stock_date`, so it looks redundant -- confirm before dropping.
KEY `idx_stock_id` (`stock_id`),
KEY `idx_indicator_id` (`indicator_id`),
KEY `idx_report_date` (`report_date`),
KEY `idx_fiscal_year` (`fiscal_year`),
KEY `idx_stock_date` (`stock_id`,`report_date`),
KEY `idx_indicator_date` (`indicator_id`,`report_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Table structure for table `financial_indicators`
--
-- Catalogue of indicator definitions, keyed by `indicator_id`.
-- WARNING: the DROP below destroys any existing data in this table.
DROP TABLE IF EXISTS `financial_indicators`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!50503 SET character_set_client = utf8mb4 */;
CREATE TABLE `financial_indicators` (
`indicator_id` varchar(50) NOT NULL,
`indicator_name` varchar(100) NOT NULL,
`indicator_desc` text,
`category` varchar(20) DEFAULT NULL,
`unit` varchar(20) DEFAULT NULL,
`data_source` varchar(20) DEFAULT 'Wind',
-- Defaults to 1; presumably marks the indicator as in use -- TODO confirm.
`is_active` tinyint(1) DEFAULT '1',
-- There is no `updated_at` column here, unlike the `stocks` table.
`created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`indicator_id`),
KEY `idx_category` (`category`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Table structure for table `stocks`
--
-- Basic information for each stock, keyed by `stock_id`.
-- WARNING: the DROP below destroys any existing data in this table.
DROP TABLE IF EXISTS `stocks`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!50503 SET character_set_client = utf8mb4 */;
CREATE TABLE `stocks` (
`stock_id` varchar(20) NOT NULL,
`stock_name` varchar(100) NOT NULL,
`exchange` varchar(10) DEFAULT NULL,
`industry` varchar(50) DEFAULT NULL,
`market_cap` decimal(18,2) DEFAULT NULL,
`listing_date` date DEFAULT NULL,
`created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
-- Refreshed automatically on every row update.
`updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`stock_id`),
KEY `idx_exchange` (`exchange`),
KEY `idx_industry` (`industry`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Dumping routines for database 'fintech'
--
-- No routine definitions follow this heading in the dump.
-- Restore the session settings saved in the @OLD_* variables at the top of
-- the dump.
/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
-- Dump completed on 2025-11-15 21:46:16