"""
Methods responsible for managing a user's session and login/logout
"""
import logging
from ctypes import cast, c_void_p, create_string_buffer, \
byref, pointer, string_at
from .common_utils import AutoCArray
# cryptoki constants
from .cryptoki import (CK_ULONG,
CK_BBOOL,
CK_SLOT_ID,
CK_SLOT_INFO,
CK_SESSION_HANDLE,
CK_FLAGS,
CK_NOTIFY,
CK_SESSION_INFO,
CK_USER_TYPE,
CK_TOKEN_INFO,
CK_VOID_PTR,
CK_BYTE, CK_INFO, C_GetInfo, CA_GetFirmwareVersion, c_ulong,
CK_C_INITIALIZE_ARGS, CK_C_INITIALIZE_ARGS_PTR)
# Cryptoki Functions
from .cryptoki import (C_Initialize,
C_GetSlotList,
C_GetSlotInfo,
C_CloseAllSessions,
C_GetSessionInfo,
C_OpenSession,
C_Login,
C_Logout,
C_CloseSession,
C_InitPIN,
CA_FactoryReset,
C_GetTokenInfo,
C_Finalize,
C_SetPIN,
CA_OpenApplicationID,
CA_CloseApplicationID,
CA_Restart,
CA_SetApplicationID)
from .defines import CKR_OK, CKF_RW_SESSION, CKF_SERIAL_SESSION
from .exceptions import make_error_handle_function, LunaCallException
LOG = logging.getLogger(__name__)
def c_initialize(flags=None, init_struct=None):
    """Initializes current process for use with PKCS11.

    Some sample flags:

        CKF_LIBRARY_CANT_CREATE_OS_THREADS
        CKF_OS_LOCKING_OK

    See the `PKCS11 documentation <https://www.cryptsoft.com/pkcs11doc/v220/pkcs11__all_8h.html#aC_Initialize>`_
    for more details.

    :param int flags: Flags to be set within InitArgs Struct. If given without
        ``init_struct``, a fresh ``CK_C_INITIALIZE_ARGS`` is created to carry them.
        (Default = None)
    :param init_struct: InitArgs structure (Default = None)
    :returns: Cryptoki return code.
    """
    if flags:
        if not init_struct:
            init_struct = CK_C_INITIALIZE_ARGS()
        init_struct.flags = flags

    if init_struct:
        # ctypes.cast() only accepts pointer-like objects; a bare structure
        # instance is rejected, so take its address first. Any other object
        # (e.g. an already-built pointer) is handed to cast() as before.
        if isinstance(init_struct, CK_C_INITIALIZE_ARGS):
            init_struct = pointer(init_struct)
        init_struct_p = cast(init_struct, c_void_p)
    else:
        init_struct_p = None

    LOG.info("Initializing Cryptoki Library")
    ret = C_Initialize(init_struct_p)
    return ret


c_initialize_ex = make_error_handle_function(c_initialize)
def c_finalize():
    """Finalize the PKCS11 library by calling ``C_Finalize``.

    :return: Cryptoki return code
    """
    LOG.info("Finalizing Library")
    return C_Finalize(0)


c_finalize_ex = make_error_handle_function(c_finalize)
def c_open_session(slot_num, flags=(CKF_SERIAL_SESSION | CKF_RW_SESSION)):
    """Open a new session on the given slot.

    :param int slot_num: The slot to get a session on
    :param int flags: The flags to open the session with
        (Default value = (CKF_SERIAL_SESSION | CKF_RW_SESSION)
    :returns: (retcode, session handle)
    :rtype: tuple
    """
    session_handle = CK_SESSION_HANDLE()

    # The "Application" buffer is passed as the void-pointer argument of
    # C_OpenSession; the notify callback argument is a null CK_NOTIFY.
    app_buffer = create_string_buffer(b"Application")
    app_pointer = cast(cast(app_buffer, c_void_p), CK_VOID_PTR)

    ret = C_OpenSession(
        CK_SLOT_ID(slot_num),
        CK_FLAGS(flags),
        app_pointer,
        CK_NOTIFY(0),
        pointer(session_handle),
    )
    LOG.info("C_OpenSession: Opening Session. slot=%s", slot_num)
    return ret, session_handle.value


