# import json, datetime # from config import * # import pandas as pd # import scipy.io as scio # from modules.expression_pool import init_db, add_expr, top_exprs # from agents.generator_agent import GeneratorAgent # from agents.evaluator_agent import evaluate_expression # from modules.utils import load_mat_as_numeric # # prepare # conn = init_db(DB_PATH) # # df = pd.read_csv(DATASET_PATH) # # df = scio.loadmat(DATASET_PATH) # 读取数据文件 # X, y = load_mat_as_numeric(DATASET_PATH) # X_df = pd.DataFrame(X) # # print(df) # # X_df = pd.DataFrame(df['X']) # 读取训练数据 # # print(df['Y']) # # y0 = pd.DataFrame(df['Y']) # 读取标签 # # X_df = df.drop(columns=['label']) # # y = y0.values # # print("y type:", type(y), "dtype:", getattr(y, "dtype", None)) # # print("y example:", y[:10]) # # load seed # with open(EXPR_SEED_PATH) as f: # seeds = json.load(f) # # evaluate seeds first # for s in seeds: # score, fvals, top_idx = evaluate_expression(s['expression'], X_df, y, TOP_K, CV_FOLDS) # add_expr(conn, s['expression'], score, s.get('explanation',''), str(s.get('complexity',''))) # # init generator # gen = GeneratorAgent(MODEL_NAME) # # iterative loop # for it in range(ITERATIONS): # print("Iteration", it+1) # refs = top_exprs(conn, k=TOP_K) # # build prompt_text with refs + feature stats # # prompt = "Given top expressions: " + str(refs) + "\nGenerate expressions in format: Expression: ... Rationale: ..." 
# ---------------------------------------------------------------------------
# Feature-selection router driver: load a .mat dataset, hand it to the
# FSRouterAgent, and print the resulting per-algorithm leaderboard.
# ---------------------------------------------------------------------------
import numpy as np
import pandas as pd
import scipy.io as scio
from sklearn.preprocessing import LabelEncoder

from agents.router_agent import FSRouterAgent


def load_mat_dataset(
    file_path,
    feature_keys=("X", "data", "fea"),
    label_keys=("Y", "y", "label"),
):
    """Load a .mat dataset into numeric arrays (FSExecutor / Agent compatible).

    Parameters
    ----------
    file_path : str
        Path to the .mat file.
    feature_keys : tuple
        Candidate keys under which the feature matrix may be stored.
    label_keys : tuple
        Candidate keys under which the label vector may be stored.

    Returns
    -------
    X : np.ndarray, shape (n_samples, n_features)
        Feature matrix cast to float.
    y : np.ndarray, shape (n_samples,)
        Integer-encoded labels.
    meta : dict
        Dataset metadata: sample/feature/class counts, the (encoded) class
        values, and the fitted ``LabelEncoder``.

    Raises
    ------
    KeyError
        If none of the candidate feature or label keys is present in the file.
    """
    data = scio.loadmat(file_path)

    # ---------- 1. locate the feature matrix ----------
    X = None
    for key in feature_keys:
        if key in data:
            X = data[key]
            break
    if X is None:
        raise KeyError(f"Cannot find feature matrix in {file_path}")

    X = np.asarray(X)
    if X.dtype == object:
        # Object cells may wrap scalars in 1-element lists/arrays: unwrap,
        # then cast every entry to float.
        X = np.array(
            [[float(v[0]) if isinstance(v, (list, np.ndarray)) else float(v)
              for v in row]
             for row in X]
        )
    else:
        X = X.astype(float)

    # ---------- 2. locate the label vector ----------
    y = None
    for key in label_keys:
        if key in data:
            y = data[key]
            break
    if y is None:
        raise KeyError(f"Cannot find label vector in {file_path}")

    # Labels are commonly stored as (n, 1); flatten to (n,).
    y = np.asarray(y).reshape(-1)

    # ---------- 3. clean & encode labels ----------
    # Unwrap object / string / mixed-type cells before encoding.
    if y.dtype == object:
        y = pd.Series(y).apply(
            lambda x: x[0] if isinstance(x, (list, np.ndarray)) else x
        )
    # Encode unconditionally so meta["label_encoder"] exists for every
    # input dtype, numeric labels included.
    label_encoder = LabelEncoder()
    y = label_encoder.fit_transform(y)

    # ---------- 4. metadata ----------
    meta = {
        "n_samples": X.shape[0],
        "n_features": X.shape[1],
        "n_classes": len(np.unique(y)),
        "classes": np.unique(y),
        "label_encoder": label_encoder,
    }
    return X, y, meta


base_url = "/home/fangsensen/AutoFS/data/"
datanames = [
    "dna", "Factors", "madelon", "Movement_libras", "Musk1",
    "spambase", "splice", "Synthetic_control", "Waveform", "Wdbc",
]


def main(dataname):
    """Run the FS router on one dataset and print its leaderboard.

    Parameters
    ----------
    dataname : str
        Dataset basename; resolved against ``base_url`` as ``<name>.mat``.

    Returns
    -------
    list
        Ranked results as produced by ``FSRouterAgent.run``.
    """
    X, y, meta = load_mat_dataset(base_url + dataname + ".mat")

    task = {
        "X": X,
        "y": y,
        "algorithms": [
            "JMIM", "CFR", "DCSF", "IWFS",
            "MRI", "MRMD", "UCRFS", "CSMDCCMR",
        ],
        # NOTE(review): top-level n_selected_features (5) conflicts with
        # params["n_selected_features"] (15) — confirm which value the
        # router actually honours before changing either.
        "n_selected_features": 5,
        "class_specific": False,
        "classifiers": ["nb", "svm", "rf"],
        "cv": 10,
        "random_state": 19,
        "params": {"n_selected_features": 15},
        "dataname": dataname,
    }

    router = FSRouterAgent()
    leaderboard = router.run(task)
    for rank, res in enumerate(leaderboard, 1):
        print(f"Rank {rank}: {res}")
    return leaderboard


if __name__ == "__main__":
    for dataname in datanames:
        main(dataname)
34, 36, 24, 26, 28], # 'num_features': 15, # 'metrics': {'nb': {'f1': 0.9181133571145461, 'auc': 0.9807805770573524}, # 'svm': {'f1': 0.9282600079270711, 'auc': 0.980695564275392}, # 'rf': {'f1': 0.9219976218787156, 'auc': 0.9768411621948705}}, # 'time': 7.378173112869263, # 'algorithm': 'JMIM'}, # {'selected_features': [59, 50, 56, 4, 38, 0, 9, 29, 23, 20, 36, 34, 24, 28, 26], # 'num_features': 15, # 'metrics': {'nb': {'f1': 0.9163694015061433, 'auc': 0.9805189493459717}, # 'svm': {'f1': 0.9265953230281413, 'auc': 0.98064247666047}, # 'rf': {'f1': 0.9189853349187476, 'auc': 0.9769441217042379}}, # 'time': 2.0774385929107666, # 'algorithm': 'CFR'}