Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix for issue #290 - allow passing key and cert file to the connect() #311

Merged
merged 4 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions pyeapi/eapilib.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,16 +251,14 @@ def connect(self):
if self._tunnel_host:
self.sock = sock
self._tunnel()
context = ssl.SSLContext()
context.load_cert_chain( certfile=self.cert_file, keyfile=self.key_file )
# If there's no CA File, don't force Server Certificate Check
context.verify_mode = ssl.CERT_NONE
if self.ca_file:
self.sock = ssl.SSLContext.wrap_socket(sock, self.key_file,
self.cert_file,
ca_certs=self.ca_file,
cert_reqs=ssl.CERT_REQUIRED)
else:
self.sock = ssl.SSLContext.wrap_socket(sock, self.key_file,
self.cert_file,
cert_reqs=ssl.CERT_NONE)
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations( ca_certs=self.ca_file )
self.sock = context.wrap_socket( sock )


class EapiConnection(object):
Expand Down Expand Up @@ -692,8 +690,9 @@ def disable_certificate_verification(self):


class HttpsEapiCertConnection(EapiConnection):
def __init__(self, host, port=None, path=None, key_file=None,
cert_file=None, ca_file=None, timeout=60, **kwargs):
def __init__(self, host, port=None, path=None, username=None,
password=None, key_file=None, cert_file=None,
ca_file=None, timeout=60, **kwargs):
if key_file is None or cert_file is None:
raise ValueError("For https_cert connections both a key_file and "
"cert_file are required. A ca_file is also "
Expand All @@ -706,7 +705,7 @@ def __init__(self, host, port=None, path=None, key_file=None,
key_file=key_file,
cert_file=cert_file,
ca_file=ca_file, timeout=timeout)

self.authentication(username, password)

class SessionApiConnection(object):
def authentication(self, username, password):
Expand Down
65 changes: 65 additions & 0 deletions test/system/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import os
import unittest
import tempfile

import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '../lib'))
Expand All @@ -42,6 +43,55 @@
import pyeapi.client
import pyeapi.eapilib

