Skip to content

Commit

Permalink
Sample thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
pajgo committed Jul 19, 2019
1 parent 85c2729 commit ceef96d
Show file tree
Hide file tree
Showing 9 changed files with 510 additions and 29 deletions.
97 changes: 97 additions & 0 deletions csrc/redis-filtered-sort/client_list.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#include "client_list.h"

blocked_client_t *blocked_client_create(RedisModuleBlockedClient*bc, void **targ) {
blocked_client_t *bct = malloc(sizeof(blocked_client_t));
bct->client = bc;
bct->targ = targ;
return bct;
}

int blocked_client_free(blocked_client_t *bct) {
free(&bct->targ);
free(bct);
return 1;
}

//Blocked clients array
blocked_clients_t *blocked_clients_create(const char * cmd) {
blocked_clients_t* blocked = malloc(sizeof(blocked_clients_t));
blocked->clients = malloc(sizeof(RedisModuleBlockedClient*));
blocked->size = 0;
blocked->cmd = strdup(cmd);
return blocked;
}

int blocked_clients_add(blocked_clients_t *bc, RedisModuleBlockedClient *client, void **targ) {
bc->clients = realloc(bc->clients, sizeof(blocked_client_t*) * (bc->size +1));
bc->clients[bc->size] = blocked_client_create(client, targ);
bc->size++;
return 1;
}

int blocked_clients_free(blocked_clients_t *bc) {
for (size_t i = 0; i < bc->size; i++){
blocked_client_free(bc->clients[i]);
}
free(bc->clients);
free(bc);
return 1;
}

//General blocklist maps blocked clients and command
blocked_list_t *blocked_list_create(const char * cmd) {
blocked_list_t* bl = malloc(sizeof(blocked_list_t));
bl->list = malloc(sizeof(blocked_clients_t*));
bl->size = 0;
bl->cmd = strdup(cmd);
return bl;
}

int blocked_list_add_bc(blocked_list_t *bl, blocked_clients_t *clientList) {
bl->list = realloc(bl->list, sizeof(blocked_clients_t*) * (bl->size +1));
bl->list[bl->size] = clientList;
bl->size++;
return 1;
}

int blocked_list_delete(blocked_list_t *bl, blocked_clients_t *clientList) {
int pos = -1;
for (size_t i = 0; i < bl->size; i++){
if (bl->list[i] == clientList) pos = i;
}

if (pos == -1) return 0;

blocked_clients_free(clientList);

for (size_t i = pos - 1; 0 < bl->size - 1; i++)
bl->list[i] = bl->list[i+1];
bl->size --;
bl->list = realloc(bl->list, sizeof(blocked_clients_t*));
return 1;
}

//Searches clients blocked by command
blocked_clients_t *blocked_list_find_cmd(blocked_list_t *list, const char *cmd) {
for (size_t i = 0; i < list->size; i++) {
if (strcasecmp(list->list[i]->cmd, cmd)) {
return list->list[i];
}
}
return NULL;
}

//Add blocked client into specified group
int blocked_list_add_client(blocked_list_t *list, const char *cmd, RedisModuleBlockedClient *client, void **targ) {
blocked_clients_t *bc;
bc = blocked_list_find_cmd(list, cmd);
//create new bock list if not exists
if (bc == NULL) {
bc = blocked_clients_create(cmd);
blocked_list_add_bc(list, bc);
}
blocked_clients_add(bc, client, targ);
return 1;
}


30 changes: 30 additions & 0 deletions csrc/redis-filtered-sort/client_list.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#ifndef __CLIENTS_LIST_H
#define __CLIENTS_LIST_H

#include "general.h"

typedef struct blocked_client {
RedisModuleBlockedClient *client;
void ** targ;
} blocked_client_t;

typedef struct blocked_clients {
blocked_client_t **clients;
size_t size;
const char * cmd;
} blocked_clients_t;

typedef struct blocked_list {
blocked_clients_t **list;
size_t size;
const char * cmd;
} blocked_list_t;

