Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

wire a context in most of Datastore/Blockstore methods, connect a few #421

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 32 additions & 27 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ import (

bitswap "github.com/ipfs/go-bitswap"
bssession "github.com/ipfs/go-bitswap/internal/session"
bsnet "github.com/ipfs/go-bitswap/network"
testinstance "github.com/ipfs/go-bitswap/testinstance"
tn "github.com/ipfs/go-bitswap/testnet"
bsnet "github.com/ipfs/go-bitswap/network"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
)

type fetchFunc func(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid)

type distFunc func(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block)
type distFunc func(b *testing.B, ctx context.Context, provs []testinstance.Instance, blocks []blocks.Block)

type runStats struct {
DupsRcvd uint64
Expand Down Expand Up @@ -105,13 +105,14 @@ var benches = []bench{
}

func BenchmarkFixedDelay(b *testing.B) {
ctx := context.Background()
benchmarkLog = nil
fixedDelay := delay.Fixed(10 * time.Millisecond)
bstoreLatency := time.Duration(0)

for _, bch := range benches {
b.Run(bch.name, func(b *testing.B) {
subtestDistributeAndFetch(b, bch.nodeCount, bch.blockCount, fixedDelay, bstoreLatency, bch.distFn, bch.fetchFn)
subtestDistributeAndFetch(b, ctx, bch.nodeCount, bch.blockCount, fixedDelay, bstoreLatency, bch.distFn, bch.fetchFn)
})
}

Expand All @@ -134,6 +135,7 @@ var mixedBenches = []mixedBench{
}

func BenchmarkFetchFromOldBitswap(b *testing.B) {
ctx := context.Background()
benchmarkLog = nil
fixedDelay := delay.Fixed(10 * time.Millisecond)
bstoreLatency := time.Duration(0)
Expand Down Expand Up @@ -176,7 +178,7 @@ func BenchmarkFetchFromOldBitswap(b *testing.B) {
blocks[0] = rootBlock[0]

// Run the distribution
runDistributionMulti(b, instances[:fetcherCount], instances[fetcherCount:], blocks, bstoreLatency, bch.distFn, bch.fetchFn)
runDistributionMulti(b, ctx, instances[:fetcherCount], instances[fetcherCount:], blocks, bstoreLatency, bch.distFn, bch.fetchFn)

newNodeGenerator.Close()
oldNodeGenerator.Close()
Expand Down Expand Up @@ -208,6 +210,7 @@ const stdBlockSize = 8000
const largeBlockSize = int64(256 * 1024)

func BenchmarkRealWorld(b *testing.B) {
ctx := context.Background()
benchmarkLog = nil
benchmarkSeed, err := strconv.ParseInt(os.Getenv("BENCHMARK_SEED"), 10, 64)
var randomGen *rand.Rand = nil
Expand All @@ -233,20 +236,21 @@ func BenchmarkRealWorld(b *testing.B) {
bstoreLatency := time.Duration(0)

b.Run("200Nodes-AllToAll-BigBatch-FastNetwork", func(b *testing.B) {
subtestDistributeAndFetchRateLimited(b, 300, 200, fastNetworkDelay, fastBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
subtestDistributeAndFetchRateLimited(b, ctx, 300, 200, fastNetworkDelay, fastBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
})
b.Run("200Nodes-AllToAll-BigBatch-AverageVariableSpeedNetwork", func(b *testing.B) {
subtestDistributeAndFetchRateLimited(b, 300, 200, averageNetworkDelay, averageBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
subtestDistributeAndFetchRateLimited(b, ctx, 300, 200, averageNetworkDelay, averageBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
})
b.Run("200Nodes-AllToAll-BigBatch-SlowVariableSpeedNetwork", func(b *testing.B) {
subtestDistributeAndFetchRateLimited(b, 300, 200, slowNetworkDelay, slowBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
subtestDistributeAndFetchRateLimited(b, ctx, 300, 200, slowNetworkDelay, slowBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
})
out, _ := json.MarshalIndent(benchmarkLog, "", " ")
_ = ioutil.WriteFile("tmp/rw-benchmark.json", out, 0666)
printResults(benchmarkLog)
}

func BenchmarkDatacenter(b *testing.B) {
ctx := context.Background()
benchmarkLog = nil
benchmarkSeed, err := strconv.ParseInt(os.Getenv("BENCHMARK_SEED"), 10, 64)
var randomGen *rand.Rand = nil
Expand All @@ -262,14 +266,15 @@ func BenchmarkDatacenter(b *testing.B) {
bstoreLatency := time.Millisecond * 25

b.Run("3Nodes-Overlap3-UnixfsFetch", func(b *testing.B) {
subtestDistributeAndFetchRateLimited(b, 3, 100, datacenterNetworkDelay, datacenterBandwidthGenerator, largeBlockSize, bstoreLatency, allToAll, unixfsFileFetch)
subtestDistributeAndFetchRateLimited(b, ctx, 3, 100, datacenterNetworkDelay, datacenterBandwidthGenerator, largeBlockSize, bstoreLatency, allToAll, unixfsFileFetch)
})
out, _ := json.MarshalIndent(benchmarkLog, "", " ")
_ = ioutil.WriteFile("tmp/rb-benchmark.json", out, 0666)
printResults(benchmarkLog)
}

func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {
ctx := context.Background()
benchmarkLog = nil
benchmarkSeed, err := strconv.ParseInt(os.Getenv("BENCHMARK_SEED"), 10, 64)
var randomGen *rand.Rand = nil
Expand Down Expand Up @@ -301,7 +306,7 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {

instances := ig.Instances(numnodes)
blocks := testutil.GenerateBlocksOfSize(numblks, blockSize)
runDistributionMulti(b, instances[:3], instances[3:], blocks, bstoreLatency, df, ff)
runDistributionMulti(b, ctx, instances[:3], instances[3:], blocks, bstoreLatency, df, ff)
}
})

Expand All @@ -310,7 +315,7 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {
printResults(benchmarkLog)
}

func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
func subtestDistributeAndFetch(b *testing.B, ctx context.Context, numnodes, numblks int, d delay.D, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
for i := 0; i < b.N; i++ {
net := tn.VirtualNetwork(mockrouting.NewServer(), d)

Expand All @@ -320,12 +325,12 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, b
rootBlock := testutil.GenerateBlocksOfSize(1, rootBlockSize)
blocks := testutil.GenerateBlocksOfSize(numblks, stdBlockSize)
blocks[0] = rootBlock[0]
runDistribution(b, instances, blocks, bstoreLatency, df, ff)
runDistribution(b, ctx, instances, blocks, bstoreLatency, df, ff)
ig.Close()
}
}

func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
func subtestDistributeAndFetchRateLimited(b *testing.B, ctx context.Context, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
for i := 0; i < b.N; i++ {
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)

Expand All @@ -336,13 +341,13 @@ func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d
rootBlock := testutil.GenerateBlocksOfSize(1, rootBlockSize)
blocks := testutil.GenerateBlocksOfSize(numblks, blockSize)
blocks[0] = rootBlock[0]
runDistribution(b, instances, blocks, bstoreLatency, df, ff)
runDistribution(b, ctx, instances, blocks, bstoreLatency, df, ff)
}
}

func runDistributionMulti(b *testing.B, fetchers []testinstance.Instance, seeds []testinstance.Instance, blocks []blocks.Block, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
func runDistributionMulti(b *testing.B, ctx context.Context, fetchers []testinstance.Instance, seeds []testinstance.Instance, blocks []blocks.Block, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
// Distribute blocks to seed nodes
df(b, seeds, blocks)
df(b, ctx, seeds, blocks)

// Set the blockstore latency on seed nodes
if bstoreLatency > 0 {
Expand Down Expand Up @@ -392,13 +397,13 @@ func runDistributionMulti(b *testing.B, fetchers []testinstance.Instance, seeds
// b.Logf("send/recv: %d / %d (dups: %d)", nst.MessagesSent, nst.MessagesRecvd, st.DupBlksReceived)
}

func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []blocks.Block, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
func runDistribution(b *testing.B, ctx context.Context, instances []testinstance.Instance, blocks []blocks.Block, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
numnodes := len(instances)
fetcher := instances[numnodes-1]

// Distribute blocks to seed nodes
seeds := instances[:numnodes-1]
df(b, seeds, blocks)
df(b, ctx, seeds, blocks)

// Set the blockstore latency on seed nodes
if bstoreLatency > 0 {
Expand Down Expand Up @@ -435,34 +440,34 @@ func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []b
// b.Logf("send/recv: %d / %d (dups: %d)", nst.MessagesSent, nst.MessagesRecvd, st.DupBlksReceived)
}

func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) {
func allToAll(b *testing.B, ctx context.Context, provs []testinstance.Instance, blocks []blocks.Block) {
for _, p := range provs {
if err := p.Blockstore().PutMany(blocks); err != nil {
if err := p.Blockstore().PutMany(ctx, blocks); err != nil {
b.Fatal(err)
}
}
}

// overlap1 gives the first 75 blocks to the first peer, and the last 75 blocks
// to the second peer. This means both peers have the middle 50 blocks
func overlap1(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
func overlap1(b *testing.B, ctx context.Context, provs []testinstance.Instance, blks []blocks.Block) {
if len(provs) != 2 {
b.Fatal("overlap1 only works with 2 provs")
}
bill := provs[0]
jeff := provs[1]

if err := bill.Blockstore().PutMany(blks[:75]); err != nil {
if err := bill.Blockstore().PutMany(ctx, blks[:75]); err != nil {
b.Fatal(err)
}
if err := jeff.Blockstore().PutMany(blks[25:]); err != nil {
if err := jeff.Blockstore().PutMany(ctx, blks[25:]); err != nil {
b.Fatal(err)
}
}

// overlap2 gives every even numbered block to the first peer, odd numbered
// blocks to the second. it also gives every third block to both peers
func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
func overlap2(b *testing.B, ctx context.Context, provs []testinstance.Instance, blks []blocks.Block) {
if len(provs) != 2 {
b.Fatal("overlap2 only works with 2 provs")
}
Expand All @@ -473,12 +478,12 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
even := i%2 == 0
third := i%3 == 0
if third || even {
if err := bill.Blockstore().Put(blk); err != nil {
if err := bill.Blockstore().Put(ctx, blk); err != nil {
b.Fatal(err)
}
}
if third || !even {
if err := jeff.Blockstore().Put(blk); err != nil {
if err := jeff.Blockstore().Put(ctx, blk); err != nil {
b.Fatal(err)
}
}
Expand All @@ -488,9 +493,9 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
// onePeerPerBlock picks a random peer to hold each block
// with this layout, we shouldnt actually ever see any duplicate blocks
// but we're mostly just testing performance of the sync algorithm
func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
func onePeerPerBlock(b *testing.B, ctx context.Context, provs []testinstance.Instance, blks []blocks.Block) {
for _, blk := range blks {
err := provs[rand.Intn(len(provs))].Blockstore().Put(blk)
err := provs[rand.Intn(len(provs))].Blockstore().Put(ctx, blk)
if err != nil {
b.Fatal(err)
}
Expand Down
16 changes: 8 additions & 8 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,8 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks

// HasBlock announces the existence of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk}, nil, nil)
func (bs *Bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
return bs.receiveBlocksFrom(ctx, "", []blocks.Block{blk}, nil, nil)
}

// TODO: Some of this stuff really only needs to be done when adding a block
Expand All @@ -335,7 +335,7 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b

// Put wanted blocks into blockstore
if len(wanted) > 0 {
err := bs.blockstore.PutMany(wanted)
err := bs.blockstore.PutMany(ctx, wanted)
if err != nil {
log.Errorf("Error writing %d blocks to datastore: %s", len(wanted), err)
return err
Expand Down Expand Up @@ -414,7 +414,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
iblocks := incoming.Blocks()

if len(iblocks) > 0 {
bs.updateReceiveCounters(iblocks)
bs.updateReceiveCounters(ctx, iblocks)
for _, b := range iblocks {
log.Debugf("[recv] block; cid=%s, peer=%s", b.Cid(), p)
}
Expand All @@ -432,11 +432,11 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
}
}

func (bs *Bitswap) updateReceiveCounters(blocks []blocks.Block) {
func (bs *Bitswap) updateReceiveCounters(ctx context.Context, blocks []blocks.Block) {
// Check which blocks are in the datastore
// (Note: any errors from the blockstore are simply logged out in
// blockstoreHas())
blocksHas := bs.blockstoreHas(blocks)
blocksHas := bs.blockstoreHas(ctx, blocks)

bs.counterLk.Lock()
defer bs.counterLk.Unlock()
Expand All @@ -462,7 +462,7 @@ func (bs *Bitswap) updateReceiveCounters(blocks []blocks.Block) {
}
}

func (bs *Bitswap) blockstoreHas(blks []blocks.Block) []bool {
func (bs *Bitswap) blockstoreHas(ctx context.Context, blks []blocks.Block) []bool {
res := make([]bool, len(blks))

wg := sync.WaitGroup{}
Expand All @@ -471,7 +471,7 @@ func (bs *Bitswap) blockstoreHas(blks []blocks.Block) []bool {
go func(i int, b blocks.Block) {
defer wg.Done()

has, err := bs.blockstore.Has(b.Cid())
has, err := bs.blockstore.Has(ctx, b.Cid())
if err != nil {
log.Infof("blockstore.Has error: %s", err)
has = false
Expand Down
Loading