Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BlockBlobDatabase as TES database option #194

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/Tes/Models/BlockBlobDatabaseOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Tes.Models
{
/// <summary>
/// Settings describing the Azure Storage container used by the block blob database.
/// </summary>
public class BlockBlobDatabaseOptions
{
    /// <summary>
    /// Section name for these options (presumably the configuration section they are bound from).
    /// </summary>
    public const string SectionName = "BlockBlobDatabase";

    /// <summary>
    /// Name of the storage account that hosts the container.
    /// </summary>
    public string StorageAccountName { get; set; }

    /// <summary>
    /// Name of the blob container; defaults to "testasksdb".
    /// </summary>
    public string ContainerName { get; set; } = "testasksdb";

    /// <summary>
    /// Optional SAS token. No default is assigned, so it is null unless set.
    /// </summary>
    public string ContainerSasToken { get; set; }
}
}
2 changes: 1 addition & 1 deletion src/Tes/Models/TesTaskExtended.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public partial class TesTask : RepositoryItem<TesTask>

// Reports whether this task should still be treated as active.
public bool IsActiveState()
{
// NOTE(review): this diff hunk is shown without +/- markers and its header reports one addition and
// one deletion, so the first return below is presumably the removed line and the second its replacement.
return ActiveStates.Contains(this.State);
// Active when the state is one of ActiveStates, or when the state is CANCELED while IsCancelRequested is still set.
return ActiveStates.Contains(this.State) || (this.State == TesState.CANCELEDEnum && this.IsCancelRequested);
}
}
}
200 changes: 200 additions & 0 deletions src/Tes/Repository/BlockBlobDatabase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.Storage.Blobs;
using Polly;

