Incomprehensible rpc, or the first pet project or DynamicRPC

ATTENTION!

Most of the content of this article is presented as comments in the code!

Introduction

Initially this project was just an experiment with libffi and whether it is possible to make a full-fledged P2P RPC with it. It seems to have worked 🙂 And here I will try to tell what I did and what features this rpc has.

Peculiarities

The most important thing that this RPC gives is the ability to use ready-made functions without rewriting them or wrappers (IDL, which is not here, also falls under wrappers), but there are some limitations, for example structures or 2+D arrays, for them there are special types. Well, the main feature is the ability to send some arguments back to the client, which will cause an update of their originals on the client.

Protocol

The protocol was not particularly well thought out, but it fulfilled its tasks; all network interactions were tied to 2 functions.

ssize_t rpcmsg_write_to_fd(struct rpcmsg* msg, int fd); //Пишет сообщение в сокет
int get_rpcmsg_from_fd(struct rpcmsg* msg ,int fd);  //Получает сообщение из сокета

/*Структура которую эти функции шлют в сокет*/
struct rpcmsg{
    enum rpcmsg_type msg_type; //тип сообщения
    uint64_t payload_len;      //Длина полезных данных в байтах
    char* payload;             //Сами полезные данные
    uint8_t payload_crc;       //Самая тупейшая контрольная сумма в мире,crc = crc ^ msg->payload[i] ^ i;
};

I will not post the source codes of these functions in the article because I am ashamed of them, but they work and perform their task.

The protocol itself (before the request processing cycle starts) is quite simple.

But the protocol for exchanging data between the client and its stream on the server will be a little more interesting, but first about authentication, it is implemented through a hash from the password and its search in the hash table

    if(get_rpcmsg_from_fd(&gotmsg,thrd->client_fd) == 0 && gotmsg.msg_type == AUTH && gotmsg.payload != NULL){
        /*понимаем что клиент нам отослал все правильно и не словил timeout*/
        struct rpctype type; //Сюда будет распакован упакованный uint64_t хеш
        uint64_t hash = type_to_uint64(arr_to_type(gotmsg.payload,&type)); //распаковавываем gotmsg.payload в тип а этот тип в uint64_t
        free(gotmsg.payload); //Очищаем уже лишние данные,да free тут будет ОЧЕНЬ много
        if(hash != 0){        //хеш правильный? / Был упакован validный тип?
            int* gotusr = NULL; //указатель на привелегии пользователя
            if(hashtable_get_by_hash(thrd->serv->users,hash,(void**)&gotusr) == 0){
                /*Получаем эти привелегии по хешу пароля*/
                assert(gotusr); //проверка что все правильно, потому что если не правильно то нельзя быть увереным что в сервере все хорошо
                is_authed = 1;  //ставим переменную что пользователь аутентифицирован
                user_perm = *gotusr;  //устанавлием привелегии в локальную переменную,они влиляют можно ли вызвать конкретную функцию конкретному клиенту
            }
        }
      free(type.data); //Да, опять очистка, но теперь уже чистим наш распакованный тип
    }else printf("%s: no auth provided\n",__PRETTY_FUNCTION__); // если клиент прислал что-то не то или отключился

