Skip to content

Commit

Permalink
security: Add Windows CNG support
Browse files Browse the repository at this point in the history
  • Loading branch information
zjkmxy committed Jul 5, 2021
1 parent be15a06 commit 11c78f8
Show file tree
Hide file tree
Showing 4 changed files with 291 additions and 3 deletions.
4 changes: 4 additions & 0 deletions src/ndn/client_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from .transport.stream_socket import Face, UnixFace, TcpFace
if sys.platform == 'darwin':
from .security.tpm.tpm_osx_keychain import TpmOsxKeychain
if sys.platform == 'win32':
from .security.tpm.tpm_cng import TpmCng


def read_client_conf():
Expand Down Expand Up @@ -91,6 +93,8 @@ def default_keychain(pib: str, tpm: str) -> Keychain:
tpm = TpmFile(tpm_loc)
elif tpm_schema == 'tpm-osxkeychain':
tpm = TpmOsxKeychain()
elif tpm_schema == 'tpm-cng':
tpm = TpmCng()
else:
raise ValueError(f'Unrecognized tpm schema: {tpm}')
if pib_schema == 'pib-sqlite3':
Expand Down
53 changes: 51 additions & 2 deletions src/ndn/platform/windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,30 @@ class SockaddrUn(c.Structure):
class Cng:
__instance = None

STATUS_UNSUCCESSFUL = c.c_long(0xC0000001).value # -0x3FFFFFFF
NTE_BAD_KEYSET = c.c_long(0x80090016).value
NCRYPT_OVERWRITE_KEY_FLAG = c.c_long(0x00000080)
BCRYPT_SHA256_ALGORITHM = c.c_wchar_p('SHA256')
BCRYPT_ECDSA_P256_ALGORITHM = c.c_wchar_p('ECDSA_P256')
BCRYPT_ECDSA_P384_ALGORITHM = c.c_wchar_p('ECDSA_P384')
BCRYPT_ECDSA_P521_ALGORITHM = c.c_wchar_p('ECDSA_P521')
NCRYPT_ECDSA_P256_ALGORITHM = BCRYPT_ECDSA_P256_ALGORITHM
NCRYPT_ECDSA_P384_ALGORITHM = BCRYPT_ECDSA_P384_ALGORITHM
NCRYPT_ECDSA_P521_ALGORITHM = BCRYPT_ECDSA_P521_ALGORITHM
BCRYPT_OBJECT_LENGTH = c.c_wchar_p('ObjectLength')
BCRYPT_HASH_LENGTH = c.c_wchar_p('HashDigestLength')
MS_PLATFORM_KEY_STORAGE_PROVIDER = c.c_wchar_p('Microsoft Platform Crypto Provider')
MS_KEY_STORAGE_PROVIDER = c.c_wchar_p('Microsoft Software Key Storage Provider')
BCRYPT_ECCPUBLIC_BLOB = c.c_wchar_p('ECCPUBLICBLOB')
BCRYPT_ECCPRIVATE_BLOB = c.c_wchar_p('ECCPRIVATEBLOB')

class BcryptEcckeyBlob(c.Structure):
_fields_ = [("dw_magic", c.c_ulong), ("cb_key", c.c_ulong)]

@staticmethod
def nt_success(status):
return status >= 0

def __new__(cls):
if Cng.__instance is None:
Cng.__instance = object.__new__(cls)
Expand All @@ -46,8 +70,6 @@ def __init__(self):
self.bcrypt = c.windll.bcrypt
self.ncrypt = c.windll.ncrypt

# TODO: Finish Windows 10 CNG


class Win32(Platform):
def client_conf_paths(self):
Expand Down Expand Up @@ -140,3 +162,30 @@ async def open_unix_connection(self, path=None):
transport, _ = await Win32._create_unix_connection(loop, lambda: protocol, path)
writer = aio.StreamWriter(transport, protocol, reader, loop)
return reader, writer


class ReleaseGuard:
def __init__(self):
self.__dict__['_list'] = []

def __getattr__(self, idx):
return self._list[idx]

