Query Methods¶
Although most operations you will do involve directly interacting with known buckets and keys, there are additional ways to get information out of Riak.
Secondary Indexes¶
Objects can be tagged with secondary index entries. Those entries can then be queried over the bucket for equality or across ranges:
bucket = client.bucket("index_test")
# Tag an object with indexes and save
sean = bucket.new("seancribbs")
sean.add_index("fname_bin", "Sean")
sean.add_index("byear_int", 1979)
sean.store()
# Performs an equality query
seans = bucket.get_index("fname_bin", "Sean")
# Performs a range query
eighties = bucket.get_index("byear_int", 1980, 1989)
Secondary indexes are also available via MapReduce.
Streaming and Paginating Indexes¶
Sometimes the number of results from such a query is too great to process in one payload, so you can also stream the results:
for keys in bucket.stream_index("bmonth_int", 1):
    # keys is a list of matching keys
    print(keys)
Both the regular get_index() method and the stream_index() method allow you to return the index entry along with the matching key as tuples using the return_terms option:
bucket.get_index("byear_int", 1970, 1990, return_terms=True)
# => [(1979, 'seancribbs')]
You can also limit the number of results using the max_results
option, which enables pagination:
results = bucket.get_index("fname_bin", "S", "T", max_results=20)
Optionally, you can use paginate_index() or paginate_stream_index() to create a generator of paged results:
for page in bucket.paginate_stream_index("maestro_bin", "Cribbs"):
    for key in page:
        do_something(key)
    page.close()
All of these features are implemented using the IndexPage class, which emulates a list but also supports streaming and capturing the continuation, a sort of pointer to the next page of results:
# Detect whether there are more results
if results.has_next_page():
    # Fetch the next page of results manually
    more = bucket.get_index("fname_bin", "S", "T", max_results=20,
                            continuation=results.continuation)
    # Fetch the next page of results automatically
    more = results.next_page()
class riak.client.index_page.IndexPage(client, bucket, index, startkey, endkey, return_terms, max_results, term_regex)¶
Encapsulates a single page of results from a secondary index query, with the ability to iterate over results (if not streamed), capture the page marker (continuation), and automatically fetch the next page.
While users will interact with this object, it will be created automatically by the client and does not need to be instantiated elsewhere.

continuation = None¶
The opaque page marker that is used when fetching the next chunk of results. The user can simply call next_page() to do so, or pass this to the get_index() method using the continuation option.

has_next_page()¶
Whether there is another page available, i.e. the response included a continuation.

next_page(timeout=None, stream=None)¶
Fetches the next page using the same parameters as the original query. Note that if streaming was used before, it will be used again unless overridden.
Parameters:
- stream (boolean) – whether to enable streaming. True enables, False disables, None uses the previous value.
- timeout (int) – a timeout value in milliseconds, or 'infinity'

__eq__(other)¶
An IndexPage can pretend to be equal to a list when it has captured results, by simply comparing the internal results to the passed list. Otherwise the other object needs to be an equivalent IndexPage.

__iter__()¶
Emulates the iterator interface. When streaming, this means delegating to the stream; otherwise, iterating over the existing result set.

