Skip to content

Commit

Permalink
Merge pull request #1604 from vogel/peer-address
Browse files Browse the repository at this point in the history
add a PeerBareAddress class as an abstraction over peer address

Reviewed-by: MonsieurNicolas
  • Loading branch information
latobarita authored Mar 16, 2018
2 parents a8b6a9d + 7cc8683 commit a340312
Show file tree
Hide file tree
Showing 18 changed files with 527 additions and 385 deletions.
9 changes: 3 additions & 6 deletions src/main/CommandHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,7 @@ CommandHandler::peers(std::string const&, std::string& retStr)
int counter = 0;
for (auto peer : mApp.getOverlayManager().getPendingPeers())
{
root["pending_peers"][counter]["ip"] = peer->getIP();
root["pending_peers"][counter]["port"] =
(int)peer->getRemoteListeningPort();
root["pending_peers"][counter]["address"] = peer->toString();

counter++;
}
Expand All @@ -460,9 +458,8 @@ CommandHandler::peers(std::string const&, std::string& retStr)
counter = 0;
for (auto peer : mApp.getOverlayManager().getAuthenticatedPeers())
{
root["authenticated_peers"][counter]["ip"] = peer.second->getIP();
root["authenticated_peers"][counter]["port"] =
(int)peer.second->getRemoteListeningPort();
root["authenticated_peers"][counter]["address"] =
peer.second->toString();
root["authenticated_peers"][counter]["ver"] =
peer.second->getRemoteVersion();
root["authenticated_peers"][counter]["olver"] =
Expand Down
19 changes: 13 additions & 6 deletions src/overlay/LoopbackPeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ LoopbackPeer::LoopbackPeer(Application& app, PeerRole role) : Peer(app, role)
{
}

PeerBareAddress
LoopbackPeer::makeAddress(unsigned short remoteListeningPort) const
{
if (remoteListeningPort <= 0 || remoteListeningPort > UINT16_MAX)
{
return PeerBareAddress{};
}
else
{
return PeerBareAddress{"127.0.0.1", remoteListeningPort};
}
}

AuthCert
LoopbackPeer::getAuthCert()
{
Expand Down Expand Up @@ -63,12 +76,6 @@ LoopbackPeer::sendMessage(xdr::msg_ptr&& msg)
}
}

std::string
LoopbackPeer::getIP()
{
return "127.0.0.1";
}

void
LoopbackPeer::drop(bool)
{
Expand Down
3 changes: 2 additions & 1 deletion src/overlay/LoopbackPeer.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class LoopbackPeer : public Peer
Stats mStats;

void sendMessage(xdr::msg_ptr&& xdrBytes) override;
PeerBareAddress
makeAddress(unsigned short remoteListeningPort) const override;
AuthCert getAuthCert() override;

void processInQueue();
Expand All @@ -63,7 +65,6 @@ class LoopbackPeer : public Peer
}
LoopbackPeer(Application& app, PeerRole role);
void drop(bool force = true) override;
std::string getIP() override;

