Skip to content

Commit

Permalink
Merge pull request #71 from Kanaries/dev
Browse files Browse the repository at this point in the history
feat: Support ClickHouse
  • Loading branch information
ObservedObserver authored Oct 19, 2021
2 parents 0a15744 + d5586aa commit ecb136d
Show file tree
Hide file tree
Showing 68 changed files with 3,184 additions and 490 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,8 @@ dist
coverage

.cache
.eslintcache
.eslintcache

*.cert
*.key
*.pem
8 changes: 8 additions & 0 deletions packages/connectors/app-config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"port": 2333,
"clickhouse": {
"protocol": "http",
"host": "localhost",
"port": 8123
}
}
8 changes: 8 additions & 0 deletions packages/connectors/app/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import path from 'path';

export const GLOBAL_VARS = {
jwtTokenKeyName: 'ktoken',
minEmailWaitTime: 1000 * 60
}

export const LOG_DIR = path.resolve(__dirname, '../logs')
99 changes: 99 additions & 0 deletions packages/connectors/app/controllers/clickHouseProxy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { Context } from "koa";
import axios from 'axios';
import { useGlobalStore } from "../store";
import { sendFailResponse, sendPureResponse, sendSuccessResponse } from "../utils";
import { CHQuery, getFieldMetas } from "../services/chmeta";
import { parseTable } from "../services/chparser";

interface CHProxyProps {
[key: string]: string
}

export async function CHGeneralProxy (ctx: Context) {
// const props = ctx.request.body as CHProxyProps;
// http://localhost:8123?query=SELECT * from datasets.suicideRate;
const config = useGlobalStore().getConfig();
const url = ctx.request.URL;
try {
const paramsObj: {[key: string]: any} = {};
for (let [pk, pv] of url.searchParams.entries()) {
paramsObj[pk] = pv;
}
const res = await axios(`${config.clickhouse.protocol}://${config.clickhouse.host}:${config.clickhouse.port}`, {
method: ctx.request.method as 'post' | 'get',
params: paramsObj
});
sendPureResponse(ctx, res.data);
} catch (error) {
console.error(error);
sendFailResponse(ctx, error)
}
}

export async function CHDBListProxy (ctx: Context) {
try {
const rawData = await CHQuery('SHOW DATABASES;')
const list = rawData.slice(0, -1).split('\n').slice(1);
sendSuccessResponse(ctx, list)
} catch (error) {
console.error(error);
sendFailResponse(ctx, error);
}
}

export async function CHTableListProxy (ctx: Context) {
try {
const dbName = ctx.request.URL.searchParams.get('dbName');
let sql = (dbName && dbName !== '') ? `show tables from ${dbName}` : `show tables`;
const rawData = await CHQuery(sql);
const list = rawData.slice(0, -1).split('\n');
sendSuccessResponse(ctx, list)
} catch (error) {
console.error(error);
sendFailResponse(ctx, error);
}
}

export async function CHTableDescProxy (ctx: Context) {
try {
const url = ctx.request.URL;
const tableName = url.searchParams.get('table');
if (tableName) {
const metas = await getFieldMetas(tableName);
sendSuccessResponse(ctx, metas)
} else {
throw new Error('[table name is empty]')
}
} catch (error) {
console.error(error);
sendFailResponse(ctx, error);
}
}

export async function CHSampleData (ctx: Context) {
try {
const url = ctx.request.URL;
const tableName = url.searchParams.get('table');
const dbName = url.searchParams.get('dbName');
const viewName = dbName && dbName !== '' ? `${dbName}.${tableName}` : tableName;
const rawData = await CHQuery(`select * from ${viewName} limit 1000;`);
if (viewName) {
const metas = await getFieldMetas(viewName);
const data = parseTable(rawData, metas);
sendSuccessResponse(ctx, {
data,
metas
})
} else {
throw new Error('[table name is empty]')
}
} catch (error) {
console.error(error);
sendFailResponse(ctx, error);
}
}

// setTimeout(() => {
// CHQuery('describe table datasets.suicideRate;').then(() => {}).catch(console.error);
// CHQuery('show tables from datasets;').then(() => {}).catch(console.error);
// }, 3000)
6 changes: 6 additions & 0 deletions packages/connectors/app/controllers/ping.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { Context } from "koa";
import { sendSuccessResponse } from "../utils";

export async function ping(ctx: Context) {
sendSuccessResponse(ctx, true);
}
40 changes: 40 additions & 0 deletions packages/connectors/app/interfaces.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { Context } from "koa";

export type IService<T = any, R = void> = (ctx: Context, props?: T) => Promise<R>;

export interface IRow {
[key: string]: any
}

export interface IAPP_CONFIG {
port: number;
clickhouse: {
protocol: string;
host: string;
port: number
}
}

export interface IDBFieldMeta {
fid: string;
dataType: string;
}

export type ISemanticType = 'quantitative' | 'nominal' | 'ordinal' | 'temporal';
export type IDataType = 'number' | 'integer' | 'boolean' | 'date' | 'string';
export type IAnalyticType = 'dimension' | 'measure';
export interface IField {
key: string;
name?: string;
analyticType: IAnalyticType;
semanticType: ISemanticType;
dataType: IDataType;
}

export interface IMutField {
key: string;
name?: string;
analyticType: IAnalyticType | '?';
semanticType: ISemanticType | '?';
dataType: IDataType | '?';
}
40 changes: 40 additions & 0 deletions packages/connectors/app/main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import Koa from 'koa';
import koaBody from 'koa-body';
import cors from '@koa/cors';
import http2 from 'http2';
// import http from 'http';
import fs from 'fs';
import path from 'path';
import { router } from './router';
import { getAppConfig } from './utils';
import { GlobalStore, updateGlobalStore } from './store';

