Skip to content

Commit

Permalink
Revert #2422 and release 2.302.1 runner (#2438)
Browse files Browse the repository at this point in the history
* Revert "Uploading step logs to Results as well  (#2422)" (#2437)

This reverts commit e979331.

* Release 2.302.1 runner.

---------

Co-authored-by: Yang Cao <[email protected]>
  • Loading branch information
TingluoHuang and yacaovsnc authored Feb 15, 2023
1 parent b831e03 commit 0c42dc1
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 349 deletions.
1 change: 0 additions & 1 deletion releaseNote.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
## Features
- Add support for ghe.com domain (#2420)
- Add docker cli to the runner image. (#2425)
- Uploading step logs to Results service (#2422)

## Bugs
- Fix URL construction bug for RunService (#2396)
Expand Down
2 changes: 1 addition & 1 deletion releaseVersion
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.302.0
2.302.1
14 changes: 2 additions & 12 deletions src/Runner.Common/JobServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ public interface IJobServer : IRunnerService, IAsyncDisposable
Task<TaskLog> AppendLogContentAsync(Guid scopeIdentifier, string hubName, Guid planId, int logId, Stream uploadStream, CancellationToken cancellationToken);
Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList<string> lines, long? startLine, CancellationToken cancellationToken);
Task<TaskAttachment> CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, String type, String name, Stream uploadStream, CancellationToken cancellationToken);
Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken);
Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken);
Task CreateStepSymmaryAsync(string planId, string jobId, string stepId, string file, CancellationToken cancellationToken);
Task<TaskLog> CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken);
Task<Timeline> CreateTimelineAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, CancellationToken cancellationToken);
Task<List<TimelineRecord>> UpdateTimelineRecordsAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, IEnumerable<TimelineRecord> records, CancellationToken cancellationToken);
Expand Down Expand Up @@ -317,7 +316,7 @@ public Task<TaskAttachment> CreateAttachmentAsync(Guid scopeIdentifier, string h
return _taskClient.CreateAttachmentAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, type, name, uploadStream, cancellationToken: cancellationToken);
}

public Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken)
public Task CreateStepSymmaryAsync(string planId, string jobId, string stepId, string file, CancellationToken cancellationToken)
{
if (_resultsClient != null)
{
Expand All @@ -326,15 +325,6 @@ public Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, str
throw new InvalidOperationException("Results client is not initialized.");
}

public Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken)
{
if (_resultsClient != null)
{
return _resultsClient.UploadResultsStepLogAsync(planId, jobId, stepId, file, finalize, firstBlock, lineCount, cancellationToken: cancellationToken);
}
throw new InvalidOperationException("Results client is not initialized.");
}


public Task<TaskLog> CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken)
{
Expand Down
116 changes: 33 additions & 83 deletions src/Runner.Common/JobServerQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface IJobServerQueue : IRunnerService, IThrottlingReporter
void Start(Pipelines.AgentJobRequestMessage jobRequest);
void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null);
void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource);
void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines = 0);
void QueueSummaryUpload(Guid stepRecordId, string name, string path, bool deleteSource);
void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord);
}

Expand All @@ -31,7 +31,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
private static readonly TimeSpan _delayForWebConsoleLineDequeue = TimeSpan.FromMilliseconds(500);
private static readonly TimeSpan _delayForTimelineUpdateDequeue = TimeSpan.FromMilliseconds(500);
private static readonly TimeSpan _delayForFileUploadDequeue = TimeSpan.FromMilliseconds(1000);
private static readonly TimeSpan _delayForResultsUploadDequeue = TimeSpan.FromMilliseconds(1000);
private static readonly TimeSpan _delayForSummaryUploadDequeue = TimeSpan.FromMilliseconds(1000);

// Job message information
private Guid _scopeIdentifier;
Expand All @@ -46,7 +46,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
// queue for file upload (log file or attachment)
private readonly ConcurrentQueue<UploadFileInfo> _fileUploadQueue = new();

private readonly ConcurrentQueue<ResultsUploadFileInfo> _resultsFileUploadQueue = new();
private readonly ConcurrentQueue<SummaryUploadFileInfo> _summaryFileUploadQueue = new();

// queue for timeline or timeline record update (one queue per timeline)
private readonly ConcurrentDictionary<Guid, ConcurrentQueue<TimelineRecord>> _timelineUpdateQueue = new();
Expand All @@ -60,7 +60,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
// Task for each queue's dequeue process
private Task _webConsoleLineDequeueTask;
private Task _fileUploadDequeueTask;
private Task _resultsUploadDequeueTask;
private Task _summaryUploadDequeueTask;
private Task _timelineUpdateDequeueTask;

