Skip to content

Commit

Permalink
Added option to disable padding to BlockFeeder.
Browse files Browse the repository at this point in the history
  • Loading branch information
ricmoo committed May 25, 2016
1 parent dc0c9ac commit 5518ae8
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 21 deletions.
1 change: 1 addition & 0 deletions pyaes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@

from .aes import AES, AESModeOfOperationCTR, AESModeOfOperationCBC, AESModeOfOperationCFB, AESModeOfOperationECB, AESModeOfOperationOFB, AESModesOfOperation, Counter
from .blockfeeder import decrypt_stream, Decrypter, encrypt_stream, Encrypter
from .blockfeeder import PADDING_NONE, PADDING_DEFAULT
77 changes: 57 additions & 20 deletions pyaes/blockfeeder.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,23 @@
# - Given a size, determine how many bytes could be consumed in
# a single call to either the decrypt or encrypt method
#
# _final_encrypt(data)
# _final_encrypt(data, padding = PADDING_DEFAULT)
# - call and return encrypt on this (last) chunk of data,
# padding as necessary; this will always be at least 16
# bytes unless the total incoming input was less than 16
# bytes
#
# _final_decrypt(data)
# _final_decrypt(data, padding = PADDING_DEFAULT)
# - same as _final_encrypt except for decrypt, for
# stripping off padding
#

PADDING_NONE = 'none'
PADDING_DEFAULT = 'default'

# @TODO: Ciphertext stealing and explicit PKCS#7
# PADDING_CIPHERTEXT_STEALING
# PADDING_PKCS7

# ECB and CBC are block-only ciphers

Expand All @@ -50,14 +56,32 @@ def _block_can_consume(self, size):
return 0

# After padding, we may have more than one block
def _block_final_encrypt(self, data):
data = append_PKCS7_padding(data)
def _block_final_encrypt(self, data, padding = PADDING_DEFAULT):
if padding == PADDING_DEFAULT:
data = append_PKCS7_padding(data)

elif padding == PADDING_NONE:
if len(data) != 16:
raise Exception('invalid data length for final block')
else:
raise Exception('invalid padding option')

if len(data) == 32:
return self.encrypt(data[:16]) + self.encrypt(data[16:])

return self.encrypt(data)

def _block_final_decrypt(self, data):
return strip_PKCS7_padding(self.decrypt(data))

def _block_final_decrypt(self, data, padding = PADDING_DEFAULT):
if padding == PADDING_DEFAULT:
return strip_PKCS7_padding(self.decrypt(data))

if padding == PADDING_NONE:
if len(data) != 16:
raise Exception('invalid data length for final block')
return self.decrypt(data)

raise Exception('invalid padding option')

