# limitations under the License.
#
1717
18+ from pathlib import Path
19+ import subprocess
20+ import time
1821from pywy .dataquanta import WayangContext
1922from pywy .platforms .java import JavaPlugin
2023from pywy .platforms .spark import SparkPlugin
24+ from pywy .tests import resources as resources_folder
25+
26+ from importlib import resources
27+ from pathlib import Path
2128
def test_filter_to_json():
    """Build and store a filter/flatmap/map/join plan while a Wayang process runs.

    The test launches ``wayang-submit`` (looked up under
    ``wayang-assembly/target/wayang-1.1.0/bin`` relative to the current
    working directory) with ``org.apache.wayang.api.json.Main`` and the
    argument ``8080``, then builds two text-file pipelines over the packaged
    ``sample_data.txt`` resource, joins them on the first tuple element and
    stores the result to the packaged ``wordcount_out_python.txt`` resource.

    The only assertion is that the object returned by ``store_textfile`` is
    not ``None``. The spawned process is always killed, reaped and has its
    pipes closed, even when the test body raises.
    """
    wayang_runner_dir = Path.cwd() / 'wayang-assembly' / 'target' / 'wayang-1.1.0' / 'bin'

    print("Opening subprocess")
    with resources.path(resources_folder, "sample_data.txt") as resource_path, \
            resources.path(resources_folder, "wordcount_out_python.txt") as output_path, \
            resources.path(resources_folder, "wayang.properties") as configuration_file_path:
        print(f"Using resource path: {resource_path}")
        print(f"Using output path: {output_path}")
        print(f"Using configuration path: {configuration_file_path}")
        proc = subprocess.Popen(
            [
                str(wayang_runner_dir / "wayang-submit"),
                f"-Dwayang.configuration=file://{configuration_file_path}",
                "org.apache.wayang.api.json.Main",
                "8080",  # presumably the port the JSON API listens on -- TODO confirm
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        try:
            # Fixed start-up delay; kept inside the ``try`` so the process is
            # still cleaned up if the wait itself is interrupted.
            time.sleep(5)
            print(f"Running process: {proc.pid} with args: {proc.args}")
            ctx = WayangContext() \
                .register({JavaPlugin, SparkPlugin})
            left = ctx.textfile(f"file://{resource_path}") \
                .filter(lambda w: "Apache" in w, str) \
                .flatmap(lambda w: w.split(), str, str) \
                .map(lambda w: (str(len(w)), w), str, (int, str))
            right = ctx.textfile(f"file://{resource_path}") \
                .filter(lambda w: "Wayang" in w, str) \
                .map(lambda w: (str(len(w)), w), str, (int, str))
            join = left.join(lambda w: w[0], right, lambda w: w[0], (int, str)) \
                .store_textfile(f"file://{output_path}")
            # Fixed delay after storing, before the process output is inspected.
            time.sleep(3)

            # Echo the first line the process wrote to stdout as a diagnostic.
            print(proc.stdout.readline())
            assert join is not None, "Could not construct join."
        finally:
            proc.kill()
            # Reap the killed process so it does not linger as a zombie, then
            # release both pipe file descriptors explicitly.
            proc.wait()
            proc.stdout.close()
            proc.stderr.close()
0 commit comments