@@ -385,6 +385,7 @@ int orte_util_decode_nidmap(opal_buffer_t *buf)
385
385
/* add this name to the pool */
386
386
nd = OBJ_NEW (orte_node_t );
387
387
nd -> name = strdup (names [n ]);
388
+ nd -> index = n ;
388
389
opal_pointer_array_set_item (orte_node_pool , n , nd );
389
390
/* set the topology - always default to homogeneous
390
391
* as that is the most common scenario */
@@ -409,7 +410,6 @@ int orte_util_decode_nidmap(opal_buffer_t *buf)
409
410
daemons -> num_procs ++ ;
410
411
opal_pointer_array_set_item (daemons -> procs , proc -> name .vpid , proc );
411
412
}
412
- nd -> index = proc -> name .vpid ;
413
413
OBJ_RETAIN (nd );
414
414
proc -> node = nd ;
415
415
OBJ_RETAIN (proc );
@@ -945,8 +945,9 @@ int orte_util_parse_node_info(opal_buffer_t *buf)
945
945
int orte_util_generate_ppn (orte_job_t * jdata ,
946
946
opal_buffer_t * buf )
947
947
{
948
- uint16_t * ppn = NULL ;
949
- size_t nbytes ;
948
+ uint16_t ppn ;
949
+ uint8_t * bytes ;
950
+ int32_t nbytes ;
950
951
int rc = ORTE_SUCCESS ;
951
952
orte_app_idx_t i ;
952
953
int j , k ;
@@ -955,40 +956,47 @@ int orte_util_generate_ppn(orte_job_t *jdata,
955
956
orte_node_t * nptr ;
956
957
orte_proc_t * proc ;
957
958
size_t sz ;
959
+ opal_buffer_t bucket ;
958
960
959
- /* make room for the number of procs on each node */
960
- nbytes = sizeof (uint16_t ) * orte_node_pool -> size ;
961
- ppn = (uint16_t * )malloc (nbytes );
961
+ OBJ_CONSTRUCT (& bucket , opal_buffer_t );
962
962
963
963
for (i = 0 ; i < jdata -> num_apps ; i ++ ) {
964
- /* reset the #procs */
965
- memset (ppn , 0 , nbytes );
966
- /* for each app_context, compute the #procs on
967
- * each node of the allocation */
968
- for (j = 0 ; j < orte_node_pool -> size ; j ++ ) {
969
- if (NULL == (nptr = (orte_node_t * )opal_pointer_array_get_item (orte_node_pool , j ))) {
964
+ /* for each app_context */
965
+ for (j = 0 ; j < jdata -> map -> nodes -> size ; j ++ ) {
966
+ if (NULL == (nptr = (orte_node_t * )opal_pointer_array_get_item (jdata -> map -> nodes , j ))) {
970
967
continue ;
971
968
}
972
969
if (NULL == nptr -> daemon ) {
973
970
continue ;
974
971
}
972
+ ppn = 0 ;
975
973
for (k = 0 ; k < nptr -> procs -> size ; k ++ ) {
976
974
if (NULL != (proc = (orte_proc_t * )opal_pointer_array_get_item (nptr -> procs , k ))) {
977
975
if (proc -> name .jobid == jdata -> jobid ) {
978
- ++ ppn [ j ] ;
976
+ ++ ppn ;
979
977
}
980
978
}
981
979
}
980
+ if (0 < ppn ) {
981
+ if (ORTE_SUCCESS != (rc = opal_dss .pack (& bucket , & nptr -> index , 1 , ORTE_STD_CNTR ))) {
982
+ goto cleanup ;
983
+ }
984
+ if (ORTE_SUCCESS != (rc = opal_dss .pack (& bucket , & ppn , 1 , OPAL_UINT16 ))) {
985
+ goto cleanup ;
986
+ }
987
+ }
982
988
}
983
- if (opal_compress .compress_block ((uint8_t * )ppn , nbytes ,
989
+ opal_dss .unload (& bucket , (void * * )& bytes , & nbytes );
990
+
991
+ if (opal_compress .compress_block (bytes , (size_t )nbytes ,
984
992
(uint8_t * * )& bo .bytes , & sz )) {
985
993
/* mark that this was compressed */
986
994
compressed = true;
987
995
bo .size = sz ;
988
996
} else {
989
997
/* mark that this was not compressed */
990
998
compressed = false;
991
- bo .bytes = ( uint8_t * ) ppn ;
999
+ bo .bytes = bytes ;
992
1000
bo .size = nbytes ;
993
1001
}
994
1002
/* indicate compression */
@@ -1015,21 +1023,31 @@ int orte_util_generate_ppn(orte_job_t *jdata,
1015
1023
}
1016
1024
1017
1025
cleanup :
1018
- free ( ppn );
1026
+ OBJ_DESTRUCT ( & bucket );
1019
1027
return rc ;
1020
1028
}
1021
1029
1022
1030
int orte_util_decode_ppn (orte_job_t * jdata ,
1023
1031
opal_buffer_t * buf )
1024
1032
{
1033
+ orte_std_cntr_t index ;
1025
1034
orte_app_idx_t n ;
1026
- int m , cnt , rc ;
1035
+ int cnt , rc , m ;
1027
1036
opal_byte_object_t * boptr ;
1028
1037
bool compressed ;
1038
+ uint8_t * bytes ;
1029
1039
size_t sz ;
1030
- uint16_t * ppn , k ;
1040
+ uint16_t ppn , k ;
1031
1041
orte_node_t * node ;
1032
1042
orte_proc_t * proc ;
1043
+ opal_buffer_t bucket ;
1044
+
1045
+ /* reset any flags */
1046
+ for (m = 0 ; m < orte_node_pool -> size ; m ++ ) {
1047
+ if (NULL != (node = (orte_node_t * )opal_pointer_array_get_item (orte_node_pool , m ))) {
1048
+ ORTE_FLAG_UNSET (node , ORTE_NODE_FLAG_MAPPED );
1049
+ }
1050
+ }
1033
1051
1034
1052
for (n = 0 ; n < jdata -> num_apps ; n ++ ) {
1035
1053
/* unpack the compression flag */
@@ -1062,14 +1080,15 @@ int orte_util_decode_ppn(orte_job_t *jdata,
1062
1080
1063
1081
/* decompress if required */
1064
1082
if (compressed ) {
1065
- if (!opal_compress .decompress_block (( uint8_t * * ) & ppn , sz ,
1083
+ if (!opal_compress .decompress_block (& bytes , sz ,
1066
1084
boptr -> bytes , boptr -> size )) {
1067
1085
ORTE_ERROR_LOG (ORTE_ERROR );
1068
1086
OBJ_RELEASE (boptr );
1069
1087
return ORTE_ERROR ;
1070
1088
}
1071
1089
} else {
1072
- ppn = (uint16_t * )boptr -> bytes ;
1090
+ bytes = boptr -> bytes ;
1091
+ sz = boptr -> size ;
1073
1092
boptr -> bytes = NULL ;
1074
1093
boptr -> size = 0 ;
1075
1094
}
@@ -1078,38 +1097,74 @@ int orte_util_decode_ppn(orte_job_t *jdata,
1078
1097
}
1079
1098
free (boptr );
1080
1099
1081
- /* cycle thru the node pool */
1082
- for (m = 0 ; m < orte_node_pool -> size ; m ++ ) {
1083
- if (NULL == (node = (orte_node_t * )opal_pointer_array_get_item (orte_node_pool , m ))) {
1084
- continue ;
1100
+ /* setup to unpack */
1101
+ OBJ_CONSTRUCT (& bucket , opal_buffer_t );
1102
+ opal_dss .load (& bucket , bytes , sz );
1103
+
1104
+ /* unpack each node and its ppn */
1105
+ cnt = 1 ;
1106
+ while (OPAL_SUCCESS == (rc = opal_dss .unpack (& bucket , & index , & cnt , ORTE_STD_CNTR ))) {
1107
+ /* get the corresponding node object */
1108
+ if (NULL == (node = (orte_node_t * )opal_pointer_array_get_item (orte_node_pool , index ))) {
1109
+ rc = ORTE_ERR_NOT_FOUND ;
1110
+ ORTE_ERROR_LOG (rc );
1111
+ goto error ;
1085
1112
}
1086
- if (0 < ppn [m ]) {
1087
- if (!ORTE_FLAG_TEST (node , ORTE_NODE_FLAG_MAPPED )) {
1088
- OBJ_RETAIN (node );
1089
- ORTE_FLAG_SET (node , ORTE_NODE_FLAG_MAPPED );
1090
- opal_pointer_array_add (jdata -> map -> nodes , node );
1091
- }
1092
- /* create a proc object for each one */
1093
- for (k = 0 ; k < ppn [m ]; k ++ ) {
1094
- proc = OBJ_NEW (orte_proc_t );
1095
- proc -> name .jobid = jdata -> jobid ;
1096
- /* leave the vpid undefined as this will be determined
1097
- * later when we do the overall ranking */
1098
- proc -> app_idx = n ;
1099
- proc -> parent = node -> daemon -> name .vpid ;
1100
- OBJ_RETAIN (node );
1101
- proc -> node = node ;
1102
- /* flag the proc as ready for launch */
1103
- proc -> state = ORTE_PROC_STATE_INIT ;
1104
- opal_pointer_array_add (node -> procs , proc );
1105
- /* we will add the proc to the jdata array when we
1106
- * compute its rank */
1107
- }
1108
- node -> num_procs += ppn [m ];
1113
+ /* add the node to the job map if not already assigned */
1114
+ if (!ORTE_FLAG_TEST (node , ORTE_NODE_FLAG_MAPPED )) {
1115
+ OBJ_RETAIN (node );
1116
+ opal_pointer_array_add (jdata -> map -> nodes , node );
1117
+ ORTE_FLAG_SET (node , ORTE_NODE_FLAG_MAPPED );
1118
+ }
1119
+ /* get the ppn */
1120
+ cnt = 1 ;
1121
+ if (OPAL_SUCCESS != (rc = opal_dss .unpack (& bucket , & ppn , & cnt , OPAL_UINT16 ))) {
1122
+ ORTE_ERROR_LOG (rc );
1123
+ goto error ;
1109
1124
}
1125
+ /* create a proc object for each one */
1126
+ for (k = 0 ; k < ppn ; k ++ ) {
1127
+ proc = OBJ_NEW (orte_proc_t );
1128
+ proc -> name .jobid = jdata -> jobid ;
1129
+ /* leave the vpid undefined as this will be determined
1130
+ * later when we do the overall ranking */
1131
+ proc -> app_idx = n ;
1132
+ proc -> parent = node -> daemon -> name .vpid ;
1133
+ OBJ_RETAIN (node );
1134
+ proc -> node = node ;
1135
+ /* flag the proc as ready for launch */
1136
+ proc -> state = ORTE_PROC_STATE_INIT ;
1137
+ opal_pointer_array_add (node -> procs , proc );
1138
+ node -> num_procs ++ ;
1139
+ /* we will add the proc to the jdata array when we
1140
+ * compute its rank */
1141
+ }
1142
+ node -> num_procs += ppn ;
1143
+ cnt = 1 ;
1110
1144
}
1111
- free (ppn );
1145
+ OBJ_DESTRUCT (& bucket );
1146
+ }
1147
+ if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc ) {
1148
+ ORTE_ERROR_LOG (rc );
1112
1149
}
1113
1150
1151
+ /* reset any flags */
1152
+ for (m = 0 ; m < jdata -> map -> nodes -> size ; m ++ ) {
1153
+ node = (orte_node_t * )opal_pointer_array_get_item (jdata -> map -> nodes , m );
1154
+ if (NULL != node ) {
1155
+ ORTE_FLAG_UNSET (node , ORTE_NODE_FLAG_MAPPED );
1156
+ }
1157
+ }
1114
1158
return ORTE_SUCCESS ;
1159
+
1160
+ error :
1161
+ OBJ_DESTRUCT (& bucket );
1162
+ /* reset any flags */
1163
+ for (m = 0 ; m < jdata -> map -> nodes -> size ; m ++ ) {
1164
+ node = (orte_node_t * )opal_pointer_array_get_item (jdata -> map -> nodes , m );
1165
+ if (NULL != node ) {
1166
+ ORTE_FLAG_UNSET (node , ORTE_NODE_FLAG_MAPPED );
1167
+ }
1168
+ }
1169
+ return rc ;
1115
1170
}
0 commit comments