"""
Functions for managing partitions
"""
import logging
from _ctypes import pointer
from ctypes import byref, c_ubyte
from functools import reduce
from .common_utils import AutoCArray, refresh_c_arrays
from .cryptoki import (CK_SLOT_ID,
CK_ULONG,
CK_SESSION_HANDLE,
CA_CreateContainer,
CA_DeleteContainerWithHandle,
CA_GetContainerList,
CA_GetContainerCapabilitySet,
CA_GetContainerCapabilitySetting,
CA_GetContainerPolicySet,
CA_GetContainerPolicySetting,
CA_GetContainerName,
CA_GetContainerStorageInformation,
CA_GetContainerStatus,
CA_SetContainerPolicy,
CA_SetContainerPolicies,
CA_SetContainerSize)
from .defines import (LUNA_PARTITION_TYPE_STANDALONE,
LUNA_CF_CONTAINER_ENABLED,
LUNA_CF_KCV_CREATED,
LUNA_CF_LKCV_CREATED,
LUNA_CF_HA_INITIALIZED,
LUNA_CF_PARTITION_INITIALIZED,
LUNA_CF_CONTAINER_ACTIVATED,
LUNA_CF_CONTAINER_LUSR_ACTIVATED,
LUNA_CF_USER_PIN_INITIALIZED,
LUNA_CF_SO_PIN_LOCKED,
LUNA_CF_SO_PIN_TO_BE_CHANGED,
LUNA_CF_USER_PIN_LOCKED,
LUNA_CF_LIMITED_USER_PIN_LOCKED,
LUNA_CF_LIMITED_USER_CREATED,
LUNA_CF_USER_PIN_TO_BE_CHANGED,
LUNA_CF_LIMITED_USER_PIN_TO_BE_CHANGED)
from .exceptions import make_error_handle_function
LOG = logging.getLogger(__name__)
def ca_create_container(h_session, storage_size, password=None, label='Inserted Token'):
"""Inserts a token into a slot without a Security Officer on the token
:param int h_session: Session handle
:param storage_size: The storage size of the token (0 for undefined/unlimited)
:param password: The password associated with the token (Default value = 'userpin')
:param label: The label associated with the token (Default value = 'Inserted Token')
:returns: The result code, The container number
"""
h_sess = CK_SESSION_HANDLE(h_session)
h_container = CK_ULONG()
LOG.info("CA_CreateContainer: Inserting token with no SO storage_size=%s, pin=%s, label=%s",
storage_size, password, label)
if password == '':
password = None
password = AutoCArray(data=password)
label = AutoCArray(data=label)
ret = CA_CreateContainer(h_sess, CK_ULONG(0),
label.array, label.size.contents,
password.array, password.size.contents,
CK_ULONG(-1), CK_ULONG(-1), CK_ULONG(0), CK_ULONG(0),
CK_ULONG(storage_size), byref(h_container))
LOG.info("CA_CreateContainer: Inserted token into slot %s", h_container.value)
return ret, h_container.value
ca_create_container_ex = make_error_handle_function(ca_create_container)
def ca_delete_container_with_handle(h_session, h_container):
"""
Delete a container by handle
:param int h_session: Session handle
:param h_container: target container handle
:return: result code
"""
h_sess = CK_SESSION_HANDLE(h_session)
container_id = CK_ULONG(h_container)
LOG.info(
"CA_DeleteContainerWithHandle: "
"Attempting to delete container with handle: %s", h_container)
ret = CA_DeleteContainerWithHandle(h_sess, container_id)
LOG.info("CA_DeleteContainerWithHandle: Ret Value: %s", ret)
return ret
ca_delete_container_with_handle_ex = make_error_handle_function(ca_delete_container_with_handle)
def ca_get_container_list(slot, group_handle=0, container_type=LUNA_PARTITION_TYPE_STANDALONE):
"""
Get list of containers.
:param slot: slot ID of the slot to query
:param group_handle: group ID
:param container_type: type of container
:return: result code, list of container handles
"""
slot_id = CK_SLOT_ID(slot)
group = CK_ULONG(group_handle)
cont_type = CK_ULONG(container_type)
cont_handles = AutoCArray()
@refresh_c_arrays(1)
def _get_cont_list():
"""Closer for retries to work w/ properties
"""
return CA_GetContainerList(slot_id, group, cont_type,
cont_handles.array, cont_handles.size)
ret = _get_cont_list()
return ret, list(cont_handles.array)
ca_get_container_list_ex = make_error_handle_function(ca_get_container_list)
def ca_get_container_capability_set(slot, h_container):
"""
Get the container capabilities of the given slot.
:param int slot: target slot number
:param int h_container: target container handle
:return: result code, {id: val} dict of capabilities (None if command failed)
"""
slot_id = CK_SLOT_ID(slot)
cont_id = CK_ULONG(h_container)
cap_ids = AutoCArray()
cap_vals = AutoCArray()
@refresh_c_arrays(1)
def _get_container_caps():
"""Closer for retries to work w/ properties
"""
return CA_GetContainerCapabilitySet(slot_id,
cont_id,
cap_ids.array,
cap_ids.size,
cap_vals.array,
cap_vals.size)
ret = _get_container_caps()
return ret, dict(list(zip(cap_ids, cap_vals)))
ca_get_container_capability_set_ex = make_error_handle_function(ca_get_container_capability_set)
def ca_get_container_capability_setting(slot, h_container, capability_id):
"""
Get the value of a container's single capability
:param slot: slot ID of slot to query
:param h_container: target container handle
:param capability_id: capability ID
:return: result code, CK_ULONG representing capability active or not
"""
slot_id = CK_SLOT_ID(slot)
cont_id = CK_ULONG(h_container)
cap_id = CK_ULONG(capability_id)
cap_val = CK_ULONG()
ret = CA_GetContainerCapabilitySetting(slot_id,
cont_id,
cap_id,
pointer(cap_val))
return ret, cap_val.value
ca_get_container_capability_setting_ex = make_error_handle_function(
ca_get_container_capability_setting)
def ca_get_container_policy_set(slot, h_container):
"""
Get the policies of the given slot and container.
:param int slot: target slot number
:param int h_container: target container handle
:return: result code, {id: val} dict of policies (None if command failed)
"""
slot_id = CK_SLOT_ID(slot)
cont_id = CK_ULONG(h_container)
pol_ids = AutoCArray()
pol_vals = AutoCArray()
@refresh_c_arrays(1)
def _ca_get_container_policy_set():
"""Closure for retries.
"""
return CA_GetContainerPolicySet(slot_id,
cont_id,
pol_ids.array,
pol_ids.size,
pol_vals.array,
pol_vals.size)
ret = _ca_get_container_policy_set()
return ret, dict(list(zip(pol_ids, pol_vals)))
ca_get_container_policy_set_ex = make_error_handle_function(ca_get_container_policy_set)
def ca_get_container_policy_setting(slot, h_container, policy_id):
"""
Get the value of a container's single policy
:param slot: slot ID of slot to query
:param h_container: target container handle
:param policy_id: policy ID
:return: result code, CK_ULONG representing policy active or not
"""
slot_id = CK_SLOT_ID(slot)
cont_id = CK_ULONG(h_container)
pol_id = CK_ULONG(policy_id)
pol_val = CK_ULONG()
ret = CA_GetContainerPolicySetting(slot_id, cont_id, pol_id, pointer(pol_val))
return ret, pol_val.value
ca_get_container_policy_setting_ex = make_error_handle_function(ca_get_container_policy_setting)
def ca_get_container_name(slot, h_container):
"""
Get a container's name
:param slot: target slot
:param h_container: target container handle
:return: result code, container name string
"""
slot_id = CK_SLOT_ID(slot)
cont_id = CK_ULONG(h_container)
name_arr = AutoCArray(ctype=c_ubyte)
@refresh_c_arrays(1)
def _ca_get_container_name():
"""
Closure for retries
"""
return CA_GetContainerName(slot_id,
cont_id,
name_arr.array,
name_arr.size)
ret = _ca_get_container_name()
return ret, ''.join(map(chr, name_arr.array))
ca_get_container_name_ex = make_error_handle_function(ca_get_container_name)
def ca_get_container_storage_information(slot, h_container):
"""
Get a container's storage information
:param slot: target slot
:param h_container: target container handle
:return: result code, dict of storage values
"""
slot_id = CK_SLOT_ID(slot)
cont_id = CK_ULONG(h_container)
overhead = CK_ULONG()
total = CK_ULONG()
used = CK_ULONG()
free = CK_ULONG()
obj_count = CK_ULONG()
ret = CA_GetContainerStorageInformation(slot_id,
cont_id,
pointer(overhead),
pointer(total),
pointer(used),
pointer(free),
pointer(obj_count))
return ret, {'overhead': overhead.value,
'total': total.value,
'used': used.value,
'free': free.value,
'object_count': obj_count.value}
ca_get_container_storage_information_ex = make_error_handle_function(
ca_get_container_storage_information)
def ca_get_container_status(slot, h_container):
"""
Get a container's Status
:param slot: target slot
:param h_container: target container handle
:return: result code, dict of flags, dict of failed logins
"""
slot_id = CK_SLOT_ID(slot)
cont_id = CK_ULONG(h_container)
status_flags = CK_ULONG()
failed_so_logins = CK_ULONG()
failed_user_logins = CK_ULONG()
failed_limited_user_logins = CK_ULONG()
ret = CA_GetContainerStatus(slot_id,
cont_id,
pointer(status_flags),
pointer(failed_so_logins),
pointer(failed_user_logins),
pointer(failed_limited_user_logins))
flags_dict = {
LUNA_CF_CONTAINER_ENABLED: None,
LUNA_CF_KCV_CREATED: None,
LUNA_CF_LKCV_CREATED: None,
LUNA_CF_HA_INITIALIZED: None,
LUNA_CF_PARTITION_INITIALIZED: None,
LUNA_CF_CONTAINER_ACTIVATED: None,
LUNA_CF_CONTAINER_LUSR_ACTIVATED: None,
LUNA_CF_USER_PIN_INITIALIZED: None,
LUNA_CF_SO_PIN_LOCKED: None,
LUNA_CF_SO_PIN_TO_BE_CHANGED: None,
LUNA_CF_USER_PIN_LOCKED: None,
LUNA_CF_LIMITED_USER_PIN_LOCKED: None,
LUNA_CF_LIMITED_USER_CREATED: None,
LUNA_CF_USER_PIN_TO_BE_CHANGED: None,
LUNA_CF_LIMITED_USER_PIN_TO_BE_CHANGED: None
}
flags_or = reduce(lambda x, y: x | y, flags_dict.keys())
mask = status_flags.value & flags_or
if status_flags.value ^ mask != 0:
unknown_flags = []
for i in range(status_flags.value.bit_length()):
if ((status_flags.value ^ mask) >> i) & 1:
unknown_flags.append(2 ** i)
raise Exception("Found unknown flags! {}".format(' '.join(unknown_flags)))
for key, flag in flags_dict.items():
flags_dict[key] = 1 if key & status_flags.value else 0
failed_logins_dict = {
'failed_so_logins': failed_so_logins.value,
'failed_user_logins': failed_user_logins.value,
'failed_limited_user_logins': failed_limited_user_logins.value
}
for key, val in failed_logins_dict.items():
if not val ^ int('1' * 64, 2) or not val ^ int('1' * 32, 2):
failed_logins_dict[key] = -1
return ret, flags_dict, failed_logins_dict
ca_get_container_status_ex = make_error_handle_function(ca_get_container_status)
def ca_set_container_policy(h_session, h_containerber, policy_id, policy_val):
"""Sets a policy on the container.
NOTE: With per partition SO this method should generally not be used. Instead
ca_set_partition_policies should be used
:param int h_session: Session handle
:param h_containerber: The container number to set the policy on.
:param policy_id: The identifier of the policy (ex. CONTAINER_CONFIG_MINIMUM_PIN_LENGTH)
:param policy_val: The value to set the policy to
:returns: The result code
"""
ret = CA_SetContainerPolicy(CK_SESSION_HANDLE(h_session),
CK_ULONG(h_containerber),
CK_ULONG(policy_id),
CK_ULONG(policy_val))
return ret
ca_set_container_policy_ex = make_error_handle_function(ca_set_container_policy)
def ca_set_container_policies(h_session, h_container, policies):
"""
Set multiple container policies.
:param int h_session: Session handle
:param h_container: target container handle
:param policies: dict of policy ID ints and value ints
:return: result code
"""
h_sess = CK_SESSION_HANDLE(h_session)
container_id = CK_ULONG(h_container)
pol_id_list = list(policies.keys())
pol_val_list = list(policies.values())
pol_ids = AutoCArray(data=pol_id_list, ctype=CK_ULONG)
pol_vals = AutoCArray(data=pol_val_list, ctype=CK_ULONG)
ret = CA_SetContainerPolicies(h_sess,
container_id,
pol_ids.size.contents,
pol_ids.array,
pol_vals.array)
return ret
ca_set_container_policies_ex = make_error_handle_function(ca_set_container_policies)
def ca_set_container_size(h_session, h_container, size):
"""
Set a container's size
:param int h_session: Session handle
:param h_container: target container handle
:param size: size
:return: result code
"""
h_sess = CK_SESSION_HANDLE(h_session)
container_id = CK_ULONG(h_container)
size = CK_ULONG(size)
ret = CA_SetContainerSize(h_sess,
container_id,
size)
return ret
ca_set_container_size_ex = make_error_handle_function(ca_set_container_size)