# key and its certificate generated for secure connection test
private_key = """-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAq+jUTkoQ7dVaU9sKRQGKY7mJnMu5r63tHH1qx5oY7AwB9r9u
imhXiRw30mtzHf2KMqn5aO6qzgVSXIJzblOM8Dy4ik/iEmL6rV6Y9bWrBftH5/YK
PjLeLynAMPmhPURT6hWRZ/IQG4tDXG7W+iRanJnjLtF96wAWP+UUfR2v7gLBfyu2
YEZ9oFo1/1dMb0x2I1O6QV91zUguHbLyQmax2IyYus6Faoh4RgPk56XbDnacV5fR
ICZtV0Dh4gVTgIBvjKRPuG72nchcY21apqsscP6uo+QYnTlcNKBCRctyI0xS0rLz
EHHg3rQthklvi3bx4fJYofZp/rGeuSdb4CEGIQIDAQABAoIBACZTczm9E4cioM+/
LsvxqvvOupplZRGAsjM+1taHSXUevDVZunhLCPD9hIh6AiE2jF/9OyikxRnHX/RV
9Qwsvmg08WOMqbc1r/OE+o8VIHrl6cMSPHhfeN+E7F8+2C7Dk/3FLzTAZ8zsQGlU
IMOF5VmyiU6/z9XboBpApU+7laR3RS1AuBzJunEKZkhanfX70J2OxOBiS+hnudcQ
qgBu+EpdpKZjMroKwsCsfh9xMG42qt4HvX5QWvFioyxffFEiyVEtcjk5BvyPkjZz
SvB2DMz/IqcG3za3fuESkaT4zL0ccbYWiThaaoFAi2RJ3iQbD2TPqA7ZdQneoMFZ
+NzgzM0CgYEA2/CuIoaectmNGHGVO+u2dGkcgRLWmil6g95B8wbVH2KGspV6ueDZ
7oMNyKGno4Lof2lETrvudsVMODDk4PDRcOw3vXwkE4C/nkSAnUtn5OmIpB5JStD9
CjRxlOyz+l+ahCzwScwW7sdYgTYxXxgZAPzI9cd8hv5PU4ldwhALNTcCgYEAyBg1
MRNAOdMakgDAHTlIe69UEqr3TjcXehYMP/QnQl3PZAizWlSDr2yYiLrV3kSyw//a
qFLXIypuY6j8RaBcnsmD3gCYC9Jv8hIAvevmM/mPLCRWEIrsGTj6S07r81wE8pvf
yxWmiLDBBWSN1UzRL4kABXCa3QDqWeebBrD0y2cCgYEAjzBrfkjcYXNnW7Ge8ers
128TQqksFCPLAo0xrHIXUJ6JiTyuMNPFrnWeBK/R/y8cBM9YzFWn06VxkOesKxI9
mOIBDBkFN7lLh1Ob1EwicLLl5ctd9hqHkxw/kjBkoC2b4E+NhM4dZAlegojwrbN3
m9/3SaQ9W3m31XAKHWzqjxMCgYEAulWAw1CwEKk8Jxa30P8VNskRO8kmQBohrLl3
ct8E6FK/3OIVU1s8vlIcwcdrfm7vIoLStsleOws6fWhSdOxfFCeIu2ZGMUwon36Q
XkydtW0DHRJBa2pTbzGWNCcspxXcLalmgJKK4OPo/AKl6ip86w1jja1NKd2+XzbF
MTf83qUCgYEAzr12heGk4U4JlLLxeY39SILuTE/kHXDqvFoDFHZ8gLqaCVJOoePE
wGz54FrhF3MtSqXTjjVfR93x5a1MFcpai5x6VweonArz4h8LON0qNG6lyPG4sf23
hnZQW4MuldYOmJ33I7cl6hRjAukZ7PIGiC309ZLcqxVSLnqqfzqd94o=
-----END RSA PRIVATE KEY-----
"""
certificate = """-----BEGIN CERTIFICATE-----
MIICpjCCAY4CCQCFjpRIq17xVTANBgkqhkiG9w0BAQUFADAUMRIwEAYDVQQDDAls
b2NhbGhvc3QwIBcNMjIwNzI3MDg1NDAyWhgPMjI5NjA1MTAwODU0MDJaMBQxEjAQ
BgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB
AKvo1E5KEO3VWlPbCkUBimO5iZzLua+t7Rx9aseaGOwMAfa/bopoV4kcN9Jrcx39
ijKp+Wjuqs4FUlyCc25TjPA8uIpP4hJi+q1emPW1qwX7R+f2Cj4y3i8pwDD5oT1E
U+oVkWfyEBuLQ1xu1vokWpyZ4y7RfesAFj/lFH0dr+4CwX8rtmBGfaBaNf9XTG9M
diNTukFfdc1ILh2y8kJmsdiMmLrOhWqIeEYD5Oel2w52nFeX0SAmbVdA4eIFU4CA
b4ykT7hu9p3IXGNtWqarLHD+rqPkGJ05XDSgQkXLciNMUtKy8xBx4N60LYZJb4t2
8eHyWKH2af6xnrknW+AhBiECAwEAATANBgkqhkiG9w0BAQUFAAOCAQEAJ4//TDQD
jOgkc4M2GvcNn+9fQ0NUg11VFV2ppj6dxn8BFiwy0xFjkstEHRt0MQQT6HCpxdg2
3pqyb0kHA++sowFkIgSHfthAnLrffIkCtHkoFC6dXHXw3x1JRBl9aVg+qp4mF8+y
FB17KuVJiXja3EDnvXcV+hLstrcRmCWBvEPSmc4bHg63Il7xcdhS50Qi4dDU9iqF
ze4fnH4tR6kjJnnMcYGPgd3QuhnGkyQrFzBzLSwMRWW/1NvbfmJGTGnEIOXWzz4n
guaCItZ7qaNzcVXtfVItoFRSi3XWnEVRjGQ0kQT0rrDzISrQWt5bALT6AkbR2KMZ
tsQrthec9BAFAg==
-----END CERTIFICATE-----
"""



class TestClient(unittest.TestCase):

Expand Down Expand Up @@ -259,6 +309,21 @@ def tearDown(self):
else:
dut.config("no enable secret")

def test_secure_transport( self ):
# create key and cert temp files
with tempfile.NamedTemporaryFile() as kf, \
tempfile.NamedTemporaryFile() as cf:
kf.write( private_key.encode() )
kf.flush()
cf.write( certificate.encode() )
cf.flush()
for dut in self.duts:
node = pyeapi.client.connect( host=dut.settings['host'],
transport='https_certs', username=dut.settings['username'],
password=dut.settings['password'], key_file=kf.name,
cert_file=cf.name, return_node=True)
res = node.enable( 'show version' )
self.assertIn( 'version', res[0]['result'] )

class TestNode(unittest.TestCase):

Expand Down
Loading