Skip to content

Commit

Permalink
Refactoring: Hide Arrow and CloudFetch batch loaders behind BatchIterator interface (simplifies usage and encapsulates implementation details)
Browse files Browse the repository at this point in the history

Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko committed Jul 4, 2024
1 parent bd10b62 commit 92d1ef4
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 35 deletions.
23 changes: 2 additions & 21 deletions internal/rows/arrowbased/arrowRecordIterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,29 +163,10 @@ func (ri *arrowRecordIterator) getBatchIterator() error {

// newBatchIterator creates a batch iterator from a page of the result set.
//
// It first builds a batch loader for the page via ri.newBatchLoader and then
// wraps that loader with NewBatchIterator. An error from either step is
// returned to the caller; when the loader cannot be built the returned
// iterator is nil.
func (ri *arrowRecordIterator) newBatchIterator(fr *cli_service.TFetchResultsResp) (BatchIterator, error) {
	loader, err := ri.newBatchLoader(fr)
	if err != nil {
		return nil, err
	}

	// Hand both results of NewBatchIterator straight back to the caller.
	return NewBatchIterator(loader)
}

// newBatchLoader creates a batch loader from a page of the result set.
//
// The page's row set (fr.Results) is inspected: a non-empty ResultLinks list
// selects the cloud path, otherwise the ArrowBatches carried in the row set
// are used together with ri.arrowSchemaBytes.
//
// NOTE(review): as written, each branch assigns bl/err from a New*BatchLoader
// call and then immediately returns the result of a New*BatchIterator call.
// Those returns hand back a BatchIterator from a function declared to return
// a BatchLoader, and they leave the error check and `return bl, nil` at the
// bottom unreachable. This looks like pre- and post-refactoring lines
// interleaved — confirm which version is intended before relying on it.
func (ri *arrowRecordIterator) newBatchLoader(fr *cli_service.TFetchResultsResp) (BatchLoader, error) {
rowSet := fr.Results
var bl BatchLoader
var err error
// Result links present: take the cloud path.
if len(rowSet.ResultLinks) > 0 {
bl, err = NewCloudBatchLoader(ri.ctx, rowSet.ResultLinks, rowSet.StartRowOffset, &ri.cfg)
return NewCloudBatchIterator(ri.ctx, rowSet.ResultLinks, rowSet.StartRowOffset, &ri.cfg)
} else {
// No result links: use the Arrow batches carried in the row set.
bl, err = NewLocalBatchLoader(ri.ctx, rowSet.ArrowBatches, rowSet.StartRowOffset, ri.arrowSchemaBytes, &ri.cfg)
return NewLocalBatchIterator(ri.ctx, rowSet.ArrowBatches, rowSet.StartRowOffset, ri.arrowSchemaBytes, &ri.cfg)
}
// Not reachable while both branches above return (see NOTE(review)).
if err != nil {
return nil, err
}

return bl, nil
}
12 changes: 3 additions & 9 deletions internal/rows/arrowbased/arrowRows.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,27 +112,21 @@ func NewArrowRowScanner(resultSetMetadata *cli_service.TGetResultSetMetadataResp
return nil, dbsqlerrint.NewDriverError(ctx, errArrowRowsToTimestampFn, err)
}

var bl BatchLoader
var bi BatchIterator
var err2 dbsqlerr.DBError
if len(rowSet.ResultLinks) > 0 {
logger.Debug().Msgf("Initialize CloudFetch loader, row set start offset: %d, file list:", rowSet.StartRowOffset)
for _, resultLink := range rowSet.ResultLinks {
logger.Debug().Msgf("- start row offset: %d, row count: %d", resultLink.StartRowOffset, resultLink.RowCount)
}
bl, err2 = NewCloudBatchLoader(context.Background(), rowSet.ResultLinks, rowSet.StartRowOffset, cfg)
logger.Debug().Msgf("Created CloudFetch concurrent loader, rows range [%d..%d]", bl.Start(), bl.End())
bi, err2 = NewCloudBatchIterator(context.Background(), rowSet.ResultLinks, rowSet.StartRowOffset, cfg)
} else {
bl, err2 = NewLocalBatchLoader(context.Background(), rowSet.ArrowBatches, rowSet.StartRowOffset, schemaBytes, cfg)
bi, err2 = NewLocalBatchIterator(context.Background(), rowSet.ArrowBatches, rowSet.StartRowOffset, schemaBytes, cfg)
}
if err2 != nil {
return nil, err2
}

bi, err := NewBatchIterator(bl)
if err != nil {
return nil, err2
}

var location *time.Location = time.UTC
if cfg != nil {
if cfg.Location != nil {
Expand Down
29 changes: 24 additions & 5 deletions internal/rows/arrowbased/batchloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,35 @@ type BatchLoader interface {
Close()
}

func NewBatchIterator(batchLoader BatchLoader) (BatchIterator, dbsqlerr.DBError) {
// NewCloudBatchIterator returns a BatchIterator over the given result links.
//
// ctx, files, startRowOffset and cfg are passed unchanged to
// newCloudBatchLoader, which builds the underlying loader. The iterator is
// positioned at the loader's Start() offset. If the loader cannot be created,
// the error is returned and the iterator is nil.
func NewCloudBatchIterator(ctx context.Context, files []*cli_service.TSparkArrowResultLink, startRowOffset int64, cfg *config.Config) (BatchIterator, dbsqlerr.DBError) {
	loader, err := newCloudBatchLoader(ctx, files, startRowOffset, cfg)
	if err != nil {
		return nil, err
	}

	return &batchIterator{
		nextBatchStart: loader.Start(),
		batchLoader:    loader,
	}, nil
}

// NewLocalBatchIterator returns a BatchIterator over Arrow batches that are
// already present in the result page.
//
// ctx, batches, startRowOffset, arrowSchemaBytes and cfg are passed unchanged
// to newLocalBatchLoader, which builds the underlying loader. The iterator is
// positioned at the loader's Start() offset. If the loader cannot be created,
// the error is returned and the iterator is nil.
func NewLocalBatchIterator(ctx context.Context, batches []*cli_service.TSparkArrowBatch, startRowOffset int64, arrowSchemaBytes []byte, cfg *config.Config) (BatchIterator, dbsqlerr.DBError) {
	bl, err := newLocalBatchLoader(ctx, batches, startRowOffset, arrowSchemaBytes, cfg)
	if err != nil {
		return nil, err
	}

	// Each field is set exactly once, from the loader created above; a second
	// nextBatchStart/batchLoader pair referring to an identifier that is not
	// declared in this function has been dropped.
	bi := &batchIterator{
		nextBatchStart: bl.Start(),
		batchLoader:    bl,
	}

	return bi, nil
}

func NewCloudBatchLoader(ctx context.Context, files []*cli_service.TSparkArrowResultLink, startRowOffset int64, cfg *config.Config) (*batchLoader[*cloudURL], dbsqlerr.DBError) {
func newCloudBatchLoader(ctx context.Context, files []*cli_service.TSparkArrowResultLink, startRowOffset int64, cfg *config.Config) (*batchLoader[*cloudURL], dbsqlerr.DBError) {

if cfg == nil {
cfg = config.WithDefaults()
Expand Down Expand Up @@ -79,7 +98,7 @@ func NewCloudBatchLoader(ctx context.Context, files []*cli_service.TSparkArrowRe
return cbl, nil
}

func NewLocalBatchLoader(ctx context.Context, batches []*cli_service.TSparkArrowBatch, startRowOffset int64, arrowSchemaBytes []byte, cfg *config.Config) (*batchLoader[*localBatch], dbsqlerr.DBError) {
func newLocalBatchLoader(ctx context.Context, batches []*cli_service.TSparkArrowBatch, startRowOffset int64, arrowSchemaBytes []byte, cfg *config.Config) (*batchLoader[*localBatch], dbsqlerr.DBError) {

if cfg == nil {
cfg = config.WithDefaults()
Expand Down

0 comments on commit 92d1ef4

Please sign in to comment.