From 3d6e3f4a45a7e101e7741ed00330926bcd27e484 Mon Sep 17 00:00:00 2001 From: Terry Howe Date: Mon, 22 Jul 2024 18:28:39 -0600 Subject: [PATCH] refactor: Isolate progress channel in Messenger (#1447) Signed-off-by: Terry Howe --- .../display/status/progress/manager.go | 20 ++-- .../display/status/progress/messenger.go | 95 +++++++++++++++++++ .../display/status/progress/messenger_test.go | 94 ++++++++++++++++++ .../display/status/progress/status.go | 39 +------- .../display/status/progress/status_test.go | 8 +- .../internal/display/status/track/reader.go | 28 +++--- 6 files changed, 214 insertions(+), 70 deletions(-) create mode 100644 cmd/oras/internal/display/status/progress/messenger.go create mode 100644 cmd/oras/internal/display/status/progress/messenger_test.go diff --git a/cmd/oras/internal/display/status/progress/manager.go b/cmd/oras/internal/display/status/progress/manager.go index a6723e23b..2b526e47e 100644 --- a/cmd/oras/internal/display/status/progress/manager.go +++ b/cmd/oras/internal/display/status/progress/manager.go @@ -34,12 +34,9 @@ const ( var errManagerStopped = errors.New("progress output manager has already been stopped") -// Status is print message channel -type Status chan *status - // Manager is progress view master type Manager interface { - Add() (Status, error) + Add() (*Messenger, error) SendAndStop(desc ocispec.Descriptor, prompt string) error Close() error } @@ -107,7 +104,7 @@ func (m *manager) render() { } // Add appends a new status with 2-line space for rendering. -func (m *manager) Add() (Status, error) { +func (m *manager) Add() (*Messenger, error) { if m.closed() { return nil, errManagerStopped } @@ -124,26 +121,25 @@ func (m *manager) Add() (Status, error) { // SendAndStop send message for descriptor and stop timing. func (m *manager) SendAndStop(desc ocispec.Descriptor, prompt string) error { - status, err := m.Add() + messenger, err := m.Add() if err != nil { return err } - defer close(status) - status <- NewStatusMessage(prompt, desc, desc.Size) - status <- EndTiming() + messenger.Send(prompt, desc, desc.Size) + messenger.Stop() return nil } -func (m *manager) statusChan(s *status) Status { +func (m *manager) statusChan(s *status) *Messenger { ch := make(chan *status, BufferSize) m.updating.Add(1) go func() { defer m.updating.Done() for newStatus := range ch { - s.Update(newStatus) + s.update(newStatus) } }() - return ch + return &Messenger{ch: ch} } // Close stops all status and waits for updating and rendering. diff --git a/cmd/oras/internal/display/status/progress/messenger.go b/cmd/oras/internal/display/status/progress/messenger.go new file mode 100644 index 000000000..9f0188b5a --- /dev/null +++ b/cmd/oras/internal/display/status/progress/messenger.go @@ -0,0 +1,95 @@ +/* +Copyright The ORAS Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package progress + +import ( + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "oras.land/oras/cmd/oras/internal/display/status/progress/humanize" + "time" +) + +// Messenger is progress message channel. +type Messenger struct { + ch chan *status + closed bool +} + +// Start initializes the messenger. +func (sm *Messenger) Start() { + if sm.ch == nil { + return + } + sm.ch <- startTiming() +} + +// Send a status message for the specified descriptor. +func (sm *Messenger) Send(prompt string, descriptor ocispec.Descriptor, offset int64) { + for { + select { + case sm.ch <- newStatusMessage(prompt, descriptor, offset): + return + case <-sm.ch: + // purge the channel until successfully pushed + default: + // ch is nil + return + } + } +} + +// Stop the messenger after sending a end message. +func (sm *Messenger) Stop() { + if sm.closed { + return + } + sm.ch <- endTiming() + close(sm.ch) + sm.closed = true +} + +// newStatus generates a base empty status. +func newStatus() *status { + return &status{ + offset: -1, + total: humanize.ToBytes(0), + speedWindow: newSpeedWindow(framePerSecond), + } +} + +// newStatusMessage generates a status for messaging. +func newStatusMessage(prompt string, descriptor ocispec.Descriptor, offset int64) *status { + return &status{ + prompt: prompt, + descriptor: descriptor, + offset: offset, + } +} + +// startTiming creates start timing message. +func startTiming() *status { + return &status{ + offset: -1, + startTime: time.Now(), + } +} + +// endTiming creates end timing message. +func endTiming() *status { + return &status{ + offset: -1, + endTime: time.Now(), + } +} diff --git a/cmd/oras/internal/display/status/progress/messenger_test.go b/cmd/oras/internal/display/status/progress/messenger_test.go new file mode 100644 index 000000000..a8b782e55 --- /dev/null +++ b/cmd/oras/internal/display/status/progress/messenger_test.go @@ -0,0 +1,94 @@ +/* +Copyright The ORAS Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package progress + +import ( + v1 "github.com/opencontainers/image-spec/specs-go/v1" + "testing" +) + +func Test_Messenger(t *testing.T) { + var msg *status + ch := make(chan *status, BufferSize) + messenger := &Messenger{ch: ch} + + messenger.Start() + select { + case msg = <-ch: + if msg.offset != -1 { + t.Errorf("Expected start message with offset -1, got %d", msg.offset) + } + default: + t.Error("Expected start message") + } + + desc := v1.Descriptor{ + Digest: "mouse", + Size: 100, + } + expected := int64(50) + messenger.Send("Reading", desc, expected) + select { + case msg = <-ch: + if msg.offset != expected { + t.Errorf("Expected status message with offset %d, got %d", expected, msg.offset) + } + if msg.prompt != "Reading" { + t.Errorf("Expected status message prompt Reading, got %s", msg.prompt) + } + default: + t.Error("Expected status message") + } + + messenger.Send("Reading", desc, expected) + messenger.Send("Read", desc, desc.Size) + select { + case msg = <-ch: + if msg.offset != desc.Size { + t.Errorf("Expected status message with offset %d, got %d", expected, msg.offset) + } + if msg.prompt != "Read" { + t.Errorf("Expected status message prompt Read, got %s", msg.prompt) + } + default: + t.Error("Expected status message") + } + select { + case msg = <-ch: + t.Errorf("Unexpected status message %v", msg) + default: + } + + expected = int64(-1) + messenger.Stop() + select { + case msg = <-ch: + if msg.offset != expected { + t.Errorf("Expected END status message with offset %d, got %d", expected, msg.offset) + } + default: + t.Error("Expected END status message") + } + + messenger.Stop() + select { + case msg = <-ch: + if msg != nil { + t.Errorf("Unexpected status message %v", msg) + } + default: + } +} diff --git a/cmd/oras/internal/display/status/progress/status.go b/cmd/oras/internal/display/status/progress/status.go index b2abb6ee4..a1074e1e3 100644 --- a/cmd/oras/internal/display/status/progress/status.go +++ b/cmd/oras/internal/display/status/progress/status.go @@ -56,40 +56,6 @@ type status struct { lock sync.Mutex } -// newStatus generates a base empty status. -func newStatus() *status { - return &status{ - offset: -1, - total: humanize.ToBytes(0), - speedWindow: newSpeedWindow(framePerSecond), - } -} - -// NewStatusMessage generates a status for messaging. -func NewStatusMessage(prompt string, descriptor ocispec.Descriptor, offset int64) *status { - return &status{ - prompt: prompt, - descriptor: descriptor, - offset: offset, - } -} - -// StartTiming starts timing. -func StartTiming() *status { - return &status{ - offset: -1, - startTime: time.Now(), - } -} - -// EndTiming ends timing and set status to done. -func EndTiming() *status { - return &status{ - offset: -1, - endTime: time.Now(), - } -} - func (s *status) isZero() bool { return s.offset < 0 && s.startTime.IsZero() && s.endTime.IsZero() } @@ -121,7 +87,7 @@ func (s *status) String(width int) (string, string) { percent = 1 default: // 0% ~ 99%, show 2-digit precision if total != 0 && s.offset >= 0 { - // percentage calculatable + // calculate percentage percent = float64(s.offset) / float64(total) } offset = fmt.Sprintf("%.2f", humanize.RoundTo(s.total.Size*percent)) @@ -190,8 +156,7 @@ func (s *status) durationString() string { return d.String() } -// Update updates a status. -func (s *status) Update(n *status) { +func (s *status) update(n *status) { s.lock.Lock() defer s.lock.Unlock() diff --git a/cmd/oras/internal/display/status/progress/status_test.go b/cmd/oras/internal/display/status/progress/status_test.go index e0b3bb912..9f8223f6e 100644 --- a/cmd/oras/internal/display/status/progress/status_test.go +++ b/cmd/oras/internal/display/status/progress/status_test.go @@ -35,7 +35,7 @@ func Test_status_String(t *testing.T) { } // not done - s.Update(&status{ + s.update(&status{ prompt: "test", descriptor: ocispec.Descriptor{ MediaType: "application/vnd.oci.empty.oras.test.v1+json", @@ -57,7 +57,7 @@ func Test_status_String(t *testing.T) { t.Error(err) } // done - s.Update(&status{ + s.update(&status{ endTime: time.Now(), offset: s.descriptor.Size, descriptor: s.descriptor, @@ -76,7 +76,7 @@ func Test_status_String_zeroWidth(t *testing.T) { } // not done - s.Update(&status{ + s.update(&status{ prompt: "test", descriptor: ocispec.Descriptor{ MediaType: "application/vnd.oci.empty.oras.test.v1+json", @@ -93,7 +93,7 @@ func Test_status_String_zeroWidth(t *testing.T) { t.Error(err) } // done - s.Update(&status{ + s.update(&status{ endTime: time.Now(), offset: s.descriptor.Size, descriptor: s.descriptor, diff --git a/cmd/oras/internal/display/status/track/reader.go b/cmd/oras/internal/display/status/track/reader.go index 28f647d36..93919381f 100644 --- a/cmd/oras/internal/display/status/track/reader.go +++ b/cmd/oras/internal/display/status/track/reader.go @@ -30,7 +30,7 @@ type reader struct { donePrompt string descriptor ocispec.Descriptor manager progress.Manager - status progress.Status + messenger *progress.Messenger } // NewReader returns a new reader with tracked progress. @@ -43,7 +43,7 @@ func NewReader(r io.Reader, descriptor ocispec.Descriptor, actionPrompt string, } func managedReader(r io.Reader, descriptor ocispec.Descriptor, manager progress.Manager, actionPrompt string, donePrompt string) (*reader, error) { - ch, err := manager.Add() + messenger, err := manager.Add() if err != nil { return nil, err } @@ -54,11 +54,11 @@ func managedReader(r io.Reader, descriptor ocispec.Descriptor, manager progress. actionPrompt: actionPrompt, donePrompt: donePrompt, manager: manager, - status: ch, + messenger: messenger, }, nil } -// StopManager stops the status channel and related manager. +// StopManager stops the messenger channel and related manager. func (r *reader) StopManager() { r.Close() _ = r.manager.Close() @@ -66,18 +66,18 @@ func (r *reader) StopManager() { // Done sends message to mark the tracked progress as complete. func (r *reader) Done() { - r.status <- progress.NewStatusMessage(r.donePrompt, r.descriptor, r.descriptor.Size) - r.status <- progress.EndTiming() + r.messenger.Send(r.donePrompt, r.descriptor, r.descriptor.Size) + r.messenger.Stop() } // Close closes the update channel. func (r *reader) Close() { - close(r.status) + r.messenger.Stop() } -// Start sends the start timing to the status channel. +// Start sends the start timing to the messenger channel. func (r *reader) Start() { - r.status <- progress.StartTiming() + r.messenger.Start() } // Read reads from the underlying reader and updates the progress. @@ -93,12 +93,6 @@ func (r *reader) Read(p []byte) (int, error) { return n, io.ErrUnexpectedEOF } } - for { - select { - case r.status <- progress.NewStatusMessage(r.actionPrompt, r.descriptor, r.offset): - // purge the channel until successfully pushed - return n, err - case <-r.status: - } - } + r.messenger.Send(r.actionPrompt, r.descriptor, r.offset) + return n, err }