/breezy/trunk

To get this branch, use:
bzr branch https://code.breezy-vcs.org/breezy/trunk

« back to all changes in this revision

Viewing changes to breezy/gpg.py

  • Committer: Jelmer Vernooij
  • Date: 2017-07-23 22:06:41 UTC
  • mfrom: (6738 trunk)
  • mto: This revision was merged to the branch mainline in revision 6739.
  • Revision ID: jelmer@jelmer.uk-20170723220641-69eczax9bmv8d6kk
Merge trunk, address review comments.

Show diffs side-by-side

added added

removed removed

Lines of Context:
29
29
 
30
30
from breezy import (
31
31
    config,
32
 
    errors,
33
32
    trace,
34
33
    ui,
35
34
    )
39
38
    )
40
39
""")
41
40
 
 
41
from . import (
 
42
    errors,
 
43
    )
42
44
from .sixish import (
43
45
    BytesIO,
44
46
    )
51
53
SIGNATURE_EXPIRED = 4
52
54
 
53
55
 
 
56
class GpgNotInstalled(errors.DependencyNotPresent):
 
57
 
 
58
    _fmt = 'python-gpg is not installed, it is needed to verify signatures'
 
59
 
 
60
    def __init__(self, error):
 
61
        errors.DependencyNotPresent.__init__(self, 'gpg', error)
 
62
 
 
63
 
 
64
class SigningFailed(errors.BzrError):
 
65
 
 
66
    _fmt = 'Failed to GPG sign data: "%(error)s"'
 
67
 
 
68
    def __init__(self, error):
 
69
        errors.BzrError.__init__(self, error=error)
 
70
 
 
71
 
 
72
class SignatureVerificationFailed(errors.BzrError):
 
73
 
 
74
    _fmt = 'Failed to verify GPG signature data with error "%(error)s"'
 
75
 
 
76
    def __init__(self, error):
 
77
        errors.BzrError.__init__(self, error=error)
 
78
 
 
79
 
54
80
def bulk_verify_signatures(repository, revids, strategy,
55
81
        process_events_callback=None):
56
82
    """Do verifications on a set of revisions
101
127
        """Real strategies take a configuration."""
102
128
 
103
129
    def sign(self, content):
104
 
        raise errors.SigningFailed('Signing is disabled.')
 
130
        raise SigningFailed('Signing is disabled.')
105
131
 
106
132
    def verify(self, content, testament):