void deliverOne();
void deliverAll();
Expand Down
16 changes: 10 additions & 6 deletions src/overlay/OverlayManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@
namespace stellar
{

class PeerRecord;
class PeerAuth;
class PeerBareAddress;
class PeerRecord;
class LoadManager;

class OverlayManager
Expand Down Expand Up @@ -78,10 +79,9 @@ class OverlayManager
// Return a list of random peers from the set of authenticated peers.
virtual std::vector<Peer::pointer> getRandomAuthenticatedPeers() = 0;

// Return an already-connected peer at the given ip address and port;
// returns a `nullptr`-valued pointer if no such connected peer exists.
virtual Peer::pointer getConnectedPeer(std::string const& ip,
unsigned short port) = 0;
// Return an already-connected peer at the given address; returns a
// `nullptr`-valued pointer if no such connected peer exists.
virtual Peer::pointer getConnectedPeer(PeerBareAddress const& address) = 0;

// Add a peer to the in-memory set of pending peers.
virtual void addPendingPeer(Peer::pointer peer) = 0;
Expand Down Expand Up @@ -118,7 +118,11 @@ class OverlayManager
// a TCP port number.
virtual void connectTo(std::string const& addr) = 0;

// Attempt to connect to a peer identified by peer record.
// Attempt to connect to a peer identified by peer address.
virtual void connectTo(PeerBareAddress const& address) = 0;

// Attempt to connect to a peer identified by peer record. Can modify back
// off value of pr and save it do database.
virtual void connectTo(PeerRecord& pr) = 0;

// returns the list of peers that sent us the item with hash `h`
Expand Down
42 changes: 24 additions & 18 deletions src/overlay/OverlayManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "database/Database.h"
#include "main/Application.h"
#include "main/Config.h"
#include "overlay/PeerBareAddress.h"
#include "overlay/PeerRecord.h"
#include "overlay/TCPPeer.h"
#include "util/Logging.h"
Expand Down Expand Up @@ -109,25 +110,32 @@ OverlayManagerImpl::connectTo(std::string const& peerStr)
{
try
{
auto pr = PeerRecord::parseIPPort(peerStr, mApp);
connectTo(pr);
auto address = PeerBareAddress::resolve(peerStr, mApp);
connectTo(address);
}
catch (const std::runtime_error&)
{
CLOG(ERROR, "Overlay") << "Unable to add peer '" << peerStr << "'";
}
}

void
OverlayManagerImpl::connectTo(PeerBareAddress const& address)
{
auto pr = PeerRecord{address, mApp.getClock().now(), 0};
connectTo(pr);
}

void
OverlayManagerImpl::connectTo(PeerRecord& pr)
{
mConnectionsAttempted.Mark();
if (!getConnectedPeer(pr.ip(), pr.port()))
if (!getConnectedPeer(pr.getAddress()))
{
pr.backOff(mApp.getClock());
pr.storePeerRecord(mApp.getDatabase());

addPendingPeer(TCPPeer::initiate(mApp, pr.ip(), pr.port()));
addPendingPeer(TCPPeer::initiate(mApp, pr.getAddress()));
}
else
{
Expand All @@ -145,7 +153,8 @@ OverlayManagerImpl::storePeerList(std::vector<std::string> const& list,
{
try
{
auto pr = PeerRecord::parseIPPort(peerStr, mApp);
auto address = PeerBareAddress::resolve(peerStr, mApp);
auto pr = PeerRecord{address, mApp.getClock().now(), 0};
pr.setPreferred(preferred);
if (resetBackOff)
{
Expand Down Expand Up @@ -173,7 +182,7 @@ OverlayManagerImpl::storeConfigPeers()
{
try
{
auto pr = PeerRecord::parseIPPort(s, mApp);
auto pr = PeerBareAddress::resolve(s, mApp);
auto r = mPreferredPeers.insert(pr.toString());
if (r.second)
{
Expand All @@ -197,11 +206,10 @@ OverlayManagerImpl::getPreferredPeersFromConfig()
std::vector<PeerRecord> peers;
for (auto& pp : mPreferredPeers)
{
auto prParsed = PeerRecord::parseIPPort(pp, mApp);
if (!getConnectedPeer(prParsed.ip(), prParsed.port()))
auto address = PeerBareAddress::resolve(pp, mApp);
if (!getConnectedPeer(address))
{
auto pr = PeerRecord::loadPeerRecord(
mApp.getDatabase(), prParsed.ip(), prParsed.port());
auto pr = PeerRecord::loadPeerRecord(mApp.getDatabase(), address);
if (pr && pr->mNextAttempt <= mApp.getClock().now())
{
peers.emplace_back(*pr);
Expand All @@ -223,7 +231,7 @@ OverlayManagerImpl::getPeersToConnectTo(int maxNum)
[&](PeerRecord const& pr) {
// skip peers that we're already
// connected/connecting to
if (!getConnectedPeer(pr.ip(), pr.port()))
if (!getConnectedPeer(pr.getAddress()))
{
peers.emplace_back(pr);
}
Expand Down Expand Up @@ -293,13 +301,12 @@ OverlayManagerImpl::tick()
}

Peer::pointer
OverlayManagerImpl::getConnectedPeer(std::string const& ip, unsigned short port)
OverlayManagerImpl::getConnectedPeer(PeerBareAddress const& address)
{
auto pendingPeerIt =
std::find_if(std::begin(mPendingPeers), std::end(mPendingPeers),
[ip, port](Peer::pointer const& peer) {
return peer->getIP() == ip &&
peer->getRemoteListeningPort() == port;
[address](Peer::pointer const& peer) {
return peer->getAddress() == address;
});
if (pendingPeerIt != std::end(mPendingPeers))
{
Expand All @@ -308,9 +315,8 @@ OverlayManagerImpl::getConnectedPeer(std::string const& ip, unsigned short port)

auto authenticatedPeerIt = std::find_if(
std::begin(mAuthenticatedPeers), std::end(mAuthenticatedPeers),
[ip, port](std::pair<NodeID, Peer::pointer> const& peer) {
return peer.second->getIP() == ip &&
peer.second->getRemoteListeningPort() == port;
[address](std::pair<NodeID, Peer::pointer> const& peer) {
return peer.second->getAddress() == address;
});
if (authenticatedPeerIt != std::end(mAuthenticatedPeers))
{
Expand Down
8 changes: 4 additions & 4 deletions src/overlay/OverlayManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class OverlayManagerImpl : public OverlayManager
void broadcastMessage(StellarMessage const& msg,
bool force = false) override;
void connectTo(std::string const& addr) override;
virtual void connectTo(PeerRecord& pr) override;
void connectTo(PeerRecord& pr) override;
void connectTo(PeerBareAddress const& address) override;

void addPendingPeer(Peer::pointer peer) override;
void dropPeer(Peer* peer) override;
Expand All @@ -86,9 +87,8 @@ class OverlayManagerImpl : public OverlayManager
getAuthenticatedPeers() const override;
int getAuthenticatedPeersCount() const override;

// returns NULL if the passed peer isn't found
Peer::pointer getConnectedPeer(std::string const& ip,
unsigned short port) override;
// returns nullptr if the passed peer isn't found
Peer::pointer getConnectedPeer(PeerBareAddress const& address) override;

void connectToMorePeers(vector<PeerRecord>& peers);
std::vector<Peer::pointer> getRandomAuthenticatedPeers() override;
Expand Down
20 changes: 11 additions & 9 deletions src/overlay/OverlayManagerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,22 @@ class PeerStub : public Peer
public:
int sent = 0;

PeerStub(Application& app, short port) : Peer(app, WE_CALLED_REMOTE)
PeerStub(Application& app, PeerBareAddress const& addres)
: Peer(app, WE_CALLED_REMOTE)
{
mPeerID = SecretKey::random().getPublicKey();
mState = GOT_AUTH;
mRemoteListeningPort = port;
mAddress = addres;
}
virtual void
drop(bool) override
virtual PeerBareAddress
makeAddress(unsigned short) const override
{
REQUIRE(false); // should not be called
return {};
}
virtual string
getIP() override
virtual void
drop(bool) override
{
return "127.0.0.1";
}
virtual void
sendMessage(xdr::msg_ptr&& xdrBytes) override
Expand All @@ -64,12 +66,12 @@ class OverlayManagerStub : public OverlayManagerImpl
virtual void
connectTo(PeerRecord& pr) override
{
if (!getConnectedPeer(pr.ip(), pr.port()))
if (!getConnectedPeer(pr.getAddress()))
{
pr.backOff(mApp.getClock());
pr.storePeerRecord(mApp.getDatabase());

auto peerStub = std::make_shared<PeerStub>(mApp, pr.port());
auto peerStub = std::make_shared<PeerStub>(mApp, pr.getAddress());
addPendingPeer(peerStub);
REQUIRE(acceptAuthenticatedPeer(peerStub));
}
Expand Down
Loading

0 comments on commit a340312

Please sign in to comment.