PR #99: Add loss/cost on validation data (https://github.com/uma-pi1/kge/issues/2)

Open. Wants to merge 10 commits into base: master.
kge/config-default.yaml: 7 additions, 2 deletions

@@ -235,6 +235,10 @@ KvsAll:
s_o: False
_po: True

+  # Dataset splits from which the labels for a query are taken. Default: if
+  # nothing is specified, the train split is used.
+  label_splits: []

# Options for negative sampling training (train.type=="negative_sampling")
negative_sampling:
# Negative sampler to use
@@ -264,7 +268,7 @@ negative_sampling:
p: False # as above
o: False # as above

-    split: ''   # split containing the positives; default is train.split
+    splits: []  # splits containing the positives; default is train.split

# Implementation to use for filtering.
# standard: use slow generic implementation, available for all samplers
@@ -327,7 +331,7 @@ eval:
# mean_reciprocal_rank_filtered_with_test.
filter_with_test: True

-  # Type of evaluation (entity_ranking only at the moment)
+  # Type of evaluation (entity_ranking or training_loss). Currently,
+  # entity_ranking runs training_loss as well.
Member: I vote for disabling the automatic running of training_loss. Makes evaluation too costly.

Member (Author): Validation and test data are a lot smaller (1/10 is the worst valid/train ratio in our current datasets).

type: entity_ranking

# Compute Hits@K for these choices of K
kge/indexing.py: 19 additions, 0 deletions

@@ -2,6 +2,7 @@
from collections import defaultdict, OrderedDict
import numba
import numpy as np
from kge.misc import powerset, merge_dicts_of_1dim_torch_tensors


def _group_by(keys, values) -> dict:
@@ -222,13 +223,31 @@ def _invert_ids(dataset, obj: str):
dataset.config.log(f"Indexed {len(inv)} {obj} ids", prefix=" ")


def merge_KvsAll_indexes(dataset, split, key):
value = dict([("sp", "o"), ("po", "s"), ("so", "p")])[key]
split_combi_str = "_".join(sorted(split))
index_name = f"{split_combi_str}_{key}_to_{value}"
indexes = [dataset.index(f"{_split}_{key}_to_{value}") for _split in split]
dataset._indexes[index_name] = merge_dicts_of_1dim_torch_tensors(indexes)
return dataset._indexes[index_name]
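The helper merge_dicts_of_1dim_torch_tensors is imported from kge.misc but not shown in this diff; a minimal sketch of the semantics assumed here (concatenate, per key, the tensors of all dicts that share that key):

from collections import defaultdict
import torch

def merge_dicts_of_1dim_torch_tensors(dicts):
    # collect all 1-dim tensors per key, then concatenate them
    merged = defaultdict(list)
    for d in dicts:
        for key, tensor in d.items():
            merged[key].append(tensor)
    return {key: torch.cat(parts) for key, parts in merged.items()}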
Member: Instead of merging indexes (very expensive memory-wise), the code should support using multiple indexes.

Member (Author): For NS I see that it works, for KvsAll I have to merge them. But granted, not necessarily as an index.

rgemulla (Member), May 28, 2020: What I meant here: the corresponding training jobs should support this. Alternatively, one may write a wrapper index that takes multiple indexes and merges their results on demand given a key.
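A minimal sketch of such a wrapper (hypothetical, not part of this PR): it presents several KvsAll indexes as one and concatenates matching values only when a key is looked up.

import torch

class MergedIndex:
    def __init__(self, *indexes):
        # each index maps a key, e.g. an (s, p) pair, to a 1-dim tensor of ids
        self.indexes = indexes

    def __getitem__(self, key):
        parts = [index[key] for index in self.indexes if key in index]
        if not parts:
            raise KeyError(key)
        # merged on demand; nothing is materialized up front
        return torch.cat(parts)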

Member: For KvsAll, you can also just add the labels of each label split sequentially in the collate function.
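A sketch of this suggestion, assuming one KvsAll dict per label split that maps query keys to 1-dim tensors of ids (names are illustrative, not from the PR):

import torch

def collate_labels(batch, label_indexes):
    # batch: list of query keys, e.g. (s, p) pairs
    labels = []
    for query in batch:
        # add the labels of each label split sequentially
        parts = [index[query] for index in label_indexes if query in index]
        labels.append(torch.cat(parts) if parts else torch.empty(0, dtype=torch.long))
    return batch, labels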

Member: We should not allow using label_splits other than for forward only. Otherwise they leak valid/test data. Likewise for negative sampling, I guess.



def create_default_index_functions(dataset: "Dataset"):
for split in dataset.files_of_type("triples"):
for key, value in [("sp", "o"), ("po", "s"), ("so", "p")]:
# self assignment needed to capture the loop var
dataset.index_functions[f"{split}_{key}_to_{value}"] = IndexWrapper(
index_KvsAll, split=split, key=key
)
# create all combinations of splits of length 2 and 3
for split_combi in powerset(dataset.files_of_type("triples"), [2, 3]):
for key, value in [("sp", "o"), ("po", "s"), ("so", "p")]:
split_combi_str = "_".join(sorted(split_combi))
index_name = f"{split_combi_str}_{key}_to_{value}"
dataset.index_functions[index_name] = IndexWrapper(
merge_KvsAll_indexes, split=split_combi, key=key
)
Member: This makes it even worse.

Member (Author): Why? They are not invoked.

As a side note: this was necessary because there is currently no way to determine the cross-validation semantics of a dataset file (train, valid, test). We should (in another PR) add an xvaltype attribute to the dataset files.

Member: They are also not needed. Let's keep the code clean and not add "orphans".

Member: Adding more information to the dataset configuration is a good idea (if useful for something).
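The powerset helper used in the combination loop above is likewise defined in kge.misc and not shown in this diff; a minimal sketch of the assumed behavior (all combinations of the given sizes):

from itertools import combinations

def powerset(items, sizes):
    # e.g. powerset(["train", "valid", "test"], [2, 3]) yields all
    # 2- and 3-element split combinations
    items = list(items)
    for size in sizes:
        yield from combinations(items, size)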


dataset.index_functions["relation_types"] = index_relation_types
dataset.index_functions["relations_per_type"] = index_relation_types
dataset.index_functions["frequency_percentiles"] = index_frequency_percentiles
kge/job/entity_ranking.py: 13 additions, 37 deletions

@@ -1,9 +1,10 @@
import math
import time
+from typing import Dict, Any

import torch
import kge.job
-from kge.job import EvaluationJob, Job
+from kge.job import EvaluationJob, Job, TrainingJob
from kge import Config, Dataset
from collections import defaultdict

@@ -13,18 +14,23 @@ class EntityRankingJob(EvaluationJob):

def __init__(self, config: Config, dataset: Dataset, parent_job, model):
super().__init__(config, dataset, parent_job, model)
-        self.is_prepared = False

if self.__class__ == EntityRankingJob:
for f in Job.job_created_hooks:
f(self)

+        max_k = min(
+            self.dataset.num_entities(), max(self.config.get("eval.hits_at_k_s"))
+        )
+        self.hits_at_k_s = list(
+            filter(lambda x: x <= max_k, self.config.get("eval.hits_at_k_s"))
+        )
+        self.filter_with_test = config.get("eval.filter_with_test")


def _prepare(self):
"""Construct all indexes needed to run."""

-        if self.is_prepared:
-            return

# create data and precompute indexes
self.triples = self.dataset.split(self.config.get("eval.split"))
for split in self.filter_splits:
@@ -75,16 +81,8 @@ def _collate(self, batch):
return batch, label_coords, test_label_coords

    @torch.no_grad()
-    def run(self) -> dict:
-        self._prepare()
-
-        was_training = self.model.training
-        self.model.eval()
-        self.config.log(
-            "Evaluating on "
-            + self.eval_split
-            + " data (epoch {})...".format(self.epoch)
-        )
+    def _run(self) -> Dict[str, Any]:

num_entities = self.dataset.num_entities()

# we also filter with test data if requested
@@ -399,28 +397,6 @@ def merge_hist(target_hists, source_hists):
event="eval_completed",
**metrics,
)
-        for f in self.post_epoch_trace_hooks:
-            f(self, trace_entry)
-
-        # if validation metric is not present, try to compute it
-        metric_name = self.config.get("valid.metric")
-        if metric_name not in trace_entry:
-            trace_entry[metric_name] = eval(
-                self.config.get("valid.metric_expr"),
-                None,
-                dict(config=self.config, **trace_entry),
-            )
-
-        # write out trace
-        trace_entry = self.trace(**trace_entry, echo=True, echo_prefix="  ", log=True)
-
-        # reset model and return metrics
-        if was_training:
-            self.model.train()
-        self.config.log("Finished evaluating on " + self.eval_split + " split.")
-
-        for f in self.post_valid_hooks:
-            f(self, trace_entry)
-
return trace_entry

kge/job/eval.py: 138 additions, 13 deletions
@@ -1,10 +1,10 @@
-import torch
+import time
+from typing import Any, Optional, Dict

+import torch
from kge import Config, Dataset
-from kge.job import Job
from kge.model import KgeModel

-from typing import Dict, Union, Optional
+from kge.job import Job, TrainingJob


class EvaluationJob(Job):
@@ -16,12 +16,6 @@ def __init__(self, config, dataset, parent_job, model):
self.model = model
self.batch_size = config.get("eval.batch_size")
self.device = self.config.get("job.device")
-        max_k = min(
-            self.dataset.num_entities(), max(self.config.get("eval.hits_at_k_s"))
-        )
-        self.hits_at_k_s = list(
-            filter(lambda x: x <= max_k, self.config.get("eval.hits_at_k_s"))
-        )
self.config.check("train.trace_level", ["example", "batch", "epoch"])
self.trace_examples = self.config.get("eval.trace_level") == "example"
self.trace_batch = (
@@ -31,9 +25,11 @@
self.filter_splits = self.config.get("eval.filter_splits")
if self.eval_split not in self.filter_splits:
self.filter_splits.append(self.eval_split)
-        self.filter_with_test = config.get("eval.filter_with_test")
Member: Accidental delete? Where was this moved?

Member (Author): entity_ranking

self.epoch = -1

+        self.verbose = True
+        self.is_prepared = False

#: Hooks run after training for an epoch.
#: Signature: job, trace_entry
self.post_epoch_hooks = []
@@ -64,6 +60,22 @@ def __init__(self, config, dataset, parent_job, model):
if config.get("eval.metrics_per.argument_frequency"):
self.hist_hooks.append(hist_per_frequency_percentile)

        # Add the training loss as a default to every evaluation job
        # TODO: create AggregatingEvaluationsJob that runs and aggregates a list
        # of EvaluationJobs, such that users can configure combinations of
        # EvalJobs themselves. Then this can be removed.
# See https://github.com/uma-pi1/kge/issues/102
if not isinstance(self, TrainingLossEvaluationJob):
self.eval_train_loss_job = TrainingLossEvaluationJob(
config, dataset, parent_job=self, model=model
)
self.eval_train_loss_job.verbose = False
self.post_epoch_trace_hooks.append(
lambda job, trace: trace.update(
avg_loss=self.eval_train_loss_job.run()["avg_loss"]
)
)
Member: Drop this.


# all done, run job_created_hooks if necessary
if self.__class__ == EvaluationJob:
for f in Job.job_created_hooks:
Expand All @@ -81,10 +93,66 @@ def create(config, dataset, parent_job=None, model=None):
return EntityPairRankingJob(
config, dataset, parent_job=parent_job, model=model
)
elif config.get("eval.type") == "training_loss":
return TrainingLossEvaluationJob(
config, dataset, parent_job=parent_job, model=model
)
else:
raise ValueError("eval.type")

-    def run(self) -> dict:
def _prepare(self):
"""Prepare this job for running. Guaranteed to be called exactly once
"""
raise NotImplementedError

def run(self) -> Dict[str, Any]:

if not self.is_prepared:
self._prepare()
self.model.prepare_job(self) # let the model add some hooks
self.is_prepared = True

was_training = self.model.training
self.model.eval()
self.config.log(
"Evaluating on "
+ self.eval_split
+ " data (epoch {})...".format(self.epoch),
echo=self.verbose,
)

trace_entry = self._run()

# if validation metric is not present, try to compute it
metric_name = self.config.get("valid.metric")
if metric_name not in trace_entry:
trace_entry[metric_name] = eval(
self.config.get("valid.metric_expr"),
None,
dict(config=self.config, **trace_entry),
)

for f in self.post_epoch_trace_hooks:
f(self, trace_entry)

# write out trace
trace_entry = self.trace(
**trace_entry, echo=self.verbose, echo_prefix=" ", log=True
)

# reset model and return metrics
if was_training:
self.model.train()
self.config.log(
"Finished evaluating on " + self.eval_split + " split.", echo=self.verbose
)

for f in self.post_valid_hooks:
f(self, trace_entry)

return trace_entry

def _run(self) -> Dict[str, Any]:
""" Compute evaluation metrics, output results to trace file """
raise NotImplementedError

@@ -132,8 +200,65 @@ def create_from(
return super().create_from(checkpoint, new_config, dataset, parent_job)


-# HISTOGRAM COMPUTATION ###############################################################
class TrainingLossEvaluationJob(EvaluationJob):
""" Entity ranking evaluation protocol """

def __init__(self, config: Config, dataset: Dataset, parent_job, model):
super().__init__(config, dataset, parent_job, model)
self.is_prepared = True

train_job_on_eval_split_config = config.clone()
train_job_on_eval_split_config.set("train.split", self.eval_split)
train_job_on_eval_split_config.set("verbose", False)
train_job_on_eval_split_config.set(
"negative_sampling.filtering.splits",
[self.config.get("train.split"), self.eval_split] + ["valid"]
if self.eval_split == "test"
else [],
)
train_job_on_eval_split_config.set(
"KvsAll.label_splits",
[self.config.get("train.split"), self.eval_split] + ["valid"]
if self.eval_split == "test"
else [],
)
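        # e.g., when evaluating on "test", labels/filter positives are taken
        # from [train.split, "test", "valid"]; when evaluating on "valid",
        # the empty list falls back to the defaults (train.split)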
self._train_job = TrainingJob.create(
config=train_job_on_eval_split_config,
parent_job=self,
dataset=dataset,
model=model,
initialize_for_forward_only=True,
)
self._train_job_verbose = False

if self.__class__ == TrainingLossEvaluationJob:
for f in Job.job_created_hooks:
f(self)

@torch.no_grad()
def _run(self) -> Dict[str, Any]:

        self.epoch = self.parent_job.epoch

        # time the forward-only pass over the evaluation split
        epoch_time = -time.time()
        train_trace_entry = self._train_job.run_epoch()
        epoch_time += time.time()

# compute trace
trace_entry = dict(
type="training_loss",
scope="epoch",
split=self.eval_split,
epoch=self.epoch,
epoch_time=epoch_time,
event="eval_completed",
avg_loss=train_trace_entry["avg_loss"],
)

return trace_entry


# HISTOGRAM COMPUTATION ###############################################################

def __initialize_hist(hists, key, job):
"""If there is no histogram with given `key` in `hists`, add an empty one."""