Skip to content

Commit

Permalink
Refine code
Browse files Browse the repository at this point in the history
Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko committed Jul 10, 2024
1 parent aeb03a5 commit b65c005
Showing 1 changed file with 38 additions and 31 deletions.
69 changes: 38 additions & 31 deletions internal/rows/arrowbased/batchloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ func (bi *cloudBatchIterator) Next() (SparkArrowBatch, error) {
ctx: bi.ctx,
useLz4Compression: bi.cfg.UseLz4Compression,
link: link,
resultChan: make(chan SparkArrowBatch),
errorChan: make(chan error),
resultChan: make(chan cloudFetchDownloadTaskResult),
minTimeToExpiry: bi.cfg.MinTimeToExpiry,
}
task.Run()
Expand All @@ -161,88 +160,96 @@ func (bi *cloudBatchIterator) Close() {
bi.downloadTasks.Clear() // Clear the list
}

type cloudFetchDownloadTaskResult struct {
batch SparkArrowBatch
err error
}

type cloudFetchDownloadTask struct {
ctx context.Context
useLz4Compression bool
minTimeToExpiry time.Duration
link *cli_service.TSparkArrowResultLink
resultChan chan SparkArrowBatch
errorChan chan error
resultChan chan cloudFetchDownloadTaskResult
}

func (cft *cloudFetchDownloadTask) GetResult() (SparkArrowBatch, error) {
link := cft.link

select {
case batch, ok := <-cft.resultChan:
if ok {
logger.Debug().Msgf(
"CloudFetch: received data for link at offset %d row count %d",
link.StartRowOffset,
link.RowCount,
)
return batch, nil
}
case err, ok := <-cft.errorChan:
if ok {
result, ok := <-cft.resultChan
if ok {
if result.err != nil {
logger.Debug().Msgf(
"CloudFetch: failed to download link at offset %d row count %d",
link.StartRowOffset,
link.RowCount,
)
return nil, err
return nil, result.err
}
logger.Debug().Msgf(
"CloudFetch: received data for link at offset %d row count %d",
link.StartRowOffset,
link.RowCount,
)
return result.batch, nil
}

logger.Debug().Msgf(
"CloudFetch: this should never happen; link at offset %d row count %d",
"CloudFetch: channel was closed before result was received; link at offset %d row count %d",
link.StartRowOffset,
link.RowCount,
)
return nil, nil // TODO: ???
return nil, nil // TODO: return error?
}

func (cft *cloudFetchDownloadTask) Run() {
go func() {
link := cft.link

logger.Debug().Msgf(
"CloudFetch: start downloading link at offset %d row count %d",
link.StartRowOffset,
link.RowCount,
cft.link.StartRowOffset,
cft.link.RowCount,
)
data, err := cft.fetchBatchBytes()
data, err := fetchBatchBytes(cft.ctx, cft.link, cft.minTimeToExpiry)
if err != nil {
cft.errorChan <- err
cft.resultChan <- cloudFetchDownloadTaskResult{batch: nil, err: err}
return
}

// TODO: error handling?
defer data.Close()

logger.Debug().Msgf(
"CloudFetch: reading records for link at offset %d row count %d",
cft.link.StartRowOffset,
cft.link.RowCount,
)
reader := getReader(data, cft.useLz4Compression)

records, err := getArrowRecords(reader, cft.link.StartRowOffset)
if err != nil {
cft.errorChan <- err
cft.resultChan <- cloudFetchDownloadTaskResult{batch: nil, err: err}
return
}

batch := sparkArrowBatch{
Delimiter: rowscanner.NewDelimiter(cft.link.StartRowOffset, cft.link.RowCount),
arrowRecords: records,
}
cft.resultChan <- &batch
cft.resultChan <- cloudFetchDownloadTaskResult{batch: &batch, err: nil}
}()
}

func (cft *cloudFetchDownloadTask) fetchBatchBytes() (io.ReadCloser, error) {
if isLinkExpired(cft.link.ExpiryTime, cft.minTimeToExpiry) {
func fetchBatchBytes(
ctx context.Context,
link *cli_service.TSparkArrowResultLink,
minTimeToExpiry time.Duration,
) (io.ReadCloser, error) {
if isLinkExpired(link.ExpiryTime, minTimeToExpiry) {
return nil, errors.New(dbsqlerr.ErrLinkExpired)
}

// TODO: Retry on HTTP errors
req, err := http.NewRequestWithContext(cft.ctx, "GET", cft.link.FileLink, nil)
req, err := http.NewRequestWithContext(ctx, "GET", link.FileLink, nil)
if err != nil {
return nil, err
}
Expand All @@ -253,7 +260,7 @@ func (cft *cloudFetchDownloadTask) fetchBatchBytes() (io.ReadCloser, error) {
return nil, err
}
if res.StatusCode != http.StatusOK {
return nil, dbsqlerrint.NewDriverError(cft.ctx, errArrowRowsCloudFetchDownloadFailure, err)
return nil, dbsqlerrint.NewDriverError(ctx, errArrowRowsCloudFetchDownloadFailure, err)
}

return res.Body, nil
Expand Down

0 comments on commit b65c005

Please sign in to comment.