Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

wire a context in most of the data pipeline #36

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,13 @@ func (f *Filestore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
// blockstore. As expected, in the case of FileManager blocks, only the
// reference is deleted, not its contents. It may return
// ErrNotFound when the block is not stored.
func (f *Filestore) DeleteBlock(c cid.Cid) error {
err1 := f.bs.DeleteBlock(c)
func (f *Filestore) DeleteBlock(ctx context.Context, c cid.Cid) error {
err1 := f.bs.DeleteBlock(ctx, c)
if err1 != nil && err1 != blockstore.ErrNotFound {
return err1
}

err2 := f.fm.DeleteBlock(c)
err2 := f.fm.DeleteBlock(ctx, c)
// if we successfully removed something from the blockstore, but the
// filestore didnt have it, return success

Expand All @@ -140,36 +140,36 @@ func (f *Filestore) DeleteBlock(c cid.Cid) error {

// Get retrieves the block with the given Cid. It may return
// ErrNotFound when the block is not stored.
func (f *Filestore) Get(c cid.Cid) (blocks.Block, error) {
blk, err := f.bs.Get(c)
func (f *Filestore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
blk, err := f.bs.Get(ctx, c)
switch err {
case nil:
return blk, nil
case blockstore.ErrNotFound:
return f.fm.Get(c)
return f.fm.Get(ctx, c)
default:
return nil, err
}
}

// GetSize returns the size of the requested block. It may return ErrNotFound
// when the block is not stored.
func (f *Filestore) GetSize(c cid.Cid) (int, error) {
size, err := f.bs.GetSize(c)
func (f *Filestore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
size, err := f.bs.GetSize(ctx, c)
switch err {
case nil:
return size, nil
case blockstore.ErrNotFound:
return f.fm.GetSize(c)
return f.fm.GetSize(ctx, c)
default:
return -1, err
}
}

// Has returns true if the block with the given Cid is
// stored in the Filestore.
func (f *Filestore) Has(c cid.Cid) (bool, error) {
has, err := f.bs.Has(c)
func (f *Filestore) Has(ctx context.Context, c cid.Cid) (bool, error) {
has, err := f.bs.Has(ctx, c)
if err != nil {
return false, err
}
Expand All @@ -178,15 +178,15 @@ func (f *Filestore) Has(c cid.Cid) (bool, error) {
return true, nil
}

return f.fm.Has(c)
return f.fm.Has(ctx, c)
}

// Put stores a block in the Filestore. For blocks of
// underlying type FilestoreNode, the operation is
// delegated to the FileManager, while the rest of blocks
// are handled by the regular blockstore.
func (f *Filestore) Put(b blocks.Block) error {
has, err := f.Has(b.Cid())
func (f *Filestore) Put(ctx context.Context, b blocks.Block) error {
has, err := f.Has(ctx, b.Cid())
if err != nil {
return err
}
Expand All @@ -197,20 +197,20 @@ func (f *Filestore) Put(b blocks.Block) error {

switch b := b.(type) {
case *posinfo.FilestoreNode:
return f.fm.Put(b)
return f.fm.Put(ctx, b)
default:
return f.bs.Put(b)
return f.bs.Put(ctx, b)
}
}

// PutMany is like Put(), but takes a slice of blocks, allowing
// the underlying blockstore to perform batch transactions.
func (f *Filestore) PutMany(bs []blocks.Block) error {
func (f *Filestore) PutMany(ctx context.Context, bs []blocks.Block) error {
var normals []blocks.Block
var fstores []*posinfo.FilestoreNode

for _, b := range bs {
has, err := f.Has(b.Cid())
has, err := f.Has(ctx, b.Cid())
if err != nil {
return err
}
Expand All @@ -228,14 +228,14 @@ func (f *Filestore) PutMany(bs []blocks.Block) error {
}

if len(normals) > 0 {
err := f.bs.PutMany(normals)
err := f.bs.PutMany(ctx, normals)
if err != nil {
return err
}
}

if len(fstores) > 0 {
err := f.fm.PutMany(fstores)
err := f.fm.PutMany(ctx, fstores)
if err != nil {
return err
}
Expand Down
13 changes: 8 additions & 5 deletions filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func makeFile(dir string, data []byte) (string, error) {
}

func TestBasicFilestore(t *testing.T) {
ctx := context.Background()
dir, fs := newTestFilestore(t)

buf := make([]byte, 1000)
Expand All @@ -65,15 +66,15 @@ func TestBasicFilestore(t *testing.T) {
Node: dag.NewRawNode(buf[i*10 : (i+1)*10]),
}

err := fs.Put(n)
err := fs.Put(ctx, n)
if err != nil {
t.Fatal(err)
}
cids = append(cids, n.Node.Cid())
}

for i, c := range cids {
blk, err := fs.Get(c)
blk, err := fs.Get(ctx, c)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -105,6 +106,7 @@ func TestBasicFilestore(t *testing.T) {
}

func randomFileAdd(t *testing.T, fs *Filestore, dir string, size int) (string, []cid.Cid) {
ctx := context.Background()
buf := make([]byte, size)
rand.Read(buf)

Expand All @@ -122,7 +124,7 @@ func randomFileAdd(t *testing.T, fs *Filestore, dir string, size int) (string, [
},
Node: dag.NewRawNode(buf[i*10 : (i+1)*10]),
}
err := fs.Put(n)
err := fs.Put(ctx, n)
if err != nil {
t.Fatal(err)
}
Expand All @@ -133,19 +135,20 @@ func randomFileAdd(t *testing.T, fs *Filestore, dir string, size int) (string, [
}

func TestDeletes(t *testing.T) {
ctx := context.Background()
dir, fs := newTestFilestore(t)
_, cids := randomFileAdd(t, fs, dir, 100)
todelete := cids[:4]
for _, c := range todelete {
err := fs.DeleteBlock(c)
err := fs.DeleteBlock(ctx, c)
if err != nil {
t.Fatal(err)
}
}

deleted := make(map[string]bool)
for _, c := range todelete {
_, err := fs.Get(c)
_, err := fs.Get(ctx, c)
if err != blockstore.ErrNotFound {
t.Fatal("expected blockstore not found error")
}
Expand Down
38 changes: 19 additions & 19 deletions fsrefstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewFileManager(ds ds.Batching, root string) *FileManager {
func (f *FileManager) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
q := dsq.Query{KeysOnly: true}

res, err := f.ds.Query(q)
res, err := f.ds.Query(ctx, q)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -100,8 +100,8 @@ func (f *FileManager) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {

// DeleteBlock deletes the reference-block from the underlying
// datastore. It does not touch the referenced data.
func (f *FileManager) DeleteBlock(c cid.Cid) error {
err := f.ds.Delete(dshelp.MultihashToDsKey(c.Hash()))
func (f *FileManager) DeleteBlock(ctx context.Context, c cid.Cid) error {
err := f.ds.Delete(ctx, dshelp.MultihashToDsKey(c.Hash()))
if err == ds.ErrNotFound {
return blockstore.ErrNotFound
}
Expand All @@ -112,8 +112,8 @@ func (f *FileManager) DeleteBlock(c cid.Cid) error {
// is done in two steps: the first step retrieves the reference
// block from the datastore. The second step uses the stored
// path and offsets to read the raw block data directly from disk.
func (f *FileManager) Get(c cid.Cid) (blocks.Block, error) {
dobj, err := f.getDataObj(c.Hash())
func (f *FileManager) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
dobj, err := f.getDataObj(ctx, c.Hash())
if err != nil {
return nil, err
}
Expand All @@ -129,8 +129,8 @@ func (f *FileManager) Get(c cid.Cid) (blocks.Block, error) {
//
// This method may successfully return the size even if returning the block
// would fail because the associated file is no longer available.
func (f *FileManager) GetSize(c cid.Cid) (int, error) {
dobj, err := f.getDataObj(c.Hash())
func (f *FileManager) GetSize(ctx context.Context, c cid.Cid) (int, error) {
dobj, err := f.getDataObj(ctx, c.Hash())
if err != nil {
return -1, err
}
Expand All @@ -144,8 +144,8 @@ func (f *FileManager) readDataObj(m mh.Multihash, d *pb.DataObj) ([]byte, error)
return f.readFileDataObj(m, d)
}

func (f *FileManager) getDataObj(m mh.Multihash) (*pb.DataObj, error) {
o, err := f.ds.Get(dshelp.MultihashToDsKey(m))
func (f *FileManager) getDataObj(ctx context.Context, m mh.Multihash) (*pb.DataObj, error) {
o, err := f.ds.Get(ctx, dshelp.MultihashToDsKey(m))
switch err {
case ds.ErrNotFound:
return nil, blockstore.ErrNotFound
Expand Down Expand Up @@ -261,24 +261,24 @@ func (f *FileManager) readURLDataObj(m mh.Multihash, d *pb.DataObj) ([]byte, err

// Has returns if the FileManager is storing a block reference. It does not
// validate the data, nor checks if the reference is valid.
func (f *FileManager) Has(c cid.Cid) (bool, error) {
func (f *FileManager) Has(ctx context.Context, c cid.Cid) (bool, error) {
// NOTE: interesting thing to consider. Has doesnt validate the data.
// So the data on disk could be invalid, and we could think we have it.
dsk := dshelp.MultihashToDsKey(c.Hash())
return f.ds.Has(dsk)
return f.ds.Has(ctx, dsk)
}

type putter interface {
Put(ds.Key, []byte) error
Put(context.Context, ds.Key, []byte) error
}

// Put adds a new reference block to the FileManager. It does not check
// that the reference is valid.
func (f *FileManager) Put(b *posinfo.FilestoreNode) error {
return f.putTo(b, f.ds)
func (f *FileManager) Put(ctx context.Context, b *posinfo.FilestoreNode) error {
return f.putTo(ctx, b, f.ds)
}

func (f *FileManager) putTo(b *posinfo.FilestoreNode, to putter) error {
func (f *FileManager) putTo(ctx context.Context, b *posinfo.FilestoreNode, to putter) error {
var dobj pb.DataObj

if IsURL(b.PosInfo.FullPath) {
Expand Down Expand Up @@ -309,24 +309,24 @@ func (f *FileManager) putTo(b *posinfo.FilestoreNode, to putter) error {
return err
}

return to.Put(dshelp.MultihashToDsKey(b.Cid().Hash()), data)
return to.Put(ctx, dshelp.MultihashToDsKey(b.Cid().Hash()), data)
}

// PutMany is like Put() but takes a slice of blocks instead,
// allowing it to create a batch transaction.
func (f *FileManager) PutMany(bs []*posinfo.FilestoreNode) error {
func (f *FileManager) PutMany(ctx context.Context, bs []*posinfo.FilestoreNode) error {
batch, err := f.ds.Batch()
if err != nil {
return err
}

for _, b := range bs {
if err := f.putTo(b, batch); err != nil {
if err := f.putTo(ctx, b, batch); err != nil {
return err
}
}

return batch.Commit()
return batch.Commit(ctx)
}

// IsURL returns true if the string represents a valid URL that the
Expand Down
33 changes: 17 additions & 16 deletions util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package filestore

import (
"context"
"fmt"
"sort"

Expand Down Expand Up @@ -86,42 +87,42 @@ func (r *ListRes) FormatLong(enc func(cid.Cid) string) string {
// of the given Filestore and returns a ListRes object with the information.
// List does not verify that the reference is valid or whether the
// raw data is accesible. See Verify().
func List(fs *Filestore, key cid.Cid) *ListRes {
return list(fs, false, key.Hash())
func List(ctx context.Context, fs *Filestore, key cid.Cid) *ListRes {
return list(ctx, fs, false, key.Hash())
}

// ListAll returns a function as an iterator which, once invoked, returns
// one by one each block in the Filestore's FileManager.
// ListAll does not verify that the references are valid or whether
// the raw data is accessible. See VerifyAll().
func ListAll(fs *Filestore, fileOrder bool) (func() *ListRes, error) {
func ListAll(ctx context.Context, fs *Filestore, fileOrder bool) (func() *ListRes, error) {
if fileOrder {
return listAllFileOrder(fs, false)
return listAllFileOrder(ctx, fs, false)
}
return listAll(fs, false)
return listAll(ctx, fs, false)
}

// Verify fetches the block with the given key from the Filemanager
// of the given Filestore and returns a ListRes object with the information.
// Verify makes sure that the reference is valid and the block data can be
// read.
func Verify(fs *Filestore, key cid.Cid) *ListRes {
return list(fs, true, key.Hash())
func Verify(ctx context.Context, fs *Filestore, key cid.Cid) *ListRes {
return list(ctx, fs, true, key.Hash())
}

// VerifyAll returns a function as an iterator which, once invoked,
// returns one by one each block in the Filestore's FileManager.
// VerifyAll checks that the reference is valid and that the block data
// can be read.
func VerifyAll(fs *Filestore, fileOrder bool) (func() *ListRes, error) {
func VerifyAll(ctx context.Context, fs *Filestore, fileOrder bool) (func() *ListRes, error) {
if fileOrder {
return listAllFileOrder(fs, true)
return listAllFileOrder(ctx, fs, true)
}
return listAll(fs, true)
return listAll(ctx, fs, true)
}

func list(fs *Filestore, verify bool, key mh.Multihash) *ListRes {
dobj, err := fs.fm.getDataObj(key)
func list(ctx context.Context, fs *Filestore, verify bool, key mh.Multihash) *ListRes {
dobj, err := fs.fm.getDataObj(ctx, key)
if err != nil {
return mkListRes(key, nil, err)
}
Expand All @@ -131,9 +132,9 @@ func list(fs *Filestore, verify bool, key mh.Multihash) *ListRes {
return mkListRes(key, dobj, err)
}

func listAll(fs *Filestore, verify bool) (func() *ListRes, error) {
func listAll(ctx context.Context, fs *Filestore, verify bool) (func() *ListRes, error) {
q := dsq.Query{}
qr, err := fs.fm.ds.Query(q)
qr, err := fs.fm.ds.Query(ctx, q)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -169,9 +170,9 @@ func next(qr dsq.Results) (mh.Multihash, *pb.DataObj, error) {
return mhash, dobj, nil
}

func listAllFileOrder(fs *Filestore, verify bool) (func() *ListRes, error) {
func listAllFileOrder(ctx context.Context, fs *Filestore, verify bool) (func() *ListRes, error) {
q := dsq.Query{}
qr, err := fs.fm.ds.Query(q)
qr, err := fs.fm.ds.Query(ctx, q)
if err != nil {
return nil, err
}
Expand Down