Skip to content
This repository has been archived by the owner on Mar 9, 2021. It is now read-only.

Commit

Permalink
add kafka source command group
Browse files Browse the repository at this point in the history
  • Loading branch information
Daisy Guo committed May 19, 2020
1 parent 909a4c3 commit 12422c9
Show file tree
Hide file tree
Showing 72 changed files with 16,847 additions and 4 deletions.
2 changes: 2 additions & 0 deletions plugins/source-kafka/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
kn-admin*
.DS_Store
22 changes: 20 additions & 2 deletions plugins/source-kafka/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,27 @@
package main

import (
"github.com/maximilien/kn-source-pkg/pkg/factories"
"fmt"
"os"

"github.com/maximilien/kn-source-pkg/pkg/core"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
"knative.dev/client-contrib/plugins/source-kafka/pkg/factories"
)

func main() {
factories.NewDefaultKnSourceFactory()
kafkaSourceFactory := factories.NewKafkaSourceFactory()

kafkaCommandFactory := factories.NewKafkaSourceCommandFactory(kafkaSourceFactory)
kafkaFlagsFactory := factories.NewKafkaSourceFlagsFactory(kafkaSourceFactory)
kafkaRunEFactory := factories.NewKafkaSourceRunEFactory(kafkaSourceFactory)

err := core.NewKnSourceCommand(kafkaSourceFactory, kafkaCommandFactory, kafkaFlagsFactory, kafkaRunEFactory).Execute()
if err != nil {
if err.Error() != "subcommand is required" {
fmt.Fprintln(os.Stderr, err)
}
os.Exit(1)
}
}
8 changes: 7 additions & 1 deletion plugins/source-kafka/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@ go 1.13

require (
github.com/maximilien/kn-source-pkg v0.4.0
github.com/spf13/cobra v1.0.0 // indirect
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5
gotest.tools v2.2.0+incompatible
k8s.io/apimachinery v0.17.0
k8s.io/client-go v0.17.0
knative.dev/client v0.13.1-0.20200406212659-8a60d2ebf8e2
knative.dev/eventing-contrib v0.14.0
knative.dev/pkg v0.0.0-20200404181734-92cdec5b3593
knative.dev/test-infra v0.0.0-20200413202711-9cf64fb1b912
)

Expand Down
4 changes: 4 additions & 0 deletions plugins/source-kafka/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -248,6 +249,7 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
github.com/maxbrunsfeld/counterfeiter/v6 v6.2.2/go.mod h1:eD9eIE7cdwcMi9rYluz88Jz2VyhSmden33/aXg4oVIY=
github.com/maximilien/kn-source-pkg v0.4.0 h1:YmSHyu3dfQ3K736fn8J3dSCZRNkm2RJaC/DQ7QnIcsg=
github.com/maximilien/kn-source-pkg v0.4.0/go.mod h1:EDa7rFyEq3i2X02NaP1/pnrg+yVUlWPOS4CCZaZwCx0=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
Expand Down Expand Up @@ -319,9 +321,11 @@ github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfm
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rubiojr/go-vhd v0.0.0-20160810183302-0bfd3b39853c/go.mod h1:DM5xW0nvfNNm2uytzsvhI3OnX8uzaRAg8UX/CnDqbto=
github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sclevine/spec v1.2.0/go.mod h1:W4J29eT/Kzv7/b9IWLB055Z+qvVC9vt0Arko24q7p+U=
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
Expand Down
Binary file modified plugins/source-kafka/kn-source_kafka
Binary file not shown.
123 changes: 122 additions & 1 deletion plugins/source-kafka/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,130 @@
package client

