Skip to content

Commit

Permalink
Use provided context for agent --> server RPC
Browse files Browse the repository at this point in the history
Calls, initially started from the driver, made from the agent to neutron server also used a one-time created
context instead of the context associated with the call, leading to one
single request id for all calls made by the agent, instead of them being
tied to a client request for which they were made.

For calls, made from the agent to neutron server initially triggered by the sync-loop, still use
the one-time created context.
  • Loading branch information
sven-rosenzweig committed Aug 16, 2023
1 parent e16c94e commit ae333f9
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 27 deletions.
39 changes: 21 additions & 18 deletions networking_nsxv3/api/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ def __init__(self, context):

self.context = context
self.rpc = rpc.get_client(target)

def _get_call_context(self, host=None):
topic = topics.get_topic_name(
topics.AGENT, nsxv3_constants.NSXV3, topics.UPDATE, host)
Expand Down Expand Up @@ -133,7 +132,11 @@ def __init__(self):
self.context = neutron_context.get_admin_context()
self.client = rpc.get_client(target)
self.host = cfg.CONF.host


def _choose_context(self, request_context):
if not request_context:
return self.context
return request_context
@log_helpers.log_method_call
def get_ports_with_revisions(self, limit, cursor):
cctxt = self.client.prepare()
Expand All @@ -153,38 +156,38 @@ def get_security_groups_with_revisions(self, limit, cursor):
host=self.host, limit=limit, cursor=cursor)

