RefID to support garbage collection of objects #100

Draft: wants to merge 6 commits into master
4 changes: 2 additions & 2 deletions gateway/s3x/bucket_locker.go
@@ -9,7 +9,7 @@ type bucketLocker struct {
 }
 
 //read read locks on bucket and returns the unlock function,
-//example: defer b.read(bucketName)()
+//example usage: defer b.read(bucketName)()
 func (b *bucketLocker) read(bucket string) func() {
     load, _ := b.m.LoadOrStore(bucket, &sync.RWMutex{})
     rw := load.(*sync.RWMutex)
@@ -18,7 +18,7 @@ func (b *bucketLocker) read(bucket string) func() {
 }
 
 //write write locks on bucket and returns the unlock function,
-//example: defer b.write(bucketName)()
+//example usage: defer b.write(bucketName)()
 func (b *bucketLocker) write(bucket string) func() {
     load, _ := b.m.LoadOrStore(bucket, &sync.RWMutex{})
     rw := load.(*sync.RWMutex)
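The locking idiom referenced in these comments packs acquire and release into one expression: read locks immediately and returns only the unlock function, so defer b.read(bucketName)() locks now and unlocks when the caller returns. A minimal self-contained sketch of that idiom; the struct shape and LoadOrStore lines mirror the hunk above, while the RLock/RUnlock tail is an assumption since those lines are elided from the diff:

package main

import (
    "fmt"
    "sync"
)

// bucketLocker keeps one lazily created RWMutex per bucket name.
type bucketLocker struct{ m sync.Map }

// read locks bucket for reading and returns the matching unlock function.
func (b *bucketLocker) read(bucket string) func() {
    load, _ := b.m.LoadOrStore(bucket, &sync.RWMutex{})
    rw := load.(*sync.RWMutex)
    rw.RLock()
    return rw.RUnlock
}

func main() {
    var b bucketLocker
    readBucket := func(name string) {
        defer b.read(name)() // lock now, unlock when readBucket returns
        fmt.Println("reading", name)
    }
    readBucket("photos")
}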
24 changes: 16 additions & 8 deletions gateway/s3x/ipfs.go
@@ -55,27 +55,28 @@ func ipfsBucket(ctx context.Context, dag pb.NodeAPIClient, h string) (*Bucket, e
 }
 
 // ipfsSave saves any marshaller object and returns its IPFS hash
-func ipfsSave(ctx context.Context, dag pb.NodeAPIClient, m marshaller) (string, error) {
+func ipfsSave(ctx context.Context, dag pb.NodeAPIClient, m marshaller, ref refID) (string, error) {
     data, err := m.Marshal()
     if err != nil {
         return "", err
     }
-    return ipfsSaveBytes(ctx, dag, data)
+    return ipfsSaveBytes(ctx, dag, data, ref)
 }
 
 // ipfsSaveBytes saves data and returns its IPFS hash
-func ipfsSaveBytes(ctx context.Context, dag pb.NodeAPIClient, data []byte) (string, error) {
+func ipfsSaveBytes(ctx context.Context, dag pb.NodeAPIClient, data []byte, ref refID) (string, error) {
     resp, err := dag.Dag(ctx, &pb.DagRequest{
         RequestType: pb.DAGREQTYPE_DAG_PUT,
         Data:        data,
+        RefID:       string(ref),
     })
     if err != nil {
         return "", errors.Wrap(err, "dag client error in ipfsSaveBytes")
     }
     return resp.GetHashes()[0], nil
 }
 
-func ipfsSaveProtoNode(ctx context.Context, dag pb.NodeAPIClient, node *merkledag.ProtoNode) (string, error) {
+func ipfsSaveProtoNode(ctx context.Context, dag pb.NodeAPIClient, node *merkledag.ProtoNode, ref refID) (string, error) {
     data, err := node.Marshal()
     if err != nil {
         return "", err
@@ -85,6 +86,7 @@ func ipfsSaveProtoNode(ctx context.Context, dag pb.NodeAPIClient, node *merkleda
         Data:                data,
         ObjectEncoding:      "protobuf",
         SerializationFormat: "protobuf",
+        RefID:               string(ref),
     })
     if err != nil {
         return "", errors.Wrap(err, "dag client error in ipfsSaveProtoNode")
@@ -97,7 +99,7 @@ func ipfsSaveProtoNode(ctx context.Context, dag pb.NodeAPIClient, node *merkleda
 
 const chunkSize = 4*1024*1024 - 1024 //1KB less than 4MB for a good safety buffer
 
-func ipfsFileUpload(ctx context.Context, fileClient pb.FileAPIClient, r io.Reader) (string, int, error) {
+func ipfsFileUpload(ctx context.Context, fileClient pb.FileAPIClient, r io.Reader, ref refID) (string, int, error) {
     stream, err := fileClient.UploadFile(ctx)
     if err != nil {
         return "", 0, err
@@ -116,10 +118,16 @@ func ipfsFileUpload(ctx context.Context, fileClient pb.FileAPIClient, r io.Reade
             _ = stream.CloseSend()
             return "", size, err
         }
-        size = size + n
-        if err := stream.Send(&pb.UploadRequest{
+        req := &pb.UploadRequest{
             Blob: &pb.Blob{Content: buf[:n]},
-        }); err != nil {
+        }
+        if size == 0 {
+            req.Options = &pb.UploadOptions{
+                RefID: string(ref),
+            }
+        }
+        size = size + n
+        if err := stream.Send(req); err != nil {
             return "", size, err
         }
     }
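Two details above are easy to miss. Every save path now threads a refID through to the DAG and file APIs, so the server can account for which S3 resource a stored blob belongs to. And ipfsFileUpload attaches the RefID only to the first UploadRequest of the stream (the size == 0 branch runs before the first Send), which suggests upload options are interpreted once per stream rather than once per chunk. A hypothetical helper showing the new call shape; saveGreeting and the "photos" bucket are invented for illustration:

package s3x

import "context"

// saveGreeting stores a small blob tagged with the refID of the "photos"
// bucket, so a future collector can find and release it with the bucket.
func (ls *ledgerStore) saveGreeting(ctx context.Context) (string, error) {
    return ipfsSaveBytes(ctx, ls.dag, []byte("hello"), ls.bucketRefID("photos"))
}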
2 changes: 1 addition & 1 deletion gateway/s3x/ledger_bucket.go
@@ -159,7 +159,7 @@ func (ls *ledgerStore) saveBucket(ctx context.Context, bucket string, b *Bucket)
     }
 
     //save to ipfs and get hash
-    bHash, err := ipfsSave(ctx, ls.dag, b)
+    bHash, err := ipfsSave(ctx, ls.dag, b, ls.bucketRefID(bucket))
     if err != nil {
         return nil, err
     }
12 changes: 7 additions & 5 deletions gateway/s3x/ledger_store.go
@@ -30,9 +30,10 @@ var (
 // Object hashes are saved in ipfs and cached in memory,
 // Object data is saved in ipfs.
 type ledgerStore struct {
-    ds  datastore.Batching
-    dag pb.NodeAPIClient //to be used as direct access to ipfs to optimize algorithm
-    l   *Ledger          //a cache of the values in datastore and ipfs
+    ds        datastore.Batching
+    dag       pb.NodeAPIClient //to be used as direct access to ipfs to optimize algorithm
+    l         *Ledger          //a cache of the values in datastore and ipfs
+    refIDRoot datastore.Key    //the calculated refID root, from stored secret and server SFSName
 
     locker  bucketLocker //a locker to protect buckets from concurrent access (per bucket)
     plocker bucketLocker //a locker to protect MultipartUploads from concurrent access (per upload ID)
@@ -42,11 +43,12 @@ type ledgerStore struct {
     cleanup []func() error //a list of functions to call before we close the backing database.
 }
 
-func newLedgerStore(ds datastore.Batching, dag pb.NodeAPIClient, passthrough bool) (*ledgerStore, error) {
+func newLedgerStore(g *TEMX, ds datastore.Batching, dag pb.NodeAPIClient, passthrough bool) (*ledgerStore, error) {
     ls := &ledgerStore{
         ds:  namespace.Wrap(ds, dsPrefix),
         dag: dag,
     }
+    ls.setRefIDRoot(g)
     if !passthrough {
         ls.l = &Ledger{
             Buckets: make(map[string]*LedgerBucketEntry),
@@ -160,7 +162,7 @@ func (ls *ledgerStore) PutObject(ctx context.Context, bucket, object string, obj
 
 //putObject saves an object by hash into the given bucket
 func (ls *ledgerStore) putObject(ctx context.Context, bucket, object string, obj *Object) error {
-    oHash, err := ipfsSave(ctx, ls.dag, obj)
+    oHash, err := ipfsSave(ctx, ls.dag, obj, ls.objectRefID(bucket, object))
     if err != nil {
         return err
     }
2 changes: 1 addition & 1 deletion gateway/s3x/ledger_test.go
@@ -30,7 +30,7 @@ func testS3XLedgerStore(t *testing.T, dsType DSType, passthrough bool) {
         }
     }()
 
-    ledger, err := newLedgerStore(dssync.MutexWrap(datastore.NewMapDatastore()), gateway.dagClient, passthrough)
+    ledger, err := newLedgerStore(gateway.temx, dssync.MutexWrap(datastore.NewMapDatastore()), gateway.dagClient, passthrough)
 
     if err != nil {
         t.Fatal(err)
4 changes: 2 additions & 2 deletions gateway/s3x/multipart.go
@@ -47,7 +47,7 @@ func (x *xObjects) PutObjectPart(
     if err != nil {
         return pi, x.toMinioErr(err, bucket, "", "")
     }
-    hash, size, err := ipfsFileUpload(ctx, x.fileClient, r)
+    hash, size, err := ipfsFileUpload(ctx, x.fileClient, r, x.ledgerStore.objectRefID(bucket, object))
     if err != nil {
         return pi, x.toMinioErr(err, bucket, object, uploadID)
     }
@@ -171,7 +171,7 @@ func (x *xObjects) CompleteMultipartUpload(
         return oi, x.toMinioErr(err, bucket, object, uploadID)
     }
     protoNode.SetData(data)
-    dataHash, err := ipfsSaveProtoNode(ctx, x.dagClient, protoNode)
+    dataHash, err := ipfsSaveProtoNode(ctx, x.dagClient, protoNode, x.ledgerStore.objectRefID(bucket, object))
     if err != nil {
         return oi, x.toMinioErr(err, bucket, object, uploadID)
     }
3 changes: 1 addition & 2 deletions gateway/s3x/object.go
@@ -168,7 +168,7 @@ func (x *xObjects) PutObject(
     if err != nil {
         return minio.ObjectInfo{}, x.toMinioErr(err, bucket, "", "")
     }
-    hash, size, err := ipfsFileUpload(ctx, x.fileClient, r)
+    hash, size, err := ipfsFileUpload(ctx, x.fileClient, r, x.ledgerStore.objectRefID(bucket, object))
     if err != nil {
         return minio.ObjectInfo{}, x.toMinioErr(err, bucket, object, "")
     }
@@ -270,7 +270,6 @@ func (x *xObjects) DeleteObjects(
     bucket string,
     objects []minio.ObjectToDelete, opts minio.ObjectOptions,
 ) ([]minio.DeletedObject, []error) {
-
     errs := make([]error, len(objects))
     dobjects := make([]minio.DeletedObject, len(objects))
     for idx, object := range objects {
51 changes: 51 additions & 0 deletions gateway/s3x/refid.go
@@ -0,0 +1,51 @@
+package s3x
+
+import (
+    "crypto/rand"
+    "crypto/sha256"
+    "encoding/base64"
+    "fmt"
+
+    "github.com/ipfs/go-datastore"
+    "github.com/pkg/errors"
+)
+
+//refID is used to tag resources for future deletion in shared storage
+type refID string
+
+func (ls *ledgerStore) setRefIDRoot(g *TEMX) error {
+    key := datastore.NewKey("/rtrade/s3x/RefIDSecret")
+    v, err := ls.ds.Get(key)
+    if err == datastore.ErrNotFound {
+        v = make([]byte, 64)
+        if _, err = rand.Read(v); err != nil {
+            return err
+        }
+        err = ls.ds.Put(key, v)
+    }
+    if err != nil {
+        return err
+    }
+    if len(v) < 10 {
+        return errors.Errorf(`the RefIDSecret "%x" is not secure`, v)
+    }
+    h := sha256.New()
+    if _, err = h.Write(v); err != nil {
+        return err
+    }
+    if _, err = h.Write([]byte(g.SFSName)); err != nil {
+        return err
+    }
+    ls.refIDRoot = datastore.NewKey(base64.URLEncoding.EncodeToString(h.Sum(nil)))
+    return nil
+}
+
+func (ls *ledgerStore) objectRefID(bucket string, object string) refID {
+    key := ls.refIDRoot.ChildString("o").ChildString(fmt.Sprint(len(bucket))).ChildString(bucket).ChildString(object)
+    return refID(key.String())
+}
+
+func (ls *ledgerStore) bucketRefID(bucket string) refID {
+    key := ls.refIDRoot.ChildString("b").ChildString(bucket)
+    return refID(key.String())
+}
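The derivation chain in this file: a random 64-byte secret is stored once under /rtrade/s3x/RefIDSecret, hashed together with the server's SFSName, and the base64 digest becomes the root key of every refID. The len(bucket) component in objectRefID is what keeps slash-containing names apart, because datastore keys treat / as a path separator; without it, bucket "a/b" with object "c" and bucket "a" with object "b/c" would flatten to the same key. A standalone sketch of that collision case, where example-root stands in for the secret-derived root:

package main

import (
    "fmt"

    "github.com/ipfs/go-datastore"
)

func main() {
    root := datastore.NewKey("example-root")

    // refID mirrors objectRefID above: o / <len(bucket)> / bucket / object.
    refID := func(bucket, object string) string {
        return root.ChildString("o").
            ChildString(fmt.Sprint(len(bucket))).
            ChildString(bucket).
            ChildString(object).String()
    }

    fmt.Println(refID("a/b", "c")) // /example-root/o/3/a/b/c
    fmt.Println(refID("a", "b/c")) // /example-root/o/1/a/b/c
}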
53 changes: 53 additions & 0 deletions gateway/s3x/refid_test.go
@@ -0,0 +1,53 @@
+package s3x
+
+import (
+    "context"
+    "testing"
+)
+
+func TestSetRefIDRoot(t *testing.T) {
+    ctx := context.Background()
+    gateway := newTestGateway(t, DSTypeBadger, false)
+    defer func() {
+        if err := gateway.Shutdown(ctx); err != nil {
+            t.Fatal(err)
+        }
+    }()
+    ls := gateway.ledgerStore
+    root := ls.refIDRoot
+    if len(root.String()) < 20 {
+        t.Fatalf("refIDRoot %v is too short", root)
+    }
+    if err := ls.setRefIDRoot(gateway.temx); err != nil {
+        t.Fatalf("setRefIDRoot should not return an error when called again: %v", err)
+    }
+    if root != ls.refIDRoot {
+        t.Fatalf("refIDRoot changed from %v to %v", root, ls.refIDRoot)
+    }
+
+    ra := ls.objectRefID("a/b", "c")
+    rb := ls.objectRefID("a", "b/c")
+    if ra == rb {
+        t.Fatalf("RefID %v must be different from %v", ra, rb)
+    }
+    gateway.restart(t)
+    ls = gateway.ledgerStore
+    ra2 := ls.objectRefID("a/b", "c")
+    if ra != ra2 {
+        t.Fatalf("RefID must be the same after a restart %v != %v", ra, ra2)
+    }
+    gateway.temx.SFSName = "fake-server"
+    if err := ls.setRefIDRoot(gateway.temx); err != nil {
+        t.Fatal(err)
+    }
+    if root == ls.refIDRoot {
+        t.Fatalf("refIDRoot must change when SFSName changes")
+    }
+    gateway.temx.SFSName = ""
+    if err := ls.setRefIDRoot(gateway.temx); err != nil {
+        t.Fatal(err)
+    }
+    if root != ls.refIDRoot {
+        t.Fatalf("refIDRoot must change back when SFSName changes back %v != %v", root, ls.refIDRoot)
+    }
+}
5 changes: 3 additions & 2 deletions gateway/s3x/s3x.go
@@ -44,6 +44,7 @@ type TEMX struct {
     DSPassthrough bool   // turns off the in-memory cache of some data structures, lowers memory usage and allows network sync of the datastore to work, at a cost of performance.
     CrdtTopic     string // if the database type is crdt, then this is used as the pubsub topic
     XAddr         string // server and port of the XAPI address
+    SFSName       string // name used to salt the RefID secret, so the secret cannot be guessed for other servers.
     Insecure      bool   // skip certificate verification when connecting to TemporalX
 }
 
@@ -153,7 +154,7 @@ func (g *TEMX) newBadgerLedgerStore(dag pb.NodeAPIClient) (*ledgerStore, error)
     if err != nil {
         return nil, err
     }
-    return newLedgerStore(ds, dag, g.DSPassthrough)
+    return newLedgerStore(g, ds, dag, g.DSPassthrough)
 }
 
 // newCrdtLedgerStore returns an instance of ledgerStore that uses crdt and is backed by badgerv2
@@ -182,7 +183,7 @@ func (g *TEMX) newCrdtLedgerStore(ctx context.Context, dag pb.NodeAPIClient, pub
     if err != nil {
         return nil, err
     }
-    ls, err := newLedgerStore(crdtds, dag, g.DSPassthrough)
+    ls, err := newLedgerStore(g, crdtds, dag, g.DSPassthrough)
     if err != nil {
         return nil, err
     }
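For context, a hypothetical gateway configuration showing where the new field sits; the address and name are placeholder values, and every omitted field keeps its zero value:

package s3x

// newExampleTEMX is illustrative only, not part of this PR.
func newExampleTEMX() *TEMX {
    return &TEMX{
        XAddr:   "localhost:9090", // TemporalX server and port
        SFSName: "gateway-1",      // salts the RefID secret so other servers derive different refIDs
    }
}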
8 changes: 1 addition & 7 deletions go.mod
@@ -3,7 +3,7 @@ module github.com/RTradeLtd/s3x
 go 1.15
 
 require (
-    github.com/RTradeLtd/TxPB/v3 v3.4.2
+    github.com/RTradeLtd/TxPB/v3 v3.4.3-0.20200926080756-1c3b6905c710
     github.com/RTradeLtd/go-ds-badger/v2 v2.1.0
     github.com/dgraph-io/badger/v2 v2.0.3 // indirect
     github.com/eapache/go-resiliency v1.2.0 // indirect
@@ -18,22 +18,16 @@
    github.com/ipfs/go-ipld-format v0.2.0
    github.com/ipfs/go-merkledag v0.3.2
    github.com/ipfs/go-unixfs v0.2.4
    github.com/kr/text v0.2.0 // indirect
    github.com/mattn/go-runewidth v0.0.9 // indirect
    github.com/minio/cli v1.22.0
    github.com/minio/minio v0.0.0-20201003023654-017954e7ea23
    github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
    github.com/pkg/errors v0.9.1
    github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
    github.com/segmentio/ksuid v1.0.3
    github.com/smartystreets/goconvey v1.6.4 // indirect
    github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966 // indirect
    go.uber.org/multierr v1.6.0
    go.uber.org/zap v1.16.0 // indirect
    google.golang.org/appengine v1.6.6 // indirect
    google.golang.org/genproto v0.0.0-20201002142447-3860012362da
    google.golang.org/grpc v1.29.1
    gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
    gopkg.in/cheggaaa/pb.v1 v1.0.28 // indirect
    honnef.co/go/tools v0.0.1-2020.1.5 // indirect
)