Skip to content

Commit

Permalink
expand buffer and cache to multiple rows for multithreading
Browse files Browse the repository at this point in the history
  • Loading branch information
cyliang368 committed Jun 22, 2024
1 parent 7b1ea6f commit cfc95ef
Show file tree
Hide file tree
Showing 12 changed files with 182 additions and 45 deletions.
6 changes: 4 additions & 2 deletions raster/r.mapcalc/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ r3_mapcalc_OBJS := $(filter-out map.o xcoor.o xres.o, $(AUTO_OBJS))
include $(MODULE_TOPDIR)/include/Make/Multi.make

EXTRA_CFLAGS = $(READLINEINCPATH) $(PTHREADINCPATH)
LIBES2 = $(CALCLIB) $(GISLIB) $(RASTERLIB) $(BTREELIB) $(READLINELIBPATH) $(READLINELIB) $(HISTORYLIB) $(PTHREADLIBPATH) $(PTHREADLIB)
LIBES3 = $(CALCLIB) $(RASTER3DLIB) $(GISLIB) $(RASTERLIB) $(BTREELIB) $(READLINELIBPATH) $(READLINELIB) $(HISTORYLIB) $(PTHREADLIBPATH) $(PTHREADLIB)
LIBES2 = $(CALCLIB) $(GISLIB) $(RASTERLIB) $(BTREELIB) $(READLINELIBPATH) $(READLINELIB) $(HISTORYLIB) $(PTHREADLIBPATH) $(PTHREADLIB) $(OPENMP_LIBPATH) $(OPENMP_LIB)
LIBES3 = $(CALCLIB) $(RASTER3DLIB) $(GISLIB) $(RASTERLIB) $(BTREELIB) $(READLINELIBPATH) $(READLINELIB) $(HISTORYLIB) $(PTHREADLIBPATH) $(PTHREADLIB) $(OPENMP_LIBPATH) $(OPENMP_LIB)
EXTRA_CFLAGS = $(OPENMP_CFLAGS)
EXTRA_INC = $(OPENMP_INCPATH)

default: multi

Expand Down
101 changes: 76 additions & 25 deletions raster/r.mapcalc/evaluate.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
#if defined(_OPENMP)
#include <omp.h>
#endif

#include <stdlib.h>
#include <unistd.h>
#include <string.h>
Expand All @@ -12,7 +16,8 @@

/****************************************************************************/

int current_depth, current_row;
int current_depth;
int *current_row;
int depths, rows;

/* Local variables for map management */
Expand Down Expand Up @@ -69,10 +74,18 @@ void extract_maps(expression *e)

static void allocate_buf(expression *e)
{
e->buf = G_malloc(columns * Rast_cell_size(e->res_type));

int threads = 1;
#if defined(_OPENMP)
threads = omp_get_max_threads();
#endif

e->buf = (void **)G_malloc(sizeof(void *) * threads);
for (int t = 0; t < threads; t++)
e->buf[t] = G_malloc(columns * Rast_cell_size(e->res_type));
}

static void set_buf(expression *e, void *buf)
static void set_buf(expression *e, void **buf)
{
e->buf = buf;
}
Expand Down Expand Up @@ -102,8 +115,7 @@ static void initialize_function(expression *e)
int i;

allocate_buf(e);

e->data.func.argv = G_malloc((e->data.func.argc + 1) * sizeof(void *));
e->data.func.argv = G_malloc((e->data.func.argc + 1) * sizeof(void **));
e->data.func.argv[0] = e->buf;