c_open_session_ex = make_error_handle_function(c_open_session)
def login(h_session, slot_num=1, password=None, user_type=1):
    """Login to the given session.

    :param int h_session: Session handle
    :param int slot_num: Slot index; only used for the log message here
        (Default value = 1)
    :param bytes password: Password to login with. An empty password (text or
        bytes) is treated the same as ``None``. (Default value = None)
    :param int user_type: User type to login as (Default value = 1)
    :returns: retcode
    :rtype: int
    """
    # The password itself is deliberately never written to the log.
    LOG.info("C_Login: "
             "user_type=%s, "
             "slot=%s, "
             "password=***", user_type, slot_num)

    # Normalise "empty" to None for both text and bytes input, so that b''
    # takes the same path as ''.
    if password in ('', b''):
        password = None

    user_type = CK_USER_TYPE(user_type)
    password = AutoCArray(data=password, ctype=CK_BYTE)
    ret = C_Login(h_session, user_type, password.array, password.size.contents)
    return ret


login_ex = make_error_handle_function(login)
def c_get_info():
    """
    Get general information about the Cryptoki Library

    Returns a dictionary containing the following keys:

        * cryptokiVersion
        * manufacturerID
        * flags
        * libraryDescription
        * libraryVersion

    ``cryptokiVersion`` and ``libraryVersion`` are :py:class:`~pycryptoki.cryptoki.CK_VERSION` structs,
    and the major/minor values can be accessed directly (``info['cryptokiVersion'].major == 2``)

    The dictionary is empty when the call does not return ``CKR_OK``.

    :return: (retcode, info dictionary)
    :rtype: tuple
    """

    def _char_field(field, width=32):
        # PKCS#11 defines manufacturerID / libraryDescription as fixed 32-byte,
        # blank-padded fields that are NOT NUL-terminated. An unbounded
        # string_at() would keep reading into the following struct members,
        # so bound the read to the field width; still stop at a NUL if the
        # library happened to write one, as the unbounded read did.
        return string_at(field, width).split(b"\0", 1)[0]

    info = {}
    info_struct = CK_INFO()
    ret = C_GetInfo(byref(info_struct))
    if ret == CKR_OK:
        info['cryptokiVersion'] = info_struct.cryptokiVersion
        info['manufacturerID'] = _char_field(info_struct.manufacturerID)
        info['flags'] = info_struct.flags
        info['libraryDescription'] = _char_field(info_struct.libraryDescription)
        info['libraryVersion'] = info_struct.libraryVersion

    return ret, info


c_get_info_ex = make_error_handle_function(c_get_info)
def c_get_slot_list(token_present=True):
    """
    Get a list of all slots.

    :param bool token_present: If true, will only return slots that have a token present.
    :return: (retcode, list of slots); the list is empty whenever the retcode
        is not ``CKR_OK``.
    :rtype: tuple
    """
    slots = AutoCArray(ctype=CK_ULONG)

    # C_GetSlotList is called twice on the same AutoCArray.
    # NOTE(review): presumably the first call only fills in the slot count and
    # the second call fills the array itself -- confirm against AutoCArray.
    rc = C_GetSlotList(CK_BBOOL(token_present),
                       slots.array,
                       slots.size)
    if rc != CKR_OK:
        return rc, []

    rc = C_GetSlotList(CK_BBOOL(token_present),
                       slots.array,
                       slots.size)
    if rc != CKR_OK:
        # Do not hand back array contents alongside an error code.
        return rc, []

    return rc, [x for x in slots]