def __setattr__(self, idx, value):
self._list[idx] = value

def __iadd__(self, defer):
self._list.append(defer)
return self

def __enter__(self):
if len(self._list) > 0:
raise RuntimeError('Re-enter a ReleaseGuard')
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self._list.reverse()
for f in self._list:
if f:
f()
return False
236 changes: 235 additions & 1 deletion src/ndn/security/tpm/tpm_cng.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,239 @@
# limitations under the License.
# -----------------------------------------------------------------------------
import sys
import ctypes as c
import logging
from typing import Tuple
from Cryptodome.PublicKey import ECC, RSA
from Cryptodome.Util.asn1 import DerSequence
from ndn.encoding import FormalName, BinaryStr

from ...encoding import Signer, NonStrictName, SignatureType, KeyLocator, Name
from .tpm import Tpm
if sys.platform == 'win32':
from ...platform.windows import Cng
from ...platform.windows import Cng, ReleaseGuard


class CngSigner(Signer):
def __init__(self, key_name: NonStrictName, sig_len, key_type, h_key):
self.h_key = h_key
self.key_name = key_name
if key_type == "RSA":
self.key_type = SignatureType.SHA256_WITH_RSA
self.sig_len = sig_len
elif key_type == "ECDSA":
self.key_type = SignatureType.SHA256_WITH_ECDSA
self.sig_len = sig_len + 8
else:
raise ValueError(f'Unrecognized key type {key_type}')

def write_signature_info(self, signature_info):
signature_info.signature_type = self.key_type
signature_info.key_locator = KeyLocator()
signature_info.key_locator.name = self.key_name

def get_signature_value_size(self):
return self.sig_len

def __del__(self):
Cng().ncrypt.NCryptFreeObject(self.h_key)

@staticmethod
def _hash_contents(contents):
cng = Cng()
with ReleaseGuard() as defer:
h_hash_alg = c.c_void_p()
status = cng.bcrypt.BCryptOpenAlgorithmProvider(c.pointer(h_hash_alg), cng.BCRYPT_SHA256_ALGORITHM, 0, 0)
if not cng.nt_success(status):
raise OSError(f'Error {status} returned by BCryptOpenAlgorithmProvider')
defer += lambda: cng.bcrypt.BCryptCloseAlgorithmProvider(h_hash_alg, 0)

cb_hash_obj = c.c_ulong(0)
cb_data = c.c_ulong(0)
status = cng.bcrypt.BCryptGetProperty(h_hash_alg, cng.BCRYPT_OBJECT_LENGTH, c.pointer(cb_hash_obj), 4,
c.pointer(cb_data), 0)
if not cng.nt_success(status):
raise OSError(f'Error {status} returned by BCryptGetProperty')
hash_obj = (c.c_byte * cb_hash_obj.value)()

cb_hash = c.c_ulong(0)
status = cng.bcrypt.BCryptGetProperty(h_hash_alg, cng.BCRYPT_HASH_LENGTH, c.pointer(cb_hash), 4,
c.pointer(cb_data), 0)
if not cng.nt_success(status):
raise OSError(f'Error {status} returned by BCryptGetProperty')
hash_val = (c.c_byte * cb_hash.value)()

h_hash = c.c_void_p()
status = cng.bcrypt.BCryptCreateHash(h_hash_alg, c.pointer(h_hash), hash_obj, cb_hash_obj,
0, 0, 0)
if not cng.nt_success(status):
raise OSError(f'Error {status} returned by BCryptCreateHash')
defer += lambda: cng.bcrypt.BCryptDestroyHash(h_hash)

for blk in contents:
status = cng.bcrypt.BCryptHashData(h_hash, c.c_char_p(bytes(blk)), len(blk), 0)
if not cng.nt_success(status):
raise OSError(f'Error {status} returned by BCryptHashData')

status = cng.bcrypt.BCryptFinishHash(h_hash, hash_val, cb_hash, 0)
if not cng.nt_success(status):
raise OSError(f'Error {status} returned by BCryptFinishHash')

