@@ -78,6 +78,7 @@ def bind_input(self, schema, datum, lead_pos=None, tail_pos=None):
78
78
lead_pos = []
79
79
bindings = [] # type: List[Dict[Text,Text]]
80
80
binding = None # type: Dict[Text,Any]
81
+ value_from_expression = False
81
82
if "inputBinding" in schema and isinstance (schema ["inputBinding" ], dict ):
82
83
binding = copy .copy (schema ["inputBinding" ])
83
84
@@ -87,29 +88,33 @@ def bind_input(self, schema, datum, lead_pos=None, tail_pos=None):
87
88
binding ["position" ] = aslist (lead_pos ) + [0 ] + aslist (tail_pos )
88
89
89
90
binding ["datum" ] = datum
91
+ if "valueFrom" in binding :
92
+ value_from_expression = True
90
93
91
94
# Handle union types
92
95
if isinstance (schema ["type" ], list ):
93
- for t in schema ["type" ]:
94
- if isinstance (t , (str , Text )) and self .names .has_name (t , "" ):
95
- avsc = self .names .get_name (t , "" )
96
- elif isinstance (t , dict ) and "name" in t and self .names .has_name (t ["name" ], "" ):
97
- avsc = self .names .get_name (t ["name" ], "" )
98
- else :
99
- avsc = AvroSchemaFromJSONData (t , self .names )
100
- if validate .validate (avsc , datum ):
101
- schema = copy .deepcopy (schema )
102
- schema ["type" ] = t
103
- return self .bind_input (schema , datum , lead_pos = lead_pos , tail_pos = tail_pos )
104
- raise validate .ValidationException (u"'%s' is not a valid union %s" % (datum , schema ["type" ]))
96
+ if not value_from_expression :
97
+ for t in schema ["type" ]:
98
+ if isinstance (t , (str , Text )) and self .names .has_name (t , "" ):
99
+ avsc = self .names .get_name (t , "" )
100
+ elif isinstance (t , dict ) and "name" in t and self .names .has_name (t ["name" ], "" ):
101
+ avsc = self .names .get_name (t ["name" ], "" )
102
+ else :
103
+ avsc = AvroSchemaFromJSONData (t , self .names )
104
+ if validate .validate (avsc , datum ):
105
+ schema = copy .deepcopy (schema )
106
+ schema ["type" ] = t
107
+ return self .bind_input (schema , datum , lead_pos = lead_pos , tail_pos = tail_pos )
108
+ raise validate .ValidationException (u"'%s' is not a valid union %s" % (datum , schema ["type" ]))
105
109
elif isinstance (schema ["type" ], dict ):
106
- st = copy .deepcopy (schema ["type" ])
107
- if binding and "inputBinding" not in st and st ["type" ] == "array" and "itemSeparator" not in binding :
108
- st ["inputBinding" ] = {}
109
- for k in ("secondaryFiles" , "format" , "streamable" ):
110
- if k in schema :
111
- st [k ] = schema [k ]
112
- bindings .extend (self .bind_input (st , datum , lead_pos = lead_pos , tail_pos = tail_pos ))
110
+ if not value_from_expression :
111
+ st = copy .deepcopy (schema ["type" ])
112
+ if binding and "inputBinding" not in st and st ["type" ] == "array" and "itemSeparator" not in binding :
113
+ st ["inputBinding" ] = {}
114
+ for k in ("secondaryFiles" , "format" , "streamable" ):
115
+ if k in schema :
116
+ st [k ] = schema [k ]
117
+ bindings .extend (self .bind_input (st , datum , lead_pos = lead_pos , tail_pos = tail_pos ))
113
118
else :
114
119
if schema ["type" ] in self .schemaDefs :
115
120
schema = self .schemaDefs [schema ["type" ]]
0 commit comments