Scala: transform a callback pattern into a functional-style internal iterator
Suppose this API is given, and we cannot change it:
    object ProviderAPI {

      trait Receiver[T] {
        def receive(entry: T): Unit
        def close(): Unit
      }

      def run(r: Receiver[Int]): Unit = {
        // The provider pushes entries from its own thread
        new Thread() {
          override def run(): Unit = {
            (0 to 9).foreach { i =>
              r.receive(i)
              Thread.sleep(100)
            }
            r.close()
          }
        }.start()
      }
    }
In this example, ProviderAPI.run takes a Receiver, calls receive(i) 10 times, and then closes it. Typically, ProviderAPI.run would call receive(i) based on a collection that could be infinite.
This API is intended to be used in an imperative style, like an external iterator. If our application needs to filter, map, and print this input, we need to implement a Receiver that mixes all of these operations:
    object Main extends App {

      class MyReceiver extends ProviderAPI.Receiver[Int] {
        // Filtering, mapping and printing are all tangled into one callback
        def receive(entry: Int): Unit = {
          if (entry % 2 == 0) {
            println("Entry#" + entry)
          }
        }
        def close(): Unit = {}
      }

      ProviderAPI.run(new MyReceiver())
    }
Now, the question is how to use the ProviderAPI in a functional style, like an internal iterator (without changing the implementation of ProviderAPI, which is given to us). Note that ProviderAPI could also call receive(i) infinitely many times, so it is not an option to collect everything in a list (also, we should handle each result one by one, instead of collecting all the input first and processing it afterwards).
I am asking how to implement such a ReceiverToIterator, so that we can use the ProviderAPI in a functional style:
    object Main extends App {
      val iterator = new ReceiverToIterator[Int] // How do we implement this?
      ProviderAPI.run(iterator)
      iterator
        .view
        .filter(_ % 2 == 0)
        .map("Entry#" + _)
        .foreach(println)
    }
Update
Here are 4 solutions:
- IteratorWithSemaphorSolution: the workaround solution I proposed first, attached to the question.
- QueueIteratorSolution: uses a BlockingQueue[Option[T]], based on the suggestion of nadavwr. It allows the producer to keep producing up to queueCapacity entries before being blocked by the consumer.
- PublishSubjectSolution: a very simple solution, using PublishSubject from the Netflix RxJava-Scala API.
- SameThreadReceiverToTraversable: a very simple solution, obtained by relaxing the constraints of the question.
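For readers who want to see the queue-based idea in one piece, here is a minimal sketch. It is not the code of QueueIteratorSolution itself: the class name QueueReceiverIterator and the capacity of 4 are hypothetical, and ProviderAPI is repeated from the question only so that the block compiles on its own. It calls the Iterator methods directly rather than going through .view, since filter and map on an Iterator are already lazy.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Copy of the API from the question, repeated so the sketch is self-contained.
object ProviderAPI {
  trait Receiver[T] {
    def receive(entry: T): Unit
    def close(): Unit
  }

  def run(r: Receiver[Int]): Unit = {
    new Thread() {
      override def run(): Unit = {
        (0 to 9).foreach { i =>
          r.receive(i)
          Thread.sleep(100)
        }
        r.close()
      }
    }.start()
  }
}

// Hypothetical name. queueCapacity bounds how far the producer may run ahead.
class QueueReceiverIterator[T](queueCapacity: Int)
    extends ProviderAPI.Receiver[T] with Iterator[T] {

  // Some(entry) carries a value; None marks end of input (EOF).
  private val queue = new ArrayBlockingQueue[Option[T]](queueCapacity)

  // Item already taken from the queue but not yet handed out by next().
  private var peeked: Option[Option[T]] = None

  // Producer side: both calls block while the queue is full.
  def receive(entry: T): Unit = queue.put(Some(entry))
  def close(): Unit = queue.put(None)

  // Consumer side: blocks until the producer supplies an entry or EOF.
  private def peek(): Option[T] = {
    if (peeked.isEmpty) peeked = Some(queue.take())
    peeked.get
  }

  def hasNext: Boolean = peek().isDefined

  def next(): T = peek() match {
    case Some(value) =>
      peeked = None // consume the head so the next call takes a fresh item
      value
    case None =>
      throw new NoSuchElementException("receiver was closed")
  }
}

object QueueIteratorDemo extends App {
  val iterator = new QueueReceiverIterator[Int](queueCapacity = 4)
  ProviderAPI.run(iterator)
  iterator
    .filter(_ % 2 == 0)
    .map("Entry#" + _)
    .foreach(println)
}
```

Once the EOF marker has been taken it stays cached, so repeated hasNext calls after the end return false without blocking again.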
Updated: BlockingQueue of 1 entry
What you've implemented here is essentially Java's BlockingQueue, with a queue size of 1.
Main characteristic: uber-blocking. A slow consumer will kill your producer's performance.
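To make the blocking behaviour concrete, the following small sketch (the object name CapacityOneDemo is made up for illustration) uses the timed offer on a queue of capacity 1. The second offer gives up after 50 ms because the single slot is still occupied; a plain put at that point would block the producer until the consumer takes the entry.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object CapacityOneDemo {
  // Returns whether each of the three offers was accepted.
  def offers(): (Boolean, Boolean, Boolean) = {
    val queue = new ArrayBlockingQueue[Int](1)
    val first = queue.offer(1)                              // queue empty: accepted
    val second = queue.offer(2, 50, TimeUnit.MILLISECONDS)  // queue full: times out
    queue.take()                                            // consumer frees the slot
    val third = queue.offer(3)                              // accepted again
    (first, second, third)
  }
}
```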
Update: @gzm0 mentioned that BlockingQueue of course doesn't cover EOF. You'll have to use BlockingQueue[Option[T]] for that.
Update: Here's a code fragment. It can be made to fit with your Receiver. Some of it is inspired by Iterator.buffered. Note that peek is a misleading name, as it may block -- and so will hasNext.
    // Fairness enabled -- I assume you want to preserve order...
    // Alternatively, disable fairness and increase the buffer to be 'big enough'
    private val queue = new java.util.concurrent.ArrayBlockingQueue[Option[T]](1, true)

    // The following block provides a potentially blocking peek operation.
    // It should call `queue.take` only when the previously peeked head has been invalidated;
    // specifically, it will `queue.take` and block when the queue is empty.
    private var head: Option[T] = _
    private var headDefined: Boolean = false

    private def invalidateHead(): Unit = { headDefined = false }

    private def peek: Option[T] = {
      if (!headDefined) {
        head = queue.take()
        headDefined = true
      }
      head
    }

    def iterator = new Iterator[T] {

      // Potentially blocking; false only upon taking `None`
      def hasNext: Boolean = peek.isDefined

      // Peeks and invalidates the head; throws NoSuchElementException as appropriate
      def next(): T = {
        val opt = peek
        // Keep a peeked `None` (EOF) cached so a later hasNext does not block on the empty queue
        if (opt.isEmpty) throw new NoSuchElementException
        invalidateHead()
        opt.get
      }
    }
Alternative: Iteratees
Iterator-based solutions will generally involve more blocking. Conceptually, you could use continuations on the thread doing the iteration to avoid blocking the thread, but continuations mess with Scala's for-comprehensions, so no joy down that road.
Alternatively, consider an iteratee-based solution. Iteratees differ from iterators in that the consumer isn't responsible for advancing the iteration -- the producer is. With iteratees, the consumer basically folds over the entries pushed by the producer over time. Folding each next entry as it becomes available can take place in a thread pool, since the thread is relinquished after each fold completes.
You won't get the nice for-syntax for iteration, and the learning curve is a little challenging, but if you feel confident using a foldLeft you'll end up with a non-blocking solution that looks reasonable on the eye.
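The push-driven fold can be illustrated without any iteratee library. The sketch below is only a hand-rolled approximation of the idea, not the Play or Scalaz API: FoldingReceiver is a hypothetical helper, the fold step runs on the producer's thread rather than in a thread pool, and ProviderAPI is again copied from the question so the block stands alone.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Copy of the API from the question, repeated so the sketch is self-contained.
object ProviderAPI {
  trait Receiver[T] {
    def receive(entry: T): Unit
    def close(): Unit
  }

  def run(r: Receiver[Int]): Unit = {
    new Thread() {
      override def run(): Unit = {
        (0 to 9).foreach { i =>
          r.receive(i)
          Thread.sleep(100)
        }
        r.close()
      }
    }.start()
  }
}

// Hypothetical helper: folds over entries as the producer pushes them.
class FoldingReceiver[T, S](initial: S)(step: (S, T) => S)
    extends ProviderAPI.Receiver[T] {

  private var state: S = initial
  private val done = Promise[S]()

  // Called by the producer; no consumer thread sits parked waiting for input.
  def receive(entry: T): Unit = { state = step(state, entry) }

  // Completes the future with whatever state the fold has reached.
  def close(): Unit = { done.success(state) }

  def result: Future[S] = done.future
}

object FoldDemo extends App {
  // The fold state is just a count; each matching entry is printed as it arrives.
  val receiver = new FoldingReceiver[Int, Int](0)({ (count, entry) =>
    if (entry % 2 == 0) {
      println("Entry#" + entry)
      count + 1
    } else count
  })

  ProviderAPI.run(receiver)

  // Blocking here only keeps the demo alive until the producer closes.
  val matched = Await.result(receiver.result, 5.seconds)
  println("matched entries: " + matched)
}
```

Each entry is handled as it arrives and nothing is buffered, which is the property the question asks for. A real iteratee library adds what this sketch leaves out, such as composition and letting the consumer signal that it is done early.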
To read more about iteratees, I suggest taking a peek at PlayFramework 2.x's Iteratee reference. The documentation describes their stand-alone iteratee library, which is 100% usable outside the context of Play. Scalaz 7 also has a comprehensive iteratee library.