Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 91 additions & 2 deletions pydatalab/src/pydatalab/routes/v0_1/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,33 @@
from pydatalab.mongo import flask_mongo
from pydatalab.permissions import admin_only, get_default_permissions


def check_manager_cycle(user_id_str, new_manager_id_str):
visited = set()
current_id = new_manager_id_str

while current_id is not None:
if current_id == user_id_str:
return True
if current_id in visited:
break
visited.add(current_id)

try:
manager = flask_mongo.db.users.find_one({"_id": ObjectId(current_id)})
if manager and "managers" in manager and manager["managers"]:
if isinstance(manager["managers"], list) and len(manager["managers"]) > 0:
current_id = manager["managers"][0]
else:
break
else:
break
except Exception:
break

return False


ADMIN = Blueprint("admins", __name__)


Expand Down Expand Up @@ -35,13 +62,22 @@ def get_users():
"then": "user",
"else": {"$arrayElemAt": ["$role.role", 0]},
}
}
},
"immutable_id": {"$ifNull": ["$immutable_id", {"$toString": "$_id"}]},
}
},
]
)

return jsonify({"status": "success", "data": list(users)})
users_list = list(users)

for user in users_list:
if "managers" not in user:
user["managers"] = []
elif not isinstance(user["managers"], list):
user["managers"] = []

return jsonify({"status": "success", "data": users_list})


@ADMIN.route("/roles/<user_id>", methods=["PATCH"])
Expand Down Expand Up @@ -93,3 +129,56 @@ def save_role(user_id):
)

return (jsonify({"status": "success"}), 200)


@ADMIN.route("/users/<user_id>/managers", methods=["PATCH"])
def update_user_managers(user_id):
"""Update the managers for a specific user using ObjectIds"""

request_json = request.get_json()

if request_json is None or "managers" not in request_json:
return jsonify({"status": "error", "message": "Managers list not provided"}), 400

managers = request_json["managers"]

if not isinstance(managers, list):
return jsonify({"status": "error", "message": "Managers must be a list"}), 400

existing_user = flask_mongo.db.users.find_one({"_id": ObjectId(user_id)})
if not existing_user:
return jsonify({"status": "error", "message": "User not found"}), 404

manager_object_ids = []
for manager_id in managers:
if manager_id:
try:
manager_oid = ObjectId(manager_id)
except Exception:
return jsonify(
{"status": "error", "message": f"Invalid manager ID format: {manager_id}"}
), 400

if not flask_mongo.db.users.find_one({"_id": manager_oid}):
return jsonify(
{"status": "error", "message": f"Manager with ID {manager_id} not found"}
), 404

if check_manager_cycle(user_id, manager_id):
return jsonify(
{
"status": "error",
"message": "Cannot assign manager: would create a circular management hierarchy",
}
), 400

manager_object_ids.append(str(manager_oid))

update_result = flask_mongo.db.users.update_one(
{"_id": ObjectId(user_id)}, {"$set": {"managers": manager_object_ids}}
)

if update_result.matched_count != 1:
return jsonify({"status": "error", "message": "Unable to update user managers"}), 400

return jsonify({"status": "success"}), 200
202 changes: 202 additions & 0 deletions pydatalab/tests/server/test_manager_assignment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
from bson import ObjectId


def test_assign_manager_success(admin_client, real_mongo_client):
db = real_mongo_client.get_database()

user_1 = db.users.insert_one({"display_name": "User 1"})
user_1_id = str(user_1.inserted_id)

user_2 = db.users.insert_one({"display_name": "User 2"})
user_2_id = str(user_2.inserted_id)

db.roles.insert_one({"_id": user_1.inserted_id, "role": "manager"})

response = admin_client.patch(f"/users/{user_2_id}/managers", json={"managers": [user_1_id]})

assert response.status_code == 200

updated_user = db.users.find_one({"_id": user_2.inserted_id})
assert user_1_id in updated_user["managers"]

db.users.delete_many({"_id": {"$in": [user_1.inserted_id, user_2.inserted_id]}})
db.roles.delete_one({"_id": user_1.inserted_id})


def test_assign_manager_requires_admin(client, real_mongo_client):
db = real_mongo_client.get_database()

user_1 = db.users.insert_one({"display_name": "User 1"})
user_1_id = str(user_1.inserted_id)

user_2 = db.users.insert_one({"display_name": "User 2"})
user_2_id = str(user_2.inserted_id)

response = client.patch(f"/users/{user_2_id}/managers", json={"managers": [user_1_id]})

assert response.status_code == 403

db.users.delete_many({"_id": {"$in": [user_1.inserted_id, user_2.inserted_id]}})


def test_assign_manager_prevents_direct_cycle(admin_client, real_mongo_client):
db = real_mongo_client.get_database()

user_1 = db.users.insert_one({"display_name": "User 1"})
user_1_id = str(user_1.inserted_id)

user_2 = db.users.insert_one({"display_name": "User 2"})
user_2_id = str(user_2.inserted_id)

db.roles.insert_one({"_id": user_1.inserted_id, "role": "manager"})
db.roles.insert_one({"_id": user_2.inserted_id, "role": "manager"})

response = admin_client.patch(f"/users/{user_2_id}/managers", json={"managers": [user_1_id]})
assert response.status_code == 200

response = admin_client.patch(f"/users/{user_1_id}/managers", json={"managers": [user_2_id]})

