Spaces:
Running
on
Zero
Running
on
Zero
| """Feature Extraction MCP Server for Portfolio Intelligence Platform. | |
| Provides technical indicator extraction, feature normalisation, and selection | |
| for ML model inputs. Uses manual implementations for technical indicators | |
| (no pandas-ta dependency for HF Spaces compatibility) and scikit-learn for | |
| feature selection. | |
| IMPORTANT: All indicators use shifted prices to prevent look-ahead bias. | |
| """ | |
| import logging | |
| from typing import Dict, List, Any, Optional | |
| import numpy as np | |
| import pandas as pd | |
| from pydantic import BaseModel, Field | |
| from sklearn.decomposition import PCA | |
| from sklearn.preprocessing import StandardScaler | |
| from fastmcp import FastMCP | |
| # Manual technical indicator implementations (pandas-ta free) | |
def _calculate_rsi(prices: pd.Series, period: int = 14) -> Optional[float]:
    """Return the most recent Relative Strength Index value.

    Average gain and average loss are plain rolling means over ``period``
    price changes.

    Args:
        prices: Price series, oldest first. May contain NaN.
        period: Number of price changes in each average.

    Returns:
        The latest RSI as a float, or None when fewer than ``period + 1``
        non-NaN prices are supplied or the latest RSI value is NaN.
    """
    enough_history = len(prices.dropna()) >= period + 1
    if not enough_history:
        return None

    change = prices.diff()
    # Changes that are not strictly positive (including NaN) count as zero
    # gain; changes that are not strictly negative count as zero loss.
    ups = change.where(change > 0, 0)
    downs = -change.where(change < 0, 0)
    avg_up = ups.rolling(window=period).mean()
    avg_down = downs.rolling(window=period).mean()

    relative_strength = avg_up / avg_down
    latest = (100 - (100 / (1 + relative_strength))).iloc[-1]
    if pd.isna(latest):
        return None
    return float(latest)
def _calculate_macd(
    prices: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9
) -> Dict[str, Optional[float]]:
    """Return the latest MACD line, signal line and histogram.

    Args:
        prices: Price series, oldest first. May contain NaN.
        fast: Span of the fast exponential moving average.
        slow: Span of the slow exponential moving average.
        signal: Span of the EMA applied to the MACD line.

    Returns:
        Dict with keys ``macd``, ``signal`` and ``histogram``. Every value is
        None when fewer than ``slow + signal`` non-NaN prices are supplied;
        an individual value is None when its latest reading is NaN.
    """

    def _last_or_none(series: pd.Series) -> Optional[float]:
        tail = series.iloc[-1]
        return None if pd.isna(tail) else float(tail)

    if len(prices.dropna()) < slow + signal:
        return dict.fromkeys(("macd", "signal", "histogram"))

    fast_ema = prices.ewm(span=fast, adjust=False).mean()
    slow_ema = prices.ewm(span=slow, adjust=False).mean()
    line = fast_ema - slow_ema
    smoothed = line.ewm(span=signal, adjust=False).mean()

    return {
        "macd": _last_or_none(line),
        "signal": _last_or_none(smoothed),
        "histogram": _last_or_none(line - smoothed),
    }
def _calculate_roc(prices: pd.Series, period: int) -> Optional[float]:
    """Return the latest percentage Rate of Change over ``period`` steps.

    ROC is ``(price_t - price_{t-period}) / price_{t-period} * 100``.

    Args:
        prices: Price series, oldest first. May contain NaN.
        period: Look-back distance in observations.

    Returns:
        The most recent ROC as a float, or None when fewer than
        ``period + 1`` non-NaN prices are available or the result is not a
        finite number (NaN, or +/-inf caused by a zero reference price).
    """
    if len(prices.dropna()) < period + 1:
        return None

    base = prices.shift(period)
    roc = ((prices - base) / base) * 100
    val = roc.iloc[-1]
    # A zero reference price yields +/-inf (or NaN for 0/0). Neither is a
    # usable feature value, so report the indicator as unavailable.
    return float(val) if np.isfinite(val) else None
def _calculate_momentum(prices: pd.Series, period: int = 10) -> Optional[float]:
    """Return the latest price difference over ``period`` observations.

    Args:
        prices: Price series, oldest first. May contain NaN.
        period: Look-back distance in observations.

    Returns:
        ``price_t - price_{t-period}`` for the last observation, or None when
        fewer than ``period + 1`` non-NaN prices exist or the result is NaN.
    """
    available = len(prices.dropna())
    if available < period + 1:
        return None

    latest = (prices - prices.shift(period)).iloc[-1]
    if pd.isna(latest):
        return None
    return float(latest)
