Loading presentation...

Present Remotely

Send the link below via email or IM

Copy

Present to your audience

Start remote presentation

  • Invited audience members will follow you as you navigate and present
  • People invited to a presentation do not need a Prezi account
  • This link expires 10 minutes after you close the presentation
  • A maximum of 30 users can follow your presentation
  • Learn more about this feature in our knowledge base article

Do you really want to delete this prezi?

Neither you, nor the coeditors you shared it with will be able to recover it again.

DeleteCancel

Make your likes visible on Facebook?

Connect your Facebook account to Prezi and let your likes appear on your timeline.
You can change this under Settings & Account at any time.

No, thanks

Stream Processing - Past, Present and Reactive Future (with python examples)

No description
by

Oleg Ilyenko

on 25 November 2014

Comments (0)

Please log in to add your comment.

Report abuse

Transcript of Stream Processing - Past, Present and Reactive Future (with python examples)

Error
Handling
* / 30 * * * * import_data.sh
Stream Processing
Past, Present and
Reactive
Future
by
Oleg Ilyenko
@easyangel
About me
Creator of
Scaldi
- Scala DI library
Author of
Hacking Scala
blog
Functional-reactive programming
Scala Backend Engineer at commercetools GmbH
New languages and concepts
8 years of software development
numbers = [1, 2, 3, 4]

it = iter(numbers)

try
:

while
True:
print it.next()
except
StopIteration:

pass

class

Observer
:


def
on_next(self, value):

pass


def
on_error(self, error):

pass


def
on_completed(self):

pass

file = open(
'some-file.csv'
)
chars = [ ]

try
:


try
:

while

True
:
chars += file.
next()

except
StopIteration:

pass

except
IOError as e:
print "I/O error({0}): {1}".format(e.errno, e.strerror)
finally
:
file.close()

Blocking
Resource
Management
It's all about the
programming model
The
Iterator
Pattern
Simple
Lack of real-time feedback
Pull-based model
Pull-based
Model

Reactive Streams
Non-blocking!!
Error
Handling??
Reactive Extensions
from rx import Observer
Push-based
Model

Rx
Originally developed for .Net at Microsoft
Now ported to
Python
Ruby
JavaScript
Clojure
Java
Scala
many more...
Non-blocking
Building Blocks
Composition
is the
key
Ubiquitous
Observable
Observer
Stream
of data
[1, 2, 3, 4, ...]
Consumable
Can be infinite
Subscribes to Observable
Subject
= Observable + Observer
from
rx
import
Observable

users
= get_users()

last_names
=
users
\
.filter(
lambda
user: user.first_name ==
"John"
) \
.map(
lambda
user: user.last_name)

last_names
.subscribe(
on_next =
lambda
last_name: print(
"Hello "
+ last_name),
on_error =
lambda
error: print(error),
on_completed =
lambda
: print(
"Done"
)
)
users
John
Doe
Bob
Martin
Jane Smith
John
Smith
Lieschen
Müller
lastNames
Doe
Smith
"Hello Doe"
"Hello Smith"
"Done"
Backpressure
Resource
Management??
Consumes the data
OOM
akka-http
Reactive Streams
Collaborative effort
Still in development
Async and non-blocking backpressure mechanism
Minimalistic
class

Publisher
:


def
subscribe(self, subscriber):

pass
class

Subscriber
:


def
on_subscribe(self, subscription):

pass


def
on_next(self, value):

pass


def
on_error(self, error):

pass


def
on_completed(self):

pass
class

Subscription
:


def

request
(self, n):

pass


def
cancel(self):

pass
Backpressure Stream
Resources
Reactive Extensions
Reactive Streams
This Presentation
Thank you!
Questions?
http://reactivex.io/
http://www.reactive-streams.org/
http://bit.ly/stream-processing-python
GET /images/123.tiff
123.tiff 5 GB
Inefficient resource usage
queries
= get_search_queries()
search_results
= get_search_output()
Subscription
Unsubscribe
.forEach(
function
(event) {

image
.offset({
top
: event.
clientY
,
left
: event.
clientX
})
});
mouseDowns
.flatMap(
function
(event) {
event.preventDefault()
return
mouseMoves
.takeUntil(
mouseUps
)
})
var

mouseDowns
= Rx.Observable.fromEvent(
image
,
"mousedown"
)

var

mouseUps
= Rx.Observable.fromEvent(
document
,
"mouseup"
)
var

mouseMoves
= Rx.Observable.fromEvent(
document
,
"mousemove"
)
var

image
= $(
"#image"
)
@easyangel
https://github.com/OlegIlyenko
from
rx
import
Observable
def
do_search(query):
es_results = search_elasticsearch(query)
google_results = search_google(query) \
.retry(
3
) \
.on_error_resume_next(Observable.just(
"No results from google."
))

return
Observable.merge(es_results, google_results).take(5)
results
=
queries
.throttle_last(
250
).flat_map(
do_search
)
results
.subscribe(
search_results
)
Full transcript