Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
xujihui1985 committed Sep 27, 2018
1 parent 4e301ec commit f750049
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 0 deletions.
101 changes: 101 additions & 0 deletions concurrency/logger/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package main

import (
"fmt"
"io"
"os"
"os/signal"
"sync"
"syscall"
"time"
)

// logger provider support to throw log away if log wirtes start to timeout due to latency
type logger struct {
write chan string //
wg sync.WaitGroup
}

func New(w io.Writer, size int) *logger {
l := logger{
write: make(chan string, size),
}

// wg to track the write goroutine, we can add more G here to pick logger from write channel
l.wg.Add(1)

// this is the write goroutine that performs the actual writes
go func() {
// range over channel, once the channel is close and flushed
// the loop will terminated
for d := range l.write {
fmt.Fprintln(w, d)
}

// when write channel closed, mark the logger shutdown
l.wg.Done()
}()

return &l
}

func (l *logger) Shutdown() {

// close the write channel
close(l.write)

// wait for the write goroutine to terminate and all log was flushed
l.wg.Wait()
}

func (l *logger) Write(data string) {

select {
case l.write <- data:
// log has been send to write channel
default:
// log was dropped, or we can stash the log
fmt.Println("drop log")
}
}

type device struct {
off bool
}

func (d *device) Write(p []byte) (n int, err error) {
if d.off {
// simulate network error
time.Sleep(2 * time.Second)
}
fmt.Println(string(p))
return len(p), nil
}

func main() {
fmt.Printf("current pid is %d\n", os.Getpid())
const grs = 10
var d device

l := New(&d, grs)

for i := 0; i < grs; i++ {
go func(id int) {
for {
l.Write(fmt.Sprintf("%d: log data, pid: %d", id, os.Getpid()))
time.Sleep(10 * time.Millisecond)
}
}(i)
}

// we want to control the simulated disk blocking.
// capture USR1 signals to toggle device issues

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGUSR1)

for {
<-sigChan
d.off = !d.off
}
}
5 changes: 5 additions & 0 deletions concurrency/pool/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package main

func main() {

}
81 changes: 81 additions & 0 deletions concurrency/task/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package main

import (
"log"
"math/rand"
"sync"
"time"
)

func init() {
rand.Seed(time.Now().UnixNano())
}

type Worker interface {
Work()
}

// Task provides a pool of goroutine that can execute any worker
type Task struct {
work chan Worker
wg sync.WaitGroup
}

func New(maxG int) *Task {
t := Task{
// Using an unbuffered channel because we want the
// guarantee of knowing the work being submitted is
// actually being worked on after the call to run returns
work: make(chan Worker),
}
t.wg.Add(maxG)
for i := 0; i < maxG; i++ {
go func() {
for w := range t.work {
w.Work()
}
t.wg.Done()
}()
}
return &t
}

func (t *Task) Do(w Worker) {
// we can use ctx to perform timeout or cancel
// we can also record the counter of waiting worker to do
t.work <- w
}

func (t *Task) Shutdown() {
close(t.work)
t.wg.Wait()
}

type logWriter struct {
count int
}

func (l *logWriter) Work() {
log.Println("print count", l.count)
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
}

func main() {

t := New(10)
var wg sync.WaitGroup

for i := 0; i < 10000; i++ {
wg.Add(1)
l := logWriter{
count: i,
}
t.Do(&l)
wg.Done()
}
wg.Wait()

// shutdown the task pool
t.Shutdown()

}

0 comments on commit f750049

Please sign in to comment.