From 49e3f732b098fd053d95a552a9e7e10fd1a62de4 Mon Sep 17 00:00:00 2001
From: "vladimir.gonzalez"
Date: Tue, 31 Jan 2023 10:48:16 -0600
Subject: [PATCH] fixed naming conventions and used RecyclableMemoryStream
 instead of MemoryStream

---
 S3UploadStream/S3UploadStream.cs     | 301 ++++++++++++++-------------
 S3UploadStream/S3UploadStream.csproj |   1 +
 2 files changed, 155 insertions(+), 147 deletions(-)

diff --git a/S3UploadStream/S3UploadStream.cs b/S3UploadStream/S3UploadStream.cs
index b1c181c..3fa7926 100644
--- a/S3UploadStream/S3UploadStream.cs
+++ b/S3UploadStream/S3UploadStream.cs
@@ -1,187 +1,194 @@
 using Amazon.S3;
 using Amazon.S3.Model;
-using System;
 using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.IO;
-using System.Linq;
-using System.Threading.Tasks;
+using Microsoft.IO;
 
-namespace Cppl.Utilities.AWS
+namespace Infrastructure.Common.IO;
+
+public class S3UploadStream : Stream
 {
-    public class S3UploadStream : Stream
+    /* Note that the maximum size (as of now) of an object in S3 is 5TB, so it
+     * isn't safe to assume all uploads will work here: MaxPartLength times
+     * MaxPartCount is only ~100GB, the largest object this stream can write. */
+    private const long MinPartLength = 5 * 1024 * 1024; // 5MB minimum; all parts but the last must be at least this size
+    private const long MaxPartLength = 10 * 1024 * 1024; // 10MB max per PUT
+    private const long MaxPartCount = 10000; // no more than 10,000 parts total
+    private const long DefaultPartLength = MinPartLength;
+    private const int MaxTaskRunningInParallel = 6; // cap on outstanding part-upload tasks
+
+    private class S3UploadContext
     {
-        /* Note the that maximum size (as of now) of a file in S3 is 5TB so it isn't
-         * safe to assume all uploads will work here. MAX_PART_SIZE times MAX_PART_COUNT
-         * is ~50TB, which is too big for S3. */
-        const long MIN_PART_LENGTH = 5L * 1024 * 1024; // all parts but the last this size or greater
-        const long MAX_PART_LENGTH = 5L * 1024 * 1024 * 1024; // 5GB max per PUT
-        const long MAX_PART_COUNT = 10000; // no more than 10,000 parts total
-        const long DEFAULT_PART_LENGTH = MIN_PART_LENGTH;
-
-        internal class Metadata
-        {
-            public string BucketName;
-            public string Key;
-            public long PartLength = DEFAULT_PART_LENGTH;
-
-            public int PartCount = 0;
-            public string UploadId;
-            public MemoryStream CurrentStream;
+        public string BucketName { get; init; } = string.Empty;
+        public string Key { get; init; } = string.Empty;
+        public long PartLength { get; set; } = DefaultPartLength;
+        public int PartCount { get; set; }
+        public string UploadId { get; set; } = string.Empty;
+        public MemoryStream? CurrentStream { get; set; }
+        public long Position { get; set; } // based on bytes written
+        public long Length { get; set; } // based on bytes written or SetLength, whichever is larger (no truncation)
+        public readonly List<Task> Tasks = new();
+        public readonly ConcurrentDictionary<int, string> PartETags = new();
+    }
 
-            public long Position = 0; // based on bytes written
-            public long Length = 0; // based on bytes written or SetLength, whichever is larger (no truncation)
-
-            public List<Task> Tasks = new List<Task>();
-            public ConcurrentDictionary<int, string> PartETags = new ConcurrentDictionary<int, string>();
-        }
+    private S3UploadContext _context;
+    private readonly IAmazonS3 _s3;
+    private readonly RecyclableMemoryStreamManager _memoryStreamManager = new();
 
-        Metadata _metadata = new Metadata();
-        IAmazonS3 _s3 = null;
+    public S3UploadStream(IAmazonS3 s3, string s3Uri, long partLength = DefaultPartLength)
+        : this(s3, new Uri(s3Uri), partLength)
+    {
+    }
 
-        public S3UploadStream(IAmazonS3 s3, string s3uri, long partLength = DEFAULT_PART_LENGTH)
-            : this(s3, new Uri(s3uri), partLength)
-        {
-        }
+    private S3UploadStream(IAmazonS3 s3, Uri s3Uri, long partLength = DefaultPartLength)
+        : this(s3, s3Uri.Host, s3Uri.LocalPath[1..], partLength)
+    {
+    }
 
-        public S3UploadStream(IAmazonS3 s3, Uri s3uri, long partLength = DEFAULT_PART_LENGTH)
-            : this (s3, s3uri.Host, s3uri.LocalPath.Substring(1), partLength)
-        {
-        }
+    public S3UploadStream(IAmazonS3 s3, string bucket, string key, long partLength = DefaultPartLength)
+    {
+        _s3 = s3;
+        _context = new S3UploadContext
+        {
+            BucketName = bucket,
+            Key = key,
+            PartLength = partLength
+        };
+    }
 
-        public S3UploadStream(IAmazonS3 s3, string bucket, string key, long partLength = DEFAULT_PART_LENGTH)
-        {
-            _s3 = s3;
-            _metadata.BucketName = bucket;
-            _metadata.Key = key;
-            _metadata.PartLength = partLength;
-        }
+    protected override void Dispose(bool disposing)
+    {
+        if (disposing) {
+            Flush(true);
+            CompleteUpload();
+        }
+        _context = new S3UploadContext();
+        base.Dispose(disposing);
+    }
 
-        protected override void Dispose(bool disposing)
-        {
-            if (disposing)
-            {
-                if (_metadata != null)
-                {
-                    Flush(true);
-                    CompleteUpload();
-                }
-            }
-            _metadata = null;
-            base.Dispose(disposing);
-        }
+    public override bool CanRead => false;
+    public override bool CanSeek => false;
+    public override bool CanWrite => true;
+
+    public override long Length => _context.Length = Math.Max(_context.Length, _context.Position);
+
+    public override long Position
+    {
+        get => _context.Position;
+        set => throw new NotImplementedException();
+    }
 
-        public override bool CanRead => false;
-        public override bool CanSeek => false;
-        public override bool CanWrite => true;
-        public override long Length => _metadata.Length = Math.Max(_metadata.Length, _metadata.Position);
-        public override long Position
-        {
-            get => _metadata.Position;
-            set => throw new NotImplementedException();
-        }
+    public override int Read(byte[] buffer, int offset, int count) => throw new NotImplementedException();
+    public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();
 
-        public override int Read(byte[] buffer, int offset, int count) => throw new NotImplementedException();
-        public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();
+    public override void SetLength(long value)
+    {
+        _context.Length = Math.Max(_context.Length, value);
+        _context.PartLength = Math.Max(MinPartLength, Math.Min(MaxPartLength, _context.Length / MaxPartCount));
+    }
 
-        public override void SetLength(long value)
-        {
-            _metadata.Length = Math.Max(_metadata.Length, value);
-            _metadata.PartLength = Math.Max(MIN_PART_LENGTH, Math.Min(MAX_PART_LENGTH, _metadata.Length / MAX_PART_COUNT));
-        }
+    private void StartNewPart()
+    {
+        if (_context.CurrentStream != null) {
+            Flush(false);
+        }
+
+        _context.CurrentStream = _memoryStreamManager.GetStream("S3_UPLOAD_STREAM");
+        _context.PartLength = Math.Min(MaxPartLength,
+            Math.Max(_context.PartLength, (_context.PartCount / 2 + 1) * MinPartLength));
+    }
 
-        private void StartNewPart()
-        {
-            if (_metadata.CurrentStream != null) {
-                Flush(false);
-            }
-            _metadata.CurrentStream = new MemoryStream();
-            _metadata.PartLength = Math.Min(MAX_PART_LENGTH, Math.Max(_metadata.PartLength, (_metadata.PartCount / 2 + 1) * MIN_PART_LENGTH));
-        }
+    public override void Flush()
+    {
+        Flush(false);
+    }
 
-        public override void Flush()
-        {
-            Flush(false);
-        }
+    private void Flush(bool disposing)
+    {
+        if ((_context.CurrentStream == null || _context.CurrentStream.Length < MinPartLength) &&
+            !disposing)
+            return;
+
+        if (string.IsNullOrEmpty(_context.UploadId)) {
+            _context.UploadId = _s3.InitiateMultipartUploadAsync(new InitiateMultipartUploadRequest
+            {
+                BucketName = _context.BucketName,
+                Key = _context.Key
+            }).GetAwaiter().GetResult().UploadId;
+        }
+
+        if (_context.CurrentStream == null)
+            return;
+
+        var i = ++_context.PartCount;
+
+        _context.CurrentStream.Seek(0, SeekOrigin.Begin);
+        var request = new UploadPartRequest
+        {
+            BucketName = _context.BucketName,
+            Key = _context.Key,
+            UploadId = _context.UploadId,
+            PartNumber = i,
+            IsLastPart = disposing,
+            InputStream = _context.CurrentStream
+        };
+
+        _context.CurrentStream = null;
+
+        if (_context.Tasks.Count > MaxTaskRunningInParallel) {
+            Task.WaitAll(_context.Tasks.ToArray());
+            _context.Tasks.Clear();
+        }
+
+        var upload = Task.Run(async () =>
+        {
+            var response = await _s3.UploadPartAsync(request);
+            _context.PartETags.AddOrUpdate(i, response.ETag,
+                (_, _) => response.ETag);
+            await request.InputStream.DisposeAsync();
+        });
+        _context.Tasks.Add(upload);
+    }
 
-        private void Flush(bool disposing)
-        {
-            if ((_metadata.CurrentStream == null || _metadata.CurrentStream.Length < MIN_PART_LENGTH) &&
-                !disposing)
-                return;
-
-            if (_metadata.UploadId == null) {
-                _metadata.UploadId = _s3.InitiateMultipartUploadAsync(new InitiateMultipartUploadRequest()
-                {
-                    BucketName = _metadata.BucketName,
-                    Key = _metadata.Key
-                }).GetAwaiter().GetResult().UploadId;
-            }
-
-            if (_metadata.CurrentStream != null)
-            {
-                var i = ++_metadata.PartCount;
-
-                _metadata.CurrentStream.Seek(0, SeekOrigin.Begin);
-                var request = new UploadPartRequest()
-                {
-                    BucketName = _metadata.BucketName,
-                    Key = _metadata.Key,
-                    UploadId = _metadata.UploadId,
-                    PartNumber = i,
-                    IsLastPart = disposing,
-                    InputStream = _metadata.CurrentStream
-                };
-                _metadata.CurrentStream = null;
-
-                var upload = Task.Run(async () =>
-                {
-                    var response = await _s3.UploadPartAsync(request);
-                    _metadata.PartETags.AddOrUpdate(i, response.ETag,
-                        (n, s) => response.ETag);
-                    request.InputStream.Dispose();
-                });
-                _metadata.Tasks.Add(upload);
-            }
-        }
+    private void CompleteUpload()
+    {
+        Task.WaitAll(_context.Tasks.ToArray());
+
+        if (Length > 0) {
+            _s3.CompleteMultipartUploadAsync(new CompleteMultipartUploadRequest
+            {
+                BucketName = _context.BucketName,
+                Key = _context.Key,
+                PartETags = _context.PartETags.Select(e => new PartETag(e.Key, e.Value)).ToList(),
+                UploadId = _context.UploadId
+            }).GetAwaiter().GetResult();
+        }
+    }
 
-        private void CompleteUpload()
-        {
-            Task.WaitAll(_metadata.Tasks.ToArray());
-
-            if (Length > 0) {
-                _s3.CompleteMultipartUploadAsync(new CompleteMultipartUploadRequest()
-                {
-                    BucketName = _metadata.BucketName,
-                    Key = _metadata.Key,
-                    PartETags = _metadata.PartETags.Select(e => new PartETag(e.Key, e.Value)).ToList(),
-                    UploadId = _metadata.UploadId
-                }).GetAwaiter().GetResult();
-            }
-        }
+    public override void Write(byte[] buffer, int offset, int count)
+    {
+        if (count == 0) return;
+
+        // write as much of the buffer as will fit to the current part, and if needed
+        // allocate a new part and continue writing to it (and so on).
+        var o = offset;
+        var c = Math.Min(count, buffer.Length - offset); // don't over-read the buffer, even if asked to
+        do
+        {
+            if (_context.CurrentStream == null || _context.CurrentStream.Length >= _context.PartLength)
+                StartNewPart();
+
+            var remaining = _context.PartLength - _context.CurrentStream!.Length;
+            var w = Math.Min(c, (int)remaining);
+
+            _context.CurrentStream.Write(buffer, o, w);
+            _context.Position += w;
+            c -= w;
+            o += w;
+        } while (c > 0);
+    }
+}
\ No newline at end of file
 
-        public override void Write(byte[] buffer, int offset, int count)
-        {
-            if (count == 0) return;
-
-            // write as much of the buffer as will fit to the current part, and if needed
-            // allocate a new part and continue writing to it (and so on).
-            var o = offset;
-            var c = Math.Min(count, buffer.Length - offset); // don't over-read the buffer, even if asked to
-            do
-            {
-                if (_metadata.CurrentStream == null || _metadata.CurrentStream.Length >= _metadata.PartLength)
-                    StartNewPart();
-
-                var remaining = _metadata.PartLength - _metadata.CurrentStream.Length;
-                var w = Math.Min(c, (int)remaining);
-                _metadata.CurrentStream.Write(buffer, o, w);
-
-                _metadata.Position += w;
-                c -= w;
-                o += w;
-            } while (c > 0);
-        }
-    }
-}
diff --git a/S3UploadStream/S3UploadStream.csproj b/S3UploadStream/S3UploadStream.csproj
index ee03d6f..484eda3 100644
--- a/S3UploadStream/S3UploadStream.csproj
+++ b/S3UploadStream/S3UploadStream.csproj
@@ -15,6 +15,7 @@
+    <PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="..." />
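
For reference, a minimal usage sketch of the class after this change. It is illustrative only: the bucket, key, and file names are placeholders, and the parameterless AmazonS3Client assumes region and credentials are resolved from the environment.

    using Amazon.S3;
    using Infrastructure.Common.IO;

    // Placeholder client and target; any IAmazonS3 implementation works here.
    using var s3 = new AmazonS3Client();
    using var source = File.OpenRead("data.bin");

    // Writes are buffered into RecyclableMemoryStream-backed parts of 5-10MB,
    // part PUTs run on background tasks (throttled by MaxTaskRunningInParallel),
    // and Dispose() flushes the final part and completes the multipart upload.
    using (var upload = new S3UploadStream(s3, "my-bucket", "backups/data.bin"))
    {
        source.CopyTo(upload); // the stream is write-only; Read/Seek throw
    }

The same target can also be passed as a single URI string, e.g. new S3UploadStream(s3, "s3://my-bucket/backups/data.bin"). Since Dispose() is what issues CompleteMultipartUpload, a stream abandoned without disposing leaves an incomplete multipart upload behind in the bucket.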