+ @staticmethod
+ def _parse_multiprecision_integer(data):
+ r"""Parse RFC 4880's multiprecision integers
+
+ >>> PGPPacket._parse_multiprecision_integer(b'\x00\x01\x01')
+ (3, 1)
+ >>> PGPPacket._parse_multiprecision_integer(b'\x00\x09\x01\xff')
+ (4, 511)
+ """
+ bits = _struct.unpack('>H', data[:2])[0]
+ offset = 2
+ length = (bits + 7) // 8
+ value = 0
+ for i in range(length):
+ value += data[offset + i] * 1 << (8 * (length - i - 1))
+ offset += length
+ return (offset, value)
+
+ def _parse_string_to_key_specifier(self, data):
+ self['string-to-key-type'] = self._string_to_key_types[data[0]]
+ offset = 1
+ if self['string-to-key-type'] == 'simple':
+ self['string-to-key-hash-algorithm'] = self._hash_algorithms[
+ data[offset]]
+ offset += 1
+ elif self['string-to-key-type'] == 'salted':
+ self['string-to-key-hash-algorithm'] = self._hash_algorithms[
+ data[offset]]
+ offset += 1
+ self['string-to-key-salt'] = data[offset: offset + 8]
+ offset += 8
+ elif self['string-to-key-type'] == 'iterated and salted':
+ self['string-to-key-hash-algorithm'] = self._hash_algorithms[
+ data[offset]]
+ offset += 1
+ self['string-to-key-salt'] = data[offset: offset + 8]
+ offset += 8
+ self['string-to-key-coded-count'] = data[offset]
+ offset += 1
+ else:
+ raise NotImplementedError(
+ 'string-to-key type {}'.format(self['string-to-key-type']))
+ return offset
+
+ def _parse_public_key_packet(self, data):
+ self._parse_generic_public_key_packet(data=data)
+
+ def _parse_public_subkey_packet(self, data):
+ self._parse_generic_public_key_packet(data=data)
+
+ def _parse_generic_public_key_packet(self, data):
+ self['key-version'] = data[0]
+ offset = 1
+ if self['key-version'] != 4:
+ raise NotImplementedError(
+ 'public (sub)key packet version {}'.format(
+ self['key-version']))
+ length = 5
+ self['creation-time'], algorithm = _struct.unpack(
+ '>IB', data[offset: offset + length])
+ offset += length
+ self['public-key-algorithm'] = self._public_key_algorithms[algorithm]
+ if self['public-key-algorithm'].startswith('rsa '):
+ o, self['public-modulus'] = self._parse_multiprecision_integer(
+ data[offset:])
+ offset += o
+ o, self['public-exponent'] = self._parse_multiprecision_integer(
+ data[offset:])
+ offset += o
+ elif self['public-key-algorithm'].startswith('dsa '):
+ o, self['prime'] = self._parse_multiprecision_integer(
+ data[offset:])
+ offset += o
+ o, self['group-order'] = self._parse_multiprecision_integer(
+ data[offset:])
+ offset += o
+ o, self['group-generator'] = self._parse_multiprecision_integer(
+ data[offset:])
+ offset += o
+ o, self['public-key'] = self._parse_multiprecision_integer(
+ data[offset:])
+ offset += o
+ elif self['public-key-algorithm'].startswith('elgamal '):
+ o, self['prime'] = self._parse_multiprecision_integer(
+ data[offset:])
+ offset += o
+ o, self['group-generator'] = self._parse_multiprecision_integer(
+ data[offset:])
+ offset += o
+ o, self['public-key'] = self._parse_multiprecision_integer(
+ data[offset:])
+ offset += o
+ else:
+ raise NotImplementedError(
+ 'algorithm-specific key fields for {}'.format(
+ self['public-key-algorithm']))
+ fingerprint = _hashlib.sha1()
+ fingerprint.update(b'\x99')
+ fingerprint.update(_struct.pack('>H', len(data)))
+ fingerprint.update(data)
+ self['fingerprint'] = fingerprint.hexdigest()
+ return offset
+
+ def _parse_secret_key_packet(self, data):
+ self._parse_generic_secret_key_packet(data=data)
+
+ def _parse_secret_subkey_packet(self, data):
+ self._parse_generic_secret_key_packet(data=data)
+
+ def _parse_generic_secret_key_packet(self, data):
+ offset = self._parse_generic_public_key_packet(data=data)
+ string_to_key_usage = data[offset]
+ offset += 1
+ if string_to_key_usage in [255, 254]:
+ self['symmetric-encryption-algorithm'] = (
+ self._symmetric_key_algorithms[data[offset]])
+ offset += 1
+ offset += self._parse_string_to_key_specifier(data=data[offset:])
+ else:
+ self['symmetric-encryption-algorithm'] = (
+ self._symmetric_key_algorithms[string_to_key_usage])
+ if string_to_key_usage:
+ block_size_bits = self._cipher_block_size.get(
+ self['symmetric-encryption-algorithm'], None)
+ if block_size_bits % 8:
+ raise NotImplementedError(
+ ('{}-bit block size for {} is not an integer number of bytes'
+ ).format(
+ block_size_bits, self['symmetric-encryption-algorithm']))
+ block_size = block_size_bits // 8
+ if not block_size:
+ raise NotImplementedError(
+ 'unknown block size for {}'.format(
+ self['symmetric-encryption-algorithm']))
+ self['initial-vector'] = data[offset: offset + block_size]
+ offset += block_size
+ if string_to_key_usage in [0, 255]:
+ key_end = -2
+ else:
+ key_end = 0
+ self['secret-key'] = data[offset:key_end]
+ if key_end:
+ self['secret-key-checksum'] = data[key_end:]
+
+ def _parse_signature_subpackets(self, data):
+ offset = 0
+ while offset < len(data):
+ o, subpacket = self._parse_signature_subpacket(data=data[offset:])
+ offset += o
+ yield subpacket
+
+ def _parse_signature_subpacket(self, data):
+ subpacket = {}
+ first = data[0]
+ offset = 1
+ if first < 192:
+ length = first
+ elif first >= 192 and first < 255:
+ second = data[offset]
+ offset += 1
+ length = ((first - 192) << 8) + second + 192
+ else:
+ length = _struct.unpack(
+ '>I', data[offset: offset + 4])[0]
+ offset += 4
+ subpacket['type'] = self._signature_subpacket_types[data[offset]]
+ offset += 1
+ subpacket_data = data[offset: offset + length - 1]
+ offset += len(subpacket_data)
+ method_name = '_parse_{}_signature_subpacket'.format(
+ self._clean_type(type=subpacket['type']))
+ method = getattr(self, method_name, None)
+ if not method:
+ raise NotImplementedError(
+ 'cannot parse signature subpacket type {!r}'.format(
+ subpacket['type']))
+ method(data=subpacket_data, subpacket=subpacket)
+ return (offset, subpacket)
+
+ def _parse_signature_packet(self, data):
+ self['signature-version'] = data[0]
+ offset = 1
+ if self['signature-version'] != 4:
+ raise NotImplementedError(
+ 'signature packet version {}'.format(
+ self['signature-version']))
+ self['signature-type'] = self._signature_types[data[offset]]
+ offset += 1
+ self['public-key-algorithm'] = self._public_key_algorithms[
+ data[offset]]
+ offset += 1
+ self['hash-algorithm'] = self._hash_algorithms[data[offset]]
+ offset += 1
+ hashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
+ offset += 2
+ self['hashed-subpackets'] = list(self._parse_signature_subpackets(
+ data[offset: offset + hashed_count]))
+ offset += hashed_count
+ unhashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
+ offset += 2
+ self['unhashed-subpackets'] = list(self._parse_signature_subpackets(
+ data=data[offset: offset + unhashed_count]))
+ offset += unhashed_count
+ self['signed-hash-word'] = data[offset: offset + 2]
+ offset += 2
+ self['signature'] = data[offset:]
+
+ def _parse_signature_creation_time_signature_subpacket(
+ self, data, subpacket):
+ subpacket['signature-creation-time'] = _struct.unpack('>I', data)[0]
+
+ def _parse_issuer_signature_subpacket(self, data, subpacket):
+ subpacket['issuer'] = ''.join('{:02x}'.format(byte) for byte in data)
+
+ def _parse_key_expiration_time_signature_subpacket(
+ self, data, subpacket):
+ subpacket['key-expiration-time'] = _struct.unpack('>I', data)[0]
+
+ def _parse_preferred_symmetric_algorithms_signature_subpacket(
+ self, data, subpacket):
+ subpacket['preferred-symmetric-algorithms'] = [
+ self._symmetric_key_algorithms[d] for d in data]
+
+ def _parse_preferred_hash_algorithms_signature_subpacket(
+ self, data, subpacket):
+ subpacket['preferred-hash-algorithms'] = [
+ self._hash_algorithms[d] for d in data]
+
+ def _parse_preferred_compression_algorithms_signature_subpacket(
+ self, data, subpacket):
+ subpacket['preferred-compression-algorithms'] = [
+ self._compression_algorithms[d] for d in data]
+
+ def _parse_key_server_preferences_signature_subpacket(
+ self, data, subpacket):
+ subpacket['key-server-preferences'] = set()
+ if data[0] & 0x80:
+ subpacket['key-server-preferences'].add('no-modify')
+
+ def _parse_primary_user_id_signature_subpacket(self, data, subpacket):
+ subpacket['primary-user-id'] = bool(data[0])
+
+ def _parse_key_flags_signature_subpacket(self, data, subpacket):
+ subpacket['key-flags'] = set()
+ if data[0] & 0x1:
+ subpacket['key-flags'].add('can certify')
+ if data[0] & 0x2:
+ subpacket['key-flags'].add('can sign')
+ if data[0] & 0x4:
+ subpacket['key-flags'].add('can encrypt communications')
+ if data[0] & 0x8:
+ subpacket['key-flags'].add('can encrypt storage')
+ if data[0] & 0x10:
+ subpacket['key-flags'].add('private split')
+ if data[0] & 0x20:
+ subpacket['key-flags'].add('can authenticate')
+ if data[0] & 0x80:
+ subpacket['key-flags'].add('private shared')
+
+ def _parse_features_signature_subpacket(self, data, subpacket):
+ subpacket['features'] = set()
+ if data[0] & 0x1:
+ subpacket['features'].add('modification detection')
+
+ def _parse_embedded_signature_signature_subpacket(self, data, subpacket):
+ subpacket['embedded'] = PGPPacket()
+ subpacket['embedded']._parse_signature_packet(data=data)
+
+ def _parse_user_id_packet(self, data):
+ self['user'] = str(data, 'utf-8')
+