Message bus
The message bus is the transport layer abstraction. new_frontera provides an interface and several implementations. Only one message bus can be used in a crawler at a time; it is selected with the :setting:`MESSAGE_BUS` setting.
Spider processes can use
- class new_frontera.contrib.backends.remote.messagebus.MessageBusBackend(manager)
to communicate over the message bus.
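As a rough illustration, a spider-side settings module might look like the sketch below. The `BACKEND` setting name and the ZeroMQ message bus module path are assumptions that are not confirmed by this page; only `MESSAGE_BUS` and the `MessageBusBackend` class path come from the text above.

```python
# Hypothetical spider settings module (a sketch, not the project's canonical config).

# Assumed module path for the ZeroMQ message bus implementation;
# check your installation for the real location.
MESSAGE_BUS = "new_frontera.contrib.messagebus.zeromq.MessageBus"

# Backend class documented above; the BACKEND setting name is an assumption.
BACKEND = "new_frontera.contrib.backends.remote.messagebus.MessageBusBackend"
```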
Built-in message bus reference
ZeroMQ
It’s the default option, implemented using lightweight ZeroMQ library in
and can be configured using ZeroMQ message bus settings.
The ZeroMQ message bus requires the ZeroMQ library to be installed and a broker process to be running; see Start cluster.
Overall, the ZeroMQ message bus is designed for getting a working PoC quickly and for smaller deployments, mainly because it is prone to message loss when the data flow between components isn’t properly tuned, or during startup. Here’s the recommended order of component startup to avoid message loss:
:term:`spider`s
Unfortunately, it’s not possible to avoid message loss when stopping a running crawler with an unfinished crawl. We recommend using the Kafka message bus if your crawler application is sensitive to even small message loss.
WARNING! The ZeroMQ message bus doesn’t yet support multiple strategy workers (SW) and DB workers; only one instance of each worker type is allowed.
Kafka
Can be selected with
and configured using Kafka message bus settings.
Requires a running Kafka service and is more suitable for large-scale web crawling.
Protocol
Depending on the stream, new_frontera uses several message types to encode its messages. Every message is a native Python object serialized using msgpack or JSON. The codec module can be selected using :setting:`MESSAGE_BUS_CODEC`, and it is required to export Encoder and Decoder classes.
Here are the classes to subclass when implementing your own codec:
- class new_frontera.core.codec.BaseEncoder¶
- abstract encode_page_crawled(response)¶
Encodes a page_crawled message
- Parameters:
response (object) – A frontier Response object
- Returns:
bytes encoded message
- abstract encode_request_error(request, error)¶
Encodes a request_error message
- Parameters:
request (object) – A frontier Request object
error (string) – Error description
- Returns:
bytes encoded message
- abstract encode_request(request)¶
Encodes requests for spider feed stream.
- Parameters:
request (object) – new_frontera Request object
- Returns:
bytes encoded message
- abstract encode_update_score(request, score, schedule)¶
Encodes update_score messages for scoring log stream.
- Parameters:
request (object) – new_frontera Request object
score (float) – score
schedule (bool) – True if document needs to be scheduled for download
- Returns:
bytes encoded message
- abstract encode_new_job_id(job_id)¶
Encodes changing of job_id parameter.
- Parameters:
job_id (int) –
- Returns:
bytes encoded message
- abstract encode_offset(partition_id, offset)¶
Encodes current spider offset in spider feed.
- Parameters:
partition_id (int) –
offset (int) –
- Returns:
bytes encoded message
- class new_frontera.core.codec.BaseDecoder¶
- abstract decode(buffer)¶
Decodes the message.
- Parameters:
buffer (bytes) – encoded message
- Returns:
tuple of message type and related objects
- abstract decode_request(buffer)¶
Decodes Request objects.
- Parameters:
buffer (bytes) – serialized string
- Returns:
object Request
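To make the interface above more concrete, here is a minimal sketch of a codec module built on the standard json library. It is not the project's actual codec: the `Request` and `Response` dataclasses are hypothetical stand-ins for the frontier objects, the `[type, payload]` wire layout is an assumption, and the classes do not inherit from `BaseEncoder`/`BaseDecoder` so that the example runs on its own. A real codec would subclass those base classes instead.

```python
import json
from dataclasses import dataclass, field


# Hypothetical stand-ins for the frontier Request/Response objects.
@dataclass
class Request:
    url: str
    meta: dict = field(default_factory=dict)


@dataclass
class Response:
    url: str
    status_code: int
    request: Request


def _request_to_dict(request):
    return {"url": request.url, "meta": request.meta}


def _dump(message_type, payload):
    # Assumed wire layout: a two-element JSON array [type, payload].
    return json.dumps([message_type, payload]).encode("utf-8")


class Encoder:
    def encode_page_crawled(self, response):
        return _dump("page_crawled", {
            "url": response.url,
            "status_code": response.status_code,
            "request": _request_to_dict(response.request),
        })

    def encode_request_error(self, request, error):
        return _dump("request_error",
                     {"request": _request_to_dict(request), "error": error})

    def encode_request(self, request):
        # Spider feed messages carry a bare request, without a type tag.
        return json.dumps(_request_to_dict(request)).encode("utf-8")

    def encode_update_score(self, request, score, schedule):
        return _dump("update_score", {
            "request": _request_to_dict(request),
            "score": float(score),
            "schedule": bool(schedule),
        })

    def encode_new_job_id(self, job_id):
        return _dump("new_job_id", {"job_id": int(job_id)})

    def encode_offset(self, partition_id, offset):
        return _dump("offset",
                     {"partition_id": int(partition_id), "offset": int(offset)})


class Decoder:
    def decode(self, buffer):
        # Returns a tuple of the message type and its related objects.
        message_type, payload = json.loads(buffer.decode("utf-8"))
        if message_type == "page_crawled":
            request = Request(**payload["request"])
            response = Response(payload["url"], payload["status_code"], request)
            return (message_type, response)
        if message_type == "request_error":
            return (message_type, Request(**payload["request"]), payload["error"])
        if message_type == "update_score":
            return (message_type, Request(**payload["request"]),
                    payload["score"], payload["schedule"])
        if message_type == "new_job_id":
            return (message_type, payload["job_id"])
        if message_type == "offset":
            return (message_type, payload["partition_id"], payload["offset"])
        raise ValueError("unknown message type: %r" % (message_type,))

    def decode_request(self, buffer):
        return Request(**json.loads(buffer.decode("utf-8")))


# Round trip: what the encoder writes, the decoder reads back.
encoder, decoder = Encoder(), Decoder()
request = Request("http://example.com/", {"depth": 1})
restored = decoder.decode_request(encoder.encode_request(request))
```

Exporting the two classes under the names `Encoder` and `Decoder` matches the requirement stated above for codec modules; everything else in the sketch is a design choice made for illustration.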
Available codecs
MsgPack
Module: new_frontera.contrib.backends.remote.codecs.msgpack
JSON
A JSON codec for new_frontera, implemented using the native json library.
Module: new_frontera.contrib.backends.remote.codecs.json
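Selecting one of the codecs listed above could then be a one-line change in the settings module. This is a sketch that assumes `MESSAGE_BUS_CODEC` takes the codec's module path as a string, which this page implies but does not state outright.

```python
# Hypothetical settings fragment: choose the codec by module path.
MESSAGE_BUS_CODEC = "new_frontera.contrib.backends.remote.codecs.msgpack"

# To use the JSON codec instead, point the setting at the other module:
# MESSAGE_BUS_CODEC = "new_frontera.contrib.backends.remote.codecs.json"
```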