// common
Expand Down Expand Up @@ -140,12 +140,12 @@ public void Start(Pipelines.AgentJobRequestMessage jobRequest)
_fileUploadDequeueTask = ProcessFilesUploadQueueAsync();

Trace.Info("Start results file upload queue.");
_resultsUploadDequeueTask = ProcessResultsUploadQueueAsync();
_summaryUploadDequeueTask = ProcessSummaryUploadQueueAsync();

Trace.Info("Start process timeline update queue.");
_timelineUpdateDequeueTask = ProcessTimelinesUpdateQueueAsync();

_allDequeueTasks = new Task[] { _webConsoleLineDequeueTask, _fileUploadDequeueTask, _timelineUpdateDequeueTask, _resultsUploadDequeueTask };
_allDequeueTasks = new Task[] { _webConsoleLineDequeueTask, _fileUploadDequeueTask, _timelineUpdateDequeueTask, _summaryUploadDequeueTask };
_queueInProcess = true;
}

Expand Down Expand Up @@ -176,9 +176,9 @@ public async Task ShutdownAsync()
await ProcessFilesUploadQueueAsync(runOnce: true);
Trace.Info("File upload queue drained.");

Trace.Verbose("Draining results upload queue.");
await ProcessResultsUploadQueueAsync(runOnce: true);
Trace.Info("Results upload queue drained.");
Trace.Verbose("Draining results summary upload queue.");
await ProcessSummaryUploadQueueAsync(runOnce: true);
Trace.Info("Results summary upload queue drained.");

// ProcessTimelinesUpdateQueueAsync() will throw exception during shutdown
// if there is any timeline records that failed to update contains output variabls.
Expand Down Expand Up @@ -230,31 +230,21 @@ public void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type,
_fileUploadQueue.Enqueue(newFile);
}

public void QueueResultsUpload(Guid recordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines)
public void QueueSummaryUpload(Guid stepRecordId, string name, string path, bool deleteSource)
{
if (recordId == _jobTimelineRecordId)
{
Trace.Verbose("Skipping job log {0} for record {1}", path, recordId);
return;
}

// all parameter not null, file path exist.
var newFile = new ResultsUploadFileInfo()
var newFile = new SummaryUploadFileInfo()
{
Name = name,
Path = path,
Type = type,
PlanId = _planId.ToString(),
JobId = _jobTimelineRecordId.ToString(),
RecordId = recordId,
DeleteSource = deleteSource,
Finalize = finalize,
FirstBlock = firstBlock,
TotalLines = totalLines,
StepId = stepRecordId.ToString(),
DeleteSource = deleteSource
};

Trace.Verbose("Enqueue results file upload queue: file '{0}' attach to job {1} step {2}", newFile.Path, _jobTimelineRecordId, recordId);
_resultsFileUploadQueue.Enqueue(newFile);
Trace.Verbose("Enqueue results file upload queue: file '{0}' attach to job {1} step {2}", newFile.Path, _jobTimelineRecordId, stepRecordId);
_summaryFileUploadQueue.Enqueue(newFile);
}

public void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord)
Expand Down Expand Up @@ -447,18 +437,18 @@ private async Task ProcessFilesUploadQueueAsync(bool runOnce = false)
}
}

