Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

init #51

Merged
merged 17 commits into from
Apr 18, 2024
35 changes: 35 additions & 0 deletions nodejs/example/basicSchemaless.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { WSConfig } from '../src/common/config';
import { Precision, SchemalessProto } from '../src/sql/wsProto';
import { sqlConnect } from '../index';
let dsn = 'ws://root:[email protected]:6051/ws';
let db = 'power'
let influxdbData = "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

尽量使用 meters 表做用例

let telnetData = "stb0_0 1626006833 4 host=host0 interface=eth0";
let jsonData = "{\"metric\": \"meter_current\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
// const dropDB = `drop database if exists ${db}`

async function Prepare() {
let conf :WSConfig = new WSConfig(dsn)
let wsSql = await sqlConnect(conf)
await wsSql.Exec(`create database if not exists ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`)
wsSql.Close()
}

(async () => {
let wsSchemaless = null
try {
await Prepare()
let conf = new WSConfig(dsn);
conf.SetDb(db)
wsSchemaless = await sqlConnect(conf)
await wsSchemaless.SchemalessInsert([influxdbData], SchemalessProto.InfluxDBLineProtocol, Precision.NANO_SECONDS, 0);
await wsSchemaless.SchemalessInsert([telnetData], SchemalessProto.OpenTSDBTelnetLineProtocol, Precision.SECONDS, 0);
await wsSchemaless.SchemalessInsert([jsonData], SchemalessProto.OpenTSDBJsonFormatProtocol, Precision.SECONDS, 0);
} catch (e) {
console.error(e);
}finally {
if (wsSchemaless) {
wsSchemaless.Close();
}
}
})();
59 changes: 59 additions & 0 deletions nodejs/example/basicSql.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@

require('qingwa')();

import { WSConfig } from '../src/common/config';
import { sqlConnect } from '../index'

let dsn = 'ws://root:[email protected]:6051/ws';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

最后的 ws 是数据库吗,如果不是,是端点,不要体现出来,Adapter 的 路径对外不可见

(async () => {
let wsSql = null;
let wsRows = null;
let reqId = 0;
try {
let conf :WSConfig = new WSConfig(dsn)
wsSql = await sqlConnect(conf)

let version = await wsSql.Version();
console.log(version);

let taosResult = await wsSql.Exec('show databases', reqId++)
console.log(taosResult);

taosResult = await wsSql.Exec('create database if not exists power KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;',reqId++);
console.log(taosResult);

taosResult = await wsSql.Exec('use power',reqId++)
console.log(taosResult);

taosResult = await wsSql.Exec('CREATE STABLE if not exists meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);', reqId++);
console.log(taosResult);

taosResult = await wsSql.Exec('describe meters', reqId++)
console.log(taosResult);

taosResult = await wsSql.Exec('INSERT INTO d1001 USING meters TAGS ("California.SanFrancisco", 3) VALUES (NOW, 10.2, 219, 0.32)', reqId++)
console.log(taosResult);

wsRows = await wsSql.Query('select * from meters', reqId++);
let meta = wsRows.GetMeta()
console.log("wsRow:meta:=>", meta);

while (await wsRows.Next()) {
let result = await wsRows.GetData();
console.log('queryRes.Scan().then=>', result);
}
await wsRows.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrRows.Close is a synchronous function


} catch (e) {
let err:any = e
console.error(err);

} finally {
if (wsRows) {
await wsRows.Close();
}
if (wsSql) {
wsSql.Close();
}
}
})();
60 changes: 60 additions & 0 deletions nodejs/example/basicStmt.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
require('qingwa')();
import { WSConfig } from '../src/common/config';
import { sqlConnect } from '../index';

let db = 'power'
let stable = 'meters'
let tags = ['California.SanFrancisco', 3];
let multi = [
[1706786044994, 1706786044995, 1706786044996],
[10.2, 10.3, 10.4],
[292, 293, 294],
[0.32, 0.33, 0.34],
];

async function Prepare() {
let dsn = 'ws://root:[email protected]:6051/ws';
let conf :WSConfig = new WSConfig(dsn)
let wsSql = await sqlConnect(conf)
await wsSql.Exec(`create database if not exists ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`)
await wsSql.Exec(`CREATE STABLE if not exists ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`);
wsSql.Close()
}