assert response.status_code == 400
assert "circular" in response.json["message"].lower()

db.users.delete_many({"_id": {"$in": [user_1.inserted_id, user_2.inserted_id]}})
db.roles.delete_many({"_id": {"$in": [user_1.inserted_id, user_2.inserted_id]}})


def test_assign_manager_prevents_deep_cycle(admin_client, real_mongo_client):
db = real_mongo_client.get_database()

user_1 = db.users.insert_one({"display_name": "User 1"})
user_1_id = str(user_1.inserted_id)

user_2 = db.users.insert_one({"display_name": "User 2"})
user_2_id = str(user_2.inserted_id)

user_3 = db.users.insert_one({"display_name": "User 3"})
user_3_id = str(user_3.inserted_id)

for uid in [user_1.inserted_id, user_2.inserted_id, user_3.inserted_id]:
db.roles.insert_one({"_id": uid, "role": "manager"})

admin_client.patch(f"/users/{user_2_id}/managers", json={"managers": [user_1_id]})
admin_client.patch(f"/users/{user_3_id}/managers", json={"managers": [user_2_id]})

response = admin_client.patch(f"/users/{user_1_id}/managers", json={"managers": [user_3_id]})

assert response.status_code == 400
assert "circular" in response.json["message"].lower()

db.users.delete_many(
{"_id": {"$in": [user_1.inserted_id, user_2.inserted_id, user_3.inserted_id]}}
)
db.roles.delete_many(
{"_id": {"$in": [user_1.inserted_id, user_2.inserted_id, user_3.inserted_id]}}
)


def test_assign_manager_allows_hierarchy(admin_client, real_mongo_client):
db = real_mongo_client.get_database()

user_1 = db.users.insert_one({"display_name": "User 1"})
user_1_id = str(user_1.inserted_id)

user_2 = db.users.insert_one({"display_name": "User 2"})
user_2_id = str(user_2.inserted_id)

user_3 = db.users.insert_one({"display_name": "User 3"})
user_3_id = str(user_3.inserted_id)

for uid in [user_1.inserted_id, user_2.inserted_id]:
db.roles.insert_one({"_id": uid, "role": "manager"})

response = admin_client.patch(f"/users/{user_2_id}/managers", json={"managers": [user_1_id]})
assert response.status_code == 200

response = admin_client.patch(f"/users/{user_3_id}/managers", json={"managers": [user_2_id]})
assert response.status_code == 200

db.users.delete_many(
{"_id": {"$in": [user_1.inserted_id, user_2.inserted_id, user_3.inserted_id]}}
)
db.roles.delete_many({"_id": {"$in": [user_1.inserted_id, user_2.inserted_id]}})


def test_remove_manager_assignment(admin_client, real_mongo_client):
db = real_mongo_client.get_database()

user_1 = db.users.insert_one({"display_name": "User 1"})
user_1_id = str(user_1.inserted_id)

user_2 = db.users.insert_one({"display_name": "User 2", "managers": [user_1_id]})
user_2_id = str(user_2.inserted_id)

db.roles.insert_one({"_id": user_1.inserted_id, "role": "manager"})

response = admin_client.patch(f"/users/{user_2_id}/managers", json={"managers": []})

assert response.status_code == 200

updated_user = db.users.find_one({"_id": user_2.inserted_id})
assert updated_user["managers"] == []

db.users.delete_many({"_id": {"$in": [user_1.inserted_id, user_2.inserted_id]}})
db.roles.delete_one({"_id": user_1.inserted_id})


def test_assign_nonexistent_manager(admin_client, real_mongo_client):
db = real_mongo_client.get_database()

user_1 = db.users.insert_one({"display_name": "User 1"})
user_1_id = str(user_1.inserted_id)

fake_id = str(ObjectId())

response = admin_client.patch(f"/users/{user_1_id}/managers", json={"managers": [fake_id]})

assert response.status_code == 404

db.users.delete_one({"_id": user_1.inserted_id})


def test_assign_manager_invalid_format(admin_client, real_mongo_client):
db = real_mongo_client.get_database()

user_1 = db.users.insert_one({"display_name": "User 1"})
user_1_id = str(user_1.inserted_id)

response = admin_client.patch(
f"/users/{user_1_id}/managers", json={"managers": ["not-a-valid-id"]}
)

assert response.status_code == 400

db.users.delete_one({"_id": user_1.inserted_id})


def test_manager_can_see_managed_user_items(admin_client, real_mongo_client, user_id):
db = real_mongo_client.get_database()

user_1 = db.users.insert_one({"display_name": "User 1"})
user_1_id = str(user_1.inserted_id)

db.roles.insert_one({"_id": user_1.inserted_id, "role": "manager"})

response = admin_client.patch(f"/users/{str(user_id)}/managers", json={"managers": [user_1_id]})
assert response.status_code == 200

response = admin_client.post(
"/new-sample/", json={"item_id": "test-managed-sample", "type": "samples"}
)
assert response.status_code == 201

response = admin_client.get("/get-item-data/test-managed-sample")
assert response.status_code == 200
refcode = response.json["item_data"]["refcode"]

response = admin_client.patch(
f"/items/{refcode}/permissions", json={"creators": [{"immutable_id": str(user_id)}]}
)
assert response.status_code == 200

db.users.delete_one({"_id": user_1.inserted_id})
db.roles.delete_one({"_id": user_1.inserted_id})
Loading