private async Task ProcessResultsUploadQueueAsync(bool runOnce = false)
private async Task ProcessSummaryUploadQueueAsync(bool runOnce = false)
{
Trace.Info("Starting results-based upload queue...");

while (!_jobCompletionSource.Task.IsCompleted || runOnce)
{
List<ResultsUploadFileInfo> filesToUpload = new();
ResultsUploadFileInfo dequeueFile;
while (_resultsFileUploadQueue.TryDequeue(out dequeueFile))
List<SummaryUploadFileInfo> filesToUpload = new();
SummaryUploadFileInfo dequeueFile;
while (_summaryFileUploadQueue.TryDequeue(out dequeueFile))
{
filesToUpload.Add(dequeueFile);
// process at most 10 file uploads.
// process at most 10 file upload.
if (!runOnce && filesToUpload.Count > 10)
{
break;
Expand All @@ -469,30 +459,19 @@ private async Task ProcessResultsUploadQueueAsync(bool runOnce = false)
{
if (runOnce)
{
Trace.Info($"Uploading {filesToUpload.Count} file(s) in one shot through results service.");
Trace.Info($"Uploading {filesToUpload.Count} summary files in one shot through results service.");
}

int errorCount = 0;
foreach (var file in filesToUpload)
{
try
{
if (String.Equals(file.Type, ChecksAttachmentType.StepSummary, StringComparison.OrdinalIgnoreCase))
{
await UploadSummaryFile(file);
}
else if (String.Equals(file.Type, CoreAttachmentType.ResultsLog, StringComparison.OrdinalIgnoreCase))
{
if (file.RecordId != _jobTimelineRecordId)
{
Trace.Info($"Got a step log file to send to results service.");
await UploadResultsStepLogFile(file);
}
}
await UploadSummaryFile(file);
}
catch (Exception ex)
{
var issue = new Issue() { Type = IssueType.Warning, Message = $"Caught exception during file upload to results. {ex.Message}" };
var issue = new Issue() { Type = IssueType.Warning, Message = $"Caught exception during summary file upload to results. {ex.Message}" };
issue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.ResultsUploadFailure;

var telemetryRecord = new TimelineRecord()
Expand All @@ -502,13 +481,16 @@ private async Task ProcessResultsUploadQueueAsync(bool runOnce = false)
telemetryRecord.Issues.Add(issue);
QueueTimelineRecordUpdate(_jobTimelineId, telemetryRecord);

Trace.Info("Catch exception during file upload to results, keep going since the process is best effort.");
Trace.Info("Catch exception during summary file upload to results, keep going since the process is best effort.");
Trace.Error(ex);
}
finally
{
errorCount++;
}
}

Trace.Info("Tried to upload {0} file(s) to results, success rate: {1}/{0}.", filesToUpload.Count, filesToUpload.Count - errorCount);
Trace.Info("Tried to upload {0} summary files to results, success rate: {1}/{0}.", filesToUpload.Count, filesToUpload.Count - errorCount);
}

if (runOnce)
Expand All @@ -517,7 +499,7 @@ private async Task ProcessResultsUploadQueueAsync(bool runOnce = false)
}
else
{
await Task.Delay(_delayForResultsUploadDequeue);
await Task.Delay(_delayForSummaryUploadDequeue);
}
}
}
Expand Down Expand Up @@ -794,15 +776,15 @@ private async Task UploadFile(UploadFileInfo file)
}
}

private async Task UploadSummaryFile(ResultsUploadFileInfo file)
private async Task UploadSummaryFile(SummaryUploadFileInfo file)
{
bool uploadSucceed = false;
try
{
// Upload the step summary
Trace.Info($"Starting to upload summary file to results service {file.Name}, {file.Path}");
var cancellationTokenSource = new CancellationTokenSource();
await _jobServer.CreateStepSummaryAsync(file.PlanId, file.JobId, file.RecordId, file.Path, cancellationTokenSource.Token);
await _jobServer.CreateStepSymmaryAsync(file.PlanId, file.JobId, file.StepId, file.Path, cancellationTokenSource.Token);

uploadSucceed = true;
}
Expand All @@ -822,34 +804,6 @@ private async Task UploadSummaryFile(ResultsUploadFileInfo file)
}
}
}

private async Task UploadResultsStepLogFile(ResultsUploadFileInfo file)
{
bool uploadSucceed = false;
try
{
Trace.Info($"Starting upload of step log file to results service {file.Name}, {file.Path}");
var cancellationTokenSource = new CancellationTokenSource();
await _jobServer.CreateResultsStepLogAsync(file.PlanId, file.JobId, file.RecordId, file.Path, file.Finalize, file.FirstBlock, file.TotalLines, cancellationTokenSource.Token);

uploadSucceed = true;
}
finally
{
if (uploadSucceed && file.DeleteSource)
{
try
{
File.Delete(file.Path);
}
catch (Exception ex)
{
Trace.Info("Exception encountered during deletion of a temporary file that was already successfully uploaded to results.");
Trace.Error(ex);
}
}
}
}
}

internal class PendingTimelineRecord
Expand All @@ -868,18 +822,14 @@ internal class UploadFileInfo
public bool DeleteSource { get; set; }
}

internal class ResultsUploadFileInfo
internal class SummaryUploadFileInfo
{
public string Name { get; set; }
public string Type { get; set; }
public string Path { get; set; }
public string PlanId { get; set; }
public string JobId { get; set; }
public Guid RecordId { get; set; }
public string StepId { get; set; }
public bool DeleteSource { get; set; }
public bool Finalize { get; set; }
public bool FirstBlock { get; set; }
public long TotalLines { get; set; }
}


Expand Down
Loading

0 comments on commit 0c42dc1

Please sign in to comment.