
Implement error handler #105

Merged: 20 commits, Dec 18, 2024
30 changes: 25 additions & 5 deletions cmd/timescaledb-parallel-copy/main.go
@@ -33,10 +33,12 @@ var (
quoteCharacter string
escapeCharacter string

fromFile string
columns string
skipHeader bool
headerLinesCnt int
fromFile string
columns string
skipHeader bool
headerLinesCnt int
batchErrorOutputDir string
skipBatchErrors bool

workers int
limit int64
@@ -67,6 +69,9 @@ func init() {
flag.BoolVar(&skipHeader, "skip-header", false, "Skip the first line of the input")
flag.IntVar(&headerLinesCnt, "header-line-count", 1, "Number of header lines")

flag.StringVar(&batchErrorOutputDir, "batch-error-output-dir", "", "directory to store batch errors. Setting this will save a .csv file with the contents of the batch that failed and continue with the rest of the data.")
flag.BoolVar(&skipBatchErrors, "skip-batch-errors", false, "if true, the copy will continue even if a batch fails")

flag.IntVar(&batchSize, "batch-size", 5000, "Number of rows per insert")
flag.Int64Var(&limit, "limit", 0, "Number of rows to insert overall; 0 means to insert all")
flag.IntVar(&workers, "workers", 1, "Number of parallel requests to make")
@@ -94,8 +99,10 @@ func main() {
if dbName != "" {
log.Fatalf("Error: Deprecated flag -db-name is being used. Update -connection to connect to the given database")
}
logger := &csvCopierLogger{}

opts := []csvcopy.Option{
csvcopy.WithLogger(&csvCopierLogger{}),
csvcopy.WithLogger(logger),
csvcopy.WithSchemaName(schemaName),
csvcopy.WithCopyOptions(copyOptions),
csvcopy.WithSplitCharacter(splitCharacter),
@@ -110,6 +117,19 @@ func main() {
csvcopy.WithVerbose(verbose),
}

batchErrorHandler := csvcopy.BatchHandlerError()
if skipBatchErrors {
batchErrorHandler = csvcopy.BatchHandlerNoop()
}
if batchErrorOutputDir != "" {
log.Printf("batch errors will be stored at %s", batchErrorOutputDir)
batchErrorHandler = csvcopy.BatchHandlerSaveToFile(batchErrorOutputDir, batchErrorHandler)
}
if verbose || skipBatchErrors {
batchErrorHandler = csvcopy.BatchHandlerLog(logger, batchErrorHandler)
}
opts = append(opts, csvcopy.WithBatchErrorHandler(batchErrorHandler))
Comment on lines +120 to +131

Contributor: Perhaps this could be extracted to an auxiliary function.

Contributor Author: So far I think main is the best place for this logic; moving it to a function won't make it much easier, since you would need to pass all the flags.

Contributor: You can have more functions directly in the main.go file, so you would still have access to the flags since they're global. It's just a way to avoid the ever-growing main function that happens as programs become more complex.
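For reference, a minimal sketch of the suggested extraction (buildBatchErrorHandler is a hypothetical name; it relies on the global flag variables and the csvcopy helpers introduced in this PR):

func buildBatchErrorHandler(logger csvcopy.Logger) csvcopy.BatchErrorHandler {
	// Default behavior: a failing batch aborts the whole copy.
	handler := csvcopy.BatchHandlerError()
	if skipBatchErrors {
		handler = csvcopy.BatchHandlerNoop()
	}
	if batchErrorOutputDir != "" {
		log.Printf("batch errors will be stored at %s", batchErrorOutputDir)
		// Save the failed batch to disk, then fall through to the handler above.
		handler = csvcopy.BatchHandlerSaveToFile(batchErrorOutputDir, handler)
	}
	if verbose || skipBatchErrors {
		handler = csvcopy.BatchHandlerLog(logger, handler)
	}
	return handler
}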


if skipHeader {
opts = append(opts,
csvcopy.WithSkipHeaderCount(headerLinesCnt),
83 changes: 69 additions & 14 deletions internal/batch/scan.go → pkg/batch/scan.go
@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"log"
"net"
)

@@ -22,18 +23,55 @@ type Options struct {
type Batch struct {
Data net.Buffers
Location Location

// backup holds the same data as Data. It is used to rewind if something goes wrong.
// Because it copies the slice, the memory is not duplicated.
// Because we only read data, the underlying memory is not modified either.
backup net.Buffers
}

func NewBatch(data net.Buffers, location Location) Batch {
b := Batch{
Data: data,
Location: location,
}
b.snapshot()
return b
}

func (b *Batch) snapshot() {
b.backup = net.Buffers{}
b.backup = append(b.backup, b.Data...)
}

// Rewind makes the data available to read again
func (b *Batch) Rewind() {
b.Data = net.Buffers{}
b.Data = append(b.Data, b.backup...)
}
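
A minimal sketch of the read-rewind-read flow this enables (standalone example; assumes only the API above):

package main

import (
	"io"
	"net"
	"os"

	"github.com/timescale/timescaledb-parallel-copy/pkg/batch"
)

func main() {
	b := batch.NewBatch(net.Buffers{[]byte("a,b\n"), []byte("c,d\n")}, batch.Location{})
	io.Copy(io.Discard, &b.Data) // reading consumes Data until the slice is empty
	b.Rewind()                   // restore Data from the snapshot; no bytes are copied
	io.Copy(os.Stdout, &b.Data)  // the same two rows can be read again
}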

// Location positions a batch within the original data
type Location struct {
// StartRow represents the index of the row where the batch starts.
// The first row of the file is row 0.
// The header counts as a row.
StartRow int64
Length int
// RowCount is the number of rows in the batch
RowCount int
// ByteOffset is the byte position in the original file.
// It can be used with ReadAt to process the same batch again.
ByteOffset int
// ByteLen represents the number of bytes for the batch.
// It can be used to know how big the batch is and read it accordingly
ByteLen int
}

func NewLocation(rowsRead int64, bufferedRows int, skip int) Location {
func NewLocation(rowsRead int64, bufferedRows int, skip int, byteOffset int, byteLen int) Location {
return Location{
StartRow: rowsRead - int64(bufferedRows) + int64(skip),
Length: bufferedRows,
StartRow: rowsRead - int64(bufferedRows) + int64(skip) - 1, // Index rows starting at 0
RowCount: bufferedRows,
ByteOffset: byteOffset,
ByteLen: byteLen,
}
}
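
Because ByteOffset and ByteLen delimit the batch within the source file, a failed batch can be fetched again with a single ReadAt. A sketch, assuming the input is an *os.File (rereadBatch is a hypothetical helper; the "os" import is assumed):

func rereadBatch(f *os.File, loc Location) ([]byte, error) {
	buf := make([]byte, loc.ByteLen)
	// ReadAt does not move the file cursor, so this is safe even while
	// the main Scan loop keeps reading the same file sequentially.
	if _, err := f.ReadAt(buf, int64(loc.ByteOffset)); err != nil {
		return nil, err
	}
	return buf, nil
}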

@@ -49,7 +87,8 @@ func NewLocation(rowsRead int64, bufferedRows int, skip int) Location {
// opts.Escape as the QUOTE and ESCAPE characters used for the CSV input.
func Scan(ctx context.Context, r io.Reader, out chan<- Batch, opts Options) error {
var rowsRead int64
reader := bufio.NewReader(r)
counter := &CountReader{Reader: r}
reader := bufio.NewReader(counter)

for skip := opts.Skip; skip > 0; {
// The use of ReadLine() here avoids copying or buffering data that
@@ -62,7 +101,6 @@ func Scan(ctx context.Context, r io.Reader, out chan<- Batch, opts Options) error {
} else if err != nil {
return fmt.Errorf("skipping header: %w", err)
}

if !isPrefix {
// We pulled a full row from the buffer.
skip--
@@ -91,6 +129,7 @@ func Scan(ctx context.Context, r io.Reader, out chan<- Batch, opts Options) error {
bufs := make(net.Buffers, 0, opts.Size)
var bufferedRows int

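// Offset of the first unread byte: everything pulled from the underlying
// reader so far, minus what still sits unconsumed in the bufio buffer.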
byteStart := counter.Total - reader.Buffered()
for {
eol := false

@@ -130,16 +169,18 @@ func Scan(ctx context.Context, r io.Reader, out chan<- Batch, opts Options) error {
}

if bufferedRows >= opts.Size { // dispatch to COPY worker & reset
byteEnd := counter.Total - reader.Buffered()
select {
case out <- Batch{
Data: bufs,
Location: NewLocation(rowsRead, bufferedRows, opts.Skip),
}:
case out <- NewBatch(
bufs,
NewLocation(rowsRead, bufferedRows, opts.Skip, byteStart, byteEnd-byteStart),
):
case <-ctx.Done():
return ctx.Err()
}
bufs = make(net.Buffers, 0, opts.Size)
bufferedRows = 0
byteStart = byteEnd
}
}

@@ -153,15 +194,17 @@ func Scan(ctx context.Context, r io.Reader, out chan<- Batch, opts Options) error {

// Finished reading input, make sure last batch goes out.
if len(bufs) > 0 {
byteEnd := counter.Total - reader.Buffered()
select {
case out <- Batch{
Data: bufs,
Location: NewLocation(rowsRead, bufferedRows, opts.Skip),
}:
case out <- NewBatch(
bufs,
NewLocation(rowsRead, bufferedRows, opts.Skip, byteStart, byteEnd-byteStart),
):
case <-ctx.Done():
return ctx.Err()
}
}
log.Print("total rows ", rowsRead)

return nil
}
@@ -257,3 +300,15 @@ func (c *csvRowState) NeedsMore() bool {
// c.inQuote is also true.
return c.inQuote
}

// CountReader is a wrapper that counts how many bytes have been read from the given reader
type CountReader struct {
Reader io.Reader
Total int
}

func (c *CountReader) Read(b []byte) (int, error) {
n, err := c.Reader.Read(b)
c.Total += n
return n, err
}
File renamed without changes.
45 changes: 44 additions & 1 deletion internal/batch/scan_test.go → pkg/batch/scan_test.go
@@ -6,11 +6,14 @@ import (
"errors"
"fmt"
"io"
"net"
"reflect"
"strings"
"testing"

"github.com/timescale/timescaledb-parallel-copy/internal/batch"
"github.com/stretchr/testify/require"
"github.com/timescale/timescaledb-parallel-copy/pkg/batch"
"golang.org/x/exp/rand"
)

func TestScan(t *testing.T) {
@@ -428,3 +431,43 @@ func BenchmarkScan(b *testing.B) {
}
}
}

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ,")

func RandString(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}

func TestRewind(t *testing.T) {
randomData := RandString(5000)
data := net.Buffers(bytes.Split([]byte(randomData), []byte(",")))

batch := batch.NewBatch(data, batch.NewLocation(0, 0, 0, 0, 0))

var err error
// reads all the data
buf := bytes.Buffer{}
_, err = buf.ReadFrom(&batch.Data)
require.NoError(t, err)
require.Equal(t, strings.Replace(randomData, ",", "", -1), buf.String())
require.Empty(t, batch.Data)

// Reading again returns nothing
buf = bytes.Buffer{}
_, err = buf.ReadFrom(&batch.Data)
require.NoError(t, err)
require.Empty(t, buf.String())
require.Empty(t, batch.Data)

// Reading again after rewind, returns all data
batch.Rewind()
buf = bytes.Buffer{}
_, err = buf.ReadFrom(&batch.Data)
require.NoError(t, err)
require.Equal(t, strings.Replace(randomData, ",", "", -1), buf.String())

}
62 changes: 62 additions & 0 deletions pkg/csvcopy/batch_error.go
@@ -0,0 +1,62 @@
package csvcopy

import (
"fmt"
"io"
"os"
"path/filepath"

"github.com/timescale/timescaledb-parallel-copy/pkg/batch"
)

// BatchHandlerSaveToFile saves failed batches to the given directory, using the batch start row as the file name.
func BatchHandlerSaveToFile(dir string, next BatchErrorHandler) BatchErrorHandler {
return BatchErrorHandler(func(batch batch.Batch, reason error) error {
err := os.MkdirAll(dir, os.ModePerm)
if err != nil {
return fmt.Errorf("failed to ensure directory exists: %w", err)
}

fileName := fmt.Sprintf("%d.csv", batch.Location.StartRow)
path := filepath.Join(dir, fileName)

dst, err := os.Create(path)
if err != nil {
return fmt.Errorf("failed to create file to store batch error, %w", err)
}
defer dst.Close()

batch.Rewind()
_, err = io.Copy(dst, &batch.Data)
if err != nil {
return fmt.Errorf("failed to write file to store batch error, %w", err)
}

if next != nil {
return next(batch, reason)
}
return nil
})
}

// BatchHandlerLog prints a log line that reports the error in the given batch
func BatchHandlerLog(log Logger, next BatchErrorHandler) BatchErrorHandler {
return BatchErrorHandler(func(batch batch.Batch, reason error) error {
log.Infof("Batch %d, starting at byte %d with len %d, has error: %s", batch.Location.StartRow, batch.Location.ByteOffset, batch.Location.ByteLen, reason.Error())

if next != nil {
return next(batch, reason)
}
return nil
})
}

// BatchHandlerNoop ignores the error so the copy continues
func BatchHandlerNoop() BatchErrorHandler {
return BatchErrorHandler(func(_ batch.Batch, _ error) error { return nil })
}

// BatchHandlerError propagates the error, failing the copy
func BatchHandlerError() BatchErrorHandler {
return BatchErrorHandler(func(_ batch.Batch, err error) error { return err })
}
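
The handlers above all share the same decorator shape, wrapping an optional next handler, so callers can slot in their own. A sketch of a hypothetical collector that records failed batch locations before delegating (BatchHandlerCollect is illustrative, not part of this package):

// BatchHandlerCollect appends each failed batch's Location to failed and
// then delegates to next, if any. Hypothetical example for illustration.
func BatchHandlerCollect(failed *[]batch.Location, next BatchErrorHandler) BatchErrorHandler {
	return BatchErrorHandler(func(b batch.Batch, reason error) error {
		*failed = append(*failed, b.Location)
		if next != nil {
			return next(b, reason)
		}
		return nil
	})
}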