
Merge pull request #105 from timescale/vperez/skip-invalid-rows
Implement error handler
MetalBlueberry authored Dec 18, 2024
2 parents 82d0731 + bb17609 commit 7f3c49d
Showing 8 changed files with 522 additions and 48 deletions.
30 changes: 25 additions & 5 deletions cmd/timescaledb-parallel-copy/main.go
@@ -33,10 +33,12 @@ var (
quoteCharacter string
escapeCharacter string

fromFile string
columns string
skipHeader bool
headerLinesCnt int
fromFile string
columns string
skipHeader bool
headerLinesCnt int
batchErrorOutputDir string
skipBatchErrors bool

workers int
limit int64
@@ -67,6 +69,9 @@ func init() {
flag.BoolVar(&skipHeader, "skip-header", false, "Skip the first line of the input")
flag.IntVar(&headerLinesCnt, "header-line-count", 1, "Number of header lines")

flag.StringVar(&batchErrorOutputDir, "batch-error-output-dir", "", "Directory to store batch errors. Setting this saves a .csv file with the contents of each failed batch and continues with the rest of the data.")
flag.BoolVar(&skipBatchErrors, "skip-batch-errors", false, "If true, the copy continues even if a batch fails")

flag.IntVar(&batchSize, "batch-size", 5000, "Number of rows per insert")
flag.Int64Var(&limit, "limit", 0, "Number of rows to insert overall; 0 means to insert all")
flag.IntVar(&workers, "workers", 1, "Number of parallel requests to make")
@@ -94,8 +99,10 @@ func main() {
if dbName != "" {
log.Fatalf("Error: Deprecated flag -db-name is being used. Update -connection to connect to the given database")
}
logger := &csvCopierLogger{}

opts := []csvcopy.Option{
csvcopy.WithLogger(&csvCopierLogger{}),
csvcopy.WithLogger(logger),
csvcopy.WithSchemaName(schemaName),
csvcopy.WithCopyOptions(copyOptions),
csvcopy.WithSplitCharacter(splitCharacter),
@@ -110,6 +117,19 @@ func main() {
csvcopy.WithVerbose(verbose),
}

batchErrorHandler := csvcopy.BatchHandlerError()
if skipBatchErrors {
batchErrorHandler = csvcopy.BatchHandlerNoop()
}
if batchErrorOutputDir != "" {
log.Printf("batch errors will be stored at %s", batchErrorOutputDir)
batchErrorHandler = csvcopy.BatchHandlerSaveToFile(batchErrorOutputDir, batchErrorHandler)
}
if verbose || skipBatchErrors {
batchErrorHandler = csvcopy.BatchHandlerLog(logger, batchErrorHandler)
}
opts = append(opts, csvcopy.WithBatchErrorHandler(batchErrorHandler))

if skipHeader {
opts = append(opts,
csvcopy.WithSkipHeaderCount(headerLinesCnt),
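The handler chain built in main() above follows a simple decorator pattern: the innermost handler decides whether a failed batch aborts the copy (BatchHandlerError) or is skipped (BatchHandlerNoop), and the optional wrappers add side effects on the way in. Below is a minimal sketch of the same composition for library users, using only names visible in this diff; the stdLogger type is an assumption standing in for any csvcopy Logger implementation (the interface is inferred from the Infof call in batch_error.go further down), and the trivial main is only there to make the sketch compile.

package main

import (
	"log"

	"github.com/timescale/timescaledb-parallel-copy/pkg/csvcopy"
)

// stdLogger is a hypothetical Logger implementation for this sketch; the
// csvcopy.Logger interface is assumed to require only Infof here.
type stdLogger struct{}

func (stdLogger) Infof(format string, args ...interface{}) { log.Printf(format, args...) }

// buildHandler mirrors the wiring in main(): pick the terminal behaviour,
// then wrap it with the optional file-dump and logging decorators.
func buildHandler(outputDir string, skipErrors, verbose bool) csvcopy.BatchErrorHandler {
	handler := csvcopy.BatchHandlerError() // default: a failed batch aborts the copy
	if skipErrors {
		handler = csvcopy.BatchHandlerNoop() // skip the failed batch and keep going
	}
	if outputDir != "" {
		handler = csvcopy.BatchHandlerSaveToFile(outputDir, handler) // dump <start-row>.csv first
	}
	if verbose || skipErrors {
		handler = csvcopy.BatchHandlerLog(stdLogger{}, handler) // log the failure first
	}
	return handler
}

func main() {
	_ = buildHandler("./failed-batches", true, false)
}

The resulting handler is passed to the copier through csvcopy.WithBatchErrorHandler, exactly as in the hunk above.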
83 changes: 69 additions & 14 deletions internal/batch/scan.go → pkg/batch/scan.go
@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"log"
"net"
)

@@ -22,18 +23,55 @@ type Options struct {
type Batch struct {
Data net.Buffers
Location Location

// backup holds the same data as Data. It is used to rewind if something goes wrong
// Because it copies only the slice of buffers, not the bytes they point to, the memory is not duplicated
// Because we only read the data, the underlying memory is not modified either
backup net.Buffers
}

func NewBatch(data net.Buffers, location Location) Batch {
b := Batch{
Data: data,
Location: location,
}
b.snapshot()
return b
}

func (b *Batch) snapshot() {
b.backup = net.Buffers{}
b.backup = append(b.backup, b.Data...)
}

// Rewind makes the data available to read again
func (b *Batch) Rewind() {
b.Data = net.Buffers{}
b.Data = append(b.Data, b.backup...)
}
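The snapshot/Rewind pair relies on how net.Buffers behaves as a reader: reading drains the outer slice entry by entry but never touches the underlying byte slices, so restoring a shallow copy of the outer slice makes the whole batch readable again without duplicating memory. A standalone sketch of that behaviour (it assumes the io and net imports already present in this file):

	// Reading from a net.Buffers empties its own slice...
	data := net.Buffers{[]byte("1,foo\n"), []byte("2,bar\n")}
	backup := append(net.Buffers{}, data...) // shallow copy: same backing arrays

	_, _ = io.Copy(io.Discard, &data)       // data is now empty
	data = append(net.Buffers{}, backup...) // ...rewinding restores it without copying bytes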

// Location positions a batch within the original data
type Location struct {
// StartRow represents the index of the row where the batch starts.
// First row of the file is row 0
// The header counts as a line
StartRow int64
Length int
// RowCount is the number of rows in the batch
RowCount int
// ByteOffset is the byte position in the original file.
// It can be used with ReadAt to process the same batch again.
ByteOffset int
// ByteLen represents the number of bytes for the batch.
// It can be used to know how big the batch is and read it accordingly
ByteLen int
}

func NewLocation(rowsRead int64, bufferedRows int, skip int) Location {
func NewLocation(rowsRead int64, bufferedRows int, skip int, byteOffset int, byteLen int) Location {
return Location{
StartRow: rowsRead - int64(bufferedRows) + int64(skip),
Length: bufferedRows,
StartRow: rowsRead - int64(bufferedRows) + int64(skip) - 1, // Index rows starting at 0
RowCount: bufferedRows,
ByteOffset: byteOffset,
ByteLen: byteLen,
}
}
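Because ByteOffset and ByteLen locate the batch inside the original input, a failed batch can be read back later without rescanning the whole file. A minimal sketch, assuming the input is an *os.File named f and loc is the batch.Location reported to an error handler (both are illustrative, not part of this diff):

	// Re-read the exact bytes of a failed batch straight from the source file.
	buf := make([]byte, loc.ByteLen)
	if _, err := f.ReadAt(buf, int64(loc.ByteOffset)); err != nil {
		return fmt.Errorf("re-reading batch starting at row %d: %w", loc.StartRow, err)
	}
	// buf now holds the raw CSV rows of the batch, ready for inspection or retry.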

@@ -49,7 +87,8 @@ func NewLocation(rowsRead int64, bufferedRows int, skip int) Location {
// opts.Escape as the QUOTE and ESCAPE characters used for the CSV input.
func Scan(ctx context.Context, r io.Reader, out chan<- Batch, opts Options) error {
var rowsRead int64
reader := bufio.NewReader(r)
counter := &CountReader{Reader: r}
reader := bufio.NewReader(counter)

for skip := opts.Skip; skip > 0; {
// The use of ReadLine() here avoids copying or buffering data that
@@ -62,7 +101,6 @@ func Scan(ctx context.Context, r io.Reader, out chan<- Batch, opts Options) erro
} else if err != nil {
return fmt.Errorf("skipping header: %w", err)
}

if !isPrefix {
// We pulled a full row from the buffer.
skip--
@@ -91,6 +129,7 @@ func Scan(ctx context.Context, r io.Reader, out chan<- Batch, opts Options) erro
bufs := make(net.Buffers, 0, opts.Size)
var bufferedRows int

byteStart := counter.Total - reader.Buffered()
for {
eol := false

@@ -130,16 +169,18 @@ func Scan(ctx context.Context, r io.Reader, out chan<- Batch, opts Options) erro
}

if bufferedRows >= opts.Size { // dispatch to COPY worker & reset
byteEnd := counter.Total - reader.Buffered()
select {
case out <- Batch{
Data: bufs,
Location: NewLocation(rowsRead, bufferedRows, opts.Skip),
}:
case out <- NewBatch(
bufs,
NewLocation(rowsRead, bufferedRows, opts.Skip, byteStart, byteEnd-byteStart),
):
case <-ctx.Done():
return ctx.Err()
}
bufs = make(net.Buffers, 0, opts.Size)
bufferedRows = 0
byteStart = byteEnd
}
}

@@ -153,15 +194,17 @@ func Scan(ctx context.Context, r io.Reader, out chan<- Batch, opts Options) erro

// Finished reading input, make sure last batch goes out.
if len(bufs) > 0 {
byteEnd := counter.Total - reader.Buffered()
select {
case out <- Batch{
Data: bufs,
Location: NewLocation(rowsRead, bufferedRows, opts.Skip),
}:
case out <- NewBatch(
bufs,
NewLocation(rowsRead, bufferedRows, opts.Skip, byteStart, byteEnd-byteStart),
):
case <-ctx.Done():
return ctx.Err()
}
}
log.Print("total rows ", rowsRead)

return nil
}
@@ -257,3 +300,15 @@ func (c *csvRowState) NeedsMore() bool {
// c.inQuote is also true.
return c.inQuote
}

// CountReader is a wrapper that counts how many bytes have been read from the given reader
type CountReader struct {
Reader io.Reader
Total int
}

func (c *CountReader) Read(b []byte) (int, error) {
n, err := c.Reader.Read(b)
c.Total += n
return n, err
}
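CountReader is what lets Scan compute absolute byte positions: CountReader.Total counts every byte pulled from the source, while reader.Buffered() is the portion read ahead but not yet consumed, so their difference is the offset of the next unconsumed byte. A small sketch of the same arithmetic in isolation (hypothetical data, written as if in this package; it assumes the strings package in addition to the bufio and fmt imports):

	counter := &CountReader{Reader: strings.NewReader("row1\nrow2\nrow3\n")}
	reader := bufio.NewReader(counter)

	line, _ := reader.ReadString('\n')          // consume "row1\n"
	offset := counter.Total - reader.Buffered() // bytes handed out minus bytes still buffered
	fmt.Printf("read %q, next row starts at byte %d\n", line, offset) // next row starts at byte 5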
File renamed without changes.
45 changes: 44 additions & 1 deletion internal/batch/scan_test.go → pkg/batch/scan_test.go
@@ -6,11 +6,14 @@ import (
"errors"
"fmt"
"io"
"net"
"reflect"
"strings"
"testing"

"github.com/timescale/timescaledb-parallel-copy/internal/batch"
"github.com/stretchr/testify/require"
"github.com/timescale/timescaledb-parallel-copy/pkg/batch"
"golang.org/x/exp/rand"
)

func TestScan(t *testing.T) {
@@ -428,3 +431,43 @@ func BenchmarkScan(b *testing.B) {
}
}
}

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ,")

func RandString(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}

func TestRewind(t *testing.T) {
randomData := RandString(5000)
data := net.Buffers(bytes.Split([]byte(randomData), []byte(",")))

batch := batch.NewBatch(data, batch.NewLocation(0, 0, 0, 0, 0))

var err error
// reads all the data
buf := bytes.Buffer{}
_, err = buf.ReadFrom(&batch.Data)
require.NoError(t, err)
require.Equal(t, strings.Replace(randomData, ",", "", -1), buf.String())
require.Empty(t, batch.Data)

// Reading again returns nothing
buf = bytes.Buffer{}
_, err = buf.ReadFrom(&batch.Data)
require.NoError(t, err)
require.Empty(t, buf.String())
require.Empty(t, batch.Data)

// Reading again after rewind, returns all data
batch.Rewind()
buf = bytes.Buffer{}
_, err = buf.ReadFrom(&batch.Data)
require.NoError(t, err)
require.Equal(t, strings.Replace(randomData, ",", "", -1), buf.String())

}
62 changes: 62 additions & 0 deletions pkg/csvcopy/batch_error.go
@@ -0,0 +1,62 @@
package csvcopy

import (
"fmt"
"io"
"os"
"path/filepath"

"github.com/timescale/timescaledb-parallel-copy/pkg/batch"
)

// BatchHandlerSaveToFile writes the contents of a failed batch to the given directory, using the batch start row as the file name.
func BatchHandlerSaveToFile(dir string, next BatchErrorHandler) BatchErrorHandler {
return BatchErrorHandler(func(batch batch.Batch, reason error) error {
err := os.MkdirAll(dir, os.ModePerm)
if err != nil {
return fmt.Errorf("failed to ensure directory exists: %w", err)
}

fileName := fmt.Sprintf("%d.csv", batch.Location.StartRow)
path := filepath.Join(dir, fileName)

dst, err := os.Create(path)
if err != nil {
return fmt.Errorf("failed to create file to store batch error, %w", err)
}
defer dst.Close()

batch.Rewind()
_, err = io.Copy(dst, &batch.Data)
if err != nil {
return fmt.Errorf("failed to write file to store batch error, %w", err)
}

if next != nil {
return next(batch, reason)
}
return nil
})
}

// BatchHandlerLog prints a log line that reports the error in the given batch
func BatchHandlerLog(log Logger, next BatchErrorHandler) BatchErrorHandler {
return BatchErrorHandler(func(batch batch.Batch, reason error) error {
log.Infof("Batch %d, starting at byte %d with len %d, has error: %s", batch.Location.StartRow, batch.Location.ByteOffset, batch.Location.ByteLen, reason.Error())

if next != nil {
return next(batch, reason)
}
return nil
})
}

// BatchHandlerNoop ignores the error so the copy continues with the next batch
func BatchHandlerNoop() BatchErrorHandler {
return BatchErrorHandler(func(_ batch.Batch, _ error) error { return nil })
}

// BatchHandlerError returns the error unchanged, aborting the copy
func BatchHandlerError() BatchErrorHandler {
return BatchErrorHandler(func(_ batch.Batch, err error) error { return err })
}
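All of the handlers above share the same decorator shape: do their own work, then delegate to next when it is non-nil. User code can slot a custom step into the chain the same way. A hedged sketch, written as if it lived in the csvcopy package alongside the handlers above; countingHandler is illustrative and not part of the library, and the atomic counter (which needs the sync/atomic import) is only required if the copier invokes handlers from concurrent workers, which this diff does not confirm:

// countingHandler counts failed batches before delegating to the next handler.
func countingHandler(failed *int64, next BatchErrorHandler) BatchErrorHandler {
	return BatchErrorHandler(func(b batch.Batch, reason error) error {
		atomic.AddInt64(failed, 1) // requires "sync/atomic"
		if next != nil {
			return next(b, reason)
		}
		return nil
	})
}

Chained as countingHandler(&failed, BatchHandlerNoop()), it records how many batches were skipped while the copy keeps going.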