當初在玩ubus的時候發現能夠參考的資料真的不多,大部分都得靠自己trace code來架設,OpenWrt官網也沒寫太多資訊。
大部分我都參考odhcpd這支程式來架設ubus IPC的架構,我得說它的程式架構寫得真的超級漂亮!
首先,關於架構上的設計,假設有Process A/B,那在IPC的設計上就應該是
1. Process A to Process B
Process A利用ubus_invoke送執行資料給Process B;Process B則可以用ubus_send_reply回覆確認訊息,這個是對於ubus call method架構的設計。
關於ubus_invoke的使用,可以參考這篇文章:〈[OpenWrt] ubus_invoke()使用介紹〉。
2. Process B to Process A
當Process B處理完之後,再回報給Process A處理完成的結果;這個subscribe-notify的架構好處是,透過ubus server來處理notify,所以Process A就不需要送完命令後一直在等Process B完成,可以繼續處理其他訊息。
而且這樣一來,可以大大減低多餘的連線/佔線,ubus大概是目前我看到IPC的集大成者。
怎麼實作:
Process A的部分
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <libubus.h>
#include <libubox/uloop.h>
#include <libubox/blobmsg_json.h>
// Connection to the ubus daemon; assigned by ubus_connect() in init_ubus().
static struct ubus_context *ubus = NULL;
// Subscriber used to receive notifications from Process B's object.
static struct ubus_subscriber processB_subscriber;
// ubus object id of "processB"; filled by ubus_lookup_id() in init_ubus(), 0 until then.
static uint32_t objid_processB = 0;
// Shared scratch buffer used to build outgoing blobmsg payloads.
static struct blob_buf b;
// Attribute indexes for the notification payload received from Process B.
enum {
PB_NOTIFY_CMD,
__PB_NOTIFY_MAX,
};
// Parse policy for the notification: a single 8-bit "command" field.
static const struct blobmsg_policy pb_cmd_policy[__PB_NOTIFY_MAX] = {
[PB_NOTIFY_CMD] = { .name = "command", .type = BLOBMSG_TYPE_INT8 },
};
/*
 * Parse a notification payload coming from Process B.
 *
 * msg: blobmsg payload; its optional "command" field (8-bit) carries the
 *      result reported by Process B.
 *
 * The extracted value is not acted upon yet. Always returns 0.
 */
static int handle_processB_notify(struct blob_attr *msg) {
    struct blob_attr *attrs[__PB_NOTIFY_MAX];
    uint8_t result = 0;

    blobmsg_parse(pb_cmd_policy, __PB_NOTIFY_MAX, attrs, blob_data(msg), blob_len(msg));

    if (attrs[PB_NOTIFY_CMD] != NULL) {
        // acquire results from Process B
        result = blobmsg_get_u8(attrs[PB_NOTIFY_CMD]);
    }

    // Placeholder: the result is parsed but intentionally unused for now.
    (void)result;
    return 0;
}
// Attribute indexes for the arguments of the "pa_call_command" method.
enum {
PA_CALL_ENABLE,
__PA_CALL_MAX
};
// Parse policy for "pa_call_command": a single boolean "enable" field.
static const struct blobmsg_policy pa_call_policy[__PA_CALL_MAX] = {
[PA_CALL_ENABLE] = { .name = "enable", .type = BLOBMSG_TYPE_BOOL },
};
// method for ubus call
static int handle_pa_call(_unused struct ubus_context *ctx, _unused struct ubus_object *obj,
_unused struct ubus_request_data *req, _unused const char *method,
struct blob_attr *msg) {
struct blob_attr *tb[__PA_CALL_MAX], *c;
bool enable = false;
blobmsg_parse(pa_call_policy, __PA_CALL_MAX, tb, blob_data(msg), blob_len(msg));
if ((c = tb[PA_CALL_ENABLE])) {
enable = blobmsg_get_u8(c);
}
if(enable) {
// call Process B to do something...
blob_buf_init(&b, 0);
blobmsg_add_u32(&b, "command", enable);
ubus_invoke(ubus, objid_processB, "pb_call_method", b.head, NULL, NULL, 2000);
}
return 0;
}
/*
 * Subscribe processB_subscriber to Process B's object when `subscribe` is
 * true; do nothing otherwise.
 */
static void update_processB(bool subscribe) {
    if (!subscribe) {
        return;
    }

    ubus_subscribe(ubus, &processB_subscriber, objid_processB);
}
// Methods exported by Process A on ubus. Declared const, matching the
// method table of Process B, since the table is never modified.
static const struct ubus_method processA_object_methods[] = {
    UBUS_METHOD("pa_call_command", handle_pa_call, pa_call_policy),
};

// Object type descriptor built from the method table above.
static struct ubus_object_type processA_object_type =
    UBUS_OBJECT_TYPE("processA", processA_object_methods);

// The ubus object published under the name "processA".
static struct ubus_object processA_object = {
    .name = "processA",
    .type = &processA_object_type,
    .methods = processA_object_methods,
    .n_methods = ARRAY_SIZE(processA_object_methods),
};
/*
 * Subscriber callback installed on processB_subscriber.
 *
 * Forwards the payload to handle_processB_notify() when the notification
 * type is "pb_notify"; every other type is ignored. Always returns 0.
 */
static int processB_notify(struct ubus_context *ctx, struct ubus_object *obj,
                           struct ubus_request_data *req, const char *method,
                           struct blob_attr *msg) {
    const int is_pb_notify = (strcmp(method, "pb_notify") == 0);

    if (!is_pb_notify) {
        return 0;
    }

    // receive results from Process B
    handle_processB_notify(msg);
    return 0;
}
/*
 * Set up Process A on ubus and run the event loop.
 *
 * Connects to the ubus daemon, registers the subscriber used for Process B
 * notifications, publishes the "processA" object, subscribes to "processB"
 * if it can be looked up, then blocks in uloop_run().
 *
 * Returns 0 once the event loop has ended, or -1 when connecting or
 * registering fails (the connection and uloop are torn down in that case).
 */
