Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
repos:
- repo: https://github.com/python/black
rev: 24.4.2
- repo: https://github.com/psf/black-pre-commit-mirror
rev: 25.11.0
hooks:
- id: black
- repo: https://github.com/pre-commit/mirrors-isort
rev: v5.10.1
- repo: https://github.com/PyCQA/isort
rev: 7.0.0
hooks:
- id: isort
args: ["--profile", "black"]
- repo: https://github.com/pycqa/flake8
rev: 7.1.0
rev: 7.3.0
hooks:
- id: flake8
- repo: https://github.com/homebysix/pre-commit-macadmin
rev: v1.16.2
rev: v1.22.0
hooks:
- id: check-plists
- id: check-autopkg-recipes
Expand Down
8 changes: 2 additions & 6 deletions WorkSpaceOneImporter/research/PUTassignmentRuleExample.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,8 @@
"description": "Text value",
"smart_groups": ["c687ab4d-6c3b-485b-b30b-f47ff1132a4d"],
"bsp_assignments": {
"smart_groups_online_licenses": [
"3915d8a1-4e0c-4716-b1a5-6d7d391c020b"
],
"smart_groups_offline_licenses": [
"6bf7ab67-7065-4028-bef7-ee8d987a7c29"
],
"smart_groups_online_licenses": ["3915d8a1-4e0c-4716-b1a5-6d7d391c020b"],
"smart_groups_offline_licenses": ["6bf7ab67-7065-4028-bef7-ee8d987a7c29"],
},
"vpp_app_details": {
"license_usage": [
Expand Down
66 changes: 16 additions & 50 deletions WorkSpaceOneImporter/research/oauth_renew_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@


def get_timestamp():
timestamp = (datetime.now().astimezone() + timedelta(milliseconds=500)).replace(
microsecond=0
)
timestamp = (datetime.now().astimezone() + timedelta(milliseconds=500)).replace(microsecond=0)
return timestamp


Expand Down Expand Up @@ -76,38 +74,28 @@ def main():
ws1_oauth_token_url = os.environ["AUTOPKG_ws1_oauth_token_url"]
print(f"Found env var AUTOPKG_ws1_oauth_token_url: {ws1_oauth_token_url}")
else:
print(
"Did not find environment variable AUTOPKG_ws1_oauth_token_url - aborting!"
)
print("Did not find environment variable AUTOPKG_ws1_oauth_token_url - aborting!")
exit(code=1)

if "AUTOPKG_ws1_oauth_client_id" in os.environ:
ws1_oauth_client_id = os.environ["AUTOPKG_ws1_oauth_client_id"]
print(f"Found env var AUTOPKG_ws1_oauth_client_id: {ws1_oauth_client_id}")
else:
print(
"Did not find environment variable AUTOPKG_ws1_oauth_client_id - aborting!"
)
print("Did not find environment variable AUTOPKG_ws1_oauth_client_id - aborting!")
exit(code=1)

if "AUTOPKG_ws1_oauth_client_secret" in os.environ:
ws1_oauth_client_secret = os.environ["AUTOPKG_ws1_oauth_client_secret"]
print(
f"Found env var AUTOPKG_ws1_oauth_client_secret: {ws1_oauth_client_secret}"
)
print(f"Found env var AUTOPKG_ws1_oauth_client_secret: {ws1_oauth_client_secret}")
else:
print(
"Did not find environment variable AUTOPKG_ws1_oauth_client_secret - aborting!"
)
print("Did not find environment variable AUTOPKG_ws1_oauth_client_secret - aborting!")
exit(code=1)

# oauth2 token is to be renewed when a specified percentage of the expiry time is left
if "AUTOPKG_ws1_oauth_renew_margin" in os.environ:
try:
ws1_oauth_renew_margin = float(os.environ["AUTOPKG_ws1_oauth_renew_margin"])
print(
f"Found env var AUTOPKG_ws1_oauth_renew_margin: {ws1_oauth_renew_margin}"
)
print(f"Found env var AUTOPKG_ws1_oauth_renew_margin: {ws1_oauth_renew_margin}")
except ValueError:
print(
"Found env var AUTOPKG_ws1_oauth_renew_margin is NOT a float: "
Expand Down Expand Up @@ -164,21 +152,13 @@ def main():
ws1_oauth_keychain, service, "oauth_token_renew_timestamp"
)
if oauth_token_renew_timestamp_str is not None:
oauth_token_renew_timestamp = datetime.fromisoformat(
oauth_token_renew_timestamp_str
)
print(
f"Retrieved timestamp to renew existing token from keychain: {oauth_token_renew_timestamp.isoformat()}"
)
oauth_token_renew_timestamp = datetime.fromisoformat(oauth_token_renew_timestamp_str)
print(f"Retrieved timestamp to renew existing token from keychain: {oauth_token_renew_timestamp.isoformat()}")
else:
oauth_token_renew_timestamp = None
while datetime.now().astimezone() < time_stop:
timestamp = get_timestamp()
if (
oauth_token is None
or oauth_token_renew_timestamp is None
or timestamp >= oauth_token_renew_timestamp
):
if oauth_token is None or oauth_token_renew_timestamp is None or timestamp >= oauth_token_renew_timestamp:
# the Oauth renewal API body payload
payload = {
"grant_type": "client_credentials",
Expand Down Expand Up @@ -209,33 +189,21 @@ def main():
if response.status_code == 200:
# get timestamp, round to nearest whole second
oauth_token_issued_timestamp = get_timestamp()
print(
f"Oauth2 token issued at: {oauth_token_issued_timestamp.isoformat()}"
)
print(f"Oauth2 token issued at: {oauth_token_issued_timestamp.isoformat()}")

result = response.json()
oauth_token = result["access_token"]
print(f"Access token: {oauth_token}")
print(f"Access token length: {len(result['access_token'])}")
print(f"Access token validity is {result['expires_in']} seconds")

renew_threshold = round(
result["expires_in"] * (100 - ws1_oauth_renew_margin) / 100
)
print(
f"Access token threshold for renewal set to {renew_threshold} seconds"
)
oauth_token_renew_timestamp = oauth_token_issued_timestamp + timedelta(
seconds=renew_threshold
)
print(
f"Oauth2 token should be renewed after: {oauth_token_renew_timestamp.isoformat()}"
)
renew_threshold = round(result["expires_in"] * (100 - ws1_oauth_renew_margin) / 100)
print(f"Access token threshold for renewal set to {renew_threshold} seconds")
oauth_token_renew_timestamp = oauth_token_issued_timestamp + timedelta(seconds=renew_threshold)
print(f"Oauth2 token should be renewed after: {oauth_token_renew_timestamp.isoformat()}")

# recalculate stop time until a bit after the actual token validity period from the API response
test_runtime = round(
result["expires_in"] * (100 + ws1_oauth_renew_margin) / 100
)
test_runtime = round(result["expires_in"] * (100 + ws1_oauth_renew_margin) / 100)
time_stop = timestamp_start + timedelta(seconds=test_runtime)
print(f"Test stop scheduled for: {time_stop.isoformat()}")

Expand All @@ -246,9 +214,7 @@ def main():

print("ToDo: test calling a program that can use the Oauth token")

result = set_password_in_keychain(
ws1_oauth_keychain, service, "oauth_token", oauth_token
)
result = set_password_in_keychain(ws1_oauth_keychain, service, "oauth_token", oauth_token)
result = set_password_in_keychain(
ws1_oauth_keychain,
service,
Expand Down
12 changes: 3 additions & 9 deletions WorkSpaceOneImporter/research/oauth_renew_test_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ def main():
# service = "Autopkg_WS1_OAUTH"
service = "autopkg_tool_launcher"

ws1_oauth_token_url = get_password_from_keychain(
keychain, service, "WS1_OAUTH_TOKEN_URL"
)
ws1_oauth_token_url = get_password_from_keychain(keychain, service, "WS1_OAUTH_TOKEN_URL")
if ws1_oauth_token_url is None:
print(f"Failed to get WS1_OAUTH_TOKEN_URL from keychain {keychain} - aborting")
exit(code=1)
Expand All @@ -44,13 +42,9 @@ def main():
print(f"Retrieved WS1_OAUTH_CLIENT_ID from keychain {keychain}")
os.environ["AUTOPKG_ws1_oauth_client_id"] = client_id

client_secret = get_password_from_keychain(
keychain, service, "WS1_OAUTH_CLIENT_SECRET"
)
client_secret = get_password_from_keychain(keychain, service, "WS1_OAUTH_CLIENT_SECRET")
if client_secret is None:
print(
f"Failed to get WS1_OAUTH_CLIENT_SECRET from keychain {keychain} - aborting"
)
print(f"Failed to get WS1_OAUTH_CLIENT_SECRET from keychain {keychain} - aborting")
exit(code=1)
print(f"Retrieved WS1_OAUTH_CLIENT_SECRET from keychain {keychain}")
os.environ["AUTOPKG_ws1_oauth_client_secret"] = client_secret
Expand Down