Skip to content

Commit

Permalink
add timings.Monitor to csv2docstore
Browse files Browse the repository at this point in the history
  • Loading branch information
thisisaaronland committed Nov 5, 2021
1 parent 38e0603 commit aceda1d
Showing 1 changed file with 74 additions and 7 deletions.
81 changes: 74 additions & 7 deletions cmd/csv2docstore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,103 @@ import (
"context"
"flag"
"fmt"
"github.com/aaronland/go-aws-dynamodb"
"github.com/sfomuseum/go-csvdict"
"github.com/sfomuseum/go-timings"
"github.com/whosonfirst/go-whosonfirst-findingaid/v2/producer/docstore"
gc_docstore "gocloud.dev/docstore"
gc_dynamodb "gocloud.dev/docstore/awsdynamodb"
"io"
"io/ioutil"
"log"
"net/url"
"os"
"strconv"
"time"
)

func main() {

// docstore_uri := flag.String("docstore-uri", "", "...")
docstore_uri := flag.String("docstore-uri", "", "...")
flag.Parse()

archives := flag.Args()

ctx := context.Background()

d := time.Second * 60
monitor, err := timings.NewCounterMonitor(ctx, d)

if err != nil {
log.Fatalf("Failed to create timings monitor, %w", err)
}

monitor.Start(ctx, os.Stdout)
defer monitor.Stop(ctx)

// START OF put me in a package function or something

var collection *gc_docstore.Collection

u, err := url.Parse(*docstore_uri)

if err != nil {
log.Fatalf("Failed to parse URI, %v", err)
}

q := u.Query()

if u.Scheme == "awsdynamodb" {

// Connect local dynamodb using Golang
// https://gist.github.com/Tamal/02776c3e2db7eec73c001225ff52e827
// https://gocloud.dev/howto/docstore/#dynamodb-ctor

client, err := dynamodb.NewClientWithURI(ctx, *docstore_uri)

if err != nil {
log.Fatalf("Failed to create client, %v", err)
}

u, _ := url.Parse(*docstore_uri)
table_name := u.Host

partition_key := q.Get("partition_key")

col, err := gc_dynamodb.OpenCollection(client, table_name, partition_key, "", nil)

if err != nil {
log.Fatalf("Failed to open collection, %v", err)
}

collection = col

} else {

col, err := gc_docstore.OpenCollection(ctx, *docstore_uri)

if err != nil {
log.Fatalf("Failed to create database for '%s', %v", *docstore_uri, err)
}

collection = col
}

// END OF put me in a package function or something

for _, path := range archives {

err := processArchive(ctx, path, collection)
err := processArchive(ctx, path, collection, monitor)

if err != nil {
log.Fatalf("Failed to process %s, %v", path, err)
}
}
}

func processArchive(ctx context.Context, path string, collection *gc_docstore.Collection) error {
func processArchive(ctx context.Context, path string, collection *gc_docstore.Collection, monitor timings.Monitor) error {

log.Printf("Process %s\n", path)

f, err := os.Open(path)

Expand All @@ -49,10 +114,10 @@ func processArchive(ctx context.Context, path string, collection *gc_docstore.Co

defer f.Close()

return processArchiveWithReader(ctx, f, collection)
return processArchiveWithReader(ctx, f, collection, monitor)
}

func processArchiveWithReader(ctx context.Context, r io.Reader, collection *gc_docstore.Collection) error {
func processArchiveWithReader(ctx context.Context, r io.Reader, collection *gc_docstore.Collection, monitor timings.Monitor) error {

gzip_r, err := gzip.NewReader(r)

Expand Down Expand Up @@ -156,7 +221,7 @@ func processArchiveWithReader(ctx context.Context, r io.Reader, collection *gc_d

defer catalog_r.Close()

err = processCatalog(ctx, catalog_r, lookup, collection)
err = processCatalog(ctx, catalog_r, lookup, collection, monitor)

if err != nil {
return fmt.Errorf("Failed to process catalog, %w", err)
Expand All @@ -165,7 +230,7 @@ func processArchiveWithReader(ctx context.Context, r io.Reader, collection *gc_d
return nil
}

func processCatalog(ctx context.Context, r io.Reader, lookup map[int64]string, collection *gc_docstore.Collection) error {
func processCatalog(ctx context.Context, r io.Reader, lookup map[int64]string, collection *gc_docstore.Collection, monitor timings.Monitor) error {

csv_r, err := csvdict.NewReader(r)

Expand Down Expand Up @@ -219,6 +284,8 @@ func processCatalog(ctx context.Context, r io.Reader, lookup map[int64]string, c
if err != nil {
return fmt.Errorf("Failed to add row to catalog, %w", err)
}

go monitor.Signal(ctx)
}

return nil
Expand Down

0 comments on commit aceda1d

Please sign in to comment.