Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/nexusidentity/HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
Release History
===============

1.0.0b6
+++++++
* Removed sdk dependency using httpclient to resolve the long path issue

1.0.0b5
+++++++
* Adding support for older algorithm ssh keys
Expand Down
142 changes: 78 additions & 64 deletions src/nexusidentity/azext_nexusidentity/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,9 @@ def generate_nexus_identity_keys(algorithm=None):

import os
import subprocess
import asyncio
import sys

from azure.identity import AzureCliCredential
from msgraph import GraphServiceClient
from msgraph.generated.models.open_type_extension import OpenTypeExtension
from msgraph.generated.models.extension import Extension
from azure.core.exceptions import ClientAuthenticationError, HttpResponseError
from msgraph.generated.models.o_data_errors.o_data_error import ODataError
import requests
import json

# Generate SSH key
if sys.platform.startswith("win") or sys.platform.startswith("linux"):
Expand Down Expand Up @@ -84,66 +78,86 @@ def generate_nexus_identity_keys(algorithm=None):
raise CLIError(f"Unexpected error reading public key: {e}") from e

try:
credential = AzureCliCredential()
scopes = ["https://graph.microsoft.com//.default"]
graph_client = GraphServiceClient(credentials=credential, scopes=scopes)

except ClientAuthenticationError as e:
logger.error("Authentication failed: %s", e)
raise CLIError(f"Authentication failed: {e}") from e
# Get access token using Azure CLI
if sys.platform.startswith("win"):
az_cmd = "az account get-access-token --resource https://graph.microsoft.com --output json"
token_result = subprocess.run(
az_cmd,
capture_output=True,
text=True,
check=True,
shell=True,
)
else:
token_result = subprocess.run(
["az", "account", "get-access-token", "--resource", "https://graph.microsoft.com", "--output", "json"],
capture_output=True,
text=True,
check=True,
)
token_json = json.loads(token_result.stdout)
access_token = token_json["accessToken"]
except Exception as e:
logger.error("An unexpected error occurred: %s", e)
raise CLIError(f"An unexpected error occurred: {e}") from e

async def me():
extension_id = "com.nexusidentity.keys"

# Get user object
user = await graph_client.me.get()
print("Exception to fetch bearer token:", e)
logger.error("Failed to obtain access token: %s", e)
raise CLIError(f"Failed to obtain access token: {e}") from e

# Get extensions associated with the user
extensions = await graph_client.me.extensions.get()
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}

extension_exists = any(
extension.id == extension_id for extension in extensions.value
)
extension_id = "com.nexusidentity.keys"
graph_base = "https://graph.microsoft.com/v1.0"

try:
# Update or create extension
if extension_exists:
request_body = Extension(
odata_type="microsoft.graph.openTypeExtension",
additional_data={
"extension_name": extension_id,
"publicKey": public_key,
},
)
await graph_client.me.extensions.by_extension_id(
extension_id
).patch(request_body)

print(
f"Successfully updated public key to Microsoft Entra Id account {user.mail}"
)
else:
request_body = OpenTypeExtension(
odata_type="microsoft.graph.openTypeExtension",
extension_name=extension_id,
additional_data={"publicKey": public_key},
)
await graph_client.me.extensions.post(request_body)

print(
f"Successfully uploaded public key to Microsoft Entra Id account {user.mail}"
)
except ODataError as e:
logger.error("Error updating extension: %s", e)
raise CLIError(f"Error updating extension: {e}") from e
except HttpResponseError as e:
logger.error("Failed to update or create extension: %s", e)
raise CLIError(f"Failed to update or create extension: {e}") from e

asyncio.run(me())
try:
# Get user info
user_resp = requests.get(f"{graph_base}/me", headers=headers)
user_resp.raise_for_status()
user = user_resp.json()
user_mail = user.get("mail") or user.get("userPrincipalName")

# Get extensions
ext_resp = requests.get(f"{graph_base}/me/extensions", headers=headers)
ext_resp.raise_for_status()
extensions = ext_resp.json().get("value", [])
extension_exists = any(ext.get("id") == extension_id for ext in extensions)

if extension_exists:
# Update extension
patch_body = {
"@odata.type": "microsoft.graph.openTypeExtension",
"extensionName": extension_id,
"publicKey": public_key
}
patch_resp = requests.patch(
f"{graph_base}/me/extensions/{extension_id}",
headers=headers,
data=json.dumps(patch_body)
)
patch_resp.raise_for_status()
print(f"Successfully updated public key to Microsoft Entra Id account {user_mail}")
else:
# Create extension
post_body = {
"@odata.type": "microsoft.graph.openTypeExtension",
"extensionName": extension_id,
"publicKey": public_key
}
post_resp = requests.post(
f"{graph_base}/me/extensions",
headers=headers,
data=json.dumps(post_body)
)
post_resp.raise_for_status()
print(f"Successfully uploaded public key to Microsoft Entra Id account {user_mail}")

except requests.HTTPError as e:
logger.error("HTTP error: %s", e)
raise CLIError(f"HTTP error: {e}") from e
except Exception as e:
logger.error("Unexpected error: %s", e)
raise CLIError(f"Unexpected error: {e}") from e
else:
logger.warning(
"This command is currently supported only on Windows and linux platforms"
Expand Down
2 changes: 1 addition & 1 deletion src/nexusidentity/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

# TODO: Confirm this is the right version number you want and it matches your
# HISTORY.rst entry.
VERSION = '1.0.0b5'
VERSION = '1.0.0b6'

# The full list of classifiers is available at
# https://pypi.python.org/pypi?%3Aaction=list_classifiers
Expand Down
Loading