Skip to content

Commit

Permalink
add cmd/create-dynamodb-import tool per issue #3
Browse files Browse the repository at this point in the history
  • Loading branch information
whosonfirst committed Aug 19, 2022
1 parent 7a6d12f commit 2001f92
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 24 deletions.
189 changes: 189 additions & 0 deletions cmd/create-dynamodb-import/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// create-dynamodb-import is a command-line tool to create a CSV file derived by one or more whosonfirst-findingaid data archives
// suitable for importing in to DynamoDB. For example:
//
// $> go run cmd/create-dynamodb-import/main.go /usr/local/whosonfirst/whosonfirst-findingaids/data/*
//
// See also: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/S3DataImport.HowItWorks.html
package main

import (
"archive/tar"
"bytes"
"compress/gzip"
"flag"
"fmt"
"github.com/sfomuseum/go-csvdict"
"io"
"log"
"os"
"sync"
)

const sources_csv string = "sources.csv"
const catalog_csv string = "catalog.csv"

func main() {

flag.Parse()

writers := []io.Writer{
os.Stdout,
}

wr := io.MultiWriter(writers...)

fieldnames := []string{
"id",
"repo",
}

csv_wr, err := csvdict.NewWriter(wr, fieldnames)

if err != nil {
log.Fatalf("Failed to create CSV writer, %w", err)
}

paths := flag.Args()

done_ch := make(chan bool)
err_ch := make(chan error)
row_ch := make(chan map[string]string)

for _, path := range paths {

go func(path string) {

defer func() {
done_ch <- true
}()

r, err := os.Open(path)

if err != nil {
err_ch <- fmt.Errorf("Failed to open %s for reading, %w", path, err)
return
}

gr, err := gzip.NewReader(r)

if err != nil {
err_ch <- fmt.Errorf("Failed to create gzip reader for %s, %w", path, err)
return
}

tr := tar.NewReader(gr)

var catalog []byte
var sources []byte

for {
hdr, err := tr.Next()

if err == io.EOF {
break // End of archive
}

if err != nil {
err_ch <- fmt.Errorf("Failed to advance, %w", err)
return
}

switch hdr.Name {
case sources_csv, catalog_csv:
// pass
default:
continue
}

body, err := io.ReadAll(tr)

if err != nil {
err_ch <- fmt.Errorf("Failed to read body for %s (%s), %w", path, hdr.Name, err)
return
}

switch hdr.Name {
case sources_csv:
sources = body
case catalog_csv:
catalog = body
default:
// pass
}
}

if len(sources) == 0 {
return
}

sources_map := make(map[string]string)

source_r, err := csvdict.NewReader(bytes.NewReader(sources))

if err != nil {
err_ch <- fmt.Errorf("Failed to create CSV reader for sources (%s), %w", path, err)
return
}

for {
row, err := source_r.Read()

if err == io.EOF {
break
}

if err != nil {
err_ch <- fmt.Errorf("Failed to read CSV row for sources (%s), %w", path, err)
return
}

sources_map[row["id"]] = row["name"]
}

catalog_r, err := csvdict.NewReader(bytes.NewReader(catalog))

if err != nil {
err_ch <- fmt.Errorf("Failed to create CSV reader for catalog (%s), %w", path, err)
return
}

for {
row, err := catalog_r.Read()

if err == io.EOF {
break
}

if err != nil {
err_ch <- fmt.Errorf("Failed to read CSV row for catalog (%s), %w", path, err)
return
}

repo_id := row["repo_id"]
repo_name := sources_map[repo_id]

row_ch <- map[string]string{"id": row["id"], "repo": repo_name}
}

}(path)

}

remaining := len(paths)

mu := new(sync.RWMutex)

for remaining > 0 {
select {
case <-done_ch:
remaining -= 1
case err := <-err_ch:
log.Fatal(err)
case row := <-row_ch:
mu.Lock()
csv_wr.WriteRow(row)
mu.Unlock()
}
}

}
2 changes: 1 addition & 1 deletion provider/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"fmt"
"github.com/jtacoma/uritemplates"
"github.com/whosonfirst/go-whosonfirst-github/organizations"
"github.com/sfomuseum/iso8601duration"
"github.com/whosonfirst/go-whosonfirst-github/organizations"
"net/url"
"regexp"
"strconv"
Expand Down
4 changes: 2 additions & 2 deletions resolver/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func (r *HTTPResolver) GetRepo(ctx context.Context, id int64) (string, error) {
if err != nil {
return "", fmt.Errorf("Failed to parse endpoint URL, %w", err)
}

str_id := strconv.FormatInt(id, 10)
u.Path = filepath.Join(u.Path, str_id)

rsp, err := http.Get(u.String())

if err != nil {
Expand Down
36 changes: 18 additions & 18 deletions resolver/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ package resolver
import (
"context"
"fmt"
"github.com/tidwall/gjson"
"github.com/whosonfirst/go-reader"
"github.com/whosonfirst/go-whosonfirst-uri"
"io"
_ "log"
"net/url"
"github.com/whosonfirst/go-reader"
"github.com/tidwall/gjson"
"github.com/whosonfirst/go-whosonfirst-uri"
)

// type ReaderResolver implements the `Resolver` interface for data that can be resolved using a whosonfirst/go-reader.Reader instance.
type ReaderResolver struct {
Resolver
reader reader.Reader
reader reader.Reader
strategy string
}

Expand All @@ -30,11 +30,11 @@ func init() {
// reader://?reader={READER_URI}&strategy={STRATEGY}
//
// Where:
// * {READER_URI} is a valid whosonfirst/go-reader.Reader URI
// * {STRATEGY} is a string describing the strategy use to expand the 'id' parameter passed to the
// `GetRepo` method to a URI. Valid options are:
// ** 'fname' which will expand '101736545' to '101736545.geojson' (default)
// ** 'uri' which will expand '101736545' to '101/736/545/101736545.geojson'
// - {READER_URI} is a valid whosonfirst/go-reader.Reader URI
// - {STRATEGY} is a string describing the strategy use to expand the 'id' parameter passed to the
// `GetRepo` method to a URI. Valid options are:
// ** 'fname' which will expand '101736545' to '101736545.geojson' (default)
// ** 'uri' which will expand '101736545' to '101/736/545/101736545.geojson'
func NewReaderResolver(ctx context.Context, uri string) (Resolver, error) {

u, err := url.Parse(uri)
Expand All @@ -58,9 +58,9 @@ func NewReaderResolver(ctx context.Context, uri string) (Resolver, error) {
if s == "" {
s = "fname"
}

f := &ReaderResolver{
reader: r,
reader: r,
strategy: s,
}

Expand All @@ -74,36 +74,36 @@ func (r *ReaderResolver) GetRepo(ctx context.Context, id int64) (string, error)

switch r.strategy {
case "uri":

rel_path, err := uri.Id2RelPath(id)

if err != nil {
return "", fmt.Errorf("Failed to derive rel path, %w", err)
}

path = rel_path

case "fname":

fname, err := uri.Id2Fname(id)

if err != nil {
return "", fmt.Errorf("Failed to derive filename, %w", err)
}

path = fname

default:

return "", fmt.Errorf("Invalid strategy")
}

fh, err := r.reader.Read(ctx, path)

if err != nil {
return "", fmt.Errorf("Failed to read %s, %w", path, err)
}

defer fh.Close()

body, err := io.ReadAll(fh)
Expand All @@ -114,7 +114,7 @@ func (r *ReaderResolver) GetRepo(ctx context.Context, id int64) (string, error)

repo_rsp := gjson.GetBytes(body, "properties.wof:repo")

if !repo_rsp.Exists(){
if !repo_rsp.Exists() {
return "", fmt.Errorf("Body missing wof:repo property")
}

Expand Down
6 changes: 3 additions & 3 deletions resolver/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package resolver

import (
"context"
"testing"
"fmt"
"path/filepath"
"testing"
)

func TestReaderResolver(t *testing.T) {

ctx := context.Background()

fixtures := "../fixtures"
abs_path, err := filepath.Abs(fixtures)

Expand All @@ -36,6 +36,6 @@ func TestReaderResolver(t *testing.T) {
if repo != "whosonfirst-data-admin-ca" {
t.Fatalf("Invalid repo: %s", repo)
}

fmt.Printf(repo)
}

0 comments on commit 2001f92

Please sign in to comment.