107
 
        raise errors.SignatureVerificationFailed('Signature verification is \
 
133
        raise SignatureVerificationFailed('Signature verification is \
108
134
disabled.')
109
135
 
110
136
    def set_acceptable_keys(self, command_line_input):
162
188
    def __init__(self, config_stack):
163
189
        self._config_stack = config_stack
164
190
        try:
165
 
            import gpgme
166
 
            self.context = gpgme.Context()
 
191
            import gpg
 
192
            self.context = gpg.Context()
167
193
        except ImportError as error:
168
194
            pass # can't use verify()
169
195
 
 
196
        self.context.signers = self._get_signing_keys()
 
197
 
 
198
    def _get_signing_keys(self):
 
199
        import gpg
 
200
        keyname = self._config_stack.get('gpg_signing_key')
 
201
        if keyname:
 
202
            try:
 
203
                return [self.context.get_key(keyname)]
 
204
            except gpg.errors.KeyNotFound:
 
205
                pass
 
206
 
 
207
        if keyname is None or keyname == 'default':
 
208
            # 'default' or not setting gpg_signing_key at all means we should
 
209
            # use the user email address
 
210
            keyname = config.extract_email_address(self._config_stack.get('email'))
 
211
        possible_keys = self.context.keylist(keyname, secret=True)
 
212
        try:
 
213
            return [possible_keys.next()]
 
214
        except StopIteration:
 
215
            return []
 
216
 
170
217
    @staticmethod
171
218
    def verify_signatures_available():
172
219
        """
175
222
        :return: boolean if this strategy can verify signatures
176
223
        """
177
224
        try:
178
 
            import gpgme
 
225
            import gpg
179
226
            return True
180
227
        except ImportError as error:
181
228
            return False
182
229
 
183
 
    def _command_line(self):
184
 
        key = self._config_stack.get('gpg_signing_key')
185
 
        if key is None or key == 'default':
186
 
            # 'default' or not setting gpg_signing_key at all means we should
187
 
            # use the user email address
188
 
            key = config.extract_email_address(self._config_stack.get('email'))
189
 
        return [self._config_stack.get('gpg_signing_command'), '--clearsign',
190
 
                '-u', key]
191
 
 
192
230
    def sign(self, content):
 
231
        import gpg
193
232
        if isinstance(content, unicode):
194
233
            raise errors.BzrBadParameterUnicode('content')
195
 
        ui.ui_factory.clear_term()
196
234
 
197
 
        preexec_fn = _set_gpg_tty
198
 
        if sys.platform == 'win32':
199
 
            # Win32 doesn't support preexec_fn, but wouldn't support TTY anyway.
200
 
            preexec_fn = None
 
235
        plain_text = gpg.Data(content)
201
236
        try:
202
 
            process = subprocess.Popen(self._command_line(),
203
 
                                       stdin=subprocess.PIPE,
204
 
                                       stdout=subprocess.PIPE,
205
 
                                       preexec_fn=preexec_fn)
206
 
            try:
207
 
                result = process.communicate(content)[0]
208
 
                if process.returncode is None:
209
 
                    process.wait()
210
 
                if process.returncode != 0:
211
 
                    raise errors.SigningFailed(self._command_line())
212
 
                return result
213
 
            except OSError as e:
214
 
                if e.errno == errno.EPIPE:
215
 
                    raise errors.SigningFailed(self._command_line())
216
 
                else:
217
 
                    raise
218
 
        except ValueError:
219
 
            # bad subprocess parameters, should never happen.
220
 
            raise
221
 
        except OSError as e:
222
 
            if e.errno == errno.ENOENT:
223
 
                # gpg is not installed
224
 
                raise errors.SigningFailed(self._command_line())
225
 
            else:
226
 
                raise
 
237
            output, result = self.context.sign(
 
238
                plain_text, mode=gpg.constants.sig.mode.CLEAR)
 
239
        except gpg.errors.GPGMEError as error:
 
240
            raise SigningFailed(str(error))
 
241
 
 
242
        return output
227
243
 
228
244
    def verify(self, content, testament):
229
245
        """Check content has a valid signature.
234
250
        :return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if valid
235
251
        """
236
252
        try:
237
 
            import gpgme
 
253
            import gpg
238
254
        except ImportError as error:
239
 
            raise errors.GpgmeNotInstalled(error)
 
255
            raise errors.GpgNotInstalled(error)
240
256
 
241
 
        signature = BytesIO(content)
242
 
        plain_output = BytesIO()
 
257
        signature = gpg.Data(content)
 
258
        sink = gpg.Data()
243
259
        try:
244
 
            result = self.context.verify(signature, None, plain_output)
245
 
        except gpgme.GpgmeError as error:
246
 
            raise errors.SignatureVerificationFailed(error[2])
 
260
            plain_output, result = self.context.verify(signature)
 
261
        except gpg.errors.BadSignatures as error:
 
262
            fingerprint = error.result.signatures[0].fpr
 
263
            if error.result.signatures[0].summary & gpg.constants.SIGSUM_KEY_EXPIRED:
 
264
                expires = self.context.get_key(error.result.signatures[0].fpr).subkeys[0].expires
 
265
                if expires > error.result.signatures[0].timestamp:
 
266
                    # The expired key was not expired at time of signing.
 
267
                    # test_verify_expired_but_valid()
 
268
                    return SIGNATURE_EXPIRED, fingerprint[-8:]
 
269
                else:
 
270
                    # I can't work out how to create a test where the signature
 
271
                    # was expired at the time of signing.
 
272
                    return SIGNATURE_NOT_VALID, None
 
273
 
 
274
            # GPG does not know this key.
 
275
            # test_verify_unknown_key()
 
276
            if error.result.signatures[0].summary & gpg.constants.SIGSUM_KEY_MISSING:
 
277
                return SIGNATURE_KEY_MISSING, fingerprint[-8:]
 
278
 
 
279
            return SIGNATURE_NOT_VALID, None
 
280
        except gpg.errors.GPGMEError as error:
 
281
            raise SignatureVerificationFailed(error[2])
247
282
 
248
283
        # No result if input is invalid.
249
284
        # test_verify_invalid()
250
 
        if len(result) == 0:
 
285
        if len(result.signatures) == 0:
251
286
            return SIGNATURE_NOT_VALID, None
252
287
        # User has specified a list of acceptable keys, check our result is in
253
288
        # it.  test_verify_unacceptable_key()
254
 
        fingerprint = result[0].fpr
 
289
        fingerprint = result.signatures[0].fpr
255
290
        if self.acceptable_keys is not None:
256
291
            if not fingerprint in self.acceptable_keys:
257
292
                return SIGNATURE_KEY_MISSING, fingerprint[-8:]
258
293
        # Check the signature actually matches the testament.
259
294
        # test_verify_bad_testament()
260
 
        if testament != plain_output.getvalue():
 
295
        if testament != plain_output:
261
296
            return SIGNATURE_NOT_VALID, None
262
 
        # Yay gpgme set the valid bit.
 
297
        # Yay gpg set the valid bit.
263
298
        # Can't write a test for this one as you can't set a key to be
264
 
        # trusted using gpgme.
265
 
        if result[0].summary & gpgme.SIGSUM_VALID:
 
299
        # trusted using gpg.
 
300
        if result.signatures[0].summary & gpg.constants.SIGSUM_VALID:
266
301
            key = self.context.get_key(fingerprint)
267
302
            name = key.uids[0].name
268
303
            email = key.uids[0].email
269
304
            return SIGNATURE_VALID, name + " <" + email + ">"
270
305
        # Sigsum_red indicates a problem, unfortunatly I have not been able
271
306
        # to write any tests which actually set this.
272
 
        if result[0].summary & gpgme.SIGSUM_RED:
 
307
        if result.signatures[0].summary & gpg.constants.SIGSUM_RED:
273
308
            return SIGNATURE_NOT_VALID, None
274
 
        # GPG does not know this key.
275
 
        # test_verify_unknown_key()
276
 
        if result[0].summary & gpgme.SIGSUM_KEY_MISSING:
277
 
            return SIGNATURE_KEY_MISSING, fingerprint[-8:]
278
309
        # Summary isn't set if sig is valid but key is untrusted but if user
279
310
        # has explicity set the key as acceptable we can validate it.
280
 
        if result[0].summary == 0 and self.acceptable_keys is not None:
 
311
        if result.signatures[0].summary == 0 and self.acceptable_keys is not None:
281
312
            if fingerprint in self.acceptable_keys:
282
313
                # test_verify_untrusted_but_accepted()
283
314
                return SIGNATURE_VALID, None
284
315
        # test_verify_valid_but_untrusted()
285
 
        if result[0].summary == 0 and self.acceptable_keys is None:
286
 
            return SIGNATURE_NOT_VALID, None
287
 
        if result[0].summary & gpgme.SIGSUM_KEY_EXPIRED:
288
 
            expires = self.context.get_key(result[0].fpr).subkeys[0].expires
289
 
            if expires > result[0].timestamp:
290
 
                # The expired key was not expired at time of signing.
291
 
                # test_verify_expired_but_valid()
292
 
                return SIGNATURE_EXPIRED, fingerprint[-8:]
293
 
            else:
294
 
                # I can't work out how to create a test where the signature
295
 
                # was expired at the time of signing.
296
 
                return SIGNATURE_NOT_VALID, None
297
 
        # A signature from a revoked key gets this.
298
 
        # test_verify_revoked_signature()
299
 
        if ((result[0].summary & gpgme.SIGSUM_SYS_ERROR
300
 
             or result[0].status.strerror == 'Certificate revoked')):
 
316
        if result.signatures[0].summary == 0 and self.acceptable_keys is None:
301
317
            return SIGNATURE_NOT_VALID, None
302
318
        # Other error types such as revoked keys should (I think) be caught by
303
319
        # SIGSUM_RED so anything else means something is buggy.
304
 
        raise errors.SignatureVerificationFailed(
 
320
        raise SignatureVerificationFailed(
305
321
            "Unknown GnuPG key verification result")
306
322
 
307
323
    def set_acceptable_keys(self, command_line_input):