Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Append tes tasks to JSONL #360

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions src/TesApi.Web/AzureProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -20,6 +21,7 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Rest;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json;
using Polly;
Expand Down Expand Up @@ -672,6 +674,49 @@ public async Task<string> GetStorageAccountKeyAsync(StorageAccountInfo storageAc
}
}

public async Task UploadBlockToTesTasksAppendBlobAsync(Uri containerAbsoluteUri, string content, CancellationToken cancellationToken)
{
var container = new CloudBlobContainer(containerAbsoluteUri);
CloudAppendBlob appendBlob = null;
const int maxBlocks = 50000;

for (int i = 0; i < int.MaxValue; i++)
{
var blobName = $"testasks{i}.jsonl";
appendBlob = container.GetAppendBlobReference(blobName);

// If the blob exists and has less than the max number of blocks, use it.
if (await appendBlob.ExistsAsync(null, null, cancellationToken))
{
await appendBlob.FetchAttributesAsync(null, null, null, cancellationToken);

if (appendBlob.Properties.AppendBlobCommittedBlockCount < maxBlocks)
{
break;
}

continue; // If the blob has max blocks, continue to the next blob.
}

// If the blob does not exist, try creating it.
try
{
await appendBlob.CreateOrReplaceAsync(null, null, null, cancellationToken);
break; // If successful, break the loop.
}
catch (StorageException ex) when (ex.RequestInformation?.HttpStatusCode == 409) // Conflict
{
// If there's a conflict, another process might have created the blob, so continue the loop.
continue;
}
}

// Append the text content to the blob.
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(content));
await appendBlob.AppendBlockAsync(stream, null, null, null, null, cancellationToken);
}


/// <inheritdoc/>
public Task UploadBlobAsync(Uri blobAbsoluteUri, string content, CancellationToken cancellationToken)
=> new CloudBlockBlob(blobAbsoluteUri).UploadTextAsync(content, null, null, null, null, cancellationToken);
Expand Down
5 changes: 5 additions & 0 deletions src/TesApi.Web/CachingWithRetriesAzureProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,5 +229,10 @@ public Task DeleteBatchPoolIfExistsAsync(string poolId, CancellationToken cancel

/// <inheritdoc/>
public Task EnableBatchPoolAutoScaleAsync(string poolId, bool preemptable, TimeSpan interval, IAzureProxy.BatchPoolAutoScaleFormulaFactory formulaFactory, CancellationToken cancellationToken) => azureProxy.EnableBatchPoolAutoScaleAsync(poolId, preemptable, interval, formulaFactory, cancellationToken);

public Task UploadBlockToTesTasksAppendBlobAsync(Uri containerAbsoluteUri, string content, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
}
}
9 changes: 9 additions & 0 deletions src/TesApi.Web/IAzureProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -324,5 +324,14 @@ public interface IAzureProxy
/// <param name="currentTarget">Current number of compute nodes.</param>
/// <returns></returns>
delegate string BatchPoolAutoScaleFormulaFactory(bool preemptable, int currentTarget);

/// <summary>
/// Appends TES task JSON to an existing blob, or creates a new one
/// </summary>
/// <param name="containerAbsoluteUri"></param>
/// <param name="content"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task UploadBlockToTesTasksAppendBlobAsync(Uri containerAbsoluteUri, string content, CancellationToken cancellationToken);
}
}
12 changes: 10 additions & 2 deletions src/TesApi.Web/Scheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
// Licensed under the MIT License.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Tes.Extensions;
using Tes.Models;
using Tes.Repository;
Expand All @@ -25,17 +27,18 @@ public class Scheduler : BackgroundService
private readonly IBatchScheduler batchScheduler;
private readonly ILogger<Scheduler> logger;
private readonly TimeSpan runInterval = TimeSpan.FromSeconds(5);

private readonly ITesTaskAppender tesTaskAppender;
/// <summary>
/// Default constructor
/// </summary>
/// <param name="repository">The main TES task database repository implementation</param>
/// <param name="batchScheduler">The batch scheduler implementation</param>
/// <param name="logger">The logger instance</param>
public Scheduler(IRepository<TesTask> repository, IBatchScheduler batchScheduler, ILogger<Scheduler> logger)
public Scheduler(IRepository<TesTask> repository, IBatchScheduler batchScheduler, ITesTaskAppender tesTaskAppender, ILogger<Scheduler> logger)
{
this.repository = repository;
this.batchScheduler = batchScheduler;
this.tesTaskAppender = tesTaskAppender;
this.logger = logger;
}

Expand Down Expand Up @@ -199,6 +202,11 @@ private async ValueTask OrchestrateTesTasksOnBatch(CancellationToken stoppingTok
}

await repository.UpdateItemAsync(tesTask, stoppingToken);

if (!tesTask.IsActiveState())
{
tesTaskAppender.Append(tesTask);
}
}
}
catch (RepositoryCollisionException exc)
Expand Down
66 changes: 66 additions & 0 deletions src/TesApi.Web/TesTaskAppender.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Tes.Models;

namespace TesApi.Web
{
/// <inheritdoc/>
public interface ITesTaskAppender
{
/// <inheritdoc/>
void Append(TesTask tesTask);
}

/// <inheritdoc/>
public class TesTaskAppender : BackgroundService, ITesTaskAppender
{
private readonly ConcurrentQueue<TesTask> tesTaskJsonAppendQueue = new ConcurrentQueue<TesTask>();
private readonly ILogger<TesTaskAppender> logger;
private readonly IAzureProxy azureProxy;

/// <inheritdoc/>
public TesTaskAppender(ILogger<TesTaskAppender> logger, IAzureProxy azureProxy)
{
this.logger = logger;
this.azureProxy = azureProxy;
}

/// <inheritdoc/>
public void Append(TesTask tesTask)
{
tesTaskJsonAppendQueue.Enqueue(tesTask);
}

/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
while (!stoppingToken.IsCancellationRequested && tesTaskJsonAppendQueue.TryDequeue(out TesTask tesTask))
{
var jsonl = JsonConvert.SerializeObject(tesTask) + "\n";

try
{
// TODO auth, retries, exception handling
await azureProxy.UploadBlockToTesTasksAppendBlobAsync(null, jsonl, stoppingToken);
}
catch (Exception exc)
{
logger.LogError(exc, exc.Message);
}
}

await Task.Delay(TimeSpan.FromSeconds(1));
}
}
}
}