2929from typing import TypeVar
3030from typing import Union
3131
32+ import json
33+ import threading
34+ import uuid
35+
3236import apache_beam as beam
3337from apache_beam .io .filesystems import FileSystems
3438from apache_beam .portability .api import schema_pb2
5357from apache_beam .yaml .yaml_errors import maybe_with_exception_handling_transform_fn
5458from apache_beam .yaml .yaml_provider import dicts_to_rows
5559
56- # Import js2py package if it exists
5760try :
58- import js2py
59- from js2py .base import JsObjectWrapper
61+ from py_mini_racer import MiniRacer
6062except ImportError :
61- js2py = None
62- JsObjectWrapper = object
63+ MiniRacer = None
64+
65+ _js_thread_funcs = {}
6366
6467_str_expression_fields = {
6568 'AssignTimestamps' : 'timestamp' ,
@@ -178,18 +181,7 @@ def _check_mapping_arguments(
178181 raise ValueError (f'{ transform_name } cannot specify "name" without "path"' )
179182
180183
181- # js2py's JsObjectWrapper object has a self-referencing __dict__ property
182- # that cannot be pickled without implementing the __getstate__ and
183- # __setstate__ methods.
184- class _CustomJsObjectWrapper (JsObjectWrapper ):
185- def __init__ (self , js_obj ):
186- super ().__init__ (js_obj .__dict__ ['_obj' ])
187-
188- def __getstate__ (self ):
189- return self .__dict__ .copy ()
190184
191- def __setstate__ (self , state ):
192- self .__dict__ .update (state )
193185
194186
195187# TODO(yaml) Improve type inferencing for JS UDF's
@@ -205,83 +197,86 @@ def py_value_to_js_dict(py_value):
205197 return py_value
206198
207199
200+ def js_to_py (obj ):
201+ """Converts mini-racer mapped objects to standard Python types.
202+
203+ This is needed because ctx.eval returns JSMappedObjectImpl and JSArrayImpl
204+ for JS objects and arrays, which are not picklable and would fail when Beam
205+ tries to serialize rows containing them. We also preserve datetime objects
206+ which are correctly produced by ctx.eval for JS Date objects.
207+ """
208+ import datetime
209+ from collections import abc
210+
211+ type_name = type (obj ).__name__
212+ if type_name == 'JSMappedObjectImpl' :
213+ return {k : js_to_py (v ) for k , v in dict (obj ).items ()}
214+ elif type_name == 'JSArrayImpl' :
215+ return [js_to_py (v ) for v in list (obj )]
216+ elif isinstance (obj , datetime .datetime ):
217+ return obj
218+ elif isinstance (obj , dict ):
219+ return {k : js_to_py (v ) for k , v in obj .items ()}
220+ elif not isinstance (obj , str ) and isinstance (obj , abc .Iterable ):
221+ return [js_to_py (v ) for v in list (obj )]
222+ else :
223+ return obj
224+
225+
208226# TODO(yaml) Consider adding optional language version parameter to support
209227# ECMAScript 5 and 6
210228def _expand_javascript_mapping_func (
211229 original_fields , expression = None , callable = None , path = None , name = None ):
212230
213- # Check for installed js2py package
214- if js2py is None :
231+ if MiniRacer is None :
215232 raise ValueError (
216- "Javascript mapping functions are not supported on"
217- " Python 3.12 or later." )
218-
219- # import remaining js2py objects
220- from js2py import base
221- from js2py .constructors import jsdate
222- from js2py .internals import simplex
223-
224- js_array_type = (
225- base .PyJsArray ,
226- base .PyJsArrayBuffer ,
227- base .PyJsInt8Array ,
228- base .PyJsUint8Array ,
229- base .PyJsUint8ClampedArray ,
230- base .PyJsInt16Array ,
231- base .PyJsUint16Array ,
232- base .PyJsInt32Array ,
233- base .PyJsUint32Array ,
234- base .PyJsFloat32Array ,
235- base .PyJsFloat64Array )
236-
237- def _js_object_to_py_object (obj ):
238- if isinstance (obj , (base .PyJsNumber , base .PyJsString , base .PyJsBoolean )):
239- return base .to_python (obj )
240- elif isinstance (obj , js_array_type ):
241- return [_js_object_to_py_object (value ) for value in obj .to_list ()]
242- elif isinstance (obj , jsdate .PyJsDate ):
243- return obj .to_utc_dt ()
244- elif isinstance (obj , (base .PyJsNull , base .PyJsUndefined )):
245- return None
246- elif isinstance (obj , base .PyJsError ):
247- raise RuntimeError (obj ['message' ])
248- elif isinstance (obj , base .PyJsObject ):
249- return {
250- key : _js_object_to_py_object (value ['value' ])
251- for (key , value ) in obj .own .items ()
252- }
253- elif isinstance (obj , base .JsObjectWrapper ):
254- return _js_object_to_py_object (obj ._obj )
255-
256- return obj
257-
258- if expression :
259- source = '\n ' .join (['function(__row__) {' ] + [
260- f' { name } = __row__.{ name } '
261- for name in original_fields if name in expression
262- ] + [' return (' + expression + ')' ] + ['}' ])
263- js_func = _CustomJsObjectWrapper (js2py .eval_js (source ))
264-
265- elif callable :
266- js_func = _CustomJsObjectWrapper (js2py .eval_js (callable ))
233+ "JavaScript mapping functions require the 'mini-racer' package to be installed." )
267234
268- else :
235+ udf_code = None
236+ if path :
269237 if not path .endswith ('.js' ):
270238 raise ValueError (f'File "{ path } " is not a valid .js file.' )
271239 udf_code = FileSystems .open (path ).read ().decode ()
272- js = js2py .EvalJs ()
273- js .eval (udf_code )
274- js_func = _CustomJsObjectWrapper (getattr (js , name ))
240+ elif expression :
241+ udf_code = f"var func = (__row__) => {{ " + " " .join ([
242+ f"const { n } = __row__.{ n } ;"
243+ for n in original_fields if n in expression
244+ ]) + f" return ({ expression } ); }}"
245+ elif callable :
246+ udf_code = f"var func = { callable } "
247+
248+ udf_key = str (uuid .uuid4 ())
275249
276250 def js_wrapper (row ):
251+ tid = threading .get_ident ()
252+
253+ global _js_thread_funcs
254+ # MiniRacer contexts are not picklable and cannot be shared across threads.
255+ # We use a global dict keyed by thread ID to lazily create and cache a
256+ # context per thread.
257+ if tid not in _js_thread_funcs :
258+ _js_thread_funcs [tid ] = {}
259+
260+ if udf_key not in _js_thread_funcs [tid ]:
261+ ctx = MiniRacer ()
262+ ctx .eval (udf_code )
263+ # We use ctx.eval instead of ctx.call to ensure that JavaScript Date
264+ # objects are correctly returned as Python datetime objects.
265+ # We JSON-serialize the arguments to pass them safely to eval.
266+ if expression or callable :
267+ _js_thread_funcs [tid ][udf_key ] = lambda x : ctx .eval (f"func({ json .dumps (x )} )" )
268+ else :
269+ _js_thread_funcs [tid ][udf_key ] = lambda x : ctx .eval (f"{ name } ({ json .dumps (x )} )" )
270+
271+ func = _js_thread_funcs [tid ][udf_key ]
277272 row_as_dict = py_value_to_js_dict (row )
278273 try :
279- js_result = js_func (row_as_dict )
280- except simplex . JsException as exn :
274+ result = func (row_as_dict )
275+ except Exception as exn :
281276 raise RuntimeError (
282- f"Error evaluating javascript expression: "
283- f" { exn . mes [ 'message' ] } " ) from exn
284- return dicts_to_rows (_js_object_to_py_object ( js_result ) )
277+ f"Error evaluating JavaScript expression: { exn } " ) from exn
278+ result = js_to_py ( result )
279+ return dicts_to_rows (result )
285280
286281 return js_wrapper
287282
0 commit comments