blocked_clients_t *blocked_list_find_cmd(blocked_list_t *list, const char *cmd);
int blocked_list_add_client(blocked_list_t *list, const char *cmd, RedisModuleBlockedClient *client, void **targ);
blocked_list_t *blocked_list_create(const char * cmd);
int blocked_client_free(blocked_client_t *bct);
int blocked_clients_free(blocked_clients_t *bc);
int blocked_list_delete(blocked_list_t *bl, blocked_clients_t *clientList);

#endif
117 changes: 103 additions & 14 deletions csrc/redis-filtered-sort/filter_module.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
#include "filter_module.h"
#include "fsort.h"
#include "fsort_utils.h"
#include "thread_pool.h"
#include "client_list.h"

static FSortPool_t *sortPool;

static blocked_list_t *fsortBlocked;

int FSortBust_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);
Expand All @@ -26,53 +32,129 @@ int FSortBust_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int ar
}

int FSortAggregate_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
const char *cmd;
RedisModule_AutoMemory(ctx);

if (argc != 4) {
return RedisModule_WrongArity(ctx);
}

pthread_t tid;
cmd = strdup(GetArgvString(ctx, argv, argc));

RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);

void **targ = RedisModule_Alloc(sizeof(void*)*3);
targ[0] = bc;
targ[1] = (void*)(unsigned long) argc;
targ[2] = (void*)(RedisModuleString **)argv;

if (pthread_create(&tid,NULL,fsort_aggregate_thread,targ) != 0) {
RedisModule_AbortBlock(bc);
return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
//run threads
if (tpool_work_exists(sortPool, cmd) == 1) {
blocked_list_add_client(fsortBlocked, cmd, bc, targ);
} else {
tpool_add_work(sortPool, fsort_aggregate_thread, targ, cmd);
}

free((char*)cmd);

return REDISMODULE_OK;
}

int FSort_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
const char * cmd;
int pssType = 0;
int fflType = 0;
int longRunning =0;
pthread_t tid;

RedisModule_AutoMemory(ctx);

if (argc < 7 ) {
return RedisModule_WrongArity(ctx);
}

pthread_t tid;
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);

void **targ = RedisModule_Alloc(sizeof(void*)*3);
targ[0] = bc;
targ[1] = (void*)(unsigned long) argc;
targ[2] = (void*)(RedisModuleString **)argv;
FSortObj_t *sort = fsort_new_fsort();
RedisModuleCtx *thSafeCtx = RedisModule_GetThreadSafeContext(bc);
sort->ctx = ctx;
int parseRes = fsort_parse_args(sort, thSafeCtx, argv, argc);

if (pthread_create(&tid,NULL,fsort_fsort_thread,targ) != 0) {
RedisModule_AbortBlock(bc);
return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
fsort_form_keys(sort);

if (parseRes != REDISMODULE_OK) {
fsort_free_fsort(sort);
RedisModule_UnblockClient(bc,NULL);
RedisModule_FreeThreadSafeContext(thSafeCtx);
return REDISMODULE_ERR;
}

pssType = fsort_key_type(sort, sort->pssKey);
fflType = fsort_key_type(sort, sort->fflKey);

if (pssType == REDISMODULE_KEYTYPE_EMPTY || fflType == REDISMODULE_KEYTYPE_EMPTY) {
longRunning = 1;
//decide on which tasks block
if (RedisModule_StringCompare(sort->pssKey, sort->fflKey) == 0) {
//only sort, setting command to psskey
cmd = RedisModule_StringPtrLen(sort->pssKey, NULL);
} else {
//if no PSS key block all commands
//if no ffl block only filtering
//only elseif
if (pssType == REDISMODULE_KEYTYPE_EMPTY) {
cmd = RedisModule_StringPtrLen(sort->pssKey, NULL);
} else if (fflType == REDISMODULE_KEYTYPE_EMPTY) {
cmd = RedisModule_StringPtrLen(sort->fflKey, NULL);
}
}
cmd = strdup(cmd);
}

void **targ = RedisModule_Alloc(sizeof(void*)*3);
targ[0] = bc;
targ[1] = (void *)sort;
//if all keys are present, we dont need thread pool
if (longRunning == 1) {
//run threads
if (tpool_work_exists(sortPool, cmd) == 1) {
blocked_list_add_client(fsortBlocked, cmd, bc, targ);
} else {
tpool_add_work(sortPool, fsort_fsort_thread, targ, cmd);
}
free((char*)cmd);
} else {
if (pthread_create(&tid,NULL,fsort_fsort_thread,targ) != 0) {
RedisModule_AbortBlock(bc);
RedisModule_FreeThreadSafeContext(thSafeCtx);
free(targ);
return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
}
}
RedisModule_FreeThreadSafeContext(thSafeCtx);
return REDISMODULE_OK;
}

