From 6ec1955fcecd71e7e19e2b17ac3a394181965af0 Mon Sep 17 00:00:00 2001
From: Levko Kravets
Date: Wed, 5 Jun 2024 01:46:12 +0300
Subject: [PATCH] Add more debug logging for CloudFetch (#227)

Signed-off-by: Levko Kravets
---
 internal/rows/arrowbased/arrowRows.go   |  5 +++++
 internal/rows/arrowbased/batchloader.go | 13 +++++++++++++
 2 files changed, 18 insertions(+)

diff --git a/internal/rows/arrowbased/arrowRows.go b/internal/rows/arrowbased/arrowRows.go
index 164d9ff..89fe9b9 100644
--- a/internal/rows/arrowbased/arrowRows.go
+++ b/internal/rows/arrowbased/arrowRows.go
@@ -115,7 +115,12 @@ func NewArrowRowScanner(resultSetMetadata *cli_service.TGetResultSetMetadataResp
 	var bl BatchLoader
 	var err2 dbsqlerr.DBError
 	if len(rowSet.ResultLinks) > 0 {
+		logger.Debug().Msgf("Initialize CloudFetch loader, row set start offset: %d, file list:", rowSet.StartRowOffset)
+		for _, resultLink := range rowSet.ResultLinks {
+			logger.Debug().Msgf("- start row offset: %d, row count: %d", resultLink.StartRowOffset, resultLink.RowCount)
+		}
 		bl, err2 = NewCloudBatchLoader(context.Background(), rowSet.ResultLinks, rowSet.StartRowOffset, cfg)
+		logger.Debug().Msgf("Created CloudFetch concurrent loader, rows range [%d..%d]", bl.Start(), bl.End())
 	} else {
 		bl, err2 = NewLocalBatchLoader(context.Background(), rowSet.ArrowBatches, rowSet.StartRowOffset, schemaBytes, cfg)
 	}
diff --git a/internal/rows/arrowbased/batchloader.go b/internal/rows/arrowbased/batchloader.go
index 0c6d3be..e36153a 100644
--- a/internal/rows/arrowbased/batchloader.go
+++ b/internal/rows/arrowbased/batchloader.go
@@ -18,6 +18,7 @@ import (
 	"github.com/databricks/databricks-sql-go/internal/cli_service"
 	dbsqlerrint "github.com/databricks/databricks-sql-go/internal/errors"
 	"github.com/databricks/databricks-sql-go/internal/fetcher"
+	"github.com/databricks/databricks-sql-go/logger"
 )
 
 type BatchIterator interface {
@@ -126,15 +127,22 @@ var _ BatchLoader = (*batchLoader[*localBatch])(nil)
 
 func (cbl *batchLoader[T]) GetBatchFor(rowNumber int64) (SparkArrowBatch, dbsqlerr.DBError) {
 
+	logger.Debug().Msgf("batchLoader.GetBatchFor(%d)", rowNumber)
+
 	for i := range cbl.arrowBatches {
+		logger.Debug().Msgf(" trying batch for range [%d..%d]", cbl.arrowBatches[i].Start(), cbl.arrowBatches[i].End())
 		if cbl.arrowBatches[i].Contains(rowNumber) {
+			logger.Debug().Msgf(" found batch containing the requested row %d", rowNumber)
 			return cbl.arrowBatches[i], nil
 		}
 	}
 
+	logger.Debug().Msgf(" batch not found, trying to download more")
+
 	batchChan, _, err := cbl.fetcher.Start()
 	var emptyBatch SparkArrowBatch
 	if err != nil {
+		logger.Debug().Msgf(" no batch found for row %d", rowNumber)
 		return emptyBatch, dbsqlerrint.NewDriverError(cbl.ctx, errArrowRowsInvalidRowNumber(rowNumber), err)
 	}
 
@@ -143,17 +151,22 @@ func (cbl *batchLoader[T]) GetBatchFor(rowNumber int64) (SparkArrowBatch, dbsqle
 		if !ok {
 			err := cbl.fetcher.Err()
 			if err != nil {
+				logger.Debug().Msgf(" no batch found for row %d", rowNumber)
 				return emptyBatch, dbsqlerrint.NewDriverError(cbl.ctx, errArrowRowsInvalidRowNumber(rowNumber), err)
 			}
 			break
 		}
 
 		cbl.arrowBatches = append(cbl.arrowBatches, batch)
+		logger.Debug().Msgf(" trying newly downloaded batch for range [%d..%d]", batch.Start(), batch.End())
 		if batch.Contains(rowNumber) {
+			logger.Debug().Msgf(" found batch containing the requested row %d", rowNumber)
 			return batch, nil
 		}
 	}
 
+	logger.Debug().Msgf(" no batch found for row %d", rowNumber)
+
 	return emptyBatch, dbsqlerrint.NewDriverError(cbl.ctx, errArrowRowsInvalidRowNumber(rowNumber), err)
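
Note: the logger.Debug() calls added by this patch only produce output when the driver's log level is lowered to debug. The snippet below is a minimal sketch of how a caller might surface these CloudFetch messages, assuming the public logger package exposes SetLogLevel as described in the driver README; the DSN and table name are placeholders, not part of this patch.

    package main

    import (
    	"database/sql"
    	"log"

    	_ "github.com/databricks/databricks-sql-go"
    	dbsqllog "github.com/databricks/databricks-sql-go/logger"
    )

    func main() {
    	// Lower the driver log level so the Debug() messages added in this patch
    	// (CloudFetch loader init, per-link offsets, batch lookups) are emitted.
    	if err := dbsqllog.SetLogLevel("debug"); err != nil {
    		log.Fatal(err)
    	}

    	// Placeholder DSN; replace with a real token, hostname and HTTP path.
    	db, err := sql.Open("databricks", "token:<token>@<hostname>:443/<http-path>")
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer db.Close()

    	// A result set large enough to be served via CloudFetch result links
    	// goes through NewCloudBatchLoader; the table name is only an example.
    	rows, err := db.Query("SELECT * FROM samples.tpch.lineitem")
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer rows.Close()

    	for rows.Next() {
    		// Iterating rows drives batchLoader.GetBatchFor, which logs the
    		// batch ranges it tries and any additional downloads it triggers.
    	}
    }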