Skip to content

Commit

Permalink
Add more debug logging for CloudFetch (#227)
Browse files Browse the repository at this point in the history
Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko authored Jun 4, 2024
1 parent 869c98e commit 6ec1955
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
5 changes: 5 additions & 0 deletions internal/rows/arrowbased/arrowRows.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,12 @@ func NewArrowRowScanner(resultSetMetadata *cli_service.TGetResultSetMetadataResp
var bl BatchLoader
var err2 dbsqlerr.DBError
if len(rowSet.ResultLinks) > 0 {
logger.Debug().Msgf("Initialize CloudFetch loader, row set start offset: %d, file list:", rowSet.StartRowOffset)
for _, resultLink := range rowSet.ResultLinks {
logger.Debug().Msgf("- start row offset: %d, row count: %d", resultLink.StartRowOffset, resultLink.RowCount)
}
bl, err2 = NewCloudBatchLoader(context.Background(), rowSet.ResultLinks, rowSet.StartRowOffset, cfg)
logger.Debug().Msgf("Created CloudFetch concurrent loader, rows range [%d..%d]", bl.Start(), bl.End())
} else {
bl, err2 = NewLocalBatchLoader(context.Background(), rowSet.ArrowBatches, rowSet.StartRowOffset, schemaBytes, cfg)
}
Expand Down
13 changes: 13 additions & 0 deletions internal/rows/arrowbased/batchloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/databricks/databricks-sql-go/internal/cli_service"
dbsqlerrint "github.com/databricks/databricks-sql-go/internal/errors"
"github.com/databricks/databricks-sql-go/internal/fetcher"
"github.com/databricks/databricks-sql-go/logger"
)

type BatchIterator interface {
Expand Down Expand Up @@ -126,15 +127,22 @@ var _ BatchLoader = (*batchLoader[*localBatch])(nil)

func (cbl *batchLoader[T]) GetBatchFor(rowNumber int64) (SparkArrowBatch, dbsqlerr.DBError) {

logger.Debug().Msgf("batchLoader.GetBatchFor(%d)", rowNumber)

for i := range cbl.arrowBatches {
logger.Debug().Msgf(" trying batch for range [%d..%d]", cbl.arrowBatches[i].Start(), cbl.arrowBatches[i].End())
if cbl.arrowBatches[i].Contains(rowNumber) {
logger.Debug().Msgf(" found batch containing the requested row %d", rowNumber)
return cbl.arrowBatches[i], nil
}
}

logger.Debug().Msgf(" batch not found, trying to download more")

batchChan, _, err := cbl.fetcher.Start()
var emptyBatch SparkArrowBatch
if err != nil {
logger.Debug().Msgf(" no batch found for row %d", rowNumber)
return emptyBatch, dbsqlerrint.NewDriverError(cbl.ctx, errArrowRowsInvalidRowNumber(rowNumber), err)
}

Expand All @@ -143,17 +151,22 @@ func (cbl *batchLoader[T]) GetBatchFor(rowNumber int64) (SparkArrowBatch, dbsqle
if !ok {
err := cbl.fetcher.Err()
if err != nil {
logger.Debug().Msgf(" no batch found for row %d", rowNumber)
return emptyBatch, dbsqlerrint.NewDriverError(cbl.ctx, errArrowRowsInvalidRowNumber(rowNumber), err)
}
break
}

cbl.arrowBatches = append(cbl.arrowBatches, batch)
logger.Debug().Msgf(" trying newly downloaded batch for range [%d..%d]", batch.Start(), batch.End())
if batch.Contains(rowNumber) {
logger.Debug().Msgf(" found batch containing the requested row %d", rowNumber)
return batch, nil
}
}

logger.Debug().Msgf(" no batch found for row %d", rowNumber)

return emptyBatch, dbsqlerrint.NewDriverError(cbl.ctx, errArrowRowsInvalidRowNumber(rowNumber), err)
}

Expand Down

0 comments on commit 6ec1955

Please sign in to comment.