Now the client protocol itself, it happens in a cycle and is a “simple” finite state machine)

        if(is_authed){
            /* говорим клиенту: "Все отлично,бро, вот тебе тут твои привелегии только хз зачем они"*/
            repl.msg_type = OK;
            struct rpctype perm = {0};
            int32_to_type(user_perm,&perm);
            repl.payload = malloc((repl.payload_len = type_buflen(&perm)));
            assert(repl.payload);
            type_to_arr(repl.payload,&perm); //тут переводится rpcтип в массив
            rpcmsg_write_to_fd(&repl,thrd->client_fd); repl.msg_type = 0;
            free(repl.payload);
            free(perm.data);
            repl.payload = NULL;
            repl.payload_len = 0;
            /*=====================================================================================*/
            printf("%s: auth ok, OK is replied to client\n",__PRETTY_FUNCTION__);
            while(thrd->serv->stop == 0){   /*крутимся пока сервер "включен"*/
                /*установка всех локальных переменных*/
                repl.msg_type = 0; repl.payload = NULL; repl.payload_len = 0; 
                gotmsg.msg_type = 0; gotmsg.payload = NULL; gotmsg.payload_len = 0;
                struct rpccall call = {0}; struct rpcret ret = {0};
                /*===================================*/
                if(get_rpcmsg_from_fd(&gotmsg,thrd->client_fd) < 0) {printf("%s: client disconected badly\n",__PRETTY_FUNCTION__);goto exit;}
                /*Получаем и обрабрабатываем что хочет клиент*/
                switch(gotmsg.msg_type){
                    case PING:
                                    //это сообщение от клиента просто чтобы сбросить timeout
                                    free(gotmsg.payload);
                                    break;
                    case DISCON:
                                    //обработка "нормального отключения" клиента,она также выходит из потока
                                    free(gotmsg.payload);
                                    printf("%s: client disconnected normaly\n",__PRETTY_FUNCTION__);
                                    goto exit;
                    case CALL:
                                    //ГЛАВНОЕ и ЕДИНСТВЕННОЕ не служебное сообщение которое обрабатывает сервер
                                    
                                    //здесь из gotmsg.payload распаковывается структура вызова функции, в ней находятся аргументы и их количество + имя функции
                                    if(buf_to_rpccall(&call,gotmsg.payload) != 0 ){
                                        printf("%s: internal server error or server closing\n",__PRETTY_FUNCTION__);
                                        rpctypes_free(call.args,call.args_amm);
                                        free(call.fn_name);
                                        free(gotmsg.payload);
                                        goto exit;
                                    }
                                    free(gotmsg.payload);
                                    //проверяем что сервер не находится в состояние изменения (регистрация или удалении функции)
                                    pthread_mutex_lock(&thrd->serv->edit); pthread_mutex_unlock(&thrd->serv->edit);

                                    //получаем структуру функции которая будет использоваться в функции вызова
                                    /*
                                    struct fn{
                                        char* fn_name; //имя функции
                                        void* fn; //libffi;
                                        uint8_t nargs; //кол-во аргументов
                                        enum rpctypes rtype; //возращаемый тип
                                        enum rpctypes* argtypes; //типы аргументов в RPC формате
                                        ffi_type** ffi_type_free;  // заглушка для высвобождения, не спрашивайте
                                        ffi_cif cif;               //cif от libffi, нужен для вызова функции, для большего смотрите в доки libffi
                                        void* personal;            //для PSTORAGE
                                        int perm;                  //минимальные привелегии для вызова функции
                                    };
                                    */
                                    struct fn* cfn = NULL;hashtable_get(thrd->serv->fn_ht,call.fn_name,strlen(call.fn_name) + 1,(void**)&cfn);
                                    if(cfn == NULL){
                                        repl.msg_type = NOFN;
                                        printf("%s: '%s' no such function\n",__PRETTY_FUNCTION__,call.fn_name);
                                        rpcmsg_write_to_fd(&repl,thrd->client_fd);
                                        free(call.fn_name); rpctypes_free(call.args,call.args_amm);
                                        break;
                                    //проверка привелегий клиента, -1 позволяет вызывать все функции!
                                    }else if(cfn->perm > user_perm && user_perm != -1){
                                        repl.msg_type = LPERM;
                                        printf("%s: low permissions, need %d, have %d\n",__PRETTY_FUNCTION__,cfn->perm,(int)user_perm);
                                        rpcmsg_write_to_fd(&repl,thrd->client_fd);
                                        free(call.fn_name);
                                        rpctypes_free(call.args,call.args_amm);
                                        break;
                                    }else repl.msg_type = OK;

                                    //Отдаем сгенерированный предыдущими проверками ответ клиенту
                                    if(rpcmsg_write_to_fd(&repl,thrd->client_fd) < 0){
                                        free(call.fn_name);
                                        rpctypes_free(call.args,call.args_amm);
                                        printf("%s: client connection closed\n",__PRETTY_FUNCTION__);
                                        goto exit;
                                    }
                                    //получаем от клиента ответ
                                    if(get_rpcmsg_from_fd(&gotmsg,thrd->client_fd) < 0) {
                                        free(gotmsg.payload);
                                        free(call.fn_name);
                                        rpctypes_free(call.args,call.args_amm);
                                        printf("%s: client disconected badly\n",__PRETTY_FUNCTION__);
                                        goto exit;
                                    }
                                    free(gotmsg.payload);
                                    if(gotmsg.msg_type == NONREADY){printf("%s: client nonready\n",__PRETTY_FUNCTION__);free(call.fn_name); rpctypes_free(call.args,call.args_amm); break;} // я понятия не умею зачем оно тут -____- Клиент его не использует
                                    if(gotmsg.msg_type == READY){
                                        //клиент готов и тут начинается вызов функции
                                        repl.msg_type = RET;
                                        int err = 0;
                                        int callret = 0;
                                        //вызов функции, и обработка ошибок
                                        if((callret = __rpcserver_call_fn(&ret,thrd->serv,&call,cfn,&err)) != 0 && err == 0){
                                            free(call.fn_name);
                                            rpctypes_free(call.args,call.args_amm);
                                            printf("%s: internal server error\n",__PRETTY_FUNCTION__);
                                            goto exit;
                                        }else if(callret != 0 && err != 0){
                                            //Если клиент выдал не те аргументы(отличные от прототипа функции)
                                            repl.msg_type = BAD;
                                            printf("%s: client provided wrong arguments\n",__PRETTY_FUNCTION__);
                                            rpcmsg_write_to_fd(&repl,thrd->client_fd);
                                            free(call.fn_name);
                                            rpctypes_free(call.args,call.args_amm);
                                            break;
                                        }
                                        //отправка rpcret и очистка
                                        free(call.fn_name);
                                        repl.payload = rpcret_to_buf(&ret,&repl.payload_len); //упаковка return функции и возвращаемых аргументов
                                        rpcmsg_write_to_fd(&repl,thrd->client_fd);
                                        rpctypes_free(ret.resargs,ret.resargs_amm);
                                        free(repl.payload);
                                        if(ret.ret.data) free(ret.ret.data);
                                        break;
                                    }
                                    free(gotmsg.payload);
                                    free(call.fn_name);
                                    rpctypes_free(call.args,call.args_amm);
                                    printf("%s: client bad reply\n",__PRETTY_FUNCTION__);
                                    goto exit;
                    default:
                            free(gotmsg.payload);
                            free(call.fn_name);
                            rpctypes_free(call.args,call.args_amm);
                            printf("%s: client sent non call or disconected badly(%d), exiting\n",__PRETTY_FUNCTION__,gotmsg.msg_type);
                            goto exit;
                }
            } //всякие ошибки
        }else {repl.msg_type = BAD;rpcmsg_write_to_fd(&repl,thrd->client_fd);printf("%s: client not passed auth\n",__PRETTY_FUNCTION__);}
    } else printf("%s: no auth provided\n",__PRETTY_FUNCTION__);

