Data Transport

Basic Interface

class reip.interface.Sink

An abstract interface representing a place to put data.

Essentially, a put-only queue.

spawn()

An optional method that allows a sink to perform initialization.

join()

An optional method that allows a sink to perform cleanup.

full()

Is the queue full?

wait(timeout=None)

Wait until the queue has space for another element.

put(buffer, block=True, timeout=None)

Put an element in the sink.

put_nowait(buffer)

Put an element in the sink, dropping the value if the queue is full.
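To make the Sink contract concrete, here is a minimal sketch of a bounded, put-only buffer that uses the method names listed above. It is not reip's implementation: the class name BoundedSink, the maxsize parameter, the deque guarded by threading.Condition, and raising queue.Full when a blocking put times out are all assumptions made for illustration. Only put_nowait's "drop when full" behavior comes from the description above.

```python
import queue
import threading
from collections import deque


class BoundedSink:
    """Hypothetical put-only queue using the Sink method names (not reip's code)."""

    def __init__(self, maxsize=3):
        self.maxsize = maxsize
        self.items = deque()
        # A reader would pop items under self.cond and call notify_all().
        self.cond = threading.Condition()

    def spawn(self):
        pass  # nothing to initialize in this sketch

    def join(self):
        pass  # nothing to clean up in this sketch

    def full(self):
        return len(self.items) >= self.maxsize

    def wait(self, timeout=None):
        # Block until there is room; returns False if the timeout expires first.
        with self.cond:
            return self.cond.wait_for(lambda: not self.full(), timeout)

    def put(self, buffer, block=True, timeout=None):
        with self.cond:
            if block:
                self.cond.wait_for(lambda: not self.full(), timeout)
            if self.full():
                raise queue.Full  # assumption: mirrors queue.Queue.put
            self.items.append(buffer)
            self.cond.notify_all()

    def put_nowait(self, buffer):
        # Drop the value instead of blocking or raising when the sink is full.
        with self.cond:
            if not self.full():
                self.items.append(buffer)
                self.cond.notify_all()


sink = BoundedSink(maxsize=2)
sink.put("a")
sink.put_nowait("b")
sink.put_nowait("c")  # dropped: the sink is already full
print(list(sink.items), sink.full())  # ['a', 'b'] True
```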

class reip.interface.Source(strategy='all', skip=0, default=None)

An abstract interface representing a place to get data.

Essentially, a get-only queue.

empty()

Is the Source queue empty?

last()

Returns True if there is only one element left.

wait(timeout=None)

Wait until an element is ready.

get(block=True, timeout=None)

Get the next element in the queue.

get_nowait()

Get the next element in the queue, returning None if no value is available.
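A Source can be sketched the same way, as a get-only view over a buffer. The class DequeSource below, its feed() helper (a stand-in for the writer side) and raising queue.Empty when a blocking get times out are hypothetical. The sketch also ignores the strategy, skip and default arguments of reip.interface.Source, whose behavior is not described on this page. get_nowait returning None and last() meaning "one element left" follow the descriptions above.

```python
import queue
import threading
from collections import deque


class DequeSource:
    """Hypothetical get-only queue using the Source method names (not reip's code)."""

    def __init__(self, items=()):
        self.items = deque(items)
        self.cond = threading.Condition()

    def feed(self, value):
        # Stand-in for the writer side, which a real Source would not expose.
        with self.cond:
            self.items.append(value)
            self.cond.notify_all()

    def empty(self):
        return not self.items

    def last(self):
        return len(self.items) == 1  # exactly one element left

    def wait(self, timeout=None):
        # Block until an element is ready; returns False if the timeout expires first.
        with self.cond:
            return self.cond.wait_for(lambda: not self.empty(), timeout)

    def get(self, block=True, timeout=None):
        with self.cond:
            if block:
                self.cond.wait_for(lambda: not self.empty(), timeout)
            if self.empty():
                raise queue.Empty  # assumption: mirrors queue.Queue.get
            return self.items.popleft()

    def get_nowait(self):
        # Return None instead of raising when nothing is available.
        with self.cond:
            return self.items.popleft() if self.items else None


src = DequeSource(["x", "y"])
print(src.last(), src.get())  # False x
print(src.last(), src.get_nowait(), src.get_nowait())  # True y None
```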

Mechanics

class reip.stores.customer.Customer(source, index, store_id, **kw)

A Source object that reads from a reip.stores.producer.Producer.

next()

Increment this cursor.

property cursor

This customer’s reader cursor. It marks the index of the data within the Producer’s Store.

property store

Links to the corresponding store from the Producer. Which store this is depends on how the Producer and its Customers are configured across Tasks.
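The cursor mechanics can be illustrated with a fixed-size ring buffer in which the producer keeps a running write count and each customer keeps its own read cursor. This is a sketch under assumptions, not reip's code: the names RingProducer and RingCustomer, the modulo mapping from cursor to slot, and a new customer starting at the producer's current write position are all hypothetical.

```python
class RingProducer:
    """Hypothetical producer writing into a list of `size` slots."""

    def __init__(self, size=4):
        self.items = [None] * size
        self.head = 0  # total number of items written so far

    def put(self, value):
        self.items[self.head % len(self.items)] = value
        self.head += 1


class RingCustomer:
    """Hypothetical reader that tracks its own position in the producer's store."""

    def __init__(self, producer):
        self.producer = producer
        self.cursor = producer.head  # start at the next item to be written

    @property
    def store(self):
        return self.producer.items  # the producer's list, shared by every customer

    def empty(self):
        return self.cursor >= self.producer.head

    def get(self):
        return self.store[self.cursor % len(self.store)]

    def next(self):
        self.cursor += 1  # advance to the next item


p = RingProducer(size=4)
c = RingCustomer(p)
for x in "abc":
    p.put(x)
while not c.empty():
    print(c.get(), end=" ")  # prints: a b c
    c.next()
```

Nothing in this sketch stops the producer from overwriting a slot that a slow customer has not read yet; a real store has to guard against that.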

class reip.stores.producer.Producer(size=100, delete_rate=10, task_id=|(((( reip UNSET ))))|, skip_no_readers=True, **kw)

spawn()

An optional method that allows a sink to perform initialization.

join()

An optional method that allows a sink to perform cleanup.

full()

Is the queue full?
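For a Producer with a fixed number of slots (the size argument) and several Customers, one plausible way to answer "is the queue full?" is to compare the write position against the slowest reader's cursor. The sketch below assumes cursors are running counts, as in a ring buffer. The function name producer_full and the choice to report "not full" when there are no readers are assumptions, not reip's documented behavior.

```python
def producer_full(head, cursors, size):
    """Hypothetical check: is the slowest reader a full buffer behind the writer?

    head    -- total number of items the producer has written
    cursors -- read cursors (running counts) of each customer
    size    -- number of slots in the store
    """
    if not cursors:
        return False  # assumption: with no readers, nothing is waiting to be read
    return head - min(cursors) >= size


print(producer_full(head=100, cursors=[95, 0], size=100))   # True
print(producer_full(head=100, cursors=[95, 50], size=100))  # False
```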

gen_source(task_id=|(((( reip UNSET ))))|, throughput='small', **kw)

Generate a source from this sink.

Parameters
  • task_id (str) – the identifier for the task.

  • throughput (str) –

    The expected size of the data. This determines the serialization method to use. Should be one of:

    • 'small' for data < 1GB

    • 'medium' for data < 3GB

    • 'large' for data > 3GB

  • **kw – arguments to pass to store.Customer.
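The throughput thresholds above can be turned into a small helper that picks a value from a data size in bytes. The function name choose_throughput is hypothetical. The sketch assumes the thresholds refer to the size of a single buffer, uses decimal gigabytes (10**9 bytes), and treats exactly 3 GB as 'large', since the list above leaves that boundary open.

```python
GB = 10**9  # assumption: decimal gigabytes


def choose_throughput(nbytes):
    """Hypothetical helper mapping a data size to a gen_source throughput value."""
    if nbytes < 1 * GB:
        return "small"
    if nbytes < 3 * GB:
        return "medium"
    return "large"


print(choose_throughput(640 * 480 * 3))  # small (one 640x480 RGB frame)
print(choose_throughput(2 * GB))         # medium
```

The returned string would then be passed as the throughput argument of gen_source.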

Stores

class reip.stores.store.Store(size)

A basic store that keeps its elements in a list.

class reip.stores.queue_store.QueueCustomer(*a, serializer=None, **kw)

A customer that requests values from a QueueStore.

empty()

Is the Source queue empty?

next()

Increment this cursor.

class reip.stores.queue_store.QueueStore(*a, **kw)

A Store that pushes values through a queue when a Customer requests them.

Customer

alias of reip.stores.queue_store.QueueCustomer
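The request/response pattern behind QueueStore and QueueCustomer can be sketched with two queues: the customer sends the index it wants, and a serving loop pushes back the corresponding value. Everything here is an assumption for illustration, not reip's implementation: the serve and request functions, the request and response queues, the use of a thread rather than a separate process, and pickle as the serializer. Serialization only matters when the store lives in another process; pickling inside a thread just marks where it would happen, much as QueueCustomer accepts a serializer argument.

```python
import pickle
import queue
import threading

items = ["frame-0", "frame-1", "frame-2"]  # stand-in for the store's contents
requests = queue.Queue()
responses = queue.Queue()


def serve(items, requests, responses):
    """Hypothetical store loop: answer each requested index, stop on None."""
    while True:
        index = requests.get()
        if index is None:
            break
        responses.put(pickle.dumps(items[index]))  # serialize before sending


def request(index):
    """Hypothetical customer side: ask for one value and wait for the reply."""
    requests.put(index)
    return pickle.loads(responses.get(timeout=1))


server = threading.Thread(target=serve, args=(items, requests, responses))
server.start()
print(request(1))  # frame-1
print(request(2))  # frame-2
requests.put(None)  # tell the serving loop to stop
server.join()
```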

class reip.stores.plasma.PlasmaStore(size, plasma_socket=None)

A Store that puts data in a Plasma object store.

It requires a running plasma store process, e.g.:

plasma_store -s $TMPDIR/plasma -m 2000000000

Here -s sets the socket path and -m the memory the store may use, in bytes (2 GB in this example).

class reip.stores.base.BaseStore

A base class for Stores.

class Customer(source, index, store_id, **kw)

Same as reip.stores.customer.Customer, documented under Mechanics above: a Source object that reads from a reip.stores.producer.Producer, with a next() method and cursor and store properties.

Queues

reip.stores.queue.get_serializer(name)

Returns an object with loads and dumps members.
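An object with loads and dumps members is the same duck type that Python's pickle and json modules already satisfy, so a lookup along these lines would meet the contract. The function name lookup_serializer, the SERIALIZERS table and the accepted names 'pickle' and 'json' are hypothetical; the names reip actually accepts are not listed on this page.

```python
import json
import pickle

# Hypothetical registry: any object exposing dumps() and loads() qualifies.
SERIALIZERS = {"pickle": pickle, "json": json}


def lookup_serializer(name):
    """Sketch of the documented contract: return an object with loads and dumps."""
    try:
        return SERIALIZERS[name]
    except KeyError:
        raise ValueError(f"unknown serializer: {name!r}") from None


ser = lookup_serializer("json")
payload = ser.dumps({"frame": 3})
print(payload)             # {"frame": 3}
print(ser.loads(payload))  # {'frame': 3}
```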