c_get_slot_list_ex = make_error_handle_function(c_get_slot_list)
def c_get_slot_info(slot):
    """
    Get information about the given slot number.

    The returned dictionary has the keys ``slotDescription``, ``manufacturerID``,
    ``flags``, ``hardwareVersion`` ("major.minor") and ``firmwareVersion``
    ("major.minor.subminor"). It is empty when the call does not return ``CKR_OK``.

    :param int slot: Target slot
    :return: (retcode, dictionary of slot information)
    :rtype: tuple
    """
    slot_info = CK_SLOT_INFO()
    ret = C_GetSlotInfo(slot, byref(slot_info))
    if ret != CKR_OK:
        return ret, {}

    hardware = slot_info.hardwareVersion
    firmware = slot_info.firmwareVersion
    # The firmware minor field is split into two decimal digits (minor and
    # subminor). divmod() keeps this integer arithmetic on Python 3, where
    # "/" would produce a float such as 2.2.
    fw_minor, fw_subminor = divmod(firmware.minor, 10)

    slot_info_dict = {}
    slot_info_dict['slotDescription'] = string_at(slot_info.slotDescription, 64).rstrip()
    slot_info_dict['manufacturerID'] = string_at(slot_info.manufacturerID, 32).rstrip()
    slot_info_dict['flags'] = slot_info.flags
    slot_info_dict['hardwareVersion'] = "{}.{}".format(hardware.major, hardware.minor)
    slot_info_dict['firmwareVersion'] = "{}.{}.{}".format(firmware.major,
                                                          fw_minor,
                                                          fw_subminor)
    return ret, slot_info_dict


c_get_slot_info_ex = make_error_handle_function(c_get_slot_info)
def c_get_session_info(session):
    """Query the library for details about a session.

    On success the dictionary holds ``state``, ``flags``, ``slotID`` and
    ``usDeviceError``; otherwise it is empty.

    :param int session: session handle
    :return: (retcode, dictionary of session information)
    :rtype: tuple
    """
    c_info = CK_SESSION_INFO()
    ret = C_GetSessionInfo(CK_SESSION_HANDLE(session), byref(c_info))
    if ret != CKR_OK:
        return ret, {}

    field_names = ('state', 'flags', 'slotID', 'usDeviceError')
    return ret, {name: getattr(c_info, name) for name in field_names}


c_get_session_info_ex = make_error_handle_function(c_get_session_info)
def c_get_token_info(slot_id, rstrip=True):
    """Fetch the token information for a slot.

    The dictionary is empty when the call does not return ``CKR_OK``.

    :param int slot_id: Token slot ID
    :param bool rstrip: If true, will strip trailing whitespace from char data.
    :returns: (retcode, A python dictionary representing the token info)
    :rtype: tuple
    """
    c_info = CK_TOKEN_INFO()
    LOG.info("Getting token info. slot=%s", slot_id)
    ret = C_GetTokenInfo(CK_ULONG(slot_id), byref(c_info))
    if ret != CKR_OK:
        return ret, {}

    def char_field(field, width):
        # Fixed-width character data; trailing whitespace is dropped on request.
        raw = string_at(field, width)
        return raw.rstrip() if rstrip else raw

    # Note: several dictionary keys use a ``ul`` prefix while the struct
    # attributes they are read from are ``us``-prefixed.
    token_info = {
        'label': char_field(c_info.label, 32),
        'manufacturerID': char_field(c_info.manufacturerID, 32),
        'model': char_field(c_info.model, 16),
        'serialNumber': char_field(c_info.serialNumber, 16),
        'flags': c_info.flags,
        'ulFreePrivateMemory': c_info.ulFreePrivateMemory,
        'ulTotalPrivateMemory': c_info.ulTotalPrivateMemory,
        'ulMaxSessionCount': c_info.usMaxSessionCount,
        'ulSessionCount': c_info.usSessionCount,
        'ulMaxRwSessionCount': c_info.usMaxRwSessionCount,
        'ulRwSessionCount': c_info.usRwSessionCount,
        'ulMaxPinLen': c_info.usMaxPinLen,
        'ulMinPinLen': c_info.usMinPinLen,
        'ulTotalPublicMemory': c_info.ulTotalPublicMemory,
        'ulFreePublicMemory': c_info.ulFreePublicMemory,
        'hardwareVersion': c_info.hardwareVersion,
        'firmwareVersion': c_info.firmwareVersion,
        'utcTime': char_field(c_info.utcTime, 16),
    }
    return ret, token_info


