Skip to content

Commit

Permalink
Use context provided by request for client -> agent RPC
Browse files Browse the repository at this point in the history
Scoping all API and RPC calls to the same context makes debugging
far easier, because every log line can be correlated with the request that caused it.

oslo_log tries to obtain the context either from an explicit argument passed
to the logger or from the thread-local store. The
latter does not work here, since some requests enter a queue before being processed by a different thread.
Therefore the context is attached to the job when it enters the queue and is
passed explicitly to every log call that needs it.
  • Loading branch information
sven-rosenzweig committed Aug 15, 2023
1 parent 42f97b3 commit 1c23104
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 121 deletions.
12 changes: 7 additions & 5 deletions networking_nsxv3/common/synchronization.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import functools
import collections

from oslo_context import context

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -69,10 +70,11 @@ def retry_next(self):

class Runnable(object):

def __init__(self, idn, fn, priority=Priority.LOWEST):
def __init__(self, idn, fn, priority=Priority.LOWEST, context=None):
self.priority = priority
self.idn = idn
self.fn = fn
self.context = context

def __repr__(self):
return str(self.idn)
Expand Down Expand Up @@ -137,7 +139,7 @@ def __init__(self, active_size=INFINITY, passive_size=INFINITY,
self._idle = workers_size
self._state = "not started"

def run(self, priority, ids, fn):
def run(self, priority, ids, fn, context=None):
""" Submit a job with priority
Keyword arguments:
Expand All @@ -154,7 +156,7 @@ def run(self, priority, ids, fn):
try:
LOG.info(MESSAGE.format("Enqueued", jid, priority.name, fn.__name__))

job = Runnable(jid, fn, priority.value)
job = Runnable(jid, fn, priority.value, context)
if priority.value == Priority.HIGHEST:
self._active.put_nowait(job)
else:
Expand All @@ -172,8 +174,8 @@ def _start(self):
self._active.put_nowait(self._passive.get_nowait())
self._passive.task_done()
job = self._active.get(block=True, timeout=TIMEOUT)
LOG.info(MESSAGE.format("Processing", job.idn, Priority(job.priority).name, job.fn.__name__))
self._workers.spawn(job.fn, job.idn)#.wait()
LOG.info(MESSAGE.format("Processing", job.idn, Priority(job.priority).name, job.fn.__name__), context=job.context)
self._workers.spawn(job.fn, job.idn, job.context)#.wait()
self._active.task_done()
except eventlet.queue.Empty:
LOG.info("No activity for the last {} seconds.".format(TIMEOUT))
Expand Down
32 changes: 16 additions & 16 deletions networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,55 +72,55 @@ def get_network_bridge(self, context, current, network_segments, network_current
seg_id = ns.get("segmentation_id")
net_type = ns.get("network_type")
if seg_id and net_type in nsxv3_constants.NSXV3_AGENT_NETWORK_TYPES:
network_meta = self.realizer.network(seg_id)
network_meta = self.realizer.network(seg_id, context=context)
break

if try_create_port and bool(network_meta.get("nsx-logical-switch-id")):
self.realizer.precreate_port(current["id"], network_meta)
self.realizer.precreate_port(current["id"], network_meta, context=context)

return network_meta

def security_groups_member_updated(self, context, **kwargs):
self.callback(kwargs["security_groups"], self.realizer.security_group_members)
self.callback(kwargs["security_groups"], self.realizer.security_group_members, context)

def security_groups_rule_updated(self, context, **kwargs):
self.callback(kwargs["security_groups"], self.realizer.security_group_rules)
self.callback(kwargs["security_groups"], self.realizer.security_group_rules, context)

def port_create(self, **kwargs):
self.realizer.port(kwargs["port"]["id"])

def port_update(self, context, **kwargs):
# Ensure security groups attached to the port are synced first
for sg in kwargs["port"].get("security_groups", []):
self.callback(sg, self.realizer.security_group_rules)
self.callback(sg, self.realizer.security_group_rules, context)
# Also ensure allowed_address_pairs are re-processed
self.callback(sg, self.realizer.security_group_members)
self.callback(kwargs["port"]["id"], self.realizer.port)
self.callback(sg, self.realizer.security_group_members, context)
self.callback(kwargs["port"]["id"], self.realizer.port, context)

def port_delete(self, context, **kwargs):
# Ports removed by the background synchronization
pass

def create_policy(self, context, policy):
    """Handle a policy-created notification by delegating to ``update_policy``.

    Creation and update share one code path: ``update_policy`` schedules
    the realization of ``policy["id"]`` and forwards ``context`` itself.

    :param context: request context of the originating call
    :param policy: policy payload, passed through unchanged
    """
    # update_policy takes exactly (context, policy). Passing the context a
    # second time as a third positional argument raises TypeError.
    self.update_policy(context, policy)

def delete_policy(self, context, policy):
    """Handle a policy-deleted notification by delegating to ``update_policy``.

    Deletion is routed through the same path as an update:
    ``update_policy`` schedules the realization of ``policy["id"]`` and
    forwards ``context`` itself.

    :param context: request context of the originating call
    :param policy: policy payload, passed through unchanged
    """
    # update_policy takes exactly (context, policy). Passing the context a
    # second time as a third positional argument raises TypeError.
    self.update_policy(context, policy)

def update_policy(self, context, policy):
self.callback(policy["id"], self.realizer.qos)
self.callback(policy["id"], self.realizer.qos, context)

def validate_policy(self, context, policy):
pass

def create_log(self, context, log_obj):
self.callback(log_obj, self.realizer.enable_policy_logging)
self.callback(log_obj, self.realizer.enable_policy_logging, context)

def create_log_precommit(self, context, log_obj):
pass

def update_log(self, context, log_obj):
self.callback(log_obj, self.realizer.update_policy_logging)
self.callback(log_obj, self.realizer.update_policy_logging, context)

def update_log_precommit(self, context, log_obj):
pass
Expand Down Expand Up @@ -162,15 +162,15 @@ def _sync_all(self):
except Exception as err:
LOG.error("Synchronization has failed. Error: %s", err)

def _sync_immediate(self, os_ids, realizer):
def _sync_immediate(self, os_ids, realizer, context=None):
ids = list(os_ids) if isinstance(os_ids, set) else os_ids
ids = ids if isinstance(ids, list) else [ids]
self.runner.run(sync.Priority.HIGHEST, ids, realizer)
self.runner.run(sync.Priority.HIGHEST, ids, realizer, context)

def _sync_delayed(self, os_ids, realizer):
def _sync_delayed(self, os_ids, realizer, context=None):
ids = list(os_ids) if isinstance(os_ids, set) else os_ids
ids = ids if isinstance(ids, list) else [ids]
self.runner.run(sync.Priority.HIGH, ids, realizer)
self.runner.run(sync.Priority.HIGH, ids, realizer, context)

def kpi(self):
return {"active": self.runner.active(), "passive": self.runner.passive()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ def metadata(self, resource_type, os_id) -> ResourceMeta:
def meta_provider(self, resource_type) -> base.MetaProvider:
return self._metadata.get(resource_type)

def _realize(self, resource_type, delete, convertor, os_o, provider_o):
def _realize(self, resource_type, delete, convertor, os_o, provider_o, context=None):
os_id = os_o.get("id")

begin_report = "[{}] Resource: {} with ID: {} is going to be %s.".format(self.provider, resource_type, os_id)
Expand All @@ -601,24 +601,24 @@ def _realize(self, resource_type, delete, convertor, os_o, provider_o):
if resource_type == Provider.PORT:
res = self.client.get(path=path)
if res.status_code == 404:
LOG.info(end_report, "rescheduled due 404: not found")
LOG.info(end_report, "rescheduled due 404: not found", context=context)
return metadata

o = res.json()
if o.get("attachment", {}).get("context", {}).get("vif_type") != "CHILD":
stamp = int(o.get("_last_modified_time")) / 1000

if not self.orphan_ports_tmout_passed(stamp):
LOG.info(end_report, "rescheduled for deletion")
LOG.info(end_report, "rescheduled for deletion", context=context)
return metadata

self.client.delete(path=path, params=params)

LOG.info(end_report, "deleted")
LOG.info(end_report, "deleted", context=context)

return self.metadata_delete(resource_type, os_id)
else:
LOG.info(begin_report, "updated")
LOG.info(begin_report, "updated", context=context)
if resource_type == Provider.SG_RULES_EXT:
LOG.debug(
"Skipping update of NSGroup:%s",
Expand All @@ -627,15 +627,15 @@ def _realize(self, resource_type, delete, convertor, os_o, provider_o):
if metadata.revision != None:
data["_revision"] = metadata.revision
o = self.client.put(path=path, data=data)
LOG.info(end_report, "updated")
LOG.info(end_report, "updated", context=context)
return self.metadata_update(resource_type, o.json())
else:
if not delete:
LOG.info(begin_report, "created")
LOG.info(begin_report, "created", context=context)
o = self.client.post(path=path, data=convertor(os_o, provider_o))
LOG.info(end_report, "created")
LOG.info(end_report, "created", context=context)
return self.metadata_update(resource_type, o.json())
LOG.info(end_report, "already deleted")
LOG.info(end_report, "already deleted", context=context)

def outdated(self, resource_type: str, os_meta):
self.metadata_refresh(resource_type)
Expand Down Expand Up @@ -698,7 +698,7 @@ def get_port(self, os_id):
return self.metadata_update(Provider.PORT, port), port
return None

def port_realize(self, os_port: dict, delete=False):
def port_realize(self, os_port: dict, delete=False, context=None):
provider_port = dict()

if delete:
Expand All @@ -711,39 +711,39 @@ def port_realize(self, os_port: dict, delete=False):
if parent_port and parent_port[0]:
provider_port["parent_id"] = parent_port[0].id
else:
LOG.warning("Not found. Parent Port:%s for Child Port:%s", os_port.get("parent_id"), os_port.get("id"))
LOG.warning("Not found. Parent Port:%s for Child Port:%s", os_port.get("parent_id"), os_port.get("id"), context=context)
return
else:
# Parent port is NOT always created externally
port = self.get_port(os_port.get("id"))
if port and port[0]:
provider_port["id"] = port[0].id
else:
LOG.warning("Not found. Port: %s", os_port.get("id"))
LOG.warning("Not found. Port: %s", os_port.get("id"), context=context)

if os_port.get("qos_policy_id"):
meta_qos = self.metadata(Provider.QOS, os_port.get("qos_policy_id"))
if meta_qos:
provider_port["qos_policy_id"] = meta_qos.id
else:
LOG.warning("Not found. QoS:%s for Port:%s", os_port.get("qos_policy_id"), os_port.get("id"))
LOG.warning("Not found. QoS:%s for Port:%s", os_port.get("qos_policy_id"), os_port.get("id"), context=context)

provider_port["switching_profile_ids"] = copy.deepcopy(self.switch_profiles)

return self._realize(Provider.PORT, False, self.payload.port, os_port, provider_port)
return self._realize(Provider.PORT, False, self.payload.port, os_port, provider_port, context=context)

def qos_realize(self, qos, delete=False):
return self._realize(Provider.QOS, delete, self.payload.qos, qos, dict())
def qos_realize(self, qos, delete=False, context=None):
return self._realize(Provider.QOS, delete, self.payload.qos, qos, dict(), context)

def sg_members_realize(self, sg, delete=False, context=None):
    """Realize the members container of a security group.

    :param sg: security group dict; its ``"id"`` key is read
    :param delete: when true, request deletion instead of create/update
    :param context: optional request context, forwarded to the log call
        and to ``_realize`` so log lines carry the originating request
    :returns: whatever ``_realize`` returns, or ``None`` when the deletion
        is postponed
    """
    if delete and self.metadata(Provider.SG_RULES, sg.get("id")):
        # Metadata for this group's rules still exists; the deletion is
        # skipped for now (the log message calls this a dependency).
        LOG.warning(
            "Resource: %s with ID: %s deletion is rescheduled due to dependency.",
            Provider.SG_MEMBERS,
            sg.get("id"),
            context=context,
        )
        return
    return self._realize(Provider.SG_MEMBERS, delete, self.payload.sg_members_container, sg, dict(), context)

def sg_rules_realize(self, os_sg, delete=False, logged=False):
def sg_rules_realize(self, os_sg, delete=False, logged=False, context=None):
provider_sg = dict()

nsg_args = [Provider.SG_RULES_EXT, delete, self.payload.sg_rules_ext_container, os_sg, dict()]
Expand All @@ -764,7 +764,7 @@ def sg_rules_realize(self, os_sg, delete=False, logged=False):

# Update section tags(revision) when all rules applied successfully
provider_sg["tags_update"] = True
self._realize(*sec_args)
self._realize(*sec_args, context=context)

def _sg_rules_realize(self, os_sg, meta_sg: ResourceMeta, logged=False):

Expand Down Expand Up @@ -824,7 +824,7 @@ def _create_sg_provider_rule_remote_prefix(self, cidr):
def _delete_sg_provider_rule_remote_prefix(self, id):
self.client.delete(path=API.IPSET.format(id))

def network_realize(self, segmentation_id):
def network_realize(self, segmentation_id, context=None):
meta = self.metadata(self.NETWORK, segmentation_id)
if not meta:
os_net = {"id": "{}-{}".format(self.zone_name, segmentation_id), "segmentation_id": segmentation_id}
Expand Down
Loading

0 comments on commit 1c23104

Please sign in to comment.