from portage import os
from portage.tests import TestCase
-from _emerge.TaskScheduler import TaskScheduler
+from _emerge.PollScheduler import PollScheduler
from _emerge.PipeReader import PipeReader
from _emerge.SpawnProcess import SpawnProcess
test_string = 2 * "blah blah blah\n"
- task_scheduler = TaskScheduler()
+ scheduler = PollScheduler().sched_iface
master_fd, slave_fd = os.pipe()
master_file = os.fdopen(master_fd, 'rb')
slave_file = os.fdopen(slave_fd, 'wb')
producer = SpawnProcess(
args=["bash", "-c", "echo -n '%s'" % test_string],
env=os.environ, fd_pipes={1:slave_fd},
- scheduler=task_scheduler.sched_iface)
+ scheduler=scheduler)
producer.start()
slave_file.close()
consumer = PipeReader(
input_files={"producer" : master_file},
- scheduler=task_scheduler.sched_iface)
+ scheduler=scheduler)
- task_scheduler.add(consumer)
- task_scheduler.run()
+ consumer.start()
+ consumer.wait()
output = consumer.getvalue().decode('ascii', 'replace')
self.assertEqual(test_string, output)