Skip to content

Commit

Permalink
Handle errors and cancel the remaining tasks
Browse files Browse the repository at this point in the history
Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko committed Jul 10, 2024
1 parent b65c005 commit c629bae
Showing 1 changed file with 23 additions and 6 deletions.
29 changes: 23 additions & 6 deletions internal/rows/arrowbased/batchloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,11 @@ func (bi *cloudBatchIterator) Next() (SparkArrowBatch, error) {
link.StartRowOffset,
link.RowCount,
)

cancelCtx, cancelFn := context.WithCancel(bi.ctx)
task := &cloudFetchDownloadTask{
ctx: bi.ctx,
ctx: cancelCtx,
cancel: cancelFn,
useLz4Compression: bi.cfg.UseLz4Compression,
link: link,
resultChan: make(chan cloudFetchDownloadTaskResult),
Expand All @@ -147,17 +150,29 @@ func (bi *cloudBatchIterator) Next() (SparkArrowBatch, error) {
return nil, io.EOF
}

return task.GetResult()
batch, err := task.GetResult()

// once we've got an errored out task - cancel the remaining ones
if err != nil {
bi.Close()
return nil, err
}

// explicitly call cancel function on successfully completed task to avoid context leak
task.cancel()
return batch, nil
}

func (bi *cloudBatchIterator) HasNext() bool {
return (bi.pendingLinks.Len() > 0) || (bi.downloadTasks.Len() > 0)
}

func (bi *cloudBatchIterator) Close() {
bi.pendingLinks.Clear() // Clear the list
// TODO: Cancel all download tasks
bi.downloadTasks.Clear() // Clear the list
bi.pendingLinks.Clear()
for bi.downloadTasks.Len() > 0 {
task := bi.downloadTasks.Dequeue()
task.cancel()
}
}

type cloudFetchDownloadTaskResult struct {
Expand All @@ -167,6 +182,7 @@ type cloudFetchDownloadTaskResult struct {

type cloudFetchDownloadTask struct {
ctx context.Context
cancel context.CancelFunc
useLz4Compression bool
minTimeToExpiry time.Duration
link *cli_service.TSparkArrowResultLink
Expand All @@ -180,9 +196,10 @@ func (cft *cloudFetchDownloadTask) GetResult() (SparkArrowBatch, error) {
if ok {
if result.err != nil {
logger.Debug().Msgf(
"CloudFetch: failed to download link at offset %d row count %d",
"CloudFetch: failed to download link at offset %d row count %d, reason: %s",
link.StartRowOffset,
link.RowCount,
result.err.Error(),
)
return nil, result.err
}
Expand Down

0 comments on commit c629bae

Please sign in to comment.