fixed naming conventions and used RecyclableMemoryStream instead of M… #4

Open
wants to merge 1 commit into base: main
Changes from all commits
301 changes: 154 additions & 147 deletions S3UploadStream/S3UploadStream.cs
@@ -1,187 +1,194 @@
 using Amazon.S3;
 using Amazon.S3.Model;
 using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.IO;
 using System.Linq;
 using System.Threading.Tasks;
+using Microsoft.IO;

-namespace Cppl.Utilities.AWS
-{
-    public class S3UploadStream : Stream
-    {
-        /* Note the that maximum size (as of now) of a file in S3 is 5TB so it isn't
-         * safe to assume all uploads will work here. MAX_PART_SIZE times MAX_PART_COUNT
-         * is ~50TB, which is too big for S3. */
-        const long MIN_PART_LENGTH = 5L * 1024 * 1024; // all parts but the last this size or greater
-        const long MAX_PART_LENGTH = 5L * 1024 * 1024 * 1024; // 5GB max per PUT
-        const long MAX_PART_COUNT = 10000; // no more than 10,000 parts total
-        const long DEFAULT_PART_LENGTH = MIN_PART_LENGTH;
+namespace Infrastructure.Common.IO;
+
+public class S3UploadStream : Stream
+{
+    /* Note the that maximum size (as of now) of a file in S3 is 5TB so it isn't
+     * safe to assume all uploads will work here. MAX_PART_SIZE times MAX_PART_COUNT
+     * is ~50TB, which is too big for S3. */
+    private const long MinPartLength = 5 * 1024 * 1024; // 5MBs
+    private const long MaxPartLength = 10 * 1024 * 1024; // 10MB max per PUT
+    private const long MaxPartCount = 10000; // no more than 10,000 parts total
+    private const long DefaultPartLength = MinPartLength;
+    private const int MaxTaskRunningInParallel = 6;

-        internal class Metadata
-        {
-            public string BucketName;
-            public string Key;
-            public long PartLength = DEFAULT_PART_LENGTH;
-
-            public int PartCount = 0;
-            public string UploadId;
-            public MemoryStream CurrentStream;
-
-            public long Position = 0; // based on bytes written
-            public long Length = 0; // based on bytes written or SetLength, whichever is larger (no truncation)
-
-            public List<Task> Tasks = new List<Task>();
-            public ConcurrentDictionary<int, string> PartETags = new ConcurrentDictionary<int, string>();
-        }
+    private class S3UploadContext
+    {
+        public string BucketName { get; init; } = string.Empty;
+        public string Key { get; init; } = string.Empty;
+        public long PartLength { get; set; } = DefaultPartLength;
+        public int PartCount { get; set; }
+        public string UploadId { get; set; } = string.Empty;
+        public MemoryStream? CurrentStream { get; set; }
+        public long Position { get; set; } // based on bytes written
+        public long Length { get; set; } // based on bytes written or SetLength, whichever is larger (no truncation)
+        public readonly List<Task> Tasks = new();
+        public readonly ConcurrentDictionary<int, string> PartETags = new();
+    }

-        Metadata _metadata = new Metadata();
-        IAmazonS3 _s3 = null;
+    private S3UploadContext _context;
+    private readonly IAmazonS3 _s3;
+    private readonly RecyclableMemoryStreamManager _memoryStreamManager = new();

-        public S3UploadStream(IAmazonS3 s3, string s3uri, long partLength = DEFAULT_PART_LENGTH)
-            : this(s3, new Uri(s3uri), partLength)
-        {
-        }
+    public S3UploadStream(IAmazonS3 s3, string s3Uri, long partLength = DefaultPartLength)
+        : this(s3, new Uri(s3Uri), partLength)
+    {
+    }

-        public S3UploadStream(IAmazonS3 s3, Uri s3uri, long partLength = DEFAULT_PART_LENGTH)
-            : this (s3, s3uri.Host, s3uri.LocalPath.Substring(1), partLength)
-        {
-        }
+    private S3UploadStream(IAmazonS3 s3, Uri s3Uri, long partLength = DefaultPartLength)
+        : this(s3, s3Uri.Host, s3Uri.LocalPath[1..], partLength)
+    {
+    }

-        public S3UploadStream(IAmazonS3 s3, string bucket, string key, long partLength = DEFAULT_PART_LENGTH)
-        {
-            _s3 = s3;
-            _metadata.BucketName = bucket;
-            _metadata.Key = key;
-            _metadata.PartLength = partLength;
-        }
+    public S3UploadStream(IAmazonS3 s3, string bucket, string key, long partLength = DefaultPartLength)
+    {
+        _s3 = s3;
+        _context = new S3UploadContext
+        {
+            BucketName = bucket,
+            Key = key,
+            PartLength = partLength
+        };
+    }

-        protected override void Dispose(bool disposing)
-        {
-            if (disposing)
-            {
-                if (_metadata != null)
-                {
-                    Flush(true);
-                    CompleteUpload();
-                }
-            }
-            _metadata = null;
-            base.Dispose(disposing);
-        }
+    protected override void Dispose(bool disposing)
+    {
+        if (disposing)
+        {
+            Flush(true);
+            CompleteUpload();
+        }
+        _context = new S3UploadContext();
+        base.Dispose(disposing);
+    }

-        public override bool CanRead => false;
-        public override bool CanSeek => false;
-        public override bool CanWrite => true;
-        public override long Length => _metadata.Length = Math.Max(_metadata.Length, _metadata.Position);
+    public override bool CanRead => false;
+    public override bool CanSeek => false;
+    public override bool CanWrite => true;
+
+    public override long Length => _context.Length = Math.Max(_context.Length, _context.Position);

-        public override long Position
-        {
-            get => _metadata.Position;
-            set => throw new NotImplementedException();
-        }
+    public override long Position
+    {
+        get => _context.Position;
+        set => throw new NotImplementedException();
+    }

-        public override int Read(byte[] buffer, int offset, int count) => throw new NotImplementedException();
-        public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();
+    public override int Read(byte[] buffer, int offset, int count) => throw new NotImplementedException();
+    public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();

-        public override void SetLength(long value)
-        {
-            _metadata.Length = Math.Max(_metadata.Length, value);
-            _metadata.PartLength = Math.Max(MIN_PART_LENGTH, Math.Min(MAX_PART_LENGTH, _metadata.Length / MAX_PART_COUNT));
-        }
+    public override void SetLength(long value)
+    {
+        _context.Length = Math.Max(_context.Length, value);
+        _context.PartLength = Math.Max(MinPartLength, Math.Min(MaxPartLength, _context.Length / MaxPartCount));
+    }

-        private void StartNewPart()
-        {
-            if (_metadata.CurrentStream != null) {
-                Flush(false);
-            }
-            _metadata.CurrentStream = new MemoryStream();
-            _metadata.PartLength = Math.Min(MAX_PART_LENGTH, Math.Max(_metadata.PartLength, (_metadata.PartCount / 2 + 1) * MIN_PART_LENGTH));
-        }
+    private void StartNewPart()
+    {
+        if (_context.CurrentStream != null)
+        {
+            Flush(false);
+        }
+
+        _context.CurrentStream = _memoryStreamManager.GetStream("S3_UPLOAD_STREAM");
+        _context.PartLength = Math.Min(MaxPartLength,
+            Math.Max(_context.PartLength, (_context.PartCount / 2 + 1) * MinPartLength));
+    }

-        public override void Flush()
-        {
-            Flush(false);
-        }
+    public override void Flush()
+    {
+        Flush(false);
+    }

-        private void Flush(bool disposing)
-        {
-            if ((_metadata.CurrentStream == null || _metadata.CurrentStream.Length < MIN_PART_LENGTH) &&
-                !disposing)
-                return;
-
-            if (_metadata.UploadId == null) {
-                _metadata.UploadId = _s3.InitiateMultipartUploadAsync(new InitiateMultipartUploadRequest()
-                {
-                    BucketName = _metadata.BucketName,
-                    Key = _metadata.Key
-                }).GetAwaiter().GetResult().UploadId;
-            }
-
-            if (_metadata.CurrentStream != null)
-            {
-                var i = ++_metadata.PartCount;
-
-                _metadata.CurrentStream.Seek(0, SeekOrigin.Begin);
-                var request = new UploadPartRequest()
-                {
-                    BucketName = _metadata.BucketName,
-                    Key = _metadata.Key,
-                    UploadId = _metadata.UploadId,
-                    PartNumber = i,
-                    IsLastPart = disposing,
-                    InputStream = _metadata.CurrentStream
-                };
-                _metadata.CurrentStream = null;
-
-                var upload = Task.Run(async () =>
-                {
-                    var response = await _s3.UploadPartAsync(request);
-                    _metadata.PartETags.AddOrUpdate(i, response.ETag,
-                        (n, s) => response.ETag);
-                    request.InputStream.Dispose();
-                });
-                _metadata.Tasks.Add(upload);
-            }
-        }
+    private void Flush(bool disposing)
+    {
+        if ((_context.CurrentStream == null || _context.CurrentStream.Length < MinPartLength) &&
+            !disposing)
+            return;
+
+        if (string.IsNullOrEmpty(_context.UploadId))
+        {
+            _context.UploadId = _s3.InitiateMultipartUploadAsync(new InitiateMultipartUploadRequest
+            {
+                BucketName = _context.BucketName,
+                Key = _context.Key
+            }).GetAwaiter().GetResult().UploadId;
+        }
+
+        if (_context.CurrentStream == null)
+            return;
+
+        var i = ++_context.PartCount;
+
+        _context.CurrentStream.Seek(0, SeekOrigin.Begin);
+        var request = new UploadPartRequest
+        {
+            BucketName = _context.BucketName,
+            Key = _context.Key,
+            UploadId = _context.UploadId,
+            PartNumber = i,
+            IsLastPart = disposing,
+            InputStream = _context.CurrentStream
+        };
+
+        _context.CurrentStream = null;
+
+        if (_context.Tasks.Count > MaxTaskRunningInParallel)
+        {
+            Task.WaitAll(_context.Tasks.ToArray());
+            _context.Tasks.Clear();
+        }
+
+        var upload = Task.Run(async () =>
+        {
+            var response = await _s3.UploadPartAsync(request);
+            _context.PartETags.AddOrUpdate(i, response.ETag,
+                (_, _) => response.ETag);
+            await request.InputStream.DisposeAsync();
+        });
+        _context.Tasks.Add(upload);
+    }

-        private void CompleteUpload()
-        {
-            Task.WaitAll(_metadata.Tasks.ToArray());
-
-            if (Length > 0) {
-                _s3.CompleteMultipartUploadAsync(new CompleteMultipartUploadRequest()
-                {
-                    BucketName = _metadata.BucketName,
-                    Key = _metadata.Key,
-                    PartETags = _metadata.PartETags.Select(e => new PartETag(e.Key, e.Value)).ToList(),
-                    UploadId = _metadata.UploadId
-                }).GetAwaiter().GetResult();
-            }
-        }
+    private void CompleteUpload()
+    {
+        Task.WaitAll(_context.Tasks.ToArray());
+
+        if (Length > 0)
+        {
+            _s3.CompleteMultipartUploadAsync(new CompleteMultipartUploadRequest
+            {
+                BucketName = _context.BucketName,
+                Key = _context.Key,
+                PartETags = _context.PartETags.Select(e => new PartETag(e.Key, e.Value)).ToList(),
+                UploadId = _context.UploadId
+            }).GetAwaiter().GetResult();
+        }
+    }

-        public override void Write(byte[] buffer, int offset, int count)
-        {
-            if (count == 0) return;
-
-            // write as much of the buffer as will fit to the current part, and if needed
-            // allocate a new part and continue writing to it (and so on).
-            var o = offset;
-            var c = Math.Min(count, buffer.Length - offset); // don't over-read the buffer, even if asked to
-            do
-            {
-                if (_metadata.CurrentStream == null || _metadata.CurrentStream.Length >= _metadata.PartLength)
-                    StartNewPart();
-
-                var remaining = _metadata.PartLength - _metadata.CurrentStream.Length;
-                var w = Math.Min(c, (int)remaining);
-                _metadata.CurrentStream.Write(buffer, o, w);
-
-                _metadata.Position += w;
-                c -= w;
-                o += w;
-            } while (c > 0);
-        }
-    }
-}
+    public override void Write(byte[] buffer, int offset, int count)
+    {
+        if (count == 0) return;
+
+        // write as much of the buffer as will fit to the current part, and if needed
+        // allocate a new part and continue writing to it (and so on).
+        var o = offset;
+        var c = Math.Min(count, buffer.Length - offset); // don't over-read the buffer, even if asked to
+        do
+        {
+            if (_context.CurrentStream == null || _context.CurrentStream.Length >= _context.PartLength)
+                StartNewPart();
+
+            var remaining = _context.PartLength - _context.CurrentStream!.Length;
+            var w = Math.Min(c, (int)remaining);
+
+            _context.CurrentStream.Write(buffer, o, w);
+            _context.Position += w;
+            c -= w;
+            o += w;
+        } while (c > 0);
+    }
+}
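
For reviewers unfamiliar with Microsoft.IO.RecyclableMemoryStream, the sketch below illustrates the pooling behaviour the new _memoryStreamManager field relies on. It is illustrative only, not part of this PR; the tag string and buffer size here are arbitrary.

using Microsoft.IO;

class PoolingSketch
{
    // One shared manager owns the buffer pool, mirroring the single
    // _memoryStreamManager instance held by S3UploadStream.
    static readonly RecyclableMemoryStreamManager Manager = new();

    static void Main()
    {
        // Each part gets its own stream, but the backing blocks come from the pool,
        // so buffering many 5MB+ parts avoids repeatedly allocating new large arrays.
        using (var part = Manager.GetStream("S3_UPLOAD_STREAM"))
        {
            part.Write(new byte[1024], 0, 1024); // buffer space comes from the pool
        } // Dispose returns the blocks to the pool for reuse by the next part
    }
}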
1 change: 1 addition & 0 deletions S3UploadStream/S3UploadStream.csproj
@@ -15,6 +15,7 @@

   <ItemGroup>
     <PackageReference Include="AWSSDK.S3" Version="3.5.4" />
+    <PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="2.2.1" />
   </ItemGroup>

 </Project>
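
To tie the two files together, here is a minimal usage sketch (not part of the diff): the bucket name, object key, and local file path are hypothetical placeholders, and the S3 client is assumed to pick up credentials and region from the default AWS configuration.

using System.IO;
using Amazon.S3;
using Infrastructure.Common.IO;

class UploadExample
{
    static void Main()
    {
        var s3 = new AmazonS3Client(); // assumes default credentials/region
        using var source = File.OpenRead("backup.db"); // placeholder local file
        using var upload = new S3UploadStream(s3, "my-bucket", "backups/backup.db");

        // CopyTo drives S3UploadStream.Write; parts are buffered in pooled streams and
        // uploaded as multipart parts, and Dispose flushes the final part and completes
        // the multipart upload.
        source.CopyTo(upload);
    }
}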