Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wenxuan/dashbase 2 #1

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
15 changes: 15 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Config struct {
PlanCache PlanCache `toml:"plan-cache" json:"plan-cache"`
PreparedPlanCache PreparedPlanCache `toml:"prepared-plan-cache" json:"prepared-plan-cache"`
OpenTracing OpenTracing `toml:"opentracing" json:"opentracing"`
Dashbase Dashbase `toml:"dashbase" json:"dashbase"`
}

// Log is the log section of config.
Expand Down Expand Up @@ -134,6 +135,14 @@ type OpenTracingReporter struct {
LocalAgentHostPort string `toml:"local-agent-host-port" json:"local-agent-host-port"`
}

// Dashbase is the dashbase section of the config.
type Dashbase struct {
	// Enabled controls whether the Dashbase proxy feature is turned on.
	Enabled bool `toml:"enabled" json:"enabled"`
	// KafkaHosts lists the Kafka brokers used for inserting data.
	KafkaHosts []string `toml:"kafka-hosts" json:"kafka-hosts"`
	// APIURL is the Dashbase HTTP API endpoint used for querying data.
	APIURL string `toml:"api-url" json:"api-url"`
	// SchemaFile is the path of a TOML file describing table schemas.
	SchemaFile string `toml:"schema-file" json:"schema-file"`
}

var defaultConf = Config{
Host: "0.0.0.0",
Port: 4000,
Expand Down Expand Up @@ -184,6 +193,12 @@ var defaultConf = Config{
},
Reporter: OpenTracingReporter{},
},
Dashbase: Dashbase{
Enabled: false,
KafkaHosts: []string{"localhost:9092"},
APIURL: "http://localhost:9876",
SchemaFile: "dashbase-schema.toml",
},
}

var globalConf = defaultConf
Expand Down
15 changes: 14 additions & 1 deletion config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,17 @@ buffer-flush-interval = 0
log-spans = false

# LocalAgentHostPort instructs reporter to send spans to jaeger-agent at this address
local-agent-host-port = ""
local-agent-host-port = ""

[dashbase]
# Whether TiDB acts as a Dashbase proxy.
enabled = false

# The Kafka hosts for inserting data.
kafka-hosts = ["localhost:9092"]

# The API host for querying data.
api-url = "http://localhost:9876"

# The path to a TOML file that describes the table schemas.
schema-file = "dashbase-schema.toml"
79 changes: 79 additions & 0 deletions dashbase/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package dashbase

import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"

"github.com/juju/errors"
)

// ApiClient talks to a remote Dashbase HTTP API service.
type ApiClient struct {
	// URL is the base address of the service, e.g. "http://localhost:9876".
	URL string
}

// ApiSQLResponse is the JSON payload returned by the Dashbase /v1/sql endpoint.
type ApiSQLResponse struct {
	// Request echoes back the parsed query, including requested aggregations.
	Request struct {
		Aggregations map[string]struct {
			RequestType string
			Col         string
			Type        string
		}
	}
	// Hits are the matching rows; each carries its timestamp and field values.
	Hits []struct {
		TimeInSeconds int64
		Payload       struct {
			Fields map[string][]string
		}
	}
	// Aggregations holds one result per requested aggregation.
	Aggregations map[string]struct {
		ResponseType string
		Value        float64
	}
	// ErrorMessage is non-empty when the server reports an error.
	ErrorMessage string `json:"message"`
}

const (
	// apiTimeout bounds each HTTP request to the Dashbase API service.
	apiTimeout time.Duration = 30 * time.Second
)

