#!/usr/bin/python
+import getpass as _getpass
import hashlib as _hashlib
+import math as _math
import re as _re
import subprocess as _subprocess
import struct as _struct
+import Crypto.Cipher.AES as _crypto_cipher_aes
+import Crypto.Cipher.Blowfish as _crypto_cipher_blowfish
+import Crypto.Cipher.CAST as _crypto_cipher_cast
+import Crypto.Cipher.DES3 as _crypto_cipher_des3
+
def _get_stdout(args, stdin=None):
stdin_pipe = None
'aes with 192-bit key': 128,
'aes with 256-bit key': 128,
'cast5': 64,
+ 'twofish': 128,
+ }
+
+ _crypto_module = {
+ 'aes with 128-bit key': _crypto_cipher_aes,
+ 'aes with 192-bit key': _crypto_cipher_aes,
+ 'aes with 256-bit key': _crypto_cipher_aes,
+ 'blowfish': _crypto_cipher_blowfish,
+ 'cast5': _crypto_cipher_cast,
+ 'tripledes': _crypto_cipher_des3,
+ }
+
+ _key_size = { # in bits
+ 'aes with 128-bit key': 128,
+ 'aes with 192-bit key': 192,
+ 'aes with 256-bit key': 256,
+ 'cast5': 128,
}
_compression_algorithms = {
110: 'private',
}
+ _hashlib_name = { # map OpenPGP-based names to hashlib names
+ 'md5': 'md5',
+ 'sha-1': 'sha1',
+ 'ripe-md/160': 'ripemd160',
+ 'sha256': 'sha256',
+ 'sha384': 'sha384',
+ 'sha512': 'sha512',
+ 'sha224': 'sha224',
+ }
+
_string_to_key_types = {
0: 'simple',
1: 'salted',
110: 'private',
}
+ _string_to_key_expbias = 6
+
_signature_types = {
0x00: 'binary document',
0x01: 'canonical text document',
_clean_type_regex = _re.compile('\W+')
+ def __init__(self, key=None):
+ super(PGPPacket, self).__init__()
+ self.key = key
+
def _clean_type(self, type=None):
if type is None:
type = self['type']
return self._clean_type_regex.sub('_', type)
+ @staticmethod
+ def _reverse(dict, value):
+ """Reverse lookups in dictionaries
+
+ >>> PGPPacket._reverse(PGPPacket._packet_types, 'public-key packet')
+ 6
+ """
+ return [k for k,v in dict.items() if v == value][0]
+
def __str__(self):
method_name = '_str_{}'.format(self._clean_type())
method = getattr(self, method_name, None)
def _str_public_subkey_packet(self):
return self._str_generic_key_packet()
+ def _str_generic_key_packet(self):
+ return self['fingerprint'][-8:].upper()
+
def _str_secret_key_packet(self):
- return self._str_generic_key_packet()
+ return self._str_generic_secret_key_packet()
def _str_secret_subkey_packet(self):
- return self._str_generic_key_packet()
+ return self._str_generic_secret_key_packet()
- def _str_generic_key_packet(self):
- return self['fingerprint'][-8:].upper()
+ def _str_generic_secret_key_packet(self):
+ lines = [self._str_generic_key_packet()]
+ for label, key in [
+ ('symmetric encryption',
+ 'symmetric-encryption-algorithm'),
+ ('s2k hash', 'string-to-key-hash-algorithm'),
+ ('s2k count', 'string-to-key-count'),
+ ('s2k salt', 'string-to-key-salt'),
+ ('IV', 'initial-vector'),
+ ]:
+ if key in self:
+ value = self[key]
+ if isinstance(value, bytes):
+ value = ' '.join('{:02x}'.format(byte) for byte in value)
+ lines.append(' {}: {}'.format(label, value))
+ return '\n'.join(lines)
def _str_signature_packet(self):
lines = [self['signature-type']]
+ if self['hashed-subpackets']:
+ lines.append(' hashed subpackets:')
+ lines.extend(self._str_signature_subpackets(
+ self['hashed-subpackets'], prefix=' '))
if self['unhashed-subpackets']:
lines.append(' unhashed subpackets:')
- for subpacket in self['unhashed-subpackets']:
- method_name = '_str_{}_signature_subpacket'.format(
- self._clean_type(type=subpacket['type']))
- method = getattr(self, method_name, None)
- if method:
- lines.append(' {}: {}'.format(
- subpacket['type'],
- method(subpacket=subpacket)))
- else:
- lines.append(' {}'.format(subpacket['type']))
+ lines.extend(self._str_signature_subpackets(
+ self['unhashed-subpackets'], prefix=' '))
return '\n'.join(lines)
+ def _str_signature_subpackets(self, subpackets, prefix):
+ lines = []
+ for subpacket in subpackets:
+ method_name = '_str_{}_signature_subpacket'.format(
+ self._clean_type(type=subpacket['type']))
+ method = getattr(self, method_name, None)
+ if method:
+ lines.append(' {}: {}'.format(
+ subpacket['type'],
+ method(subpacket=subpacket)))
+ else:
+ lines.append(' {}'.format(subpacket['type']))
+ return lines
+
+ def _str_signature_creation_time_signature_subpacket(self, subpacket):
+ return str(subpacket['signature-creation-time'])
+
def _str_issuer_signature_subpacket(self, subpacket):
return subpacket['issuer'][-8:].upper()
+ def _str_key_expiration_time_signature_subpacket(self, subpacket):
+ return str(subpacket['key-expiration-time'])
+
+ def _str_preferred_symmetric_algorithms_signature_subpacket(
+ self, subpacket):
+ return ', '.join(
+ algo for algo in subpacket['preferred-symmetric-algorithms'])
+
+ def _str_preferred_hash_algorithms_signature_subpacket(
+ self, subpacket):
+ return ', '.join(
+ algo for algo in subpacket['preferred-hash-algorithms'])
+
+ def _str_preferred_compression_algorithms_signature_subpacket(
+ self, subpacket):
+ return ', '.join(
+ algo for algo in subpacket['preferred-compression-algorithms'])
+
+ def _str_key_server_preferences_signature_subpacket(self, subpacket):
+ return ', '.join(
+ x for x in sorted(subpacket['key-server-preferences']))
+
+ def _str_primary_user_id_signature_subpacket(self, subpacket):
+ return str(subpacket['primary-user-id'])
+
+ def _str_key_flags_signature_subpacket(self, subpacket):
+ return ', '.join(x for x in sorted(subpacket['key-flags']))
+
+ def _str_features_signature_subpacket(self, subpacket):
+ return ', '.join(x for x in sorted(subpacket['features']))
+
def _str_embedded_signature_signature_subpacket(self, subpacket):
return subpacket['embedded']['signature-type']
offset += length
return (offset, value)
+ @classmethod
+ def _decode_string_to_key_count(cls, data):
+ r"""Decode RFC 4880's string-to-key count
+
+ >>> PGPPacket._decode_string_to_key_count(b'\x97'[0])
+ 753664
+ """
+ return (16 + (data & 15)) << ((data >> 4) + cls._string_to_key_expbias)
+
def _parse_string_to_key_specifier(self, data):
self['string-to-key-type'] = self._string_to_key_types[data[0]]
offset = 1
offset += 1
self['string-to-key-salt'] = data[offset: offset + 8]
offset += 8
- self['string-to-key-coded-count'] = data[offset]
+ self['string-to-key-count'] = self._decode_string_to_key_count(
+ data=data[offset])
offset += 1
else:
raise NotImplementedError(
self['symmetric-encryption-algorithm']))
self['initial-vector'] = data[offset: offset + block_size]
offset += block_size
+ ciphertext = data[offset:]
+ offset += len(ciphertext)
+ decrypted_data = self.decrypt_symmetric_encryption(data=ciphertext)
+ else:
+ decrypted_data = data[offset:key_end]
if string_to_key_usage in [0, 255]:
key_end = -2
+ elif string_to_key_usage == 254:
+ key_end = -20
else:
key_end = 0
- self['secret-key'] = data[offset:key_end]
+ secret_key = decrypted_data[:key_end]
if key_end:
- self['secret-key-checksum'] = data[key_end:]
+ secret_key_checksum = decrypted_data[key_end:]
+ if key_end == -2:
+ calculated_checksum = sum(secret_key) % 65536
+ else:
+ checksum_hash = _hashlib.sha1()
+ checksum_hash.update(secret_key)
+ calculated_checksum = checksum_hash.digest()
+ if secret_key_checksum != calculated_checksum:
+ raise ValueError(
+ 'corrupt secret key (checksum {} != expected {})'.format(
+ secret_key_checksum, calculated_checksum))
+ self['secret-key'] = secret_key
def _parse_signature_subpackets(self, data):
offset = 0
offset += 1
hashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
offset += 2
- self['hashed-subpackets'] = data[offset: offset + hashed_count]
+ self['hashed-subpackets'] = list(self._parse_signature_subpackets(
+ data[offset: offset + hashed_count]))
offset += hashed_count
unhashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
offset += 2
offset += 2
self['signature'] = data[offset:]
+ def _parse_signature_creation_time_signature_subpacket(
+ self, data, subpacket):
+ subpacket['signature-creation-time'] = _struct.unpack('>I', data)[0]
+
def _parse_issuer_signature_subpacket(self, data, subpacket):
subpacket['issuer'] = ''.join('{:02x}'.format(byte) for byte in data)
+ def _parse_key_expiration_time_signature_subpacket(
+ self, data, subpacket):
+ subpacket['key-expiration-time'] = _struct.unpack('>I', data)[0]
+
+ def _parse_preferred_symmetric_algorithms_signature_subpacket(
+ self, data, subpacket):
+ subpacket['preferred-symmetric-algorithms'] = [
+ self._symmetric_key_algorithms[d] for d in data]
+
+ def _parse_preferred_hash_algorithms_signature_subpacket(
+ self, data, subpacket):
+ subpacket['preferred-hash-algorithms'] = [
+ self._hash_algorithms[d] for d in data]
+
+ def _parse_preferred_compression_algorithms_signature_subpacket(
+ self, data, subpacket):
+ subpacket['preferred-compression-algorithms'] = [
+ self._compression_algorithms[d] for d in data]
+
+ def _parse_key_server_preferences_signature_subpacket(
+ self, data, subpacket):
+ subpacket['key-server-preferences'] = set()
+ if data[0] & 0x80:
+ subpacket['key-server-preferences'].add('no-modify')
+
+ def _parse_primary_user_id_signature_subpacket(self, data, subpacket):
+ subpacket['primary-user-id'] = bool(data[0])
+
+ def _parse_key_flags_signature_subpacket(self, data, subpacket):
+ subpacket['key-flags'] = set()
+ if data[0] & 0x1:
+ subpacket['key-flags'].add('can certify')
+ if data[0] & 0x2:
+ subpacket['key-flags'].add('can sign')
+ if data[0] & 0x4:
+ subpacket['key-flags'].add('can encrypt communications')
+ if data[0] & 0x8:
+ subpacket['key-flags'].add('can encrypt storage')
+ if data[0] & 0x10:
+ subpacket['key-flags'].add('private split')
+ if data[0] & 0x20:
+ subpacket['key-flags'].add('can authenticate')
+ if data[0] & 0x80:
+ subpacket['key-flags'].add('private shared')
+
+ def _parse_features_signature_subpacket(self, data, subpacket):
+ subpacket['features'] = set()
+ if data[0] & 0x1:
+ subpacket['features'].add('modification detection')
+
def _parse_embedded_signature_signature_subpacket(self, data, subpacket):
- subpacket['embedded'] = PGPPacket()
+ subpacket['embedded'] = PGPPacket(key=self.key)
subpacket['embedded']._parse_signature_packet(data=data)
def _parse_user_id_packet(self, data):
self['user'] = str(data, 'utf-8')
def to_bytes(self):
- pass
+ method_name = '_serialize_{}'.format(self._clean_type())
+ method = getattr(self, method_name, None)
+ if not method:
+ raise NotImplementedError(
+ 'cannot serialize packet type {!r}'.format(self['type']))
+ body = method()
+ self['length'] = len(body)
+ return b''.join([
+ self._serialize_header(),
+ body,
+ ])
+
+ def _serialize_header(self):
+ always_one = 1
+ new_format = 0
+ type_code = self._reverse(self._packet_types, self['type'])
+ packet_tag = (
+ always_one * (1 << 7) |
+ new_format * (1 << 6) |
+ type_code * (1 << 2) |
+ self['length-type']
+ )
+ length_bytes, length_type = self._old_format_packet_length_type[
+ self['length-type']]
+ length_format = '>{}'.format(length_type)
+ length_data = _struct.pack(length_format, self['length'])
+ return b''.join([
+ bytes([packet_tag]),
+ length_data,
+ ])
+
+ @staticmethod
+ def _serialize_multiprecision_integer(integer):
+ r"""Serialize RFC 4880's multipricision integers
+
+ >>> PGPPacket._serialize_multiprecision_integer(1)
+ b'\x00\x01\x01'
+ >>> PGPPacket._serialize_multiprecision_integer(511)
+ b'\x00\t\x01\xff'
+ """
+ bit_length = int(_math.log(integer, 2)) + 1
+ chunks = [
+ _struct.pack('>H', bit_length),
+ ]
+ while integer > 0:
+ chunks.insert(1, bytes([integer & 0xff]))
+ integer = integer >> 8
+ return b''.join(chunks)
+
+ @classmethod
+ def _encode_string_to_key_count(cls, count):
+ r"""Encode RFC 4880's string-to-key count
+
+ >>> PGPPacket._encode_string_to_key_count(753664)
+ b'\x97'
+ """
+ coded_count = 0
+ count = count >> cls._string_to_key_expbias
+ while not count & 1:
+ count = count >> 1
+ coded_count += 1 << 4
+ coded_count += count & 15
+ return bytes([coded_count])
+
+ def _serialize_string_to_key_specifier(self):
+ string_to_key_type = bytes([
+ self._reverse(
+ self._string_to_key_types, self['string-to-key-type']),
+ ])
+ chunks = [string_to_key_type]
+ if self['string-to-key-type'] == 'simple':
+ chunks.append(bytes([self._reverse(
+ self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
+ elif self['string-to-key-type'] == 'salted':
+ chunks.append(bytes([self._reverse(
+ self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
+ chunks.append(self['string-to-key-salt'])
+ elif self['string-to-key-type'] == 'iterated and salted':
+ chunks.append(bytes([self._reverse(
+ self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
+ chunks.append(self['string-to-key-salt'])
+ chunks.append(self._encode_string_to_key_count(
+ count=self['string-to-key-count']))
+ else:
+ raise NotImplementedError(
+ 'string-to-key type {}'.format(self['string-to-key-type']))
+ return offset
+ return b''.join(chunks)
+
+ def _serialize_public_key_packet(self):
+ return self._serialize_generic_public_key_packet()
+
+ def _serialize_public_subkey_packet(self):
+ return self._serialize_generic_public_key_packet()
+
+ def _serialize_generic_public_key_packet(self):
+ key_version = bytes([self['key-version']])
+ chunks = [key_version]
+ if self['key-version'] != 4:
+ raise NotImplementedError(
+ 'public (sub)key packet version {}'.format(
+ self['key-version']))
+ chunks.append(_struct.pack('>I', self['creation-time']))
+ chunks.append(bytes([self._reverse(
+ self._public_key_algorithms, self['public-key-algorithm'])]))
+ if self['public-key-algorithm'].startswith('rsa '):
+ chunks.append(self._serialize_multiprecision_integer(
+ self['public-modulus']))
+ chunks.append(self._serialize_multiprecision_integer(
+ self['public-exponent']))
+ elif self['public-key-algorithm'].startswith('dsa '):
+ chunks.append(self._serialize_multiprecision_integer(
+ self['prime']))
+ chunks.append(self._serialize_multiprecision_integer(
+ self['group-order']))
+ chunks.append(self._serialize_multiprecision_integer(
+ self['group-generator']))
+ chunks.append(self._serialize_multiprecision_integer(
+ self['public-key']))
+ elif self['public-key-algorithm'].startswith('elgamal '):
+ chunks.append(self._serialize_multiprecision_integer(
+ self['prime']))
+ chunks.append(self._serialize_multiprecision_integer(
+ self['group-generator']))
+ chunks.append(self._serialize_multiprecision_integer(
+ self['public-key']))
+ else:
+ raise NotImplementedError(
+ 'algorithm-specific key fields for {}'.format(
+ self['public-key-algorithm']))
+ return b''.join(chunks)
+ def _string_to_key(self, string, key_size):
+ if key_size % 8:
+ raise ValueError(
+ '{}-bit key is not an integer number of bytes'.format(
+ key_size))
+ key_size_bytes = key_size // 8
+ hash_name = self._hashlib_name[
+ self['string-to-key-hash-algorithm']]
+ string_hash = _hashlib.new(hash_name)
+ hashes = _math.ceil(key_size_bytes / string_hash.digest_size)
+ key = b''
+ if self['string-to-key-type'] == 'simple':
+ update_bytes = string
+ elif self['string-to-key-type'] in [
+ 'salted',
+ 'iterated and salted',
+ ]:
+ update_bytes = self['string-to-key-salt'] + string
+ if self['string-to-key-type'] == 'iterated and salted':
+ count = self['string-to-key-count']
+ if count < len(update_bytes):
+ count = len(update_bytes)
+ else:
+ raise NotImplementedError(
+ 'key calculation for string-to-key type {}'.format(
+ self['string-to-key-type']))
+ for padding in range(hashes):
+ string_hash = _hashlib.new(hash_name)
+ string_hash.update(padding * b'\x00')
+ if self['string-to-key-type'] in [
+ 'simple',
+ 'salted',
+ ]:
+ string_hash.update(update_bytes)
+ elif self['string-to-key-type'] == 'iterated and salted':
+ remaining = count
+ while remaining > 0:
+ string_hash.update(update_bytes[:remaining])
+ remaining -= len(update_bytes)
+ key += string_hash.digest()
+ key = key[:key_size_bytes]
+ return key
-def packets_from_bytes(data):
- offset = 0
- while offset < len(data):
- packet = PGPPacket()
- offset += packet.from_bytes(data=data[offset:])
- yield packet
+ def decrypt_symmetric_encryption(self, data):
+ """Decrypt OpenPGP's Cipher Feedback mode"""
+ algorithm = self['symmetric-encryption-algorithm']
+ module = self._crypto_module[algorithm]
+ key_size = self._key_size[algorithm]
+ segment_size_bits = self._cipher_block_size[algorithm]
+ if segment_size_bits % 8:
+ raise NotImplementedError(
+ ('{}-bit segment size for {} is not an integer number of bytes'
+ ).format(segment_size_bits, algorithm))
+ segment_size_bytes = segment_size_bits // 8
+ padding = segment_size_bytes - len(data) % segment_size_bytes
+ if padding:
+ data += b'\x00' * padding
+ passphrase = _getpass.getpass(
+ 'passphrase for {}: '.format(self['fingerprint'][-8:]))
+ passphrase = passphrase.encode('ascii')
+ key = self._string_to_key(string=passphrase, key_size=key_size)
+ cipher = module.new(
+ key=key,
+ mode=module.MODE_CFB,
+ IV=self['initial-vector'],
+ segment_size=segment_size_bits)
+ plaintext = cipher.decrypt(data)
+ if padding:
+ plaintext = plaintext[:-padding]
+ return plaintext
class PGPKey (object):
+ """An OpenPGP key with public and private parts.
+
+ From RFC 4880 [1]:
+
+ OpenPGP users may transfer public keys. The essential elements
+ of a transferable public key are as follows:
+
+ - One Public-Key packet
+ - Zero or more revocation signatures
+ - One or more User ID packets
+ - After each User ID packet, zero or more Signature packets
+ (certifications)
+ - Zero or more User Attribute packets
+ - After each User Attribute packet, zero or more Signature
+ packets (certifications)
+ - Zero or more Subkey packets
+ - After each Subkey packet, one Signature packet, plus
+ optionally a revocation
+
+ Secret keys have a similar packet stream [2]:
+
+ OpenPGP users may transfer secret keys. The format of a
+ transferable secret key is the same as a transferable public key
+ except that secret-key and secret-subkey packets are used
+ instead of the public key and public-subkey packets.
+ Implementations SHOULD include self-signatures on any user IDs
+ and subkeys, as this allows for a complete public key to be
+ automatically extracted from the transferable secret key.
+ Implementations MAY choose to omit the self-signatures,
+ especially if a transferable public key accompanies the
+ transferable secret key.
+
+ [1]: http://tools.ietf.org/search/rfc4880#section-11.1
+ [2]: http://tools.ietf.org/search/rfc4880#section-11.2
+ """
def __init__(self, fingerprint):
self.fingerprint = fingerprint
self.public_packets = None
key_export = _get_stdout(
['gpg', '--export', self.fingerprint])
self.public_packets = list(
- packets_from_bytes(data=key_export))
+ self._packets_from_bytes(data=key_export))
if self.public_packets[0]['type'] != 'public-key packet':
raise ValueError(
'{} does not start with a public-key packet'.format(
key_secret_export = _get_stdout(
['gpg', '--export-secret-keys', self.fingerprint])
self.secret_packets = list(
- packets_from_bytes(data=key_secret_export))
+ self._packets_from_bytes(data=key_secret_export))
+ if self.secret_packets[0]['type'] != 'secret-key packet':
+ raise ValueError(
+ '{} does not start with a secret-key packet'.format(
+ self.fingerprint))
+
+ def _packets_from_bytes(self, data):
+ offset = 0
+ while offset < len(data):
+ packet = PGPPacket(key=self)
+ offset += packet.from_bytes(data=data[offset:])
+ yield packet
def export_to_gpg(self):
raise NotImplemetedError('export to gpg')