# Upload metadata (Hugging Face page residue, kept as a comment so the file parses):
# Fancy-yousa's picture
# Upload 78 files
# b5567db verified
# import json, datetime
# from config import *
# import pandas as pd
# import scipy.io as scio
# from modules.expression_pool import init_db, add_expr, top_exprs
# from agents.generator_agent import GeneratorAgent
# from agents.evaluator_agent import evaluate_expression
# from modules.utils import load_mat_as_numeric
# # prepare
# conn = init_db(DB_PATH)
# # df = pd.read_csv(DATASET_PATH)
# # df = scio.loadmat(DATASET_PATH) # 读取数据文件
# X, y = load_mat_as_numeric(DATASET_PATH)
# X_df = pd.DataFrame(X)
# # print(df)
# # X_df = pd.DataFrame(df['X']) # 读取训练数据
# # print(df['Y'])
# # y0 = pd.DataFrame(df['Y']) # 读取标签
# # X_df = df.drop(columns=['label'])
# # y = y0.values
# # print("y type:", type(y), "dtype:", getattr(y, "dtype", None))
# # print("y example:", y[:10])
# # load seed
# with open(EXPR_SEED_PATH) as f:
# seeds = json.load(f)
# # evaluate seeds first
# for s in seeds:
# score, fvals, top_idx = evaluate_expression(s['expression'], X_df, y, TOP_K, CV_FOLDS)
# add_expr(conn, s['expression'], score, s.get('explanation',''), str(s.get('complexity','')))
# # init generator
# gen = GeneratorAgent(MODEL_NAME)
# # iterative loop
# for it in range(ITERATIONS):
# print("Iteration", it+1)
# refs = top_exprs(conn, k=TOP_K)
# # build prompt_text with refs + feature stats
# # prompt = "Given top expressions: " + str(refs) + "\nGenerate expressions in format: Expression: ... Rationale: ..."
# top_expressions = [] # List[(expr, score)]
# top_expressions.append((refs, score))
# top_expressions = sorted(
# top_expressions,
# key=lambda x: -x[1]
# )[:5]
# new_text = gen.generate_candidates(top_expressions)
# for out in new_text:
# # extract Expression line
# expr_line = None
# for line in out.splitlines():
# if line.strip().lower().startswith("expression"):
# expr_line = line.split(":",1)[1].strip()
# break
# if not expr_line: expr_line = out.strip()
# score, fvals, top_idx = evaluate_expression(expr_line, X_df, y, TOP_K, CV_FOLDS)
# add_expr(conn, expr_line, score, out, "")
# print(f"Candidate {expr_line} -> score {score:.4f}")
# results = []
# for expr in EXPRESSIONS:
# exec_out = executor.run(expr, X, y)
# analysis = analyzer.analyze(expr, exec_out["cv_score"])
# results.append({
# "expression": expr,
# "score": exec_out["cv_score"],
# "analysis": analysis
# })
# ranking = judge.rank(results)
#-----------------------------------------------------------------------2.0---------------
# from agents.analyzer_agent import AnalyzerAgent
# MODEL_PATH = "/data1/fangsensen/deepseek-math-7b-rl"
# agent = AnalyzerAgent(
# name="AnalyzerAgent",
# model_path=MODEL_PATH
# )
# expressions = [
# "I(X;Y)",
# "I(X;Y|Z)",
# "I(X;Y) - I(X;Z)",
# "I(X;Y|Z) - I(X;Y)",
# "I(X;Y;Z)"
# ]
# # expressions = [
# # "I(X;Y|Z) - I(X;Y)",
# # ]
# for expr in expressions:
# print("=" * 80)
# result = agent.analyze_expression(expr)
# print(result)
#-----------------------------------------------------------------------路由---------------
import numpy as np
from agents.router_agent import FSRouterAgent
import scipy.io as scio
import pandas as pd
from sklearn.preprocessing import LabelEncoder
def load_mat_dataset(
    file_path,
    feature_keys=("X", "data", "fea"),
    label_keys=("Y", "y", "label"),
):
    """Load a generic .mat dataset (FSExecutor / Agent compatible).

    Parameters
    ----------
    file_path : str
        Path to the .mat file.
    feature_keys : tuple
        Candidate keys for the feature matrix, tried in order.
    label_keys : tuple
        Candidate keys for the label vector, tried in order.

    Returns
    -------
    X : np.ndarray, shape (n_samples, n_features)
    y : np.ndarray, shape (n_samples,)
        Integer-encoded labels (always passed through LabelEncoder).
    meta : dict
        Metadata: sample/feature/class counts, class values, and the
        fitted LabelEncoder.

    Raises
    ------
    KeyError
        If no feature or label key is found in the file.
    ValueError
        If the number of labels does not match the number of samples.
    """
    data = scio.loadmat(file_path)

    # ---------- 1. read X ----------
    X = None
    for key in feature_keys:
        if key in data:
            X = data[key]
            break
    if X is None:
        raise KeyError(f"Cannot find feature matrix in {file_path}")
    X = np.asarray(X)
    if X.dtype == object:
        # MATLAB cell arrays: each cell may wrap its scalar in a
        # 1-element list/ndarray, so unwrap before converting to float.
        X = np.array(
            [[float(v[0]) if isinstance(v, (list, np.ndarray)) else float(v)
              for v in row]
             for row in X]
        )
    else:
        X = X.astype(float)

    # ---------- 2. read y ----------
    y = None
    for key in label_keys:
        if key in data:
            y = data[key]
            break
    if y is None:
        raise KeyError(f"Cannot find label vector in {file_path}")
    # Labels are commonly stored as a (n, 1) column vector; flatten.
    y = np.asarray(y).reshape(-1)

    # Robustness: a mismatched .mat would otherwise silently yield
    # misaligned (X, y) pairs downstream.
    if y.shape[0] != X.shape[0]:
        raise ValueError(
            f"Label count {y.shape[0]} does not match sample count "
            f"{X.shape[0]} in {file_path}"
        )

    # ---------- 3. label cleaning & encoding ----------
    # Handle object / string / mixed dtypes: unwrap 1-element containers
    # before encoding.
    if y.dtype == object:
        y = pd.Series(y).apply(lambda x: x[0] if isinstance(x, (list, np.ndarray)) else x)
    label_encoder = LabelEncoder()
    y = label_encoder.fit_transform(y)

    # ---------- 4. metadata ----------
    classes = np.unique(y)  # computed once, reused for both entries below
    meta = {
        "n_samples": X.shape[0],
        "n_features": X.shape[1],
        "n_classes": len(classes),
        "classes": classes,
        "label_encoder": label_encoder,
    }
    return X, y, meta