(async () => {
let stmt = null;
let connector = null;
try {
await Prepare();
let dsn = 'ws://root:[email protected]:6051/ws';
let wsConf = new WSConfig(dsn);
wsConf.SetDb(db)
connector = await sqlConnect(wsConf);
stmt = await connector.StmtInit()
await stmt.Prepare(`INSERT INTO ? USING ${db}.${stable} TAGS (?, ?) VALUES (?, ?, ?, ?)`);
await stmt.SetTableName('d1001');

let tagParams = stmt.NewStmtParam()
tagParams.SetVarcharColumn([tags[0]])
tagParams.SetIntColumn([tags[1]])
await stmt.SetBinaryTags(tagParams);

let bindParams = stmt.NewStmtParam()
bindParams.SetTimestampColumn(multi[0]);
bindParams.SetFloatColumn(multi[1])
bindParams.SetIntColumn(multi[2])
bindParams.SetFloatColumn(multi[3])
await stmt.BinaryBind(bindParams);
await stmt.Batch();
await stmt.Exec();
} catch (e) {
console.error(e);
}finally {
if (stmt) {
stmt.Close();
}
if (connector) {
connector.Close();
}
}
})();
71 changes: 71 additions & 0 deletions nodejs/example/basicTmq.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { WSConfig } from "../src/common/config";
import { TMQConstants } from "../src/tmq/constant";
import { sqlConnect, tmqConnect } from "../index";


const stable = 'meters';
const db = 'power'
const topics:string[] = ['pwer_meters_topic']
let dropTopic = `DROP TOPIC IF EXISTS ${topics[0]};`
let configMap = new Map([
[TMQConstants.GROUP_ID, "gId"],
[TMQConstants.CONNECT_USER, "root"],
[TMQConstants.CONNECT_PASS, "taosdata"],
[TMQConstants.AUTO_OFFSET_RESET, "earliest"],
[TMQConstants.CLIENT_ID, 'test_tmq_client'],
[TMQConstants.WS_URL, 'ws://192.168.1.95:6051/rest/tmq'],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rest/tmq 不要暴露

[TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
]);

async function Prepare() {
let dsn = 'ws://root:[email protected]:6051/ws';
let conf :WSConfig = new WSConfig(dsn)
const createDB = `create database if not exists ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`
const createStable = `CREATE STABLE if not exists ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`
let createTopic = `create topic if not exists ${topics[0]} as select * from ${db}.${stable}`
const useDB = `use ${db}`

let ws = await sqlConnect(conf);
await ws.Exec(createDB);
await ws.Exec(useDB);
await ws.Exec(createStable);
await ws.Exec(createTopic);
for (let i = 0; i < 10; i++) {
await ws.Exec(`INSERT INTO d1001 USING ${stable} TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10+i}, ${200+i}, ${0.32 + i})`)
}
ws.Close()
}

(async () => {
let consumer = null
try {
await Prepare()
consumer = await tmqConnect(configMap);
await consumer.Subscribe(topics);
for (let i = 0; i < 5; i++) {
let res = await consumer.Poll(500);
for (let [key, value] of res) {
console.log(key, value);
}
if (res.size == 0) {
break;
}
await consumer.Commit();
}

let assignment = await consumer.Assignment()
console.log(assignment)
await consumer.SeekToBeginning(assignment)

await consumer.Unsubscribe()
} catch (e) {
console.error(e);
} finally {
if (consumer) {
consumer.Close();
}
}
})();


23 changes: 23 additions & 0 deletions nodejs/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { WsSql } from './src/sql/wsSql'
import { WSConfig } from './src/common/config';
import { WsConsumer } from './src/tmq/wsTmq';

let sqlConnect = async (conf: WSConfig) => {
try {
return await WsSql.Open(conf)
} catch (err: any) {
console.error(err);
throw err;
}
};

let tmqConnect = async (configMap: Map<string, string>) => {
try {
return await WsConsumer.NewConsumer(configMap);
} catch (err: any) {
console.error(err);
throw err;
}
};

export { sqlConnect, tmqConnect };
8 changes: 8 additions & 0 deletions nodejs/jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/** @type {import('ts-jest').JestConfigWithTsJest} */
module.exports = {
preset: "ts-jest",
transform: {'^.+\\.ts?$': 'ts-jest'},
testEnvironment: 'node',
testRegex: '/test/.*\\.(test|spec)?\\.(ts|tsx)$',
moduleFileExtensions: ['ts', 'tsx', 'js', 'jsx', 'json', 'node'],
};
Loading