int init_ubus(void) {
    // initialize ubus of Process A
    uloop_init();

    // connect to ubus server
    ubus = ubus_connect(NULL);
    if (!ubus) {
        goto fail;
    }

    // set up subscribing Process B object
    processB_subscriber.cb = processB_notify;
    if (ubus_register_subscriber(ubus, &processB_subscriber)) {
        goto fail;
    }

    ubus_add_uloop(ubus);

    // create ubus method
    if (ubus_add_object(ubus, &processA_object)) {
        goto fail;
    }

    // A failed lookup leaves objid_processB at 0 and no subscription is made.
    if (!ubus_lookup_id(ubus, "processB", &objid_processB)) {
        update_processB(true);
    }

    uloop_run();
    return 0;

fail:
    if (ubus) {
        ubus_free(ubus);
        ubus = NULL;
    }
    uloop_done();
    return -1;
}
int exit_ubus(void) {
ubus_free(ubus);
uloop_done();
return 0;
}
Process B的部分
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <libubus.h>
#include <libubox/uloop.h>
#include <libubox/blobmsg_json.h>
// Connection to the ubus daemon; assigned by ubus_connect() in init_ubus().
static struct ubus_context *ubus = NULL;
// ubus object id of "processA"; filled by ubus_lookup_id() in init_ubus(), 0 until then.
static uint32_t objid_processA = 0;
// Shared scratch buffer used to build outgoing blobmsg payloads.
static struct blob_buf b;
// Forward declaration; the object is defined after its method table.
static struct ubus_object processB_object;
/*
 * Send a "pb_notify" notification on Process B's object.
 *
 * state: value placed in the 8-bit "command" field of the payload.
 *
 * Returns the status reported by ubus_notify() (0 on success) instead of
 * unconditionally reporting success.
 */
int ubus_notify_processA(uint8_t state) {
    blob_buf_init(&b, 0);
    blobmsg_add_u8(&b, "command", state);

    // tell Process A the results of commands
    // NOTE(review): a timeout of -1 presumably means no reply is awaited;
    // confirm against the libubus version in use.
    return ubus_notify(ubus, &processB_object, "pb_notify", b.head, -1);
}
/*
 * Placeholder for the work triggered by a call from Process A.
 *
 * command: value received from the caller; not used yet.
 *
 * Once the work is done, Process A is notified with a state of `true`.
 */
void do_founction(uint32_t command) {
    const uint8_t finished = true;

    (void)command;

    // do something, then notify to Process A
    ubus_notify_processA(finished);
}
// Attribute indexes for the arguments of the "pb_call_method" method.
enum {
PB_CALL_CMD,
__PB_CALL_MAX
};
// Parse policy for "pb_call_method": a single 32-bit "command" field.
static const struct blobmsg_policy pb_call_method_policy[__PB_CALL_MAX] = {
[PB_CALL_CMD] = { .name = "command", .type = BLOBMSG_TYPE_INT32 }
};
static int handle_pb_call_method(struct ubus_context *ctx, struct ubus_object *obj,
struct ubus_request_data *req, const char *method,
struct blob_attr *msg)
{
struct blob_attr *tb[__PB_CALL_MAX];
uint32_t command;
blobmsg_parse(pb_call_method_policy, __PB_CALL_MAX, tb, blob_data(msg), blob_len(msg));
if ((c = tb[PB_CALL_CMD])) {
command = blobmsg_get_u32(tb[PB_CALL_CMD]);
}
// send invoke reply
blob_buf_init(&b, 0);
blobmsg_add_u8(&b, "rc", true);
ubus_send_reply(ctx, req, b.head);
// start to do something......
do_founction(command);
return 0;
}
// Methods exported by Process B on ubus.
static const struct ubus_method processB_methods[] = {
UBUS_METHOD("pb_call_method", handle_pb_call_method, pb_call_method_policy),
};
// Object type descriptor built from the method table above.
static struct ubus_object_type processB_object_type =
UBUS_OBJECT_TYPE("processB", processB_methods);
// The ubus object published under the name "processB"; also the source
// object passed to ubus_notify() in ubus_notify_processA().
static struct ubus_object processB_object = {
.name = "processB",
.type = &processB_object_type,
.methods = processB_methods,
.n_methods = ARRAY_SIZE(processB_methods),
};
/*
 * Set up Process B on ubus and run the event loop.
 *
 * Connects to the ubus daemon, publishes the "processB" object, looks up
 * the id of "processA" (best effort), then blocks in uloop_run().
 *
 * Returns 0 once the event loop has ended, or -1 when connecting or
 * publishing the object fails (the connection and uloop are torn down in
 * that case).
 */
int init_ubus(void)
{
    uloop_init();

    ubus = ubus_connect(NULL);
    if (!ubus) {
        goto fail;
    }

    ubus_add_uloop(ubus);

    // Without the object nobody can call or subscribe to Process B.
    if (ubus_add_object(ubus, &processB_object)) {
        goto fail;
    }

    // Best effort: on failure objid_processA keeps its initial value 0.
    ubus_lookup_id(ubus, "processA", &objid_processA);

    uloop_run();
    return 0;

fail:
    if (ubus) {
        ubus_free(ubus);
        ubus = NULL;
    }
    uloop_done();
    return -1;
}
int exit_ubus(void)
{
ubus_free(ubus);
uloop_done();
return 0;
}
有問題歡迎一起討論研究OpenWrt。
沒有留言:
張貼留言