__getitem__(index)¶
Fetches an item by index from the captured results.
-
MapReduce¶
RiakMapReduce allows you to construct query-processing jobs that are performed mostly in parallel around the Riak cluster. You can think of it as a pipeline, where inputs are fed in one end, pass through a number of map and reduce phases, and are then returned to the client.
Constructing the query¶
class riak.mapreduce.RiakMapReduce(client)¶
The RiakMapReduce object allows you to build up and run a map/reduce operation on Riak. Most methods return the object on which they were called, modified with new information, so you can chain calls together to build the job.
Construct a Map/Reduce object.
Parameters: client (RiakClient) – the client that will perform the query
Inputs¶
The first step is to identify the inputs that should be processed. They can be:
- An entire bucket
- An entire bucket, with the keys filtered by criteria
- A list of bucket/key pairs or bucket/key/data triples
- A fulltext search query
- A secondary-index query
Adding inputs always returns the RiakMapReduce object so that you can chain the construction of the query job.
RiakMapReduce.add_bucket(bucket, bucket_type=None)¶
Adds all keys in a bucket to the inputs.
Parameters:
- bucket (string) – the bucket
- bucket_type (string, None) – optional name of a bucket type
Return type: RiakMapReduce
RiakMapReduce.add_key_filters(key_filters)¶
Adds key filters to the inputs.
Parameters: key_filters (list) – a list of filters
Return type: RiakMapReduce
RiakMapReduce.add_key_filter(*args)¶
Adds a single key filter to the inputs.
Parameters: args (list) – a filter
Return type: RiakMapReduce
RiakMapReduce.add(arg1, arg2=None, arg3=None, bucket_type=None)¶
Add inputs to a map/reduce operation. This method takes three different forms, depending on the provided inputs. You can specify either a RiakObject, a string bucket name, or a bucket, key, and additional arg.
Parameters:
- arg1 (RiakObject, string) – the object or bucket to add
- arg2 (string, list, None) – a key or list of keys to add (if a bucket is given in arg1)
- arg3 (string, list, dict, None) – key data for this input (must be convertible to JSON)
- bucket_type (string, None) – optional name of a bucket type
Return type: RiakMapReduce
RiakMapReduce.add_object(obj)¶
Adds a RiakObject to the inputs.
Parameters: obj (RiakObject) – the object to add
Return type: RiakMapReduce
RiakMapReduce.add_bucket_key_data(bucket, key, data, bucket_type=None)¶
Adds a bucket/key/keydata triple to the inputs.
Parameters:
- bucket (string) – the bucket
- key (string) – the key or list of keys
- data (string, list, dict, None) – the key-specific data
- bucket_type (string, None) – optional name of a bucket type
Return type: RiakMapReduce
RiakMapReduce.search(index, query)¶
Begin a map/reduce operation using a Search. This command will return an error unless executed against a Riak Search cluster.
Parameters:
- index (string) – the Solr index used in the search
- query (string) – the search query
Return type: RiakMapReduce
RiakMapReduce.index(bucket, index, startkey, endkey=None, bucket_type=None)¶
Begin a map/reduce operation using a Secondary Index query.
Parameters:
- bucket (string) – the bucket over which to perform the query
- index (string) – the index to use for the query
- startkey (string, integer) – the start key of the index range, or the value which all entries must equal
- endkey (string, integer, None) – the end key of the index range (if doing a range query)
- bucket_type (string, None) – optional name of a bucket type
Return type: RiakMapReduce
class riak.mapreduce.RiakKeyFilter(*args)¶
A helper class for building up lists of key filters. Unknown methods are treated as filters to be added; & and | create conjunctions and disjunctions, respectively, and + concatenates filters.
Example:
f1 = RiakKeyFilter().starts_with('2005')
f2 = RiakKeyFilter().ends_with('-01')
f3 = f1 & f2
print(f3)
# => [['and', [['starts_with', '2005']], [['ends_with', '-01']]]]
Parameters: args (list) – a list of arguments to be treated as a filter.
Phases¶
The second step is to add processing phases to the query. map phases load and process individual keys, returning one or more results, while reduce phases operate over collections of results from previous phases. link phases are a special type of map phase that extracts matching links from the object, usually so they can be used in a subsequent map phase.
Any number of phases can return results directly to the client by passing keep=True.
RiakMapReduce.map(function, options=None)¶
Add a map phase to the map/reduce operation.
Parameters:
- function (string, list) – either a named JavaScript function (e.g. 'Riak.mapValues'), an anonymous JavaScript function ('function(...) { ... }'), or an array ['erlang_module', 'function']
- options (dict) – phase options, containing 'language', 'keep' flag, and/or 'arg'
Return type: RiakMapReduce
RiakMapReduce.reduce(function, options=None)¶
Add a reduce phase to the map/reduce operation.
Parameters:
- function (string, list) – either a named JavaScript function (e.g. 'Riak.reduceSum'), an anonymous JavaScript function ('function(...) { ... }'), or an array ['erlang_module', 'function']
- options (dict) – phase options, containing 'language', 'keep' flag, and/or 'arg'
Return type: RiakMapReduce
RiakMapReduce.link(bucket='_', tag='_', keep=False)¶
Add a link phase to the map/reduce operation.
Parameters:
- bucket (string) – bucket name (default '_', which means all buckets)
- tag (string) – tag (default '_', which means any tag)
- keep (boolean) – whether to keep results from this stage in the map/reduce (default False, unless this is the last step in the phase)
Return type: RiakMapReduce
class riak.mapreduce.RiakMapReducePhase(type, function, language, keep, arg)¶
The RiakMapReducePhase holds information about a Map or Reduce phase in a RiakMapReduce operation.
Normally you won't need to use this object directly, but instead call methods on RiakMapReduce objects to add instances to the query.
Construct a RiakMapReducePhase object.
Parameters:
- type (string) – the phase type: 'map', 'reduce', or 'link'
- function (string, list) – the function to execute
- language (string) – 'javascript' or 'erlang'
- keep (boolean) – whether to return the output of this phase in the results
- arg (string, dict, list) – additional static value to pass into the map or reduce function
class riak.mapreduce.RiakLinkPhase(bucket, tag, keep)¶
The RiakLinkPhase object holds information about a Link phase in a map/reduce operation.
Normally you won't need to use this object directly, but instead call RiakMapReduce.link() on RiakMapReduce objects to add instances to the query.
Construct a RiakLinkPhase object.
Parameters:
- bucket (string) – the bucket name
- tag (string) – the tag
- keep (boolean) – whether to return results of this phase
Phase shortcuts¶
A number of commonly-used phases are also available as shortcut methods:
RiakMapReduce.map_values(options=None)¶
Adds the JavaScript built-in Riak.mapValues to the query as a map phase.
Parameters: options (dict) – phase options, containing 'language', 'keep' flag, and/or 'arg'

