File size: 23,084 Bytes
9b88b42
 
4cc5533
9b88b42
 
4cc5533
9b88b42
 
 
 
d72878a
9b88b42
6752363
915ea5d
9b88b42
 
f2c29a4
9b88b42
76897aa
9b88b42
 
 
 
ad036c4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9b88b42
 
 
76897aa
 
 
 
 
 
 
9b88b42
 
 
6d9f030
76897aa
9b88b42
 
6d9f030
76897aa
 
 
 
 
 
 
 
 
 
 
9b88b42
 
 
 
 
 
 
 
 
 
 
4cc5533
9b88b42
 
 
 
 
4cc5533
 
9b88b42
 
 
 
 
 
 
 
 
 
 
 
 
 
d72878a
9b88b42
 
 
 
 
 
a081162
 
 
 
 
 
 
 
9b88b42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ad036c4
9b88b42
9f411df
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
915ea5d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
17d25c6
 
 
 
 
 
 
 
 
 
 
 
 
 
915ea5d
 
 
9b88b42
 
 
 
 
 
9f411df
9b88b42
 
 
 
f2c29a4
 
 
 
 
9f411df
9b88b42
 
d72878a
 
 
 
 
9b88b42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d72878a
9b88b42
 
 
 
 
 
 
 
 
 
 
62c549e
 
 
 
9b88b42
 
 
62c549e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9b88b42
62c549e
 
 
 
 
 
9b88b42
62c549e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9b88b42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
915ea5d
9b88b42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f2c29a4
 
 
 
9b88b42
 
d72878a
 
 
 
 
9b88b42
 
 
 
 
 
 
4cc5533
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22e48cb
4cc5533
 
 
 
 
 
 
 
 
 
 
 
 
9b88b42
 
 
d72878a
9b88b42
 
 
 
 
 
 
 
 
d72878a
 
9b88b42
 
 
 
 
 
 
4cc5533
1c78117
9b88b42
 
 
d72878a
 
 
 
9b88b42
 
 
 
 
d72878a
 
 
 
 
 
 
 
 
 
 
 
 
 
9b88b42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
"""LangGraph workflow for multi-agent portfolio analysis.

This implements the multi-phase architecture:
Phase 1: Data Layer MCPs (Yahoo Finance, FMP, Trading-MCP, FRED)
Phase 2: Computation Layer MCPs (Portfolio Optimizer, Risk Analyzer)
Phase 2.5: ML Predictions (Ensemble Predictor with Chronos)
Phase 3: LLM Synthesis (Portfolio Analyst Agent)
"""

import logging
import time
from typing import Dict, Any, List
from datetime import datetime, timezone
from decimal import Decimal

from langgraph.graph import StateGraph, END
from backend.models.agent_state import AgentState, MCPCall
from backend.agents.portfolio_analyst import PortfolioAnalystAgent
from backend.agents.personas import create_persona_agent, PersonaType

logger = logging.getLogger(__name__)


def summarize_fred_data(series_data: Dict[str, Any], indicator_name: str) -> Dict[str, Any]:
    """Summarize FRED time series to key statistics for token efficiency.

    Args:
        series_data: FRED API response with observations
        indicator_name: Name of the economic indicator

    Returns:
        Summarised statistics dictionary, or {} if no usable observations
    """
    if not series_data or 'observations' not in series_data:
        return {}

    observations = series_data['observations']
    if not observations:
        return {}

    # Extract values. FRED encodes missing observations as the string '.',
    # which is truthy — parse defensively and skip anything non-numeric
    # rather than crashing the whole economic-data fetch.
    values = []
    for obs in observations:
        raw = obs.get('value')
        if not raw:
            continue
        try:
            values.append(float(raw))
        except (TypeError, ValueError):
            continue

    if not values:
        return {}

    # Calculate statistics: latest value vs. period mean drives the trend flag
    current = values[-1]
    mean = sum(values) / len(values)

    return {
        'name': indicator_name,
        'current': round(current, 4),
        'mean': round(mean, 4),
        'min': round(min(values), 4),
        'max': round(max(values), 4),
        'observations_count': len(values),
        'trend': 'up' if current > mean else 'down'
    }


