"""
Methods related to encrypting data/files.
"""
import logging
from _ctypes import POINTER
from ctypes import create_string_buffer, cast, byref, string_at, c_ubyte
from .string_helpers import _coerce_mech_to_str
from .attributes import Attributes, to_byte_array
from .common_utils import AutoCArray, refresh_c_arrays
from .conversions import from_bytestring
from .cryptoki import CK_ULONG, \
C_EncryptInit, C_Encrypt
from .cryptoki import C_Decrypt, C_DecryptInit, CK_OBJECT_HANDLE, \
C_WrapKey, C_UnwrapKey, C_EncryptUpdate, C_EncryptFinal, CK_BYTE_PTR, \
C_DecryptUpdate, C_DecryptFinal
from .defines import CKR_OK
from .exceptions import make_error_handle_function
from .lookup_dicts import ret_vals_dictionary
from .mechanism import parse_mechanism
MAX_BUFFER = 0xffff
LOG = logging.getLogger(__name__)
[docs]def c_encrypt(h_session, h_key, data, mechanism, output_buffer=None):
"""Encrypts data with a given key and encryption flavor
encryption flavors
.. note:: If data is a list or tuple of strings, multi-part encryption will be used.
:param int h_session: Current session
:param int h_key: The key handle to encrypt the data with
:param data: The data to encrypt, either a bytestring or a list of bytestrings. If this is
a list a multipart operation will be used
.. note:: This will be converted to hexadecimal by calling::
to_hex(from_bytestring(data))
If you need to pass in raw hex data, call::
to_bytestring(from_hex(hex-data))
References:
* :py:func:`~pycryptoki.conversions.to_hex`
* :py:func:`~pycryptoki.conversions.from_hex`
* :py:func:`~pycryptoki.conversions.to_bytestring`
* :py:func:`~pycryptoki.conversions.from_bytestring`
:param mechanism: See the :py:func:`~pycryptoki.mechanism.parse_mechanism` function
for possible values.
:param list|int output_buffer: Integer or list of integers that specify a size of output
buffer to use for an operation. By default will query with NULL pointer buffer
to get required size of buffer.
:returns: (Retcode, Python bytestring of encrypted data)
:rtype: tuple
"""
LOG.debug("Encrypting data with mechanism:\n%s", _coerce_mech_to_str(mechanism))
mech = parse_mechanism(mechanism)
# if a list is passed out do an encrypt operation on each string in the list, otherwise just
# do one encrypt operation
is_multi_part_operation = isinstance(data, (list, tuple))
# Initialize encryption
ret = C_EncryptInit(h_session, byref(mech), CK_ULONG(h_key))
if ret != CKR_OK:
return ret, None
if is_multi_part_operation:
ret, encrypted_python_string = do_multipart_operation(h_session, C_EncryptUpdate,
C_EncryptFinal, data, output_buffer)
else:
plain_data, plain_data_length = to_byte_array(from_bytestring(data))
plain_data = cast(plain_data, POINTER(c_ubyte))
if output_buffer is not None:
size = CK_ULONG(output_buffer)
enc_data = AutoCArray(ctype=c_ubyte,
size=size)
ret = C_Encrypt(h_session,
plain_data, plain_data_length,
enc_data.array, enc_data.size)
else:
enc_data = AutoCArray(ctype=c_ubyte)
@refresh_c_arrays(1)
def _encrypt():
"""Closure for getting the buffer size with encrypt."""
return C_Encrypt(h_session,
plain_data, plain_data_length,
enc_data.array, enc_data.size)
ret = _encrypt()
if ret != CKR_OK:
return ret, None
# Convert encrypted data into a python string
encrypted_python_string = string_at(enc_data.array, enc_data.size.contents.value)
return ret, encrypted_python_string
c_encrypt_ex = make_error_handle_function(c_encrypt)
def _split_string_into_list(python_string, block_size):
"""Splits a string into a list of equal size chunks
:param python_string: The string to divide
:param block_size: The size of the blocks to divide the string into
:returns: A list of strings of block_size
"""
total_length = len(python_string)
return [python_string[x:x + block_size] for x in range(0, total_length, block_size)]
def _get_string_from_list(list_of_strings):
"""Takes a list of strings and returns a single concatenated string.
:param list_of_strings: A list of strings to be concatenated
:returns: Single string representing the concatenated list
"""
return b"".join(list_of_strings)
[docs]def c_decrypt(h_session, h_key, encrypted_data, mechanism, output_buffer=None):
"""Decrypt given data with the given key and mechanism.
.. note:: If data is a list or tuple of strings, multi-part decryption will be used.
:param int h_session: The session to use
:param int h_key: The handle of the key to use to decrypt
:param bytes encrypted_data: Data to be decrypted
.. note:: Data will be converted to hexadecimal by calling::
to_hex(from_bytestring(data))
If you need to pass in raw hex data, call::
to_bytestring(from_hex(hex-data))
References:
* :py:func:`~pycryptoki.conversions.to_hex`
* :py:func:`~pycryptoki.conversions.from_hex`
* :py:func:`~pycryptoki.conversions.to_bytestring`
* :py:func:`~pycryptoki.conversions.from_bytestring`
:param mechanism: See the :py:func:`~pycryptoki.mechanism.parse_mechanism` function
for possible values.
:param list|int output_buffer: Integer or list of integers that specify a size of output
buffer to use for an operation. By default will query with NULL pointer buffer
to get required size of buffer.
:returns: (Retcode, Python bytestring of decrypted data))
:rtype: tuple
"""
mech = parse_mechanism(mechanism)
# Initialize Decrypt
ret = C_DecryptInit(h_session, mech, CK_ULONG(h_key))
if ret != CKR_OK:
return ret, None
# if a list is passed out do a decrypt operation on each string in the list, otherwise just
# do one decrypt operation
is_multi_part_operation = isinstance(encrypted_data, (list, tuple))
if is_multi_part_operation:
ret, python_data = do_multipart_operation(h_session, C_DecryptUpdate, C_DecryptFinal,
encrypted_data, output_buffer)
else:
# Get the length of the final data
# NOTE: The "Conventions for functions returning output in a variable-length buffer"
# section of the PKCS#11 spec says that the length returned in this
# case (no output buffer given to C_Decrypt) can exceed the precise
# number of bytes needed. So the python string that's returned in the
# end needs to be adjusted based on the second called to C_Decrypt
# which will have the right length
c_enc_data, c_enc_data_len = to_byte_array(from_bytestring(encrypted_data))
c_enc_data = cast(c_enc_data, POINTER(c_ubyte))
if output_buffer is not None:
size = CK_ULONG(output_buffer)
decrypted_data = AutoCArray(ctype=c_ubyte,
size=size)
ret = C_Decrypt(h_session,
c_enc_data, c_enc_data_len,
decrypted_data.array, decrypted_data.size)
else:
decrypted_data = AutoCArray(ctype=c_ubyte)
@refresh_c_arrays(1)
def _decrypt():
""" Perform the decryption ops"""
return C_Decrypt(h_session,
c_enc_data, c_enc_data_len,
decrypted_data.array, decrypted_data.size)
ret = _decrypt()
if ret != CKR_OK:
return ret, None
# Convert the decrypted data to a python readable format
python_data = string_at(decrypted_data.array, decrypted_data.size.contents.value)
return ret, python_data
c_decrypt_ex = make_error_handle_function(c_decrypt)
[docs]def do_multipart_operation(h_session,
c_update_function,
c_finalize_function,
input_data_list,
output_buffer=None):
"""Some code which will do a multipart encrypt or decrypt since they are the same
with just different functions called
:param int h_session: Session handle
:param c_update_function: C_<NAME>Update function to call to update each operation.
:param c_finalize_function: Function to call at end of multipart operation.
:param input_data_list: List of data to call update function on.
.. note:: Data will be converted to hexadecimal by calling::
to_hex(from_bytestring(data))
If you need to pass in raw hex data, call::
to_bytestring(from_hex(hex-data))
References:
* :py:func:`~pycryptoki.conversions.to_hex`
* :py:func:`~pycryptoki.conversions.from_hex`
* :py:func:`~pycryptoki.conversions.to_bytestring`
* :py:func:`~pycryptoki.conversions.from_bytestring`
:param list output_buffer: List of integers that specify a size of output buffers to use
for multi-part operations. By default will query with NULL pointer buffer
to get required size of buffer
"""
python_data = []
error = None
for index, chunk in enumerate(input_data_list):
if output_buffer:
out_data_len = CK_ULONG(output_buffer[index])
out_data = cast(create_string_buffer(b'', output_buffer[index]), CK_BYTE_PTR)
else:
out_data_len = CK_ULONG()
out_data = None
data_chunk, data_chunk_len = to_byte_array(from_bytestring(chunk))
data_chunk = cast(data_chunk, POINTER(c_ubyte))
ret = c_update_function(h_session,
data_chunk, data_chunk_len,
out_data, byref(out_data_len))
if ret != CKR_OK:
LOG.debug("%s call on chunk %.20s (%s/%s) Failed w/ ret %s (%s)",
c_update_function.__name__,
chunk, index + 1, len(input_data_list),
ret_vals_dictionary.get(ret, "Unknown retcode"), str(hex(ret)))
error = ret
break
if not output_buffer:
# Need a second call to actually get the data.
LOG.debug("Creating cipher data buffer of size %s", out_data_len.value)
out_data = create_string_buffer(b'', out_data_len.value)
ret = c_update_function(h_session,
data_chunk, data_chunk_len,
cast(out_data, CK_BYTE_PTR), byref(out_data_len))
if ret != CKR_OK:
LOG.debug("%s call on chunk %.20s (%s/%s) Failed w/ ret %s (%s)",
c_update_function.__name__,
chunk, index + 1, len(input_data_list),
ret_vals_dictionary.get(ret, "Unknown retcode"), str(hex(ret)))
error = ret
break
# Get the output
python_data.append(string_at(out_data, out_data_len.value))
if error:
# Make sure we finalize the operation -- don't want to leave any operations active.
ret = c_finalize_function(h_session,
cast(create_string_buffer(b'', MAX_BUFFER), CK_BYTE_PTR),
CK_ULONG(MAX_BUFFER))
LOG.debug("%s call after a %s failure returned: %s (%s)",
c_finalize_function.__name__,
c_update_function.__name__,
ret_vals_dictionary.get(ret, "Unknown retcode"), str(hex(ret)))
return error, b"".join(python_data)
if output_buffer:
fin_out_data_len = CK_ULONG(max(output_buffer))
fin_out_data = create_string_buffer(b"", fin_out_data_len.value)
ret = c_finalize_function(h_session, cast(fin_out_data, CK_BYTE_PTR),
byref(fin_out_data_len))
if ret != CKR_OK:
return ret, b"".join(python_data)
else:
# Finalizing multipart decrypt operation
fin_out_data_len = CK_ULONG()
# Get buffer size for data
ret = c_finalize_function(h_session, None, byref(fin_out_data_len))
if ret != CKR_OK:
return ret, b"".join(python_data)
fin_out_data = create_string_buffer(b"", fin_out_data_len.value)
output = cast(fin_out_data, CK_BYTE_PTR)
ret = c_finalize_function(h_session, output, byref(fin_out_data_len))
if ret != CKR_OK:
return ret, b"".join(python_data)
if fin_out_data_len.value > 0:
python_data.append(string_at(fin_out_data, fin_out_data_len.value))
return ret, b"".join(python_data)
[docs]def c_wrap_key(h_session, h_wrapping_key, h_key, mechanism, output_buffer=None):
"""Wrap a key off the HSM into an encrypted data blob.
:param int h_session: The session to use
:param int h_wrapping_key: The handle of the key to use to wrap another key
:param int h_key: The key to wrap based on the encryption flavor
:param mechanism: See the :py:func:`~pycryptoki.mechanism.parse_mechanism` function
for possible values.
:returns: (Retcode, python bytestring representing wrapped key)
:rtype: tuple
"""
mech = parse_mechanism(mechanism)
if output_buffer is not None:
size = CK_ULONG(output_buffer)
wrapped_key = AutoCArray(ctype=c_ubyte,
size=size)
ret = C_WrapKey(h_session, mech,
CK_OBJECT_HANDLE(h_wrapping_key), CK_OBJECT_HANDLE(h_key),
wrapped_key.array, wrapped_key.size)
else:
wrapped_key = AutoCArray(ctype=c_ubyte)
@refresh_c_arrays(1)
def _wrap():
""" Perform the Wrapping operation"""
return C_WrapKey(h_session, mech,
CK_OBJECT_HANDLE(h_wrapping_key), CK_OBJECT_HANDLE(h_key),
wrapped_key.array, wrapped_key.size)
ret = _wrap()
if ret != CKR_OK:
return ret, None
return ret, string_at(wrapped_key.array, wrapped_key.size.contents.value)
c_wrap_key_ex = make_error_handle_function(c_wrap_key)
[docs]def c_unwrap_key(h_session, h_unwrapping_key, wrapped_key, key_template, mechanism):
"""Unwrap a key from an encrypted data blob.
:param int h_session: The session to use
:param int h_unwrapping_key: The wrapping key handle
:param bytes wrapped_key: The wrapped key
.. note:: Data will be converted to hexadecimal by calling::
to_hex(from_bytestring(data))
If you need to pass in raw hex data, call::
to_bytestring(from_hex(hex-data))
References:
* :py:func:`~pycryptoki.conversions.to_hex`
* :py:func:`~pycryptoki.conversions.from_hex`
* :py:func:`~pycryptoki.conversions.to_bytestring`
* :py:func:`~pycryptoki.conversions.from_bytestring`
:param dict key_template: The python template representing the new key's template
:param mechanism: See the :py:func:`~pycryptoki.mechanism.parse_mechanism` function
for possible values.
:returns: (Retcode, unwrapped key handle)
:rtype: tuple
"""
mech = parse_mechanism(mechanism)
c_template = Attributes(key_template).get_c_struct()
byte_wrapped_key, key_len = to_byte_array(from_bytestring(wrapped_key))
byte_wrapped_key = cast(byte_wrapped_key, CK_BYTE_PTR)
h_output_key = CK_ULONG()
ret = C_UnwrapKey(h_session, mech, CK_OBJECT_HANDLE(h_unwrapping_key),
byte_wrapped_key, key_len,
c_template, CK_ULONG(len(key_template)), byref(h_output_key))
return ret, h_output_key.value
c_unwrap_key_ex = make_error_handle_function(c_unwrap_key)