Dask doesn't like NanoAODs with bad cross-references #468

Closed
mat-adamec opened this issue Mar 17, 2021 · 2 comments · Fixed by #427
Labels: bug (Something isn't working)

Comments

@mat-adamec

Describe the bug
I am trying to use the newest version of coffea with Dask on a NanoAOD that has broken cross-references (i.e., Electron_genPartIdx exists but GenPart does not). With the futures executor, I could work around this with:

NanoAODSchema.warn_missing_crossrefs = True

With the Dask executor, this setting does not seem to take effect, and the error persists:

RuntimeError: Parsing indexer Electron_genPartIdx, expected to find collection GenPart but did not
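
For reference, the workaround with the futures executor looks roughly like this (a minimal sketch; fileset and Processor are the same as in the reproduction below, and the workers count is an arbitrary choice):

from coffea import processor
from coffea.nanoevents import NanoAODSchema

# Demote the missing-cross-reference error to a warning.
NanoAODSchema.warn_missing_crossrefs = True

output = processor.run_uproot_job(
    fileset=fileset,
    treename="Events",
    processor_instance=Processor(),
    executor=processor.futures_executor,
    executor_args={'schema': processor.NanoAODSchema, 'workers': 4},
    chunksize=250000,
)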

To Reproduce
This issue can be reproduced by running a processor on Dask over this sample file:

root://eospublic.cern.ch//eos/root-eos/benchmark/Run2012B_SingleMu.root

For a basic example:

from coffea import hist, processor

class Processor(processor.ProcessorABC):
    def __init__(self):
        # Histogram axes for the MET distribution.
        dataset_axis = hist.Cat("dataset", "")
        MET_axis = hist.Bin("MET", "MET [GeV]", 50, 0, 100)

        self._accumulator = processor.dict_accumulator({
            'MET': hist.Hist("Counts", dataset_axis, MET_axis),
            'cutflow': processor.defaultdict_accumulator(int)
        })

    @property
    def accumulator(self):
        return self._accumulator

    def process(self, events):
        output = self.accumulator.identity()

        dataset = events.metadata["dataset"]
        MET = events.MET.pt

        output['MET'].fill(dataset=dataset, MET=MET)
        return output

    def postprocess(self, accumulator):
        return accumulator

from dask.distributed import Client

client = Client(<some client>)

fileset = {'SingleMu': ["root://eospublic.cern.ch//eos/root-eos/benchmark/Run2012B_SingleMu.root"]}
output = processor.run_uproot_job(
    fileset=fileset,
    treename="Events",
    processor_instance=Processor(),
    executor=processor.dask_executor,
    executor_args={'schema': processor.NanoAODSchema, 'client': client},
    chunksize=250000,
)

Expected behavior
The processor should run without error, as it does with the futures executor.

Output

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-7-c7111712ac70> in <module>
     13 client.run(fix)
     14 
---> 15 output = processor.run_uproot_job(fileset=fileset, 
     16                        treename="Events",
     17                        processor_instance=Processor(),

/opt/conda/lib/python3.8/site-packages/coffea/processor/executor.py in run_uproot_job(fileset, treename, processor_instance, executor, executor_args, pre_executor, pre_args, chunksize, maxchunks, metadata_cache)
   1216     }
   1217     exe_args.update(executor_args)
-> 1218     executor(chunks, closure, wrapped_out, **exe_args)
   1219 
   1220     if not use_dataframes:

/opt/conda/lib/python3.8/site-packages/coffea/processor/executor.py in dask_executor(items, function, accumulator, **kwargs)
    750             # FIXME: fancy widget doesn't appear, have to live with boring pbar
    751             progress(work, multi=True, notebook=False)
--> 752         accumulator += _maybe_decompress(work.result())
    753         return accumulator
    754     else:

/opt/conda/lib/python3.8/site-packages/distributed/client.py in result(self, timeout)
    220         if self.status == "error":
    221             typ, exc, tb = result
--> 222             raise exc.with_traceback(tb)
    223         elif self.status == "cancelled":
    224             raise result

/opt/conda/lib/python3.8/site-packages/coffea/processor/executor.py in __call__()
    128     # no @wraps due to pickle
    129     def __call__(self, *args, **kwargs):
--> 130         out = self.function(*args, **kwargs)
    131         return lz4f.compress(pickle.dumps(out, protocol=_PICKLE_PROTOCOL), compression_level=self.level)
    132 

/opt/conda/lib/python3.8/site-packages/coffea/processor/executor.py in _work_function()
    948         except Exception as e:
    949             if retries == retry_count:
--> 950                 raise e
    951             w_str = 'Attempt %d of %d. Will retry.' % (retry_count + 1, retries + 1)
    952             warnings.warn(w_str)

/opt/conda/lib/python3.8/site-packages/coffea/processor/executor.py in _work_function()
    883                 elif issubclass(schema, schemas.BaseSchema):
    884                     materialized = []
--> 885                     factory = NanoEventsFactory.from_root(
    886                         file=file,
    887                         treepath=item.treename,

/opt/conda/lib/python3.8/site-packages/coffea/nanoevents/factory.py in from_root()
    117         base_form = mapping._extract_base_form(tree)
    118 
--> 119         return cls._from_mapping(
    120             mapping,
    121             partition_key,

/opt/conda/lib/python3.8/site-packages/coffea/nanoevents/factory.py in _from_mapping()
    338         if metadata is not None:
    339             base_form["parameters"]["metadata"] = metadata
--> 340         schema = schemaclass(base_form)
    341         return cls(schema, mapping, tuple_to_key(partition_key), cache=runtime_cache)
    342 

/opt/conda/lib/python3.8/site-packages/coffea/nanoevents/schemas/nanoaod.py in __init__()
    101         self._version = version
    102         super().__init__(base_form)
--> 103         self._form["contents"] = self._build_collections(self._form["contents"])
    104         self._form["parameters"]["metadata"]["version"] = self._version
    105 

/opt/conda/lib/python3.8/site-packages/coffea/nanoevents/schemas/nanoaod.py in _build_collections()
    134                         continue
    135                     else:
--> 136                         raise problem
    137                 branch_forms[k + "G"] = transforms.local2global_form(
    138                     branch_forms[k], branch_forms["o" + target]

RuntimeError: Parsing indexer Muon_genPartIdx, expected to find collection GenPart but did not

Additional context
A potential fix was offered by Nick:

def fix():
    processor.NanoAODSchema.warn_missing_crossrefs = True
    
client.run(fix)

but this fix only works with a fixed number of workers: if autoscaling is allowed, workers that start later never have the fix applied to them.
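
The gap can be seen with a quick check (a hedged sketch, not part of the original report): client.run executes the function only on the workers that exist at call time, so querying the flag after the cluster scales up shows the default value on the new workers.

# Hypothetical check: report the flag's current value on every worker.
def check_flag():
    from coffea.nanoevents import NanoAODSchema
    return NanoAODSchema.warn_missing_crossrefs

# Maps worker address -> flag value; workers started after client.run(fix)
# still report the default (False).
print(client.run(check_flag))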

mat-adamec added the bug label on Mar 17, 2021
@oshadura

cc: @nsmith-

@nsmith- (Member) commented Mar 18, 2021

Here is what I was originally thinking as a more effective patch:

from distributed import WorkerPlugin

class WorkerPatcher(WorkerPlugin):
    name = 'WorkerPatcher'

    def __init__(self):
        pass

    def setup(self, worker):
        from coffea.nanoevents import NanoAODSchema
        NanoAODSchema.warn_missing_crossrefs = True

    def teardown(self, worker):
        pass

# setup() runs on every current worker and on each worker that joins later.
client.register_worker_plugin(WorkerPatcher())

but while testing it I actually found a simpler method:

def fix():
    from coffea.nanoevents import NanoAODSchema
    NanoAODSchema.warn_missing_crossrefs = True

client.register_worker_callbacks(fix)

I guess the distinction is that the callback is a simple function, whereas the plugin could in theory hold state.
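
To illustrate that distinction, a stateful variant of the plugin above might look like this (a minimal sketch, not from the thread; the warn parameter is a hypothetical example of state carried from the driver to the workers):

from distributed import WorkerPlugin

class SchemaPatcher(WorkerPlugin):
    # Hypothetical stateful variant: the flag value is chosen at registration
    # time and applied to every worker, including ones added later.
    name = 'SchemaPatcher'

    def __init__(self, warn=True):
        self.warn = warn  # state pickled and shipped to each worker

    def setup(self, worker):
        from coffea.nanoevents import NanoAODSchema
        NanoAODSchema.warn_missing_crossrefs = self.warn

client.register_worker_plugin(SchemaPatcher(warn=True))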