# Root directory holding the benchmark .mat dataset files.
base_url = "/home/fangsensen/AutoFS/data/"
# Dataset file stems (".mat" suffix appended in main()) to benchmark.
datanames = ['dna','Factors','madelon','Movement_libras','Musk1','spambase','splice','Synthetic_control', 'Waveform','Wdbc',]
# dataname = 'Authorship'
def main(dataname):
    """Run the feature-selection router benchmark on one dataset.

    Parameters
    ----------
    dataname : str
        Dataset file stem; resolved to ``base_url + dataname + ".mat"``.

    Returns
    -------
    leaderboard : list
        Ranked results as produced by FSRouterAgent.run().
    """
    mat_path = base_url + dataname + ".mat"
    X, y, meta = load_mat_dataset(mat_path)

    # NOTE(review): task sets n_selected_features=5 at the top level but 15
    # inside "params" — looks inconsistent; confirm which one the router uses.
    task = dict(
        X=X,
        y=y,
        algorithms=["JMIM","CFR","DCSF","IWFS","MRI","MRMD","UCRFS","CSMDCCMR",],
        n_selected_features=5,
        class_specific=False,
        classifiers=["nb", "svm", "rf"],
        cv=10,
        random_state=19,
        params={"n_selected_features":15,},
        dataname=dataname,
    )

    router = FSRouterAgent()
    leaderboard = router.run(task)

    # Print the ranked results, best first (1-based ranks).
    for rank, res in enumerate(leaderboard, 1):
        print(f"Rank {rank}: {res}")

    return leaderboard
if __name__ == "__main__":
    # Benchmark every configured dataset in sequence.
    for name in datanames:
        main(name)
# {'selected_features': [59, 50, 56, 4, 38, 9, 29, 23, 0, 20, 34, 36, 24, 26, 28],
# 'num_features': 15,
# 'metrics': {'nb': {'f1': 0.9181133571145461, 'auc': 0.9807805770573524},
# 'svm': {'f1': 0.9282600079270711, 'auc': 0.980695564275392},
# 'rf': {'f1': 0.9219976218787156, 'auc': 0.9768411621948705}},
# 'time': 7.378173112869263,
# 'algorithm': 'JMIM'},
# {'selected_features': [59, 50, 56, 4, 38, 0, 9, 29, 23, 20, 36, 34, 24, 28, 26],
# 'num_features': 15,
# 'metrics': {'nb': {'f1': 0.9163694015061433, 'auc': 0.9805189493459717},
# 'svm': {'f1': 0.9265953230281413, 'auc': 0.98064247666047},
# 'rf': {'f1': 0.9189853349187476, 'auc': 0.9769441217042379}},
# 'time': 2.0774385929107666,
# 'algorithm': 'CFR'}