Skip to content

Introduced callback to Pthread, Win32 and OpenMP backend #4577

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cblas.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ char* openblas_get_config(void);
/*Get the CPU corename on runtime.*/
char* openblas_get_corename(void);

/*
 * Set the threading backend to a custom callback.
 *
 * openblas_dojob_callback: the per-job worker routine supplied by OpenBLAS.
 *   thread_num  - index of the thread/job slot running the job
 *   jobdata     - pointer to the job's data
 *   dojob_data  - opaque integer forwarded unchanged from the threads callback
 *
 * openblas_threads_callback: the user-supplied dispatcher.
 *   sync           - synchronisation flag chosen by the caller
 *                    NOTE(review): exact semantics are not defined in this header - confirm
 *   dojob          - worker routine to run for the jobs
 *   numjobs        - number of jobs stored in jobdata
 *   jobdata_elsize - size in bytes of one element of jobdata
 *   jobdata        - base address of the job array
 *   dojob_data     - value to forward as the last argument of dojob
 * Presumably the dispatcher calls dojob once per job, passing the address of
 * the corresponding jobdata element - TODO confirm against the implementation.
 */
typedef void (*openblas_dojob_callback)(int thread_num, void *jobdata, int dojob_data);
typedef void (*openblas_threads_callback)(int sync, openblas_dojob_callback dojob, int numjobs, size_t jobdata_elsize, void *jobdata, int dojob_data);
/* Registers `callback` as the threading backend (definition not in this header). */
void openblas_set_threads_callback_function(openblas_threads_callback callback);

#ifdef OPENBLAS_OS_LINUX
/* Sets thread affinity for OpenBLAS threads. `thread_idx` is in [0, openblas_get_num_threads()-1]. */
int openblas_setaffinity(int thread_idx, size_t cpusetsize, cpu_set_t* cpu_set);
Expand Down
5 changes: 5 additions & 0 deletions common_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ int BLASFUNC(xerbla)(char *, blasint *info, blasint);

void openblas_set_num_threads_(int *);

/*
 * Set the threading backend to a custom callback.
 *
 * openblas_dojob_callback is the worker routine type: it receives a thread
 * index, a pointer to the job's data and an opaque integer (dojob_data).
 * openblas_threads_callback is the user-supplied dispatcher type: it receives
 * a sync flag, the worker routine, the number of jobs, the byte size of one
 * job element, the base address of the job array and the value to forward to
 * the worker as dojob_data.
 */
typedef void (*openblas_dojob_callback)(int thread_num, void *jobdata, int dojob_data);
typedef void (*openblas_threads_callback)(int sync, openblas_dojob_callback dojob, int numjobs, size_t jobdata_elsize, void *jobdata, int dojob_data);
/* Currently installed dispatcher; exec_blas() tests it for non-NULL and, when set, hands the job queue to it. */
extern openblas_threads_callback openblas_threads_callback_;

FLOATRET BLASFUNC(sdot) (blasint *, float *, blasint *, float *, blasint *);
FLOATRET BLASFUNC(sdsdot)(blasint *, float *, float *, blasint *, float *, blasint *);

Expand Down
1 change: 1 addition & 0 deletions driver/others/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ if (USE_THREAD)
${BLAS_SERVER}
divtable.c # TODO: Makefile has -UDOUBLE
blas_l1_thread.c
blas_server_callback.c
)

if (NOT NO_AFFINITY)
Expand Down
5 changes: 4 additions & 1 deletion driver/others/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ COMMONOBJS = memory.$(SUFFIX) xerbla.$(SUFFIX) c_abs.$(SUFFIX) z_abs.$(SUFFIX)
#COMMONOBJS += slamch.$(SUFFIX) slamc3.$(SUFFIX) dlamch.$(SUFFIX) dlamc3.$(SUFFIX)