c_get_token_info_ex = make_error_handle_function(c_get_token_info)
def get_slot_dict(token_present=False):
    """Compiles a dictionary of the available slots

    Iteration stops at the first slot whose ``c_get_slot_info`` call fails; the
    failing retcode is returned together with the slots collected so far.

    :param bool token_present: Passed through to ``c_get_slot_list_ex``.
        (Default value = False)
    :returns: (retcode, dictionary mapping each slot to its slot information)
    :rtype: tuple
    """
    slot_list = c_get_slot_list_ex(token_present)
    slot_dict = {}
    ret = CKR_OK
    for slot in slot_list:
        ret, data = c_get_slot_info(slot)
        if ret != CKR_OK:
            # The failing slot must be supplied for the %s placeholder.
            LOG.error("C_GetSlotInfo failed at slot %s", slot)
            break
        slot_dict[slot] = data

    return ret, slot_dict


get_slot_dict_ex = make_error_handle_function(get_slot_dict)
def c_close_session(h_session):
    """Close the given session via ``C_CloseSession``.

    :param int h_session: Session handle
    :returns: retcode
    :rtype: int
    """
    LOG.info("C_CloseSession: Closing session %s", h_session)
    return C_CloseSession(h_session)


c_close_session_ex = make_error_handle_function(c_close_session)
def c_logout(h_session):
    """Log out of the given session via ``C_Logout``.

    :param int h_session: Session handle
    :returns: retcode
    :rtype: int
    """
    LOG.info("C_Logout: Logging out of session %s", h_session)
    return C_Logout(h_session)


c_logout_ex = make_error_handle_function(c_logout)
def c_init_pin(h_session, pin):
    """Initializes the PIN

    :param int h_session: Session handle
    :param pin: PIN value to initialize
    :returns: The result code
    """
    # Never write the PIN itself to the log; mask it as login() does for passwords.
    LOG.info("C_InitPIN: Initializing PIN to ***")
    pin = AutoCArray(data=pin)
    ret = C_InitPIN(h_session, pin.array, pin.size.contents)
    return ret


c_init_pin_ex = make_error_handle_function(c_init_pin)
def ca_factory_reset(slot):
    """Perform a factory reset on a slot via ``CA_FactoryReset``.

    :param slot: The slot to do a factory reset on
    :returns: The result code
    """
    LOG.info("CA_FactoryReset: Factory Reset. slot=%s", slot)
    return CA_FactoryReset(CK_SLOT_ID(slot), CK_ULONG(0))


ca_factory_reset_ex = make_error_handle_function(ca_factory_reset)
def c_set_pin(h_session, old_pass, new_pass):
    """Allows a user to change their PIN

    :param int h_session: Session handle
    :param old_pass: The user's old password
    :param new_pass: The user's desired new password
    :returns: The result code
    """
    # Neither password is written to the log; both are masked, matching login().
    LOG.info("C_SetPIN: Changing password. "
             "old_pass=***, new_pass=***")

    old_pass = AutoCArray(data=old_pass)
    new_pass = AutoCArray(data=new_pass)

    ret = C_SetPIN(h_session,
                   old_pass.array, old_pass.size.contents,
                   new_pass.array, new_pass.size.contents)
    return ret


c_set_pin_ex = make_error_handle_function(c_set_pin)
def c_close_all_sessions(slot):
    """Close every session on a slot via ``C_CloseAllSessions``.

    :param slot: The slot to close all sessions on
    :returns: retcode
    :rtype: int
    """
    LOG.info("C_CloseAllSessions: Closing all sessions. slot=%s", slot)
    return C_CloseAllSessions(CK_ULONG(slot))