for (i = 1; i <= e->data.func.argc; i++) {
Expand Down Expand Up @@ -163,9 +175,14 @@ static void end_evaluate(struct expression *e)

static void evaluate_constant(expression *e)
{
int *ibuf = e->buf;
float *fbuf = e->buf;
double *dbuf = e->buf;
int tid = 0;
#if defined(_OPENMP)
tid = omp_get_thread_num();
#endif

int *ibuf = e->buf[tid];
float *fbuf = e->buf[tid];
double *dbuf = e->buf[tid];
int i;

switch (e->res_type) {
Expand Down Expand Up @@ -195,15 +212,25 @@ static void evaluate_variable(expression *e UNUSED)

static void evaluate_map(expression *e)
{
get_map_row(
e->data.map.idx, e->data.map.mod, current_depth + e->data.map.depth,
current_row + e->data.map.row, e->data.map.col, e->buf, e->res_type);
int tid = 0;
#if defined(_OPENMP)
tid = omp_get_thread_num();
#endif

get_map_row(e->data.map.idx, e->data.map.mod,
current_depth + e->data.map.depth,
current_row[tid] + e->data.map.row, e->data.map.col,
e->buf[tid], e->res_type);
}

static void evaluate_function(expression *e)
{
int i;
int res;
int tid = 0;
#if defined(_OPENMP)
tid = omp_get_thread_num();
#endif

if (e->data.func.argc > 1 && e->data.func.func != f_eval) {
for (i = 1; i <= e->data.func.argc; i++)
Expand All @@ -216,8 +243,19 @@ static void evaluate_function(expression *e)
for (i = 1; i <= e->data.func.argc; i++)
evaluate(e->data.func.args[i]);

res = (*e->data.func.func)(e->data.func.argc, e->data.func.argt,
e->data.func.argv);
/* copy the argv in the individual thread */
void **thread_argv = G_malloc((e->data.func.argc + 1) * sizeof(void *));
for (i = 0; i < e->data.func.argc + 1; i++)
thread_argv[i] = e->data.func.argv[i][tid];

res =
(*e->data.func.func)(e->data.func.argc, e->data.func.argt, thread_argv);

/* copy the results from thread_argv to e */
for (i = 0; i < e->data.func.argc + 1; i++)
e->data.func.argv[i][tid] = thread_argv[i];

G_free(thread_argv);

switch (res) {
case E_ARG_LO:
Expand Down Expand Up @@ -303,6 +341,12 @@ void execute(expr_list *ee)
int verbose = isatty(2);
expr_list *l;
int count, n;
int threads = 1;

#if defined(_OPENMP)
threads = omp_get_max_threads();
#endif
current_row = (int *)G_malloc(sizeof(int) * threads);

exprs = ee;
G_add_error_handler(error_handler, NULL);
Expand Down Expand Up @@ -361,27 +405,34 @@ void execute(expr_list *ee)
count = rows * depths;
n = 0;

G_init_workers();

for (current_depth = 0; current_depth < depths; current_depth++) {
for (current_row = 0; current_row < rows; current_row++) {
#pragma omp parallel for default(shared) schedule(static, 1) ordered
for (int row = 0; row < rows; row++) {
if (verbose)
G_percent(n, count, 2);

for (l = ee; l; l = l->next) {
int tid = 0;
#if defined(_OPENMP)
tid = omp_get_thread_num();
#endif
current_row[tid] = row;
for (l = ee; l != NULL; l = l->next) {
expression *e = l->exp;
int fd;

evaluate(e);

if (e->type != expr_type_binding)
continue;

fd = e->data.bind.fd;
put_map_row(fd, e->buf, e->res_type);
#pragma omp ordered
{
if (e->type == expr_type_binding) {
fd = e->data.bind.fd;
put_map_row(fd, e->buf[tid], e->res_type);
}
}
}
#pragma omp critical
{
n++;
}

n++;
}
}

Expand Down
10 changes: 5 additions & 5 deletions raster/r.mapcalc/expression.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ typedef struct expr_data_func {
const char *oper;
int prec;
func_t *func;
int argc;
struct expression **args;
int *argt;
void **argv;
int argc; /* number of args in the whole expression */
struct expression **args; /* array of expressions */
int *argt; /* type of expressions */
void ***argv; /* values in e->buf for each expression */
} expr_data_func;

typedef struct expr_data_bind {
Expand All @@ -50,7 +50,7 @@ typedef struct expr_data_bind {
typedef struct expression {
int type;
int res_type;
void *buf;
void **buf;
union {
expr_data_const con;
expr_data_var var;
Expand Down
3 changes: 2 additions & 1 deletion raster/r.mapcalc/globals.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ extern long seed_value;
extern long seeded;
extern int region_approach;

extern int current_depth, current_row;
extern int current_depth;
extern int *current_row;
extern int depths, rows, columns;

#endif /* __GLOBALS_H_ */
16 changes: 15 additions & 1 deletion raster/r.mapcalc/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
* for details.
*
*****************************************************************************/
#if defined(_OPENMP)
#include <omp.h>
#endif

#include <unistd.h>
#include <stdio.h>
Expand Down Expand Up @@ -59,10 +62,11 @@ static expr_list *parse_file(const char *filename)
int main(int argc, char **argv)
{
struct GModule *module;
struct Option *expr, *file, *seed, *region;
struct Option *expr, *file, *seed, *region, *nprocs;
struct Flag *random, *describe;
int all_ok;
char *desc;
int threads = 1;

G_gisinit(argv[0]);

Expand Down Expand Up @@ -119,6 +123,8 @@ int main(int argc, char **argv)
describe->key = 'l';
describe->description = _("List input and output maps");

nprocs = G_define_standard_option(G_OPT_M_NPROCS);

if (argc == 1) {
char **p = G_malloc(3 * sizeof(char *));

Expand Down Expand Up @@ -183,6 +189,14 @@ int main(int argc, char **argv)
}

pre_exec();
threads = atoi(nprocs->answer);
#if defined(_OPENMP)
omp_set_num_threads(threads);
G_message(_("Computing in parallel, number of threads: %d"),
omp_get_max_threads());
#else
G_message(_("Number of threads: 1"));
#endif
execute(result);
post_exec();

Expand Down
32 changes: 27 additions & 5 deletions raster/r.mapcalc/map.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
#if defined(_OPENMP)
#include <omp.h>
#endif

#include <grass/config.h>

#include <stdlib.h>
Expand Down Expand Up @@ -60,7 +64,7 @@ struct map {
struct Categories cats;
struct Colors colors;
BTREE btree;
struct row_cache cache;
struct row_cache *caches;
#ifdef HAVE_PTHREAD_H
pthread_mutex_t mutex;
#endif
Expand Down Expand Up @@ -383,13 +387,19 @@ static void translate_from_cats(struct map *m, CELL *cell, DCELL *xcell,
static void setup_map(struct map *m)
{
int nrows = m->max_row - m->min_row + 1;

int threads = 1;
#ifdef HAVE_PTHREAD_H
pthread_mutex_init(&m->mutex, NULL);
#endif

#ifdef _OPENMP
threads = omp_get_max_threads();
#endif
m->caches =
(struct row_cache *)G_malloc(threads * sizeof(struct row_cache));
if (nrows > 1 && nrows <= max_rows_in_memory) {
cache_setup(&m->cache, m->fd, nrows);
for (int i = 0; i < threads; i++)
cache_setup(&m->caches[i], m->fd, nrows);
m->use_rowio = 1;
}
else
Expand Down Expand Up @@ -426,8 +436,13 @@ static void read_map(struct map *m, void *buf, int res_type, int row, int col)
return;
}

int tid = 0;
#ifdef _OPENMP
tid = omp_get_thread_num();
#endif

if (m->use_rowio)
cache_get(&m->cache, buf, row, res_type);
cache_get(&m->caches[tid], buf, row, res_type);
else
read_row(m->fd, buf, row, res_type);

Expand All @@ -437,6 +452,11 @@ static void read_map(struct map *m, void *buf, int res_type, int row, int col)

static void close_map(struct map *m)
{
int threads = 1;
#ifdef _OPENMP
threads = omp_get_max_threads();
#endif

if (m->fd < 0)
return;

Expand All @@ -458,7 +478,9 @@ static void close_map(struct map *m)
}

if (m->use_rowio) {
cache_release(&m->cache);
for (int i = 0; i < threads; i++)
cache_release(&m->caches[i]);
G_free(&m->caches);
m->use_rowio = 0;
}
}
Expand Down
10 changes: 9 additions & 1 deletion raster/r.mapcalc/map3.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
#if defined(_OPENMP)
#include <omp.h>
#endif

#include <stdlib.h>
#include <limits.h>
#include <string.h>
Expand Down Expand Up @@ -619,8 +623,12 @@ int open_output_map(const char *name, int res_type)
void put_map_row(int fd, void *buf, int res_type)
{
void *handle = omaps[fd];
int tid = 0;
#if defined(_OPENMP)
tid = omp_get_thread_num();
#endif

write_row(handle, buf, res_type, current_depth, current_row);
write_row(handle, buf, res_type, current_depth, current_row[tid]);
}

void close_output_map(int fd)
Expand Down
3 changes: 3 additions & 0 deletions raster/r.mapcalc/mapcalc.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
#define _MAPCALC_H_

/****************************************************************************/
#if defined(_OPENMP)
#include <omp.h>
#endif

#include <stdio.h>

Expand Down
Loading

0 comments on commit cfc95ef

Please sign in to comment.