@log_helpers.log_method_call
def get_security_group(self, security_group_id):
def get_security_group(self, security_group_id, request_context):
cctxt = self.client.prepare()
return cctxt.call(self.context, 'get_security_group',
return cctxt.call(self._choose_context(request_context), 'get_security_group',
host=self.host, security_group_id=security_group_id)

@log_helpers.log_method_call
def get_qos(self, qos_id):
def get_qos(self, qos_id, request_context):
cctxt = self.client.prepare()
return cctxt.call(self.context, 'get_qos', host=self.host, qos_id=qos_id)
return cctxt.call(self._choose_context(request_context), 'get_qos', host=self.host, qos_id=qos_id)

@log_helpers.log_method_call
def get_port(self, port_id):
def get_port(self, port_id, request_context):
cctxt = self.client.prepare()
return cctxt.call(self.context, 'get_port', host=self.host, port_id=port_id)
return cctxt.call(self._choose_context(request_context), 'get_port', host=self.host, port_id=port_id)

@log_helpers.log_method_call
def get_rules_for_security_group_id(self, security_group_id):
def get_rules_for_security_group_id(self, security_group_id, request_context):
cctxt = self.client.prepare()
return cctxt.call(self.context, 'get_rules_for_security_group_id',
return cctxt.call(self._choose_context(request_context), 'get_rules_for_security_group_id',
security_group_id=security_group_id)

@log_helpers.log_method_call
def get_security_group_members_effective_ips(self, security_group_id):
def get_security_group_members_effective_ips(self, security_group_id, request_context):
cctxt = self.client.prepare()
return cctxt.call(self.context,
return cctxt.call(self._choose_context(request_context),
'get_security_group_members_effective_ips',
security_group_id=security_group_id)

@log_helpers.log_method_call
def get_security_group_port_ids(self, security_group_id):
def get_security_group_port_ids(self, security_group_id, request_context):
cctxt = self.client.prepare()
return cctxt.call(self.context, 'get_security_group_port_ids',
return cctxt.call(self._choose_context(request_context), 'get_security_group_port_ids',
host=self.host, security_group_id=security_group_id)

@log_helpers.log_method_call
Expand All @@ -200,9 +203,9 @@ def get_remote_security_groups_for_host(self, limit, cursor):
host=self.host, limit=limit, cursor=cursor)

@log_helpers.log_method_call
def has_security_group_used_by_host(self, security_group_id):
def has_security_group_used_by_host(self, security_group_id, request_context):
cctxt = self.client.prepare()
return cctxt.call(self.context, 'has_security_group_used_by_host',
return cctxt.call(self._choose_context(request_context), 'has_security_group_used_by_host',
host=self.host, security_group_id=security_group_id)

@log_helpers.log_method_call
Expand All @@ -211,9 +214,9 @@ def get_port_logging(self, port_id):
return cctxt.call(self.context, 'get_port_logging', port_id=port_id)

@log_helpers.log_method_call
def has_security_group_logging(self, security_group_id):
def has_security_group_logging(self, security_group_id, request_context):
cctxt = self.client.prepare()
return cctxt.call(self.context, 'has_security_group_logging',
return cctxt.call(self._choose_context(request_context), 'has_security_group_logging',
security_group_id=security_group_id)


Expand Down
18 changes: 9 additions & 9 deletions networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/realization.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,9 @@ def security_group_members(self, os_id: str, reference=False, context=None):
pp = self.plcy_provider
meta = pp.metadata(pp.SG_MEMBERS, os_id)
if not (reference and meta):
if self.rpc.has_security_group_used_by_host(os_id):
cidrs = self.rpc.get_security_group_members_effective_ips(os_id)
port_ids = set(self.rpc.get_security_group_port_ids(os_id))
if self.rpc.has_security_group_used_by_host(os_id, context):
cidrs = self.rpc.get_security_group_members_effective_ips(os_id, context)
port_ids = set(self.rpc.get_security_group_port_ids(os_id, context))

segment_ports = pp.get_port_meta_by_ids(port_ids)
paths = [p.path for p in segment_ports]
Expand All @@ -232,20 +232,20 @@ def security_group_rules(self, os_id: str, context=None):
LOG.info(f"{self.MIGR_IN_PROGRESS_MSG.format('security_group_rules realization')}")
return
with LockManager.get_lock("rules-{}".format(os_id)):
os_sg = self.rpc.get_security_group(os_id)
os_sg = self.rpc.get_security_group(os_id, context)

if os_sg and os_sg.get("ports"):
# Create Members Container
self.security_group_members(os_id, reference=True, context=context)

os_sg["rules"] = self.rpc.get_rules_for_security_group_id(os_id)
os_sg["rules"] = self.rpc.get_rules_for_security_group_id(os_id, context)

for os_rule in os_sg["rules"]:
remote_id = os_rule.get("remote_group_id")
if remote_id:
self.security_group_members(remote_id, reference=True, context=context)

logged = self.rpc.has_security_group_logging(os_id)
logged = self.rpc.has_security_group_logging(os_id, context)
LOG.info(f"Neutron DB logged flag for {os_id}: rpc.has_security_group_logging(os_id): {logged}",
context=context)
self.plcy_provider.sg_rules_realize(os_sg, logged=logged, context=context)
Expand All @@ -264,7 +264,7 @@ def precreate_port(self, os_id: str, network_meta: dict, context=None):
LOG.info(f"{self.MIGR_IN_PROGRESS_MSG.format('port realization')}")
return
with LockManager.get_lock("port-{}".format(os_id)):
port: dict = self.rpc.get_port(os_id)
port: dict = self.rpc.get_port(os_id, context)
if port:
os_qid = port.get("qos_policy_id")
if os_qid:
Expand All @@ -283,7 +283,7 @@ def port(self, os_id: str, context=None):
LOG.info(f"{self.MIGR_IN_PROGRESS_MSG.format('port realization')}")
return
with LockManager.get_lock("port-{}".format(os_id)):
port: dict = self.rpc.get_port(os_id)
port: dict = self.rpc.get_port(os_id, context)
if port:
os_qid = port.get("qos_policy_id")
if os_qid:
Expand All @@ -307,7 +307,7 @@ def qos(self, os_id: str, reference=False, context=None):

meta = provider.metadata(provider.QOS, os_id)
if not (reference and meta):
qos = self.rpc.get_qos(os_id)
qos = self.rpc.get_qos(os_id, context)
if qos:
self._qos_realize(os_qos=qos, context=context)
else:
Expand Down

0 comments on commit ae333f9

Please sign in to comment.