diff --git a/finance/download_data.py b/finance/download_data.py index ec18db9..08d8505 100644 --- a/finance/download_data.py +++ b/finance/download_data.py @@ -1,8 +1,11 @@ import logging from wind_data_fetcher import WindDataFetcher -# 设置日志级别 -logging.basicConfig(level=logging.INFO) +# 设置更详细的日志级别 +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s' +) def main(): # 数据库配置 diff --git a/finance/wind_data_fetcher.py b/finance/wind_data_fetcher.py index 6ae031a..a3d575d 100644 --- a/finance/wind_data_fetcher.py +++ b/finance/wind_data_fetcher.py @@ -51,12 +51,12 @@ class WindDataFetcher: def _setup_logger(self) -> logging.Logger: """设置日志记录器""" logger = logging.getLogger('WindDataFetcher') - logger.setLevel(logging.INFO) + logger.setLevel(logging.DEBUG) # 改为DEBUG级别以获取更多信息 if not logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter( - '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + '%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s] - %(message)s' ) handler.setFormatter(formatter) logger.addHandler(handler) @@ -71,8 +71,12 @@ class WindDataFetcher: db_manager: 数据库管理器实例 """ try: + self.logger.debug("开始确保指标定义存在") for indicator_id, definition in self.indicator_definitions.items(): - db_manager.insert_indicator(definition) + self.logger.debug(f"处理指标定义: {indicator_id}") + success = db_manager.insert_indicator(definition) + if not success: + self.logger.warning(f"插入指标定义失败: {indicator_id}") self.logger.info("财务指标定义已确保存在") return True except Exception as e: @@ -93,27 +97,33 @@ class WindDataFetcher: try: # 导入WindPy,如果未安装会抛出异常 from WindPy import w - # 启动Wind + + # 启动Wind + self.logger.debug("启动Wind API") w.start() - end_date = datetime.now().strftime('%Y-%m-%d') + if end_date is None: + end_date = datetime.now().strftime('%Y-%m-%d') - # 定义要获取的指标 indicators = "wgsd_capex_ff,wgsd_assets_bus_cf,wgsd_net_profit_is" - self.logger.info(f"开始从Wind获取数据: {wind_code}, 指标: {indicators}") + self.logger.info(f"开始从Wind获取数据: {wind_code}, 指标: {indicators}, 结束日期: {end_date}") # 调用Wind API # ED-10Y 表示结束日期前10年 + self.logger.debug("调用Wind API...") result = w.wsd(wind_code, indicators, "ED-10Y", end_date, "unit=1;rptType=1;currencyType=;Period=Y;Days=Alldays;Currency=CNY") + self.logger.debug(f"Wind API返回 - ErrorCode: {result.ErrorCode}, 数据形状: {len(result.Data)}x{len(result.Times) if result.Data else 0}") + if result.ErrorCode != 0: self.logger.error(f"Wind API错误: {result.ErrorCode} - {result.Data[0] if result.Data else 'Unknown error'}") return None # 转换为DataFrame + self.logger.debug("转换数据为DataFrame") df = pd.DataFrame(result.Data, index=result.Fields, columns=result.Times).T df.index.name = 'report_date' df.reset_index(inplace=True) @@ -126,6 +136,12 @@ class WindDataFetcher: } df.rename(columns=column_mapping, inplace=True) + # 添加调试信息 + self.logger.debug(f"DataFrame列名: {df.columns.tolist()}") + self.logger.debug(f"DataFrame形状: {df.shape}") + self.logger.debug(f"DataFrame前3行:\n{df.head(3)}") + self.logger.debug(f"数据统计:\n{df.describe()}") + self.logger.info(f"成功获取数据: {wind_code}, 共 {len(df)} 条记录") return df @@ -148,47 +164,79 @@ class WindDataFetcher: 成功返回True,失败返回False """ try: + self.logger.info(f"开始处理股票数据: {wind_code}") + # 获取数据 df = self.fetch_wind_data(wind_code, end_date) - if df is None or df.empty: - self.logger.warning(f"未获取到数据: {wind_code}") + if df is None: + self.logger.error(f"获取数据失败: {wind_code}") return False + + if df.empty: + self.logger.warning(f"获取到空数据: {wind_code}") + return False + + self.logger.debug(f"获取到的数据行数: {len(df)}") + self.logger.debug(f"DataFrame实际列名: {df.columns.tolist()}") # 使用数据库管理器 with FinancialDataManager(**self.db_config) as db_manager: # 确保指标定义存在 + self.logger.debug("确保指标定义存在") self.ensure_indicators_defined(db_manager) # 准备要插入的数据 data_to_insert = [] + valid_row_count = 0 + invalid_row_count = 0 - for _, row in df.iterrows(): + self.logger.debug("开始处理每一行数据") + for idx, row in df.iterrows(): + self.logger.debug(f"处理第 {idx} 行数据") report_date = row['report_date'] + self.logger.debug(f"报告日期: {report_date}") - # 处理每个指标的数据 - for wind_field, db_field in { - 'wgsd_capex_ff': 'capital_expenditure', - 'wgsd_assets_bus_cf': 'asset_disposal_cash_flow', - 'wgsd_net_profit_is': 'net_profit' - }.items(): + # 处理每个指标的数据 - 直接使用Wind返回的原始列名 + row_has_valid_data = False + for wind_field in ['WGSD_CAPEX_FF', 'WGSD_ASSETS_BUS_CF', 'WGSD_NET_PROFIT_IS']: + + value = row.get(wind_field) + self.logger.debug(f"指标 {wind_field} 的值: {value}, 类型: {type(value)}") - value = row.get(db_field) if pd.notna(value) and value is not None: - data_record = { - 'stock_id': wind_code, - 'indicator_id': wind_field, - 'report_date': report_date.strftime('%Y-%m-%d'), - 'data_value': float(value), - 'fiscal_year': report_date.year, - 'period_type': 'Y', - 'currency': 'CNY', - 'data_unit': '元', - 'source_system': 'Wind' - } - data_to_insert.append(data_record) + try: + # 将Wind字段名转换为小写作为indicator_id + indicator_id = wind_field.lower() + + data_record = { + 'stock_id': wind_code, + 'indicator_id': indicator_id, + 'report_date': report_date.strftime('%Y-%m-%d') if hasattr(report_date, 'strftime') else str(report_date), + 'data_value': float(value), + 'fiscal_year': report_date.year if hasattr(report_date, 'year') else pd.to_datetime(report_date).year, + 'period_type': 'Y', + 'currency': 'CNY', + 'data_unit': '元', + 'source_system': 'Wind' + } + data_to_insert.append(data_record) + row_has_valid_data = True + self.logger.debug(f"添加数据记录: {wind_field} = {value}") + except Exception as e: + self.logger.warning(f"处理数据值时出错: {value}, 错误: {e}") + else: + self.logger.debug(f"跳过空值: {wind_field}") + + if row_has_valid_data: + valid_row_count += 1 + else: + invalid_row_count += 1 + + self.logger.info(f"数据处理完成 - 有效行: {valid_row_count}, 无效行: {invalid_row_count}, 总记录数: {len(data_to_insert)}") # 批量插入数据 if data_to_insert: + self.logger.info(f"准备插入 {len(data_to_insert)} 条数据到数据库") success = db_manager.batch_insert_financial_data(data_to_insert) if success: self.logger.info(f"成功保存 {len(data_to_insert)} 条数据到数据库: {wind_code}") @@ -197,11 +245,18 @@ class WindDataFetcher: return success else: self.logger.warning(f"没有有效数据需要保存: {wind_code}") + # 检查为什么所有数据都被过滤掉了 + self.logger.debug("检查数据过滤原因:") + for idx, row in df.iterrows(): + self.logger.debug(f"第{idx}行数据:") + for col in df.columns: + self.logger.debug(f" {col}: {row[col]} (类型: {type(row[col])}, 是否空值: {pd.isna(row[col])})") return False - + except Exception as e: - self.logger.error(f"处理保存数据失败: {e}") + self.logger.error(f"处理保存数据失败: {e}", exc_info=True) return False + def batch_fetch_stocks(self, stock_list: List[str], end_date: str = None) -> Dict[str, bool]: """