int RedisModule_OnLoad(RedisModuleCtx *ctx) {
int FSort_onWorkFinish(FSortWork_t * work) {
printf("[rm]WorkFinish %s\n", work->cmd);
blocked_clients_t* clientList = blocked_list_find_cmd(fsortBlocked, work->cmd);

if(clientList != NULL) {
for (size_t i = 0; i < clientList->size; i++) {
blocked_client_t *bct = clientList->clients[i];
tpool_add_work(sortPool, fsort_fsort_thread, bct->targ, work->cmd);
}
blocked_list_delete(fsortBlocked, clientList);
}

free(work->argv);
tpool_work_destroy(work);

printf("[rm]Unblock done!\n");
return 1;
}

int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
long long poolSize = 2;
if (RedisModule_Init(ctx, "FilterSortModule", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
Expand All @@ -88,6 +170,13 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx) {
if (RedisModule_CreateCommand(ctx, "fsortaggregate", FSortAggregate_RedisCommand, "write", 1, 2, 1) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}

if (argc == 1) {
RedisModule_StringToLongLong(argv[0], &poolSize);
}

sortPool = tpool_create(poolSize);
sortPool->cb = FSort_onWorkFinish;
fsortBlocked = blocked_list_create("fsort-f");
return REDISMODULE_OK;
}
25 changes: 11 additions & 14 deletions csrc/redis-filtered-sort/fsort.c
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,13 @@ const char **fsort_filter_data(FSortObj_t *sort, size_t *outputDataSize) {
return outputData;
}

int fsort_key_type(FSortObj_t *sort, RedisModuleString *keyName) {
RedisModuleKey *key = RedisModule_OpenKey(sort->ctx, keyName, REDISMODULE_READ);
int kType = RedisModule_KeyType(key);
RedisModule_CloseKey(key);
return kType;
}

int fsort_fsort(FSortObj_t *sort) {
RedisModuleKey *fflkey = RedisModule_OpenKey(sort->ctx, sort->fflKey, REDISMODULE_READ);
RedisModuleKey *psskey = RedisModule_OpenKey(sort->ctx, sort->pssKey, REDISMODULE_READ);
Expand Down Expand Up @@ -733,23 +740,13 @@ void *fsort_aggregate_thread(void *arg) {
void *fsort_fsort_thread(void *arg) {
void **targ = arg;
RedisModuleBlockedClient *bc = targ[0];
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
int argc = (unsigned long) targ[1];
RedisModuleString **argv = targ[2];

FSortObj_t *sort = fsort_new_fsort();
int parseRes = fsort_parse_args(sort, ctx, argv, argc);
fsort_form_keys(sort);

if (parseRes == REDISMODULE_OK) {
//FSortObj_t sobj = *sort;
fsort_fsort(sort);
}

FSortObj_t *sort = targ[1];
RedisModuleCtx* ctx = RedisModule_GetThreadSafeContext(bc);
sort->ctx = ctx;
fsort_fsort(sort);
fsort_free_fsort(sort);

RedisModule_UnblockClient(bc, NULL);

RedisModule_FreeThreadSafeContext(ctx);

return NULL;
Expand Down
4 changes: 4 additions & 0 deletions csrc/redis-filtered-sort/fsort.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,9 @@ int fsort_parse_args(FSortObj_t *sObj, RedisModuleCtx *ctx, RedisModuleString **
FSortObj_t *fsort_new_fsort();

void fsort_cache_buster(FSortObj_t *sortObj, RedisModuleString *key);
void fsort_free_fsort(FSortObj_t *sort);

int fsort_key_type(FSortObj_t *sort, RedisModuleString *keyName);


#endif
Loading

0 comments on commit ceef96d

Please sign in to comment.