Skip to content

Commit

Permalink
Filter module (with threads)
Browse files Browse the repository at this point in the history
  • Loading branch information
pajgo committed Jul 24, 2019
1 parent 7e73481 commit afb3ae5
Show file tree
Hide file tree
Showing 19 changed files with 2,916 additions and 0 deletions.
32 changes: 32 additions & 0 deletions csrc/redis-filtered-sort/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
include ../variables.include

#override path defined in variables
JANSSON_LIBDIR=../lib/jansson

ifndef RM_INCLUDE_DIR
RM_INCLUDE_DIR=../lib/
endif

ifndef RMUTIL_LIBDIR
RMUTIL_LIBDIR=../lib/rmutil
endif

SOURCEDIR=$(shell pwd -P)
CC_SOURCES = $(wildcard $(SOURCEDIR)/*.c)
CC_OBJECTS = $(patsubst $(SOURCEDIR)/%.c, $(SOURCEDIR)/%.o, $(CC_SOURCES))

all: rmutil $(CC_OBJECTS) filter_module.so
rmutil: FORCE
$(MAKE) -C $(RMUTIL_LIBDIR)

filter_module.so: $(CC_OBJECTS)
$(LD) -o $@ $(CC_OBJECTS) $(SHOBJ_LDFLAGS) $(LIBS) -L$(RMUTIL_LIBDIR) -L$(JANSSON_LIBDIR)/src/.libs -lrmutil -lpthread -lc -Bstatic -ljansson

valgrind:
valgrind --leak-check=full --show-possibly-lost=no redis-server --loadmodule ./filter_module.so --loglevel debug

clean:
rm -rf *.xo *.so *.o
cd $(RMUTIL_LIBDIR) && make clean

FORCE:
108 changes: 108 additions & 0 deletions csrc/redis-filtered-sort/filter_module.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#include "filter_module.h"
#include "fsort.h"
#include "fsort_utils.h"
#include "thread_pool.h"

static FSortPool_t *sortPool;

int FSortBust_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);

if (argc <3 || argc > 4 ) {
return RedisModule_WrongArity(ctx);
}

pthread_t tid;
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);

void **targ = RedisModule_Alloc(sizeof(void*)*3);
targ[0] = bc;
targ[1] = (void*)(unsigned long) argc;
targ[2] = argv;

if (pthread_create(&tid,NULL,fsort_bust_thread,targ) != 0) {
RedisModule_AbortBlock(bc);
return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
}

return REDISMODULE_OK;
}

int FSortAggregate_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);

if (argc != 4) {
return RedisModule_WrongArity(ctx);
}

RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);

void **targ = RedisModule_Alloc(sizeof(void*)*3);
targ[0] = bc;
//targ[1] = argc;
targ[2] = argv;

tpool_add_work(sortPool, fsort_aggregate_thread, targ);

return REDISMODULE_OK;
}

int FSort_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);

if (argc < 7 ) {
return RedisModule_WrongArity(ctx);
}

RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);

FSortObj_t *sort = fsort_new_fsort();
sort->ctx = ctx; //RedisModule_GetThreadSafeContext(bc);
int parseRes = fsort_parse_args(sort, ctx, argv, argc);

fsort_form_keys(sort);

if (parseRes != REDISMODULE_OK) {
fsort_free_fsort(sort);
RedisModule_UnblockClient(bc,NULL);
return REDISMODULE_ERR;
}

void **targ = RedisModule_Alloc(sizeof(void*)*2);
targ[0] = bc;
targ[1] = sort;

tpool_add_work(sortPool, fsort_fsort_thread, targ);

return REDISMODULE_OK;
}

int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
long long poolSize = 4;
if (RedisModule_Init(ctx, "FilterSortModule", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}

if (RedisModule_CreateCommand(ctx, "fsort", FSort_RedisCommand, "write", 1, 2, 1) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}

if (RedisModule_CreateCommand(ctx, "fsortBust", FSortBust_RedisCommand, "write", 1, 1, 1) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}

if (RedisModule_CreateCommand(ctx, "fsortaggregate", FSortAggregate_RedisCommand, "write", 1, 2, 1) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}

if (argc == 1) {
RedisModule_StringToLongLong(argv[0], &poolSize);
if (poolSize == 0) {
poolSize = 4;
}
}


sortPool = tpool_create(poolSize);
return REDISMODULE_OK;
}
11 changes: 11 additions & 0 deletions csrc/redis-filtered-sort/filter_module.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#ifndef __FILTERMODULE_H
#define __FILTERMODULE_H 1

#include <sys/types.h>

#include "fsort.h"

#include "pthread.h"


#endif
Loading

0 comments on commit afb3ae5

Please sign in to comment.