+ method_name = '_serialize_{}'.format(self._clean_type())
+ method = getattr(self, method_name, None)
+ if not method:
+ raise NotImplementedError(
+ 'cannot serialize packet type {!r}'.format(self['type']))
+ body = method()
+ self['length'] = len(body)
+ return b''.join([
+ self._serialize_header(),
+ body,
+ ])
+
+ def _serialize_header(self):
+ always_one = 1
+ new_format = 0
+ type_code = self._reverse(self._packet_types, self['type'])
+ packet_tag = (
+ always_one * (1 << 7) |
+ new_format * (1 << 6) |
+ type_code * (1 << 2) |
+ self['length-type']
+ )
+ length_bytes, length_type = self._old_format_packet_length_type[
+ self['length-type']]
+ length_format = '>{}'.format(length_type)
+ length_data = _struct.pack(length_format, self['length'])
+ return b''.join([
+ bytes([packet_tag]),
+ length_data,
+ ])
+
+ @staticmethod
+ def _serialize_multiprecision_integer(integer):
+ r"""Serialize RFC 4880's multipricision integers
+
+ >>> PGPPacket._serialize_multiprecision_integer(1)
+ b'\x00\x01\x01'
+ >>> PGPPacket._serialize_multiprecision_integer(511)
+ b'\x00\t\x01\xff'
+ """
+ bit_length = int(_math.log(integer, 2)) + 1
+ chunks = [
+ _struct.pack('>H', bit_length),
+ ]
+ while integer > 0:
+ chunks.insert(1, bytes([integer & 0xff]))
+ integer = integer >> 8
+ return b''.join(chunks)
+
+ @classmethod
+ def _encode_string_to_key_count(cls, count):
+ r"""Encode RFC 4880's string-to-key count
+
+ >>> PGPPacket._encode_string_to_key_count(753664)
+ b'\x97'
+ """
+ coded_count = 0
+ count = count >> cls._string_to_key_expbias
+ while not count & 1:
+ count = count >> 1
+ coded_count += 1 << 4
+ coded_count += count & 15
+ return bytes([coded_count])
+
+ def _serialize_string_to_key_specifier(self):
+ string_to_key_type = bytes([
+ self._reverse(
+ self._string_to_key_types, self['string-to-key-type']),
+ ])
+ chunks = [string_to_key_type]
+ if self['string-to-key-type'] == 'simple':
+ chunks.append(bytes([self._reverse(
+ self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
+ elif self['string-to-key-type'] == 'salted':
+ chunks.append(bytes([self._reverse(
+ self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
+ chunks.append(self['string-to-key-salt'])
+ elif self['string-to-key-type'] == 'iterated and salted':
+ chunks.append(bytes([self._reverse(
+ self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
+ chunks.append(self['string-to-key-salt'])
+ chunks.append(self._encode_string_to_key_count(
+ count=self['string-to-key-count']))
+ else:
+ raise NotImplementedError(
+ 'string-to-key type {}'.format(self['string-to-key-type']))
+ return offset
+ return b''.join(chunks)
+
+ def _serialize_public_key_packet(self):
+ return self._serialize_generic_public_key_packet()
+
+ def _serialize_public_subkey_packet(self):
+ return self._serialize_generic_public_key_packet()
+
+ def _serialize_generic_public_key_packet(self):
+ key_version = bytes([self['key-version']])
+ chunks = [key_version]
+ if self['key-version'] != 4:
+ raise NotImplementedError(
+ 'public (sub)key packet version {}'.format(
+ self['key-version']))
+ chunks.append(_struct.pack('>I', self['creation-time']))
+ chunks.append(bytes([self._reverse(
+ self._public_key_algorithms, self['public-key-algorithm'])]))
+ if self['public-key-algorithm'].startswith('rsa '):
+ chunks.append(self._serialize_multiprecision_integer(
+ self['public-modulus']))
+ chunks.append(self._serialize_multiprecision_integer(
+ self['public-exponent']))
+ elif self['public-key-algorithm'].startswith('dsa '):
+ chunks.append(self._serialize_multiprecision_integer(
+ self['prime']))
+ chunks.append(self._serialize_multiprecision_integer(
+ self['group-order']))
+ chunks.append(self._serialize_multiprecision_integer(
+ self['group-generator']))
+ chunks.append(self._serialize_multiprecision_integer(
+ self['public-key']))
+ elif self['public-key-algorithm'].startswith('elgamal '):
+ chunks.append(self._serialize_multiprecision_integer(
+ self['prime']))
+ chunks.append(self._serialize_multiprecision_integer(
+ self['group-generator']))
+ chunks.append(self._serialize_multiprecision_integer(
+ self['public-key']))
+ else:
+ raise NotImplementedError(
+ 'algorithm-specific key fields for {}'.format(
+ self['public-key-algorithm']))
+ return b''.join(chunks)
+
+ def _string_to_key(self, string, key_size):
+ if key_size % 8:
+ raise ValueError(
+ '{}-bit key is not an integer number of bytes'.format(
+ key_size))
+ key_size_bytes = key_size // 8
+ hash_name = self._hashlib_name[
+ self['string-to-key-hash-algorithm']]
+ string_hash = _hashlib.new(hash_name)
+ hashes = _math.ceil(key_size_bytes / string_hash.digest_size)
+ key = b''
+ if self['string-to-key-type'] == 'simple':
+ update_bytes = string
+ elif self['string-to-key-type'] == 'salted':
+ update_bytes = self['string-to-key-salt'] + string
+ else:
+ raise NotImplementedError(
+ 'key calculation for string-to-key type {}'.format(
+ self['string-to-key-type']))
+ for padding in range(hashes):
+ string_hash = _hashlib.new(hash_name)
+ string_hash.update(padding * b'\x00')
+ if self['string-to-key-type'] in [
+ 'simple',
+ 'salted',
+ ]:
+ string_hash.update(update_bytes)
+ key += string_hash.digest()
+ key = key[:key_size_bytes]
+ return key
+
+ def decrypt_symmetric_encryption(self, data):
+ """Decrypt OpenPGP's Cipher Feedback mode"""
+ algorithm = self['symmetric-encryption-algorithm']
+ module = self._crypto_module[algorithm]
+ key_size = self._key_size[algorithm]
+ segment_size_bits = self._cipher_block_size[algorithm]
+ if segment_size_bits % 8:
+ raise NotImplementedError(
+ ('{}-bit segment size for {} is not an integer number of bytes'
+ ).format(segment_size_bits, algorithm))
+ segment_size_bytes = segment_size_bits // 8
+ padding = segment_size_bytes - len(data) % segment_size_bytes
+ if padding:
+ data += b'\x00' * padding
+ passphrase = _getpass.getpass(
+ 'passphrase for {}: '.format(self['fingerprint'][-8:]))
+ passphrase = passphrase.encode('ascii')
+ key = self._string_to_key(string=passphrase, key_size=key_size)
+ cipher = module.new(
+ key=key,
+ mode=module.MODE_CFB,
+ IV=self['initial-vector'],
+ segment_size=segment_size_bits)
+ plaintext = cipher.decrypt(data)
+ if padding:
+ plaintext = plaintext[:-padding]
+ return plaintext