def _calculate_bollinger_bands(
    prices: pd.Series, period: int = 20, std_dev: float = 2.0
) -> Dict[str, Optional[float]]:
    """Return the latest Bollinger Band levels.

    Args:
        prices: Price series, oldest first. May contain NaN.
        period: Rolling window for the mean and standard deviation.
        std_dev: Band half-width as a multiple of the rolling standard
            deviation.

    Returns:
        Dict with keys ``upper``, ``middle`` and ``lower``. Every value is
        None when fewer than ``period`` non-NaN prices are supplied; an
        individual value is None when its latest reading is NaN.
    """

    def _last_or_none(series: pd.Series) -> Optional[float]:
        tail = series.iloc[-1]
        return None if pd.isna(tail) else float(tail)

    if len(prices.dropna()) < period:
        return dict.fromkeys(("upper", "middle", "lower"))

    window = prices.rolling(window=period)
    centre = window.mean()
    offset = std_dev * window.std()

    return {
        "upper": _last_or_none(centre + offset),
        "middle": _last_or_none(centre),
        "lower": _last_or_none(centre - offset),
    }
def _calculate_ema(prices: pd.Series, period: int) -> Optional[float]:
    """Return the latest Exponential Moving Average value.

    Args:
        prices: Price series, oldest first. May contain NaN.
        period: EMA span passed to ``Series.ewm`` (with ``adjust=False``).

    Returns:
        The most recent EMA as a float, or None when fewer than ``period``
        non-NaN prices are supplied or the latest EMA is NaN.
    """
    if len(prices.dropna()) < period:
        return None

    latest = prices.ewm(span=period, adjust=False).mean().iloc[-1]
    if pd.isna(latest):
        return None
    return float(latest)
def _calculate_adx(
    high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14
) -> Optional[float]:
    """Return the latest Average Directional Index value.

    True range, directional movement and DX are all smoothed with
    ``Series.ewm(span=period, adjust=False)``.

    Args:
        high: High prices, oldest first.
        low: Low prices, oldest first.
        close: Closing prices, oldest first.
        period: Span used for every smoothing step.

    Returns:
        The most recent ADX as a float, or None when ``close`` has fewer
        than ``2 * period`` non-NaN values or the latest ADX is NaN.
    """
    if len(close.dropna()) < period * 2:
        return None

    def _smooth(series: pd.Series) -> pd.Series:
        return series.ewm(span=period, adjust=False).mean()

    # True range: the largest of the three candidate ranges on each row.
    prev_close = close.shift(1)
    candidates = [
        high - low,
        (high - prev_close).abs(),
        (low - prev_close).abs(),
    ]
    true_range = pd.concat(candidates, axis=1).max(axis=1)

    # Directional movement: keep only the dominant, strictly positive move.
    rise = high - high.shift(1)
    fall = low.shift(1) - low
    plus_dm = rise.where((rise > fall) & (rise > 0), 0)
    minus_dm = fall.where((fall > rise) & (fall > 0), 0)

    atr = _smooth(true_range)
    plus_di = 100 * (_smooth(plus_dm) / atr)
    minus_di = 100 * (_smooth(minus_dm) / atr)

    dx = 100 * ((plus_di - minus_di).abs() / (plus_di + minus_di))
    latest = _smooth(dx).iloc[-1]
    if pd.isna(latest):
        return None
    return float(latest)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# FastMCP server instance created under the name "feature_extraction";
# it is started by the ``__main__`` guard at the bottom of the file.
mcp = FastMCP("feature_extraction")
class FeatureExtractionRequest(BaseModel):
    """Request for technical feature extraction."""

    # Required inputs: the ticker label and its closing-price history.
    ticker: str = Field(..., description="Stock ticker symbol")
    prices: List[float] = Field(..., description="Historical closing prices")
    # Optional; defaults to an empty list when omitted.
    volumes: List[float] = Field(default_factory=list, description="Historical volumes")
    # Switches for each indicator family; all are enabled by default.
    include_momentum: bool = Field(default=True, description="Include momentum indicators")
    include_volatility: bool = Field(default=True, description="Include volatility indicators")
    include_trend: bool = Field(default=True, description="Include trend indicators")