// Query sends a SQL statement to the remote Dashbase API service and decodes
// the JSON response. It returns an error when the service is unreachable, the
// response cannot be decoded, or the server reports an error message.
func (client *ApiClient) Query(SQLStatement string) (*ApiSQLResponse, error) {
	param := url.Values{}
	param.Add("sql", SQLStatement)
	// Pin the timezone so results do not depend on the proxy host's locale.
	param.Add("timezone", "GMT")

	httpClient := http.Client{Timeout: apiTimeout}
	resp, err := httpClient.Get(fmt.Sprintf("%s/v1/sql?%s", client.URL, param.Encode()))
	if err != nil {
		// Annotate instead of discarding err so callers can see the root cause
		// (DNS failure, timeout, refused connection, ...).
		return nil, errors.Annotatef(err, "failed to connect Dashbase API service at %s", client.URL)
	}
	defer resp.Body.Close()

	var ret ApiSQLResponse
	if err = json.NewDecoder(resp.Body).Decode(&ret); err != nil {
		return nil, errors.Annotatef(err, "failed to decode Dashbase API response data")
	}

	// A populated "message" field is the server's way of signaling failure.
	if len(ret.ErrorMessage) > 0 {
		return nil, errors.Trace(fmt.Errorf("Dashbase error: %s", ret.ErrorMessage))
	}

	return &ret, nil
}
103 changes: 103 additions & 0 deletions dashbase/avro.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package dashbase

import (
"bytes"
"encoding/binary"

"github.com/juju/errors"
"github.com/linkedin/goavro"
)

// See http://avro.apache.org/docs/1.8.2/spec.html#schema_fingerprints
const emptyCRC64 uint64 = 0xc15d213aa4d7a795

// avroSchema is the Avro record schema for Dashbase events; its CRC-64-AVRO
// fingerprint prefixes every encoded message (Avro single-object encoding).
const avroSchema string = `{"name":"io.dashbase.avro.DashbaseEvent","type":"record","fields":[{"name":"timeInMillis","type":"long"},{"name":"metaColumns","type":{"type":"map","values":"string"}},{"name":"numberColumns","type":{"type":"map","values":"double"}},{"name":"textColumns","type":{"type":"map","values":"string"}},{"name":"idColumns","type":{"type":"map","values":"string"}},{"name":"omitPayload","type":"boolean"}]}`

// table is the CRC-64-AVRO lookup table, built once in init.
var table []uint64

// dashbaseCodec encodes events using avroSchema; built once in init.
var dashbaseCodec *goavro.Codec

// dashbaseSchemaChecksum is the CRC-64-AVRO fingerprint of avroSchema.
var dashbaseSchemaChecksum uint64

// makeCRC64Table fills the package-level lookup table used by avroCRC64,
// following the CRC-64-AVRO fingerprinting algorithm from the Avro spec.
func makeCRC64Table() {
	table = make([]uint64, 256)
	for i := range table {
		fp := uint64(i)
		for bit := 0; bit < 8; bit++ {
			// Shift one bit out; XOR in the polynomial iff that bit was set.
			fp = (fp >> 1) ^ (emptyCRC64 & -(fp & 1))
		}
		table[i] = fp
	}
}

// makeAvroCodec builds the shared codec from avroSchema. The schema is a
// compile-time constant, so a failure here is a programmer error and panics.
func makeAvroCodec() {
	c, err := goavro.NewCodec(avroSchema)
	if err != nil {
		panic(err)
	}
	dashbaseCodec = c
}

// avroCRC64 computes the CRC-64-AVRO fingerprint of buf using the
// precomputed lookup table.
func avroCRC64(buf []byte) uint64 {
	fp := emptyCRC64
	for i := 0; i < len(buf); i++ {
		// Index by the low byte of fp XOR the next input byte.
		fp = (fp >> 8) ^ table[byte(fp)^buf[i]]
	}
	return fp
}

