Module riak_pipe_fitting

The coordinator process that hold the details for the fitting.

Behaviours: gen_fsm.

Description

The coordinator process that hold the details for the fitting. This process also manages the end-of-inputs synchronization for this stage of the pipeline.

Data Types

details()

details() = #fitting_details{fitting = #fitting{pid = pid(), ref = reference(), chashfun = riak_pipe_vnode:chashfun(), nval = riak_pipe_vnode:nval()}, name = term(), module = atom(), arg = term(), output = #fitting{pid = pid(), ref = reference(), chashfun = riak_pipe_vnode:chashfun(), nval = riak_pipe_vnode:nval()}, options = riak_pipe:exec_opts(), q_limit = pos_integer()}

state()

abstract datatype: state()

Function Index

code_change/4Unused.
eoi/1Send an end-of-inputs message to the specified coordinator.
format_name/1Coerce a fitting name into a printable string.
get_details/2Request the details about this fitting.
handle_event/3Unused.
handle_info/3The non-gen_fsm message that this process expects is 'DOWN'.
handle_sync_event/4The only sync event handled in all states is workers, which retrieves a list of ring partition indexes that have requested the fitting details (i.e.
init/1Initialize the coordinator.
start_link/4Start the coordinator, according to the Spec given.
terminate/3Unused.
validate_fitting/1Ensure that a fitting specification is valid.
wait_upstream_eoi/2The coordinator is just hanging out, serving details and waiting for end-of-inputs.
wait_upstream_eoi/3The coordinator is just hanging out, serving details and waiting for end-of-inputs.
wait_workers_done/3The coordinator has forwarded the end-of-inputs signal to all of the vnodes working for it, and is waiting for done responses.
worker_done/1Tell the coordinator that this worker is done.
workers/1Get the list of ring partition indexes (vnodes) that are doing work for this coordinator.

Function Details

code_change/4

code_change(OldVsn::term(), StateName::atom(), State::state(), Extra::term()) -> {ok, atom(), state()}

Unused.

eoi/1

eoi(Fitting::riak_pipe:fitting()) -> ok

Send an end-of-inputs message to the specified coordinator.

format_name/1

format_name(Name::term()) -> iolist()

Coerce a fitting name into a printable string.

get_details/2

get_details(Fitting::riak_pipe:fitting(), Partition::riak_pipe_vnode:partition()) -> {ok, details()} | gone

Request the details about this fitting. The ring partition index of the vnode requesting the details is included such that the coordinator can inform the vnode of end-of-inputs later. This function assumes that it is being called from the vnode process, so the self() can be used to give the coordinator a pid to monitor.

handle_event/3

handle_event(Event::term(), StateName::atom(), State::state()) -> {next_state, atom(), state()}

Unused.

handle_info/3

handle_info(Info::{'DOWN', reference(), term(), term(), term()}, StateName::atom(), State::state()) -> {next_state, atom(), state()} | {stop, normal, state()}

The non-gen_fsm message that this process expects is 'DOWN'.

'DOWN' messages are received when monitored vnodes exit. In that case, the vnode is removed from the worker list. If that was also the last vnode we were waiting on a done message from, also forward eoi and shut down the coordinator.

handle_sync_event/4

handle_sync_event(Event::workers, From::term(), StateName::atom(), State::state()) -> {reply, [riak_pipe_vnode:partition()], atom(), state()}

The only sync event handled in all states is workers, which retrieves a list of ring partition indexes that have requested the fitting details (i.e. that are doing work for this coordinator).

init/1

init(X1::[pid() | riak_pipe:fitting_spec() | riak_pipe:fitting() | riak_pipe:exec_opts()]) -> {ok, wait_upstream_eoi, state()}

Initialize the coordinator. This function monitors the builder process, so it will tear down if the builder exits.

start_link/4

start_link(Builder::pid(), Spec::riak_pipe:fitting_spec(), Output::riak_pipe:fitting(), Options::riak_pipe:exec_opts()) -> {ok, pid(), riak_pipe:fitting()} | ignore | {error, term()}

Start the coordinator, according to the Spec given. The coordinator will register with Builder and will request its outputs to be processed under the Output fitting.

terminate/3

terminate(Reason::term(), StateName::atom(), State::state()) -> ok

Unused.

validate_fitting/1

validate_fitting(Fitting_spec::riak_pipe:fitting_spec()) -> ok

Ensure that a fitting specification is valid. This function will check that the module is an atom that names a valid module (see riak_pipe_v:validate_module/2), that the arg is valid for the module (see validate_argument/2), and that the partition function is of the proper form (see validate_chashfun/1). It also checks that nval is undefined or a postive integer.

If all components are valid, the atom ok is returned. If any piece is invalid, {badarg, #fitting_spec.name, ErrorMsg} is thrown.

wait_upstream_eoi/2

wait_upstream_eoi(X1::eoi, State::state()) -> {stop, normal, state()} | {next_state, wait_workers_done, state()}

The coordinator is just hanging out, serving details and waiting for end-of-inputs.

When it gets eoi, it forwards the signal to its workers, and then begins waiting for them to respond done. If it has no workers when it receives end-of-inputs, the coordinator stops immediately.

wait_upstream_eoi/3

wait_upstream_eoi(M::{get_details, riak_pipe_vnode:partition(), pid()}, From::term(), State::state()) -> {reply, {ok, details()}, wait_upstream_eoi, state()}

wait_upstream_eoi(M::{done, pid()}, From::term(), State::state()) -> {reply, ok, wait_upstream_eoi, state()}

The coordinator is just hanging out, serving details and waiting for end-of-inputs.

When it gets a request for the fitting details, it sets up a monitor for the working vnode, and responds with details.

The coordinator may receive a done message from a vnode before eoi has been sent, if handoff causes the worker to relocate. In this case, the coordinator simply demonitors the vnode, and removes it from its worker list.

wait_workers_done/3

wait_workers_done(M::{get_details, riak_pipe_vnode:partition(), pid()}, From::term(), State::state()) -> {reply, {ok, details()}, wait_workers_done, state()}

wait_workers_done(M::{done, pid()}, From::term(), State::state()) -> {reply, ok, wait_workers_done, state()} | {stop, normal, ok, state()}

The coordinator has forwarded the end-of-inputs signal to all of the vnodes working for it, and is waiting for done responses.

When the coordinator receives a done response, it demonitors the vnode that sent it, and removes it from its worker list. If there are no more responses to wait for, the coordinator forwards the end-of-inputs signal to the coordinator for the next fitting in the pipe, and then shuts down normally.

If the coordinator receives a request for details from a vnode while in this state, it responds with the detail as usual, but also immediately sends end-of-inputs to that vnode.

worker_done/1

worker_done(Fitting::riak_pipe:fitting()) -> ok | gone

Tell the coordinator that this worker is done. This function assumes that it is being called from the vnode process, so that self() can be used to inform the coordinator of which worker is done.

workers/1

workers(Fitting::pid()) -> {ok, [riak_pipe_vnode:partition()]} | gone

Get the list of ring partition indexes (vnodes) that are doing work for this coordinator.


Generated by EDoc