Spaces:
Sleeping
Sleeping
| from pymongo import MongoClient | |
| from datetime import datetime | |
| import pytz | |
| from prefect_utils import trigger_maintenance_request | |
| from prefect import task | |
| import pandas as pd | |
| import os | |
| import asyncio | |
# Connection settings are read from the environment at import time.
# ``blinded_connection_string`` holds the MongoDB URI with a literal
# ``<db_password>`` placeholder; the real password is supplied separately
# and substituted in here. Both variables must be set, otherwise the
# ``.replace`` call below fails while the module is being imported.
MONGODB_PASSWORD = os.environ.get("MONGODB_PASSWORD")
blinded_connection_string = os.environ.get("blinded_connection_string")
connection_string = blinded_connection_string.replace(
    "<db_password>",
    MONGODB_PASSWORD,
)
def generate_empty_well():
    """Mark every tracked well of the plate as ``"empty"`` in MongoDB.

    Upserts one document per well (rows B-H, columns 1-12) into the
    ``wells`` collection of the ``LCM-OT-2-SLD`` database. Documents are
    keyed on the ``well`` field and written with ``$set``, so calling this
    again simply resets the same documents.
    """
    # NOTE(review): row "A" is not generated here -- presumably reserved for
    # something else on the plate; confirm before adding it.
    rows = ["B", "C", "D", "E", "F", "G", "H"]
    columns = [str(i) for i in range(1, 13)]

    # The context manager closes the client even if an upsert raises.
    with MongoClient(connection_string) as dbclient:
        collection = dbclient["LCM-OT-2-SLD"]["wells"]
        for row in rows:
            for col in columns:
                well = f"{row}{col}"
                metadata = {
                    "well": well,
                    "status": "empty",
                    "project": "OT2",
                }
                collection.update_one(
                    {"well": well},
                    {"$set": metadata},
                    upsert=True,
                )
def update_used_wells(used_wells):
    """Mark the given wells as ``"used"`` in the ``wells`` collection.

    :param used_wells: iterable of well names (e.g. ``"B1"``); each one is
        upserted, so a well that has no document yet gets one.
    """
    # The context manager closes the client even if an upsert raises.
    with MongoClient(connection_string) as dbclient:
        collection = dbclient["LCM-OT-2-SLD"]["wells"]
        for well in used_wells:
            metadata = {
                "well": well,
                "status": "used",
                "project": "OT2",
            }
            collection.update_one(
                {"well": well},
                {"$set": metadata},
                upsert=True,
            )
def find_unused_wells():
    """Return the names of all wells whose status is ``"empty"``, sorted.

    Wells are ordered by row letter and then by numeric column, so ``"B2"``
    comes before ``"B10"``. An empty list is returned when no well is free.

    Side effect: when exactly one empty well remains and no
    ``"wellplate_maintenance"`` request is flagged as pending, a maintenance
    request is triggered and the pending flag is set to 1.

    :return: sorted list of well names.
    """

    def well_sort_key(well):
        # "B10" -> ("B", 10): compare the column as a number, not as text.
        return (well[0], int(well[1:]))

    # Only the query needs the connection; the context manager closes the
    # client before the maintenance helpers open their own, and also when
    # the query raises.
    with MongoClient(connection_string) as dbclient:
        collection = dbclient["LCM-OT-2-SLD"]["wells"]
        response = list(collection.find({"status": "empty"}))

    # Documents without a "well" field are skipped rather than fed to the
    # sort key, which cannot handle a missing name.
    empty_wells = sorted(
        (doc["well"] for doc in response if "well" in doc),
        key=well_sort_key,
    )

    # NOTE(review): the request fires only at exactly one remaining well; if
    # the count can drop from 2 to 0 in one step it is never sent -- confirm
    # whether ``<= 1`` is intended.
    if len(empty_wells) == 1:
        maintenance_type = "wellplate_maintenance"
        if check_maintenance_status(maintenance_type) == 0:
            asyncio.run(trigger_maintenance_request(maintenance_type))
            set_maintenance_status(maintenance_type, 1)
        else:
            print("[DEBUG] a maintenance request has been sent")

    return empty_wells
def save_result(result_data):
    """Insert ``result_data`` into the ``result`` collection.

    A ``"timestamp"`` key is written into ``result_data`` before the insert,
    so the caller's dict is modified in place.

    :param result_data: dict describing the result to store.
    :return: the ``inserted_id`` reported by MongoDB for the new document.
    """
    # Timezone-aware America/Toronto wall-clock time (not UTC).
    result_data["timestamp"] = datetime.now(pytz.timezone("America/Toronto"))

    # The context manager closes the client even if the insert raises.
    with MongoClient(connection_string) as dbclient:
        collection = dbclient["LCM-OT-2-SLD"]["result"]
        insert_result = collection.insert_one(result_data)
        return insert_result.inserted_id