RiakMapReduce.map_values_json(options=None)¶
Adds the JavaScript built-in Riak.mapValuesJson to the query as a map phase.
Parameters: options (dict) – phase options, containing 'language', 'keep' flag, and/or 'arg'

RiakMapReduce.reduce_sum(options=None)¶
Adds the JavaScript built-in Riak.reduceSum to the query as a reduce phase.
Parameters: options (dict) – phase options, containing 'language', 'keep' flag, and/or 'arg'

RiakMapReduce.reduce_min(options=None)¶
Adds the JavaScript built-in Riak.reduceMin to the query as a reduce phase.
Parameters: options (dict) – phase options, containing 'language', 'keep' flag, and/or 'arg'

RiakMapReduce.reduce_max(options=None)¶
Adds the JavaScript built-in Riak.reduceMax to the query as a reduce phase.
Parameters: options (dict) – phase options, containing 'language', 'keep' flag, and/or 'arg'

RiakMapReduce.reduce_sort(js_cmp=None, options=None)¶
Adds the JavaScript built-in Riak.reduceSort to the query as a reduce phase.
Parameters:
- js_cmp (string) – a JavaScript comparator function as specified by Array.sort()
- options (dict) – phase options, containing 'language', 'keep' flag, and/or 'arg'

RiakMapReduce.reduce_numeric_sort(options=None)¶
Adds the JavaScript built-in Riak.reduceNumericSort to the query as a reduce phase.
Parameters: options (dict) – phase options, containing 'language', 'keep' flag, and/or 'arg'

RiakMapReduce.reduce_limit(limit, options=None)¶
Adds the JavaScript built-in Riak.reduceLimit to the query as a reduce phase.
Parameters:
- limit (integer) – the maximum number of results to return
- options (dict) – phase options, containing 'language', 'keep' flag, and/or 'arg'

RiakMapReduce.reduce_slice(start, end, options=None)¶
Adds the JavaScript built-in Riak.reduceSlice to the query as a reduce phase.
Parameters:
- start (integer) – the beginning of the slice
- end (integer) – the end of the slice
- options (dict) – phase options, containing 'language', 'keep' flag, and/or 'arg'

