|
| 1 | +# Placeholder for adding logic specific to application |
| 2 | +# and backend key store. |
| 3 | +# |
| 4 | +import os |
| 5 | +import json |
| 6 | +import sys |
| 7 | +from azure.identity import DefaultAzureCredential |
| 8 | +from azure.keyvault.keys import KeyClient |
| 9 | +from azure.keyvault.keys.crypto import CryptographyClient, EncryptionAlgorithm |
| 10 | +# Append the current application path to sys path to be able to resolve local modules. |
| 11 | +# |
| 12 | +sys.path.append('.') |
| 13 | +sys.path.append('./model') |
| 14 | +from constants import ConfigurationConstants, Operations, CryptoConstants |
| 15 | +import utils |
| 16 | +from json_objects import EncryptDecryptRequest, JsonWebKeyResponse, EncryptDecryptResponse |
| 17 | + |
| 18 | + |
| 19 | +def decrypt(request, json_key_attributes_dict, pin, version): |
| 20 | + """ |
| 21 | + This method will be called by the application entry point |
| 22 | + for decrypting the payload. |
| 23 | + request.value has the plaintext payload |
| 24 | + request.alg contains the padding algorithm for encryption. |
| 25 | + """ |
| 26 | + set_env(json_key_attributes_dict, pin) |
| 27 | + credential = DefaultAzureCredential() |
| 28 | + key_vault_key = get_akv_key(json_key_attributes_dict, credential) |
| 29 | + crypto_client = CryptographyClient(key_vault_key, credential=credential) |
| 30 | + decrypted_payload = crypto_client.decrypt(EncryptionAlgorithm.rsa_oaep, request.value) |
| 31 | + response = EncryptDecryptResponse(decrypted_payload.plaintext) |
| 32 | + return response |
| 33 | + |
| 34 | +def encrypt(request, json_key_attributes_dict, pin, version): |
| 35 | + """ |
| 36 | + This method will be called by the application entry point |
| 37 | + for encrypting the payload. |
| 38 | + request.value has the plaintext payload |
| 39 | + request.alg contains the padding algorithm for encryption. |
| 40 | + """ |
| 41 | + set_env(json_key_attributes_dict, pin) |
| 42 | + credential = DefaultAzureCredential() |
| 43 | + key_vault_key = get_akv_key(json_key_attributes_dict, credential) |
| 44 | + crypto_client = CryptographyClient(key_vault_key, credential=credential) |
| 45 | + encrypted_payload = crypto_client.encrypt(EncryptionAlgorithm.rsa_oaep, request.value) |
| 46 | + response = EncryptDecryptResponse(encrypted_payload.ciphertext) |
| 47 | + return response |
| 48 | + |
| 49 | +def get_key(json_key_attributes_dict, pin, version): |
| 50 | + set_env(json_key_attributes_dict, pin) |
| 51 | + credential = DefaultAzureCredential() |
| 52 | + |
| 53 | + key_vault_key = get_akv_key(json_key_attributes_dict, credential) |
| 54 | + |
| 55 | + # JsonWebKeyResponse expects integer inputs and converts them to byte array |
| 56 | + # However AKV SDK already provides byte arrays for Exponent and Modulus. |
| 57 | + # We will instantiate the object with a dummy value and then overwrite the |
| 58 | + # exponent and module value. |
| 59 | + # |
| 60 | + dummy_val = 1 |
| 61 | + key_response = JsonWebKeyResponse(1,1) |
| 62 | + key_response.e = utils.urlsafe_b64encode_as_str(key_vault_key.key.e) |
| 63 | + key_response.n = utils.urlsafe_b64encode_as_str(key_vault_key.key.n) |
| 64 | + return key_response |
| 65 | + |
| 66 | +def get_akv_key(json_key_attributes_dict, credential): |
| 67 | + """ |
| 68 | + Gets the AKV key object. |
| 69 | + """ |
| 70 | + if "vault_url" in json_key_attributes_dict: |
| 71 | + vault_url = json_key_attributes_dict["vault_url"] |
| 72 | + else: |
| 73 | + raise KeyError('vault_url was expected in the parameters but not found') |
| 74 | + |
| 75 | + if "keyname" in json_key_attributes_dict: |
| 76 | + key_name = json_key_attributes_dict["keyname"] |
| 77 | + else: |
| 78 | + raise KeyError('keyname was expected in the parameters but not found') |
| 79 | + if "keyversion" in json_key_attributes_dict: |
| 80 | + key_version = json_key_attributes_dict["keyversion"] |
| 81 | + else: |
| 82 | + raise KeyError('keyversion was expected in the parameters but not found') |
| 83 | + |
| 84 | + key_client = KeyClient(vault_url=vault_url, credential=credential) |
| 85 | + key_vault_key = key_client.get_key(key_name, key_version) |
| 86 | + |
| 87 | + return key_vault_key |
| 88 | + |
| 89 | +def set_env(json_key_attributes_dict, pin): |
| 90 | + """ |
| 91 | + Sets the environment variables for the MS identity credential lookup to work. |
| 92 | + """ |
| 93 | + if "azure_client_id" in json_key_attributes_dict: |
| 94 | + key_version = json_key_attributes_dict["azure_client_id"] |
| 95 | + else: |
| 96 | + raise KeyError('azure_client_id was expected in the parameters but not found') |
| 97 | + |
| 98 | + if "azure_tenant_id" in json_key_attributes_dict: |
| 99 | + key_version = json_key_attributes_dict["azure_tenant_id"] |
| 100 | + else: |
| 101 | + raise KeyError('azure_tenant_id was expected in the parameters but not found') |
| 102 | + |
| 103 | + os.environ["AZURE_CLIENT_ID"]=json_key_attributes_dict["azure_client_id"] |
| 104 | + os.environ["AZURE_TENANT_ID"]=json_key_attributes_dict["azure_tenant_id"] |
| 105 | + os.environ["AZURE_CLIENT_SECRET"]=pin |
0 commit comments