AESBlockModeOfOperation._can_consume = _block_can_consume
AESBlockModeOfOperation._final_encrypt = _block_final_encrypt
Expand All @@ -71,13 +95,19 @@ def _segment_can_consume(self, size):
return self.segment_bytes * int(size // self.segment_bytes)

# CFB can handle a non-segment-sized block at the end using the remaining cipherblock
def _segment_final_encrypt(self, data):
def _segment_final_encrypt(self, data, padding = PADDING_DEFAULT):
if padding != PADDING_DEFAULT:
raise Exception('invalid padding option')

faux_padding = (chr(0) * (self.segment_bytes - (len(data) % self.segment_bytes)))
padded = data + to_bufferable(faux_padding)
return self.encrypt(padded)[:len(data)]

# CFB can handle a non-segment-sized block at the end using the remaining cipherblock
def _segment_final_decrypt(self, data):
def _segment_final_decrypt(self, data, padding = PADDING_DEFAULT):
if padding != PADDING_DEFAULT:
raise Exception('invalid padding option')

faux_padding = (chr(0) * (self.segment_bytes - (len(data) % self.segment_bytes)))
padded = data + to_bufferable(faux_padding)
return self.decrypt(padded)[:len(data)]
Expand All @@ -93,10 +123,16 @@ def _segment_final_decrypt(self, data):
def _stream_can_consume(self, size):
return size

def _stream_final_encrypt(self, data):
def _stream_final_encrypt(self, data, padding = PADDING_DEFAULT):
if padding not in [PADDING_NONE, PADDING_DEFAULT]:
raise Exception('invalid padding option')

return self.encrypt(data)

def _stream_final_decrypt(self, data):
def _stream_final_decrypt(self, data, padding = PADDING_DEFAULT):
if padding not in [PADDING_NONE, PADDING_DEFAULT]:
raise Exception('invalid padding option')

return self.decrypt(data)

AESStreamModeOfOperation._can_consume = _stream_can_consume
Expand All @@ -110,11 +146,12 @@ class BlockFeeder(object):
into the appropriate block size for the underlying mode of operation
and applying (or stripping) padding, as necessary.'''

def __init__(self, mode, feed, final):
def __init__(self, mode, feed, final, padding = PADDING_DEFAULT):
self._mode = mode
self._feed = feed
self._final = final
self._buffer = to_bufferable("")
self._padding = padding

def feed(self, data = None):
'''Provide bytes to encrypt (or decrypt), returning any bytes
Expand All @@ -129,7 +166,7 @@ def feed(self, data = None):

# Finalize; process the spare bytes we were keeping
if not data:
result = self._final(self._buffer)
result = self._final(self._buffer, self._padding)
self._buffer = None
return result

Expand All @@ -149,15 +186,15 @@ def feed(self, data = None):
class Encrypter(BlockFeeder):
'Accepts bytes of plaintext and returns encrypted ciphertext.'

def __init__(self, mode):
BlockFeeder.__init__(self, mode, mode.encrypt, mode._final_encrypt)
def __init__(self, mode, padding = PADDING_DEFAULT):
BlockFeeder.__init__(self, mode, mode.encrypt, mode._final_encrypt, padding)


class Decrypter(BlockFeeder):
'Accepts bytes of ciphertext and returns decrypted plaintext.'

def __init__(self, mode):
BlockFeeder.__init__(self, mode, mode.decrypt, mode._final_decrypt)
def __init__(self, mode, padding = PADDING_DEFAULT):
BlockFeeder.__init__(self, mode, mode.decrypt, mode._final_decrypt, padding)


# 8kb blocks
Expand All @@ -176,15 +213,15 @@ def _feed_stream(feeder, in_stream, out_stream, block_size = BLOCK_SIZE):
out_stream.write(converted)


def encrypt_stream(mode, in_stream, out_stream, block_size = BLOCK_SIZE):
def encrypt_stream(mode, in_stream, out_stream, block_size = BLOCK_SIZE, padding = PADDING_DEFAULT):
'Encrypts a stream of bytes from in_stream to out_stream using mode.'

encrypter = Encrypter(mode)
_feed_stream(encrypter, in_stream, out_stream, block_size)
_feed_stream(encrypter, in_stream, out_stream, block_size, padding)


def decrypt_stream(mode, in_stream, out_stream, block_size = BLOCK_SIZE):
def decrypt_stream(mode, in_stream, out_stream, block_size = BLOCK_SIZE, padding = PADDING_DEFAULT):
'Decrypts a stream of bytes from in_stream to out_stream using mode.'

decrypter = Decrypter(mode)
_feed_stream(decrypter, in_stream, out_stream, block_size)
_feed_stream(decrypter, in_stream, out_stream, block_size, padding)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
for API reference and details.'''

setup(name = 'pyaes',
version = '1.3.0',
version = '1.5.0',
description = 'Pure-Python Implementation of the AES block-cipher and common modes of operation',
long_description = LONG_DESCRIPTION,
author = 'Richard Moore',
Expand Down
42 changes: 42 additions & 0 deletions tests/test-blockfeeder.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,45 @@
passed = decrypted == plaintext
cipher_length = len(ciphertext)
print(" cipher-length=%(cipher_length)s passed=%(passed)s" % locals())

# Test block modes of operation with no padding
plaintext = os.urandom(1024)

for mode_name in ['ecb', 'cbc']:
mode = pyaes.AESModesOfOperation[mode_name]
print(mode.name + ' (no padding)')

kw = dict(key = key)
if mode_name == 'cbc':
kw['iv'] = os.urandom(16)

encrypter = Encrypter(mode(**kw), padding = pyaes.PADDING_NONE)
ciphertext = to_bufferable('')

# Feed the encrypter random number of bytes at a time
index = 0
while index < len(plaintext):
length = random.randint(1, 128)
if index + length > len(plaintext): length = len(plaintext) - index
ciphertext += encrypter.feed(plaintext[index: index + length])
index += length
ciphertext += encrypter.feed(None)

if len(ciphertext) != len(plaintext):
print(' failed to encrypt with correct padding')

decrypter = Decrypter(mode(**kw), padding = pyaes.PADDING_NONE)
decrypted = to_bufferable('')

# Feed the decrypter random number of bytes at a time
index = 0
while index < len(ciphertext):
length = random.randint(1, 128)
if index + length > len(ciphertext): length = len(ciphertext) - index
decrypted += decrypter.feed(ciphertext[index: index + length])
index += length
decrypted += decrypter.feed(None)

passed = decrypted == plaintext
cipher_length = len(ciphertext)
print(" cipher-length=%(cipher_length)s passed=%(passed)s" % locals())

0 comments on commit 5518ae8

Please sign in to comment.