namespace Tes.Repository
{
public class BlockBlobDatabase<T> where T : class
{
private const int maxConcurrentItemDownloads = 64;
private const string activeStatePrefix = "a/";
private const string inactiveStatePrefix = "z/";

private readonly BlobServiceClient blobServiceClient;
private readonly BlobContainerClient container;

public string StorageAccountName { get; set; }
public string ContainerName { get; set; }

/// <summary>
/// Connects to the blob container that backs this database, creating the container if it does not exist.
/// </summary>
/// <param name="storageAccountName">Storage account name; used to build https://{name}.blob.core.windows.net</param>
/// <param name="containerName">Name of the container that holds the item blobs</param>
/// <param name="containerSasToken">Optional SAS token (with or without a leading '?'). When null or blank, <see cref="DefaultAzureCredential"/> is used instead</param>
/// <exception cref="ArgumentException">The storage account name or the container name is null or blank</exception>
public BlockBlobDatabase(string storageAccountName, string containerName, string containerSasToken = null)
{
    // Fail fast with a clear message instead of building a malformed service URI
    if (string.IsNullOrWhiteSpace(storageAccountName))
    {
        throw new ArgumentException("A storage account name is required.", nameof(storageAccountName));
    }

    if (string.IsNullOrWhiteSpace(containerName))
    {
        throw new ArgumentException("A container name is required.", nameof(containerName));
    }

    StorageAccountName = storageAccountName;
    ContainerName = containerName;

    var serviceUrl = $"https://{StorageAccountName}.blob.core.windows.net";

    if (!string.IsNullOrWhiteSpace(containerSasToken))
    {
        blobServiceClient = new BlobServiceClient(new Uri($"{serviceUrl}?{containerSasToken.TrimStart('?')}"));
    }
    else
    {
        // Use managed identity. Token lifetime and refreshing is handled automatically.
        blobServiceClient = new BlobServiceClient(new Uri(serviceUrl), new DefaultAzureCredential());
    }

    container = blobServiceClient.GetBlobContainerClient(ContainerName);

    // A constructor cannot await, so call the synchronous API rather than blocking on the async one
    // with .Wait(); failures now surface directly instead of wrapped in an AggregateException.
    container.CreateIfNotExists();
}

/// <summary>
/// Serializes the item to JSON and stores it under the active or the inactive blob prefix.
/// Storing an item as inactive also removes any blob the item has under the active prefix.
/// </summary>
/// <param name="id">Item id; used to build the blob name</param>
/// <param name="item">The item to store</param>
/// <param name="isActive">True to store under the active prefix, false to store under the inactive prefix</param>
public async Task CreateOrUpdateItemAsync(string id, T item, bool isActive)
{
    var json = System.Text.Json.JsonSerializer.Serialize(item);

    // NOTE(review): uploads use overwrite: true, so the last writer wins. A reviewer of this change pointed out
    // that a retried write can overwrite newer data written by a concurrent request, and suggested optimistic
    // concurrency (ETag conditions) for updates and overwrite: false for creates. Not implemented here.

    if (isActive)
    {
        // Assumption: a task can never go from inactive to active, so no need to delete anything here.
        // The blob must be written under the active prefix; GetItemAsync and the activeOnly listing look there.
        var activeBlob = container.GetBlobClient(GetActiveBlobNameById(id));
        await activeBlob.UploadAsync(BinaryData.FromString(json), overwrite: true);
        return;
    }

    var inactiveBlob = container.GetBlobClient(GetInactiveBlobNameById(id));
    var staleActiveBlob = container.GetBlobClient(GetActiveBlobNameById(id));

    // Retry to reduce likelihood of one blob succeeding and the other failing.
    // The operations are started inside the delegate so that every retry issues new requests.
    await Policy
        .Handle<Exception>()
        .WaitAndRetryAsync(10, retryAttempt => TimeSpan.FromSeconds(1))
        .ExecuteAsync(async () =>
        {
            // Write the inactive copy before deleting the active one, so the item is never missing from both;
            // GetItemAsync checks the inactive blob first, so it wins while both exist.
            await inactiveBlob.UploadAsync(BinaryData.FromString(json), overwrite: true);
            await staleActiveBlob.DeleteIfExistsAsync();
        });
}

/// <summary>
/// Deletes the item's blob under both the active and the inactive prefix, if present.
/// </summary>
/// <param name="id">Id of the item to delete</param>
public async Task DeleteItemAsync(string id)
{
    var activeBlob = container.GetBlobClient(GetActiveBlobNameById(id));
    var inactiveBlob = container.GetBlobClient(GetInactiveBlobNameById(id));

    // Retry to reduce likelihood of one blob succeeding and the other failing.
    // The deletes are started inside the delegate: awaiting tasks created up front would only
    // re-observe the same failure on every attempt instead of issuing new requests.
    await Policy
        .Handle<Exception>()
        .WaitAndRetryAsync(10, retryAttempt => TimeSpan.FromSeconds(1))
        .ExecuteAsync(async () =>
        {
            await Task.WhenAll(activeBlob.DeleteIfExistsAsync(), inactiveBlob.DeleteIfExistsAsync());
        });
}

/// <summary>
/// Downloads and deserializes the item with the given id.
/// </summary>
/// <param name="id">Id of the item to read</param>
/// <returns>The deserialized item</returns>
public async Task<T> GetItemAsync(string id)
{
    // The inactive copy is probed first: an item never goes back to active, so preferring it gives more consistent results
    var inactiveBlob = container.GetBlobClient(GetInactiveBlobNameById(id));
    bool inactiveExists = await inactiveBlob.ExistsAsync();

    var source = inactiveExists
        ? inactiveBlob
        : container.GetBlobClient(GetActiveBlobNameById(id));

    var download = await source.DownloadContentAsync();
    var payload = download.Value.Content.ToString();

    return System.Text.Json.JsonSerializer.Deserialize<T>(payload);
}

/// <summary>
/// Lists the blob names first, then downloads all of the listed items in parallel.
/// Specifically designed NOT to enumerate items to prevent caller stalling the download throughput
/// </summary>
/// <param name="activeOnly">True to list only blobs under the active prefix; false to list every blob in the container</param>
/// <returns>The deserialized items</returns>
public async Task<IList<T>> GetItemsAsync(bool activeOnly = false)
{
    var blobNames = new List<string>();

    // await foreach disposes the async enumerator, which the manual MoveNextAsync loop never did
    await foreach (var blob in container.GetBlobsAsync(prefix: activeOnly ? activeStatePrefix : null))
    {
        // example: a/0fb0858a-3166-4a22-85b6-4337df2f53c5.json
        // example: z/0fb0858a-3166-4a22-85b6-4337df2f53c5.json
        blobNames.Add(blob.Name);
    }

    return await DownloadBlobsAsync(blobNames);
}

/// <summary>
/// Lists a single page of blobs and downloads the items on that page.
/// </summary>
/// <param name="activeOnly">True to list only blobs under the active prefix</param>
/// <param name="pageSize">Page size hint passed to the blob listing</param>
/// <param name="continuationToken">Token returned by a previous call; null or empty to start from the beginning</param>
/// <returns>The token for the next page (null when there are no more pages) and the items on this page</returns>
public async Task<(string, IList<T>)> GetItemsWithPagingAsync(bool activeOnly = false, int pageSize = 5000, string continuationToken = null)
{
    // Treat an empty token the same as "no token"
    var startToken = string.IsNullOrEmpty(continuationToken) ? null : continuationToken;

    var pages = container
        .GetBlobsAsync(prefix: activeOnly ? activeStatePrefix : null)
        .AsPages(startToken, pageSize);

    // Only the first page is consumed; dispose the enumerator when done
    await using var enumerator = pages.GetAsyncEnumerator();

    if (!await enumerator.MoveNextAsync())
    {
        return (null, new List<T>());
    }

    var page = enumerator.Current;
    var blobNames = page.Values.Select(blob => blob.Name).ToList();

    // Normalize an empty token to null so callers have a single "no more pages" signal
    var nextToken = string.IsNullOrEmpty(page.ContinuationToken) ? null : page.ContinuationToken;

    return (nextToken, await DownloadBlobsAsync(blobNames));
}

// Blob name of an item stored under the active prefix, e.g. a/{id}.json
private string GetActiveBlobNameById(string id)
{
    return string.Concat(activeStatePrefix, id, ".json");
}

// Blob name of an item stored under the inactive prefix, e.g. z/{id}.json
private string GetInactiveBlobNameById(string id)
{
    return string.Concat(inactiveStatePrefix, id, ".json");
}

/// <summary>
/// Downloads and deserializes the given blobs using at most maxConcurrentItemDownloads concurrent workers.
/// </summary>
/// <param name="blobNames">Names of the blobs to download</param>
/// <returns>The deserialized items, in no particular order</returns>
/// <remarks>
/// A blob that no longer exists (HTTP 404, e.g. deleted or moved between prefixes after it was listed) is skipped.
/// Any other failure is retried a bounded number of times and then propagates to the caller, instead of
/// being re-queued indefinitely or silently dropped.
/// </remarks>
private async Task<IList<T>> DownloadBlobsAsync(List<string> blobNames)
{
    const int maxAttemptsPerBlob = 5;

    var pending = new ConcurrentQueue<string>(blobNames);
    var items = new ConcurrentBag<T>();

    // Downloads a single blob, retrying transient failures with a short, growing delay
    async Task DownloadWithRetryAsync(string blobName)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                var blobClient = container.GetBlobClient(blobName);
                var json = (await blobClient.DownloadContentAsync()).Value.Content.ToString();
                items.Add(System.Text.Json.JsonSerializer.Deserialize<T>(json));
                return;
            }
            catch (global::Azure.RequestFailedException ex) when (ex.Status == 404)
            {
                // The blob disappeared after it was listed; there is nothing to download
                return;
            }
            catch (Exception) when (attempt < maxAttemptsPerBlob)
            {
                await Task.Delay(TimeSpan.FromMilliseconds(100 * attempt));
            }
        }
    }

    // Each worker pulls names until the queue is empty, so concurrency is bounded without polling
    async Task DrainQueueAsync()
    {
        while (pending.TryDequeue(out var blobName))
        {
            await DownloadWithRetryAsync(blobName);
        }
    }

    var workerCount = Math.Min(maxConcurrentItemDownloads, blobNames.Count);
    var workers = new List<Task>(workerCount);

    for (var i = 0; i < workerCount; i++)
    {
        workers.Add(DrainQueueAsync());
    }

    await Task.WhenAll(workers);

    return items.ToList();
}
}
}
7 changes: 7 additions & 0 deletions src/Tes/Repository/IRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ public interface IRepository<T> : IDisposable where T : RepositoryItem<T>
/// <returns>The item instance</returns>
Task<bool> TryGetItemAsync(string id, Action<T> onSuccess = null);