c_close_all_sessions_ex = make_error_handle_function(c_close_all_sessions)
def ca_openapplicationID(slot, id_high, id_low):
    """Open an application ID on a slot via ``CA_OpenApplicationID``.

    :param int slot: Slot on which to open the APP ID
    :param int id_high: High value of App ID
    :param int id_low: Low value of App ID
    :return: retcode
    :rtype: int
    """
    high, low = CK_ULONG(id_high), CK_ULONG(id_low)
    LOG.info("CA_OpenApplicationID: Attempting to open App ID %s:%s", id_high, id_low)

    retcode = CA_OpenApplicationID(CK_ULONG(slot), high, low)

    LOG.info("CA_OpenApplicationID: Ret Value: %s", retcode)
    return retcode


ca_openapplicationID_ex = make_error_handle_function(ca_openapplicationID)
def ca_closeapplicationID(slot, id_high, id_low):
    """Close an application ID on a slot via ``CA_CloseApplicationID``.

    :param int slot: Slot on which to close the APP ID
    :param int id_high: High value of App ID
    :param int id_low: Low value of App ID
    :return: retcode
    :rtype: int
    """
    high, low = CK_ULONG(id_high), CK_ULONG(id_low)
    LOG.info("CA_CloseApplicationID: Attempting to close App ID %s:%s", id_high, id_low)

    retcode = CA_CloseApplicationID(CK_ULONG(slot), high, low)

    LOG.info("CA_CloseApplicationID: Ret Value: %s", retcode)
    return retcode


ca_closeapplicationID_ex = make_error_handle_function(ca_closeapplicationID)
def ca_setapplicationID(id_high, id_low):
    """Set the application ID via ``CA_SetApplicationID``.

    :param int id_high: High value of App ID
    :param int id_low: Low value of App ID
    :return: retcode
    :rtype: int
    """
    high, low = CK_ULONG(id_high), CK_ULONG(id_low)
    LOG.info("CA_SetApplicationID: Attempting to set App ID %s:%s", id_high, id_low)

    retcode = CA_SetApplicationID(high, low)

    LOG.info("CA_SetApplicationID: Ret Value: %s", retcode)
    return retcode


ca_setapplicationID_ex = make_error_handle_function(ca_setapplicationID)
def ca_restart(slot):
    """Call ``CA_Restart`` for the given slot.

    :param slot: Slot number passed to ``CA_Restart``
    :return: retcode
    """
    LOG.info("CA_Restart: attempting to restart")
    retcode = CA_Restart(CK_ULONG(slot))
    LOG.info("CA_Restart: Ret Value: %s", retcode)
    return retcode


ca_restart_ex = make_error_handle_function(ca_restart)
def get_firmware_version(slot):
    """
    Returns a string representing the firmware version of the given slot.

    It will first try to call ``CA_GetFirmwareVersion``, and if that fails (not present on older
    cryptoki libraries), will call ``C_GetTokenInfo``.

    :param int slot: Token slot number
    :return: Firmware String in the format "X.Y.Z", where X is major, Y is minor, Z is subminor.
    :rtype: str
    :raises LunaCallException: if ``CA_GetFirmwareVersion`` is available but
        returns a non-zero code.
    """
    # Note, CA_GetFirmwareVersion should be available from 6.3+.
    try:
        ul_major, ul_minor, ul_subminor = c_ulong(), c_ulong(), c_ulong()
        ret = CA_GetFirmwareVersion(slot, byref(ul_major), byref(ul_minor), byref(ul_subminor))
        if ret != 0:
            LOG.warning("Failed retrieving Firmware information from slot '%s'", slot)
            raise LunaCallException(ret, "CA_GetFirmwareVersion", (0,))
        else:
            major = ul_major.value
            minor = ul_minor.value
            subminor = ul_subminor.value
    except AttributeError:
        # Fallback: the token info only has major/minor, with the minor field
        # split here into two decimal digits (minor and subminor). divmod()
        # keeps this integer arithmetic on Python 3, where "/" would yield a
        # float and a malformed version string.
        raw_firmware = c_get_token_info_ex(slot)['firmwareVersion']
        major = raw_firmware.major
        minor, subminor = divmod(raw_firmware.minor, 10)

    return "{}.{}.{}".format(major, minor, subminor)