-
Notifications
You must be signed in to change notification settings - Fork 931
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat!(core): enable tls for grpc connection #3922
Changes from all commits
d187ddd
e0f0fd3
5cb7bbe
1117237
414fa5c
3829eeb
7bf3546
82c92e8
06bab7b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
package core | ||
|
||
import ( | ||
"crypto/tls" | ||
"encoding/json" | ||
"errors" | ||
"os" | ||
"path/filepath" | ||
|
||
"github.com/celestiaorg/celestia-node/libs/utils" | ||
) | ||
|
||
const ( | ||
cert = "cert.pem" | ||
key = "key.pem" | ||
xtoken = "xtoken.json" | ||
) | ||
|
||
func EmptyTLSConfig() *tls.Config { | ||
return &tls.Config{MinVersion: tls.VersionTLS12} | ||
} | ||
|
||
// TLS creates a TLS configuration using the certificate and key files from the specified path. | ||
// It constructs the full paths to the certificate and key files by joining the provided directory path | ||
// with their respective file names. | ||
// If either file is missing, it returns an os.ErrNotExist error. | ||
// If the files exist, it loads the X.509 key pair from the specified files and sets up a tls.Config. | ||
// Parameters: | ||
// * tlsPath: The directory path where the TLS certificate ("cert.pem") and key ("key.pem") files are located. | ||
// Returns: | ||
// * A tls.Config structure configured with the provided certificate and key. | ||
// * An error if the certificate or key file does not exist, or if loading the key pair fails. | ||
func TLS(tlsPath string) (*tls.Config, error) { | ||
certPath := filepath.Join(tlsPath, cert) | ||
keyPath := filepath.Join(tlsPath, key) | ||
exist := utils.Exists(certPath) && utils.Exists(keyPath) | ||
if !exist { | ||
return nil, os.ErrNotExist | ||
} | ||
|
||
cfg := EmptyTLSConfig() | ||
cert, err := tls.LoadX509KeyPair(certPath, keyPath) | ||
if err != nil { | ||
return nil, err | ||
} | ||
cfg.Certificates = append(cfg.Certificates, cert) | ||
return cfg, nil | ||
} | ||
|
||
type AuthToken struct { | ||
Token string `json:"x-token"` | ||
} | ||
|
||
// XToken retrieves the authentication token from a JSON file at the specified path. | ||
// It first constructs the full file path by joining the provided directory path with the token file name. | ||
// If the file does not exist, it returns an os.ErrNotExist error. | ||
// If the file exists, it reads the content and unmarshalls it. | ||
// If the field in the unmarshalled struct is empty, an error is returned indicating that the token is missing. | ||
// Parameters: | ||
// * xtokenPath: The directory path where the JSON file containing the X-Token is located. | ||
// Returns: | ||
// * The X-Token as a string if successfully retrieved. | ||
// * An error if the file does not exist, reading fails, unmarshalling fails, or the token is empty. | ||
func XToken(xtokenPath string) (string, error) { | ||
xtokenPath = filepath.Join(xtokenPath, xtoken) | ||
exist := utils.Exists(xtokenPath) | ||
if !exist { | ||
return "", os.ErrNotExist | ||
} | ||
|
||
token, err := os.ReadFile(xtokenPath) | ||
if err != nil { | ||
return "", err | ||
} | ||
|
||
var auth AuthToken | ||
err = json.Unmarshal(token, &auth) | ||
if err != nil { | ||
return "", err | ||
} | ||
if auth.Token == "" { | ||
return "", errors.New("x-token is empty. Please setup a token or cleanup xtokenPath") | ||
} | ||
return auth.Token, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,9 @@ | ||
package state | ||
|
||
import ( | ||
"errors" | ||
"os" | ||
|
||
"github.com/cosmos/cosmos-sdk/crypto/keyring" | ||
|
||
libfraud "github.com/celestiaorg/go-fraud" | ||
|
@@ -30,14 +33,31 @@ func coreAccessor( | |
*modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader], | ||
error, | ||
) { | ||
ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, corecfg.IP, corecfg.GRPCPort, | ||
network.String(), opts...) | ||
if corecfg.TLSEnabled { | ||
tlsCfg, err := core.TLS(corecfg.TLSPath) | ||
switch { | ||
case err == nil: | ||
case errors.Is(err, os.ErrNotExist): | ||
// set an empty config if path is empty under `TLSEnabled=true` | ||
tlsCfg = core.EmptyTLSConfig() | ||
default: | ||
return nil, nil, nil, err | ||
} | ||
|
||
xtoken, err := core.XToken(corecfg.XTokenPath) | ||
if err != nil && !errors.Is(err, os.ErrNotExist) { | ||
return nil, nil, nil, err | ||
} | ||
opts = append(opts, state.WithTLSConfig(tlsCfg), state.WithXToken(xtoken)) | ||
Comment on lines
+47
to
+51
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If token is not present, perhaps we should not invoke state.WithXToken options There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in this case xToken is |
||
} | ||
|
||
ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, | ||
corecfg.IP, corecfg.GRPCPort, network.String(), opts...) | ||
|
||
sBreaker := &modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader]{ | ||
Service: ca, | ||
FraudType: byzantine.BadEncoding, | ||
FraudServ: fraudServ, | ||
} | ||
|
||
return ca, ca, sBreaker, err | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ package state | |
|
||
import ( | ||
"context" | ||
"crypto/tls" | ||
"errors" | ||
"fmt" | ||
"sync" | ||
|
@@ -20,7 +21,9 @@ import ( | |
"github.com/tendermint/tendermint/proto/tendermint/crypto" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/connectivity" | ||
"google.golang.org/grpc/credentials" | ||
"google.golang.org/grpc/credentials/insecure" | ||
"google.golang.org/grpc/metadata" | ||
|
||
"github.com/celestiaorg/celestia-app/v3/app" | ||
"github.com/celestiaorg/celestia-app/v3/app/encoding" | ||
|
@@ -46,6 +49,18 @@ var ( | |
// to configure parameters. | ||
type Option func(ca *CoreAccessor) | ||
|
||
func WithTLSConfig(cfg *tls.Config) Option { | ||
return func(ca *CoreAccessor) { | ||
ca.tls = cfg | ||
} | ||
} | ||
|
||
func WithXToken(xtoken string) Option { | ||
return func(ca *CoreAccessor) { | ||
ca.xtoken = xtoken | ||
} | ||
} | ||
|
||
// CoreAccessor implements service over a gRPC connection | ||
// with a celestia-core node. | ||
type CoreAccessor struct { | ||
|
@@ -72,6 +87,9 @@ type CoreAccessor struct { | |
grpcPort string | ||
network string | ||
|
||
tls *tls.Config | ||
xtoken string | ||
|
||
// these fields are mutatable and thus need to be protected by a mutex | ||
lock sync.Mutex | ||
lastPayForBlob int64 | ||
|
@@ -90,9 +108,7 @@ func NewCoreAccessor( | |
keyring keyring.Keyring, | ||
keyname string, | ||
getter libhead.Head[*header.ExtendedHeader], | ||
coreIP, | ||
grpcPort string, | ||
network string, | ||
coreIP, grpcPort, network string, | ||
options ...Option, | ||
) (*CoreAccessor, error) { | ||
// create verifier | ||
|
@@ -122,24 +138,11 @@ func (ca *CoreAccessor) Start(ctx context.Context) error { | |
} | ||
ca.ctx, ca.cancel = context.WithCancel(context.Background()) | ||
|
||
// dial given celestia-core endpoint | ||
endpoint := fmt.Sprintf("%s:%s", ca.coreIP, ca.grpcPort) | ||
client, err := grpc.NewClient( | ||
endpoint, | ||
grpc.WithTransportCredentials(insecure.NewCredentials()), | ||
) | ||
err := ca.startGRPCClient(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
// this ensures we can't start the node without core connection | ||
client.Connect() | ||
if !client.WaitForStateChange(ctx, connectivity.Ready) { | ||
// hits the case when context is canceled | ||
return fmt.Errorf("couldn't connect to core endpoint(%s): %w", endpoint, ctx.Err()) | ||
return fmt.Errorf("failed to start grpc client: %w", err) | ||
} | ||
|
||
ca.coreConn = client | ||
|
||
// create the staking query client | ||
ca.stakingCli = stakingtypes.NewQueryClient(ca.coreConn) | ||
ca.feeGrantCli = feegrant.NewQueryClient(ca.coreConn) | ||
|
@@ -601,6 +604,50 @@ func (ca *CoreAccessor) setupTxClient(ctx context.Context, keyName string) (*use | |
) | ||
} | ||
|
||
func (ca *CoreAccessor) startGRPCClient(ctx context.Context) error { | ||
// dial given celestia-core endpoint | ||
endpoint := fmt.Sprintf("%s:%s", ca.coreIP, ca.grpcPort) | ||
// By default, the gRPC client is configured to handle an insecure connection. | ||
// If the TLS configuration is not empty, it will be applied to the client's options. | ||
// If the TLS configuration is empty but the X-Token is provided, | ||
// the X-Token will be applied as an interceptor along with an empty TLS configuration. | ||
opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} | ||
if ca.tls != nil { | ||
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(ca.tls))) | ||
} | ||
if ca.xtoken != "" { | ||
authInterceptor := func(ctx context.Context, | ||
method string, | ||
req, reply interface{}, | ||
cc *grpc.ClientConn, | ||
invoker grpc.UnaryInvoker, | ||
opts ...grpc.CallOption, | ||
) error { | ||
ctx = metadata.AppendToOutgoingContext(ctx, "x-token", ca.xtoken) | ||
return invoker(ctx, method, req, reply, cc, opts...) | ||
} | ||
Comment on lines
+619
to
+628
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit, but would be easier to read if interceptor is extracted There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I was planning to move it out. Thanks 🙏 |
||
opts = append(opts, grpc.WithUnaryInterceptor(authInterceptor)) | ||
} | ||
|
||
client, err := grpc.NewClient( | ||
endpoint, | ||
opts..., | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
// this ensures we can't start the node without core connection | ||
client.Connect() | ||
if !client.WaitForStateChange(ctx, connectivity.Ready) { | ||
// hits the case when context is canceled | ||
return fmt.Errorf("couldn't connect to core endpoint(%s): %w", endpoint, ctx.Err()) | ||
} | ||
ca.coreConn = client | ||
|
||
log.Infof("Connection with core endpoint(%s) established", endpoint) | ||
return nil | ||
} | ||
|
||
func (ca *CoreAccessor) submitMsg( | ||
ctx context.Context, | ||
msg sdktypes.Msg, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should just return empty tls config? Seems os.ErrNotExist is not needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I added this behavior initially but then realized that we may have more usecases in the future so the caller will decide what to do with this error.