exit:
    //главный кусок завершения потока чтобы не дублировать код еще больше)
    if(thrd->serv->stop == 1) printf("%s: server stopping, exiting\n",__PRETTY_FUNCTION__);
    struct rpcmsg lreply = {DISCON,0,NULL,0};
    rpcmsg_write_to_fd(&lreply,thrd->client_fd);
    close(thrd->client_fd);
    thrd->serv->clientcount--;
    free(thrd);
    pthread_detach(pthread_self());
    pthread_exit(NULL);

Calling a function

This is done by the __rpcserver_call_fn function, it holds all the tasks of unpacking arguments, packing the returned arguments, packing the value returned by the function, this function is based on libffi (you may have already noticed this in struct fn). There is a LOT of boilerplate code, and I don't really know what to say here, so I'll just show and describe its work there

int __rpcserver_call_fn(struct rpcret* ret,struct rpcserver* serv,struct rpccall* call,struct fn* cfn, int* err_code){
    void** callargs = calloc(cfn->nargs, sizeof(void*)); //создается массив аргументов для вызова из libffi
    assert(callargs);
    uint8_t j = 0;
    struct tqueque* rpcbuff_upd = tqueque_create();
    struct tqueque* rpcbuff_free = tqueque_create();
    struct tqueque* rpcstruct_upd = tqueque_create();
    struct tqueque* _rpcstruct_free = tqueque_create();
    assert(rpcbuff_upd);
    assert(rpcbuff_free);
    //создаем из аргументов прототип и проверяем его
    enum rpctypes* check = rpctypes_get_types(call->args,call->args_amm);
    if(!is_rpctypes_equal(cfn->argtypes,cfn->nargs,check,call->args_amm)){
        //посылаем такой вызов функции куда-подальше иначе свалится весь сервер
        *err_code = 7;
        free(check);
        goto exit;
    }
    free(check);
    assert(callargs != NULL && call->args_amm != 0);
    for(uint8_t i = 0; i < cfn->nargs; i++){
      /*Это специальные типы данных которые не существуют для клиента,они предоставляют локальное хранилище для функций или глобальное хранилище для функции*/
        if(cfn->argtypes[i] == PSTORAGE){
            callargs[i] = calloc(1,sizeof(void*));
            assert(callargs[i]);
            *(void**)callargs[i] = cfn->personal;
            continue;
        }
        if(cfn->argtypes[i] == INTERFUNC){
            callargs[i] = calloc(1,sizeof(void*));
            assert(callargs[i]);
            *(void**)callargs[i] = serv->interfunc;
            continue;
        }
      /*====================================================================================================================================================*/
        if(j < call->args_amm){
            //здесь идет распаковка аргументов по типам
            if(call->args[j].type == RPCBUFF){
                callargs[i] = calloc(1,sizeof(void*));
                assert(callargs[i]);
                *(void**)callargs[i] = unpack_rpcbuff_type(&call->args[j]);
                if(call->args[j].flag == 1)
                    tqueque_push(rpcbuff_upd,*(void**)callargs[i],1,NULL);
                else        //это костыль для обновления распакованного оригинала ибо он создает полностью новый кусок памяти в отличии от STR и SIZEDBUF
                    tqueque_push(rpcbuff_free,*(void**)callargs[i],1,NULL);
                free(call->args[j].data);
                call->args[j].data = NULL;
                j++;
                continue;
            }
            if(call->args[j].type == RPCSTRUCT){
                callargs[i] = calloc(1,sizeof(void*));
                assert(callargs[i]);
                *(void**)callargs[i] = unpack_rpcstruct_type(&call->args[j]);
                if(call->args[j].flag == 1)
                    tqueque_push(rpcstruct_upd,*(void**)callargs[i],1,NULL);
                else //тоже самое но для rpcstruct
                    tqueque_push(_rpcstruct_free,*(void**)callargs[i],1,NULL);
                free(call->args[j].data);
                call->args[j].data = NULL;
                j++;
                continue;
            }
            if(call->args[j].type == SIZEDBUF){
                //это тоже полу-специальный тип но он предстовляется для функции как char*, uint64_t
                callargs[i] = calloc(1,sizeof(void*));
                assert(callargs[i]);
                uint64_t buflen = 0;
                *(void**)callargs[i] = unpack_sizedbuf_type(&call->args[j],&buflen);
                callargs[++i] = calloc(1,sizeof(void*));
                assert(callargs[i]);
                *(uint64_t*)callargs[i] = buflen;
                j++;
                continue;
            }
            if(call->args[j].type == STR){  //распаковка "обычных" типов, поэтому я пропущу остальные
                callargs[i] = calloc(1,sizeof(char*));
                assert(callargs[i]);
                *(void**)callargs[i] =  unpack_str_type(&call->args[j]);
                j++;
                continue;
            }
            .........подобный STR код..........
        } else {*err_code = 7; goto exit;}
    }
    //кусок памяти под return функции
    void* fnret = NULL;
    if(cfn->rtype != VOID){
        fnret = calloc(1,sizeof(uint64_t)); //Самый большой тип стандартоного C(кроме структур), это шаг для отвязки от 64битных платформ в некотором роде
        assert(fnret);
    }
    ffi_call(&cfn->cif,cfn->fn,fnret,callargs); // наконец-то вызов функции!
    ret->resargs = rpctypes_clean_nonres_args(call->args,call->args_amm,&ret->resargs_amm); //понимаем какие аргументы нужно переслать обратно а какие очистить
    enum rpctypes rtype = cfn->rtype; //Просто копируем тип который вернет функция чтобы не долбится в heap(Это все отговорки, мне просто было лень писать длинное название)
    ret->ret.type = VOID; //инициализация к void, он тут останется если не распакуется другой тип
    if(rtype != VOID){
        if(rtype == CHAR)    char_to_type(*(char*)fnret,&ret->ret); //Упаковка возврата функции в rpcтип
        ...........Остальные типы упаковываются также как и CHAR...........

        if(rtype == STR && *(void**)fnret != NULL){
            create_str_type(*(char**)fnret,0,&ret->ret); //упаковываем строку
            free(*(char**)fnret); //чистим строку
        }else if(rtype == STR && *(void**)fnret == NULL){  //если функция вернула NULL то возвращаемый типо void
            ret->ret.type = VOID;
        }
    }
    /*Перепаковка расспакованных rpcbuff и rpcstruct т.к они не достают данные из типа а распаковывают их и создают свои структуры*/
    for(uint8_t i = 0; i < ret->resargs_amm; i++){
        if(ret->resargs[i].type == RPCBUFF){
            struct rpcbuff* buf = tqueque_pop(rpcbuff_upd,NULL,NULL);
            if(!buf) break;
            create_rpcbuff_type(buf,ret->resargs[i].flag,&ret->resargs[i]);
            _rpcbuff_free(buf);
        }
        if(ret->resargs[i].type == RPCSTRUCT){
            struct rpcstruct* buf = tqueque_pop(rpcstruct_upd,NULL,NULL);
            if(!buf) break;
            create_rpcstruct_type(buf,ret->resargs[i].flag,&ret->resargs[i]);
            rpcstruct_free(buf);free(buf);
        }
    }
    /*===========================================================================================================================*/
    tqueque_free(rpcbuff_upd);
    tqueque_free(rpcstruct_upd);
    void* buf = NULL;
    while((buf = tqueque_pop(rpcbuff_free,NULL,NULL)) != NULL) _rpcbuff_free(buf);
    while((buf = tqueque_pop(_rpcstruct_free,NULL,NULL)) != NULL) {rpcstruct_free(buf);free(buf);}
    tqueque_free(rpcbuff_free);
    tqueque_free(_rpcstruct_free);
    free(fnret);
    for(uint8_t i = 0; i < cfn->nargs; i++){
        free(callargs[i]);
    }
    free(callargs);
    return 0;
exit:
    while((buf = tqueque_pop(rpcbuff_free,NULL,NULL)) != NULL) _rpcbuff_free(buf);
    while((buf = tqueque_pop(rpcbuff_upd,NULL,NULL)) != NULL) _rpcbuff_free(buf);
    while((buf = tqueque_pop(rpcstruct_upd,NULL,NULL)) != NULL) {rpcstruct_free(buf); free(buf);}
    while((buf = tqueque_pop(_rpcstruct_free,NULL,NULL)) != NULL) {rpcstruct_free(buf);free(buf);}
    tqueque_free(rpcbuff_free);
    tqueque_free(rpcbuff_upd);
    for(uint8_t i = 0; i < cfn->nargs; i++){
        free(callargs[i]);
    }
    free(callargs);
    return 1;

Server API

Here I will tell about the API of registration of functions and the common system of types for the client and the server, it is not ideal and there is one painful limitation (for now? Temporarily?) You cannot return RPCBUFF and RPCSTRUCT. Also you cannot return SIZEDBUF because you cannot normally specify where to get the size for its packing.

The type system consists largely of bindings to C's stdint.h:

enum rpctypes{
  VOID = 0, //Нельзя использовать в аргументах функции
  CHAR = 1,
  STR = 2, //char* строки
  UINT16 = 3,
  INT16 = 4,
  UINT32 = 5,
  INT32 = 6,
  UINT64 = 7,
  INT64 = 8,
  FLOAT = 9,
  DOUBLE = 10,
  RPCBUFF = 11, //!!!НЕЛЬЗЯ ВЕРНУТЬ В RETURN!!! многомерный массив, размерность которого не известна коду сервера(массив в массиве в массиве......)
  SIZEDBUF = 12, //!!!НЕЛЬЗЯ ВЕРНУТЬ В RETURN!!! char* массив который передастся в функцию как void*,uint64_t
  PSTORAGE = 13, //локальный для одной функции(но для всех ее итераций глобальный) кусок памяти
  INTERFUNC = 14, // Глобальный для всех функций и ее итераций кусок памяти, нужно в ручную указывать через модифицирований struct rpcserver, rpcserver->interfunc = какой_то_указатель
  RPCSTRUCT = 15, //!!!НЕЛЬЗЯ ВЕРНУТЬ В RETURN!!! Структура аля хештаблица
};

To call a function in the server, a function prototype is used in the form of this very enum, from which CIF is generated from libffi. Here is an example of registering a function on the server:

void* pstorage = NULL; //персональное хранилище функции
int perm = 1235; //уровень привелегий необходимый для вызова функции
struct rpcserver* serv = rpcserver_create(1234); //создаем сервер на порту 1234
rpcserver_load_keys("keys.txt"); //загрузка ключей в формате "ключ"разрешение_как_число
enum rpctypes fn_proto[] = {STR}; //прототип функции (ПОМЕТКА: SIZEDBUF на сервере объявляется как SIZEDBUF,UINT64 а на клиенте как SIZEDBUF)
rpcserver_register_fn(serv, example_function, "example_function",
                      VOID, fn_proto,
                      sizeof(fn_proto) / sizeof(fn_proto[0]), pstorage, perm); //регистрация функции
rpcserver_start(serv); //запуск сервера

Of course, there are also the functions rpcserver_stop() and rpcserver_free(), the first simply stops the server and the second stops and frees up memory (doesn't allow it to be started again via rpcserver_start)

Client

The client in my rpc turned out to be very simple, it has only 3 functions for the end user. These functions cover all the necessary tasks as for me, namely: connection, function call, disconnection. There is no garbage here (purely for me, do not beat me for my opinion) like data transfer without calling functions.


int rpcserver_connect(char* host,char* key,int portno,struct rpccon* con){
   if(!host || !key)
      return -1;
   int sockfd;
   struct sockaddr_in serv_addr;
   struct hostent *server;
   con->stop = 0;

   .......Бойлерплейт для настройки сокета........

   struct rpcmsg req = {0};
   struct rpcmsg ans = {0};
   req.msg_type = CON; //Это то самое магическое число из начала статьй(Число:53)
   if(rpcmsg_write_to_fd(&req,sockfd) == -1){
      close(sockfd);
      return 2;
   }
   struct rpctype auth = {0};
   uint64_to_type(_hash_fnc(key,strlen(key) + 1),&auth); //генерируем uint64_t тип из хэша пароля
   req.msg_type = AUTH;
   req.payload = malloc((req.payload_len = type_buflen(&auth)));
   assert(req.payload);
   type_to_arr(req.payload,&auth);  //запихиваем сгенерированый тип в req.payload
   if(rpcmsg_write_to_fd(&req,sockfd) == -1){  //пишем это в сокет!
      free(auth.data);
      free(req.payload);
      close(sockfd);
      return 2;
   }
   free(auth.data);
   free(req.payload);
   if(get_rpcmsg_from_fd(&ans,sockfd) != 0){
      close(sockfd);
      return 3;
   } //получаем ответ от сервера и если он не OK отключаемся от сервера
   if(ans.msg_type != OK) {
      free(ans.payload);
      close(sockfd);
      return 4;
   }
   //получаем уровень разрешений из ответа сервера
   arr_to_type(ans.payload,&auth);
   con->perm = type_to_int32(&auth);
   free(ans.payload);
   free(auth.data);
   con->fd = sockfd;
   pthread_mutex_init(&con->send,NULL);
   pthread_create(&con->ping,NULL,rpccon_keepalive,con); //создается keep-alive поток который будет сбрасывать timeout на сервере
   return 0;
}

The rpcclient_call function itself packs arguments from variable arguments, itself unpacks return and returned arguments (ATTENTION: the ban on return rpcbuff, rpcstruct is currently only on the server due to a potential memory leak). I'll show you a teaser in the form of the client API:

struct rpccon con;
rpcserver_connect("localhost", "my_key", 1234, &con); //подключаемся к серверу на localhost:1234 по ключу "my_key"
char* arg = "Hello, server!";
enum rpctypes proto[] = {STR}; //прототип функции на клиенте, нужен для парсинга varargs в rpcclient_call
rpcclient_call(&con, "example_function", proto, NULL, 1, NULL, arg); //вызов "example_function"
rpcclient_discon(&con); //остановка подключения

And here is the code with an explanation of what it does

int rpcclient_call(struct rpccon* con,char* fn,enum rpctypes* rpctypes,char* flags, int rpctypes_len,void* fnret,...){
   //система флагов используется для отслеживание нужно ли пересылать сигнал назад или нет, 1 если нужно чтобы он был переслан, 0 чтобы нет, NULL - все аргументы не пересылаются
   pthread_mutex_lock(&con->send);  //блокируем mutex чтобы keep-alive не сбивал конечный автомат клиентского потока
   va_list vargs;
   void** resargs_upd = NULL; //массив на указатели на оригиналы аргументов
   uint8_t resargs_updl = 0;
   struct rpcret ret = {0};
   if(flags)
      for(uint8_t i = 0; i < rpctypes_len; i++)
         if(flags[i] == 1)
            resargs_updl++; //считаем сколько аргументов будет пересланно, чтобы создать массив нужного размера под указатели на оригиналы
   if(resargs_updl != 0){
      resargs_upd = calloc(resargs_updl,sizeof(void*));
      assert(resargs_upd);
   }
   va_start(vargs, fnret);
   struct rpctype* args = calloc(rpctypes_len,sizeof(*args));
   assert(args);
   uint8_t j = 0;
   for(uint8_t i = 0; i < rpctypes_len; i++){
      if(rpctypes[i] == CHAR){
         char ch = va_arg(vargs,int); // получаем данные из varargs
         char_to_type(ch,&args[i]);
         continue;
      }
      .......Код упаковки остальных типов такой-же как CHAR........
      if(rpctypes[i] == SIZEDBUF){
         char* ch = va_arg(vargs,char*);
         char flag = 0;
         if(flags != NULL) flag = flags[i]; // если вместо flags был NULL то флаг будет 0
         if(flag == 1) {resargs_upd[j] = ch; j++;} //записываем указатель на оригинал в чтобы обновить его в будущем
         uint64_t buflen = va_arg(vargs,unsigned int); //получаем еще и длину SIZEDBUF
         create_sizedbuf_type(ch,buflen,flag,&args[i]); //пакуем в sizedbuf
         continue;
      }
      if(rpctypes[i] == RPCBUFF){
         struct rpcbuff* buf = va_arg(vargs,struct rpcbuff*);
         assert(flags);
         char flag = 0;
         if(flags != NULL) flag = flags[i]; // если вместо flags был NULL то флаг будет 0
         if(flag == 1) {resargs_upd[j] = buf; j++;} //записываем указатель на оригинал в чтобы обновить его в будущем
         create_rpcbuff_type(buf,flag,&args[i]);
         continue;
      }
      if(rpctypes[i] == RPCSTRUCT){
         struct rpcstruct* buf = va_arg(vargs,struct rpcstruct*);
         assert(flags);
         char flag = 0;
         if(flags != NULL) flag = flags[i]; // если вместо flags был NULL то флаг будет 0
         if(flag == 1) {resargs_upd[j] = buf; j++;} //записываем указатель на оригинал в чтобы обновить его в будущем
         create_rpcstruct_type(buf,flag,&args[i]);
         continue;
      }
   }
   struct rpccall call = {fn,rpctypes_len,args}; //собираем вызов функции
   struct rpcmsg req = {0};
   struct rpcmsg ans = {0};
   req.msg_type = CALL;
   req.payload = rpccall_to_buf(&call,&req.payload_len); //собираем struct rpccall в массив и запихиваем в сообщение
   if(rpcmsg_write_to_fd(&req,con->fd) < 0){ //шлём наш вызов функции
      rpctypes_free(args,rpctypes_len);
      free(req.payload);
      pthread_mutex_unlock(&con->send);
      return 1;
   }
   free(req.payload);
   rpctypes_free(args,rpctypes_len);
   if(get_rpcmsg_from_fd(&ans,con->fd) != 0){ //получаем ответ от сервера на наш запрос на вызов функции
      pthread_mutex_unlock(&con->send);
      return 2;
   };
   if(ans.msg_type != OK){   //если не OK возвращаемся из функции с ошибкой
      pthread_mutex_unlock(&con->send);
      return ans.msg_type;
   }
   memset(&req,0,sizeof(req));
   req.msg_type = READY;
   rpcmsg_write_to_fd(&req,con->fd);
   get_rpcmsg_from_fd(&ans,con->fd);
   if(ans.msg_type != RET){
      if(ans.msg_type == DISCON){  //если сервер прислал что он отключается, то мы тожет отключаемся и стопаем keep-alive поток
          close(con->fd);
          con->stop = 1;
          pthread_mutex_unlock(&con->send);
          return DISCON;
      }else return ans.msg_type;
   }
   pthread_mutex_unlock(&con->send);
   buf_to_rpcret(&ret,ans.payload); //распаковываем ответ от сервера в struct rpcret(структура с return функции и возвращенными аргументами)
   free(ans.payload);
   assert(resargs_updl == ret.resargs_amm);  // на всякий случай
   /*Это код для обновления аргументов, он распаковывает тип и записывает его в оригинал*/
   for(uint8_t i = 0; i < ret.resargs_amm; i++){
      if(ret.resargs[i].type == STR){
         char* new = unpack_str_type(&ret.resargs[i]);
         assert(new);
         strcpy(resargs_upd[i],new);
      }
      if(ret.resargs[i].type == SIZEDBUF){
         uint64_t cpylen = 0; //длина для memcpy
         char* new = unpack_sizedbuf_type(&ret.resargs[i],&cpylen); //распаковка SIZEDBUF
         assert(new); 
         memcpy(resargs_upd[i],new,cpylen); //Копируем в оригинал, тут нет утечки памяти так-как ret.resargs буду очищенны позже
      }
      if(ret.resargs[i].type == RPCBUFF){
         struct rpcbuff* new = unpack_rpcbuff_type(&ret.resargs[i]);
         assert(new);
         __rpcbuff_free_N_F_C(resargs_upd[i]); //функция которая удаляет rpcbuff но не очищает сам struct rpcbuff* (только его внутренности)
         memcpy(resargs_upd[i],new,sizeof(struct rpcbuff));
         free(new);
      }
      if(ret.resargs[i].type == RPCSTRUCT){
         struct rpcstruct* new = unpack_rpcstruct_type(&ret.resargs[i]);
         rpcstruct_free(resargs_upd[i]); //this is not heap-use-after-free since rpcstruct_free ONLY freeding struct internals
         memcpy(resargs_upd[i],new,sizeof(struct rpcstruct));
         free(new);
      }
   }
   enum rpctypes type = ret.ret.type;
   assert(fnret != NULL && type != VOID);
   if(type == CHAR){
      char ch = type_to_char(&ret.ret);
      *(char*)fnret = ch;
   }
   .......Весь код распаковки такой-же как CHAR.......
  
   free(resargs_upd);
   free(ret.ret.data);
   rpctypes_free(ret.resargs,ret.resargs_amm);
   return 0;
}

Icing on a burnt cake?

I'm talking about RPCBUFF now. This is one of the most interesting features of my RPC. As I already said, it's an array in an array and so on. In my opinion, it has the most interesting code here.

struct rpcbuff* rpcbuff_create(uint64_t* dimsizes,uint64_t dimsizes_len,uint64_t lastdim_len){
    struct __rpcbuff_el* md_array = calloc(1,sizeof(struct __rpcbuff_el)); //создаем стартовый элемент
    struct rpcbuff* cont = NULL;
    if(dimsizes != NULL || dimsizes_len == 0){
        cont = calloc(1,sizeof(struct rpcbuff)); //создание struct rpcbuff
        assert(cont);
    }
    assert(md_array);
    if(dimsizes_len > 0){
        //копируем dimsizes в struct rpcbuff
        cont->dimsizes = calloc(dimsizes_len, sizeof(uint64_t));
        assert(cont->dimsizes);
        memcpy(cont->dimsizes,dimsizes, sizeof(uint64_t) *dimsizes_len);
    }
    cont->dimsizes_len = dimsizes_len;
    cont->start = md_array;
    cont->lastdim_len = lastdim_len;
    if(dimsizes != NULL && dimsizes_len > 0){
        struct tqueque* que = tqueque_create();
        assert(que);
        /*Сам алгоритм аллокации, так как код писал давно не смогу вспомнить что он точно делает*/
        assert(tqueque_push(que,md_array,sizeof(struct __rpcbuff_el*),NULL) == 0);
        for(uint64_t i = 0; i < dimsizes_len; i++){
            struct __rpcbuff_el* cur = NULL;
            uint64_t iter = tqueque_get_tagamm(que,NULL);
            for(uint64_t j = 0; j < iter; j++){
                cur = tqueque_pop(que,NULL,NULL);
                cur->childs = calloc(dimsizes[i],sizeof(struct __rpcbuff_el));
                assert(cur->childs);
                for(uint64_t k = 0; k <dimsizes[i]; k++){
                    assert(tqueque_push(que,&cur->childs[k], sizeof(struct __rpcbuff_el*), NULL) == 0);
                }
            }
        }
        struct __rpcbuff_el* cur = NULL;
        while((cur = tqueque_pop(que,NULL,NULL)) != NULL){
            cur->endpoint = (char*)0xCAFE;
        }
        tqueque_free(que);
        return cont;
      /*=======================================================================================*/
    }
    md_array->endpoint = (char*)0xCAFE;
    md_array->elen = 0;
    assert(md_array->endpoint);
    return cont;
}

Example of using rpcbuff and its API:

uint64_t dimsizes[] = {3, 4}; //размеры массива
struct rpcbuff* my_buff = rpcbuff_create(dimsizes, 2, 1); //создаем массив, 1 это заглушка
int data[] = {1, 2, 3, 4};
uint64_t index[] = {0, 0};
rpcbuff_pushto(my_buff, index, 2, (char*)data, sizeof(data));

Conclusion?

In this article I tried to describe my project and show its insides. If someone needs the full source code and API, they will be here. Also, RPC turned out to be quite portable and it was possible to run it with minor changes on esp32s3 (I had to compile libffi for xtensa linux and link it to the esp-idf project) and without changes it ran on arm64 debian

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *