diff --git a/benchmarks_test.go b/benchmarks_test.go
index d3aaf04f..6452e1d8 100644
--- a/benchmarks_test.go
+++ b/benchmarks_test.go
@@ -19,9 +19,9 @@ import (
 
 	bitswap "github.com/ipfs/go-bitswap"
 	bssession "github.com/ipfs/go-bitswap/internal/session"
+	bsnet "github.com/ipfs/go-bitswap/network"
 	testinstance "github.com/ipfs/go-bitswap/testinstance"
 	tn "github.com/ipfs/go-bitswap/testnet"
-	bsnet "github.com/ipfs/go-bitswap/network"
 	cid "github.com/ipfs/go-cid"
 	delay "github.com/ipfs/go-ipfs-delay"
 	mockrouting "github.com/ipfs/go-ipfs-routing/mock"
@@ -29,7 +29,7 @@ import (
 
 type fetchFunc func(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid)
 
-type distFunc func(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block)
+type distFunc func(b *testing.B, ctx context.Context, provs []testinstance.Instance, blocks []blocks.Block)
 
 type runStats struct {
 	DupsRcvd uint64
@@ -105,13 +105,14 @@ var benches = []bench{
 }
 
 func BenchmarkFixedDelay(b *testing.B) {
+	ctx := context.Background()
 	benchmarkLog = nil
 	fixedDelay := delay.Fixed(10 * time.Millisecond)
 	bstoreLatency := time.Duration(0)
 
 	for _, bch := range benches {
 		b.Run(bch.name, func(b *testing.B) {
-			subtestDistributeAndFetch(b, bch.nodeCount, bch.blockCount, fixedDelay, bstoreLatency, bch.distFn, bch.fetchFn)
+			subtestDistributeAndFetch(b, ctx, bch.nodeCount, bch.blockCount, fixedDelay, bstoreLatency, bch.distFn, bch.fetchFn)
 		})
 	}
 
@@ -134,6 +135,7 @@ var mixedBenches = []mixedBench{
 }
 
 func BenchmarkFetchFromOldBitswap(b *testing.B) {
+	ctx := context.Background()
 	benchmarkLog = nil
 	fixedDelay := delay.Fixed(10 * time.Millisecond)
 	bstoreLatency := time.Duration(0)
@@ -176,7 +178,7 @@ func BenchmarkFetchFromOldBitswap(b *testing.B) {
 			blocks[0] = rootBlock[0]
 
 			// Run the distribution
-			runDistributionMulti(b, instances[:fetcherCount], instances[fetcherCount:], blocks, bstoreLatency, bch.distFn, bch.fetchFn)
+			runDistributionMulti(b, ctx, instances[:fetcherCount], instances[fetcherCount:], blocks, bstoreLatency, bch.distFn, bch.fetchFn)
 
 			newNodeGenerator.Close()
 			oldNodeGenerator.Close()
@@ -208,6 +210,7 @@ const stdBlockSize = 8000
 const largeBlockSize = int64(256 * 1024)
 
 func BenchmarkRealWorld(b *testing.B) {
+	ctx := context.Background()
 	benchmarkLog = nil
 	benchmarkSeed, err := strconv.ParseInt(os.Getenv("BENCHMARK_SEED"), 10, 64)
 	var randomGen *rand.Rand = nil
@@ -233,13 +236,13 @@ func BenchmarkRealWorld(b *testing.B) {
 	bstoreLatency := time.Duration(0)
 
 	b.Run("200Nodes-AllToAll-BigBatch-FastNetwork", func(b *testing.B) {
-		subtestDistributeAndFetchRateLimited(b, 300, 200, fastNetworkDelay, fastBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
+		subtestDistributeAndFetchRateLimited(b, ctx, 300, 200, fastNetworkDelay, fastBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
 	})
 	b.Run("200Nodes-AllToAll-BigBatch-AverageVariableSpeedNetwork", func(b *testing.B) {
-		subtestDistributeAndFetchRateLimited(b, 300, 200, averageNetworkDelay, averageBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
+		subtestDistributeAndFetchRateLimited(b, ctx, 300, 200, averageNetworkDelay, averageBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
 	})
 	b.Run("200Nodes-AllToAll-BigBatch-SlowVariableSpeedNetwork", func(b *testing.B) {
-		subtestDistributeAndFetchRateLimited(b, 300, 200, slowNetworkDelay, slowBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
+		subtestDistributeAndFetchRateLimited(b, ctx, 300, 200, slowNetworkDelay, slowBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
 	})
 	out, _ := json.MarshalIndent(benchmarkLog, "", " ")
 	_ = ioutil.WriteFile("tmp/rw-benchmark.json", out, 0666)
@@ -247,6 +250,7 @@ func BenchmarkRealWorld(b *testing.B) {
 }
 
 func BenchmarkDatacenter(b *testing.B) {
+	ctx := context.Background()
 	benchmarkLog = nil
 	benchmarkSeed, err := strconv.ParseInt(os.Getenv("BENCHMARK_SEED"), 10, 64)
 	var randomGen *rand.Rand = nil
@@ -262,7 +266,7 @@ func BenchmarkDatacenter(b *testing.B) {
 	bstoreLatency := time.Millisecond * 25
 
 	b.Run("3Nodes-Overlap3-UnixfsFetch", func(b *testing.B) {
-		subtestDistributeAndFetchRateLimited(b, 3, 100, datacenterNetworkDelay, datacenterBandwidthGenerator, largeBlockSize, bstoreLatency, allToAll, unixfsFileFetch)
+		subtestDistributeAndFetchRateLimited(b, ctx, 3, 100, datacenterNetworkDelay, datacenterBandwidthGenerator, largeBlockSize, bstoreLatency, allToAll, unixfsFileFetch)
 	})
 	out, _ := json.MarshalIndent(benchmarkLog, "", " ")
 	_ = ioutil.WriteFile("tmp/rb-benchmark.json", out, 0666)
@@ -270,6 +274,7 @@ func BenchmarkDatacenter(b *testing.B) {
 }
 
 func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {
+	ctx := context.Background()
 	benchmarkLog = nil
 	benchmarkSeed, err := strconv.ParseInt(os.Getenv("BENCHMARK_SEED"), 10, 64)
 	var randomGen *rand.Rand = nil
@@ -301,7 +306,7 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {
 			instances := ig.Instances(numnodes)
 			blocks := testutil.GenerateBlocksOfSize(numblks, blockSize)
 
-			runDistributionMulti(b, instances[:3], instances[3:], blocks, bstoreLatency, df, ff)
+			runDistributionMulti(b, ctx, instances[:3], instances[3:], blocks, bstoreLatency, df, ff)
 		}
 	})
 
@@ -310,7 +315,7 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {
 	printResults(benchmarkLog)
 }
 
-func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
+func subtestDistributeAndFetch(b *testing.B, ctx context.Context, numnodes, numblks int, d delay.D, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
 	for i := 0; i < b.N; i++ {
 		net := tn.VirtualNetwork(mockrouting.NewServer(), d)
 
@@ -320,12 +325,12 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, b
 		rootBlock := testutil.GenerateBlocksOfSize(1, rootBlockSize)
 		blocks := testutil.GenerateBlocksOfSize(numblks, stdBlockSize)
 		blocks[0] = rootBlock[0]
-		runDistribution(b, instances, blocks, bstoreLatency, df, ff)
+		runDistribution(b, ctx, instances, blocks, bstoreLatency, df, ff)
 		ig.Close()
 	}
 }
 
-func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
+func subtestDistributeAndFetchRateLimited(b *testing.B, ctx context.Context, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
 	for i := 0; i < b.N; i++ {
 		net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)
 
@@ -336,13 +341,13 @@ func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d
 		rootBlock := testutil.GenerateBlocksOfSize(1, rootBlockSize)
 		blocks := testutil.GenerateBlocksOfSize(numblks, blockSize)
 		blocks[0] = rootBlock[0]
-		runDistribution(b, instances, blocks, bstoreLatency, df, ff)
+		runDistribution(b, ctx, instances, blocks, bstoreLatency, df, ff)
 	}
 }
 
-func runDistributionMulti(b *testing.B, fetchers []testinstance.Instance, seeds []testinstance.Instance, blocks []blocks.Block, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
+func runDistributionMulti(b *testing.B, ctx context.Context, fetchers []testinstance.Instance, seeds []testinstance.Instance, blocks []blocks.Block, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
 	// Distribute blocks to seed nodes
-	df(b, seeds, blocks)
+	df(b, ctx, seeds, blocks)
 
 	// Set the blockstore latency on seed nodes
 	if bstoreLatency > 0 {
@@ -392,13 +397,13 @@ func runDistributionMulti(b *testing.B, fetchers []testinstance.Instance, seeds
 	// b.Logf("send/recv: %d / %d (dups: %d)", nst.MessagesSent, nst.MessagesRecvd, st.DupBlksReceived)
 }
 
-func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []blocks.Block, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
+func runDistribution(b *testing.B, ctx context.Context, instances []testinstance.Instance, blocks []blocks.Block, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
 	numnodes := len(instances)
 	fetcher := instances[numnodes-1]
 
 	// Distribute blocks to seed nodes
 	seeds := instances[:numnodes-1]
-	df(b, seeds, blocks)
+	df(b, ctx, seeds, blocks)
 
 	// Set the blockstore latency on seed nodes
 	if bstoreLatency > 0 {
@@ -435,9 +440,9 @@ func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []b
 	// b.Logf("send/recv: %d / %d (dups: %d)", nst.MessagesSent, nst.MessagesRecvd, st.DupBlksReceived)
 }
 
-func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) {
+func allToAll(b *testing.B, ctx context.Context, provs []testinstance.Instance, blocks []blocks.Block) {
 	for _, p := range provs {
-		if err := p.Blockstore().PutMany(blocks); err != nil {
+		if err := p.Blockstore().PutMany(ctx, blocks); err != nil {
 			b.Fatal(err)
 		}
 	}
@@ -445,24 +450,24 @@ func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block
 
 // overlap1 gives the first 75 blocks to the first peer, and the last 75 blocks
 // to the second peer. This means both peers have the middle 50 blocks
-func overlap1(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
+func overlap1(b *testing.B, ctx context.Context, provs []testinstance.Instance, blks []blocks.Block) {
 	if len(provs) != 2 {
 		b.Fatal("overlap1 only works with 2 provs")
 	}
 	bill := provs[0]
 	jeff := provs[1]
 
-	if err := bill.Blockstore().PutMany(blks[:75]); err != nil {
+	if err := bill.Blockstore().PutMany(ctx, blks[:75]); err != nil {
 		b.Fatal(err)
 	}
-	if err := jeff.Blockstore().PutMany(blks[25:]); err != nil {
+	if err := jeff.Blockstore().PutMany(ctx, blks[25:]); err != nil {
 		b.Fatal(err)
 	}
 }
 
 // overlap2 gives every even numbered block to the first peer, odd numbered
 // blocks to the second. it also gives every third block to both peers
-func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
+func overlap2(b *testing.B, ctx context.Context, provs []testinstance.Instance, blks []blocks.Block) {
 	if len(provs) != 2 {
 		b.Fatal("overlap2 only works with 2 provs")
 	}
@@ -473,12 +478,12 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
 		even := i%2 == 0
 		third := i%3 == 0
 		if third || even {
-			if err := bill.Blockstore().Put(blk); err != nil {
+			if err := bill.Blockstore().Put(ctx, blk); err != nil {
 				b.Fatal(err)
 			}
 		}
 		if third || !even {
-			if err := jeff.Blockstore().Put(blk); err != nil {
+			if err := jeff.Blockstore().Put(ctx, blk); err != nil {
 				b.Fatal(err)
 			}
 		}
@@ -488,9 +493,9 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
 // onePeerPerBlock picks a random peer to hold each block
 // with this layout, we shouldnt actually ever see any duplicate blocks
 // but we're mostly just testing performance of the sync algorithm
-func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
+func onePeerPerBlock(b *testing.B, ctx context.Context, provs []testinstance.Instance, blks []blocks.Block) {
 	for _, blk := range blks {
-		err := provs[rand.Intn(len(provs))].Blockstore().Put(blk)
+		err := provs[rand.Intn(len(provs))].Blockstore().Put(ctx, blk)
 		if err != nil {
 			b.Fatal(err)
 		}
diff --git a/bitswap.go b/bitswap.go
index 9afe5d27..874d0db9 100644
--- a/bitswap.go
+++ b/bitswap.go
@@ -307,8 +307,8 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
 
 // HasBlock announces the existence of a block to this bitswap service. The
 // service will potentially notify its peers.
-func (bs *Bitswap) HasBlock(blk blocks.Block) error {
-	return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk}, nil, nil)
+func (bs *Bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
+	return bs.receiveBlocksFrom(ctx, "", []blocks.Block{blk}, nil, nil)
 }
 
 // TODO: Some of this stuff really only needs to be done when adding a block
@@ -335,7 +335,7 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
 
 	// Put wanted blocks into blockstore
 	if len(wanted) > 0 {
-		err := bs.blockstore.PutMany(wanted)
+		err := bs.blockstore.PutMany(ctx, wanted)
 		if err != nil {
 			log.Errorf("Error writing %d blocks to datastore: %s", len(wanted), err)
 			return err
@@ -414,7 +414,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
 	iblocks := incoming.Blocks()
 
 	if len(iblocks) > 0 {
-		bs.updateReceiveCounters(iblocks)
+		bs.updateReceiveCounters(ctx, iblocks)
 		for _, b := range iblocks {
 			log.Debugf("[recv] block; cid=%s, peer=%s", b.Cid(), p)
 		}
@@ -432,11 +432,11 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
 	}
 }
 
-func (bs *Bitswap) updateReceiveCounters(blocks []blocks.Block) {
+func (bs *Bitswap) updateReceiveCounters(ctx context.Context, blocks []blocks.Block) {
 	// Check which blocks are in the datastore
 	// (Note: any errors from the blockstore are simply logged out in
 	// blockstoreHas())
-	blocksHas := bs.blockstoreHas(blocks)
+	blocksHas := bs.blockstoreHas(ctx, blocks)
 
 	bs.counterLk.Lock()
 	defer bs.counterLk.Unlock()
@@ -462,7 +462,7 @@ func (bs *Bitswap) updateReceiveCounters(blocks []blocks.Block) {
 	}
 }
 
-func (bs *Bitswap) blockstoreHas(blks []blocks.Block) []bool {
+func (bs *Bitswap) blockstoreHas(ctx context.Context, blks []blocks.Block) []bool {
 	res := make([]bool, len(blks))
 
 	wg := sync.WaitGroup{}
@@ -471,7 +471,7 @@ func (bs *Bitswap) blockstoreHas(blks []blocks.Block) []bool {
 		go func(i int, b blocks.Block) {
 			defer wg.Done()
 
-			has, err := bs.blockstore.Has(b.Cid())
+			has, err := bs.blockstore.Has(ctx, b.Cid())
 			if err != nil {
 				log.Infof("blockstore.Has error: %s", err)
 				has = false
diff --git a/bitswap_test.go b/bitswap_test.go
index ba89e038..f17952ce 100644
--- a/bitswap_test.go
+++ b/bitswap_test.go
@@ -11,9 +11,9 @@ import (
 	bitswap "github.com/ipfs/go-bitswap"
 	decision "github.com/ipfs/go-bitswap/internal/decision"
 	bssession "github.com/ipfs/go-bitswap/internal/session"
+	"github.com/ipfs/go-bitswap/message"
 	testinstance "github.com/ipfs/go-bitswap/testinstance"
 	tn "github.com/ipfs/go-bitswap/testnet"
-	"github.com/ipfs/go-bitswap/message"
 	blocks "github.com/ipfs/go-block-format"
 	cid "github.com/ipfs/go-cid"
 	detectrace "github.com/ipfs/go-detect-race"
@@ -78,7 +78,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
 }
 
 func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
-
+	ctx := context.Background()
 	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
 	block := blocks.NewBlock([]byte("block"))
 	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
@@ -88,7 +88,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
 	hasBlock := peers[0]
 	defer hasBlock.Exchange.Close()
 
-	if err := hasBlock.Exchange.HasBlock(block); err != nil {
+	if err := hasBlock.Exchange.HasBlock(ctx, block); err != nil {
 		t.Fatal(err)
 	}
 
@@ -109,6 +109,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
 }
 
 func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
+	ctx := context.Background()
 	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
 	block := blocks.NewBlock([]byte("block"))
 	bsOpts := []bitswap.Option{bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50 * time.Millisecond)}
@@ -121,7 +122,7 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
 	wantsBlock := ig.Next()
 	defer wantsBlock.Exchange.Close()
 
-	if err := hasBlock.Exchange.HasBlock(block); err != nil {
+	if err := hasBlock.Exchange.HasBlock(ctx, block); err != nil {
 		t.Fatal(err)
 	}
 
@@ -143,7 +144,7 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
 // Tests that a received block is not stored in the blockstore if the block was
 // not requested by the client
 func TestUnwantedBlockNotAdded(t *testing.T) {
-
+	ctx := context.Background()
 	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
 	block := blocks.NewBlock([]byte("block"))
 	bsMessage := message.New(true)
@@ -156,7 +157,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) {
 	hasBlock := peers[0]
 	defer hasBlock.Exchange.Close()
 
-	if err := hasBlock.Exchange.HasBlock(block); err != nil {
+	if err := hasBlock.Exchange.HasBlock(ctx, block); err != nil {
 		t.Fatal(err)
 	}
 
@@ -168,7 +169,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) {
 
 	doesNotWantBlock.Exchange.ReceiveMessage(ctx, hasBlock.Peer, bsMessage)
 
-	blockInStore, err := doesNotWantBlock.Blockstore().Has(block.Cid())
+	blockInStore, err := doesNotWantBlock.Blockstore().Has(ctx, block.Cid())
 	if err != nil || blockInStore {
 		t.Fatal("Unwanted block added to block store")
 	}
@@ -227,7 +228,7 @@ func TestPendingBlockAdded(t *testing.T) {
 	}
 
 	// Make sure Bitswap adds the block to the blockstore
-	blockInStore, err := instance.Blockstore().Has(lastBlock.Cid())
+	blockInStore, err := instance.Blockstore().Has(ctx, lastBlock.Cid())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -296,7 +297,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
 	first := instances[0]
 	for _, b := range blocks {
 		blkeys = append(blkeys, b.Cid())
-		err := first.Exchange.HasBlock(b)
+		err := first.Exchange.HasBlock(ctx, b)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -335,7 +336,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
 
 	for _, inst := range instances {
 		for _, b := range blocks {
-			if _, err := inst.Blockstore().Get(b.Cid()); err != nil {
+			if _, err := inst.Blockstore().Get(ctx, b.Cid()); err != nil {
 				t.Fatal(err)
 			}
 		}
@@ -372,7 +373,7 @@ func TestSendToWantingPeer(t *testing.T) {
 	}
 
 	// peerB announces to the network that he has block alpha
-	err = peerB.Exchange.HasBlock(alpha)
+	err = peerB.Exchange.HasBlock(ctx, alpha)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -423,6 +424,7 @@ func assertStat(t *testing.T, st *bitswap.Stat, sblks, rblks, sdata, rdata uint6
 }
 
 func TestBasicBitswap(t *testing.T) {
+	ctx := context.Background()
 	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
 	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
 	defer ig.Close()
@@ -434,7 +436,7 @@ func TestBasicBitswap(t *testing.T) {
 	blocks := bg.Blocks(1)
 
 	// First peer has block
-	err := instances[0].Exchange.HasBlock(blocks[0])
+	err := instances[0].Exchange.HasBlock(ctx, blocks[0])
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -535,7 +537,8 @@ func TestDoubleGet(t *testing.T) {
 		t.Fatal("expected channel to be closed")
 	}
 
-	err = instances[0].Exchange.HasBlock(blocks[0])
+	ctx := context.Background()
+	err = instances[0].Exchange.HasBlock(ctx, blocks[0])
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -689,6 +692,7 @@ func newReceipt(sent, recv, exchanged uint64) *decision.Receipt {
 }
 
 func TestBitswapLedgerOneWay(t *testing.T) {
+	ctx := context.Background()
 	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
 	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
 	defer ig.Close()
@@ -698,7 +702,7 @@ func TestBitswapLedgerOneWay(t *testing.T) {
 	instances := ig.Instances(2)
 	blocks := bg.Blocks(1)
 
-	err := instances[0].Exchange.HasBlock(blocks[0])
+	err := instances[0].Exchange.HasBlock(ctx, blocks[0])
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -741,6 +745,7 @@ func TestBitswapLedgerOneWay(t *testing.T) {
 }
 
 func TestBitswapLedgerTwoWay(t *testing.T) {
+	ctx := context.Background()
 	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
 	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
 	defer ig.Close()
@@ -750,12 +755,12 @@ func TestBitswapLedgerTwoWay(t *testing.T) {
 	instances := ig.Instances(2)
 	blocks := bg.Blocks(2)
 
-	err := instances[0].Exchange.HasBlock(blocks[0])
+	err := instances[0].Exchange.HasBlock(ctx, blocks[0])
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	err = instances[1].Exchange.HasBlock(blocks[1])
+	err = instances[1].Exchange.HasBlock(ctx, blocks[1])
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/bitswap_with_sessions_test.go b/bitswap_with_sessions_test.go
index f710879a..525cee96 100644
--- a/bitswap_with_sessions_test.go
+++ b/bitswap_with_sessions_test.go
@@ -34,7 +34,7 @@ func TestBasicSessions(t *testing.T) {
 	b := inst[1]
 
 	// Add a block to Peer B
-	if err := b.Blockstore().Put(block); err != nil {
+	if err := b.Blockstore().Put(ctx, block); err != nil {
 		t.Fatal(err)
 	}
@@ -82,7 +82,7 @@ func TestSessionBetweenPeers(t *testing.T) {
 
 	// Add 101 blocks to Peer A
 	blks := bgen.Blocks(101)
-	if err := inst[0].Blockstore().PutMany(blks); err != nil {
+	if err := inst[0].Blockstore().PutMany(ctx, blks); err != nil {
 		t.Fatal(err)
 	}
@@ -143,7 +143,7 @@ func TestSessionSplitFetch(t *testing.T) {
 	// Add 10 distinct blocks to each of 10 peers
 	blks := bgen.Blocks(100)
 	for i := 0; i < 10; i++ {
-		if err := inst[i].Blockstore().PutMany(blks[i*10 : (i+1)*10]); err != nil {
+		if err := inst[i].Blockstore().PutMany(ctx, blks[i*10:(i+1)*10]); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -187,7 +187,7 @@ func TestFetchNotConnected(t *testing.T) {
 	// Provide 10 blocks on Peer A
 	blks := bgen.Blocks(10)
 	for _, block := range blks {
-		if err := other.Exchange.HasBlock(block); err != nil {
+		if err := other.Exchange.HasBlock(ctx, block); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -243,7 +243,7 @@ func TestFetchAfterDisconnect(t *testing.T) {
 
 	firstBlks := blks[:5]
 	for _, block := range firstBlks {
-		if err := peerA.Exchange.HasBlock(block); err != nil {
+		if err := peerA.Exchange.HasBlock(ctx, block); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -279,7 +279,7 @@ func TestFetchAfterDisconnect(t *testing.T) {
 	// Provide remaining blocks
 	lastBlks := blks[5:]
 	for _, block := range lastBlks {
-		if err := peerA.Exchange.HasBlock(block); err != nil {
+		if err := peerA.Exchange.HasBlock(ctx, block); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -334,7 +334,7 @@ func TestInterestCacheOverflow(t *testing.T) {
 	// wait to ensure that all the above cids were added to the sessions cache
 	time.Sleep(time.Millisecond * 50)
 
-	if err := b.Exchange.HasBlock(blks[0]); err != nil {
+	if err := b.Exchange.HasBlock(ctx, blks[0]); err != nil {
 		t.Fatal(err)
 	}
@@ -381,7 +381,7 @@ func TestPutAfterSessionCacheEvict(t *testing.T) {
 	// wait to ensure that all the above cids were added to the sessions cache
 	time.Sleep(time.Millisecond * 50)
 
-	if err := a.Exchange.HasBlock(blks[17]); err != nil {
+	if err := a.Exchange.HasBlock(ctx, blks[17]); err != nil {
 		t.Fatal(err)
 	}
@@ -423,7 +423,7 @@ func TestMultipleSessions(t *testing.T) {
 	}
 
 	time.Sleep(time.Millisecond * 10)
-	if err := b.Exchange.HasBlock(blk); err != nil {
+	if err := b.Exchange.HasBlock(ctx, blk); err != nil {
 		t.Fatal(err)
 	}
diff --git a/internal/decision/blockstoremanager.go b/internal/decision/blockstoremanager.go
index 8d880a6c..ee8e8605 100644
--- a/internal/decision/blockstoremanager.go
+++ b/internal/decision/blockstoremanager.go
@@ -70,7 +70,7 @@ func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) (
 	var lk sync.Mutex
 
 	return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
-		size, err := bsm.bs.GetSize(c)
+		size, err := bsm.bs.GetSize(ctx, c)
 		if err != nil {
 			if err != bstore.ErrNotFound {
 				// Note: this isn't a fatal error. We shouldn't abort the request
@@ -92,7 +92,7 @@ func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[
 	var lk sync.Mutex
 
 	return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
-		blk, err := bsm.bs.Get(c)
+		blk, err := bsm.bs.Get(ctx, c)
 		if err != nil {
 			if err != bstore.ErrNotFound {
 				// Note: this isn't a fatal error. We shouldn't abort the request
diff --git a/internal/decision/blockstoremanager_test.go b/internal/decision/blockstoremanager_test.go
index cac0a5b0..c0bad804 100644
--- a/internal/decision/blockstoremanager_test.go
+++ b/internal/decision/blockstoremanager_test.go
@@ -78,7 +78,7 @@ func TestBlockstoreManager(t *testing.T) {
 	}
 
 	// Put all blocks in the blockstore except the last one
-	if err := bstore.PutMany(blks[:len(blks)-1]); err != nil {
+	if err := bstore.PutMany(ctx, blks[:len(blks)-1]); err != nil {
 		t.Fatal(err)
 	}
@@ -158,7 +158,7 @@ func TestBlockstoreManagerConcurrency(t *testing.T) {
 		ks = append(ks, b.Cid())
 	}
 
-	err := bstore.PutMany(blks)
+	err := bstore.PutMany(ctx, blks)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -200,7 +200,7 @@ func TestBlockstoreManagerClose(t *testing.T) {
 		ks = append(ks, b.Cid())
 	}
 
-	err := bstore.PutMany(blks)
+	err := bstore.PutMany(ctx, blks)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -221,6 +221,7 @@ func TestBlockstoreManagerClose(t *testing.T) {
 }
 
 func TestBlockstoreManagerCtxDone(t *testing.T) {
+	ctx := context.Background()
 	delayTime := 20 * time.Millisecond
 	bsdelay := delay.Fixed(delayTime)
 
@@ -237,7 +238,7 @@ func TestBlockstoreManagerCtxDone(t *testing.T) {
 		ks = append(ks, b.Cid())
 	}
 
-	err := bstore.PutMany(blks)
+	err := bstore.PutMany(ctx, blks)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go
index 3cb76597..8a87a211 100644
--- a/internal/decision/engine_test.go
+++ b/internal/decision/engine_test.go
@@ -203,13 +203,14 @@ func TestOutboxClosedWhenEngineClosed(t *testing.T) {
 }
 
 func TestPartnerWantHaveWantBlockNonActive(t *testing.T) {
+	ctx := context.Background()
 	alphabet := "abcdefghijklmnopqrstuvwxyz"
 	vowels := "aeiou"
 
 	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
 	for _, letter := range strings.Split(alphabet, "") {
 		block := blocks.NewBlock([]byte(letter))
-		if err := bs.Put(block); err != nil {
+		if err := bs.Put(ctx, block); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -542,12 +543,13 @@ func TestPartnerWantHaveWantBlockNonActive(t *testing.T) {
 }
 
 func TestPartnerWantHaveWantBlockActive(t *testing.T) {
+	ctx := context.Background()
 	alphabet := "abcdefghijklmnopqrstuvwxyz"
 
 	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
 	for _, letter := range strings.Split(alphabet, "") {
 		block := blocks.NewBlock([]byte(letter))
-		if err := bs.Put(block); err != nil {
+		if err := bs.Put(ctx, block); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -813,6 +815,7 @@ func formatPresencesDiff(presences []message.BlockPresence, expHaves []string, e
 }
 
 func TestPartnerWantsThenCancels(t *testing.T) {
+	ctx := context.Background()
 	numRounds := 10
 	if testing.Short() {
 		numRounds = 1
@@ -846,12 +849,11 @@ func TestPartnerWantsThenCancels(t *testing.T) {
 	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
 	for _, letter := range alphabet {
 		block := blocks.NewBlock([]byte(letter))
-		if err := bs.Put(block); err != nil {
+		if err := bs.Put(ctx, block); err != nil {
 			t.Fatal(err)
 		}
 	}
 
-	ctx := context.Background()
 	for i := 0; i < numRounds; i++ {
 		expected := make([][]string, 0, len(testcases))
 		e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
@@ -875,6 +877,7 @@ func TestPartnerWantsThenCancels(t *testing.T) {
 }
 
 func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) {
+	ctx := context.Background()
 	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
 	partner := libp2ptest.RandPeerIDFatal(t)
 	otherPeer := libp2ptest.RandPeerIDFatal(t)
@@ -897,7 +900,7 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) {
 		t.Fatal("expected no envelope yet")
 	}
 
-	if err := bs.PutMany([]blocks.Block{blks[0], blks[2]}); err != nil {
+	if err := bs.PutMany(ctx, []blocks.Block{blks[0], blks[2]}); err != nil {
 		t.Fatal(err)
 	}
 	e.ReceiveFrom(otherPeer, []blocks.Block{blks[0], blks[2]}, []cid.Cid{})
@@ -919,6 +922,7 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) {
 }
 
 func TestSendDontHave(t *testing.T) {
+	ctx := context.Background()
 	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
 	partner := libp2ptest.RandPeerIDFatal(t)
 	otherPeer := libp2ptest.RandPeerIDFatal(t)
@@ -960,7 +964,7 @@ func TestSendDontHave(t *testing.T) {
 	}
 
 	// Receive all the blocks
-	if err := bs.PutMany(blks); err != nil {
+	if err := bs.PutMany(ctx, blks); err != nil {
 		t.Fatal(err)
 	}
 	e.ReceiveFrom(otherPeer, blks, []cid.Cid{})
@@ -1025,7 +1029,7 @@ func TestTaggingPeers(t *testing.T) {
 	keys := []string{"a", "b", "c", "d", "e"}
 	for _, letter := range keys {
 		block := blocks.NewBlock([]byte(letter))
-		if err := sanfrancisco.Blockstore.Put(block); err != nil {
+		if err := sanfrancisco.Blockstore.Put(ctx, block); err != nil {
 			t.Fatal(err)
 		}
 	}
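Migration sketch (not part of the diff): the changes above thread a caller-supplied context.Context through Bitswap's HasBlock and every Blockstore operation, so cancellation and deadlines propagate into datastore I/O instead of being lost to an internal context.Background(). A minimal illustration of what an updated call site looks like, assuming the post-change module versions; seedBlock and its parameters are hypothetical:

	package example

	import (
		"context"

		bitswap "github.com/ipfs/go-bitswap"
		blocks "github.com/ipfs/go-block-format"
		blockstore "github.com/ipfs/go-ipfs-blockstore"
	)

	// seedBlock stores a block locally and announces it over Bitswap.
	// Both the blockstore write and the announce are now bounded by the
	// caller's ctx, so a timeout or cancellation aborts both steps.
	func seedBlock(ctx context.Context, ex *bitswap.Bitswap, bs blockstore.Blockstore, blk blocks.Block) error {
		if err := bs.Put(ctx, blk); err != nil {
			return err
		}
		return ex.HasBlock(ctx, blk)
	}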