Skip to content

Commit

Permalink
GPU accelerated encoder (#895)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmanc authored and hopeyen committed Dec 5, 2024
1 parent cd908e1 commit b8c46a1
Show file tree
Hide file tree
Showing 80 changed files with 1,940 additions and 803 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ lightnode/docker/args.sh
.idea
.env
.vscode

icicle/*
5 changes: 3 additions & 2 deletions api/clients/retrieval_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@ func makeTestComponents() (encoding.Prover, encoding.Verifier, error) {
SRSOrder: 3000,
SRSNumberToLoad: 3000,
NumWorker: uint64(runtime.GOMAXPROCS(0)),
LoadG2Points: true,
}

p, err := prover.NewProver(config, true)
p, err := prover.NewProver(config, nil)
if err != nil {
return nil, nil, err
}

v, err := verifier.NewVerifier(config, true)
v, err := verifier.NewVerifier(config, nil)
if err != nil {
return nil, nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions core/test/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ func makeTestComponents() (encoding.Prover, encoding.Verifier, error) {
SRSOrder: 3000,
SRSNumberToLoad: 3000,
NumWorker: uint64(runtime.GOMAXPROCS(0)),
LoadG2Points: true,
}

p, err := prover.NewProver(config, true)
p, err := prover.NewProver(config, nil)
if err != nil {
return nil, nil, err
}

v, err := verifier.NewVerifier(config, true)
v, err := verifier.NewVerifier(config, nil)
if err != nil {
return nil, nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions core/v2/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,15 @@ func makeTestComponents() (encoding.Prover, encoding.Verifier, error) {
SRSOrder: 8192,
SRSNumberToLoad: 8192,
NumWorker: uint64(runtime.GOMAXPROCS(0)),
LoadG2Points: true,
}

p, err := prover.NewProver(config, true)
p, err := prover.NewProver(config, nil)
if err != nil {
return nil, nil, err
}

v, err := verifier.NewVerifier(config, true)
v, err := verifier.NewVerifier(config, nil)
if err != nil {
return nil, nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion disperser/apiserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,8 +652,10 @@ func setup() {
SRSOrder: 8192,
SRSNumberToLoad: 8192,
NumWorker: uint64(runtime.GOMAXPROCS(0)),
LoadG2Points: true,
}
prover, err = p.NewProver(config, true)

prover, err = p.NewProver(config, nil)
if err != nil {
teardown()
panic(fmt.Sprintf("failed to initialize KZG prover: %s", err.Error()))
Expand Down
3 changes: 2 additions & 1 deletion disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ func makeTestProver() (encoding.Prover, error) {
SRSOrder: 3000,
SRSNumberToLoad: 3000,
NumWorker: uint64(runtime.GOMAXPROCS(0)),
LoadG2Points: true,
}

return prover.NewProver(config, true)
return prover.NewProver(config, nil)
}

func makeTestBlob(securityParams []*core.SecurityParam) core.Blob {
Expand Down
3 changes: 2 additions & 1 deletion disperser/cmd/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ func RunDisperserServer(ctx *cli.Context) error {
bucketName := config.BlobstoreConfig.BucketName
logger.Info("Blob store", "bucket", bucketName)
if config.DisperserVersion == V2 {
prover, err := prover.NewProver(&config.EncodingConfig, true)
config.EncodingConfig.LoadG2Points = true
prover, err := prover.NewProver(&config.EncodingConfig, nil)
if err != nil {
return fmt.Errorf("failed to create encoder: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions disperser/cmd/encoder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Config struct {
EncoderConfig kzg.KzgConfig
LoggerConfig common.LoggerConfig
ServerConfig *encoder.ServerConfig
MetricsConfig encoder.MetrisConfig
MetricsConfig *encoder.MetricsConfig
}

func NewConfig(ctx *cli.Context) (Config, error) {
Expand Down Expand Up @@ -58,10 +58,12 @@ func NewConfig(ctx *cli.Context) (Config, error) {
RequestPoolSize: ctx.GlobalInt(flags.RequestPoolSizeFlag.Name),
EnableGnarkChunkEncoding: ctx.Bool(flags.EnableGnarkChunkEncodingFlag.Name),
PreventReencoding: ctx.Bool(flags.PreventReencodingFlag.Name),
Backend: ctx.String(flags.BackendFlag.Name),
GPUEnable: ctx.Bool(flags.GPUEnableFlag.Name),
PprofHttpPort: ctx.GlobalString(flags.PprofHttpPort.Name),
EnablePprof: ctx.GlobalBool(flags.EnablePprof.Name),
},
MetricsConfig: encoder.MetrisConfig{
MetricsConfig: &encoder.MetricsConfig{
HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name),
EnableMetrics: ctx.GlobalBool(flags.EnableMetrics.Name),
},
Expand Down
16 changes: 16 additions & 0 deletions disperser/cmd/encoder/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package flags
import (
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/encoding/kzg"
"github.com/urfave/cli"
)
Expand Down Expand Up @@ -67,6 +68,19 @@ var (
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "ENABLE_GNARK_CHUNK_ENCODING"),
}
GPUEnableFlag = cli.BoolFlag{
Name: common.PrefixFlag(FlagPrefix, "gpu-enable"),
Usage: "Enable GPU, falls back to CPU if not available",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "GPU_ENABLE"),
}
BackendFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "backend"),
Usage: "Backend to use for encoding",
Required: false,
Value: string(encoding.GnarkBackend),
EnvVar: common.PrefixEnvVar(envVarPrefix, "BACKEND"),
}
PreventReencodingFlag = cli.BoolTFlag{
Name: common.PrefixFlag(FlagPrefix, "prevent-reencoding"),
Usage: "if true, will prevent reencoding of chunks by checking if the chunk already exists in the chunk store",
Expand Down Expand Up @@ -100,6 +114,8 @@ var optionalFlags = []cli.Flag{
EnableGnarkChunkEncodingFlag,
EncoderVersionFlag,
S3BucketNameFlag,
GPUEnableFlag,
BackendFlag,
PreventReencodingFlag,
PprofHttpPort,
EnablePprof,
Expand Down
62 changes: 62 additions & 0 deletions disperser/cmd/encoder/icicle.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
FROM nvidia/cuda:12.2.2-devel-ubuntu22.04 AS builder

# Install Go
ENV GOLANG_VERSION=1.21.1
ENV GOLANG_SHA256=b3075ae1ce5dab85f89bc7905d1632de23ca196bd8336afd93fa97434cfa55ae

ADD https://go.dev/dl/go${GOLANG_VERSION}.linux-amd64.tar.gz /tmp/go.tar.gz
RUN echo "${GOLANG_SHA256} /tmp/go.tar.gz" | sha256sum -c - && \
tar -C /usr/local -xzf /tmp/go.tar.gz && \
rm /tmp/go.tar.gz
ENV PATH="/usr/local/go/bin:${PATH}"

# Set up the working directory
WORKDIR /app

# Copy go.mod and go.sum first to leverage Docker cache
COPY go.mod go.sum ./

# Download dependencies
RUN go mod download

# Copy the source code
COPY ./disperser /app/disperser
COPY common /app/common
COPY contracts /app/contracts
COPY core /app/core
COPY api /app/api
COPY indexer /app/indexer
COPY encoding /app/encoding
COPY relay /app/relay

# Define Icicle versions and checksums
ENV ICICLE_VERSION=3.1.0
ENV ICICLE_BASE_SHA256=2e4e33b8bc3e335b2dd33dcfb10a9aaa18717885509614a24f492f47a2e4f4b1
ENV ICICLE_CUDA_SHA256=cdba907eac6297445a6c128081ebba5c711d352003f69310145406a8fd781647

# Download Icicle tarballs
ADD https://github.com/ingonyama-zk/icicle/releases/download/v${ICICLE_VERSION}/icicle_${ICICLE_VERSION//./_}-ubuntu22.tar.gz /tmp/icicle.tar.gz
ADD https://github.com/ingonyama-zk/icicle/releases/download/v${ICICLE_VERSION}/icicle_${ICICLE_VERSION//./_}-ubuntu22-cuda122.tar.gz /tmp/icicle-cuda.tar.gz

# Verify checksums and install Icicle
RUN echo "${ICICLE_BASE_SHA256} /tmp/icicle.tar.gz" | sha256sum -c - && \
echo "${ICICLE_CUDA_SHA256} /tmp/icicle-cuda.tar.gz" | sha256sum -c - && \
tar xzf /tmp/icicle.tar.gz && \
cp -r ./icicle/lib/* /usr/lib/ && \
cp -r ./icicle/include/icicle/ /usr/local/include/ && \
tar xzf /tmp/icicle-cuda.tar.gz -C /opt && \
rm /tmp/icicle.tar.gz /tmp/icicle-cuda.tar.gz

# Build the server with icicle backend
WORKDIR /app/disperser
RUN go build -tags=icicle -o ./bin/server ./cmd/encoder

# Start a new stage for the base image
FROM nvidia/cuda:12.2.2-base-ubuntu22.04

COPY --from=builder /app/disperser/bin/server /usr/local/bin/server
COPY --from=builder /usr/lib/libicicle* /usr/lib/
COPY --from=builder /usr/local/include/icicle /usr/local/include/icicle
COPY --from=builder /opt/icicle /opt/icicle

ENTRYPOINT ["server"]
31 changes: 25 additions & 6 deletions disperser/cmd/encoder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"github.com/Layr-Labs/eigenda/common/aws/s3"
"github.com/Layr-Labs/eigenda/disperser/cmd/encoder/flags"
blobstorev2 "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/Layr-Labs/eigenda/disperser/encoder"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/encoding/kzg/prover"
"github.com/prometheus/client_golang/prometheus"
"github.com/Layr-Labs/eigenda/relay/chunkstore"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/urfave/cli"
)

Expand Down Expand Up @@ -69,9 +70,23 @@ func RunEncoderServer(ctx *cli.Context) error {
reg.MustRegister(grpcMetrics)
}

backendType, err := encoding.ParseBackendType(config.ServerConfig.Backend)
if err != nil {
return err
}

// Set the encoding config
encodingConfig := &encoding.Config{
BackendType: backendType,
GPUEnable: config.ServerConfig.GPUEnable,
NumWorker: config.EncoderConfig.NumWorker,
}

if config.EncoderVersion == V2 {
// We no longer compute the commitments in the encoder, so we don't need to load the G2 points
prover, err := prover.NewProver(&config.EncoderConfig, false)
// We no longer load the G2 points in V2 because the KZG commitments are computed
// on the API server side.
config.EncoderConfig.LoadG2Points = false
prover, err := prover.NewProver(&config.EncoderConfig, encodingConfig)
if err != nil {
return fmt.Errorf("failed to create encoder: %w", err)
}
Expand All @@ -82,6 +97,10 @@ func RunEncoderServer(ctx *cli.Context) error {
}

blobStoreBucketName := config.BlobStoreConfig.BucketName
if blobStoreBucketName == "" {
return fmt.Errorf("blob store bucket name is required")
}

blobStore := blobstorev2.NewBlobStore(blobStoreBucketName, s3Client, logger)
logger.Info("Blob store", "bucket", blobStoreBucketName)

Expand All @@ -101,13 +120,13 @@ func RunEncoderServer(ctx *cli.Context) error {
return server.Start()
}

prover, err := prover.NewProver(&config.EncoderConfig, true)
config.EncoderConfig.LoadG2Points = true
prover, err := prover.NewProver(&config.EncoderConfig, encodingConfig)
if err != nil {
return fmt.Errorf("failed to create encoder: %w", err)
}

server := encoder.NewEncoderServer(*config.ServerConfig, logger, prover, metrics, grpcMetrics)

return server.Start()

}
2 changes: 2 additions & 0 deletions disperser/encoder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ type ServerConfig struct {
RequestPoolSize int
EnableGnarkChunkEncoding bool
PreventReencoding bool
Backend string
GPUEnable bool
PprofHttpPort string
EnablePprof bool
}
2 changes: 1 addition & 1 deletion disperser/encoder/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type MetrisConfig struct {
type MetricsConfig struct {
HTTPPort string
EnableMetrics bool
}
Expand Down
15 changes: 7 additions & 8 deletions disperser/encoder/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,6 @@ func (s *EncoderServer) Start() error {
return gs.Serve(listener)
}

func (s *EncoderServer) Close() {
if s.close == nil {
return
}
s.close()
}

func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobRequest) (*pb.EncodeBlobReply, error) {
startTime := time.Now()
blobSize := len(req.GetData())
Expand Down Expand Up @@ -193,7 +186,6 @@ func (s *EncoderServer) handleEncoding(ctx context.Context, req *pb.EncodeBlobRe
}

var chunksData [][]byte

var format pb.ChunkEncodingFormat
if s.config.EnableGnarkChunkEncoding {
format = pb.ChunkEncodingFormat_GNARK
Expand Down Expand Up @@ -228,3 +220,10 @@ func (s *EncoderServer) handleEncoding(ctx context.Context, req *pb.EncodeBlobRe
ChunkEncodingFormat: format,
}, nil
}

func (s *EncoderServer) Close() {
if s.close == nil {
return
}
s.close()
}
3 changes: 2 additions & 1 deletion disperser/encoder/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ func makeTestProver(numPoint uint64) (encoding.Prover, ServerConfig) {
SRSOrder: 3000,
SRSNumberToLoad: numPoint,
NumWorker: uint64(runtime.GOMAXPROCS(0)),
LoadG2Points: true,
}

p, _ := prover.NewProver(kzgConfig, true)
p, _ := prover.NewProver(kzgConfig, nil)
encoderServerConfig := ServerConfig{
GrpcPort: "3000",
MaxConcurrentRequests: 16,
Expand Down
7 changes: 7 additions & 0 deletions disperser/encoder/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,10 @@ func extractProofsAndCoeffs(frames []*encoding.Frame) ([]*encoding.Proof, []*rs.
}
return proofs, coeffs
}

func (s *EncoderServerV2) Close() {
if s.close == nil {
return
}
s.close()
}
12 changes: 9 additions & 3 deletions disperser/encoder/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,17 @@ func makeTestProver(numPoint uint64) (encoding.Prover, error) {
SRSOrder: 300000,
SRSNumberToLoad: numPoint,
NumWorker: uint64(runtime.GOMAXPROCS(0)),
LoadG2Points: false,
}

p, err := prover.NewProver(kzgConfig, false)
p, err := prover.NewProver(kzgConfig, nil)

return p, err
}

func TestEncodeBlob(t *testing.T) {
const (
testDataSize = 16 * 1024
timeoutSeconds = 30
timeoutSeconds = 60
randSeed = uint64(42)
)

Expand Down Expand Up @@ -176,6 +176,12 @@ func TestEncodeBlob(t *testing.T) {
// Create and execute encoding request again
resp, err := server.EncodeBlob(ctx, req)
assert.NoError(t, err)

if !assert.NotNil(t, resp, "Response should not be nil") {
t.FailNow() // Stop the test here to prevent nil pointer panic
return
}

assert.Equal(t, uint32(294916), resp.FragmentInfo.TotalChunkSizeBytes, "Unexpected total chunk size")
assert.Equal(t, uint32(512*1024), resp.FragmentInfo.FragmentSizeBytes, "Unexpected fragment size")
assert.Equal(t, c.s3Client.Called["UploadObject"], expectedUploadCalls)
Expand Down
Loading

0 comments on commit b8c46a1

Please sign in to comment.