2017/02/17

[OpenWrt] 用ubus實現Inter-Process Communication

OpenWrt依靠ubus來做所有的IPC,所以也就是說,我認為玩OpenWrt不能不懂什麼是ubus。




當初在玩ubus的時候發現能夠參考的資料真的不多,大部分都得靠自己trace code來架設,OpenWrt官網也沒寫太多資訊。

大部分我都參考odhcpd這個來架設ubus IPC的架構,我得說這支程式架構寫得真的超級漂亮的!

首先,關於架構上的設計,假設有Process A/B,那在IPC的設計上就應該是

1. Process A to Process B
Process A利用ubus_invoke送執行資料給Process B;Process B則可以用ubus_send_reply回覆確認訊息,這個是對於ubus call method架構的設計。

關於ubus_invoke的使用可以參考這篇[OpenWrt] ubus_invoke()使用介紹

2. Process B to Process A
當Process B處理完之後,再回報給Process A處理完成的結果;這個subscribe-notify的架構好處是,透過ubus server來處理notify,所以Process A就不需要送完命令後一直在等Process B完成,增加Process A在處理其他訊息的能力。

而且這樣一來,可以大大減低多餘的連線/佔線,ubus大概是目前我看到IPC的集大成者。

怎麼實作:
Process A的部分
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <libubus.h>
#include <libubox/uloop.h>
#include <libubox/blobmsg_json.h>

static struct ubus_context *ubus = NULL;
static struct ubus_subscriber processB_subscriber;
static uint32_t objid_processB = 0;
static struct blob_buf b;

enum {
 PB_NOTIFY_CMD,
 __PB_NOTIFY_MAX,
};

static const struct blobmsg_policy pb_cmd_policy[__PB_NOTIFY_MAX] = {
 [PB_NOTIFY_CMD] = { .name = "command", .type = BLOBMSG_TYPE_INT8 },
};

static int handle_processB_notify(struct blob_attr *msg) {

 uint8_t command = 0;

 struct blob_attr *tb[__PB_NOTIFY_MAX], *c;
 blobmsg_parse(pb_cmd_policy, __PB_NOTIFY_MAX, tb, blob_data(msg), blob_len(msg));

 if ((c = tb[PB_NOTIFY_CMD])) {
  // acqurie results from Process B
  command = blobmsg_get_u8(c);
 }

 return 0;
}

enum {
 PA_CALL_ENABLE,
 __PA_CALL_MAX
};

static const struct blobmsg_policy pa_call_policy[__PA_CALL_MAX] = {
 [PA_CALL_ENABLE] = { .name = "enable", .type = BLOBMSG_TYPE_BOOL },
};

// method for ubus call
static int handle_pa_call(_unused struct ubus_context *ctx, _unused struct ubus_object *obj,
  _unused struct ubus_request_data *req, _unused const char *method,
  struct blob_attr *msg) {

 struct blob_attr *tb[__PA_CALL_MAX], *c;

 bool enable = false;

 blobmsg_parse(pa_call_policy, __PA_CALL_MAX, tb, blob_data(msg), blob_len(msg));

 if ((c = tb[PA_CALL_ENABLE])) {
  enable = blobmsg_get_u8(c);
 }

 if(enable) {
  // call Process B to do something...
  blob_buf_init(&b, 0);
  blobmsg_add_u32(&b, "command", enable);
  ubus_invoke(ubus, objid_processB, "pb_call_method", b.head, NULL, NULL, 2000);
 }

 return 0;
}

static void update_processB(bool subscribe) {
 if (subscribe)
  ubus_subscribe(ubus, &processB_subscriber, objid_processB);
}

// create a method for ubus
static struct ubus_method processA_object_methods[] = {
 UBUS_METHOD("pa_call_command", handle_pa_call, pa_call_policy),
};

static struct ubus_object_type processA_object_type =
 UBUS_OBJECT_TYPE("processA", processA_object_methods);