class NormalisationRequest(BaseModel):
    """Request for feature normalisation."""

    ticker: str = Field(..., description="Stock ticker symbol")
    # The observation to normalise, keyed by feature name.
    features: Dict[str, float] = Field(..., description="Current feature values")
    # Past observations, one dict per observation; empty by default.
    historical_features: List[Dict[str, float]] = Field(
        default_factory=list,
        description="Historical feature observations for rolling normalisation"
    )
    # Upper bound on how many trailing historical observations are used.
    window_size: int = Field(default=100, description="Rolling window size")
    # normalise_features treats any value other than "ewm" as plain z-score.
    method: str = Field(default="ewm", description="Normalisation method: ewm, z_score")
class FeatureSelectionRequest(BaseModel):
    """Request for feature selection."""

    ticker: str = Field(..., description="Stock ticker symbol")
    # Candidate features keyed by name; NaN values are dropped by select_features.
    feature_vector: Dict[str, float] = Field(..., description="Full feature vector")
    max_features: int = Field(default=15, description="Maximum features to select")
    # NOTE(review): select_features in this file does not read this field.
    variance_threshold: float = Field(default=0.95, description="Variance threshold for PCA")
class FeatureVectorRequest(BaseModel):
    """Request for computing combined feature vector."""

    ticker: str = Field(..., description="Stock ticker symbol")
    # Per-source feature dicts; each defaults to an empty dict.
    technical_features: Dict[str, float] = Field(default_factory=dict)
    fundamental_features: Dict[str, Any] = Field(default_factory=dict)
    sentiment_features: Dict[str, Any] = Field(default_factory=dict)
    # NOTE(review): compute_feature_vector in this file does not read the
    # two fields below.
    max_features: int = Field(default=30, description="Maximum features in vector")
    selection_method: str = Field(default="pca", description="Selection method: pca, variance")
async def extract_technical_features(request: FeatureExtractionRequest) -> Dict[str, Any]:
    """Extract technical features with look-ahead bias prevention.

    Every feature is computed from the closing-price series shifted by one
    observation, so the most recent supplied price never feeds a feature.
    This includes the ``price_to_sma_*`` ratios, which compare the latest
    *shifted* price with the shifted moving average.

    Args:
        request: Feature extraction parameters. ``request.volumes`` is
            accepted for interface compatibility, but no feature computed
            here consumes it, so its length is not validated.

    Returns:
        Dictionary with ``ticker``, ``features`` (feature name -> finite
        float) and ``feature_count``. When fewer than 20 prices are supplied
        ``features`` is empty and an ``error`` message is included.
    """
    if len(request.prices) < 20:
        return {
            "ticker": request.ticker,
            "features": {},
            "feature_count": 0,
            "error": "Insufficient price data (minimum 20 required)"
        }

    close = pd.DataFrame({"close": request.prices})["close"]

    # CRITICAL: Shift prices to prevent look-ahead bias
    shifted_close = close.shift(1)
    # Number of usable (non-NaN) shifted observations; gates every window.
    valid_count = len(shifted_close.dropna())

    features = {}

    if request.include_momentum:
        # RSI (standard 14-period)
        rsi = _calculate_rsi(shifted_close, period=14)
        if rsi is not None:
            features["rsi_14"] = rsi

        # MACD components
        macd = _calculate_macd(shifted_close, fast=12, slow=26, signal=9)
        for source_key, feature_key in (
            ("macd", "macd_line"),
            ("signal", "macd_signal"),
            ("histogram", "macd_histogram"),
        ):
            if macd[source_key] is not None:
                features[feature_key] = macd[source_key]

        # Rate of change at multiple windows
        for window in [5, 10, 20]:
            roc = _calculate_roc(shifted_close, period=window)
            if roc is not None:
                features[f"roc_{window}"] = roc

        # Momentum
        mom = _calculate_momentum(shifted_close, period=10)
        if mom is not None:
            features["momentum_10"] = mom

    if request.include_volatility:
        # Bollinger Bands
        bbands = _calculate_bollinger_bands(shifted_close, period=20, std_dev=2.0)
        for band in ("upper", "middle", "lower"):
            if bbands[band] is not None:
                features[f"bb_{band}"] = bbands[band]
        # Explicit None checks: a band sitting at exactly 0.0 is still valid;
        # only a zero middle band must be excluded (it is the divisor).
        if (
            bbands["upper"] is not None
            and bbands["middle"] is not None
            and bbands["lower"] is not None
            and bbands["middle"] != 0
        ):
            features["bb_width"] = (bbands["upper"] - bbands["lower"]) / bbands["middle"]

        # Rolling standard deviation at multiple windows
        for window in [10, 20, 50]:
            if valid_count >= window:
                std_val = shifted_close.rolling(window=window).std().iloc[-1]
                if not pd.isna(std_val):
                    features[f"std_{window}"] = float(std_val)

        # True Range proxy (using close prices only)
        if valid_count >= 14:
            tr = shifted_close.diff().abs()
            atr = tr.rolling(window=14).mean().iloc[-1]
            if not pd.isna(atr):
                features["atr_14"] = float(atr)

    if request.include_trend:
        # Latest price that is visible after the shift; using the unshifted
        # close here would leak the newest observation into the features.
        last_known_price = shifted_close.iloc[-1]

        # Moving averages at multiple windows
        for window in [10, 20, 50, 100]:
            if valid_count >= window:
                sma = shifted_close.rolling(window=window).mean().iloc[-1]
                if not pd.isna(sma):
                    features[f"sma_{window}"] = float(sma)
                    if sma != 0:
                        features[f"price_to_sma_{window}"] = last_known_price / sma

        # EMA
        ema_12 = _calculate_ema(shifted_close, period=12)
        ema_26 = _calculate_ema(shifted_close, period=26)
        if ema_12 is not None:
            features["ema_12"] = ema_12
        if ema_26 is not None:
            features["ema_26"] = ema_26

        # ADX for trend strength (using approximated high/low from close)
        if valid_count >= 28:
            adx = _calculate_adx(
                high=shifted_close * 1.01,  # Approximate high
                low=shifted_close * 0.99,   # Approximate low
                close=shifted_close,
                period=14
            )
            if adx is not None:
                features["adx_14"] = adx

    # Clean features - convert to plain float and drop NaN / +/-inf so that
    # only finite numbers are returned.
    cleaned_features = {}
    for name, value in features.items():
        if value is None:
            continue
        try:
            as_float = float(value)
        except (TypeError, ValueError):
            continue
        if np.isfinite(as_float):
            cleaned_features[name] = as_float

    return {
        "ticker": request.ticker,
        "features": cleaned_features,
        "feature_count": len(cleaned_features)
    }
async def normalise_features(request: NormalisationRequest) -> Dict[str, Any]:
    """Normalise current feature values against their own recent history.

    Statistics come only from ``request.historical_features`` (the last
    ``window_size`` observations), never from the current values. With
    ``method == "ewm"`` the centre and spread are the final values of an
    exponentially weighted mean and standard deviation (span 20); any other
    method uses the plain mean and standard deviation.

    Args:
        request: Normalisation parameters.

    Returns:
        Dictionary with ``ticker``, ``normalised_features``, ``window_used``
        and ``method``. Features are returned unchanged when there is no
        history (method ``passthrough``), when the usable window is shorter
        than 10 observations (method ``insufficient_history``), or, per
        feature, when the feature is missing from the history or has fewer
        than 10 non-NaN historical values. A spread of at most 1e-8 (or NaN)
        maps the feature to 0.0.
    """
    history = request.historical_features
    if not history:
        return {
            "ticker": request.ticker,
            "normalised_features": request.features,
            "window_used": 0,
            "method": "passthrough"
        }

    window_size = min(len(history), request.window_size)
    if window_size < 10:
        return {
            "ticker": request.ticker,
            "normalised_features": request.features,
            "window_used": window_size,
            "method": "insufficient_history"
        }

    hist_df = pd.DataFrame(history[-window_size:])
    use_ewm = request.method == "ewm"

    def _scale(value, series):
        """Centre and scale one value using the chosen statistics."""
        if use_ewm:
            weighted = series.ewm(span=20)
            centre = weighted.mean().iloc[-1]
            spread = weighted.std().iloc[-1]
        else:
            centre = series.mean()
            spread = series.std()
        if spread > 1e-8:
            return (value - centre) / spread
        return 0.0

    normalised = {}
    for name, value in request.features.items():
        series = hist_df[name].dropna() if name in hist_df.columns else None
        if series is None or len(series) < 10:
            # Not enough history for this feature: pass it through as-is.
            normalised[name] = value
        else:
            normalised[name] = _scale(value, series)

    return {
        "ticker": request.ticker,
        "normalised_features": normalised,
        "window_used": window_size,
        "method": request.method
    }
async def select_features(request: FeatureSelectionRequest) -> Dict[str, Any]:
    """Cap a single feature vector at ``max_features`` entries.

    NaN-valued features are dropped first. The function receives only one
    sample, so no PCA or variance ranking is performed: when more valid
    features remain than ``max_features``, the first ``max_features`` in the
    input's iteration order are kept.

    Args:
        request: Feature selection parameters.

    Returns:
        Dictionary with ``ticker``, ``selected_features``, ``method`` and
        ``n_components``. ``method`` is ``all_features_kept`` when nothing
        had to be removed, ``insufficient_features`` when fewer than two
        valid features exist, and ``truncated`` otherwise (in which case
        ``original_feature_count`` is also present).
    """
    names = list(request.feature_vector.keys())
    row = np.array([list(request.feature_vector.values())])

    # Remove NaN features
    keep = ~np.isnan(row[0])
    valid_names = [name for name, ok in zip(names, keep) if ok]
    valid_values = row[:, keep]
    valid_total = len(valid_names)

    def _as_dict(limit=None):
        """Pair names with values, optionally keeping only the first ``limit``."""
        return dict(zip(valid_names[:limit], valid_values[0][:limit].tolist()))

    if valid_total <= request.max_features:
        return {
            "ticker": request.ticker,
            "selected_features": _as_dict(),
            "method": "all_features_kept",
            "n_components": valid_total
        }

    if valid_values.shape[1] < 2:
        return {
            "ticker": request.ticker,
            "selected_features": _as_dict(),
            "method": "insufficient_features",
            "n_components": valid_total
        }

    # A single sample carries no variance information, so the vector is
    # simply truncated to the first ``max_features`` entries.
    return {
        "ticker": request.ticker,
        "selected_features": _as_dict(request.max_features),
        "method": "truncated",
        "n_components": min(valid_total, request.max_features),
        "original_feature_count": valid_total
    }
async def compute_feature_vector(request: FeatureVectorRequest) -> Dict[str, Any]:
    """Compute combined feature vector from multiple sources.

    Combines technical, fundamental, and sentiment features into a single
    vector suitable for ML model input. Keys are prefixed with ``tech_``,
    ``fund_`` or ``sent_`` according to their source. Values that are None,
    NaN, or cannot be converted to float are skipped rather than raising.

    ``request.max_features`` and ``request.selection_method`` are not
    consulted here; every usable feature is returned.

    Args:
        request: Feature vector computation parameters

    Returns:
        Dictionary containing ``ticker``, ``feature_vector`` (list of
        floats), ``feature_names`` (same order), ``feature_count`` and a
        ``sources`` breakdown of how many features came from each source.
    """
    combined = {}

    # Add technical features
    for k, v in request.technical_features.items():
        if v is not None and not (isinstance(v, float) and np.isnan(v)):
            combined[f"tech_{k}"] = float(v)

    # Extract numeric fundamental features
    for k, v in request.fundamental_features.items():
        if not isinstance(v, (int, float)):
            continue
        try:
            # Convert first: np.isnan rejects Python ints too large for int64.
            value = float(v)
        except OverflowError:
            continue
        if not np.isnan(value):
            combined[f"fund_{k}"] = value

    # Extract sentiment features. The payload is Dict[str, Any], so each
    # value is coerced defensively and NaN is filtered like other sources.
    sentiment_keys = (
        ("overall_sentiment", "sent_overall"),
        ("confidence", "sent_confidence"),
        ("article_count", "sent_article_count"),
    )
    for source_key, feature_key in sentiment_keys:
        raw = request.sentiment_features.get(source_key)
        if raw is None:
            continue
        try:
            value = float(raw)
        except (TypeError, ValueError, OverflowError):
            continue
        if not np.isnan(value):
            combined[feature_key] = value

    # Convert to list for ML models
    feature_names = list(combined)
    feature_values = [combined[name] for name in feature_names]

    return {
        "ticker": request.ticker,
        "feature_vector": feature_values,
        "feature_names": feature_names,
        "feature_count": len(feature_values),
        "sources": {
            "technical": sum(1 for name in combined if name.startswith("tech_")),
            "fundamental": sum(1 for name in combined if name.startswith("fund_")),
            "sentiment": sum(1 for name in combined if name.startswith("sent_"))
        }
    }
# Start the MCP server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    mcp.run()