/// <summary>
/// Reads the collection of active items from the repository
/// </summary>
/// <returns>The collection of active items</returns>
Task<IEnumerable<T>> GetActiveItemsAsync();

/// <summary>
/// Reads a collection of items from the repository
/// </summary>
Expand Down
115 changes: 115 additions & 0 deletions src/Tes/Repository/TesTaskAzureBlockBlobRepository.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Linq.Expressions;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Polly;
using Tes.Models;

namespace Tes.Repository
{
public class TesTaskAzureBlockBlobRepository : IRepository<TesTask>
{
private readonly BlockBlobDatabase<TesTask> db;
private readonly ICache<TesTask> cache;
private readonly ILogger logger;

/// <summary>
/// Creates the repository on top of a block blob database and warms the cache with the active tasks.
/// </summary>
/// <param name="options">Storage account, container and optional SAS token settings</param>
/// <param name="logger">Logger used while warming the cache</param>
/// <param name="cache">Optional cache; when null, cache warming is skipped</param>
/// <exception cref="ArgumentNullException">options or logger is null</exception>
/// <exception cref="ArgumentException">options carries no value</exception>
public TesTaskAzureBlockBlobRepository(IOptions<BlockBlobDatabaseOptions> options, ILogger<TesTaskAzureBlockBlobRepository> logger, ICache<TesTask> cache = null)
{
    // Guard up front so a missing dependency is reported here rather than as a NullReferenceException later
    if (options is null)
    {
        throw new ArgumentNullException(nameof(options));
    }

    var settings = options.Value ?? throw new ArgumentException("Options value must not be null.", nameof(options));

    this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
    this.cache = cache;
    db = new BlockBlobDatabase<TesTask>(settings.StorageAccountName, settings.ContainerName, settings.ContainerSasToken);

    // Blocks until warming has completed (or its retries are exhausted); a constructor cannot await
    WarmCacheAsync().Wait();
}

/// <summary>
/// Stores the task in the blob database, under the prefix matching its active/inactive state.
/// </summary>
/// <param name="item">The task to store</param>
/// <returns>The same task instance</returns>
public async Task<TesTask> CreateItemAsync(TesTask item)
{
    var taskId = item.Id;
    var isActive = item.IsActiveState();

    await db.CreateOrUpdateItemAsync(taskId, item, isActive);

    return item;
}

/// <summary>
/// Deletes the task with the given id from the blob database.
/// </summary>
/// <param name="id">Id of the task to delete</param>
public async Task DeleteItemAsync(string id) => await db.DeleteItemAsync(id);

/// <summary>
/// Reads all tasks from the blob database and returns those matching the predicate.
/// </summary>
/// <param name="predicate">The 'where' clause; null returns every task</param>
/// <returns>The matching tasks</returns>
public async Task<IEnumerable<TesTask>> GetItemsAsync(Expression<Func<TesTask, bool>> predicate)
{
    var allTasks = await db.GetItemsAsync();

    if (predicate is null)
    {
        return allTasks;
    }

    // The blob store has no query support, so the filter is applied in memory after the download
    return allTasks.Where(predicate.Compile()).ToList();
}
/// <summary>
/// Reads the tasks stored under the active prefix of the blob database.
/// </summary>
/// <returns>The active tasks</returns>
public async Task<IEnumerable<TesTask>> GetActiveItemsAsync()
{
    var activeTasks = await db.GetItemsAsync(activeOnly: true);
    return activeTasks;
}

/// <summary>
/// Reads one page of tasks from the blob database and returns those on the page matching the predicate.
/// </summary>
/// <param name="predicate">The 'where' clause; null returns every task on the page</param>
/// <param name="pageSize">Number of blobs to list for the page</param>
/// <param name="continuationToken">Token returned by a previous call, or null for the first page</param>
/// <returns>The token for the next page and the matching tasks from this page</returns>
public async Task<(string, IEnumerable<TesTask>)> GetItemsAsync(Expression<Func<TesTask, bool>> predicate, int pageSize, string continuationToken)
{
    // TODO - add support for listing tasks by name
    var (nextToken, pageTasks) = await db.GetItemsWithPagingAsync(false, pageSize, continuationToken);

    if (predicate is null)
    {
        return (nextToken, pageTasks);
    }

    // Filtering happens after the page is downloaded, so a page can hold fewer than pageSize tasks
    // (even none) while the returned token still points at further pages.
    return (nextToken, pageTasks.Where(predicate.Compile()).ToList());
}

/// <summary>
/// Attempts to read the task with the given id.
/// </summary>
/// <param name="id">Id of the task to read</param>
/// <param name="onSuccess">Optional callback invoked with the task when it was found</param>
/// <returns>True when the task was found; false when no blob exists for the id or it holds no task</returns>
public async Task<bool> TryGetItemAsync(string id, Action<TesTask> onSuccess = null)
{
    TesTask item;

    try
    {
        item = await db.GetItemAsync(id);
    }
    catch (global::Azure.RequestFailedException ex) when (ex.Status == 404)
    {
        // Neither an inactive nor an active blob exists for this id
        return false;
    }

    if (item is null)
    {
        // A blob containing JSON null deserializes to null; report it as not found
        return false;
    }

    onSuccess?.Invoke(item);
    return true;
}

/// <summary>
/// Overwrites the stored task, placing it under the prefix matching its current active/inactive state.
/// </summary>
/// <param name="item">The task to store</param>
/// <returns>The same task instance</returns>
public async Task<TesTask> UpdateItemAsync(TesTask item)
{
    var taskId = item.Id;
    var isActive = item.IsActiveState();

    await db.CreateOrUpdateItemAsync(taskId, item, isActive);

    return item;
}

/// <summary>
/// Loads the active tasks into the cache, oldest first. Does nothing except log a warning when no cache was supplied.
/// </summary>
private async Task WarmCacheAsync()
{
    if (cache is null)
    {
        logger.LogWarning("Cache is null for TesTaskAzureBlockBlobRepository; no caching will be used.");
        return;
    }

    var stopwatch = Stopwatch.StartNew();
    logger.LogInformation("Warming cache...");

    // Wait 10 seconds before each retry, logging the attempt number
    Func<int, TimeSpan> waitBeforeRetry = retryAttempt =>
    {
        logger.LogWarning($"Warming cache retry attempt #{retryAttempt}");
        return TimeSpan.FromSeconds(10);
    };

    // Log every failed attempt together with its exception
    Action<Exception, TimeSpan> reportFailure = (ex, ts) =>
    {
        logger.LogCritical(ex, "Couldn't warm cache, is the storage account available?");
    };

    // Any exception triggers a retry, up to 3 retries
    var retryPolicy = Policy
        .Handle<Exception>()
        .WaitAndRetryAsync(3, waitBeforeRetry, reportFailure);

    await retryPolicy.ExecuteAsync(async () =>
    {
        var tasksAddedCount = 0;
        var activeTasks = (await GetActiveItemsAsync()).ToList();
        var oldestFirst = activeTasks.OrderBy(t => t.CreationTime);

        foreach (var task in oldestFirst)
        {
            // The cache was checked for null at the top of the method
            cache.TryAdd(task.Id, task);
            tasksAddedCount++;
        }

        logger.LogInformation($"Cache warmed successfully in {stopwatch.Elapsed.TotalSeconds:n3} seconds. Added {tasksAddedCount:n0} items to the cache.");
    });
}

/// <summary>
/// Intentionally empty; this method releases nothing.
/// </summary>
public void Dispose()
{
}
}
}
6 changes: 6 additions & 0 deletions src/Tes/Repository/TesTaskPostgreSqlRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace Tes.Repository
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Polly;
using Tes.Extensions;
using Tes.Models;
using Tes.Utilities;