RiakMapReduce.filter_not_found(options=None)¶
Adds the JavaScript built-in Riak.filterNotFound to the query as a reduce phase.
Parameters: options (dict) – phase options, containing 'language', 'keep' flag, and/or 'arg'
Execution¶
Query results can either be returned in one round trip or streamed back to the client. The format of the results will depend on the structure of the map and reduce phases the query contains.
RiakMapReduce.run(timeout=None)¶
Run the map/reduce operation synchronously. Returns a list of results, or a list of links if the last phase is a link phase. Shortcut for riak.client.RiakClient.mapred().
Parameters: timeout (integer, None) – timeout in milliseconds
Return type: list
RiakMapReduce.stream(timeout=None)¶
Streams the MapReduce query (returns an iterator). Shortcut for riak.client.RiakClient.stream_mapred().
Parameters: timeout (integer) – timeout in milliseconds
Return type: iterator that yields (phase_num, data) tuples
Shortcut constructors¶
RiakObject contains some shortcut methods that make it more convenient to begin constructing RiakMapReduce queries.
RiakObject.add(arg1, arg2=None, arg3=None, bucket_type=None)¶
Start assembling a Map/Reduce operation. A shortcut for add().
Parameters:
- arg1 (RiakObject, string) – the object or bucket to add
- arg2 (string, list, None) – a key or list of keys to add (if a bucket is given in arg1)
- arg3 (string, list, dict, None) – key data for this input (must be convertible to JSON)
- bucket_type (string, None) – optional name of a bucket type
Return type: RiakMapReduce

RiakObject.link(*args)¶
Start assembling a Map/Reduce operation. A shortcut for link().
Return type: RiakMapReduce

RiakObject.map(*args)¶
Start assembling a Map/Reduce operation. A shortcut for map().
Return type: RiakMapReduce

RiakObject.reduce(*args)¶
Start assembling a Map/Reduce operation. A shortcut for reduce().
Return type: RiakMapReduce
Riak Search 2.0 (Yokozuna)¶
With Riak 2.0 came the introduction of Riak Search 2.0, a.k.a. Yokozuna (the top rank in sumo). Riak Search 2.0 is an integration of Solr (for indexing and querying) and Riak (for storage and distribution). It allows for distributed, scalable, fault-tolerant, transparent indexing and querying of Riak values. After connecting a bucket (or bucket type) to an Apache Solr index, you simply write values (such as JSON, XML, plain text, Data Types, etc.) into Riak as normal, and then query those indexed values using the Solr API. Unlike traditional Riak data, however, Solr needs to know the format of the stored data so it can index it. Solr is a document-based search engine, so it treats each value stored in Riak as a document.
Creating a schema¶
The first thing which needs to be done is to define a Solr schema for your data. Riak Search comes bundled with a default schema named _yz_default. It relies heavily on dynamic field types, where the field-name suffix defines the type. This is an easy path for starting development, but we recommend that you define your own schema for production.
You can find information about defining your own schema at Search Schema, with a short section dedicated to the default schema.
Here is a brief example of creating a custom schema with create_search_schema():
content = """<?xml version="1.0" encoding="UTF-8" ?>
<schema name="test" version="1.5">
<fields>
<field name="_yz_id" type="_yz_str" indexed="true" stored="true"
multiValued="false" required="true" />
<field name="_yz_ed" type="_yz_str" indexed="true" stored="true"
multiValued="false" />
<field name="_yz_pn" type="_yz_str" indexed="true" stored="true"
multiValued="false" />
<field name="_yz_fpn" type="_yz_str" indexed="true" stored="true"
multiValued="false" />
<field name="_yz_vtag" type="_yz_str" indexed="true" stored="true"
multiValued="false" />
<field name="_yz_rk" type="_yz_str" indexed="true" stored="true"
multiValued="false" />
<field name="_yz_rb" type="_yz_str" indexed="true" stored="true"
multiValued="false" />
<field name="_yz_rt" type="_yz_str" indexed="true" stored="true"
multiValued="false" />
<field name="_yz_err" type="_yz_str" indexed="true"
multiValued="false" />
</fields>
<uniqueKey>_yz_id</uniqueKey>
<types>
<fieldType name="_yz_str" class="solr.StrField"
sortMissingLast="true" />
</types>
</schema>"""
schema_name = 'jalapeno'
client.create_search_schema(schema_name, content)
If you would like to retrieve the current XML Solr schema, get_search_schema() is available:
schema = client.get_search_schema('jalapeno')
Solr indexes¶
Once a schema has been created, a Solr index must also be created. This index represents a collection of similar data that you use to perform queries. When creating an index with create_search_index(), you can optionally specify a schema. If you do not, the default schema will be used:
client.create_search_index('nacho')
Alternatively, you can specify a schema; here the index "nacho" is associated with the schema "jalapeno":
client.create_search_index('nacho', 'jalapeno')
Just as easily, you can delete an index with delete_search_index():
client.delete_search_index('jalapeno')
A single index can be retrieved with get_search_index(), or all of them with list_search_indexes():
index = client.get_search_index('jalapeno')
name = index['name']
schema = index['schema']
indexes = client.list_search_indexes()
first_nval = indexes[0]['n_val']
Note: index names may only be ASCII values from 32-127 (spaces, standard punctuation, digits and word characters). This may change in the future to allow full Unicode support.
More discussion about Riak Search 2.0 Indexes can be found at Indexes.
Linking a bucket type to an index¶
The last step to setting up Riak Search 2.0 is to link a Bucket Type to a Solr index. This lets Riak know when to index values. This can be done via the command line:
riak-admin bucket-type create spicy '{"props":{"search_index":"jalapeno"}}'
riak-admin bucket-type activate spicy
Or simply create an empty Bucket Type:
riak-admin bucket-type create spicy '{"props":{}}'
riak-admin bucket-type activate spicy
Then change the bucket properties on the associated bucket or Bucket Type:
b = client.bucket('peppers')
b.set_property('search_index', 'jalapeno')
btype = client.bucket_type('spicy')
btype.set_property('search_index', 'jalapeno')
Querying an index¶
Once the schema, index and bucket properties have all been properly configured, adding data is as simple as writing to Riak. Solr is automatically updated.
Querying, on the other hand, is as easy as writing Solr queries, which allows full use of existing Solr tools as well as Solr's rich query semantics.
Here is a brief example of loading and querying data:
bucket = client.bucket('peppers')
bucket.new("bell", {"name_s": "bell", "scoville_low_i": 0,
                    "scoville_high_i": 0}).store()