return hash_val, cb_hash

def write_signature_value(self, wire, contents) -> int:
cng = Cng()
hash_val, cb_hash = self._hash_contents(contents)

cb_signature = c.c_ulong()
status = cng.ncrypt.NCryptSignHash(self.h_key, 0, hash_val, cb_hash, 0, 0,
c.pointer(cb_signature), 0)
if not cng.nt_success(status):
raise OSError(f'Error {status} returned by NCryptSignHash')

signature = (c.c_ubyte * cb_signature.value)()
status = cng.ncrypt.NCryptSignHash(self.h_key, 0, hash_val, cb_hash, signature, cb_signature,
c.pointer(cb_signature), 0)
if not cng.nt_success(status):
raise OSError(f'Error {status} returned by NCryptSignHash')

if self.key_type == SignatureType.SHA256_WITH_ECDSA:
der = DerSequence((int.from_bytes(signature[:cb_signature.value//2], 'big'),
int.from_bytes(signature[cb_signature.value//2:], 'big'))).encode()
else:
der = bytes(signature)

real_len = len(der)
wire[:real_len] = der
return real_len


class TpmCng(Tpm):
def __init__(self):
cng = Cng()
self.h_prov = c.c_void_p()
# Try TPM2.0 platform key storage
status = cng.ncrypt.NCryptOpenStorageProvider(c.pointer(self.h_prov), cng.MS_PLATFORM_KEY_STORAGE_PROVIDER, 0)
if not cng.nt_success(status):
# If failed, try software key storage
status = cng.ncrypt.NCryptOpenStorageProvider(c.pointer(self.h_prov), cng.MS_KEY_STORAGE_PROVIDER, 0)
if not cng.nt_success(status):
raise OSError(f'Error {status} returned by NCryptOpenStorageProvider', status)

def __del__(self):
Cng().ncrypt.NCryptFreeObject(self.h_prov)

@staticmethod
def _convert_key_format(key_bits, key_type: str):
if key_type == 'rsa':
raise NotImplementedError('RSA on CNG is not implemented yet')
elif key_type == 'ec':
cng = Cng()
pubkey_stru = c.cast(key_bits, c.POINTER(cng.BcryptEcckeyBlob))[0]
base_idx = c.sizeof(cng.BcryptEcckeyBlob)
key_x = int.from_bytes(key_bits[base_idx:base_idx + pubkey_stru.cb_key], 'big')
key_y = int.from_bytes(key_bits[base_idx + pubkey_stru.cb_key:], 'big')
return ECC.construct(curve='P-256', point_x=key_x, point_y=key_y).export_key(format='DER')
else:
raise ValueError(f'Unsupported key type {key_type}')

def _get_key(self, key_label: str):
cng = Cng()
h_key = c.c_void_p()
status = cng.ncrypt.NCryptOpenKey(self.h_prov, c.pointer(h_key), c.c_wchar_p(key_label), 0, 0)
if not cng.nt_success(status):
if status == cng.NTE_BAD_KEYSET:
raise KeyError(f"Unable to find key with label {key_label}")
else:
raise OSError(f'Error {status} returned by NCryptOpenKey', status)

cb_property = c.c_ulong(4)
sig_len = c.c_ulong()
status = cng.ncrypt.NCryptGetProperty(h_key, c.c_wchar_p('SignatureLength'), c.pointer(sig_len), cb_property,
c.pointer(cb_property), 0)
if not cng.nt_success(status):
raise OSError(f'Error {status} returned by NCryptGetProperty', status)

cb_property.value = 40
key_type = (c.c_wchar * 20)()
status = cng.ncrypt.NCryptGetProperty(h_key, c.c_wchar_p('AlgorithmName'), c.pointer(key_type), cb_property,
c.pointer(cb_property), 0)
if not cng.nt_success(status):
raise OSError(f'Error {status} returned by NCryptGetProperty', status)

return key_type.value, sig_len.value, h_key

def get_signer(self, key_name: NonStrictName) -> Signer:
name_hash = Name.to_bytes(key_name).hex()
key_type, sig_len, h_key = self._get_key(name_hash)
return CngSigner(key_name, sig_len, key_type, h_key)

def key_exist(self, key_name: FormalName) -> bool:
try:
name_hash = Name.to_bytes(key_name).hex()
self._get_key(name_hash)
return True
except KeyError:
return False

def delete_key(self, key_name: FormalName):
name_hash = Name.to_bytes(key_name).hex()
_, _, h_key = self._get_key(name_hash)
Cng().ncrypt.NCryptDeleteKey(h_key, 0)

@staticmethod
def _convert_pub_key_format(key_bits: BinaryStr, key_type: str):
if key_type == 'rsa':
return RSA.import_key(key_bits).export_key(format='DER')
elif key_type == 'ec':
xp = int.from_bytes(key_bits[1:33], 'big')
yp = int.from_bytes(key_bits[33:], 'big')
return ECC.construct(curve='P-256', point_x=xp, point_y=yp).export_key(format='DER')
else:
raise ValueError(f'Unsupported key type {key_type}')

def generate_key(self, id_name: FormalName, key_type: str = 'rsa', **kwargs) -> Tuple[FormalName, BinaryStr]:
cng = Cng()
with ReleaseGuard() as defer:
logging.debug('Generating CNG Key %s' % key_type)
key_name = self.construct_key_name(id_name, b'', key_id_type='random')
name_hash = Name.to_bytes(key_name).hex()

if key_type == 'ec':
algo = cng.NCRYPT_ECDSA_P256_ALGORITHM
elif key_type == 'rsa':
raise NotImplementedError('RSA on CNG is not implemented yet')
else:
raise ValueError(f'Unsupported key type {key_type}')

h_key = c.c_void_p()
status = cng.ncrypt.NCryptCreatePersistedKey(self.h_prov, c.pointer(h_key), algo,
c.c_wchar_p(name_hash), 0,
cng.NCRYPT_OVERWRITE_KEY_FLAG)
if not cng.nt_success(status):
raise OSError(f'Error {status} returned by NCryptCreatePersistedKey', status)
defer += lambda: cng.ncrypt.NCryptFreeObject(h_key)

status = cng.ncrypt.NCryptFinalizeKey(h_key, 0)
if not cng.nt_success(status):
raise OSError(f'Error {status} returned by NCryptFinalizeKey', status)

if key_type == 'ec':
prop = cng.BCRYPT_ECCPUBLIC_BLOB
elif key_type == 'rsa':
raise NotImplementedError('RSA on CNG is not implemented yet')
else:
raise ValueError(f'Unsupported key type {key_type}')

cb_pubkey = c.c_ulong()
status = cng.ncrypt.NCryptExportKey(h_key, 0, prop, 0, 0, 0, c.pointer(cb_pubkey), 0)
if not cng.nt_success(status):
raise OSError(f'Error {status} returned by NCryptExportKey', status)

pubkey_blob = (c.c_ubyte * cb_pubkey.value)()
status = cng.ncrypt.NCryptExportKey(h_key, 0, prop, 0, pubkey_blob, cb_pubkey, c.pointer(cb_pubkey), 0)
if not cng.nt_success(status):
raise OSError(f'Error {status} returned by NCryptExportKey', status)

pub_key = self._convert_key_format(bytes(pubkey_blob), key_type)
return key_name, bytes(pub_key)
1 change: 1 addition & 0 deletions src/ndn/security/tpm/tpm_osx_keychain.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class TpmOsxKeychain(Tpm):
def _get_key(key_name: NonStrictName):
sec = OsxSec()
with ReleaseGuard() as g:
# TODO: what about name convension?
logging.debug('Get OSX Key %s' % Name.to_str(key_name))
g.key_label = CFSTR(Name.to_str(key_name))
g.query = ObjCInstance(cf.CFDictionaryCreateMutable(None, 6, cf.kCFTypeDictionaryKeyCallBacks, None))
Expand Down

0 comments on commit 11c78f8

Please sign in to comment.