def get_student_quota(student_id):
    """Look up the quota stored for a student.

    :param student_id: value matched against the ``student_id`` field of the
        ``student`` collection.
    :return: the document's ``quota`` value, or 0 when the document has no
        ``quota`` field.
    :raises ValueError: when no document exists for ``student_id``.
    """
    with MongoClient(connection_string) as mongo:
        students = mongo["LCM-OT-2-SLD"]["student"]
        record = students.find_one({"student_id": student_id})
        if record is not None:
            return record.get("quota", 0)
        raise ValueError(f"Student ID '{student_id}' not found in the database.")
def decrement_student_quota(student_id):
    """Reduce a student's quota by one and report the outcome as text.

    No exception is raised for a missing student or an exhausted quota; the
    outcome is described by the returned message instead.

    :param student_id: value matched against the ``student_id`` field of the
        ``student`` collection.
    :return: a human-readable status message.
    """
    # Every return path leaves through the context manager, so the client is
    # always closed.
    with MongoClient(connection_string) as dbclient:
        collection = dbclient["LCM-OT-2-SLD"]["student"]

        student = collection.find_one({"student_id": student_id})
        if not student:
            return f"Student ID {student_id} not found."
        if student.get("quota", 0) <= 0:
            return f"Student ID {student_id} has no remaining quota."

        # The filter re-checks ``quota > 0`` so the decrement cannot take the
        # value below zero even if it changed after the read above.
        result = collection.update_one(
            {"student_id": student_id, "quota": {"$gt": 0}},
            {"$inc": {"quota": -1}},
        )

    if result.modified_count > 0:
        return f"Student ID {student_id}'s quota update successfully."
    return f"Quota update failed for Student ID {student_id}."
def add_student_quota(student_id, quota):
    """
    Create a student with the given quota, or overwrite an existing one.

    The document is upserted on ``student_id``, so calling this for a student
    that already exists replaces the stored quota with ``quota``.

    :param student_id: The ID of the student.
    :param quota: The quota to store for the student.
    """
    student_data = {"student_id": student_id, "quota": quota}
    # The context manager closes the client even if the upsert raises.
    with MongoClient(connection_string) as dbclient:
        collection = dbclient["LCM-OT-2-SLD"]["student"]
        collection.update_one(
            {"student_id": student_id},
            {"$set": student_data},
            upsert=True,
        )
def check_maintenance_status(maintenance_type):
    """Return the stored flag for a maintenance type.

    :param maintenance_type: value matched against the ``name`` field of the
        ``HITL_status`` collection.
    :return: the document's ``flag`` value; 0 when there is no document for
        ``maintenance_type`` or the document has no ``flag`` field.
    """
    # The context manager closes the client even if the lookup raises.
    with MongoClient(connection_string) as dbclient:
        collection = dbclient["LCM-OT-2-SLD"]["HITL_status"]
        status_doc = collection.find_one({"name": maintenance_type})

    if status_doc is None:
        return 0
    return status_doc.get("flag", 0)
def set_maintenance_status(maintenance_type, new_status):
    """Store the flag for a maintenance type, creating the record if needed.

    Flag convention: 1 means a request has been sent, 0 means no request is
    in progress. The current America/Toronto time is stored alongside the
    flag in the ``timestamp`` field.

    :param maintenance_type: value matched against the ``name`` field of the
        ``HITL_status`` collection.
    :param new_status: flag value to store.
    """
    update = {
        "$set": {
            "flag": new_status,
            "timestamp": datetime.now(pytz.timezone("America/Toronto")),
        }
    }
    # The context manager closes the client even if the upsert raises.
    with MongoClient(connection_string) as dbclient:
        collection = dbclient["LCM-OT-2-SLD"]["HITL_status"]
        collection.update_one({"name": maintenance_type}, update, upsert=True)
def insert_maintenance_log(maintenance_type, operator="Unknown"):
    """Append an entry to the ``maintenance_logs`` collection.

    :param maintenance_type: stored in the entry's ``maintenance_type`` field.
    :param operator: stored in the entry's ``operator`` field; defaults to
        ``"Unknown"``.
    """
    log_data = {
        "maintenance_type": maintenance_type,
        # Timezone-aware America/Toronto wall-clock time.
        "timestamp": datetime.now(pytz.timezone("America/Toronto")),
        "operator": operator,
    }
    # The context manager closes the client even if the insert raises.
    with MongoClient(connection_string) as dbclient:
        collection = dbclient["LCM-OT-2-SLD"]["maintenance_logs"]
        collection.insert_one(log_data)
if __name__ == "__main__":
    # Running the module directly queries the empty wells (and may trigger
    # a maintenance request as a side effect). Resetting the plate with
    # generate_empty_well() is deliberately not done here.
    find_unused_wells()