Skip to content

Commit

Permalink
Chunk store (#848)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley authored Nov 5, 2024
1 parent c63dd61 commit 5fd9a08
Show file tree
Hide file tree
Showing 8 changed files with 674 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestBlobMetadataStoreCerts(t *testing.T) {
}
fragmentInfo := &encoding.FragmentInfo{
TotalChunkSizeBytes: 100,
NumFragments: 10,
FragmentSizeBytes: 1024 * 1024 * 4,
}
err := blobMetadataStore.PutBlobCertificate(ctx, blobCert, fragmentInfo)
assert.NoError(t, err)
Expand Down
8 changes: 4 additions & 4 deletions disperser/controller/encoding_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestHandleBatch(t *testing.T) {
c := newTestComponents(t)
c.EncodingClient.On("EncodeBlob", mock.Anything, mock.Anything, mock.Anything).Return(&encoding.FragmentInfo{
TotalChunkSizeBytes: 100,
NumFragments: 5,
FragmentSizeBytes: 1024 * 1024 * 4,
}, nil)

err = c.EncodingManager.HandleBatch(ctx)
Expand All @@ -139,7 +139,7 @@ func TestHandleBatch(t *testing.T) {
assert.Contains(t, c.EncodingManager.AvailableRelays, relayKey)
}
assert.Equal(t, fetchedFragmentInfo.TotalChunkSizeBytes, uint32(100))
assert.Equal(t, fetchedFragmentInfo.NumFragments, uint32(5))
assert.Equal(t, fetchedFragmentInfo.FragmentSizeBytes, uint32(1024*1024*4))
}

func TestHandleBatchNoBlobs(t *testing.T) {
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestHandleBatchRetrySuccess(t *testing.T) {
c.EncodingClient.On("EncodeBlob", mock.Anything, mock.Anything, mock.Anything).Return(nil, assert.AnError).Once()
c.EncodingClient.On("EncodeBlob", mock.Anything, mock.Anything, mock.Anything).Return(&encoding.FragmentInfo{
TotalChunkSizeBytes: 100,
NumFragments: 5,
FragmentSizeBytes: 1024 * 1024 * 4,
}, nil)

err = c.EncodingManager.HandleBatch(ctx)
Expand All @@ -198,7 +198,7 @@ func TestHandleBatchRetrySuccess(t *testing.T) {
assert.Contains(t, c.EncodingManager.AvailableRelays, relayKey)
}
assert.Equal(t, fetchedFragmentInfo.TotalChunkSizeBytes, uint32(100))
assert.Equal(t, fetchedFragmentInfo.NumFragments, uint32(5))
assert.Equal(t, fetchedFragmentInfo.FragmentSizeBytes, uint32(1024*1024*4))
c.EncodingClient.AssertNumberOfCalls(t, "EncodeBlob", 2)
}

Expand Down
5 changes: 4 additions & 1 deletion encoding/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ type SubBatch struct {

type ChunkNumber = uint

// FragmentInfo contains metadata about how chunk coefficients file is stored.
type FragmentInfo struct {
// TotalChunkSizeBytes is the total size of the file containing all chunk coefficients for the blob.
TotalChunkSizeBytes uint32
NumFragments uint32
// FragmentSizeBytes is the maximum fragment size used to store the chunk coefficients.
FragmentSizeBytes uint32
}
111 changes: 110 additions & 1 deletion encoding/rs/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package rs

import (
"bytes"
"encoding/binary"
"encoding/gob"

"fmt"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/consensys/gnark-crypto/ecc/bn254/fr"
)

Expand All @@ -13,6 +15,7 @@ type Frame struct {
Coeffs []fr.Element
}

// Encode serializes the frame into a byte slice.
func (f *Frame) Encode() ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
Expand All @@ -23,6 +26,7 @@ func (f *Frame) Encode() ([]byte, error) {
return buf.Bytes(), nil
}

// Decode deserializes a byte slice into a frame.
func Decode(b []byte) (Frame, error) {
var f Frame
buf := bytes.NewBuffer(b)
Expand All @@ -33,3 +37,108 @@ func Decode(b []byte) (Frame, error) {
}
return f, nil
}

// GnarkEncodeFrames serializes a slice of frames into a byte slice.
//
// Serialization format:
// [number of frames: 4 byte uint32]
// [size of frame 1: 4 byte uint32][frame 1]
// [size of frame 2: 4 byte uint32][frame 2]
// ...
// [size of frame n: 4 byte uint32][frame n]
//
// Where relevant, big endian encoding is used.
func GnarkEncodeFrames(frames []*Frame) ([]byte, error) {

// Count the number of bytes.
encodedSize := uint32(4) // stores the number of frames
for _, frame := range frames {
encodedSize += 4 // stores the size of the frame
encodedSize += GnarkFrameSize(frame) // size of the frame
}

serializedBytes := make([]byte, encodedSize)
binary.BigEndian.PutUint32(serializedBytes, uint32(len(frames)))
index := uint32(4)

for _, frame := range frames {
index += GnarkEncodeFrame(frame, serializedBytes[index:])
}

if index != encodedSize {
// Sanity check, this should never happen.
return nil, fmt.Errorf("encoded size mismatch: expected %d, got %d", encodedSize, index)
}

return serializedBytes, nil
}

// GnarkEncodeFrame serializes a frame into a target byte slice. Returns the number of bytes written.
func GnarkEncodeFrame(frame *Frame, target []byte) uint32 {
binary.BigEndian.PutUint32(target, uint32(len(frame.Coeffs)))
index := uint32(4)

for _, coeff := range frame.Coeffs {
serializedCoeff := coeff.Marshal()
copy(target[index:], serializedCoeff)
index += uint32(len(serializedCoeff))
}

return index
}

// GnarkFrameSize returns the size of a frame in bytes.
func GnarkFrameSize(frame *Frame) uint32 {
return uint32(encoding.BYTES_PER_SYMBOL * len(frame.Coeffs))
}

// GnarkDecodeFrames deserializes a byte slice into a slice of frames.
func GnarkDecodeFrames(serializedFrames []byte) ([]*Frame, error) {
frameCount := binary.BigEndian.Uint32(serializedFrames)
index := uint32(4)

frames := make([]*Frame, frameCount)

for i := 0; i < int(frameCount); i++ {
frame, bytesRead, err := GnarkDecodeFrame(serializedFrames[index:])

if err != nil {
return nil, fmt.Errorf("failed to decode frame %d: %w", i, err)
}

frames[i] = frame
index += bytesRead
}

if index != uint32(len(serializedFrames)) {
return nil, fmt.Errorf("decoded size mismatch: expected %d, got %d", len(serializedFrames), index)
}

return frames, nil
}

// GnarkDecodeFrame deserializes a byte slice into a frame. Returns the frame and the number of bytes read.
func GnarkDecodeFrame(serializedFrame []byte) (*Frame, uint32, error) {
if len(serializedFrame) < 4 {
return nil, 0, fmt.Errorf("invalid frame size: %d", len(serializedFrame))
}

frameCount := binary.BigEndian.Uint32(serializedFrame)
index := uint32(4)

if len(serializedFrame) < int(index+frameCount*encoding.BYTES_PER_SYMBOL) {
return nil, 0, fmt.Errorf("invalid frame size: %d", len(serializedFrame))
}

coeffs := make([]fr.Element, frameCount)
for i := 0; i < int(frameCount); i++ {
coeff := fr.Element{}
coeff.Unmarshal(serializedFrame[index : index+encoding.BYTES_PER_SYMBOL])
coeffs[i] = coeff
index += uint32(encoding.BYTES_PER_SYMBOL)
}

frame := &Frame{Coeffs: coeffs}

return frame, index, nil
}
79 changes: 79 additions & 0 deletions encoding/rs/frame_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rs_test

import (
"fmt"
"math"
"testing"

Expand Down Expand Up @@ -47,3 +48,81 @@ func TestEncodeDecodeFrame_AreInverses(t *testing.T) {

assert.Equal(t, frame, frames[0])
}

func TestGnarkEncodeDecodeFrame_AreInverses(t *testing.T) {
teardownSuite := setupSuite(t)
defer teardownSuite(t)

params := encoding.ParamsFromSysPar(numSys, numPar, uint64(len(GETTYSBURG_ADDRESS_BYTES)))
enc, _ := rs.NewEncoder(params, true)

n := uint8(math.Log2(float64(enc.NumEvaluations())))
if enc.ChunkLength == 1 {
n = uint8(math.Log2(float64(2 * enc.NumChunks)))
}
fs := fft.NewFFTSettings(n)

RsComputeDevice := &rs_cpu.RsCpuComputeDevice{
Fs: fs,
EncodingParams: params,
}

enc.Computer = RsComputeDevice
require.NotNil(t, enc)

frames, _, err := enc.EncodeBytes(GETTYSBURG_ADDRESS_BYTES)
require.Nil(t, err)
require.NotNil(t, frames, err)

serializedSize := rs.GnarkFrameSize(&frames[0]) + 4
bytes := make([]byte, serializedSize)
rs.GnarkEncodeFrame(&frames[0], bytes)

fmt.Printf("\n\n\n")

deserializedFrame, bytesRead, err := rs.GnarkDecodeFrame(bytes)
assert.NoError(t, err)
assert.Equal(t, bytesRead, serializedSize)
assert.Equal(t, &frames[0], deserializedFrame)
}

func TestGnarkEncodeDecodeFrames_AreInverses(t *testing.T) {
teardownSuite := setupSuite(t)
defer teardownSuite(t)

params := encoding.ParamsFromSysPar(numSys, numPar, uint64(len(GETTYSBURG_ADDRESS_BYTES)))
enc, _ := rs.NewEncoder(params, true)

n := uint8(math.Log2(float64(enc.NumEvaluations())))
if enc.ChunkLength == 1 {
n = uint8(math.Log2(float64(2 * enc.NumChunks)))
}
fs := fft.NewFFTSettings(n)

RsComputeDevice := &rs_cpu.RsCpuComputeDevice{
Fs: fs,
EncodingParams: params,
}

enc.Computer = RsComputeDevice
require.NotNil(t, enc)

frames, _, err := enc.EncodeBytes(GETTYSBURG_ADDRESS_BYTES)
assert.NoError(t, err)

framesPointers := make([]*rs.Frame, len(frames))
for i, frame := range frames {
framesPointers[i] = &frame
}

encodedFrames, err := rs.GnarkEncodeFrames(framesPointers)
assert.NoError(t, err)

decodedFrames, err := rs.GnarkDecodeFrames(encodedFrames)
assert.NoError(t, err)

assert.Equal(t, len(framesPointers), len(decodedFrames))
for i := range framesPointers {
assert.Equal(t, *framesPointers[i], *decodedFrames[i])
}
}
Loading

0 comments on commit 5fd9a08

Please sign in to comment.