"""Oversold screening on daily prices using percentage change, RSI and MACD."""

import numpy as np
import pandas as pd

try:
    import yfinance as yf
except ImportError:
    # Only fetch_stock_data needs yfinance; the indicator helpers below work
    # on any DataFrame with a 'Close' column, so keep the module importable.
    yf = None

stock_code = 'MSFT'


def fetch_stock_data(stock_code, start_date='2024-01-01', end_date='2025-01-01'):
    """Download daily prices and add 5/20/60-day percentage-change columns.

    The resulting frame is also written to ``<stock_code>.csv`` in the
    current working directory.

    Args:
        stock_code: Ticker symbol passed to ``yfinance.download``.
        start_date: First date to request (``YYYY-MM-DD``).
        end_date: End date of the request (``YYYY-MM-DD``).

    Returns:
        The downloaded DataFrame with the extra columns ``5d_change``,
        ``20d_change`` and ``60d_change`` (in percent).

    Raises:
        ImportError: If yfinance is not installed.
    """
    if yf is None:
        raise ImportError("yfinance is required to download price data")

    df = yf.download(stock_code, start=start_date, end=end_date)

    # Recent yfinance releases return (field, ticker) MultiIndex columns even
    # for a single ticker. Flatten them so df['Close'] is a Series and the CSV
    # has one header row. Skipped when several tickers share a field name.
    if isinstance(df.columns, pd.MultiIndex):
        fields = df.columns.get_level_values(0)
        if fields.is_unique:
            df.columns = fields.rename(None)

    close = df['Close']
    for days in (5, 20, 60):
        df[f'{days}d_change'] = close.pct_change(days) * 100

    # Persist the data as a CSV file.
    df.to_csv(f'{stock_code}.csv')
    return df


def calculate_rsi(data, window=14):
    """Add an ``RSI`` column computed from simple rolling means (NumPy variant).

    The first price difference is NaN and ``np.where`` treats it as "no
    change", so the first RSI value appears one row earlier than in
    ``calculate_rsi2``; from row ``window`` onwards both agree.

    Args:
        data: DataFrame with a ``Close`` column; modified in place.
        window: Number of rows in the rolling average.

    Returns:
        ``data`` with the ``RSI`` column added.
    """
    delta = data['Close'].diff()
    gain = np.where(delta > 0, delta, 0).ravel()
    loss = np.where(delta < 0, -delta, 0).ravel()

    # Build the series on the frame's own index: with a default RangeIndex the
    # assignment below would align on labels and fill RSI with NaN.
    avg_gain = pd.Series(gain, index=data.index).rolling(window=window).mean()
    avg_loss = pd.Series(loss, index=data.index).rolling(window=window).mean()

    rs = avg_gain / avg_loss
    data['RSI'] = 100 - (100 / (1 + rs))
    return data


def calculate_rsi2(data, window=14):
    """Add an ``RSI`` column computed from simple rolling means (pandas variant).

    The first ``window`` rows are NaN because the rolling mean needs
    ``window`` valid price differences. A window without losses yields
    RSI 100 (division by zero gives ``inf`` in pandas).

    Args:
        data: DataFrame with a ``Close`` column; modified in place.
        window: Number of rows in the rolling average.

    Returns:
        ``data`` with the ``RSI`` column added.
    """
    delta = data['Close'].diff()
    gain = delta.clip(lower=0)
    loss = -delta.clip(upper=0)

    avg_gain = gain.rolling(window=window).mean()
    avg_loss = loss.rolling(window=window).mean()

    rs = avg_gain / avg_loss
    data['RSI'] = 100 - (100 / (1 + rs))
    return data


def calculate_macd(data, short_window=12, long_window=26, signal_window=9):
    """Add MACD columns based on exponential moving averages of ``Close``.

    Args:
        data: DataFrame with a ``Close`` column; modified in place.
        short_window: Span of the fast EMA (stored as ``EMA12``).
        long_window: Span of the slow EMA (stored as ``EMA26``).
        signal_window: Span of the EMA applied to the MACD line.

    Returns:
        ``data`` with ``EMA12``, ``EMA26``, ``MACD`` and ``Signal_Line`` added.
        The EMA column names are fixed regardless of the spans passed in.
    """
    close = data['Close']
    data['EMA12'] = close.ewm(span=short_window, adjust=False).mean()
    data['EMA26'] = close.ewm(span=long_window, adjust=False).mean()
    data['MACD'] = data['EMA12'] - data['EMA26']
    data['Signal_Line'] = data['MACD'].ewm(span=signal_window, adjust=False).mean()
    return data


def strategy(data):
    """Add boolean ``Buy_Signal`` and ``Sell_Signal`` columns.

    Buy: 5-day change below -10 (percent) and RSI below 30.
    Sell: one-day return below -5% or above +10%.

    Stricter variants (additionally requiring ``20d_change < -20`` and/or
    ``MACD > Signal_Line``) were tried previously but are not applied.

    Args:
        data: DataFrame with ``Close``, ``5d_change`` and ``RSI`` columns;
            modified in place.

    Returns:
        ``data`` with the two signal columns added.
    """
    daily_return = data['Close'].pct_change()

    data['Buy_Signal'] = (data['5d_change'] < -10) & (data['RSI'] < 30)
    data['Sell_Signal'] = (daily_return < -0.05) | (daily_return > 0.1)
    return data


def run_strategy(stock_code):
    """Download data for ``stock_code``, compute indicators and print signals.

    Prints the rows flagged as buy signals, then the rows flagged as sell
    signals.
    """
    df = fetch_stock_data(stock_code)
    df = calculate_rsi2(df)
    df = calculate_macd(df)
    df = strategy(df)

    report_columns = ['Close', '5d_change', 'RSI', 'MACD', 'Signal_Line',
                      'Buy_Signal', 'Sell_Signal']
    print(df[df['Buy_Signal']][report_columns])
    print(df[df['Sell_Signal']][report_columns])


# Example: run the strategy when executed as a script.
if __name__ == "__main__":
    run_strategy('KC')