/usr/lib/python2.7/dist-packages/twisted/conch/test/test_default.py is in python-twisted-conch 1:13.2.0-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.conch.client.default}.
"""
# Crypto and pyasn1 are imported only to probe for their availability: the
# conch client modules below cannot be imported without them.
try:
    import Crypto.Cipher.DES3
    import pyasn1
except ImportError:
    # Module-level marker telling the test runner to skip this module.
    skip = "PyCrypto and PyASN1 required for twisted.conch.client.default."
else:
    from twisted.conch.client.agent import SSHAgentClient
    from twisted.conch.client.default import SSHUserAuthClient
    from twisted.conch.client.options import ConchOptions
    from twisted.conch.ssh.keys import Key

# These are needed unconditionally: TestCase is used as a base class at
# module level, so it has to be importable even when the tests are skipped.
from twisted.trial.unittest import TestCase
from twisted.python.filepath import FilePath
from twisted.conch.test import keydata
from twisted.test.proto_helpers import StringTransport
class SSHUserAuthClientTest(TestCase):
    """
    Tests for L{SSHUserAuthClient}.

    @type rsaPublic: L{Key}
    @ivar rsaPublic: A public RSA key.

    @type tmpdir: L{FilePath}
    @ivar tmpdir: A per-test temporary directory holding the key files.

    @type rsaFile: L{FilePath}
    @ivar rsaFile: The C{id_rsa} private key file inside C{tmpdir}.
    """

    def setUp(self):
        """
        Load the public RSA key and write an RSA key pair (C{id_rsa} and
        C{id_rsa.pub}) into a freshly created temporary directory.
        """
        self.rsaPublic = Key.fromString(keydata.publicRSA_openssh)

        self.tmpdir = FilePath(self.mktemp())
        self.tmpdir.makedirs()

        privatePath = self.tmpdir.child('id_rsa')
        privatePath.setContent(keydata.privateRSA_openssh)
        self.rsaFile = privatePath

        publicPath = self.tmpdir.child('id_rsa.pub')
        publicPath.setContent(keydata.publicRSA_openssh)


    def _clientForIdentities(self, identityPaths):
        """
        Build an L{SSHUserAuthClient} for the user C{"user"} whose options
        name the given identity files.

        @param identityPaths: The list assigned to the options' C{identitys}
            attribute.
        @type identityPaths: C{list} of C{str}

        @return: The newly created client.
        @rtype: L{SSHUserAuthClient}
        """
        conchOptions = ConchOptions()
        conchOptions.identitys = identityPaths
        return SSHUserAuthClient("user", conchOptions, None)


    def test_signDataWithAgent(self):
        """
        When connected to an agent, L{SSHUserAuthClient} can use it to
        request signatures of particular data with a particular L{Key}.
        """
        cleartext = "Sign here"

        # Wire the agent to an in-memory transport so the bytes it sends can
        # be inspected afterwards.
        transport = StringTransport()
        keyAgent = SSHAgentClient()
        keyAgent.makeConnection(transport)

        authClient = SSHUserAuthClient("user", ConchOptions(), None)
        authClient.keyAgent = keyAgent
        authClient.signData(self.rsaPublic, cleartext)

        # The request embeds the public key blob and the cleartext between
        # fixed framing bytes.
        expectedRequest = "".join([
            "\x00\x00\x00\x8b\r\x00\x00\x00u",
            self.rsaPublic.blob(),
            "\x00\x00\x00\t",
            cleartext,
            "\x00\x00\x00\x00",
        ])
        self.assertEqual(transport.value(), expectedRequest)


    def test_agentGetPublicKey(self):
        """
        L{SSHAgentClient.getPublicKey} returns a public L{Key} built from one
        of the blobs held by the agent, and C{None} once no more keys are
        present.
        """
        keyAgent = SSHAgentClient()
        keyAgent.blobs = [self.rsaPublic.blob()]

        firstKey = keyAgent.getPublicKey()
        self.assertEqual(firstKey.isPublic(), True)
        self.assertEqual(firstKey, self.rsaPublic)

        # The single blob has been consumed by the first call.
        self.assertEqual(keyAgent.getPublicKey(), None)


    def test_getPublicKeyFromFile(self):
        """
        L{SSHUserAuthClient.getPublicKey()} is able to get a public key from
        the first file described by its options' C{identitys} list, and
        return the corresponding public L{Key} object.
        """
        authClient = self._clientForIdentities([self.rsaFile.path])

        publicKey = authClient.getPublicKey()

        self.assertEqual(publicKey.isPublic(), True)
        self.assertEqual(publicKey, self.rsaPublic)


    def test_getPublicKeyAgentFallback(self):
        """
        If an agent is present, but doesn't return a key,
        L{SSHUserAuthClient.getPublicKey} continues with the normal key
        lookup.
        """
        authClient = self._clientForIdentities([self.rsaFile.path])
        # No blobs are assigned to this agent.
        authClient.keyAgent = SSHAgentClient()

        publicKey = authClient.getPublicKey()

        self.assertEqual(publicKey.isPublic(), True)
        self.assertEqual(publicKey, self.rsaPublic)


    def test_getPublicKeyBadKeyError(self):
        """
        If L{keys.Key.fromFile} raises a L{keys.BadKeyError}, then
        L{SSHUserAuthClient.getPublicKey} tries again to get a public key by
        calling itself recursively.
        """
        dsaFile = self.tmpdir.child('id_dsa')
        dsaFile.setContent(keydata.privateDSA_openssh)
        self.tmpdir.child('id_dsa.pub').setContent(keydata.publicDSA_openssh)

        # Overwrite the RSA public key written by setUp with junk so that
        # the first identity cannot be loaded.
        self.tmpdir.child('id_rsa.pub').setContent('not a key!')

        authClient = self._clientForIdentities(
            [self.rsaFile.path, dsaFile.path])
        publicKey = authClient.getPublicKey()

        self.assertEqual(publicKey.isPublic(), True)
        self.assertEqual(publicKey, Key.fromString(keydata.publicDSA_openssh))
        self.assertEqual(
            authClient.usedFiles, [self.rsaFile.path, dsaFile.path])


    def test_getPrivateKey(self):
        """
        L{SSHUserAuthClient.getPrivateKey} will load a private key from the
        last used file populated by L{SSHUserAuthClient.getPublicKey}, and
        return a L{Deferred} which fires with the corresponding private
        L{Key}.
        """
        expectedKey = Key.fromString(keydata.privateRSA_openssh)
        authClient = self._clientForIdentities([self.rsaFile.path])

        # Populate the list of used files
        authClient.getPublicKey()

        def checkPrivateKey(loadedKey):
            self.assertEqual(loadedKey.isPublic(), False)
            self.assertEqual(loadedKey, expectedKey)

        d = authClient.getPrivateKey()
        return d.addCallback(checkPrivateKey)


    def test_getPrivateKeyPassphrase(self):
        """
        L{SSHUserAuthClient} can get a private key from a file, and return a
        Deferred called back with a private L{Key} object, even if the key is
        encrypted.
        """
        expectedKey = Key.fromString(keydata.privateRSA_openssh)
        passphrase = 'this is the passphrase'
        # Replace the plain private key on disk with an encrypted copy.
        self.rsaFile.setContent(expectedKey.toString('openssh', passphrase))

        authClient = self._clientForIdentities([self.rsaFile.path])

        # Populate the list of used files
        authClient.getPublicKey()

        def fakeGetPassword(prompt):
            expectedPrompt = "Enter passphrase for key '%s': " % (
                self.rsaFile.path,)
            self.assertEqual(prompt, expectedPrompt)
            return passphrase

        def checkPrivateKey(loadedKey):
            self.assertEqual(loadedKey.isPublic(), False)
            self.assertEqual(loadedKey, expectedKey)

        # Supply the passphrase programmatically instead of via the client's
        # own _getPassword.
        self.patch(authClient, '_getPassword', fakeGetPassword)
        d = authClient.getPrivateKey()
        return d.addCallback(checkPrivateKey)
|