ifdef SMP
COMMONOBJS += blas_server.$(SUFFIX) divtable.$(SUFFIX) blasL1thread.$(SUFFIX)
COMMONOBJS += blas_server.$(SUFFIX) divtable.$(SUFFIX) blasL1thread.$(SUFFIX) blas_server_callback.$(SUFFIX)
ifneq ($(NO_AFFINITY), 1)
COMMONOBJS += init.$(SUFFIX)
endif
Expand Down Expand Up @@ -140,6 +140,9 @@ memory.$(SUFFIX) : $(MEMORY) ../../common.h ../../param.h
blas_server.$(SUFFIX) : $(BLAS_SERVER) ../../common.h ../../common_thread.h ../../param.h
$(CC) $(CFLAGS) -c $< -o $(@F)

blas_server_callback.$(SUFFIX) : blas_server_callback.c ../../common.h
$(CC) $(CFLAGS) -c $< -o $(@F)

openblas_set_num_threads.$(SUFFIX) : openblas_set_num_threads.c
$(CC) $(CFLAGS) -c $< -o $(@F)

Expand Down
278 changes: 162 additions & 116 deletions driver/others/blas_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ int blas_server_avail __attribute__((aligned(ATTRIBUTE_SIZE))) = 0;

int blas_omp_threads_local = 1;

/* Per-thread scratch buffers, indexed by thread number. Slots are filled by
   adjust_thread_buffers(), read by exec_threads() as the default source of
   the sa/sb work areas, and released in blas_thread_shutdown(). */
static void * blas_thread_buffer[MAX_CPU_NUMBER];

/* Local Variables */
#if defined(USE_PTHREAD_LOCK)
static pthread_mutex_t server_lock = PTHREAD_MUTEX_INITIALIZER;
Expand Down Expand Up @@ -190,6 +192,10 @@ static int main_status[MAX_CPU_NUMBER];
BLASLONG exit_time[MAX_CPU_NUMBER];
#endif

//Prototypes (definitions are at the end of this file)
/* exec_threads(cpu, queue, buf_index): run the single job `queue` on behalf of thread `cpu`. */
static void exec_threads(int , blas_queue_t *, int);
/* Allocate missing per-thread buffers for the active threads and free the rest. */
static void adjust_thread_buffers();