class PortfolioAnalysisWorkflow:
    """LangGraph workflow for portfolio analysis.

    Orchestrates the multi-phase pipeline: data fetch (Phase 1),
    optimisation/risk computation (Phase 2), ML forecasting (Phase 2.5),
    and LLM synthesis (Phase 3). Each phase mutates and returns the shared
    AgentState; errors are recorded in state["errors"] so later phases can
    still run on partial data.
    """

    def __init__(
        self,
        mcp_router,
        roast_mode: bool = False,
        persona: PersonaType | str | None = None
    ):
        """Initialise the workflow with MCP router.

        Args:
            mcp_router: MCP router instance for calling MCP servers
            roast_mode: If True, use brutal honesty mode for analysis
            persona: Optional investor persona (e.g., 'warren_buffett', 'cathie_wood', 'ray_dalio')
        """
        self.mcp_router = mcp_router
        self.roast_mode = roast_mode
        self.persona = persona

        # Initialise the appropriate analyst agent
        if persona:
            # Use persona-based analysis (persona overrides roast_mode)
            self.analyst_agent = create_persona_agent(persona)
            logger.info(f"Using persona agent: {persona}")
        else:
            # Use standard analysis (with optional roast mode)
            self.analyst_agent = PortfolioAnalystAgent(roast_mode=roast_mode)
            logger.info(f"Using standard analyst (roast_mode={roast_mode})")

        # Build the workflow graph
        self.workflow = self._build_workflow()

    def _build_workflow(self) -> StateGraph:
        """Build the LangGraph workflow: a linear chain of the four phases."""
        workflow = StateGraph(AgentState)

        # Add nodes for each phase
        workflow.add_node("phase_1_data_layer", self._phase_1_data_layer)
        workflow.add_node("phase_2_computation", self._phase_2_computation)
        workflow.add_node("phase_2_5_ml_predictions", self._phase_2_5_ml_predictions)
        workflow.add_node("phase_3_synthesis", self._phase_3_synthesis)

        # Define the flow
        workflow.set_entry_point("phase_1_data_layer")
        workflow.add_edge("phase_1_data_layer", "phase_2_computation")
        workflow.add_edge("phase_2_computation", "phase_2_5_ml_predictions")
        workflow.add_edge("phase_2_5_ml_predictions", "phase_3_synthesis")
        workflow.add_edge("phase_3_synthesis", END)

        return workflow.compile()

    async def _phase_1_data_layer(self, state: AgentState) -> AgentState:
        """Phase 1: Fetch all data from data layer MCPs.

        MCPs called:
        - Yahoo Finance: Real-time quotes and historical data
        - FMP: Company fundamentals
        - Trading-MCP: Technical indicators
        - FRED: Economic indicators
        """
        logger.info("PHASE 1: Fetching data from Data Layer MCPs")
        phase_start = time.perf_counter()

        tickers = [h["ticker"] for h in state["holdings"]]

        try:
            # Fetch market data (Yahoo Finance)
            logger.debug(f"Fetching market data for {len(tickers)} tickers")
            market_data_list = await self.mcp_router.call_yahoo_finance_mcp("get_quote", {"tickers": tickers})

            # Transform list to dict keyed by ticker ("ticker" or "symbol"
            # depending on which field the MCP populated)
            market_data = {}
            for quote in market_data_list:
                ticker = quote.get("ticker") or quote.get("symbol")
                if ticker:
                    market_data[ticker] = quote

            # Fetch historical data for each ticker
            historical_data = {}
            for ticker in tickers:
                hist = await self.mcp_router.call_yahoo_finance_mcp(
                    "get_historical_data",
                    {"ticker": ticker, "period": "1y", "interval": "1d"}
                )
                historical_data[ticker] = hist

            # Fetch fundamentals (FMP)
            logger.debug("Fetching company fundamentals")
            fundamentals = {}
            for ticker in tickers:
                fund = await self.mcp_router.call_fmp_mcp("get_company_profile", {"ticker": ticker})
                fundamentals[ticker] = fund

            # Fetch technical indicators (Trading-MCP)
            logger.debug("Calculating technical indicators")
            technical_indicators = {}
            for ticker in tickers:
                tech = await self.mcp_router.call_trading_mcp(
                    "get_technical_indicators",
                    {"ticker": ticker, "period": "3mo"}
                )
                technical_indicators[ticker] = tech

            # Fetch economic data (FRED), summarised to key stats for token efficiency
            logger.debug("Fetching economic indicators")
            economic_data = {}
            for series_id in ["GDP", "UNRATE", "DFF"]:
                econ = await self.mcp_router.call_fred_mcp("get_economic_series", {"series_id": series_id})
                economic_data[series_id] = summarize_fred_data(econ, series_id)

            # Fetch news sentiment (Enhancement #3 - News Sentiment MCP)
            logger.debug("Fetching news sentiment for all holdings")
            sentiment_data = {}
            for ticker in tickers:
                try:
                    sentiment = await self.mcp_router.call_news_sentiment_mcp(
                        "get_news_with_sentiment",
                        {"ticker": ticker, "days_back": 7}
                    )
                    sentiment_data[ticker] = sentiment
                    logger.debug(f"{ticker} sentiment: {sentiment.get('overall_sentiment', 0):.2f}")
                except Exception as e:
                    logger.warning(f"Failed to fetch sentiment for {ticker}: {e}")
                    # Continue with empty sentiment on error — sentiment is
                    # best-effort and must not abort the whole data phase
                    sentiment_data[ticker] = {
                        "ticker": ticker,
                        "overall_sentiment": 0.0,
                        "confidence": 0.0,
                        "article_count": 0,
                        "articles": [],
                        "error": str(e)
                    }

            # Enrich holdings with market values based on realtime data
            enriched_holdings = []
            for holding in state["holdings"]:
                ticker = holding.get("ticker")
                quantity = holding.get("quantity", 0)
                dollar_amount = holding.get("dollar_amount", 0)

                # Get current price from realtime_data (field name varies by source)
                current_price = None
                if ticker in market_data:
                    price_data = market_data[ticker]
                    current_price = price_data.get("price", 0) or price_data.get("regularMarketPrice", 0)

                # Calculate market value: prefer quantity * live price, fall
                # back to the user-entered dollar amount. Decimal avoids
                # float accumulation error in the products.
                if quantity > 0 and current_price:
                    market_value = Decimal(str(quantity)) * Decimal(str(current_price))
                elif dollar_amount > 0:
                    market_value = Decimal(str(dollar_amount))
                else:
                    market_value = Decimal("0")

                # Create enriched holding (immutable pattern — original dict untouched)
                enriched_holding = {
                    **holding,
                    "current_price": current_price,
                    "market_value": float(market_value)
                }
                enriched_holdings.append(enriched_holding)

            # Calculate total portfolio value
            total_portfolio_value = sum(h["market_value"] for h in enriched_holdings)

            # Calculate portfolio weights
            for holding in enriched_holdings:
                if total_portfolio_value > 0:
                    holding["weight"] = holding["market_value"] / total_portfolio_value
                else:
                    # Edge case: equal weights if total is 0 (the loop only
                    # runs when enriched_holdings is non-empty)
                    holding["weight"] = 1.0 / len(enriched_holdings)

            # Log for verification
            logger.info(f"Portfolio total value: ${total_portfolio_value:,.2f}, weights sum: {sum(h['weight'] for h in enriched_holdings):.4f}")

            # Update state with enriched holdings
            state["holdings"] = enriched_holdings

            # Update state
            state["historical_prices"] = historical_data
            state["fundamentals"] = fundamentals
            state["realtime_data"] = market_data
            state["technical_indicators"] = technical_indicators
            state["economic_data"] = economic_data
            state["sentiment_data"] = sentiment_data  # Enhancement #3
            state["current_step"] = "phase_1_complete"

            # Log MCP calls
            state["mcp_calls"].extend([
                MCPCall.model_validate({"mcp": "yahoo_finance", "tool": "get_quote"}).model_dump(),
                MCPCall.model_validate({"mcp": "yahoo_finance", "tool": "get_historical_data"}).model_dump(),
                MCPCall.model_validate({"mcp": "fmp", "tool": "get_company_profile"}).model_dump(),
                MCPCall.model_validate({"mcp": "trading_mcp", "tool": "get_technical_indicators"}).model_dump(),
                MCPCall.model_validate({"mcp": "fred", "tool": "get_economic_series"}).model_dump(),
                MCPCall.model_validate({"mcp": "news_sentiment", "tool": "get_news_with_sentiment"}).model_dump(),
            ])

            # Track phase duration
            phase_duration_ms = int((time.perf_counter() - phase_start) * 1000)
            state["phase_1_duration_ms"] = phase_duration_ms

            logger.info(f"PHASE 1 COMPLETE: Fetched data for {len(tickers)} assets ({phase_duration_ms}ms)")

        except Exception as e:
            # Record the failure but return state so downstream phases can
            # still run (and surface the error to the user)
            logger.error(f"Error in Phase 1: {e}")
            state["errors"].append(f"Phase 1 error: {str(e)}")

        return state

    async def _phase_2_computation(self, state: AgentState) -> AgentState:
        """Phase 2: Run computational models with data from Phase 1.

        MCPs called:
        - Portfolio Optimizer: HRP, Black-Litterman, Mean-Variance
        - Risk Analyzer: VaR, CVaR, Monte Carlo
        """
        logger.info("PHASE 2: Running Computation Layer MCPs")
        phase_start = time.perf_counter()

        try:
            # Prepare market data for optimization
            market_data_list = []
            for ticker, hist_data in state["historical_prices"].items():
                market_data_list.append({
                    "ticker": ticker,
                    "prices": hist_data.get("close_prices", []),
                    "dates": hist_data.get("dates", []),
                })

            # Check for single-asset portfolio (requires minimum 2 assets for optimization)
            unique_tickers = set(state["historical_prices"])
            is_single_asset = len(unique_tickers) < 2

            # Run portfolio optimizations
            logger.debug("Running portfolio optimizations")

            if is_single_asset:
                # Single-asset fallback: cannot optimise, show 100% allocation
                single_ticker = next(iter(unique_tickers))
                logger.info(f"Single-asset portfolio detected ({single_ticker}) - skipping optimization, showing 100% allocation")

                # Create fallback optimization results with 100% allocation
                fallback_weights = {single_ticker: 1.0}

                fallback_result = {
                    "weights": fallback_weights,
                    "expected_return": 0.0,
                    "volatility": 0.0,
                    "sharpe": 0.0,
                }

                hrp_result = fallback_result
                bl_result = fallback_result
                mv_result = {
                    **fallback_result,
                    "message": "Portfolio optimization requires minimum 2 assets. Showing current 100% allocation."
                }

            else:
                # Multiple assets: proceed with normal optimization
                # HRP
                hrp_result = await self.mcp_router.call_portfolio_optimizer_mcp(
                    "optimize_hrp",
                    {
                        "market_data": market_data_list,
                        "method": "hrp",
                        "risk_tolerance": state["risk_tolerance"],
                    }
                )

                # Black-Litterman
                bl_result = await self.mcp_router.call_portfolio_optimizer_mcp(
                    "optimize_black_litterman",
                    {
                        "market_data": market_data_list,
                        "method": "black_litterman",
                        "risk_tolerance": state["risk_tolerance"],
                    }
                )

                # Mean-Variance
                mv_result = await self.mcp_router.call_portfolio_optimizer_mcp(
                    "optimize_mean_variance",
                    {
                        "market_data": market_data_list,
                        "method": "mean_variance",
                        "risk_tolerance": state["risk_tolerance"],
                    }
                )

            # Run risk analysis on the current (not optimised) weights
            logger.debug("Running risk analysis")
            portfolio_input = []
            for holding in state["holdings"]:
                ticker = holding["ticker"]
                historical_prices = state["historical_prices"].get(ticker, {}).get("close_prices", [])

                portfolio_input.append({
                    "ticker": ticker,
                    "weight": holding.get("weight", 0),
                    "prices": historical_prices,
                })

            risk_result = await self.mcp_router.call_risk_analyzer_mcp(
                "analyze_risk",
                {
                    "portfolio": portfolio_input,
                    "portfolio_value": sum(h.get("market_value", 0) for h in state["holdings"]),
                    "confidence_level": 0.95,
                    "method": "monte_carlo",
                    "num_simulations": 10000,
                }
            )

            # Update state
            state["optimisation_results"] = {
                "hrp": hrp_result,
                "black_litterman": bl_result,
                "mean_variance": mv_result,
            }
            state["risk_analysis"] = risk_result
            state["current_step"] = "phase_2_complete"

            # Log MCP calls
            state["mcp_calls"].extend([
                MCPCall.model_validate({"mcp": "portfolio_optimizer_mcp", "tool": "optimize_hrp"}).model_dump(),
                MCPCall.model_validate({"mcp": "portfolio_optimizer_mcp", "tool": "optimize_black_litterman"}).model_dump(),
                MCPCall.model_validate({"mcp": "portfolio_optimizer_mcp", "tool": "optimize_mean_variance"}).model_dump(),
                MCPCall.model_validate({"mcp": "risk_analyzer_mcp", "tool": "analyze_risk"}).model_dump(),
            ])

            # Track phase duration
            phase_duration_ms = int((time.perf_counter() - phase_start) * 1000)
            state["phase_2_duration_ms"] = phase_duration_ms

            logger.info(f"PHASE 2 COMPLETE: Optimizations and risk analysis done ({phase_duration_ms}ms)")

        except Exception as e:
            logger.error(f"Error in Phase 2: {e}")
            state["errors"].append(f"Phase 2 error: {str(e)}")

        return state

    async def _phase_2_5_ml_predictions(self, state: AgentState) -> AgentState:
        """Phase 2.5: Generate ML-based price forecasts using Ensemble Predictor.

        MCP called:
        - Ensemble Predictor: Chronos + statistical models for price forecasting
        """
        logger.info("PHASE 2.5: Generating ML predictions")
        phase_start = time.perf_counter()

        try:
            # Generate forecasts for each holding
            logger.debug("Running ensemble forecasts for portfolio holdings")
            ensemble_forecasts = {}

            for holding in state["holdings"]:
                ticker = holding["ticker"]

                # Get historical prices from Phase 1 data
                hist_data = state["historical_prices"].get(ticker, {})
                prices = hist_data.get("close_prices", [])

                # Forecasting needs a minimum history; skip rather than fail
                if not prices or len(prices) < 10:
                    logger.warning(f"Insufficient price data for {ticker}, skipping forecast")
                    continue

                try:
                    # Call ensemble predictor
                    forecast_result = await self.mcp_router.call_ensemble_predictor_mcp(
                        "forecast_ensemble",
                        {
                            "ticker": ticker,
                            "prices": prices,
                            "forecast_horizon": 30,  # 30-day forecast
                            "confidence_level": 0.95,
                            "use_returns": True,  # Forecast returns for stability
                            "ensemble_method": "mean",  # Simple averaging
                        }
                    )

                    ensemble_forecasts[ticker] = forecast_result
                    logger.debug(f"Generated forecast for {ticker} using {len(forecast_result.get('models_used', []))} models")

                except Exception as e:
                    # Per-ticker failures are non-fatal: forecast what we can
                    logger.warning(f"Forecast failed for {ticker}: {e}")
                    continue

            # Update state
            state["ensemble_forecasts"] = ensemble_forecasts
            state["current_step"] = "phase_2_5_complete"

            # Log MCP calls
            state["mcp_calls"].extend([
                MCPCall.model_validate({
                    "mcp": "ensemble_predictor",
                    "tool": "forecast_ensemble"
                }).model_dump(),
            ])

            # Track phase duration
            phase_duration_ms = int((time.perf_counter() - phase_start) * 1000)
            state["phase_2_5_duration_ms"] = phase_duration_ms

            logger.info(
                f"PHASE 2.5 COMPLETE: Generated forecasts for {len(ensemble_forecasts)} assets ({phase_duration_ms}ms)"
            )

        except Exception as e:
            logger.error(f"Error in Phase 2.5: {e}")
            state["errors"].append(f"Phase 2.5 error: {str(e)}")
            # Set empty forecasts to allow workflow to continue
            state["ensemble_forecasts"] = {}

        return state

    async def _phase_3_synthesis(self, state: AgentState) -> AgentState:
        """Phase 3: LLM synthesis of all data into actionable insights."""
        logger.info("PHASE 3: LLM Synthesis")
        phase_start = time.perf_counter()

        try:
            # Prepare data for analyst agent
            portfolio_data = {
                "holdings": state["holdings"],
                "portfolio_id": state.get("portfolio_id", "unknown"),
                "risk_tolerance": state["risk_tolerance"],
            }

            # Call analyst agent (returns AgentResult with usage metrics).
            # .get() with {} defaults keeps this phase working even if an
            # earlier phase failed and left keys unset.
            result = await self.analyst_agent.analyze_portfolio(
                portfolio_data=portfolio_data,
                market_data=state.get("realtime_data", {}),
                fundamentals=state.get("fundamentals", {}),
                technical_indicators=state.get("technical_indicators", {}),
                economic_data=state.get("economic_data", {}),
                optimization_results=state.get("optimisation_results", {}),
                risk_analysis=state.get("risk_analysis", {}),
                ensemble_forecasts=state.get("ensemble_forecasts", {}),
                sentiment_data=state.get("sentiment_data", {}),
                risk_tolerance=state["risk_tolerance"],
            )

            # Extract analysis output and usage metrics
            analysis = result.output

            # Update state with analysis results
            state["ai_synthesis"] = analysis.summary
            state["recommendations"] = analysis.recommendations
            state["reasoning_steps"].extend(analysis.reasoning)
            state["current_step"] = "complete"

            # Track LLM usage metrics
            state["llm_input_tokens"] = result.input_tokens
            state["llm_output_tokens"] = result.output_tokens
            state["llm_total_tokens"] = result.total_tokens
            state["llm_request_count"] = result.request_count

            # Track phase duration
            phase_duration_ms = int((time.perf_counter() - phase_start) * 1000)
            state["phase_3_duration_ms"] = phase_duration_ms

            logger.info(
                f"PHASE 3 COMPLETE: Analysis generated (health score: {analysis.health_score}, "
                f"{result.total_tokens} tokens, {phase_duration_ms}ms)"
            )

        except Exception as e:
            logger.error(f"Error in Phase 3: {e}")
            state["errors"].append(f"Phase 3 error: {str(e)}")

        return state

    async def run(self, initial_state: AgentState) -> AgentState:
        """Run the complete workflow.

        Args:
            initial_state: Initial state with portfolio and query

        Returns:
            Final state with complete analysis
        """
        logger.info(f"Starting portfolio analysis workflow for {len(initial_state['holdings'])} holdings")

        result = await self.workflow.ainvoke(initial_state)

        logger.info("Workflow complete")
        return result