Skip to content

Commit

Permalink
feat(ubus): add support publish/subscriber.
Browse files Browse the repository at this point in the history
Signed-off-by: Jianhui Zhao <[email protected]>
  • Loading branch information
zhaojh329 committed Jan 23, 2025
1 parent 756b063 commit c6d5f24
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 6 deletions.
37 changes: 37 additions & 0 deletions examples/ubus/subscribe.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/usr/bin/env eco

local ubus = require 'eco.ubus'
local time = require 'eco.time'

eco.run(function()
local con, err = ubus.connect()
if not con then
error(err)
end

local obj = con:add('eco', {})

while true do
time.sleep(1)
local ts = time.now()
print('notify...', ts)
con:notify(obj, 'time', { ts = ts })
end
end)

eco.run(function()
local con, err = ubus.connect()
if not con then
error(err)
end

con:subscribe('eco', function(method, msg)
if method == 'time' then
print('recv:', msg.ts)
end
end)

while true do
time.sleep(1000)
end
end)
15 changes: 15 additions & 0 deletions tests/ubus_client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,18 @@ eco.run(function()
time.sleep(0.001)
end
end)

eco.run(function()
local con, err = ubus.connect()
if not con then
error(err)
end

con:subscribe('eco', function(method, msg)
print('recv:', method, msg.j)
end)

while true do
time.sleep(1000)
end
end)
7 changes: 4 additions & 3 deletions tests/ubus_server.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ end)

local con = ubus.connect()

con:add('eco', {
local obj = con:add('eco', {
echo = {
function(req, msg)
con:reply(req, msg)
Expand All @@ -21,8 +21,9 @@ con:add('eco', {

con:listen('*', function(ev, msg)
print('got event:', msg.j, ev)
con:notify(obj, 'test', msg)
end)

while true do
time.sleep(1)
end
time.sleep(1000)
end
111 changes: 111 additions & 0 deletions ubus.c
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,115 @@ static int lua_ubus_complete_deferred_request(lua_State *L)
return 0;
}

static int ubus_subscriber_cb(struct ubus_context *ctx, struct ubus_object *obj,
struct ubus_request_data *req,const char *method, struct blob_attr *msg)
{
struct eco_ubus_context *c = container_of(ctx, struct eco_ubus_context, ctx);
struct ubus_subscriber *s = container_of(obj, struct ubus_subscriber, obj);
lua_State *L = c->eco->L;

lua_pushnil(L);

lua_push_ubus_ctx(L, c);
lua_getuservalue(L, -1);

lua_pushlightuserdata(L, s);
lua_rawget(L, -2);

lua_getuservalue(L, -1);

lua_rawgeti(L, -1, 1);

lua_replace(L, -6);

lua_settop(L, -5);

lua_pushstring(L, method);

blob_to_lua_table(L, blob_data(msg), blob_len(msg), false);

lua_call(L, 2, 0);

return 0;
}

static int lua_ubus_subscribe(lua_State *L)
{
struct eco_ubus_context *ctx = luaL_checkudata(L, 1, ECO_UBUS_CTX_MT);
const char *path = luaL_checkstring(L, 2);
struct ubus_subscriber *sub;
uint32_t id;
int ret;

if (ubus_lookup_id(&ctx->ctx, path, &id)) {
lua_pushnil(L);
lua_pushliteral(L, "not found");
return 2;
}

sub = lua_newuserdata(L, sizeof(struct ubus_subscriber));
lua_newtable(L);
lua_pushvalue(L, 3);
lua_rawseti(L, -2, 1);
lua_setuservalue(L, -2);

memset(sub, 0, sizeof(struct ubus_event_handler));

sub->cb = ubus_subscriber_cb;

ret = ubus_register_subscriber(&ctx->ctx, sub);
if (ret) {
lua_pushnil(L);
lua_pushstring(L, ubus_strerror(ret));
return 2;
}

ret = ubus_subscribe(&ctx->ctx, sub, id);
if (ret) {
lua_pushnil(L);
lua_pushstring(L, ubus_strerror(ret));
return 2;
}

lua_push_ubus_ctx(L, ctx);
lua_getuservalue(L, -1);

lua_pushlightuserdata(L, sub);
lua_pushvalue(L, -4);
lua_rawset(L, -3);

lua_settop(L, -3);

return 0;
}

static int lua_ubus_notify(lua_State *L)
{
struct eco_ubus_context *ctx = luaL_checkudata(L, 1, ECO_UBUS_CTX_MT);
struct blob_buf buf = {};
struct eco_ubus_object *obj;
const char *method;

if(!lua_isuserdata(L, 2))
return luaL_error(L, "Invald 2nd parameter, expected ubus obj ref");

obj = lua_touserdata(L, 2);

method = luaL_checkstring(L, 3);

luaL_checktype(L, 4, LUA_TTABLE);

blob_buf_init(&buf, 0);

lua_table_to_blob(L, 4, &buf, false);

ubus_notify(&ctx->ctx, &obj->object, method, buf.head, -1);

blob_buf_free(&buf);

return 0;
}

static int lua_ubus_connect(lua_State *L)
{
const char *path = luaL_optstring(L, 1, NULL);
Expand Down Expand Up @@ -785,6 +894,8 @@ static const struct luaL_Reg ubus_methods[] = {
{"add", lua_ubus_add},
{"reply", lua_ubus_reply},
{"complete_deferred_request", lua_ubus_complete_deferred_request},
{"subscribe", lua_ubus_subscribe},
{"notify", lua_ubus_notify},
{"objects", lua_ubus_objects},
{"signatures", lua_ubus_signatures},
{"close", lua_ubus_close},
Expand Down
12 changes: 9 additions & 3 deletions ubus.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ local global_timeout = 30.0

local methods = {}

for _, method in ipairs({'close', 'call', 'reply', 'send', 'objects', 'signatures', 'settimeout', 'auto_reconnect'}) do
for _, method in ipairs({'close', 'call', 'reply', 'send', 'notify', 'objects', 'signatures', 'settimeout', 'auto_reconnect'}) do
methods[method] = function(self, ...)
local con = self.con
return con[method](con, ...)
Expand All @@ -33,8 +33,14 @@ function methods:add(object, ms)
end
end

local o, err = con:add(object, ms)
if not o then
return con:add(object, ms)
end

function methods:subscribe(path, cb)
local s, err = self.con:subscribe(path, function(...)
eco.run(cb, ...)
end)
if not s then
return false, err
end

Expand Down

0 comments on commit c6d5f24

Please sign in to comment.