From baf603c31dd9d842b639c405419ba0379689b1ed Mon Sep 17 00:00:00 2001 From: nanic Date: Thu, 19 Oct 2023 14:31:21 -0400 Subject: [PATCH 1/2] Feature: Add JSON file produce support --- cmd/produce/produce.go | 1 + cmd/produce/produce_test.go | 47 +++++++++++++++++++++++++ internal/producer/producer-operation.go | 16 +++++++++ testutil/testdata/msg.csv | 3 ++ testutil/testdata/msg.txt | 3 ++ 5 files changed, 70 insertions(+) create mode 100644 testutil/testdata/msg.csv create mode 100644 testutil/testdata/msg.txt diff --git a/cmd/produce/produce.go b/cmd/produce/produce.go index b41cfe4..b1efada 100644 --- a/cmd/produce/produce.go +++ b/cmd/produce/produce.go @@ -36,6 +36,7 @@ func NewProduceCmd() *cobra.Command { cmdProduce.Flags().StringVarP(&flags.Value, "value", "v", "", "value to produce") cmdProduce.Flags().BoolVarP(&flags.NullValue, "null-value", "", false, "produce a null value (can be used instead of providing a value with --value)") cmdProduce.Flags().StringVarP(&flags.File, "file", "f", "", "file to read input from") + cmdProduce.Flags().StringVarP(&flags.FileType, "file-type", "", "", "file type to read input from") cmdProduce.Flags().StringArrayVarP(&flags.Headers, "header", "H", flags.Headers, "headers in format `key:value`") cmdProduce.Flags().StringVarP(&flags.Separator, "separator", "S", "", "separator to split key and value from stdin or file") cmdProduce.Flags().StringVarP(&flags.LineSeparator, "lineSeparator", "L", "\n", "separator to split multiple messages from stdin or file") diff --git a/cmd/produce/produce_test.go b/cmd/produce/produce_test.go index 50c7c6a..ee3c186 100644 --- a/cmd/produce/produce_test.go +++ b/cmd/produce/produce_test.go @@ -367,6 +367,53 @@ func TestProduceProtoFileIntegration(t *testing.T) { testutil.AssertEquals(t, value, string(actualValue)) } +func TestProduceWithCSVFileIntegration(t *testing.T) { + + testutil.StartIntegrationTest(t) + topic := testutil.CreateTopic(t, "produce-topic-csv") + kafkaCtl := testutil.CreateKafkaCtlCommand() + + dataFilePath := filepath.Join(testutil.RootDir, "testutil", "testdata") + + if _, err := kafkaCtl.Execute("produce", topic, + "--file", filepath.Join(dataFilePath, "msg.csv")); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "3 messages produced", kafkaCtl.GetStdOut()) + + if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--exit"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "1,a\n2,b\n3,c", kafkaCtl.GetStdOut()) + +} + +func TestProduceWithJSONFileIntegration(t *testing.T) { + + testutil.StartIntegrationTest(t) + topic := testutil.CreateTopic(t, "produce-topic-json") + kafkaCtl := testutil.CreateKafkaCtlCommand() + + dataFilePath := filepath.Join(testutil.RootDir, "testutil", "testdata") + + if _, err := kafkaCtl.Execute("produce", topic, + "--file", filepath.Join(dataFilePath, "msg.txt"), + "--file-type", "json"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "3 messages produced", kafkaCtl.GetStdOut()) + + if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--exit"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "a\nb\nc", kafkaCtl.GetStdOut()) + +} + func TestProduceProtoFileWithOnlyKeyEncodedIntegration(t *testing.T) { testutil.StartIntegrationTest(t) diff --git a/internal/producer/producer-operation.go b/internal/producer/producer-operation.go index 350ac21..b77fbfb 100644 --- a/internal/producer/producer-operation.go +++ b/internal/producer/producer-operation.go @@ -2,6 +2,7 @@ package producer import ( "bufio" + "encoding/json" "io" "os" "os/signal" @@ -24,6 +25,7 @@ type Flags struct { Separator string LineSeparator string File string + FileType string Key string Value string NullValue bool @@ -46,6 +48,11 @@ const DefaultMaxMessagesBytes = 1000000 type Operation struct { } +type KV struct { + Key string `json:"key"` + Value string `json:"value"` +} + func (operation *Operation) Produce(topic string, flags Flags) error { var ( @@ -113,6 +120,8 @@ func (operation *Operation) Produce(topic string, flags Flags) error { } }() + var kv KV + var key string var value string @@ -221,6 +230,13 @@ func (operation *Operation) Produce(topic string, flags Flags) error { } key = input[keyColumnIdx] value = input[valueColumnIdx] + } else if flags.FileType == "json" { + if err = json.Unmarshal([]byte(line), &kv); err != nil { + return errors.Errorf("Can't unmarshal line at %d", messageCount) + } + + key = kv.Key + value = kv.Value } else { value = line } diff --git a/testutil/testdata/msg.csv b/testutil/testdata/msg.csv new file mode 100644 index 0000000..864a928 --- /dev/null +++ b/testutil/testdata/msg.csv @@ -0,0 +1,3 @@ +1,a +2,b +3,c \ No newline at end of file diff --git a/testutil/testdata/msg.txt b/testutil/testdata/msg.txt new file mode 100644 index 0000000..3023ac7 --- /dev/null +++ b/testutil/testdata/msg.txt @@ -0,0 +1,3 @@ +{"key": "1", "value": "a"} +{"key": "2", "value": "b"} +{"key": "3", "value": "c"} \ No newline at end of file From c37c44cea3584841dbce7e91118fdb7fc90ef982 Mon Sep 17 00:00:00 2001 From: Dirk Wilden Date: Fri, 10 Nov 2023 09:56:20 +0100 Subject: [PATCH 2/2] refactor input parsing for produce command --- CHANGELOG.md | 4 ++ README.md | 8 +++ cmd/produce/produce.go | 2 +- cmd/produce/produce_test.go | 58 ++++++++++++++--- internal/producer/input/csv_parser.go | 72 +++++++++++++++++++++ internal/producer/input/json_parser.go | 24 +++++++ internal/producer/input/parser.go | 10 +++ internal/producer/producer-operation.go | 85 ++++++------------------- testutil/testdata/msg-ts1.csv | 3 + testutil/testdata/msg-ts2.csv | 3 + testutil/testdata/{msg.txt => msg.json} | 0 util/util.go | 8 +++ 12 files changed, 201 insertions(+), 76 deletions(-) create mode 100644 internal/producer/input/csv_parser.go create mode 100644 internal/producer/input/json_parser.go create mode 100644 internal/producer/input/parser.go create mode 100644 testutil/testdata/msg-ts1.csv create mode 100644 testutil/testdata/msg-ts2.csv rename testutil/testdata/{msg.txt => msg.json} (100%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 477184f..a4680f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- [#171](https://github.com/deviceinsight/kafkactl/issues/171) Support for reset command to reset offset to offset from provided datetime +- [#172](https://github.com/deviceinsight/kafkactl/issues/172) Support for json format in produce command + ## 3.4.0 - 2023-09-25 ### Fixed diff --git a/README.md b/README.md index 9b0739b..bc94437 100644 --- a/README.md +++ b/README.md @@ -432,6 +432,14 @@ kafkactl produce my-topic --separator=# --lineSeparator=|| --file=myfile **NOTE:** if the file was generated with `kafkactl consume --print-keys --print-timestamps my-topic` the produce command is able to detect the message timestamp in the input and will ignore it. +It is also possible to produce messages in json format: + +```bash +# each line in myfile.json is expected to contain a json object with fields key, value +kafkactl produce my-topic --file=myfile.json --input-format=json +cat myfile.json | kafkactl produce my-topic --input-format=json +``` + the number of messages produced per second can be controlled with the `--rate` parameter: ```bash diff --git a/cmd/produce/produce.go b/cmd/produce/produce.go index b1efada..4f9b44a 100644 --- a/cmd/produce/produce.go +++ b/cmd/produce/produce.go @@ -36,7 +36,7 @@ func NewProduceCmd() *cobra.Command { cmdProduce.Flags().StringVarP(&flags.Value, "value", "v", "", "value to produce") cmdProduce.Flags().BoolVarP(&flags.NullValue, "null-value", "", false, "produce a null value (can be used instead of providing a value with --value)") cmdProduce.Flags().StringVarP(&flags.File, "file", "f", "", "file to read input from") - cmdProduce.Flags().StringVarP(&flags.FileType, "file-type", "", "", "file type to read input from") + cmdProduce.Flags().StringVarP(&flags.InputFormat, "input-format", "", "", "input format. One of: csv,json (default is csv)") cmdProduce.Flags().StringArrayVarP(&flags.Headers, "header", "H", flags.Headers, "headers in format `key:value`") cmdProduce.Flags().StringVarP(&flags.Separator, "separator", "S", "", "separator to split key and value from stdin or file") cmdProduce.Flags().StringVarP(&flags.LineSeparator, "lineSeparator", "L", "\n", "separator to split multiple messages from stdin or file") diff --git a/cmd/produce/produce_test.go b/cmd/produce/produce_test.go index ee3c186..d2145d0 100644 --- a/cmd/produce/produce_test.go +++ b/cmd/produce/produce_test.go @@ -375,19 +375,62 @@ func TestProduceWithCSVFileIntegration(t *testing.T) { dataFilePath := filepath.Join(testutil.RootDir, "testutil", "testdata") - if _, err := kafkaCtl.Execute("produce", topic, + if _, err := kafkaCtl.Execute("produce", topic, "--separator", ",", "--file", filepath.Join(dataFilePath, "msg.csv")); err != nil { t.Fatalf("failed to execute command: %v", err) } testutil.AssertEquals(t, "3 messages produced", kafkaCtl.GetStdOut()) - if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--exit"); err != nil { + if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--print-keys", "--exit"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "1#a\n2#b\n3#c", kafkaCtl.GetStdOut()) +} + +func TestProduceWithCSVFileWithTimestampsFirstColumnIntegration(t *testing.T) { + + testutil.StartIntegrationTest(t) + topic := testutil.CreateTopic(t, "produce-topic-csv") + kafkaCtl := testutil.CreateKafkaCtlCommand() + + dataFilePath := filepath.Join(testutil.RootDir, "testutil", "testdata") + + if _, err := kafkaCtl.Execute("produce", topic, "--separator", ",", + "--file", filepath.Join(dataFilePath, "msg-ts1.csv")); err != nil { t.Fatalf("failed to execute command: %v", err) } - testutil.AssertEquals(t, "1,a\n2,b\n3,c", kafkaCtl.GetStdOut()) + testutil.AssertEquals(t, "3 messages produced", kafkaCtl.GetStdOut()) + if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--print-keys", "--exit"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "1#a\n2#b\n3#c", kafkaCtl.GetStdOut()) +} + +func TestProduceWithCSVFileWithTimestampsSecondColumnIntegration(t *testing.T) { + + testutil.StartIntegrationTest(t) + topic := testutil.CreateTopic(t, "produce-topic-csv") + kafkaCtl := testutil.CreateKafkaCtlCommand() + + dataFilePath := filepath.Join(testutil.RootDir, "testutil", "testdata") + + if _, err := kafkaCtl.Execute("produce", topic, "--separator", ",", + "--file", filepath.Join(dataFilePath, "msg-ts2.csv")); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "3 messages produced", kafkaCtl.GetStdOut()) + + if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--print-keys", "--exit"); err != nil { + t.Fatalf("failed to execute command: %v", err) + } + + testutil.AssertEquals(t, "1#a\n2#b\n3#c", kafkaCtl.GetStdOut()) } func TestProduceWithJSONFileIntegration(t *testing.T) { @@ -399,19 +442,18 @@ func TestProduceWithJSONFileIntegration(t *testing.T) { dataFilePath := filepath.Join(testutil.RootDir, "testutil", "testdata") if _, err := kafkaCtl.Execute("produce", topic, - "--file", filepath.Join(dataFilePath, "msg.txt"), - "--file-type", "json"); err != nil { + "--file", filepath.Join(dataFilePath, "msg.json"), + "--input-format", "json"); err != nil { t.Fatalf("failed to execute command: %v", err) } testutil.AssertEquals(t, "3 messages produced", kafkaCtl.GetStdOut()) - if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--exit"); err != nil { + if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--print-keys", "--exit"); err != nil { t.Fatalf("failed to execute command: %v", err) } - testutil.AssertEquals(t, "a\nb\nc", kafkaCtl.GetStdOut()) - + testutil.AssertEquals(t, "1#a\n2#b\n3#c", kafkaCtl.GetStdOut()) } func TestProduceProtoFileWithOnlyKeyEncodedIntegration(t *testing.T) { diff --git a/internal/producer/input/csv_parser.go b/internal/producer/input/csv_parser.go new file mode 100644 index 0000000..6de9a02 --- /dev/null +++ b/internal/producer/input/csv_parser.go @@ -0,0 +1,72 @@ +package input + +import ( + "fmt" + "strings" + "time" + + "github.com/deviceinsight/kafkactl/output" + "github.com/deviceinsight/kafkactl/util" + "github.com/pkg/errors" +) + +const defaultKeyColumnIdx = 0 +const defaultValueColumnIdx = 1 +const defaultColumnCount = 2 + +type csvParser struct { + key string + separator string + keyColumnIdx int + valueColumnIdx int + columnCount int +} + +func NewCsvParser(key string, separator string) Parser { + return &csvParser{ + key: key, + separator: separator, + keyColumnIdx: defaultKeyColumnIdx, + valueColumnIdx: defaultValueColumnIdx, + columnCount: defaultColumnCount, + } +} + +func (p *csvParser) ParseLine(line string) (Message, error) { + + if p.separator == "" { + return Message{Key: p.key, Value: line}, nil + } + + input := strings.Split(line, util.ConvertControlChars(p.separator)) + if len(input) < 2 { + return Message{}, fmt.Errorf("the provided input does not contain the separator %s", p.separator) + } else if len(input) == 3 && p.columnCount == defaultColumnCount { + // lazy resolving of column indices + var err error + p.keyColumnIdx, p.valueColumnIdx, p.columnCount, err = resolveColumns(input) + if err != nil { + return Message{}, err + } + } else if len(input) != p.columnCount { + return Message{}, fmt.Errorf("line contains unexpected amount of separators:\n%s", line) + } + + return Message{input[p.keyColumnIdx], input[p.valueColumnIdx]}, nil +} + +func resolveColumns(line []string) (keyColumnIdx, valueColumnIdx, columnCount int, err error) { + if isTimestamp(line[0]) { + output.Warnf("assuming column 0 to be message timestamp. Column will be ignored") + return 1, 2, 3, nil + } else if isTimestamp(line[1]) { + output.Warnf("assuming column 1 to be message timestamp. Column will be ignored") + return 0, 2, 3, nil + } + return -1, -1, -1, errors.Errorf("line contains unexpected amount of separators:\n%s", line) +} + +func isTimestamp(value string) bool { + _, e := time.Parse(time.RFC3339, value) + return e == nil +} diff --git a/internal/producer/input/json_parser.go b/internal/producer/input/json_parser.go new file mode 100644 index 0000000..97a5f8a --- /dev/null +++ b/internal/producer/input/json_parser.go @@ -0,0 +1,24 @@ +package input + +import ( + "encoding/json" + "fmt" +) + +type jsonParser struct { +} + +func NewJSONParser() Parser { + return &jsonParser{} +} + +func (p *jsonParser) ParseLine(line string) (Message, error) { + + var message Message + + if err := json.Unmarshal([]byte(line), &message); err != nil { + return message, fmt.Errorf("can't unmarshal line: %w", err) + } + + return message, nil +} diff --git a/internal/producer/input/parser.go b/internal/producer/input/parser.go new file mode 100644 index 0000000..e38a4dc --- /dev/null +++ b/internal/producer/input/parser.go @@ -0,0 +1,10 @@ +package input + +type Message struct { + Key string `json:"key"` + Value string `json:"value"` +} + +type Parser interface { + ParseLine(line string) (Message, error) +} diff --git a/internal/producer/producer-operation.go b/internal/producer/producer-operation.go index b77fbfb..36895e1 100644 --- a/internal/producer/producer-operation.go +++ b/internal/producer/producer-operation.go @@ -2,17 +2,17 @@ package producer import ( "bufio" - "encoding/json" "io" "os" "os/signal" "strings" "syscall" - "time" "github.com/IBM/sarama" "github.com/deviceinsight/kafkactl/internal" + "github.com/deviceinsight/kafkactl/internal/producer/input" "github.com/deviceinsight/kafkactl/output" + "github.com/deviceinsight/kafkactl/util" "github.com/pkg/errors" "go.uber.org/ratelimit" ) @@ -25,7 +25,7 @@ type Flags struct { Separator string LineSeparator string File string - FileType string + InputFormat string Key string Value string NullValue bool @@ -48,11 +48,6 @@ const DefaultMaxMessagesBytes = 1000000 type Operation struct { } -type KV struct { - Key string `json:"key"` - Value string `json:"value"` -} - func (operation *Operation) Produce(topic string, flags Flags) error { var ( @@ -120,15 +115,10 @@ func (operation *Operation) Produce(topic string, flags Flags) error { } }() - var kv KV - - var key string - var value string + var inputMessage input.Message if flags.Key != "" && flags.Separator != "" { return errors.New("parameters --key and --separator cannot be used together") - } else if flags.Key != "" { - key = flags.Key } if flags.NullValue && flags.Value != "" { @@ -140,9 +130,9 @@ func (operation *Operation) Produce(topic string, flags Flags) error { var message *sarama.ProducerMessage if flags.NullValue { - message, err = serializers.Serialize([]byte(key), nil, flags) + message, err = serializers.Serialize([]byte(flags.Key), nil, flags) } else { - message, err = serializers.Serialize([]byte(key), []byte(flags.Value), flags) + message, err = serializers.Serialize([]byte(flags.Key), []byte(flags.Value), flags) } if err != nil { @@ -166,9 +156,7 @@ func (operation *Operation) Produce(topic string, flags Flags) error { }() messageCount := 0 - keyColumnIdx := 0 - valueColumnIdx := 1 - columnCount := 2 + // print an empty line that will be replaced when updating the status output.Statusf("") @@ -182,6 +170,7 @@ func (operation *Operation) Produce(topic string, flags Flags) error { } var inputReader io.Reader + var inputParser input.Parser if flags.File != "" { inputReader, err = os.Open(flags.File) @@ -192,11 +181,17 @@ func (operation *Operation) Produce(topic string, flags Flags) error { inputReader = os.Stdin } + if flags.InputFormat == "json" { + inputParser = input.NewJSONParser() + } else { + inputParser = input.NewCsvParser(flags.Key, flags.Separator) + } + scanner := bufio.NewScanner(inputReader) scanner.Buffer(make([]byte, 0, config.Producer.MaxMessageBytes), config.Producer.MaxMessageBytes) if len(flags.LineSeparator) > 0 && flags.LineSeparator != "\n" { - scanner.Split(splitAt(convertControlChars(flags.LineSeparator))) + scanner.Split(splitAt(util.ConvertControlChars(flags.LineSeparator))) } for scanner.Scan() { @@ -216,33 +211,12 @@ func (operation *Operation) Produce(topic string, flags Flags) error { continue } - if flags.Separator != "" { - input := strings.Split(line, convertControlChars(flags.Separator)) - if len(input) < 2 { - return failWithMessageCount(messageCount, "the provided input does not contain the separator %s", flags.Separator) - } else if len(input) == 3 && messageCount == 0 { - keyColumnIdx, valueColumnIdx, columnCount, err = resolveColumns(input) - if err != nil { - return failWithMessageCount(messageCount, err.Error()) - } - } else if len(input) != columnCount { - return failWithMessageCount(messageCount, "line contains unexpected amount of separators:\n%s", line) - } - key = input[keyColumnIdx] - value = input[valueColumnIdx] - } else if flags.FileType == "json" { - if err = json.Unmarshal([]byte(line), &kv); err != nil { - return errors.Errorf("Can't unmarshal line at %d", messageCount) - } - - key = kv.Key - value = kv.Value - } else { - value = line + if inputMessage, err = inputParser.ParseLine(line); err != nil { + return failWithMessageCount(messageCount, err.Error()) } messageCount++ - message, err := serializers.Serialize([]byte(key), []byte(value), flags) + message, err := serializers.Serialize([]byte(inputMessage.Key), []byte(inputMessage.Value), flags) if err != nil { return errors.Wrap(err, "Failed to produce message") } @@ -341,29 +315,6 @@ func parsePartitioner(partitioner string, flags Flags) (sarama.PartitionerConstr } } -func convertControlChars(value string) string { - value = strings.Replace(value, "\\n", "\n", -1) - value = strings.Replace(value, "\\r", "\r", -1) - value = strings.Replace(value, "\\t", "\t", -1) - return value -} - -func resolveColumns(line []string) (keyColumnIdx, valueColumnIdx, columnCount int, err error) { - if isTimestamp(line[0]) { - output.Warnf("assuming column 0 to be message timestamp. Column will be ignored") - return 1, 2, 3, nil - } else if isTimestamp(line[1]) { - output.Warnf("assuming column 1 to be message timestamp. Column will be ignored") - return 0, 2, 3, nil - } - return -1, -1, -1, errors.Errorf("line contains unexpected amount of separators:\n%s", line) -} - -func isTimestamp(value string) bool { - _, e := time.Parse(time.RFC3339, value) - return e == nil -} - func failWithMessageCount(messageCount int, errorMessage string, args ...interface{}) error { output.Infof("\r%d messages produced", messageCount) return errors.Errorf(errorMessage, args...) diff --git a/testutil/testdata/msg-ts1.csv b/testutil/testdata/msg-ts1.csv new file mode 100644 index 0000000..678c709 --- /dev/null +++ b/testutil/testdata/msg-ts1.csv @@ -0,0 +1,3 @@ +2006-01-02T15:04:05.000Z,1,a +2006-01-02T16:04:05.000Z,2,b +2006-01-02T17:04:05.000Z,3,c \ No newline at end of file diff --git a/testutil/testdata/msg-ts2.csv b/testutil/testdata/msg-ts2.csv new file mode 100644 index 0000000..107fffe --- /dev/null +++ b/testutil/testdata/msg-ts2.csv @@ -0,0 +1,3 @@ +1,2006-01-02T15:04:05.000Z,a +2,2006-01-02T16:04:05.000Z,b +3,2006-01-02T17:04:05.000Z,c \ No newline at end of file diff --git a/testutil/testdata/msg.txt b/testutil/testdata/msg.json similarity index 100% rename from testutil/testdata/msg.txt rename to testutil/testdata/msg.json diff --git a/util/util.go b/util/util.go index 6c35e3c..c4f2cc8 100644 --- a/util/util.go +++ b/util/util.go @@ -2,6 +2,7 @@ package util import ( "strconv" + "strings" "time" "gopkg.in/errgo.v2/fmt/errors" @@ -32,6 +33,13 @@ func ParseTimestamp(timestamp string) (time.Time, error) { return time.Time{}, errors.Newf("unable to parse timestamp: %s", timestamp) } +func ConvertControlChars(value string) string { + value = strings.Replace(value, "\\n", "\n", -1) + value = strings.Replace(value, "\\r", "\r", -1) + value = strings.Replace(value, "\\t", "\t", -1) + return value +} + func ContainsString(list []string, element string) bool { for _, it := range list { if it == element {