import (
sourcetypes "github.com/maximilien/kn-source-pkg/pkg/types"
"knative.dev/client-contrib/plugins/source-kafka/pkg/types"
knerrors "knative.dev/client/pkg/errors"
v1alpha1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
clientv1alpha1 "knative.dev/eventing-contrib/kafka/source/pkg/client/clientset/versioned/typed/sources/v1alpha1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

type kafkaSourceClient struct {
client clientv1alpha1.SourcesV1alpha1Interface
kafkaSourceParams *types.KafkaSourceParams
client clientv1alpha1.SourcesV1alpha1Interface
namespace string
}

func NewKafkaSourceClient(kafkaParams *types.KafkaSourceParams, c clientv1alpha1.SourcesV1alpha1Interface, ns string) types.KafkaSourceClient {
if c == nil {
c, _ = kafkaParams.NewSourcesClient()
}
return &kafkaSourceClient{
kafkaSourceParams: kafkaParams,
client: c,
namespace: ns,
}
}

func (c *kafkaSourceClient) KnSourceParams() *sourcetypes.KnSourceParams {
return c.kafkaSourceParams.KnSourceParams
}

func (c *kafkaSourceClient) KafkaSourceParams() *types.KafkaSourceParams {
return c.kafkaSourceParams
}

//CreateKafkaSource is used to create an instance of KafkaSource
func (c *kafkaSourceClient) CreateKafkaSource(kafkaSource *v1alpha1.KafkaSource) error {
_, err := c.client.KafkaSources(c.namespace).Create(kafkaSource)
if err != nil {
return knerrors.GetError(err)
}

return nil
}

//DeleteKafkaSource is used to create an instance of KafkaSource
func (c *kafkaSourceClient) DeleteKafkaSource(name string) error {
err := c.client.KafkaSources(c.namespace).Delete(name, &metav1.DeleteOptions{})
if err != nil {
return knerrors.GetError(err)
}

return nil
}

//UpdateKafkaSource is used to create an instance of KafkaSource
func (c *kafkaSourceClient) UpdateKafkaSource(kafkaSource *v1alpha1.KafkaSource) error {
_, err := c.client.KafkaSources(c.namespace).Update(kafkaSource)
if err != nil {
return knerrors.GetError(err)
}

return nil
}

//GetKafkaSource is used to create an instance of KafkaSource
func (c *kafkaSourceClient) GetKafkaSource(name string) (*v1alpha1.KafkaSource, error) {
kafkaSource, err := c.client.KafkaSources(c.namespace).Get(name, metav1.GetOptions{})
if err != nil {
return nil, knerrors.GetError(err)
}

return kafkaSource, nil
}

// Return the client's namespace
func (c *kafkaSourceClient) Namespace() string {
return c.namespace
}

// KafkaSourceBuilder is for building the source
type KafkaSourceBuilder struct {
kafkaSource *v1alpha1.KafkaSource
}

// NewKafkaSourceBuilder for building ApiServer source object
func NewKafkaSourceBuilder(name string) *KafkaSourceBuilder {
return &KafkaSourceBuilder{kafkaSource: &v1alpha1.KafkaSource{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
}}
}

// NewKafkaSourceBuilderFromExisting for building the object from existing KafkaSource object
func NewKafkaSourceBuilderFromExisting(kSource *v1alpha1.KafkaSource) *KafkaSourceBuilder {
return &KafkaSourceBuilder{kafkaSource: kSource.DeepCopy()}
}

// BootstrapServers to set the value of BootstrapServers
func (b *KafkaSourceBuilder) BootstrapServers(server string) *KafkaSourceBuilder {
b.kafkaSource.Spec.BootstrapServers = server
return b
}

// Topics to set the value of Topics
func (b *KafkaSourceBuilder) Topics(topics string) *KafkaSourceBuilder {
b.kafkaSource.Spec.Topics = topics
return b
}

// ConsumerGroup to set the value of ConsumerGroup
func (b *KafkaSourceBuilder) ConsumerGroup(consumerGroup string) *KafkaSourceBuilder {
b.kafkaSource.Spec.ConsumerGroup = consumerGroup
return b
}

// Sink or destination of the source
func (b *KafkaSourceBuilder) Sink(sink *duckv1.Destination) *KafkaSourceBuilder {
b.kafkaSource.Spec.Sink = sink
return b
}

// Build the KafkaSource object
func (b *KafkaSourceBuilder) Build() *v1alpha1.KafkaSource {
return b.kafkaSource
}
69 changes: 69 additions & 0 deletions plugins/source-kafka/pkg/client/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright © 2019 The Knative Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"testing"

"gotest.tools/assert"
client_testing "k8s.io/client-go/testing"
"knative.dev/client-contrib/plugins/source-kafka/pkg/types"
v1alpha1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
fake "knative.dev/eventing-contrib/kafka/source/pkg/client/clientset/versioned/typed/sources/v1alpha1/fake"
)

func TestKafkaSourceClient(t *testing.T) {
fakeE := fake.FakeSourcesV1alpha1{Fake: &client_testing.Fake{}}
knSourceClient := NewKafkaSourceClient(&types.KafkaSourceParams{}, &fakeE, "fake-namespace")
assert.Assert(t, knSourceClient != nil)
}

func TestClient_KnSourceParams(t *testing.T) {
fakeE := fake.FakeSourcesV1alpha1{Fake: &client_testing.Fake{}}
fakeKafkaParams := &types.KafkaSourceParams{}
knSourceClient := NewKafkaSourceClient(fakeKafkaParams, &fakeE, "fake-namespace")
assert.Equal(t, knSourceClient.KnSourceParams(), fakeKafkaParams.KnSourceParams)
}

func TestNamespace(t *testing.T) {
fakeE := fake.FakeSourcesV1alpha1{Fake: &client_testing.Fake{}}
knSourceClient := NewKafkaSourceClient(&types.KafkaSourceParams{}, &fakeE, "fake-namespace")
assert.Equal(t, knSourceClient.Namespace(), "fake-namespace")
}
func TestCreateKafka(t *testing.T) {
fakeE := fake.FakeSourcesV1alpha1{Fake: &client_testing.Fake{}}
cli := NewKafkaSourceClient(&types.KafkaSourceParams{}, &fakeE, "fake-namespace")
objNew := newKafkaSource("samplekafka")
err := cli.CreateKafkaSource(objNew)
assert.NilError(t, err)
}

