Replace code generation with generics #61

Open · wants to merge 1 commit into master
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -2,7 +2,7 @@ version: 2
jobs:
build:
docker:
- image: golang:1.11
- image: golang:1.18
working_directory: /projects/dataloaden
steps: &steps
- checkout
225 changes: 207 additions & 18 deletions dataloaden.go
@@ -1,28 +1,217 @@
-package main
package dataloaden

import (
"fmt"
"os"

"github.com/vektah/dataloaden/pkg/generator"
"sync"
"time"
)

-func main() {
-if len(os.Args) != 4 {
-fmt.Println("usage: name keyType valueType")
-fmt.Println(" example:")
-fmt.Println(" dataloaden 'UserLoader int []*github.com/my/package.User'")
-os.Exit(1)
// LoaderConfig captures the config to create a new Loader
type LoaderConfig[K comparable, T any] struct {
// Fetch is a method that provides the data for the loader
Fetch func(keys []K) ([]T, []error)

// Wait is how long wait before sending a batch
Collaborator:

Is there a typo here? i.e.

Suggested change:
-// Wait is how long wait before sending a batch
+// Wait is how long to wait before sending a batch
Wait time.Duration

// MaxBatch will limit the maximum number of keys to send in one batch, 0 = no limit
MaxBatch int
}

// NewLoader creates a new Loader given a fetch, wait, and maxBatch
func NewLoader[K comparable, T any](config LoaderConfig[K, T]) *Loader[K, T] {
return &Loader[K, T]{
fetch: config.Fetch,
wait: config.Wait,
maxBatch: config.MaxBatch,
}
}
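
For context, a caller-side sketch of the new API. Everything here is illustrative: the User type, the Fetch body, the timings, and the import path (assumed to be the repository root, matching the package rename in this diff).

package main

import (
    "fmt"
    "time"

    "github.com/vektah/dataloaden" // assumed import path after the package rename
)

type User struct {
    ID   int
    Name string
}

func main() {
    loader := dataloaden.NewLoader(dataloaden.LoaderConfig[int, *User]{
        // Fetch receives one batch of keys and returns values in the same order.
        Fetch: func(keys []int) ([]*User, []error) {
            users := make([]*User, len(keys))
            for i, k := range keys {
                users[i] = &User{ID: k, Name: fmt.Sprintf("user%d", k)}
            }
            return users, nil
        },
        Wait:     2 * time.Millisecond, // collect keys for up to 2ms per batch
        MaxBatch: 100,                  // flush early once 100 keys are queued
    })

    u, err := loader.Load(1) // batched and cached automatically
    fmt.Println(u.Name, err)
}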

// Loader batches and caches requests
type Loader[K comparable, T any] struct {
// this method provides the data for the loader
fetch func(keys []K) ([]T, []error)

// how long to wait before sending a batch
wait time.Duration

// this will limit the maximum number of keys to send in one batch, 0 = no limit
maxBatch int

// INTERNAL

// lazily created cache
cache map[K]T

// the current batch. keys will continue to be collected until timeout is hit,
// then everything will be sent to the fetch method and out to the listeners
batch *loaderBatch[K, T]

// mutex to prevent races
mu sync.Mutex
}

type loaderBatch[K comparable, T any] struct {
keys []K
data []T
error []error
closing bool
done chan struct{}
}

// Load a T by key, batching and caching will be applied automatically
func (l *Loader[K, T]) Load(key K) (T, error) {
return l.LoadThunk(key)()
}

// LoadThunk returns a function that when called will block waiting for a User.
Collaborator:

Suggested change:
-// LoadThunk returns a function that when called will block waiting for a User.
+// LoadThunk returns a function that when called will block waiting for a T.

// This method should be used if you want one goroutine to make requests to many
// different data loaders without blocking until the thunk is called.
func (l *Loader[K, T]) LoadThunk(key K) func() (T, error) {
l.mu.Lock()
if it, ok := l.cache[key]; ok {
l.mu.Unlock()
return func() (T, error) {
return it, nil
}
}
if l.batch == nil {
l.batch = &loaderBatch[K, T]{done: make(chan struct{})}
}
batch := l.batch
pos := batch.keyIndex(l, key)
l.mu.Unlock()

return func() (T, error) {
<-batch.done

var data T
if pos < len(batch.data) {
data = batch.data[pos]
}

var err error
// it's convenient to be able to return a single error for everything
if len(batch.error) == 1 {
err = batch.error[0]
} else if batch.error != nil {
err = batch.error[pos]
}

if err == nil {
l.mu.Lock()
l.unsafeSet(key, data)
l.mu.Unlock()
}

return data, err
}
}
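
A sketch of the thunk pattern this enables, reusing the illustrative loader from above: both keys join the same pending batch before either call blocks.

thunkA := loader.LoadThunk(1)
thunkB := loader.LoadThunk(2)
// Both keys are now queued in one pending batch; calling the thunks
// blocks until that batch's Fetch has run.
a, errA := thunkA()
b, errB := thunkB()
fmt.Println(a, errA, b, errB)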

// LoadAll fetches many keys at once. It will be broken into appropriate sized
// sub batches depending on how the loader is configured
func (l *Loader[K, T]) LoadAll(keys []K) ([]T, []error) {
results := make([]func() (T, error), len(keys))

-wd, err := os.Getwd()
-if err != nil {
-fmt.Fprintln(os.Stderr, err.Error())
-os.Exit(2)
for i, key := range keys {
results[i] = l.LoadThunk(key)
}

-if err := generator.Generate(os.Args[1], os.Args[2], os.Args[3], wd); err != nil {
-fmt.Fprintln(os.Stderr, err.Error())
-os.Exit(2)
users := make([]T, len(keys))
errors := make([]error, len(keys))
for i, thunk := range results {
users[i], errors[i] = thunk()
}
return users, errors
}

// LoadAllThunk returns a function that when called will block waiting for a Users.
Collaborator:

Suggested change:
-// LoadAllThunk returns a function that when called will block waiting for a Users.
+// LoadAllThunk returns a function that when called will block waiting for Ts.

// This method should be used if you want one goroutine to make requests to many
// different data loaders without blocking until the thunk is called.
func (l *Loader[K, T]) LoadAllThunk(keys []K) func() ([]T, []error) {
results := make([]func() (T, error), len(keys))
for i, key := range keys {
results[i] = l.LoadThunk(key)
}
return func() ([]T, []error) {
users := make([]T, len(keys))
errors := make([]error, len(keys))
for i, thunk := range results {
users[i], errors[i] = thunk()
}
return users, errors
}
}
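
Likewise for the slice variant, with a hypothetical userLoader: the thunk lets the caller start batches on several loaders before blocking on any of them.

usersThunk := userLoader.LoadAllThunk([]int{1, 2, 3})
// ...queue keys on other loaders here, without blocking...
users, errs := usersThunk()
fmt.Println(users, errs)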

// Prime the cache with the provided key and value. If the key already exists, no change is made
// and false is returned.
// (To forcefully prime the cache, clear the key first: loader.Clear(key); loader.Prime(key, value).)
func (l *Loader[K, T]) Prime(key K, value T) bool {
l.mu.Lock()
var found bool
if _, found = l.cache[key]; !found {
l.unsafeSet(key, value)
}
l.mu.Unlock()
return !found
}

// Clear the value at key from the cache, if it exists
func (l *Loader[K, T]) Clear(key K) {
l.mu.Lock()
delete(l.cache, key)
l.mu.Unlock()
}
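
Since Prime never overwrites an existing entry, forcing a new value in takes the clear-then-prime sequence the Prime comment refers to. A sketch with the illustrative loader from above:

loader.Prime(1, &User{ID: 1, Name: "cached"})      // returns false if key 1 was already cached
loader.Clear(1)                                    // drop any existing entry
loader.Prime(1, &User{ID: 1, Name: "replacement"}) // now guaranteed to insert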

func (l *Loader[K, T]) unsafeSet(key K, value T) {
if l.cache == nil {
l.cache = map[K]T{}
}
l.cache[key] = value
}

// keyIndex will return the location of the key in the batch, if it's not found
// it will add the key to the batch
func (b *loaderBatch[K, T]) keyIndex(l *Loader[K, T], key K) int {
for i, existingKey := range b.keys {
if key == existingKey {
return i
}
}

pos := len(b.keys)
b.keys = append(b.keys, key)
if pos == 0 {
go b.startTimer(l)
}

if l.maxBatch != 0 && pos >= l.maxBatch-1 {
if !b.closing {
b.closing = true
l.batch = nil
go b.end(l)
}
}

return pos
}

func (b *loaderBatch[K, T]) startTimer(l *Loader[K, T]) {
time.Sleep(l.wait)
l.mu.Lock()

// we must have hit a batch limit and are already finalizing this batch
if b.closing {
l.mu.Unlock()
return
}

l.batch = nil
l.mu.Unlock()

b.end(l)
}

func (b *loaderBatch[K, T]) end(l *Loader[K, T]) {
b.data, b.error = l.fetch(b.keys)
close(b.done)
}
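
One way to sanity-check the batching behavior above (a sketch, not part of this diff; assumes fmt, sync, sync/atomic, and time are imported alongside the loader package): three concurrent Loads inside one wait window should share a single Fetch call.

var calls int32
loader := dataloaden.NewLoader(dataloaden.LoaderConfig[int, int]{
    Fetch: func(keys []int) ([]int, []error) {
        atomic.AddInt32(&calls, 1)
        return keys, nil // echo keys back as values
    },
    Wait: 5 * time.Millisecond,
})

var wg sync.WaitGroup
for i := 0; i < 3; i++ {
    i := i // capture loop variable (pre-Go 1.22)
    wg.Add(1)
    go func() {
        defer wg.Done()
        loader.Load(i)
    }()
}
wg.Wait()
fmt.Println(atomic.LoadInt32(&calls)) // expected: 1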
68 changes: 0 additions & 68 deletions example/benchmark_test.go

This file was deleted.

3 changes: 0 additions & 3 deletions example/pkgname/user.go

This file was deleted.