Expand Down Expand Up @@ -242,6 +243,11 @@ public async Task DeleteItemAsync(string id)
return (null, results);
}

/// <summary>
/// Reads the active tasks: those whose state is in TesTask.ActiveStates, plus CANCELED tasks whose cancellation is still requested.
/// </summary>
/// <returns>The collection of active tasks</returns>
public async Task<IEnumerable<TesTask>> GetActiveItemsAsync()
{
// The condition is spelled out inside the lambda passed to GetItemsAsync rather than calling a helper on the task
return await GetItemsAsync(t => TesTask.ActiveStates.Contains(t.State) || (t.State == TesState.CANCELEDEnum && t.IsCancelRequested));
}

public void Dispose()
{

Expand Down
2 changes: 2 additions & 0 deletions src/Tes/Tes.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Identity" Version="1.8.2" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.16.0" />
<PackageReference Include="LazyCache.AspNetCore" Version="2.4.0" />
<PackageReference Include="EFCore.NamingConventions" Version="7.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.3" />
Expand Down
5 changes: 5 additions & 0 deletions src/TesApi.Tests/TestServices/TestRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,5 +165,10 @@ Task<T> IRepository<T>.UpdateItemAsync(T item)

throw new InvalidOperationException();
}

// Not supported by this test repository; always throws NotImplementedException when called.
Task<IEnumerable<T>> IRepository<T>.GetActiveItemsAsync()
    => throw new NotImplementedException();
}
}
Loading