static struct ubus_object processA_object = {
 .name = "processA",
 .type = &processA_object_type,
 .methods = processA_object_methods,
 .n_methods = ARRAY_SIZE(processA_object_methods),
};

static int processB_notify(struct ubus_context *ctx, struct ubus_object *obj, 
 struct ubus_request_data *req, const char *method,
 struct blob_attr *msg) {

 if(strcmp(method, "pb_notify")==0) {
  // receive results from Process B
  handle_processB_notify(msg);
 }
 return 0;
}

int init_ubus(void) {

 // initialize ubus of Process A
 uloop_init();

 // connect to ubus server
 if(!(ubus = ubus_connect(NULL))) {
  return -1;
 }

 // set up subscribing Process B object
 processB_subscriber.cb = processB_notify;
 ubus_register_subscriber(ubus, &processB_subscriber);

 ubus_add_uloop(ubus);

 // create ubus method
 ubus_add_object(ubus, &processA_object);

 if(!ubus_lookup_id(ubus, "processB", &objid_processB)) {
  update_processB(true);
 }

 uloop_run();

 return 0;
}

int exit_ubus(void) {

 ubus_free(ubus);
 uloop_done();
 return 0;
}


Process B的部分
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <libubus.h>
#include <libubox/uloop.h>
#include <libubox/blobmsg_json.h> 

static struct ubus_context *ubus = NULL;
static uint32_t objid_processA = 0;
static struct blob_buf b;
static struct ubus_object processB_object;

int ubus_notify_processA(uint8_t state) {

 blob_buf_init(&b, 0);
 blobmsg_add_u8(&b, "command", state);

 // tell Process A the results of commands
 ubus_notify(ubus, &processB_object, "pb_notify", b.head, -1);

 return 0;
}

void do_founction(uint32_t command) {
 // do something, then notify to Process A
 ubus_notify_processA(true);

}

enum {
 PB_CALL_CMD,
 __PB_CALL_MAX
};

static const struct blobmsg_policy pb_call_method_policy[__PB_CALL_MAX] = {
 [PB_CALL_CMD] = { .name = "command", .type = BLOBMSG_TYPE_INT32 }
};

static int handle_pb_call_method(struct ubus_context *ctx, struct ubus_object *obj,
       struct ubus_request_data *req, const char *method,
          struct blob_attr *msg)
{
 struct blob_attr *tb[__PB_CALL_MAX];
 uint32_t command;

 blobmsg_parse(pb_call_method_policy, __PB_CALL_MAX, tb, blob_data(msg), blob_len(msg));

 if ((c = tb[PB_CALL_CMD])) {
  command = blobmsg_get_u32(tb[PB_CALL_CMD]);
 }

 // send invoke reply
 blob_buf_init(&b, 0);
 blobmsg_add_u8(&b, "rc", true);
 ubus_send_reply(ctx, req, b.head);

 // start to do something......
 do_founction(command);

 return 0;
}

static const struct ubus_method processB_methods[] = {
 UBUS_METHOD("pb_call_method", handle_pb_call_method, pb_call_method_policy),
};

static struct ubus_object_type processB_object_type =
 UBUS_OBJECT_TYPE("processB", processB_methods);

static struct ubus_object processB_object = {
 .name = "processB",
 .type = &processB_object_type,
 .methods = processB_methods,
 .n_methods = ARRAY_SIZE(processB_methods),
};

int init_ubus(void)
{
 int ret;

 uloop_init();

 if (!(ubus = ubus_connect(NULL))) {
  return -1;
 }

 ubus_add_uloop(ubus);

 ret = ubus_add_object(ubus, &processB_object);

 ubus_lookup_id(ubus, "processA", &objid_processA);

 uloop_run();

 return 0;
}

int exit_ubus(void)
{
 ubus_free(ubus);
 uloop_done();

 return 0;
}


有問題歡迎一起討論研究OpenWrt。

沒有留言:

張貼留言