# RxPy

## Filtering values pushed into a `Subject`

```python
from rx import operators as ops
from rx.subject import Subject

# A Subject is both an observer and an observable: values pushed in with
# on_next() are forwarded to every subscriber of the derived pipeline.
stream = Subject()

c = stream.pipe(
    # Alternative: take_while completes the stream at the first value that
    # fails the predicate, so nothing after 100 (i.e. -50) would be delivered.
    # ops.take_while(lambda i: i < 60)
    # filter only drops the failing values and keeps the stream alive.
    ops.filter(lambda i: i < 60)
)

c.subscribe(on_next=lambda i: print(f"i is {i}"))

stream.on_next(0)
stream.on_next(50)
stream.on_next(51)
stream.on_next(49)
stream.on_next(100)  # rejected by the filter (not < 60)
stream.on_next(-50)
```

## Delaying emissions on a thread-pool scheduler

```python
import multiprocessing

from rx import operators as ops
from rx.scheduler import ThreadPoolScheduler

# NOTE: `source` (an Observable of dicts) and `on_next_data` (the callback
# receiving each item) are assumed to be defined elsewhere.

optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count + 1)

composed = source.pipe(
    # Keep only items where both ad flags are falsy.
    ops.filter(lambda d: not d['IsPlusAD'] and not d['IsPowerClickAD']),
    # ops.take(1),  # uncomment to process only the first matching item
    # Pair every item with its zero-based index: (item, index).
    ops.map_indexed(lambda x, i: (x, i)),
    # Shift each emission by 0.1 s, timed on the thread pool.
    ops.delay(0.1, pool_scheduler)
)

composed.subscribe(on_next=on_next_data)
```