open:rxpy

RxPY

snippet.python
# Demo: push values into a Subject by hand and let only those that pass a
# predicate reach the subscriber.
import rx
from rx import operators as ops  # `ops` is used below; it was never imported
from rx.subject import Subject  # explicit import instead of relying on `rx.subject` being loaded

# The subject is the entry point of the stream: values are pushed into it
# with on_next() further down.
stream = Subject()

# Keep only the values below 60.
# Alternative noted by the author: ops.take_while(lambda i: i < 60)
# (per the RxPY docs it ends the sequence at the first failing value instead
# of skipping it — confirm against the operator reference before swapping).
c = stream.pipe(
    ops.filter(lambda i: i < 60),
)
c.subscribe(on_next=lambda i: print(f"i is {i}"))

# Feed sample values; with the filter above, 100 is dropped and the rest
# are printed in the order they were pushed.
stream.on_next(0)
stream.on_next(50)
stream.on_next(51)
stream.on_next(49)
stream.on_next(100)
stream.on_next(-50)
snippet.python
# Demo: filter a stream of records, attach an index to each one, and delay
# delivery using a thread-pool scheduler.
#
# NOTE: `source` (the observable being piped) and `on_next_data` (the
# subscriber callback) are not defined in this snippet; they must already
# exist in the surrounding session.
import multiprocessing

from rx import operators as ops  # `ops` is used below; it was never imported
from rx.scheduler import ThreadPoolScheduler

# Size the pool from the machine's CPU count, plus one extra worker.
optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count + 1)

composed = source.pipe(
    # Drop records flagged as either kind of ad.
    # presumably each record is a dict with boolean 'IsPlusAD' /
    # 'IsPowerClickAD' keys — verify against the producer of `source`.
    ops.filter(lambda d: not d['IsPlusAD'] and not d['IsPowerClickAD']),
    # Optional while debugging: add ops.take(1) here to limit the stream
    # to a single record.
    # Pair every record with its position in the filtered stream.
    ops.map_indexed(lambda x, i: (x, i)),
    # Delay each emission by 0.1 (seconds, per the RxPY docs), scheduled on
    # the thread pool created above.
    ops.delay(0.1, pool_scheduler),
)

composed.subscribe(on_next=on_next_data)

  • open/rxpy.txt
  • 마지막으로 수정됨: 2020/06/02 09:25
  • 저자 127.0.0.1