async function start () {
try {
const config = await getAppConfig();
updateGlobalStore(new GlobalStore(config));

const app = new Koa();
app.use(cors({
credentials: true,
allowMethods: ['GET', 'HEAD', 'PUT', 'POST', 'DELETE', 'PATCH', 'OPTIONS']
}))

app.use(koaBody());

app.use(router.routes());
const serverKey = fs.readFileSync(path.resolve(__dirname, '../security/cert.key'), 'utf-8')
const serverCert = fs.readFileSync(path.resolve(__dirname, '../security/cert.pem'), 'utf-8')
http2.createSecureServer({
key: serverKey,
cert: serverCert
}, app.callback()).listen(config.port, () => {
console.log(`[server started] running on port: ${config.port}.`)
});
} catch (error) {
console.error('应用启动失败', error)
}

}

start();
29 changes: 29 additions & 0 deletions packages/connectors/app/router.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import Router from '@koa/router';
// import { UserController } from './controllers/user';
import {
CHGeneralProxy,
CHDBListProxy,
CHSampleData,
CHTableDescProxy,
CHTableListProxy
} from './controllers/clickHouseProxy';
import { ping } from './controllers/ping';
const router = new Router();

router.get('/ping', ping);
router.get('/api/ch/general', CHGeneralProxy);
router.post('/api/ch/general', CHGeneralProxy);

router.get('/api/ch/dbs', CHDBListProxy);
router.get('/api/ch/sampleData', CHSampleData);
router.get('/api/ch/tables', CHTableListProxy);

// router.post('/api/login', UserController.login);
// router.post('/api/register', UserController.register);
// router.post('/api/userUnique', UserController.isUserExisted);
// router.post('/api/sendMailCert', UserController.requireEmailCert);
// router.get('/api/logout', UserController.logout)

export {
router
}
19 changes: 19 additions & 0 deletions packages/connectors/app/services/chmeta.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import axios from "axios";
import { IDBFieldMeta } from "../interfaces";
import { useGlobalStore } from "../store";
export async function CHQuery (sql: string): Promise<string> {
const config = useGlobalStore().getConfig();
const res = await axios(`${config.clickhouse.protocol}://${config.clickhouse.host}:${config.clickhouse.port}?query=${sql}`);
return res.data;
}

export async function getFieldMetas(viewName: string): Promise<IDBFieldMeta[]> {
const metaRaw = await CHQuery(`DESC ${viewName}`);
return metaRaw.slice(0, -1).split('\n').map(fr => {
const infos = fr.split('\t');
return {
fid: infos[0],
dataType: infos[1]
}
})
}
76 changes: 76 additions & 0 deletions packages/connectors/app/services/chparser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { IAnalyticType, IDataType, IRow, ISemanticType } from "../interfaces";
const DB_DATA_TYPE_TO_DATA_TYPE: { [key: string]: IDataType } = {
'Int8': 'integer',
'Int16': 'integer',
'Int32': 'integer',
'Int64': 'integer',
'UInt8': 'integer',
'UInt16': 'integer',
'UInt32': 'number',
'UInt64': 'number',
'Float32': 'number',
'Float64': 'number',
'BOOLEAN': 'boolean',
'String': 'string'
}

const DEFAULT_SEMANTIC_TYPE = {
'number': 'quantitative',
'integer': 'quantitative',
'boolean': 'nominal',
'date': 'temporal',
'string': 'nominal'
} as const;

const DEFAULT_ANALYTIC_TYPE = {
'number': 'measure',
'integer': 'measure',
'boolean': 'dimension',
'date': 'dimension',
'string': 'dimension'
} as const;

export function dbDataType2DataType (dbDataType: string): IDataType {
return DB_DATA_TYPE_TO_DATA_TYPE[dbDataType] || 'string';
}

export function inferSemanticTypeFromDataType (dataType: IDataType): ISemanticType {
return DEFAULT_SEMANTIC_TYPE[dataType]
}

export function inferAnalyticTypeFromDataType (dataType: IDataType): IAnalyticType {
return DEFAULT_ANALYTIC_TYPE[dataType];
}
export function parseCell(rawValue: string, dataType: string) {
switch (dataType) {
case 'Int8':
case 'Int16':
case 'Int32':
case 'Int64':
// case 'Int128':
// case 'Int256':
case 'UInt8':
case 'UInt16':
case 'UInt32':
case 'UInt64':
return parseInt(rawValue);
case 'Float32':
case 'Float64':
return Number(rawValue);
default:
return rawValue;
}
}
export function parseTable (str: string, fields: {fid: string; dataType: string}[]): IRow[] {
const rows: IRow[] = [];
const rawRows = str.slice(0, -1).split('\n');
for (let rawRow of rawRows) {
const row: IRow = {};
const rowValues = rawRow.split(/[\t]/);
for (let i = 0; i < fields.length; i++) {
row[fields[i].fid] = parseCell(rowValues[i], fields[i].dataType)
}
rows.push(row)
}
return rows;
}
27 changes: 27 additions & 0 deletions packages/connectors/app/store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { IAPP_CONFIG } from "./interfaces";

export class GlobalStore {
private config: IAPP_CONFIG | null;
constructor (config: IAPP_CONFIG | null | undefined = null) {
this.config = config
}
public getConfig (): IAPP_CONFIG {
if (this.config === null) {
throw '应用全局配置信息错误,检查应用的配置信息是否被正确获取或者后续存在非法的修改操作.'
} else {
return this.config;
}
}
}

const storeRef: { ref: GlobalStore } = {
ref: new GlobalStore()
};

export function useGlobalStore() {
return storeRef.ref;
}

export function updateGlobalStore (value: GlobalStore) {
storeRef.ref = value;
}
Loading

0 comments on commit ecb136d

Please sign in to comment.