Skip to content

Pr/sharedfp sm logic3 #745

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 27, 2015
Merged
38 changes: 38 additions & 0 deletions ompi/mca/sharedfp/sm/configure.m4
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# -*- shell-script -*-
#
# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2005 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2012 The Regents of the University of California.
# All rights reserved.
# Copyright (c) 2010-2014 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2008-2015 University of Houston. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#

# MCA_sharedfp_sm_CONFIG(action-if-can-compile,
# [action-if-cant-compile])
# ------------------------------------------------
AC_DEFUN([MCA_ompi_sharedfp_sm_CONFIG],[
AC_CONFIG_FILES([ompi/mca/sharedfp/sm/Makefile])

sharedfp_sm_happy=no
AC_CHECK_HEADER([semaphore.h],
[AC_CHECK_FUNCS([sem_open],[sharedfp_sm_happy=yes],[])])

AC_CHECK_HEADER([semaphore.h],
[AC_CHECK_FUNCS([sem_init],[sharedfp_sm_happy=yes],[])])

AS_IF([test "$sharedfp_sm_happy" = "yes"],
[$1],
[$2])
])dnl
16 changes: 8 additions & 8 deletions ompi/mca/sharedfp/sm/sharedfp_sm.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ struct mca_sharedfp_base_module_1_0_0_t * mca_sharedfp_sm_component_file_query(m
ompi_group_t *group = comm->c_local_group;

for (i = 0; i < size; ++i) {
proc = ompi_group_peer_lookup(group,i);
if (!OPAL_PROC_ON_LOCAL_NODE(proc->super.proc_flags)){
opal_output(ompi_sharedfp_base_framework.framework_output,
"mca_sharedfp_sm_component_file_query: Disqualifying myself: (%d/%s) "
"not all processes are on the same node.",
comm->c_contextid, comm->c_name);
return NULL;
}
proc = ompi_group_peer_lookup(group,i);
if (!OPAL_PROC_ON_LOCAL_NODE(proc->super.proc_flags)){
opal_output(ompi_sharedfp_base_framework.framework_output,
"mca_sharedfp_sm_component_file_query: Disqualifying myself: (%d/%s) "
"not all processes are on the same node.",
comm->c_contextid, comm->c_name);
return NULL;
}
}
/* This module can run */
*priority = mca_sharedfp_sm_priority;
Expand Down
13 changes: 8 additions & 5 deletions ompi/mca/sharedfp/sm/sharedfp_sm.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2015 University of Houston. All rights reserved.
* Copyright (c) 2015 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -100,9 +101,8 @@ int mca_sharedfp_sm_iwrite (mca_io_ompio_file_t *fh,
/*--------------------------------------------------------------*
*Structures and definitions only for this component
*--------------------------------------------------------------*/

struct sm_offset{
sem_t *mutex; /* the mutex: a Posix memory-based unnamed semaphore */
struct mca_sharedfp_sm_offset{
sem_t mutex; /* the mutex: a POSIX memory-based unnamed semaphore */
long long offset; /* and the shared file pointer offset */
};

Expand All @@ -111,10 +111,13 @@ struct sm_offset{
*/
struct mca_sharedfp_sm_data
{
struct sm_offset * sm_offset_ptr;
struct mca_sharedfp_sm_offset * sm_offset_ptr;
/*save filename so that we can remove the file on close*/
char * sm_filename;
sem_t *mutex; /* the mutex: a Posix memory-based named semaphore */
/* The mutex: it will either point to a POSIX memory-based named
semaphore, or it will point to the a POSIX memory-based unnamed
semaphore located in sm_offset_ptr->mutex. */
sem_t *mutex;
char *sem_name; /* Name of the semaphore */
};

Expand Down
109 changes: 52 additions & 57 deletions ompi/mca/sharedfp/sm/sharedfp_sm_file_open.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* Copyright (c) 2013 Intel, Inc. All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2015 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -54,8 +55,8 @@ int mca_sharedfp_sm_file_open (struct ompi_communicator_t *comm,
mca_io_ompio_file_t * shfileHandle;
char * filename_basename;
char * sm_filename;
struct sm_offset * sm_offset_ptr;
struct sm_offset sm_offset;
struct mca_sharedfp_sm_offset * sm_offset_ptr;
struct mca_sharedfp_sm_offset sm_offset;
int sm_fd;
int rank;

Expand All @@ -71,27 +72,27 @@ int mca_sharedfp_sm_file_open (struct ompi_communicator_t *comm,

/*Memory is allocated here for the sh structure*/
if ( mca_sharedfp_sm_verbose ) {
printf( "mca_sharedfp_sm_file_open: malloc f_sharedfp_ptr struct\n");
printf( "mca_sharedfp_sm_file_open: malloc f_sharedfp_ptr struct\n");
}

sh = (struct mca_sharedfp_base_data_t*)malloc(sizeof(struct mca_sharedfp_base_data_t));
if ( NULL == sh ) {
opal_output(0, "mca_sharedfp_sm_file_open: Error, unable to malloc f_sharedfp_ptr struct\n");
free(shfileHandle);
return OMPI_ERR_OUT_OF_RESOURCE;
opal_output(0, "mca_sharedfp_sm_file_open: Error, unable to malloc f_sharedfp_ptr struct\n");
free(shfileHandle);
return OMPI_ERR_OUT_OF_RESOURCE;
}

/*Populate the sh file structure based on the implementation*/
sh->sharedfh = shfileHandle; /* Shared file pointer*/
sh->global_offset = 0; /* Global Offset*/
sh->comm = comm; /* Communicator*/
sh->sharedfh = shfileHandle; /* Shared file pointer*/
sh->global_offset = 0; /* Global Offset*/
sh->comm = comm; /* Communicator*/
sh->selected_module_data = NULL;

rank = ompi_comm_rank ( sh->comm );

/*Open a shared memory segment which will hold the shared file pointer*/
if ( mca_sharedfp_sm_verbose ) {
printf( "mca_sharedfp_sm_file_open: allocatge shared memory segment.\n");
printf( "mca_sharedfp_sm_file_open: allocatge shared memory segment.\n");
}


Expand Down Expand Up @@ -135,26 +136,25 @@ int mca_sharedfp_sm_file_open (struct ompi_communicator_t *comm,
return OMPI_ERROR;
}

free(sm_filename);
sm_data->sm_filename = sm_filename;

/*TODO: is it necessary to write to the file first?*/
if( 0 == rank ){
memset ( &sm_offset, 0, sizeof (struct sm_offset ));
write ( sm_fd, &sm_offset, sizeof(struct sm_offset));
memset ( &sm_offset, 0, sizeof (struct mca_sharedfp_sm_offset ));
write ( sm_fd, &sm_offset, sizeof(struct mca_sharedfp_sm_offset));
}
comm->c_coll.coll_barrier (comm, comm->c_coll.coll_barrier_module );

/*the file has been written to, now we can map*/
sm_offset_ptr = mmap(NULL, sizeof(struct sm_offset), PROT_READ | PROT_WRITE,
MAP_SHARED, sm_fd, 0);
sm_offset_ptr = mmap(NULL, sizeof(struct mca_sharedfp_sm_offset), PROT_READ | PROT_WRITE,
MAP_SHARED, sm_fd, 0);

close(sm_fd);

if ( sm_offset_ptr==MAP_FAILED){
err = OMPI_ERROR;
printf("mca_sharedfp_sm_file_open: Error, unable to mmap file: %s\n",sm_filename);
printf("%s\n", strerror(errno));
err = OMPI_ERROR;
printf("mca_sharedfp_sm_file_open: Error, unable to mmap file: %s\n",sm_filename);
printf("%s\n", strerror(errno));
free(sm_filename);
free(sm_data);
free(sh);
Expand All @@ -165,43 +165,38 @@ int mca_sharedfp_sm_file_open (struct ompi_communicator_t *comm,
/* Initialize semaphore so that is shared between processes. */
/* the semaphore is shared by keeping it in the shared memory segment */

#ifdef OMPIO_SHAREDFP_USE_UNNAMED_SEMAPHORES
if(sem_init(&sm_offset_ptr->mutex, 1, 1) != -1){
#else
#if defined(HAVE_SEM_OPEN)
sm_data->sem_name = (char*) malloc( sizeof(char) * (strlen(filename_basename)+32) );
sprintf(sm_data->sem_name,"OMPIO_sharedfp_sem_%s",filename_basename);

if( (sm_data->mutex = sem_open(sm_data->sem_name, O_CREAT, 0644, 1)) != SEM_FAILED ) {
#elif defined(HAVE_SEM_INIT)
sm_data->mutex = &sm_offset_ptr->mutex;
if(sem_init(&sm_offset_ptr->mutex, 1, 1) != -1){
#endif
/*If opening was successful*/
/*Store the new file handle*/
sm_data->sm_offset_ptr = sm_offset_ptr;
/* Assign the sm_data to sh->selected_module_data*/
sh->selected_module_data = sm_data;
/*remember the shared file handle*/
fh->f_sharedfp_data = sh;

/*write initial zero*/
if(rank==0){
MPI_Offset position=0;

#ifdef OMPIO_SHAREDFP_USE_UNNAMED_SEMAPHORES
sem_wait(sm_offset_ptr->mutex);
sm_offset_ptr->offset=position;
sem_post(sm_offset_ptr->mutex);
#else
sem_wait(sm_data->mutex);
sm_offset_ptr->offset=position;
sem_post(sm_data->mutex);
#endif
}
/*If opening was successful*/
/*Store the new file handle*/
sm_data->sm_offset_ptr = sm_offset_ptr;
/* Assign the sm_data to sh->selected_module_data*/
sh->selected_module_data = sm_data;
/*remember the shared file handle*/
fh->f_sharedfp_data = sh;

/*write initial zero*/
if(rank==0){
MPI_Offset position=0;

sem_wait(sm_data->mutex);
sm_offset_ptr->offset=position;
sem_post(sm_data->mutex);
}
}else{
free(sm_filename);
free(sm_data);
free(sh);
free(shfileHandle);
munmap(sm_offset_ptr, sizeof(struct sm_offset));
err = OMPI_ERROR;
free(sm_data);
free(sh);
free(shfileHandle);
munmap(sm_offset_ptr, sizeof(struct mca_sharedfp_sm_offset));
err = OMPI_ERROR;
}

comm->c_coll.coll_barrier (comm, comm->c_coll.coll_barrier_module );
Expand All @@ -218,9 +213,9 @@ int mca_sharedfp_sm_file_close (mca_io_ompio_file_t *fh)
struct mca_sharedfp_sm_data * file_data=NULL;

if( NULL == fh->f_sharedfp_data ){
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_file_close: shared file pointer structure not initialized\n");
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_file_close: shared file pointer structure not initialized\n");
}
return OMPI_SUCCESS;
}
sh = fh->f_sharedfp_data;
Expand All @@ -236,14 +231,14 @@ int mca_sharedfp_sm_file_close (mca_io_ompio_file_t *fh)
/*Close sm handle*/
if (file_data->sm_offset_ptr) {
/* destroy semaphore */
#ifdef OMPIO_SHAREDFP_USE_UNNAMED_SEMAPHORES
sem_destroy(file_data->sm_offset_ptr->mutex);
#else
sem_unlink (file_data->sem_name);
free (file_data->sem_name);
#if defined(HAVE_SEM_OPEN)
sem_unlink (file_data->sem_name);
free (file_data->sem_name);
#elif defined(HAVE_SEM_INIT)
sem_destroy(&file_data->sm_offset_ptr->mutex);
#endif
/*Release the shared memory segment.*/
munmap(file_data->sm_offset_ptr,sizeof(struct sm_offset));
munmap(file_data->sm_offset_ptr,sizeof(struct mca_sharedfp_sm_offset));
/*Q: Do we need to delete the file? */
remove(file_data->sm_filename);
}
Expand Down
2 changes: 1 addition & 1 deletion ompi/mca/sharedfp/sm/sharedfp_sm_get_position.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mca_sharedfp_sm_get_position(mca_io_ompio_file_t *fh,

if(fh->f_sharedfp_data==NULL){
opal_output(ompi_sharedfp_base_framework.framework_output,
"sharedfp_sm_write - opening the shared file pointer\n");
"sharedfp_sm_write - opening the shared file pointer\n");
shared_fp_base_module = fh->f_sharedfp;

ret = shared_fp_base_module->sharedfp_file_open(fh->f_comm,
Expand Down
26 changes: 13 additions & 13 deletions ompi/mca/sharedfp/sm/sharedfp_sm_iread.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
#include "ompi/mca/sharedfp/sharedfp.h"

int mca_sharedfp_sm_iread(mca_io_ompio_file_t *fh,
void *buf,
int count,
ompi_datatype_t *datatype,
MPI_Request * request)
void *buf,
int count,
ompi_datatype_t *datatype,
MPI_Request * request)
{
int ret = OMPI_SUCCESS;
OMPI_MPI_OFFSET_TYPE offset = 0;
Expand All @@ -39,9 +39,9 @@ int mca_sharedfp_sm_iread(mca_io_ompio_file_t *fh,
mca_sharedfp_base_module_t * shared_fp_base_module = NULL;

if( NULL == fh->f_sharedfp_data){
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_iread: opening the shared file pointer\n");
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_iread: opening the shared file pointer\n");
}
shared_fp_base_module = fh->f_sharedfp;

ret = shared_fp_base_module->sharedfp_file_open(fh->f_comm,
Expand All @@ -63,15 +63,15 @@ int mca_sharedfp_sm_iread(mca_io_ompio_file_t *fh,
sh = fh->f_sharedfp_data;

if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_iread: Bytes Requested is %ld\n",bytesRequested);
printf("sharedfp_sm_iread: Bytes Requested is %ld\n",bytesRequested);
}
/*Request the offset to write bytesRequested bytes*/
ret = mca_sharedfp_sm_request_position(sh,bytesRequested,&offset);

if ( -1 != ret ) {
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_iread: Offset received is %lld\n",offset);
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_iread: Offset received is %lld\n",offset);
}
/* Read the file */
ret = ompio_io_ompio_file_iread_at(sh->sharedfh,offset,buf,count,datatype,request);
}
Expand All @@ -90,8 +90,8 @@ int mca_sharedfp_sm_read_ordered_begin(mca_io_ompio_file_t *fh,


int mca_sharedfp_sm_read_ordered_end(mca_io_ompio_file_t *fh,
void *buf,
ompi_status_public_t *status)
void *buf,
ompi_status_public_t *status)
{
opal_output(0,"mca_sharedfp_sm_read_ordered_end: NOT IMPLEMENTED\n");
return OMPI_ERROR;
Expand Down
Loading