// AvroEncode serializes one row into Avro single-object encoding for the
// Dashbase Kafka API: the 2-byte marker 0xC3 0x01, the little-endian
// CRC-64-AVRO schema fingerprint, then the Avro-encoded event body.
// columns and values must be parallel; values[i] must hold the Go type the
// column's encoder produces (int64 for time, string for meta/text, float64
// for numeric) or the type assertion below panics.
func AvroEncode(columns []*ColumnDefinition, values []interface{}) ([]byte, error) {
	event := make(map[string]interface{})
	metaColumns := make(map[string]string)
	textColumns := make(map[string]string)
	numberColumns := make(map[string]float64)
	// idColumns is never populated here but is required by the schema.
	idColumns := make(map[string]string)

	for idx, column := range columns {
		switch column.dataType {
		case TypeTime:
			event["timeInMillis"] = values[idx].(int64)
		case TypeMeta:
			metaColumns[column.name] = values[idx].(string)
		case TypeText:
			textColumns[column.name] = values[idx].(string)
		case TypeNumeric:
			numberColumns[column.name] = values[idx].(float64)
		}
	}

	event["metaColumns"] = metaColumns
	event["textColumns"] = textColumns
	event["numberColumns"] = numberColumns
	event["idColumns"] = idColumns
	event["omitPayload"] = false

	body, err := dashbaseCodec.BinaryFromNative(nil, event)
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Encode the fingerprint directly instead of binary.Write: no reflection
	// and no silently ignored error return.
	var fingerprint [8]byte
	binary.LittleEndian.PutUint64(fingerprint[:], dashbaseSchemaChecksum)

	message := new(bytes.Buffer)
	message.Write([]byte{0xC3, 0x01}) // single-object encoding marker
	message.Write(fingerprint[:])
	message.Write(body)

	return message.Bytes(), nil
}

func init() {
	// Order matters: avroCRC64 reads the table built by makeCRC64Table, so
	// the schema checksum must be computed after the table exists.
	makeAvroCodec()
	makeCRC64Table()
	dashbaseSchemaChecksum = avroCRC64([]byte(avroSchema))
}
105 changes: 105 additions & 0 deletions dashbase/columntype.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package dashbase

import (
"fmt"
"strconv"

"github.com/juju/errors"
"github.com/pingcap/tidb/util/types"
)

// ColumnType is the data type of a Dashbase column, as named by Dashbase.
type ColumnType string

const (
	// TypeMeta is the `meta` type in Dashbase.
	TypeMeta ColumnType = "meta"

	// TypeTime is the `time` type in Dashbase.
	TypeTime ColumnType = "time"

	// TypeNumeric is the `numeric` type in Dashbase.
	TypeNumeric ColumnType = "numeric"

	// TypeText is the `text` type in Dashbase.
	TypeText ColumnType = "text"
)

// kafkaEncoder converts a datum into the Go value expected by the Dashbase
// Kafka API for one column type.
type kafkaEncoder func(types.Datum) interface{}

// codecDefinition pairs a Dashbase column type with its encoder.
type codecDefinition struct {
	columnType ColumnType
	encoder    kafkaEncoder
}

// codecs lists one encoder per Dashbase column type. Each encoder must return
// the exact Go type that AvroEncode asserts for that column type — including
// on the error path: an untyped 0 (an int) there would make the .(float64)
// and .(int64) assertions in AvroEncode panic.
var codecs = []*codecDefinition{
	{
		TypeText,
		func(input types.Datum) interface{} {
			return input.GetString()
		},
	},
	{
		TypeMeta,
		func(input types.Datum) interface{} {
			return input.GetString()
		},
	},
	{
		TypeNumeric,
		func(input types.Datum) interface{} {
			// NOTE(review): parses the string form rather than GetFloat64 —
			// presumably the datum arrives as a string; confirm with callers.
			f, err := strconv.ParseFloat(input.GetString(), 64)
			if err != nil {
				return float64(0) // was untyped 0: panicked the .(float64) assertion
			}
			return f
		},
	},
	{
		TypeTime,
		func(input types.Datum) interface{} {
			i, err := strconv.ParseInt(input.GetString(), 10, 64)
			if err != nil {
				return int64(0) // was untyped 0: panicked the .(int64) assertion
			}
			return i
		},
	},
}

// encoders maps each Dashbase column type to its Kafka encoder; populated
// once at startup from codecs.
var encoders map[ColumnType]kafkaEncoder

func init() {
	encoders = make(map[ColumnType]kafkaEncoder, len(codecs))
	for _, def := range codecs {
		encoders[def.columnType] = def.encoder
	}
}

// getEncoder looks up the Kafka encoder registered for columnType, returning
// an error for types with no registered encoder.
func getEncoder(columnType ColumnType) (kafkaEncoder, error) {
	if encoder, ok := encoders[columnType]; ok {
		return encoder, nil
	}
	return nil, errors.Trace(fmt.Errorf("Unsupported Dashbase type %s", columnType))
}
Loading