Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for paramiko exec_command #19

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion MockSSH.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,14 +317,64 @@ def getPty(self, terminal, windowSize, attrs):
return None

def execCommand(self, protocol, cmd):
raise NotImplemented
if cmd:
print 'CMD: %s' % cmd
self.client = TransportWrapper(protocol)

cmd_and_args = cmd.split()
cmd, args = cmd_and_args[0], cmd_and_args[1:]
#func = self.get_exec_func(cmd)
if cmd in self.commands:
if args == self.commands[cmd].required_arguments[1:]:
print 'Command found (exec)', self.commands[cmd].required_arguments
for x in self.commands[cmd].success_callbacks:
x(WriteLn(self.client))
else:
print "Command found but args not found (exec)"
for x in self.commands[cmd].failure_callbacks:
x(WriteLn(self.client))
else:
print "command not found: [%s] (exec)" %cmd

self.client.loseConnection()
protocol.session.conn.transport.expectedLoseConnection = 1

def closed(self):
pass

def eofReceived(self):
pass

class WriteLn(object):
def __init__(self, client):
self.client = client

def writeln(self, data):
self.client.write(data)

class TransportWrapper(object):

def __init__(self, p):
self.protocol = p
p.makeConnection(self)
self.closed = False

def write(self, data):
self.protocol.outReceived(data)
self.protocol.outReceived('\r\n')

# Mimic 'exit' for the shell test
if '\x00' in data:
self.loseConnection()

def loseConnection(self):
if self.closed:
return

self.closed = True
self.protocol.inConnectionLost()
self.protocol.outConnectionLost()
self.protocol.errConnectionLost()

class SSHRealm:
implements(portal.IRealm)
Expand Down
61 changes: 61 additions & 0 deletions tests/test_mock_execCommand.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#!/usr/bin/which python
#

import time
import unittest

import MockSSH
import paramiko


def exec_successful(instance):
instance.writeln("ok")

def exec_failure(instance):
instance.writeln("failure")

def recv_all(channel):
while not channel.recv_ready():
time.sleep(0.1)
stdout = ''
while channel.recv_ready():
stdout += channel.recv(1024)
return stdout

class TestParamikoExecCommand(unittest.TestCase):

def setUp(self):
users = {'admin': 'x'}
command = MockSSH.ArgumentValidatingCommand(
'ls',
[exec_successful],
[exec_failure],
*['123'])
MockSSH.startThreadedServer(
[command],
prompt="hostname>",
interface="localhost",
port=9999,
**users)

def tearDown(self):
MockSSH.stopThreadedServer()

def test_exec_command(self):
"""test paramiko exec_commanbd
"""
ssh = paramiko.Transport(('127.0.0.1', 9999))
ssh.connect(username='admin', password='x')
ch=ssh.open_session()
ch.exec_command('ls')
stdout = recv_all(ch)
self.assertEqual(stdout.strip(), 'failure')
ch=ssh.open_session()
ch.exec_command('ls 123')
stdout = recv_all(ch)
self.assertEqual(stdout.strip(), 'ok')
ch.close()
ssh.close()

if __name__ == "__main__":
unittest.main()