From 0c42dc146962205994adae74d69d1452d9c03efc Mon Sep 17 00:00:00 2001 From: Tingluo Huang Date: Wed, 15 Feb 2023 11:25:32 -0500 Subject: [PATCH] Revert #2422 and release 2.302.1 runner (#2438) * Revert "Uploading step logs to Results as well (#2422)" (#2437) This reverts commit e979331be42e147b2d1b398b94f7807162e13138. * Release 2.302.1 runner. --------- Co-authored-by: Yang Cao --- releaseNote.md | 1 - releaseVersion | 2 +- src/Runner.Common/JobServer.cs | 14 +- src/Runner.Common/JobServerQueue.cs | 116 +++++----------- src/Runner.Common/Logging.cs | 63 ++------- src/Runner.Worker/ExecutionContext.cs | 2 +- src/Sdk/DTWebApi/WebApi/TaskAttachment.cs | 1 - src/Sdk/WebApi/WebApi/Contracts.cs | 48 ------- src/Sdk/WebApi/WebApi/ResultsHttpClient.cs | 151 +-------------------- src/runnerversion | 2 +- 10 files changed, 51 insertions(+), 349 deletions(-) diff --git a/releaseNote.md b/releaseNote.md index 0c47fbc92d6..2e7367c7a70 100644 --- a/releaseNote.md +++ b/releaseNote.md @@ -1,7 +1,6 @@ ## Features - Add support for ghe.com domain (#2420) - Add docker cli to the runner image. (#2425) -- Uploading step logs to Results service (#2422) ## Bugs - Fix URL construction bug for RunService (#2396) diff --git a/releaseVersion b/releaseVersion index ed039ac8fab..22155b3fbc4 100644 --- a/releaseVersion +++ b/releaseVersion @@ -1 +1 @@ -2.302.0 +2.302.1 diff --git a/src/Runner.Common/JobServer.cs b/src/Runner.Common/JobServer.cs index 085cc84979d..8a6c4a6a76e 100644 --- a/src/Runner.Common/JobServer.cs +++ b/src/Runner.Common/JobServer.cs @@ -30,8 +30,7 @@ public interface IJobServer : IRunnerService, IAsyncDisposable Task AppendLogContentAsync(Guid scopeIdentifier, string hubName, Guid planId, int logId, Stream uploadStream, CancellationToken cancellationToken); Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList lines, long? startLine, CancellationToken cancellationToken); Task CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, String type, String name, Stream uploadStream, CancellationToken cancellationToken); - Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken); - Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken); + Task CreateStepSymmaryAsync(string planId, string jobId, string stepId, string file, CancellationToken cancellationToken); Task CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken); Task CreateTimelineAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, CancellationToken cancellationToken); Task> UpdateTimelineRecordsAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, IEnumerable records, CancellationToken cancellationToken); @@ -317,7 +316,7 @@ public Task CreateAttachmentAsync(Guid scopeIdentifier, string h return _taskClient.CreateAttachmentAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, type, name, uploadStream, cancellationToken: cancellationToken); } - public Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken) + public Task CreateStepSymmaryAsync(string planId, string jobId, string stepId, string file, CancellationToken cancellationToken) { if (_resultsClient != null) { @@ -326,15 +325,6 @@ public Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, str throw new InvalidOperationException("Results client is not initialized."); } - public Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken) - { - if (_resultsClient != null) - { - return _resultsClient.UploadResultsStepLogAsync(planId, jobId, stepId, file, finalize, firstBlock, lineCount, cancellationToken: cancellationToken); - } - throw new InvalidOperationException("Results client is not initialized."); - } - public Task CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken) { diff --git a/src/Runner.Common/JobServerQueue.cs b/src/Runner.Common/JobServerQueue.cs index bc20fed4404..6440da73607 100644 --- a/src/Runner.Common/JobServerQueue.cs +++ b/src/Runner.Common/JobServerQueue.cs @@ -20,7 +20,7 @@ public interface IJobServerQueue : IRunnerService, IThrottlingReporter void Start(Pipelines.AgentJobRequestMessage jobRequest); void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null); void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource); - void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines = 0); + void QueueSummaryUpload(Guid stepRecordId, string name, string path, bool deleteSource); void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord); } @@ -31,7 +31,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue private static readonly TimeSpan _delayForWebConsoleLineDequeue = TimeSpan.FromMilliseconds(500); private static readonly TimeSpan _delayForTimelineUpdateDequeue = TimeSpan.FromMilliseconds(500); private static readonly TimeSpan _delayForFileUploadDequeue = TimeSpan.FromMilliseconds(1000); - private static readonly TimeSpan _delayForResultsUploadDequeue = TimeSpan.FromMilliseconds(1000); + private static readonly TimeSpan _delayForSummaryUploadDequeue = TimeSpan.FromMilliseconds(1000); // Job message information private Guid _scopeIdentifier; @@ -46,7 +46,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue // queue for file upload (log file or attachment) private readonly ConcurrentQueue _fileUploadQueue = new(); - private readonly ConcurrentQueue _resultsFileUploadQueue = new(); + private readonly ConcurrentQueue _summaryFileUploadQueue = new(); // queue for timeline or timeline record update (one queue per timeline) private readonly ConcurrentDictionary> _timelineUpdateQueue = new(); @@ -60,7 +60,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue // Task for each queue's dequeue process private Task _webConsoleLineDequeueTask; private Task _fileUploadDequeueTask; - private Task _resultsUploadDequeueTask; + private Task _summaryUploadDequeueTask; private Task _timelineUpdateDequeueTask; // common @@ -140,12 +140,12 @@ public void Start(Pipelines.AgentJobRequestMessage jobRequest) _fileUploadDequeueTask = ProcessFilesUploadQueueAsync(); Trace.Info("Start results file upload queue."); - _resultsUploadDequeueTask = ProcessResultsUploadQueueAsync(); + _summaryUploadDequeueTask = ProcessSummaryUploadQueueAsync(); Trace.Info("Start process timeline update queue."); _timelineUpdateDequeueTask = ProcessTimelinesUpdateQueueAsync(); - _allDequeueTasks = new Task[] { _webConsoleLineDequeueTask, _fileUploadDequeueTask, _timelineUpdateDequeueTask, _resultsUploadDequeueTask }; + _allDequeueTasks = new Task[] { _webConsoleLineDequeueTask, _fileUploadDequeueTask, _timelineUpdateDequeueTask, _summaryUploadDequeueTask }; _queueInProcess = true; } @@ -176,9 +176,9 @@ public async Task ShutdownAsync() await ProcessFilesUploadQueueAsync(runOnce: true); Trace.Info("File upload queue drained."); - Trace.Verbose("Draining results upload queue."); - await ProcessResultsUploadQueueAsync(runOnce: true); - Trace.Info("Results upload queue drained."); + Trace.Verbose("Draining results summary upload queue."); + await ProcessSummaryUploadQueueAsync(runOnce: true); + Trace.Info("Results summary upload queue drained."); // ProcessTimelinesUpdateQueueAsync() will throw exception during shutdown // if there is any timeline records that failed to update contains output variabls. @@ -230,31 +230,21 @@ public void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, _fileUploadQueue.Enqueue(newFile); } - public void QueueResultsUpload(Guid recordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines) + public void QueueSummaryUpload(Guid stepRecordId, string name, string path, bool deleteSource) { - if (recordId == _jobTimelineRecordId) - { - Trace.Verbose("Skipping job log {0} for record {1}", path, recordId); - return; - } - // all parameter not null, file path exist. - var newFile = new ResultsUploadFileInfo() + var newFile = new SummaryUploadFileInfo() { Name = name, Path = path, - Type = type, PlanId = _planId.ToString(), JobId = _jobTimelineRecordId.ToString(), - RecordId = recordId, - DeleteSource = deleteSource, - Finalize = finalize, - FirstBlock = firstBlock, - TotalLines = totalLines, + StepId = stepRecordId.ToString(), + DeleteSource = deleteSource }; - Trace.Verbose("Enqueue results file upload queue: file '{0}' attach to job {1} step {2}", newFile.Path, _jobTimelineRecordId, recordId); - _resultsFileUploadQueue.Enqueue(newFile); + Trace.Verbose("Enqueue results file upload queue: file '{0}' attach to job {1} step {2}", newFile.Path, _jobTimelineRecordId, stepRecordId); + _summaryFileUploadQueue.Enqueue(newFile); } public void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord) @@ -447,18 +437,18 @@ private async Task ProcessFilesUploadQueueAsync(bool runOnce = false) } } - private async Task ProcessResultsUploadQueueAsync(bool runOnce = false) + private async Task ProcessSummaryUploadQueueAsync(bool runOnce = false) { Trace.Info("Starting results-based upload queue..."); while (!_jobCompletionSource.Task.IsCompleted || runOnce) { - List filesToUpload = new(); - ResultsUploadFileInfo dequeueFile; - while (_resultsFileUploadQueue.TryDequeue(out dequeueFile)) + List filesToUpload = new(); + SummaryUploadFileInfo dequeueFile; + while (_summaryFileUploadQueue.TryDequeue(out dequeueFile)) { filesToUpload.Add(dequeueFile); - // process at most 10 file uploads. + // process at most 10 file upload. if (!runOnce && filesToUpload.Count > 10) { break; @@ -469,7 +459,7 @@ private async Task ProcessResultsUploadQueueAsync(bool runOnce = false) { if (runOnce) { - Trace.Info($"Uploading {filesToUpload.Count} file(s) in one shot through results service."); + Trace.Info($"Uploading {filesToUpload.Count} summary files in one shot through results service."); } int errorCount = 0; @@ -477,22 +467,11 @@ private async Task ProcessResultsUploadQueueAsync(bool runOnce = false) { try { - if (String.Equals(file.Type, ChecksAttachmentType.StepSummary, StringComparison.OrdinalIgnoreCase)) - { - await UploadSummaryFile(file); - } - else if (String.Equals(file.Type, CoreAttachmentType.ResultsLog, StringComparison.OrdinalIgnoreCase)) - { - if (file.RecordId != _jobTimelineRecordId) - { - Trace.Info($"Got a step log file to send to results service."); - await UploadResultsStepLogFile(file); - } - } + await UploadSummaryFile(file); } catch (Exception ex) { - var issue = new Issue() { Type = IssueType.Warning, Message = $"Caught exception during file upload to results. {ex.Message}" }; + var issue = new Issue() { Type = IssueType.Warning, Message = $"Caught exception during summary file upload to results. {ex.Message}" }; issue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.ResultsUploadFailure; var telemetryRecord = new TimelineRecord() @@ -502,13 +481,16 @@ private async Task ProcessResultsUploadQueueAsync(bool runOnce = false) telemetryRecord.Issues.Add(issue); QueueTimelineRecordUpdate(_jobTimelineId, telemetryRecord); - Trace.Info("Catch exception during file upload to results, keep going since the process is best effort."); + Trace.Info("Catch exception during summary file upload to results, keep going since the process is best effort."); Trace.Error(ex); + } + finally + { errorCount++; } } - Trace.Info("Tried to upload {0} file(s) to results, success rate: {1}/{0}.", filesToUpload.Count, filesToUpload.Count - errorCount); + Trace.Info("Tried to upload {0} summary files to results, success rate: {1}/{0}.", filesToUpload.Count, filesToUpload.Count - errorCount); } if (runOnce) @@ -517,7 +499,7 @@ private async Task ProcessResultsUploadQueueAsync(bool runOnce = false) } else { - await Task.Delay(_delayForResultsUploadDequeue); + await Task.Delay(_delayForSummaryUploadDequeue); } } } @@ -794,7 +776,7 @@ private async Task UploadFile(UploadFileInfo file) } } - private async Task UploadSummaryFile(ResultsUploadFileInfo file) + private async Task UploadSummaryFile(SummaryUploadFileInfo file) { bool uploadSucceed = false; try @@ -802,7 +784,7 @@ private async Task UploadSummaryFile(ResultsUploadFileInfo file) // Upload the step summary Trace.Info($"Starting to upload summary file to results service {file.Name}, {file.Path}"); var cancellationTokenSource = new CancellationTokenSource(); - await _jobServer.CreateStepSummaryAsync(file.PlanId, file.JobId, file.RecordId, file.Path, cancellationTokenSource.Token); + await _jobServer.CreateStepSymmaryAsync(file.PlanId, file.JobId, file.StepId, file.Path, cancellationTokenSource.Token); uploadSucceed = true; } @@ -822,34 +804,6 @@ private async Task UploadSummaryFile(ResultsUploadFileInfo file) } } } - - private async Task UploadResultsStepLogFile(ResultsUploadFileInfo file) - { - bool uploadSucceed = false; - try - { - Trace.Info($"Starting upload of step log file to results service {file.Name}, {file.Path}"); - var cancellationTokenSource = new CancellationTokenSource(); - await _jobServer.CreateResultsStepLogAsync(file.PlanId, file.JobId, file.RecordId, file.Path, file.Finalize, file.FirstBlock, file.TotalLines, cancellationTokenSource.Token); - - uploadSucceed = true; - } - finally - { - if (uploadSucceed && file.DeleteSource) - { - try - { - File.Delete(file.Path); - } - catch (Exception ex) - { - Trace.Info("Exception encountered during deletion of a temporary file that was already successfully uploaded to results."); - Trace.Error(ex); - } - } - } - } } internal class PendingTimelineRecord @@ -868,18 +822,14 @@ internal class UploadFileInfo public bool DeleteSource { get; set; } } - internal class ResultsUploadFileInfo + internal class SummaryUploadFileInfo { public string Name { get; set; } - public string Type { get; set; } public string Path { get; set; } public string PlanId { get; set; } public string JobId { get; set; } - public Guid RecordId { get; set; } + public string StepId { get; set; } public bool DeleteSource { get; set; } - public bool Finalize { get; set; } - public bool FirstBlock { get; set; } - public long TotalLines { get; set; } } diff --git a/src/Runner.Common/Logging.cs b/src/Runner.Common/Logging.cs index 8cf23749d57..40be5cdcff0 100644 --- a/src/Runner.Common/Logging.cs +++ b/src/Runner.Common/Logging.cs @@ -32,19 +32,6 @@ public class PagingLogger : RunnerService, IPagingLogger private string _pagesFolder; private IJobServerQueue _jobServerQueue; - // For Results - public static string BlocksFolder = "blocks"; - - // 2 MB - public const int BlockSize = 2 * 1024 * 1024; - - private string _resultsDataFileName; - private FileStream _resultsBlockData; - private StreamWriter _resultsBlockWriter; - private string _resultsBlockFolder; - private int _blockByteCount; - private int _blockCount; - public long TotalLines => _totalLines; public override void Initialize(IHostContext hostContext) @@ -52,10 +39,8 @@ public override void Initialize(IHostContext hostContext) base.Initialize(hostContext); _totalLines = 0; _pagesFolder = Path.Combine(hostContext.GetDirectory(WellKnownDirectory.Diag), PagingFolder); - Directory.CreateDirectory(_pagesFolder); - _resultsBlockFolder = Path.Combine(hostContext.GetDirectory(WellKnownDirectory.Diag), BlocksFolder); - Directory.CreateDirectory(_resultsBlockFolder); _jobServerQueue = HostContext.GetService(); + Directory.CreateDirectory(_pagesFolder); } public void Setup(Guid timelineId, Guid timelineRecordId) @@ -75,17 +60,11 @@ public void Write(string message) // lazy creation on write if (_pageWriter == null) { - NewPage(); - } - - if (_resultsBlockWriter == null) - { - NewBlock(); + Create(); } string line = $"{DateTime.UtcNow.ToString("O")} {message}"; _pageWriter.WriteLine(line); - _resultsBlockWriter.WriteLine(line); _totalLines++; if (line.IndexOf('\n') != -1) @@ -99,25 +78,21 @@ public void Write(string message) } } - var bytes = System.Text.Encoding.UTF8.GetByteCount(line); - _byteCount += bytes; - _blockByteCount += bytes; + _byteCount += System.Text.Encoding.UTF8.GetByteCount(line); if (_byteCount >= PageSize) { NewPage(); } - - if (_blockByteCount >= BlockSize) - { - NewBlock(); - } - } public void End() { EndPage(); - EndBlock(true); + } + + private void Create() + { + NewPage(); } private void NewPage() @@ -142,27 +117,5 @@ private void EndPage() _jobServerQueue.QueueFileUpload(_timelineId, _timelineRecordId, "DistributedTask.Core.Log", "CustomToolLog", _dataFileName, true); } } - - private void NewBlock() - { - EndBlock(false); - _blockByteCount = 0; - _resultsDataFileName = Path.Combine(_resultsBlockFolder, $"{_timelineId}_{_timelineRecordId}.{++_blockCount}"); - _resultsBlockData = new FileStream(_resultsDataFileName, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.ReadWrite); - _resultsBlockWriter = new StreamWriter(_resultsBlockData, System.Text.Encoding.UTF8); - } - - private void EndBlock(bool finalize) - { - if (_resultsBlockWriter != null) - { - _resultsBlockWriter.Flush(); - _resultsBlockData.Flush(); - _resultsBlockWriter.Dispose(); - _resultsBlockWriter = null; - _resultsBlockData = null; - _jobServerQueue.QueueResultsUpload(_timelineRecordId, "ResultsLog", _resultsDataFileName, "Results.Core.Log", deleteSource: true, finalize, firstBlock: _resultsDataFileName.EndsWith(".1"), totalLines: _totalLines); - } - } } } diff --git a/src/Runner.Worker/ExecutionContext.cs b/src/Runner.Worker/ExecutionContext.cs index 8d06a147e13..d12cb8e343c 100644 --- a/src/Runner.Worker/ExecutionContext.cs +++ b/src/Runner.Worker/ExecutionContext.cs @@ -872,7 +872,7 @@ public void QueueSummaryFile(string name, string filePath, Guid stepRecordId) throw new FileNotFoundException($"Can't upload (name:{name}) file: {filePath}. File does not exist."); } - _jobServerQueue.QueueResultsUpload(stepRecordId, name, filePath, ChecksAttachmentType.StepSummary, deleteSource: false, finalize: false, firstBlock: false); + _jobServerQueue.QueueSummaryUpload(stepRecordId, name, filePath, deleteSource: false); } // Add OnMatcherChanged diff --git a/src/Sdk/DTWebApi/WebApi/TaskAttachment.cs b/src/Sdk/DTWebApi/WebApi/TaskAttachment.cs index 572b1f6426b..17027d1261a 100644 --- a/src/Sdk/DTWebApi/WebApi/TaskAttachment.cs +++ b/src/Sdk/DTWebApi/WebApi/TaskAttachment.cs @@ -100,7 +100,6 @@ public class CoreAttachmentType public static readonly String Summary = "DistributedTask.Core.Summary"; public static readonly String FileAttachment = "DistributedTask.Core.FileAttachment"; public static readonly String DiagnosticLog = "DistributedTask.Core.DiagnosticLog"; - public static readonly String ResultsLog = "Results.Core.Log"; } [GenerateAllConstants] diff --git a/src/Sdk/WebApi/WebApi/Contracts.cs b/src/Sdk/WebApi/WebApi/Contracts.cs index bc6361d62bc..d240cc1e230 100644 --- a/src/Sdk/WebApi/WebApi/Contracts.cs +++ b/src/Sdk/WebApi/WebApi/Contracts.cs @@ -28,30 +28,6 @@ public class GetSignedStepSummaryURLResponse public string BlobStorageType; } - [DataContract] - [JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))] - public class GetSignedStepLogsURLRequest - { - [DataMember] - public string WorkflowJobRunBackendId; - [DataMember] - public string WorkflowRunBackendId; - [DataMember] - public string StepBackendId; - } - - [DataContract] - [JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))] - public class GetSignedStepLogsURLResponse - { - [DataMember] - public string LogsUrl; - [DataMember] - public long SoftSizeLimit; - [DataMember] - public string BlobStorageType; - } - [DataContract] [JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))] public class StepSummaryMetadataCreate @@ -76,30 +52,6 @@ public class CreateStepSummaryMetadataResponse public bool Ok; } - [DataContract] - [JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))] - public class StepLogsMetadataCreate - { - [DataMember] - public string StepBackendId; - [DataMember] - public string WorkflowRunBackendId; - [DataMember] - public string WorkflowJobRunBackendId; - [DataMember] - public string UploadedAt; - [DataMember] - public long LineCount; - } - - [DataContract] - [JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))] - public class CreateStepLogsMetadataResponse - { - [DataMember] - public bool Ok; - } - public static class BlobStorageTypes { public static readonly string AzureBlobStorage = "BLOB_STORAGE_TYPE_AZURE"; diff --git a/src/Sdk/WebApi/WebApi/ResultsHttpClient.cs b/src/Sdk/WebApi/WebApi/ResultsHttpClient.cs index 989bc239563..77a733eaeba 100644 --- a/src/Sdk/WebApi/WebApi/ResultsHttpClient.cs +++ b/src/Sdk/WebApi/WebApi/ResultsHttpClient.cs @@ -24,13 +24,13 @@ public ResultsHttpClient( m_formatter = new JsonMediaTypeFormatter(); } - public async Task GetStepSummaryUploadUrlAsync(string planId, string jobId, Guid stepId, CancellationToken cancellationToken) + public async Task GetStepSummaryUploadUrlAsync(string planId, string jobId, string stepId, CancellationToken cancellationToken) { var request = new GetSignedStepSummaryURLRequest() { WorkflowJobRunBackendId= jobId, WorkflowRunBackendId= planId, - StepBackendId= stepId.ToString() + StepBackendId= stepId }; var stepSummaryUploadRequest = new Uri(m_resultsServiceUrl, "twirp/results.services.receiver.Receiver/GetStepSummarySignedBlobURL"); @@ -51,41 +51,14 @@ public async Task GetStepSummaryUploadUrlAsync( } } - public async Task GetStepLogUploadUrlAsync(string planId, string jobId, Guid stepId, CancellationToken cancellationToken) - { - var request = new GetSignedStepLogsURLRequest() - { - WorkflowJobRunBackendId= jobId, - WorkflowRunBackendId= planId, - StepBackendId= stepId.ToString(), - }; - - var stepLogsUploadRequest = new Uri(m_resultsServiceUrl, "twirp/results.services.receiver.Receiver/GetStepLogsSignedBlobURL"); - - using (HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Post, stepLogsUploadRequest)) - { - requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", m_token); - requestMessage.Headers.Accept.Add(MediaTypeWithQualityHeaderValue.Parse("application/json")); - - using (HttpContent content = new ObjectContent(request, m_formatter)) - { - requestMessage.Content = content; - using (var response = await SendAsync(requestMessage, HttpCompletionOption.ResponseContentRead, cancellationToken: cancellationToken)) - { - return await ReadJsonContentAsync(response, cancellationToken); - } - } - } - } - - private async Task StepSummaryUploadCompleteAsync(string planId, string jobId, Guid stepId, long size, CancellationToken cancellationToken) + private async Task StepSummaryUploadCompleteAsync(string planId, string jobId, string stepId, long size, CancellationToken cancellationToken) { var timestamp = DateTime.UtcNow.ToString("yyyy-MM-dd'T'HH:mm:ss.fffK"); var request = new StepSummaryMetadataCreate() { WorkflowJobRunBackendId= jobId, WorkflowRunBackendId= planId, - StepBackendId = stepId.ToString(), + StepBackendId = stepId, Size = size, UploadedAt = timestamp }; @@ -112,40 +85,6 @@ private async Task StepSummaryUploadCompleteAsync(string planId, string jobId, G } } - private async Task StepLogUploadCompleteAsync(string planId, string jobId, Guid stepId, long lineCount, CancellationToken cancellationToken) - { - var timestamp = DateTime.UtcNow.ToString("yyyy-MM-dd'T'HH:mm:ss.fffK"); - var request = new StepLogsMetadataCreate() - { - WorkflowJobRunBackendId= jobId, - WorkflowRunBackendId= planId, - StepBackendId = stepId.ToString(), - UploadedAt = timestamp, - LineCount = lineCount, - }; - - var stepLogsUploadCompleteRequest = new Uri(m_resultsServiceUrl, "twirp/results.services.receiver.Receiver/CreateStepLogsMetadata"); - - using (HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Post, stepLogsUploadCompleteRequest)) - { - requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", m_token); - requestMessage.Headers.Accept.Add(MediaTypeWithQualityHeaderValue.Parse("application/json")); - - using (HttpContent content = new ObjectContent(request, m_formatter)) - { - requestMessage.Content = content; - using (var response = await SendAsync(requestMessage, HttpCompletionOption.ResponseContentRead, cancellationToken: cancellationToken)) - { - var jsonResponse = await ReadJsonContentAsync(response, cancellationToken); - if (!jsonResponse.Ok) - { - throw new Exception($"Failed to mark step log upload as complete, status code: {response.StatusCode}, ok: {jsonResponse.Ok}, timestamp: {timestamp}"); - } - } - } - } - } - private async Task UploadFileAsync(string url, string blobStorageType, FileStream file, CancellationToken cancellationToken) { // Upload the file to the url @@ -169,55 +108,8 @@ private async Task UploadFileAsync(string url, string blobS } } - private async Task CreateAppendFileAsync(string url, string blobStorageType, CancellationToken cancellationToken) - { - var request = new HttpRequestMessage(HttpMethod.Put, url) - { - Content = new StringContent("") - }; - if (blobStorageType == BlobStorageTypes.AzureBlobStorage) - { - request.Content.Headers.Add("x-ms-blob-type", "AppendBlob"); - request.Content.Headers.Add("Content-Length", "0"); - } - - using (var response = await SendAsync(request, HttpCompletionOption.ResponseHeadersRead, userState: null, cancellationToken)) - { - if (!response.IsSuccessStatusCode) - { - throw new Exception($"Failed to create append file, status code: {response.StatusCode}, reason: {response.ReasonPhrase}"); - } - return response; - } - } - - private async Task UploadAppendFileAsync(string url, string blobStorageType, FileStream file, bool finalize, long fileSize, CancellationToken cancellationToken) - { - var comp = finalize ? "&comp=appendblock&seal=true" : "&comp=appendblock"; - // Upload the file to the url - var request = new HttpRequestMessage(HttpMethod.Put, url + comp) - { - Content = new StreamContent(file) - }; - - if (blobStorageType == BlobStorageTypes.AzureBlobStorage) - { - request.Content.Headers.Add("Content-Length", fileSize.ToString()); - request.Content.Headers.Add("x-ms-blob-sealed", finalize.ToString()); - } - - using (var response = await SendAsync(request, HttpCompletionOption.ResponseHeadersRead, userState: null, cancellationToken)) - { - if (!response.IsSuccessStatusCode) - { - throw new Exception($"Failed to upload append file, status code: {response.StatusCode}, reason: {response.ReasonPhrase}, object: {response}, fileSize: {fileSize}"); - } - return response; - } - } - // Handle file upload for step summary - public async Task UploadStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken) + public async Task UploadStepSummaryAsync(string planId, string jobId, string stepId, string file, CancellationToken cancellationToken) { // Get the upload url var uploadUrlResponse = await GetStepSummaryUploadUrlAsync(planId, jobId, stepId, cancellationToken); @@ -243,39 +135,6 @@ public async Task UploadStepSummaryAsync(string planId, string jobId, Guid stepI await StepSummaryUploadCompleteAsync(planId, jobId, stepId, fileSize, cancellationToken); } - // Handle file upload for step log - public async Task UploadResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken) - { - // Get the upload url - var uploadUrlResponse = await GetStepLogUploadUrlAsync(planId, jobId, stepId, cancellationToken); - if (uploadUrlResponse == null || uploadUrlResponse.LogsUrl == null) - { - throw new Exception("Failed to get step log upload url"); - } - - // Do we want to throw an exception here or should we just be uploading/truncating the data - var fileSize = new FileInfo(file).Length; - - // Create the Append blob - if (firstBlock) - { - await CreateAppendFileAsync(uploadUrlResponse.LogsUrl, uploadUrlResponse.BlobStorageType, cancellationToken); - } - - // Upload content - using (var fileStream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true)) - { - var response = await UploadAppendFileAsync(uploadUrlResponse.LogsUrl, uploadUrlResponse.BlobStorageType, fileStream, finalize, fileSize, cancellationToken); - } - - // Update metadata - if (finalize) - { - // Send step log upload complete message - await StepLogUploadCompleteAsync(planId, jobId, stepId, lineCount, cancellationToken); - } - } - private MediaTypeFormatter m_formatter; private Uri m_resultsServiceUrl; private string m_token; diff --git a/src/runnerversion b/src/runnerversion index ed039ac8fab..22155b3fbc4 100644 --- a/src/runnerversion +++ b/src/runnerversion @@ -1 +1 @@ -2.302.0 +2.302.1