static void legacy_exec(void *func, int mode, blas_arg_t *args, void *sb){

if (!(mode & BLAS_COMPLEX)){
Expand Down Expand Up @@ -375,7 +381,6 @@ static void* blas_thread_server(void *arg){
/* Thread identifier */
BLASLONG cpu = (BLASLONG)arg;
unsigned int last_tick;
void *buffer, *sa, *sb;
blas_queue_t *queue;

blas_queue_t *tscq;
Expand All @@ -395,8 +400,6 @@ blas_queue_t *tscq;
main_status[cpu] = MAIN_ENTER;
#endif

buffer = blas_memory_alloc(2);

#ifdef SMP_DEBUG
fprintf(STDERR, "Server[%2ld] Thread has just been spawned!\n", cpu);
#endif
Expand Down Expand Up @@ -456,117 +459,9 @@ blas_queue_t *tscq;
start = rpcc();
#endif

if (queue) {
int (*routine)(blas_arg_t *, void *, void *, void *, void *, BLASLONG) = (int (*)(blas_arg_t *, void *, void *, void *, void *, BLASLONG))queue -> routine;

atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)1);

sa = queue -> sa;
sb = queue -> sb;

#ifdef SMP_DEBUG
if (queue -> args) {
fprintf(STDERR, "Server[%2ld] Calculation started. Mode = 0x%03x M = %3ld N=%3ld K=%3ld\n",
cpu, queue->mode, queue-> args ->m, queue->args->n, queue->args->k);
}
#endif

#ifdef CONSISTENT_FPCSR
#ifdef __aarch64__
__asm__ __volatile__ ("msr fpcr, %0" : : "r" (queue -> sse_mode));
#else
__asm__ __volatile__ ("ldmxcsr %0" : : "m" (queue -> sse_mode));
__asm__ __volatile__ ("fldcw %0" : : "m" (queue -> x87_mode));
#endif
#endif

#ifdef MONITOR
main_status[cpu] = MAIN_RUNNING1;
#endif

//For Loongson servers, like the 3C5000 (featuring 16 cores), applying an
//offset to the buffer is essential for minimizing cache conflicts and optimizing performance.
#if defined(LOONGSON3R5) && !defined(NO_AFFINITY)
char model_name[128];
get_cpu_model(model_name);
if ((strstr(model_name, "3C5000") != NULL) || (strstr(model_name, "3D5000") != NULL))
if (sa == NULL) sa = (void *)((BLASLONG)buffer + (WhereAmI() & 0xf) * GEMM_OFFSET_A);
#endif
if (sa == NULL) sa = (void *)((BLASLONG)buffer + GEMM_OFFSET_A);

if (sb == NULL) {
if (!(queue -> mode & BLAS_COMPLEX)){
#ifdef EXPRECISION
if ((queue -> mode & BLAS_PREC) == BLAS_XDOUBLE){
sb = (void *)(((BLASLONG)sa + ((QGEMM_P * QGEMM_Q * sizeof(xdouble)
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
} else
#endif
if ((queue -> mode & BLAS_PREC) == BLAS_DOUBLE) {
#ifdef BUILD_DOUBLE
sb = (void *)(((BLASLONG)sa + ((DGEMM_P * DGEMM_Q * sizeof(double)
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
} else if ((queue -> mode & BLAS_PREC) == BLAS_SINGLE) {
#ifdef BUILD_SINGLE
sb = (void *)(((BLASLONG)sa + ((SGEMM_P * SGEMM_Q * sizeof(float)
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
} else {
/* Other types in future */
}
} else {
#ifdef EXPRECISION
if ((queue -> mode & BLAS_PREC) == BLAS_XDOUBLE){
sb = (void *)(((BLASLONG)sa + ((XGEMM_P * XGEMM_Q * 2 * sizeof(xdouble)
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
} else
#endif
if ((queue -> mode & BLAS_PREC) == BLAS_DOUBLE){
#ifdef BUILD_COMPLEX16
sb = (void *)(((BLASLONG)sa + ((ZGEMM_P * ZGEMM_Q * 2 * sizeof(double)
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
} else if ((queue -> mode & BLAS_PREC) == BLAS_SINGLE) {
#ifdef BUILD_COMPLEX
sb = (void *)(((BLASLONG)sa + ((CGEMM_P * CGEMM_Q * 2 * sizeof(float)
+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
} else {
/* Other types in future */
}
}
queue->sb=sb;
}

#ifdef MONITOR
main_status[cpu] = MAIN_RUNNING2;
#endif

if (queue -> mode & BLAS_LEGACY) {
legacy_exec(routine, queue -> mode, queue -> args, sb);
} else
if (queue -> mode & BLAS_PTHREAD) {
void (*pthreadcompat)(void *) = (void(*)(void*))queue -> routine;
(pthreadcompat)(queue -> args);
} else
(routine)(queue -> args, queue -> range_m, queue -> range_n, sa, sb, queue -> position);

#ifdef SMP_DEBUG
fprintf(STDERR, "Server[%2ld] Calculation finished!\n", cpu);
#endif

#ifdef MONITOR
main_status[cpu] = MAIN_FINISH;
#endif

// arm: make sure all results are written out _before_
// thread is marked as done and other threads use them
MB;
atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)0);


}
if(queue) {
exec_threads(cpu, queue, 0);
}

#ifdef MONITOR
main_status[cpu] = MAIN_DONE;
Expand All @@ -588,8 +483,6 @@ blas_queue_t *tscq;
fprintf(STDERR, "Server[%2ld] Shutdown!\n", cpu);
#endif

blas_memory_free(buffer);

//pthread_exit(NULL);

return NULL;
Expand Down Expand Up @@ -671,6 +564,9 @@ int blas_thread_init(void){

LOCK_COMMAND(&server_lock);

// Adjust thread buffers
adjust_thread_buffers();

if (!blas_server_avail){

thread_timeout_env=openblas_thread_timeout();
Expand Down Expand Up @@ -901,6 +797,18 @@ int exec_blas(BLASLONG num, blas_queue_t *queue){
fprintf(STDERR, "Exec_blas is called. Number of executing threads : %ld\n", num);
#endif

//Redirect to caller's callback routine
if (openblas_threads_callback_) {
int buf_index = 0, i = 0;
#ifndef USE_SIMPLE_THREADED_LEVEL3
for (i = 0; i < num; i ++)
queue[i].position = i;
#endif
openblas_threads_callback_(1, (openblas_dojob_callback) exec_threads, num, sizeof(blas_queue_t), (void*) queue, buf_index);
return 0;
}


#ifdef __ELF__
if (omp_in_parallel && (num > 1)) {
if (omp_in_parallel() > 0) {
Expand Down Expand Up @@ -1074,6 +982,14 @@ int BLASFUNC(blas_thread_shutdown)(void){

LOCK_COMMAND(&server_lock);

//Free buffers allocated for threads
for(i=0; i<MAX_CPU_NUMBER; i++){
if(blas_thread_buffer[i]!=NULL){
blas_memory_free(blas_thread_buffer[i]);
blas_thread_buffer[i]=NULL;
}
}

if (blas_server_avail) {

for (i = 0; i < blas_num_threads - 1; i++) {
Expand Down Expand Up @@ -1110,5 +1026,135 @@ int BLASFUNC(blas_thread_shutdown)(void){
return 0;
}

/*
 * Bring blas_thread_buffer[] in line with the current thread count:
 * every slot below blas_cpu_number gets a buffer (allocated on demand via
 * blas_memory_alloc(2)), every slot at or above it is released and reset to
 * NULL. Existing buffers of still-active slots are kept as they are.
 */
static void adjust_thread_buffers(void) {

  int i = 0;
  int active = blas_cpu_number;

  /* blas_thread_buffer has exactly MAX_CPU_NUMBER slots; clamp so that an
     oversized thread count can never write past the end of the array. */
  if (active > MAX_CPU_NUMBER) {
    active = MAX_CPU_NUMBER;
  }

  /* make sure each active thread owns a buffer */
  for (i = 0; i < active; i++) {
    if (blas_thread_buffer[i] == NULL) {
      blas_thread_buffer[i] = blas_memory_alloc(2);
    }
  }

  /* release buffers that belong to threads no longer in use */
  for (; i < MAX_CPU_NUMBER; i++) {
    if (blas_thread_buffer[i] != NULL) {
      blas_memory_free(blas_thread_buffer[i]);
      blas_thread_buffer[i] = NULL;
    }
  }
}

/*
 * Execute one queued BLAS job on behalf of thread `cpu`.
 *
 *   cpu       - thread index; selects thread_status[cpu] and
 *               blas_thread_buffer[cpu]
 *   queue     - the job to run (routine, mode, args, ranges, optional sa/sb)
 *   buf_index - unused here; present so the parameter list lines up with
 *               openblas_dojob_callback, to which exec_blas() casts this
 *               function
 *
 * thread_status[cpu].queue is set to 1 while the job runs and to 0 once it
 * has finished. When the job supplies no sa/sb work areas they are carved out
 * of blas_thread_buffer[cpu]; a computed sb is written back to queue->sb.
 *
 * NOTE(review): nothing here checks that blas_thread_buffer[cpu] is non-NULL
 * or that cpu < MAX_CPU_NUMBER - callers must guarantee both.
 */
static void exec_threads(int cpu, blas_queue_t *queue, int buf_index) {

  int (*routine)(blas_arg_t *, void *, void *, void *, void *, BLASLONG) =
    (int (*)(blas_arg_t *, void *, void *, void *, void *, BLASLONG))queue->routine;

  (void)buf_index; /* intentionally unused */

  /* mark this thread as busy */
  atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)1);

  void *buffer = blas_thread_buffer[cpu];
  void *sa = queue->sa;
  void *sb = queue->sb;

#ifdef SMP_DEBUG
  if (queue->args) {
    /* cpu is an int: convert explicitly so it matches the %ld conversion */
    fprintf(STDERR, "Server[%2ld] Calculation started. Mode = 0x%03x M = %3ld N=%3ld K=%3ld\n",
	    (long)cpu, queue->mode, queue->args->m, queue->args->n, queue->args->k);
  }
#endif

#ifdef CONSISTENT_FPCSR
#ifdef __aarch64__
  __asm__ __volatile__ ("msr fpcr, %0" : : "r" (queue -> sse_mode));
#else
  __asm__ __volatile__ ("ldmxcsr %0" : : "m" (queue -> sse_mode));
  __asm__ __volatile__ ("fldcw %0"   : : "m" (queue -> x87_mode));
#endif
#endif

#ifdef MONITOR
  main_status[cpu] = MAIN_RUNNING1;
#endif

  //For Loongson servers, like the 3C5000 (featuring 16 cores), applying an
  //offset to the buffer is essential for minimizing cache conflicts and optimizing performance.
#if defined(LOONGSON3R5) && !defined(NO_AFFINITY)
  char model_name[128];
  get_cpu_model(model_name);
  if ((strstr(model_name, "3C5000") != NULL) || (strstr(model_name, "3D5000") != NULL))
    if (sa == NULL) sa = (void *)((BLASLONG)buffer + (WhereAmI() & 0xf) * GEMM_OFFSET_A);
#endif
  /* default sa: start of the per-thread buffer plus the A offset */
  if (sa == NULL) sa = (void *)((BLASLONG)buffer + GEMM_OFFSET_A);

  /* default sb: placed after an aligned P*Q panel of the job's element type */
  if (sb == NULL) {
    if (!(queue->mode & BLAS_COMPLEX)) {
#ifdef EXPRECISION
      if ((queue->mode & BLAS_PREC) == BLAS_XDOUBLE) {
	sb = (void *)(((BLASLONG)sa + ((QGEMM_P * QGEMM_Q * sizeof(xdouble)
					+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
      } else
#endif
      if ((queue->mode & BLAS_PREC) == BLAS_DOUBLE) {
#ifdef BUILD_DOUBLE
	sb = (void *)(((BLASLONG)sa + ((DGEMM_P * DGEMM_Q * sizeof(double)
					+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
      } else if ((queue->mode & BLAS_PREC) == BLAS_SINGLE) {
#ifdef BUILD_SINGLE
	sb = (void *)(((BLASLONG)sa + ((SGEMM_P * SGEMM_Q * sizeof(float)
					+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
      } else {
	/* Other types in future */
      }
    } else {
#ifdef EXPRECISION
      if ((queue->mode & BLAS_PREC) == BLAS_XDOUBLE) {
	sb = (void *)(((BLASLONG)sa + ((XGEMM_P * XGEMM_Q * 2 * sizeof(xdouble)
					+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
      } else
#endif
      if ((queue->mode & BLAS_PREC) == BLAS_DOUBLE) {
#ifdef BUILD_COMPLEX16
	sb = (void *)(((BLASLONG)sa + ((ZGEMM_P * ZGEMM_Q * 2 * sizeof(double)
					+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
      } else if ((queue->mode & BLAS_PREC) == BLAS_SINGLE) {
#ifdef BUILD_COMPLEX
	sb = (void *)(((BLASLONG)sa + ((CGEMM_P * CGEMM_Q * 2 * sizeof(float)
					+ GEMM_ALIGN) & ~GEMM_ALIGN)) + GEMM_OFFSET_B);
#endif
      } else {
	/* Other types in future */
      }
    }
    /* publish the chosen sb back into the job descriptor */
    queue->sb = sb;
  }

#ifdef MONITOR
  main_status[cpu] = MAIN_RUNNING2;
#endif

  /* dispatch according to the calling convention encoded in mode */
  if (queue->mode & BLAS_LEGACY) {
    legacy_exec(routine, queue->mode, queue->args, sb);
  } else
    if (queue->mode & BLAS_PTHREAD) {
      void (*pthreadcompat)(void *) = (void(*)(void*))queue->routine;
      (pthreadcompat)(queue->args);
    } else
      (routine)(queue->args, queue->range_m, queue->range_n, sa, sb, queue->position);

#ifdef SMP_DEBUG
  fprintf(STDERR, "Server[%2ld] Calculation finished!\n", (long)cpu);
#endif

#ifdef MONITOR
  main_status[cpu] = MAIN_FINISH;
#endif

  // arm: make sure all results are written out _before_
  // thread is marked as done and other threads use them
  MB;
  atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)0);

}

#endif
Loading
Loading