func TestDeleteKafka(t *testing.T) {
fakeE := fake.FakeSourcesV1alpha1{Fake: &client_testing.Fake{}}
cli := NewKafkaSourceClient(&types.KafkaSourceParams{}, &fakeE, "fake-namespace")
objNew := newKafkaSource("samplekafka")
err := cli.CreateKafkaSource(objNew)
assert.NilError(t, err)
err = cli.DeleteKafkaSource("samplekafka")
assert.NilError(t, err)
}

func newKafkaSource(name string) *v1alpha1.KafkaSource {
return NewKafkaSourceBuilder(name).
BootstrapServers("test.server.org").
Topics("topic").
ConsumerGroup("mygroup").
Build()
}
95 changes: 95 additions & 0 deletions plugins/source-kafka/pkg/factories/command_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright © 2018 The Knative Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package factories

import (
"knative.dev/client-contrib/plugins/source-kafka/pkg/types"

sourcefactories "github.com/maximilien/kn-source-pkg/pkg/factories"
sourcetypes "github.com/maximilien/kn-source-pkg/pkg/types"

"github.com/spf13/cobra"
)

type kafkaSourceCommandFactory struct {
kafkaSourceFactory types.KafkaSourceFactory
defaultCommandFactory sourcetypes.CommandFactory
}

func NewKafkaSourceCommandFactory(kafkaFactory types.KafkaSourceFactory) types.KafkaSourceCommandFactory {
return &kafkaSourceCommandFactory{
kafkaSourceFactory: kafkaFactory,
defaultCommandFactory: sourcefactories.NewDefaultCommandFactory(kafkaFactory),
}
}

func (f *kafkaSourceCommandFactory) KnSourceFactory() sourcetypes.KnSourceFactory {
return f.kafkaSourceFactory
}

func (f *kafkaSourceCommandFactory) KafkaSourceFactory() types.KafkaSourceFactory {
return f.kafkaSourceFactory
}

func (f *kafkaSourceCommandFactory) KafkaSourceParams() *types.KafkaSourceParams {
return f.kafkaSourceFactory.KafkaSourceParams()
}

func (f *kafkaSourceCommandFactory) KnSourceParams() *sourcetypes.KnSourceParams {
return f.kafkaSourceFactory.KnSourceParams()
}

func (f *kafkaSourceCommandFactory) SourceCommand() *cobra.Command {
sourceCmd := f.defaultCommandFactory.SourceCommand()
sourceCmd.Use = "kafka"
sourceCmd.Short = "Knative eventing Kafka source plugin"
sourceCmd.Long = "Manage your Knative Kafka eventing sources"
return sourceCmd
}

func (f *kafkaSourceCommandFactory) CreateCommand() *cobra.Command {
createCmd := f.defaultCommandFactory.CreateCommand()
createCmd.Short = "create NAME"
createCmd.Example = `#Creates a new Kafka source named as 'mykafkasrc' which subscribes a Kafka server 'my-cluster-kafka-bootstrap.kafka.svc:9092' at topic 'test-topic' using the consumer group ID 'test-consumer-group' and sends the event messages to service 'event-display'
kn source_kafka create mykafkasrc --servers my-cluster-kafka-bootstrap.kafka.svc:9092 --topics test-topic --consumergroup test-consumer-group --sink svc:event-display`
return createCmd
}

func (f *kafkaSourceCommandFactory) DeleteCommand() *cobra.Command {
deleteCmd := f.defaultCommandFactory.DeleteCommand()
deleteCmd.Short = "delete NAME"
deleteCmd.Long = "delete a Kafka source"
deleteCmd.Example = `#Deletes a Kafka source with name 'mykafkasrc'
kn source_kafka delete mykafkasrc`
return deleteCmd
}

func (f *kafkaSourceCommandFactory) UpdateCommand() *cobra.Command {
updateCmd := f.defaultCommandFactory.UpdateCommand()
updateCmd.Short = "update NAME"
updateCmd.Long = "update a Kafka source"
updateCmd.Example = `#Updates a Kafka source with NAME
kn source_kafka update kafka-name --servers my-cluster-kafka-bootstrap.kafka.svc:9092 --topics test-topic --consumergroup test-consumer-group --sink svc:event-display`
return updateCmd
}

func (f *kafkaSourceCommandFactory) DescribeCommand() *cobra.Command {
describeCmd := f.defaultCommandFactory.DescribeCommand()
describeCmd.Short = "describe NAME"
describeCmd.Long = "update a Kafka source"
describeCmd.Example = `#Describes a Kafka source with NAME
kn source_kafka describe kafka-name`
return describeCmd
}
Loading

0 comments on commit 12422c9

Please sign in to comment.