SKRIPSI/web-model/model.py

# Enhanced PatchTST Stock Classifier with 3-class Labels (SELL, HOLD, BUY) + Class Weight & K-Fold Cross-Validation
import yfinance as yf
import pandas as pd
import numpy as np
import talib
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.preprocessing import StandardScaler
from sklearn.utils.class_weight import compute_class_weight
from sklearn.model_selection import StratifiedKFold
from datasets import Dataset
from transformers import (
    PatchTSTConfig, PatchTSTForClassification,
    Trainer, TrainingArguments, EarlyStoppingCallback
)
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings("ignore")
# ===== CONFIG =====
CONTEXT_LENGTH = 48
HORIZON = 6
LABEL_COLUMN = "action"
LABEL_NAMES = ['SELL', 'HOLD', 'BUY']
N_SPLITS = 5 # K-Fold Cross Validation
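# Each training window is CONTEXT_LENGTH consecutive candles; HORIZON candles ahead
# define the lookahead used both for labeling (create_labels) and for the RL reward
# (StockTradingEnv.step).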
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from gymnasium import Env, spaces
class StockTradingEnv(Env):
    def __init__(self, features, labels, prices, buy_th=0.02, sell_th=0.02):
        super().__init__()
        self.features = features
        self.labels = labels
        self.prices = prices
        self.buy_th = buy_th
        self.sell_th = sell_th
        self.current_step = 0
        self.action_space = spaces.Discrete(3)  # 0=SELL, 1=HOLD, 2=BUY
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(features.shape[1],), dtype=np.float32)

    def reset(self, *, seed=None, options=None):
        super().reset(seed=seed)
        self.current_step = 0
        return self.features[self.current_step], {}

    def step(self, action):
        price_now = self.prices[self.current_step]
        future_prices = self.prices[self.current_step + 1:self.current_step + HORIZON + 1]
        done = self.current_step >= len(self.features) - HORIZON - 1
        reward = 0
        if len(future_prices) > 0:
            future_max = np.max(future_prices)
            future_min = np.min(future_prices)
            max_gain = (future_max - price_now) / price_now
            max_loss = (price_now - future_min) / price_now
            if action == 2 and max_gain >= self.buy_th:
                reward = +1
            elif action == 0 and max_loss >= self.sell_th:
                reward = +1
            elif action == 1 and (max_gain < self.buy_th and max_loss < self.sell_th):
                reward = +0.5  # correct HOLD
            else:
                reward = -1
        self.current_step += 1
        next_obs = self.features[self.current_step] if not done else np.zeros_like(self.features[0])
        return next_obs, reward, done, False, {}

def train_ppo_on_patch_features(X_patch, y, price_series):
    env = DummyVecEnv([lambda: StockTradingEnv(X_patch, y, price_series)])
    model = PPO("MlpPolicy", env, verbose=1)
    model.learn(total_timesteps=20000)
    return model

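# Focal loss (used by CustomTrainer below): loss = alpha_c * (1 - p_t) ** gamma * CE,
# which down-weights well-classified samples so the rarer SELL/BUY classes receive
# relatively more gradient than with plain cross-entropy.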
class FocalLoss(nn.Module):
    def __init__(self, alpha=None, gamma=2.0, reduction='mean'):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, logits, targets):
        ce_loss = F.cross_entropy(logits, targets, reduction='none', weight=self.alpha)
        pt = torch.exp(-ce_loss)
        loss = (1 - pt) ** self.gamma * ce_loss
        return loss.mean() if self.reduction == 'mean' else loss.sum()

class CustomTrainer(Trainer):
    def __init__(self, *args, focal_alpha=None, focal_gamma=2.0, **kwargs):
        super().__init__(*args, **kwargs)
        self.focal_loss_fn = FocalLoss(alpha=focal_alpha, gamma=focal_gamma)

    def compute_loss(self, model, inputs, return_outputs=False):
        labels = inputs.pop("target_values").long()
        outputs = model(**inputs)
        logits = outputs.prediction_logits
        if self.focal_loss_fn.alpha is not None:
            self.focal_loss_fn.alpha = self.focal_loss_fn.alpha.to(logits.device)
        loss = self.focal_loss_fn(logits, labels)
        return (loss, outputs) if return_outputs else loss

def plot_confusion_and_report(y_true, y_pred, labels):
    cm = confusion_matrix(y_true, y_pred)
    report = classification_report(y_true, y_pred, target_names=labels, output_dict=True)

    plt.figure(figsize=(6, 5))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=labels, yticklabels=labels)
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.tight_layout()
    plt.show()

    f1_scores = [report[label]['f1-score'] for label in labels]
    plt.figure(figsize=(6, 4))
    sns.barplot(x=labels, y=f1_scores)
    plt.title("F1-Score per Class")
    plt.ylabel("F1 Score")
    plt.ylim(0, 1)
    plt.tight_layout()
    plt.show()

def download_stock_data(path="bbca_1h.csv", use_yfinance=False):
    if use_yfinance:
        df = yf.download("BBCA.JK", interval="1d", start="2024-01-01", auto_adjust=True)
        # If yfinance returns MultiIndex columns, keep only the top level
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = df.columns.get_level_values(0)
        df = df.reset_index()
    else:
        # Local CSV loading (currently disabled):
        # df = pd.read_csv(path, parse_dates=["datetime"])
        # df = df.rename(columns={
        #     "datetime": "Date", "open": "Open", "high": "High",
        #     "low": "Low", "close": "Close", "volume": "Volume"
        # })
        # df = df[df["symbol"] == "IDX:BBCA"].drop(columns=["symbol"])
        raise ValueError("CSV loading is disabled; call download_stock_data(use_yfinance=True).")
    return df.sort_values("Date").reset_index(drop=True)

def extract_features(df):
    df['return'] = df['Close'].pct_change()
    df['daily_momentum'] = df['Close'] / df['Open']
    df['range_efficiency'] = abs(df['Close'] - df['Open']) / (df['High'] - df['Low']).replace(0, np.nan)
    df['volume_momentum'] = df['Volume'] / df['Volume'].rolling(5).mean().replace(0, np.nan)
    df['adx'] = talib.ADX(df['High'], df['Low'], df['Close'], timeperiod=14) / 100.0
    df['rsi_scaled'] = talib.RSI(df['Close'], timeperiod=14) / 100.0
    return df.dropna().reset_index(drop=True)

def create_labels(df, buy_th=0.02, sell_th=0.02, horizon=HORIZON, label_column='action'):
    labels = []
    for i in range(len(df)):
        if i > len(df) - horizon - 1:
            labels.append(np.nan)
            continue
        base = df['Close'].iloc[i]
        future = df['Close'].iloc[i + 1:i + 1 + horizon]
        max_gain = (np.max(future) - base) / base
        max_loss = (base - np.min(future)) / base
        if max_loss >= sell_th:
            labels.append(0)  # SELL
        elif max_gain >= buy_th:
            labels.append(2)  # BUY
        else:
            labels.append(1)  # HOLD
    df[label_column] = labels
    return df

def create_sliding_windows(df, features):
    X, y = [], []
    for i in range(CONTEXT_LENGTH, len(df)):
        label = df[LABEL_COLUMN].iloc[i]
        if np.isnan(label):
            continue
        X.append(df[features].iloc[i - CONTEXT_LENGTH:i].values)
        y.append(label)
    return np.array(X), np.array(y)

def compute_metrics(eval_pred):
    preds, labels = eval_pred
    preds = np.argmax(preds, axis=1)
    report = classification_report(labels, preds, target_names=LABEL_NAMES, output_dict=True)
    return {
        'accuracy': accuracy_score(labels, preds),
        'f1_macro': report['macro avg']['f1-score'],
        'precision_macro': report['macro avg']['precision'],
        'recall_macro': report['macro avg']['recall']
    }

def train_model_with_cv(df, feature_columns):
    scaler = StandardScaler()
    df[feature_columns] = scaler.fit_transform(df[feature_columns])
    X, y = create_sliding_windows(df, feature_columns)
    skf = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=42)

    for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
        print(f"\n==== Fold {fold+1}/{N_SPLITS} ====")
        train_dataset = Dataset.from_dict({"past_values": X[train_idx], "target_values": torch.tensor(y[train_idx], dtype=torch.long)})
        val_dataset = Dataset.from_dict({"past_values": X[val_idx], "target_values": torch.tensor(y[val_idx], dtype=torch.long)})

        # Note: PatchTSTConfig expects patch_stride, d_model, num_hidden_layers and
        # num_attention_heads; the earlier stride/embedding_dim/num_layers/num_heads
        # kwargs are not config fields, so the intended sizes would silently fall back
        # to the defaults.
        model = PatchTSTForClassification(PatchTSTConfig(
            num_input_channels=len(feature_columns),
            num_targets=3,
            context_length=CONTEXT_LENGTH,
            patch_length=HORIZON,
            patch_stride=HORIZON,
            d_model=512,
            num_hidden_layers=4,
            num_attention_heads=16,
            use_cls_token=True
        ))
        args = TrainingArguments(
            output_dir=f"./checkpoints/fold{fold}",
            evaluation_strategy="epoch",
            save_strategy="epoch",
            learning_rate=5e-5,
            per_device_train_batch_size=32,
            per_device_eval_batch_size=32,
            num_train_epochs=50,
            load_best_model_at_end=True,
            metric_for_best_model="accuracy",
            report_to="none",
            label_names=["target_values"],
            weight_decay=0.01,
            warmup_ratio=0.1,
            lr_scheduler_type="cosine"
        )

        class_weights = compute_class_weight('balanced', classes=np.unique(y[train_idx]), y=y[train_idx])
        trainer = CustomTrainer(
            model=model,
            args=args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            compute_metrics=compute_metrics,
            focal_alpha=torch.tensor(class_weights, dtype=torch.float32),
            focal_gamma=1.0,
            callbacks=[EarlyStoppingCallback(early_stopping_patience=5)]
        )
        trainer.train()
        eval_result = trainer.evaluate()

        predictions = trainer.predict(val_dataset)
        y_pred = predictions.predictions.argmax(axis=1)
        y_true = predictions.label_ids
        # plot_confusion_and_report(y_true, y_pred, LABEL_NAMES)

        trainer.save_model(f"./checkpoints/fold{fold}/best_model")
        print("Eval result:", eval_result)

def extract_patch_features(model, X):
    model.eval()
    features = []
    with torch.no_grad():
        for i in range(0, len(X), 64):  # batch processing
            batch = torch.tensor(X[i:i+64], dtype=torch.float32)
            outputs = model.base_model(
                past_values=batch,
                output_hidden_states=True,
                return_dict=True
            )
            # Take the CLS slice of the last hidden state; callers index [:, 0, :]
            # again to reduce this to one feature vector per window.
            cls_tokens = outputs.hidden_states[-1][:, 0, :]
            features.append(cls_tokens)
    return torch.cat(features, dim=0).numpy()

def evaluate_ppo_model(ppo_model, features, labels, prices, starting_balance=10000, verbose=True):
    env = StockTradingEnv(features, labels, prices)
    obs, _ = env.reset()
    done = False
    balance = starting_balance
    position = None  # {'type': 'BUY'/'SELL', 'price': float}
    rewards = []
    actions = []
    price_history = []

    while not done:
        action, _ = ppo_model.predict(obs, deterministic=True)
        actions.append(action)
        current_price = prices[env.current_step]

        # Execute trading logic (simplified)
        if action == 2:  # BUY
            if position is None:
                position = {'type': 'BUY', 'price': current_price}
        elif action == 0:  # SELL
            if position and position['type'] == 'BUY':
                # Sell and take profit/loss
                profit = (current_price - position['price']) / position['price']
                balance *= (1 + profit)
                position = None
        # HOLD does nothing

        obs, reward, done, _, _ = env.step(action)
        rewards.append(reward)
        price_history.append(balance)

    final_reward = sum(rewards)
    roi = (balance - starting_balance) / starting_balance

    if verbose:
        print(f"Initial Capital: ${starting_balance:.2f}")
        print(f"Final Capital: ${balance:.2f}")
        print(f"Total ROI: {roi*100:.2f}%")
        print(f"Total Reward: {final_reward}")
        print(f"Total Actions: {len(actions)}")

        plt.figure(figsize=(10, 4))
        plt.plot(price_history, label="Capital over Time")
        plt.title("PPO Trading Simulation")
        plt.ylabel("Capital ($)")
        plt.xlabel("Steps")
        plt.grid(True)
        plt.legend()
        plt.tight_layout()
        plt.show()

    return {
        "final_balance": balance,
        "roi": roi,
        "total_reward": final_reward,
        "actions": actions,
        "price_over_time": price_history,
    }

def main():
    df = download_stock_data(use_yfinance=True)  # use_yfinance=False needs the CSV branch in download_stock_data
    if df.empty:
        return print("No data!")
    df = extract_features(df)
    df = create_labels(df)
    print(df[LABEL_COLUMN].value_counts(normalize=True))

    feature_cols = [col for col in df.columns if col not in ["Date", LABEL_COLUMN, "Open", "High", "Low", "Close", "Volume"]]
    train_model_with_cv(df, feature_cols)

    scaler = StandardScaler()
    df[feature_cols] = scaler.fit_transform(df[feature_cols])
    X, y = create_sliding_windows(df, feature_cols)

    patch_model = PatchTSTForClassification.from_pretrained("./checkpoints/fold3/best_model")
    patch_model.eval()
    patch_features = extract_patch_features(patch_model, X)
    X_patch_final = patch_features[:, 0, :]  # take the CLS token -> shape: (num_windows, d_model)
    y_aligned = y[:len(X_patch_final)]
    close_aligned = df["Close"].values[-len(X_patch_final):]

    print("patch_features:", patch_features.shape)
    print("X_patch_final:", X_patch_final.shape)
    print("y:", y_aligned.shape)
    print("close:", close_aligned.shape)

    ppo_model = train_ppo_on_patch_features(X_patch_final, y_aligned, close_aligned)
    ppo_model.save("ppo_stock_trading_model")
    ppo_model = PPO.load("ppo_stock_trading_model")
    results = evaluate_ppo_model(
        ppo_model,
        X_patch_final,
        y_aligned,
        close_aligned,
        starting_balance=2000
    )

def predict(endDate: str, symbol="BBCA.JK", use_yfinance=True):
    from datetime import datetime, timedelta

    # 1. Load models
    patch_model = PatchTSTForClassification.from_pretrained("./checkpoints/fold3/best_model")
    ppo_model = PPO.load("ppo_stock_trading_model")

    # 2. Set the end date and fetch enough history before it
    end_date = pd.to_datetime(endDate)
    start_date = end_date - timedelta(days=CONTEXT_LENGTH * 5)
    df = yf.download(symbol, start=start_date.strftime('%Y-%m-%d'),
                     end=(end_date + timedelta(days=1)).strftime('%Y-%m-%d'),
                     interval="1d", auto_adjust=True)
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)
    df = df.reset_index()
    df = df.sort_values("Date").reset_index(drop=True)
    if df.empty or len(df) < CONTEXT_LENGTH:
        return {"error": "Not enough data for prediction."}
    df = extract_features(df)

    # Check whether endDate is present in df
    if end_date not in df["Date"].values:
        return {"error": f"endDate {endDate} not found in data."}

    # Find the index of endDate in df
    target_idx = df[df["Date"] == end_date].index[0]
    if target_idx < CONTEXT_LENGTH:
        return {"error": "Not enough candles before endDate."}

    # 3. Take the window of data before endDate
    window_df = df.iloc[target_idx - CONTEXT_LENGTH:target_idx]
    feature_cols = [col for col in df.columns if col not in ["Date", "Open", "High", "Low", "Close", "Volume", LABEL_COLUMN]]

    # 4. Normalize and convert to model input shape
    scaler = StandardScaler()
    X_window = scaler.fit_transform(window_df[feature_cols])
    X_input = np.expand_dims(X_window, axis=0)

    # 5. Extract PatchTST features (CLS token)
    patch_feature = extract_patch_features(patch_model, X_input)[:, 0, :]  # shape: (1, hidden_dim)

    # 6. PPO prediction
    action, _ = ppo_model.predict(patch_feature[0], deterministic=True)
    action_label = LABEL_NAMES[action]
    close_price = float(df.loc[target_idx, "Close"])

    return {
        "symbol": symbol,
        "clicked_date": end_date.strftime("%Y-%m-%d"),
        "action_code": int(action),
        "action_label": action_label,
        "close_price": close_price
    }

# if __name__ == "__main__":
#     result = predict("2024-07-01", symbol="BBCA.JK")
#     print(result)
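# Note (added sketch, not part of the original script): predict() loads
# ./checkpoints/fold3/best_model and ppo_stock_trading_model from disk, so the full
# training pipeline (main()) must have been run at least once before serving
# predictions, e.g.:
# if __name__ == "__main__":
#     main()
#     print(predict("2024-07-01", symbol="BBCA.JK"))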