bucket.new("anaheim", {"name_s": "anaheim", "scoville_low_i": 1000,
                       "scoville_high_i": 2500}).store()
bucket.new("chipotle", {"name_s": "chipotle", "scoville_low_i": 3500,
                        "scoville_high_i": 10000}).store()
bucket.new("serrano", {"name_s": "serrano", "scoville_low_i": 10000,
                       "scoville_high_i": 23000}).store()
bucket.new("habanero", {"name_s": "habanero", "scoville_low_i": 100000,
                        "scoville_high_i": 350000}).store()
results = bucket.search("name_s:/c.*/", index='jalapeno')
# Yields single document 'chipotle'
print(results['docs'][0]['name_s'])
results = bucket.search("scoville_high_i:[20000 TO 500000]")
# Yields two documents
for result in results['docs']:
print(result['name_s'])
results = bucket.search('name_s:*', index='jalapeno',
                        sort="scoville_low_i desc")
# Yields all documents, sorted in descending order. We take the top one.
print("The hottest pepper is {0}".format(results['docs'][0]['name_s']))
The result returned by search() is a dictionary with lots of search metadata, such as the number of results and the maximum Lucene score, as well as the matching documents.
When querying Data Types, the name of the Solr field is the datatype itself, since Data Types do not fit into the default schema, e.g.:
riak-admin bucket-type create visitors '{"props":{"datatype":"counter"}}'
riak-admin bucket-type activate visitors
client.create_search_index('website')
bucket = client.bucket_type('visitors').bucket('hits')
bucket.set_property('search_index', 'website')
site = bucket.new('bbc.co.uk')
site.increment(80)
site.store()
site = bucket.new('cnn.com')
site.increment(150)
site.store()
site = bucket.new('abc.net.au')
site.increment(24)
site.store()
results = bucket.search("counter:[10 TO *]", index='website',
                        sort="counter desc", rows=5)
# Assume you have a bucket-type named "profiles" that has datatype
# "map". Let's create and search an index containing maps.
client.create_search_index('user-profiles')
bucket = client.bucket_type('profiles').bucket('USA')
bucket.set_property('search_index', 'user-profiles')
brett = bucket.new()
brett.registers['fname'].assign("Brett")
brett.registers['lname'].assign("Hazen")
brett.sets['emails'].add('spam@basho.com')
brett.counters['visits'].increment()
brett.maps['pages'].counters['homepage'].increment()
brett.update()
# Note that the field name in the index/schema is the field name in
# the map joined with its type by an underscore. Deeply embedded
# fields are joined with their parent field names by an underscore.
results = bucket.search('lname_register:Hazen AND pages_map_homepage_counter:[1 TO *]',
                        index='user-profiles')
Details on querying Riak Search 2.0 can be found at Querying.