API

Top level user functions:

Bag(dsk, name, npartitions)

Parallel collection of Python objects

Bag.all(self[, split_every])

Are all elements truthy?

Bag.any(self[, split_every])

Are any of the elements truthy?

Bag.compute(self, \*\*kwargs)

Compute this dask collection

Bag.count(self[, split_every])

Count the number of elements.

Bag.distinct(self[, key])

Distinct elements of collection

Bag.filter(self, predicate)

Filter elements in collection by a predicate function.

Bag.flatten(self)

Concatenate nested lists into one long list.

Bag.fold(self, binop[, combine, initial, …])

Parallelizable reduction

Bag.foldby(self, key, binop[, initial, …])

Combined reduction and groupby.

Bag.frequencies(self[, split_every, sort])

Count number of occurrences of each distinct element.

Bag.groupby(self, grouper[, method, …])

Group collection by key function

Bag.join(self, other, on_self[, on_other])

Joins collection with another collection.

Bag.map(self, func, \*args, \*\*kwargs)

Apply a function elementwise across one or more bags.

Bag.map_partitions(self, func, \*args, …)

Apply a function to every partition across one or more bags.

Bag.max(self[, split_every])

Maximum element

Bag.mean(self)

Arithmetic mean

Bag.min(self[, split_every])

Minimum element

Bag.pluck(self, key[, default])

Select item from all tuples/dicts in collection.

Bag.product(self, other)

Cartesian product between two bags.

Bag.reduction(self, perpartition, aggregate)

Reduce collection with reduction operators.

Bag.random_sample(self, prob[, random_state])

Return elements from bag with probability of prob.

Bag.remove(self, predicate)

Remove elements in collection that match predicate.

Bag.repartition(self[, npartitions, …])

Repartition Bag across new divisions.

Bag.starmap(self, func, \*\*kwargs)

Apply a function using argument tuples from the given bag.

Bag.std(self[, ddof])

Standard deviation

Bag.sum(self[, split_every])

Sum all elements

Bag.take(self, k[, npartitions, compute, warn])

Take the first k elements.

Bag.to_dataframe(self[, meta, columns])

Create Dask Dataframe from a Dask Bag.

Bag.to_delayed(self[, optimize_graph])

Convert into a list of dask.delayed objects, one per partition.

Bag.to_textfiles(b, path[, name_function, …])

Write dask Bag to disk, one filename per partition, one line per element.

Bag.to_avro(b, filename, schema[, …])

Write bag to set of avro files

Bag.topk(self, k[, key, split_every])

K largest elements in collection

Bag.var(self[, ddof])

Variance

Bag.visualize(self[, filename, format, …])

Render the computation of this object’s task graph using graphviz.

Create Bags

from_sequence(seq[, partition_size, npartitions])

Create a dask Bag from Python sequence.

from_delayed(values)

Create bag from many dask Delayed objects.

read_text(urlpath[, blocksize, compression, …])

Read lines from text files

from_url(urls)

Create a dask Bag from a url.

read_avro(urlpath[, blocksize, …])

Read set of avro files

range(n, npartitions)

Numbers from zero to n

Top-level functions

concat(bags)

Concatenate many bags together, unioning all elements.

map(func, \*args, \*\*kwargs)

Apply a function elementwise across one or more bags.

map_partitions(func, \*args, \*\*kwargs)

Apply a function to every partition across one or more bags.

zip(\*bags)

Partition-wise bag zip

random.choices(population[, k])

Return a k sized list of elements chosen with replacement.

random.sample(population, k)

Chooses k unique random elements from a bag.

Turn Bags into other things

Bag.to_textfiles(b, path[, name_function, …])

Write dask Bag to disk, one filename per partition, one line per element.

Bag.to_dataframe(self[, meta, columns])

Create Dask Dataframe from a Dask Bag.

Bag.to_delayed(self[, optimize_graph])

Convert into a list of dask.delayed objects, one per partition.

Bag.to_avro(b, filename, schema[, …])

Write bag to set of avro files

Bag methods

class dask.bag.Bag(dsk, name, npartitions)

Parallel collection of Python objects

Examples

Create Bag from sequence

>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.filter(lambda x: x % 2 == 0).map(lambda x: x * 10))  
[0, 20, 40]

Create Bag from filename or globstring of filenames

>>> import json
>>> b = db.read_text('/path/to/mydata.*.json.gz').map(json.loads)  

Create manually (expert use)

>>> dsk = {('x', 0): (range, 5),
...        ('x', 1): (range, 5),
...        ('x', 2): (range, 5)}
>>> b = Bag(dsk, 'x', npartitions=3)
>>> sorted(b.map(lambda x: x * 10))  
[0, 0, 0, 10, 10, 10, 20, 20, 20, 30, 30, 30, 40, 40, 40]
>>> int(b.fold(lambda x, y: x + y))  
30
accumulate(self, binop, initial='__no__default__')

Repeatedly apply binary function to a sequence, accumulating results.

This assumes that the bag is ordered. While this is typically the case, not all Dask.bag functions preserve this property.

Examples

>>> from operator import add
>>> b = from_sequence([1, 2, 3, 4, 5], npartitions=2)
>>> b.accumulate(add).compute()  
[1, 3, 6, 10, 15]

Accumulate also takes an optional argument that will be used as the first value.

>>> b.accumulate(add, initial=-1).compute()  
[-1, 0, 2, 5, 9, 14]
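
The accumulation semantics can be illustrated with the standard library alone. The following is a minimal sequential sketch using itertools.accumulate (Python 3.8+ for the initial keyword); it is not how the Bag computes the result in parallel, but it produces the same values as the examples above.

```python
from itertools import accumulate
from operator import add

data = [1, 2, 3, 4, 5]

# Running totals: each output is binop(previous_output, next_element).
running = list(accumulate(data, add))

# With an initial value, that value is emitted first, so the result
# has one more element than the input.
with_initial = list(accumulate(data, add, initial=-1))

print(running)
print(with_initial)
```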
all(self, split_every=None)

Are all elements truthy?

any(self, split_every=None)

Are any of the elements truthy?

count(self, split_every=None)

Count the number of elements.

distinct(self, key=None)

Distinct elements of collection

Unordered without repeats.

Parameters
key: {callable,str}

Defines uniqueness of items in bag by calling key on each item. If a string is passed, key is considered to be lambda x: x[key].

Examples

>>> b = from_sequence(['Alice', 'Bob', 'Alice'])
>>> sorted(b.distinct())
['Alice', 'Bob']
>>> b = from_sequence([{'name': 'Alice'}, {'name': 'Bob'}, {'name': 'Alice'}])
>>> b.distinct(key=lambda x: x['name']).compute()
[{'name': 'Alice'}, {'name': 'Bob'}]
>>> b.distinct(key='name').compute()
[{'name': 'Alice'}, {'name': 'Bob'}]
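
How the key argument defines uniqueness can be sketched in plain Python. The helper distinct_sketch below is hypothetical and sequential; keeping the first item seen for each key is an assumption of this sketch, since the Bag result is documented as unordered.

```python
def distinct_sketch(items, key=None):
    """Sequential illustration of key-based uniqueness (hypothetical helper)."""
    if key is None:
        keyfunc = lambda x: x            # the item itself defines uniqueness
    elif callable(key):
        keyfunc = key
    else:
        keyfunc = lambda x: x[key]       # a string key means lambda x: x[key]

    seen = {}
    for item in items:
        # Keep only the first item observed for each key value.
        seen.setdefault(keyfunc(item), item)
    return list(seen.values())


records = [{'name': 'Alice'}, {'name': 'Bob'}, {'name': 'Alice'}]
by_callable = distinct_sketch(records, key=lambda x: x['name'])
by_string = distinct_sketch(records, key='name')
plain = distinct_sketch(['Alice', 'Bob', 'Alice'])
print(by_string)
```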
filter(self, predicate)

Filter elements in collection by a predicate function.

>>> def iseven(x):
...     return x % 2 == 0
>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.filter(iseven))  
[0, 2, 4]
flatten(self)

Concatenate nested lists into one long list.

>>> b = from_sequence([[1], [2, 3]])
>>> list(b)
[[1], [2, 3]]
>>> list(b.flatten())
[1, 2, 3]
fold(self, binop, combine=None, initial='__no__default__', split_every=None, out_type=<class 'dask.bag.core.Item'>)

Parallelizable reduction

Fold is like functools.reduce except that it works in parallel. Fold takes two binary operator functions, one to reduce each partition of our dataset and another to combine results between partitions.

  1. binop: Binary operator to reduce within each partition

  2. combine: Binary operator to combine results from binop

Sequentially this would look like the following:

>>> intermediates = [reduce(binop, part) for part in partitions]  
>>> final = reduce(combine, intermediates)  

If only one function is given then it is used for both functions binop and combine as in the following example to compute the sum:

>>> def add(x, y):
...     return x + y
>>> b = from_sequence(range(5))
>>> b.fold(add).compute()  
10

In full form we provide both binary operators as well as their default arguments

>>> b.fold(binop=add, combine=add, initial=0).compute()  
10

More complex binary operators are also doable

>>> def add_to_set(acc, x):
...     ''' Add new element x to set acc '''
...     return acc | set([x])
>>> b.fold(add_to_set, set.union, initial=set()).compute()  
{0, 1, 2, 3, 4}
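
The two operators only need to differ when an element and an intermediate result have different types. The sketch below is a sequential stand-in written with functools.reduce, not the parallel implementation; sequential_fold, count_one and add_counts are hypothetical names. It counts elements: binop adds one per element, while combine adds two counts together.

```python
from functools import reduce


def sequential_fold(partitions, binop, combine, initial):
    """Sequential stand-in for a parallel fold (illustrative helper)."""
    # Reduce each partition independently, starting from `initial`.
    intermediates = [reduce(binop, part, initial) for part in partitions]
    # Merge the per-partition results into a single value.
    return reduce(combine, intermediates)


def count_one(total, _element):
    # binop: running count plus one for the new element.
    return total + 1


def add_counts(left, right):
    # combine: two partial counts merged into one.
    return left + right


partitions = [["a", "b", "c"], ["d", "e"]]
n_elements = sequential_fold(partitions, count_one, add_counts, initial=0)
print(n_elements)
```

Using count_one for both roles would give the wrong answer here, because combining the partial counts 3 and 2 with count_one yields 4 rather than 5.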

See also

Bag.foldby
foldby(self, key, binop, initial='__no__default__', combine=None, combine_initial='__no__default__', split_every=None)

Combined reduction and groupby.

Foldby provides a combined groupby and reduce for efficient parallel split-apply-combine tasks.

The computation

>>> b.foldby(key, binop, init)                        

is equivalent to the following:

>>> def reduction(group):                               
...     return reduce(binop, group, init)               
>>> b.groupby(key).map(lambda kv: (kv[0], reduction(kv[1])))

But uses minimal communication and so is much faster.

>>> b = from_sequence(range(10))
>>> iseven = lambda x: x % 2 == 0
>>> add = lambda x, y: x + y
>>> dict(b.foldby(iseven, add))                         
{True: 20, False: 25}

Key Function

The key function determines how to group the elements in your bag. In the common case where your bag holds dictionaries, the key function often extracts one of their values.

>>> def key(x):
...     return x['name']

This case is so common that it is special cased, and if you provide a key that is not a callable function then dask.bag will turn it into one automatically. The following are equivalent:

>>> b.foldby(lambda x: x['name'], ...)  
>>> b.foldby('name', ...)  

Binops

It can be tricky to construct the right binary operators to perform analytic queries. The foldby method accepts two binary operators, binop and combine. For each operator, the total passed in and the value returned must have the same type.

Binop takes a running total and a new element and produces a new total:

>>> def binop(total, x):
...     return total + x['amount']

Combine takes two totals and combines them:

>>> def combine(total1, total2):
...     return total1 + total2

Each of these binary operators may have a default first value for total, before any other value is seen. For addition binary operators like above this is often 0 or the identity element for your operation.

split_every

Group partitions into groups of this size while performing reduction. Defaults to 8.

>>> b.foldby('name', binop, 0, combine, 0)  
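
The roles of binop, combine and their initial values can be pictured with a sequential sketch. This is not dask's implementation; sequential_foldby and the sample records are hypothetical, and partitions are modelled as plain lists.

```python
def sequential_foldby(partitions, key, binop, initial, combine, combine_initial):
    """Sequential illustration of foldby over list partitions (hypothetical)."""
    # A non-callable key is treated as an item lookup, as described above.
    keyfunc = key if callable(key) else (lambda record: record[key])

    # Stage 1: within each partition, fold elements into per-key totals.
    partials = []
    for part in partitions:
        totals = {}
        for item in part:
            k = keyfunc(item)
            totals[k] = binop(totals.get(k, initial), item)
        partials.append(totals)

    # Stage 2: merge the small per-partition dictionaries key by key.
    merged = {}
    for totals in partials:
        for k, total in totals.items():
            merged[k] = combine(merged.get(k, combine_initial), total)
    return merged


def binop(total, x):
    return total + x['amount']


def combine(total1, total2):
    return total1 + total2


partitions = [
    [{'name': 'Alice', 'amount': 100}, {'name': 'Bob', 'amount': 200}],
    [{'name': 'Alice', 'amount': 50}, {'name': 'Bob', 'amount': 25}],
]
result = sequential_foldby(partitions, 'name', binop, 0, combine, 0)
print(result)
```

Only the per-key totals cross partition boundaries in this sketch, which is the intuition behind the low communication cost mentioned earlier.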

See also

toolz.reduceby
pyspark.combineByKey

Examples

We can compute the maximum of some (key, value) pairs, grouped by the key. (You might be better off converting the Bag to a dask.dataframe and using its groupby).

>>> import random
>>> import dask.bag as db
>>> tokens = list('abcdefg')
>>> values = range(10000)
>>> a = [(random.choice(tokens), random.choice(values))
...       for _ in range(100)]
>>> a[:2]  
[('g', 676), ('a', 871)]
>>> a = db.from_sequence(a)
>>> def binop(t, x):
...     return max((t, x), key=lambda x: x[1])
>>> a.foldby(lambda x: x[0], binop).compute()  
[('g', ('g', 984)),
 ('a', ('a', 871)),
 ('b', ('b', 999)),
 ('c', ('c', 765)),
 ('f', ('f', 955)),
 ('e', ('e', 991)),
 ('d', ('d', 854))]
frequencies(self, split_every=None, sort=False)

Count number of occurrences of each distinct element.

>>> b = from_sequence(['Alice', 'Bob', 'Alice'])
>>> dict(b.frequencies())  
{'Alice': 2, 'Bob': 1}
groupby(self, grouper, method=None, npartitions=None, blocksize=1048576, max_branch=None, shuffle=None)

Group collection by key function

This requires a full dataset read, serialization and shuffle. This is expensive. If possible you should use foldby.

Parameters
grouper: function

Function on which to group elements

shuffle: str

Either ‘disk’ for an on-disk shuffle or ‘tasks’ to use the task scheduling framework. Use ‘disk’ if you are on a single machine and ‘tasks’ if you are on a distributed cluster.

npartitions: int

If using the disk-based shuffle, the number of output partitions

blocksize: int

If using the disk-based shuffle, the size of shuffle blocks (bytes)

max_branch: int

If using the task-based shuffle, the amount of splitting each partition undergoes. Increase this for fewer copies but more scheduler overhead.

See also

Bag.foldby

Examples

>>> b = from_sequence(range(10))
>>> iseven = lambda x: x % 2 == 0
>>> dict(b.groupby(iseven))  
{True: [0, 2, 4, 6, 8], False: [1, 3, 5, 7, 9]}
join(self, other, on_self, on_other=None)

Joins collection with another collection.

Other collection must be one of the following:

  1. An iterable. We recommend tuples over lists for internal performance reasons.

  2. A delayed object, pointing to a tuple. This is recommended if the other collection is sizable and you’re using the distributed scheduler. Dask is able to pass around data wrapped in delayed objects with greater sophistication.

  3. A Bag with a single partition

You might also consider Dask Dataframe, whose join operations are much more heavily optimized.

Parameters
other: Iterable, Delayed, Bag

Other collection on which to join

on_self: callable

Function to call on elements in this collection to determine a match

on_other: callable (defaults to on_self)

Function to call on elements in the other collection to determine a match

Examples

>>> people = from_sequence(['Alice', 'Bob', 'Charlie'])
>>> fruit = ['Apple', 'Apricot', 'Banana']
>>> list(people.join(fruit, lambda x: x[0]))  
[('Apple', 'Alice'), ('Apricot', 'Alice'), ('Banana', 'Bob')]
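
The pairing rule in the example can be sketched sequentially in plain Python. The helper join_sketch is hypothetical and only illustrates the matching logic: the other collection is indexed by its key, and each element of this collection is paired with every match, with the element from other placed first in the tuple as in the output above.

```python
from collections import defaultdict


def join_sketch(self_items, other, on_self, on_other=None):
    """Sequential illustration of a key-based join (hypothetical helper)."""
    if on_other is None:
        on_other = on_self               # on_other defaults to on_self

    # Index the (small) other collection by its join key.
    index = defaultdict(list)
    for item in other:
        index[on_other(item)].append(item)

    # Pair each element of this collection with all matches from other.
    return [(match, item)
            for item in self_items
            for match in index.get(on_self(item), [])]


people = ['Alice', 'Bob', 'Charlie']
fruit = ('Apple', 'Apricot', 'Banana')   # a tuple, as recommended above
pairs = join_sketch(people, fruit, lambda x: x[0])
print(pairs)
```

Elements without a match on either side, such as 'Charlie' here, do not appear in the result.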
map(self, func, *args, **kwargs)

Apply a function elementwise across one or more bags.

Note that all Bag arguments must be partitioned identically.

Parameters
func: callable
*args, **kwargs: Bag, Item, or object

Extra arguments and keyword arguments to pass to func after the calling bag instance. Non-Bag args/kwargs are broadcasted across all calls to func.

Notes

For calls with multiple Bag arguments, corresponding partitions should have the same length; if they do not, the call will error at compute time.

Examples

>>> import dask.bag as db
>>> b = db.from_sequence(range(5), npartitions=2)
>>> b2 = db.from_sequence(range(5, 10), npartitions=2)

Apply a function to all elements in a bag:

>>> b.map(lambda x: x + 1).compute()
[1, 2, 3, 4, 5]

Apply a function with arguments from multiple bags:

>>> from operator import add
>>> b.map(add, b2).compute()
[5, 7, 9, 11, 13]

Non-bag arguments are broadcast across all calls to the mapped function:

>>> b.map(add, 1).compute()
[1, 2, 3, 4, 5]

Keyword arguments are also supported, and have the same semantics as regular arguments:

>>> def myadd(x, y=0):
...     return x + y
>>> b.map(myadd, y=b2).compute()
[5, 7, 9, 11, 13]
>>> b.map(myadd, y=1).compute()
[1, 2, 3, 4, 5]

Both arguments and keyword arguments can also be instances of dask.bag.Item. Here we’ll add the max value in the bag to each element:

>>> b.map(myadd, b.max()).compute()
[4, 5, 6, 7, 8]
map_partitions(self, func, *args, **kwargs)

Apply a function to every partition across one or more bags.

Note that all Bag arguments must be partitioned identically.

Parameters
func: callable

The function to be called on every partition. This function should expect an Iterator or Iterable for every partition and should return an Iterator or Iterable in return.

*args, **kwargs: Bag, Item, Delayed, or object

Arguments and keyword arguments to pass to func. Partitions from this bag will be the first argument, and these will be passed after.

Examples

>>> import dask.bag as db
>>> b = db.from_sequence(range(1, 101), npartitions=10)
>>> def div(nums, den=1):
...     return [num / den for num in nums]

Using a python object:

>>> hi = b.max().compute()
>>> hi
100
>>> b.map_partitions(div, den=hi).take(5)
(0.01, 0.02, 0.03, 0.04, 0.05)

Using an Item:

>>> b.map_partitions(div, den=b.max()).take(5)
(0.01, 0.02, 0.03, 0.04, 0.05)

Note that while both versions give the same output, the second forms a single graph, and then computes everything at once, and in some cases may be more efficient.

max(self, split_every=None)

Maximum element

mean(self)

Arithmetic mean

min(self, split_every=None)

Minimum element

pluck(self, key, default='__no__default__')

Select item from all tuples/dicts in collection.

>>> b = from_sequence([{'name': 'Alice', 'credits': [1, 2, 3]},
...                    {'name': 'Bob',   'credits': [10, 20]}])
>>> list(b.pluck('name'))  
['Alice', 'Bob']
>>> list(b.pluck('credits').pluck(0))  
[1, 10]
product(self, other)

Cartesian product between two bags.

random_sample(self, prob, random_state=None)

Return elements from bag with probability of prob.

Parameters
prob: float

A float between 0 and 1, representing the probability that each element will be returned.

random_state: int or random.Random, optional

If an integer, will be used to seed a new random.Random object. If provided, results in deterministic sampling.

Examples

>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.random_sample(0.5, 43))
[0, 3, 4]
>>> list(b.random_sample(0.5, 43))
[0, 3, 4]
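
The effect of prob and random_state can be illustrated with a sequential sketch. The helper random_sample_sketch is hypothetical; it is not expected to reproduce the exact elements chosen by the Bag method, only the behaviour that each element is kept independently and that a fixed seed gives repeatable results.

```python
import random


def random_sample_sketch(items, prob, random_state=None):
    """Keep each item independently with probability `prob` (hypothetical)."""
    if isinstance(random_state, random.Random):
        rng = random_state
    else:
        # An integer seeds a new generator; None gives an unseeded one.
        rng = random.Random(random_state)
    return [item for item in items if rng.random() < prob]


sample_a = random_sample_sketch(range(20), 0.5, random_state=43)
sample_b = random_sample_sketch(range(20), 0.5, random_state=43)
print(sample_a == sample_b)   # the same seed gives the same sample
```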
reduction(self, perpartition, aggregate, split_every=None, out_type=<class 'dask.bag.core.Item'>, name=None)

Reduce collection with reduction operators.

Parameters
perpartition: function

reduction to apply to each partition

aggregate: function

reduction to apply to the results of all partitions

split_every: int (optional)

Group partitions into groups of this size while performing reduction. Defaults to 8.

out_type: {Bag, Item}

The out type of the result, Item if a single element, Bag if a list of elements. Defaults to Item.

Examples

>>> b = from_sequence(range(10))
>>> b.reduction(sum, sum).compute()
45
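
The interplay of perpartition, aggregate and split_every can be sketched sequentially. The helper reduction_sketch below is hypothetical and simplifies the real scheduling: it applies perpartition to each partition, then repeatedly applies aggregate to groups of at most split_every intermediate results until one value remains.

```python
def reduction_sketch(partitions, perpartition, aggregate, split_every=8):
    """Sequential illustration of a grouped reduction (hypothetical helper)."""
    if not partitions:
        raise ValueError("expected at least one partition")
    if split_every < 2:
        raise ValueError("split_every must be at least 2")

    # One intermediate result per partition.
    results = [perpartition(part) for part in partitions]

    # Aggregate groups of `split_every` results until a single value is left.
    while True:
        results = [aggregate(results[i:i + split_every])
                   for i in range(0, len(results), split_every)]
        if len(results) == 1:
            return results[0]


partitions = [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
total = reduction_sketch(partitions, sum, sum)
total_pairwise = reduction_sketch(partitions, sum, sum, split_every=2)
largest = reduction_sketch(partitions, max, max)
print(total, total_pairwise, largest)
```

Because aggregate may be applied to its own outputs when there are more partitions than split_every, it should accept a sequence of intermediate results and return a value of the same kind.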
remove(self, predicate)

Remove elements in collection that match predicate.

>>> def iseven(x):
...     return x % 2 == 0
>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.remove(iseven))  
[1, 3]
repartition(self, npartitions=None, partition_size=None)

Repartition Bag across new divisions.

Parameters
npartitions: int, optional

Number of partitions of output.

partition_size: int or string, optional

Max number of bytes of memory for each partition. Use numbers or strings like 5MB.

Warning

This keyword argument triggers computation to determine the memory size of each partition, which may be expensive.

Notes

Exactly one of npartitions or partition_size should be specified. A ValueError will be raised when that is not the case.

Examples

>>> b.repartition(5)  # set to have 5 partitions  
starmap(self, func, **kwargs)

Apply a function using argument tuples from the given bag.

This is similar to itertools.starmap, except it also accepts keyword arguments. In pseudocode, this could be written as:

>>> def starmap(func, bag, **kwargs):
...     return (func(*args, **kwargs) for args in bag)
Parameters
func: callable
**kwargs: Item, Delayed, or object, optional

Extra keyword arguments to pass to func. These can either be normal objects, dask.bag.Item, or dask.delayed.Delayed.

Examples

>>> import dask.bag as db
>>> data = [(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]
>>> b = db.from_sequence(data, npartitions=2)

Apply a function to each argument tuple:

>>> from operator import add
>>> b.starmap(add).compute()
[3, 7, 11, 15, 19]

Apply a function to each argument tuple, with additional keyword arguments:

>>> def myadd(x, y, z=0):
...     return x + y + z
>>> b.starmap(myadd, z=10).compute()
[13, 17, 21, 25, 29]

Keyword arguments can also be instances of dask.bag.Item or dask.delayed.Delayed:

>>> max_second = b.pluck(1).max()
>>> max_second.compute()
10
>>> b.starmap(myadd, z=max_second).compute()
[13, 17, 21, 25, 29]
std(self, ddof=0)

Standard deviation

property str

String processing functions

Examples

>>> import dask.bag as db
>>> b = db.from_sequence(['Alice Smith', 'Bob Jones', 'Charlie Smith'])
>>> list(b.str.lower())
['alice smith', 'bob jones', 'charlie smith']
>>> list(b.str.match('*Smith'))
['Alice Smith', 'Charlie Smith']
>>> list(b.str.split(' '))
[['Alice', 'Smith'], ['Bob', 'Jones'], ['Charlie', 'Smith']]
sum(self, split_every=None)

Sum all elements

take(self, k, npartitions=1, compute=True, warn=True)

Take the first k elements.

Parameters
k: int

The number of elements to return

npartitions: int, optional

Elements are only taken from the first npartitions, with a default of 1. If there are fewer than k rows in the first npartitions a warning will be raised and any found rows returned. Pass -1 to use all partitions.

compute: bool, optional

Whether to compute the result, default is True.

warn: bool, optional

Whether to warn if the number of elements returned is less than requested, default is True.

>>> b = from_sequence(range(1_000))
>>> b.take(3)
(0, 1, 2)
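
The interaction between k and npartitions is easy to miss, so here is a sequential sketch with partitions modelled as plain lists. The helper take_sketch is hypothetical; it only mirrors the documented behaviour that elements come from the first npartitions partitions and that -1 means all partitions.

```python
import warnings
from itertools import chain, islice


def take_sketch(partitions, k, npartitions=1, warn=True):
    """Sequential illustration of take over list partitions (hypothetical)."""
    # Only the first `npartitions` partitions are inspected; -1 means all.
    selected = partitions if npartitions == -1 else partitions[:npartitions]
    taken = tuple(islice(chain.from_iterable(selected), k))
    if warn and len(taken) < k:
        warnings.warn(f"Only {len(taken)} elements found; requested {k}")
    return taken


partitions = [[0, 1], [2, 3], [4]]
short = take_sketch(partitions, 3, warn=False)      # first partition only
enough = take_sketch(partitions, 3, npartitions=2)  # spans two partitions
everything = take_sketch(partitions, 10, npartitions=-1, warn=False)
print(short, enough, everything)
```

With small partitions the default npartitions=1 can return fewer than k elements, which is the situation the warning is meant to flag.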
to_avro(b, filename, schema, name_function=None, storage_options=None, codec='null', sync_interval=16000, metadata=None, compute=True, **kwargs)

Write bag to set of avro files

The schema is a complex dictionary describing the data, see https://avro.apache.org/docs/1.8.2/gettingstartedpython.html#Defining+a+schema and https://fastavro.readthedocs.io/en/latest/writer.html . Its structure is as follows:

{'name': 'Test',
 'namespace': 'Test',
 'doc': 'Descriptive text',
 'type': 'record',
 'fields': [
    {'name': 'a', 'type': 'int'},
 ]}

where the “name” field is required, but “namespace” and “doc” are optional descriptors; “type” must always be “record”. The list of fields should have an entry for every key of the input records, and the types are like the primitive, complex or logical types of the Avro spec ( https://avro.apache.org/docs/1.8.2/spec.html ).

Results in one avro file per input partition.
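
Since the list of fields should cover every key of the input records, a quick check before writing can catch mismatches early. The helper undeclared_keys below is hypothetical and uses only plain Python; it compares key names and does not validate Avro types.

```python
schema = {
    'name': 'People',
    'type': 'record',
    'fields': [
        {'name': 'name', 'type': 'string'},
        {'name': 'value', 'type': 'int'},
    ],
}


def undeclared_keys(record, schema):
    """Return record keys that have no entry in the schema's field list."""
    declared = {field['name'] for field in schema['fields']}
    return sorted(set(record) - declared)


good = undeclared_keys({'name': 'Alice', 'value': 100}, schema)
bad = undeclared_keys({'name': 'Bob', 'value': 200, 'email': 'bob@example.com'}, schema)
print(good, bad)
```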

Parameters
b: dask.bag.Bag
filename: list of str or str

Filenames to write to. If a list, number must match the number of partitions. If a string, must include a glob character “*”, which will be expanded using name_function

schema: dict

Avro schema dictionary, see above

name_function: None or callable

Expands integers into strings, see dask.bytes.utils.build_name_function

storage_options: None or dict

Extra key/value options to pass to the backend file-system

codec: ‘null’, ‘deflate’, or ‘snappy’

Compression algorithm

sync_interval: int

Number of records to include in each block within a file

metadata: None or dict

Included in the file header

compute: bool

If True, files are written immediately, and function blocks. If False, returns delayed objects, which can be computed by the user where convenient.

kwargs: passed to compute(), if compute=True

Examples

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice', 'value': 100},
...                       {'name': 'Bob', 'value': 200}])
>>> schema = {'name': 'People', 'doc': "Set of people's scores",
...           'type': 'record',
...           'fields': [
...               {'name': 'name', 'type': 'string'},
...               {'name': 'value', 'type': 'int'}]}
>>> b.to_avro('my-data.*.avro', schema)  
['my-data.0.avro', 'my-data.1.avro']
to_dataframe(self, meta=None, columns=None)

Create Dask Dataframe from a Dask Bag.

Bag should contain tuples, dict records, or scalars.

Index will not be particularly meaningful. Use reindex afterwards if necessary.

Parameters
meta: pd.DataFrame, dict, iterable, optional

An empty pd.DataFrame that matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of a DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided. If not provided or a list, a single element from the first partition will be computed, triggering a potentially expensive call to compute. This may lead to unexpected results, so providing meta is recommended. For more information, see dask.dataframe.utils.make_meta.

columns: sequence, optional

Column names to use. If the passed data do not have names associated with them, this argument provides names for the columns. Otherwise this argument indicates the order of the columns in the result (any names not found in the data will become all-NA columns). Note that if meta is provided, column names will be taken from there and this parameter is invalid.

Examples

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice',   'balance': 100},
...                       {'name': 'Bob',     'balance': 200},
...                       {'name': 'Charlie', 'balance': 300}],
...                      npartitions=2)
>>> df = b.to_dataframe()
>>> df.compute()
      name  balance
0    Alice      100
1      Bob      200
0  Charlie      300
to_delayed(self, optimize_graph=True)

Convert into a list of dask.delayed objects, one per partition.

Parameters
optimize_graph: bool, optional

If True [default], the graph is optimized before converting into dask.delayed objects.

to_textfiles(b, path, name_function=None, compression='infer', encoding='utf-8', compute=True, storage_options=None, last_endline=False, **kwargs)

Write dask Bag to disk, one filename per partition, one line per element.

Paths: This will create one file for each partition in your bag. You can specify the filenames in a variety of ways.

Use a globstring

>>> b.to_textfiles('/path/to/data/*.json.gz')  

The * will be replaced by the increasing sequence 0, 1, …

/path/to/data/0.json.gz
/path/to/data/1.json.gz

Use a globstring and a name_function= keyword argument. The name_function function should expect an integer and produce a string. Strings produced by name_function must preserve the order of their respective partition indices.

>>> from datetime import date, timedelta
>>> def name(i):
...     return str(date(2015, 1, 1) + i * timedelta(days=1))
>>> name(0)
'2015-01-01'
>>> name(15)
'2015-01-16'
>>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)  
/path/to/data/2015-01-01.json.gz
/path/to/data/2015-01-02.json.gz
...
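
The ordering requirement on name_function is worth checking before writing many files. The sketch below uses two hypothetical naming functions and mimics the globstring substitution with str.replace, which is an assumption made for illustration. It shows that unpadded integers do not sort in partition order once there are ten or more partitions, while zero-padded names do.

```python
def naive_name(i):
    # '10' sorts before '2', so lexicographic order breaks at 10 partitions.
    return str(i)


def padded_name(i):
    # Zero padding keeps lexicographic order aligned with partition order.
    return f"part-{i:04d}"


indices = list(range(12))
naive_keeps_order = sorted(naive_name(i) for i in indices) == [naive_name(i) for i in indices]
padded_keeps_order = sorted(padded_name(i) for i in indices) == [padded_name(i) for i in indices]

template = '/path/to/data/*.json.gz'
paths = [template.replace('*', padded_name(i)) for i in indices[:3]]

print(naive_keeps_order, padded_keeps_order)
print(paths)
```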

You can also provide an explicit list of paths.

>>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...]  
>>> b.to_textfiles(paths) 

Compression: Filenames with extensions corresponding to known compression algorithms (gz, bz2) will be compressed accordingly.

Bag Contents: The bag calling to_textfiles must be a bag of text strings. For example, a bag of dictionaries could be written to JSON text files by mapping json.dumps on to the bag first, and then calling to_textfiles :

>>> b_dict.map(json.dumps).to_textfiles("/path/to/data/*.json")  

Last endline: By default the last line does not end with a newline character. Pass last_endline=True to invert the default.
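
The text written for a single partition can be pictured with a short sketch. It is a simplification in plain Python, and partition_text is a hypothetical helper rather than part of dask: dictionaries are first serialized to one JSON string per element, the strings are joined with newlines, and a trailing newline is added only when last_endline is set.

```python
import json

records = [{'name': 'Alice', 'amount': 100}, {'name': 'Bob', 'amount': 200}]

# A bag of dictionaries must become a bag of strings before writing.
lines = [json.dumps(record) for record in records]


def partition_text(lines, last_endline=False):
    """Text of one output file under the assumptions above (hypothetical)."""
    text = '\n'.join(lines)
    if last_endline and lines:
        text += '\n'
    return text


default_text = partition_text(lines)
terminated_text = partition_text(lines, last_endline=True)

# Each line parses back into the original record.
round_trip = [json.loads(line) for line in default_text.split('\n')]
print(repr(default_text[-1]), repr(terminated_text[-1]))
```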

topk(self, k, key=None, split_every=None)

K largest elements in collection

Optionally ordered by some key function

>>> b = from_sequence([10, 3, 5, 7, 11, 4])
>>> list(b.topk(2))  
[11, 10]
>>> list(b.topk(2, lambda x: -x))  
[3, 4]
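
A common way to compute such a result over partitions is to take the k largest elements of each partition and then the k largest of those candidates. The sketch below shows that idea with heapq.nlargest; topk_sketch is a hypothetical helper and is not claimed to be how the Bag method is implemented.

```python
import heapq


def topk_sketch(partitions, k, key=None):
    """Two-stage top-k over list partitions (hypothetical helper)."""
    # Stage 1: at most k candidates survive from each partition.
    candidates = []
    for part in partitions:
        candidates.extend(heapq.nlargest(k, part, key=key))
    # Stage 2: the overall top k must be among the candidates.
    return heapq.nlargest(k, candidates, key=key)


partitions = [[10, 3, 5], [7, 11, 4]]
top_two = topk_sketch(partitions, 2)
smallest_two = topk_sketch(partitions, 2, key=lambda x: -x)
print(top_two, smallest_two)
```

Negating the key, as in the second call, turns the k largest into the k smallest.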
unzip(self, n)

Transform a bag of tuples to n bags of their elements.

Examples

>>> b = from_sequence([(i, i + 1, i + 2) for i in range(10)])
>>> first, second, third = b.unzip(3)
>>> isinstance(first, Bag)
True
>>> first.compute()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Note that this is equivalent to:

>>> first, second, third = (b.pluck(i) for i in range(3))
var(self, ddof=0)

Variance

Other functions

dask.bag.from_sequence(seq, partition_size=None, npartitions=None)

Create a dask Bag from Python sequence.

This sequence should be relatively small in memory. Dask Bag works best when it handles loading your data itself. Commonly we load a sequence of filenames into a Bag and then use .map to open them.

Parameters
seq: Iterable

A sequence of elements to put into the dask

partition_size: int (optional)

The length of each partition

npartitions: int (optional)

The number of desired partitions

It is best to provide either partition_size or npartitions (though not both).

See also

read_text

Create bag from text files

Examples

>>> b = from_sequence(['Alice', 'Bob', 'Chuck'], partition_size=2)
dask.bag.from_delayed(values)

Create bag from many dask Delayed objects.

These objects will become the partitions of the resulting Bag. They should evaluate to a list or some other concrete sequence.

Parameters
values: list of delayed values

An iterable of dask Delayed objects, each evaluating to a list.

Returns
Bag

See also

dask.delayed

Examples

>>> x, y, z = [delayed(load_sequence_from_file)(fn)
...             for fn in filenames] 
>>> b = from_delayed([x, y, z])  
dask.bag.read_text(urlpath, blocksize=None, compression='infer', encoding='utf-8', errors='strict', linedelimiter='\n', collection=True, storage_options=None, files_per_partition=None, include_path=False)

Read lines from text files

Parameters
urlpath: string or list

Absolute or relative filepath(s). Prefix with a protocol like s3:// to read from alternative filesystems. To read from multiple files you can pass a globstring or a list of paths, with the caveat that they must all have the same protocol.

blocksize: None, int, or str

Size (in bytes) to cut up larger files. Streams by default. Can be None for streaming, an integer number of bytes, or a string like “128MiB”

compression: string

Compression format like ‘gzip’ or ‘xz’. Defaults to ‘infer’

encoding: string
errors: string
linedelimiter: string
collection: bool, optional

Return dask.bag if True, or list of delayed values if False

storage_options: dict

Extra options that make sense to a particular storage connection, e.g. host, port, username, password, etc.

files_per_partition: None or int

If set, group input files into partitions of the requested size, instead of one partition per file. Mutually exclusive with blocksize.

include_path: bool

Whether or not to include the path in the bag. If true, elements are tuples of (line, path). Default is False.

Returns
dask.bag.Bag or list

dask.bag.Bag if collection is True or list of Delayed lists otherwise.

See also

from_sequence

Build bag from Python sequence

Examples

>>> b = read_text('myfiles.1.txt')  
>>> b = read_text('myfiles.*.txt')  
>>> b = read_text('myfiles.*.txt.gz')  
>>> b = read_text('s3://bucket/myfiles.*.txt')  
>>> b = read_text('s3://key:secret@bucket/myfiles.*.txt')  
>>> b = read_text('hdfs://namenode.example.com/myfiles.*.txt')  

Parallelize a large file by providing the number of uncompressed bytes to load into each partition.

>>> b = read_text('largefile.txt', blocksize='10MB')  

Get file paths of the bag by setting include_path=True

>>> b = read_text('myfiles.*.txt', include_path=True) 
>>> b.take(1) 
(('first line of the first file', '/home/dask/myfiles.0.txt'),)
dask.bag.from_url(urls)

Create a dask Bag from a url.

Examples

>>> a = from_url('http://raw.githubusercontent.com/dask/dask/master/README.rst')  
>>> a.npartitions  
1
>>> a.take(8)  
(b'Dask\n',
 b'====\n',
 b'\n',
 b'|Build Status| |Coverage| |Doc Status| |Gitter| |Version Status|\n',
 b'\n',
 b'Dask is a flexible parallel computing library for analytics.  See\n',
 b'documentation_ for more information.\n',
 b'\n')
>>> b = from_url(['http://github.com', 'http://google.com'])  
>>> b.npartitions  
2
dask.bag.read_avro(urlpath, blocksize=100000000, storage_options=None, compression=None)

Read set of avro files

Use this with arbitrary nested avro schemas. Please refer to the fastavro documentation for its capabilities: https://github.com/fastavro/fastavro

Parameters
urlpath: string or list

Absolute or relative filepath, URL (may include protocols like s3://), or globstring pointing to data.

blocksize: int or None

Size of chunks in bytes. If None, there will be no chunking and each file will become one partition.

storage_options: dict or None

passed to backend file-system

compression: str or None

Compression format of the target(s), like ‘gzip’. Should only be used with blocksize=None.

dask.bag.range(n, npartitions)

Numbers from zero up to, but not including, n

Examples

>>> import dask.bag as db
>>> b = db.range(5, npartitions=2)
>>> list(b)
[0, 1, 2, 3, 4]
dask.bag.concat(bags)

Concatenate many bags together, unioning all elements.

>>> import dask.bag as db
>>> a = db.from_sequence([1, 2, 3])
>>> b = db.from_sequence([4, 5, 6])
>>> c = db.concat([a, b])
>>> list(c)
[1, 2, 3, 4, 5, 6]
dask.bag.map_partitions(func, *args, **kwargs)

Apply a function to every partition across one or more bags.

Note that all Bag arguments must be partitioned identically.

Parameters
func: callable
*args, **kwargs: Bag, Item, Delayed, or object

Arguments and keyword arguments to pass to func.

Examples

>>> import dask.bag as db
>>> b = db.from_sequence(range(1, 101), npartitions=10)
>>> def div(nums, den=1):
...     return [num / den for num in nums]

Using a python object:

>>> hi = b.max().compute()
>>> hi
100
>>> b.map_partitions(div, den=hi).take(5)
(0.01, 0.02, 0.03, 0.04, 0.05)

Using an Item:

>>> b.map_partitions(div, den=b.max()).take(5)
(0.01, 0.02, 0.03, 0.04, 0.05)

Note that while both versions give the same output, the second forms a single graph, and then computes everything at once, and in some cases may be more efficient.

dask.bag.map(func, *args, **kwargs)

Apply a function elementwise across one or more bags.

Note that all Bag arguments must be partitioned identically.

Parameters
func: callable
*args, **kwargs: Bag, Item, Delayed, or object

Arguments and keyword arguments to pass to func. Non-Bag args/kwargs are broadcasted across all calls to func.

Notes

For calls with multiple Bag arguments, corresponding partitions should have the same length; if they do not, the call will error at compute time.

Examples

>>> import dask.bag as db
>>> b = db.from_sequence(range(5), npartitions=2)
>>> b2 = db.from_sequence(range(5, 10), npartitions=2)

Apply a function to all elements in a bag:

>>> db.map(lambda x: x + 1, b).compute()
[1, 2, 3, 4, 5]

Apply a function with arguments from multiple bags:

>>> from operator import add
>>> db.map(add, b, b2).compute()
[5, 7, 9, 11, 13]

Non-bag arguments are broadcast across all calls to the mapped function:

>>> db.map(add, b, 1).compute()
[1, 2, 3, 4, 5]

Keyword arguments are also supported, and have the same semantics as regular arguments:

>>> def myadd(x, y=0):
...     return x + y
>>> db.map(myadd, b, y=b2).compute()
[5, 7, 9, 11, 13]
>>> db.map(myadd, b, y=1).compute()
[1, 2, 3, 4, 5]

Both arguments and keyword arguments can also be instances of dask.bag.Item or dask.delayed.Delayed. Here we’ll add the max value in the bag to each element:

>>> db.map(myadd, b, b.max()).compute()
[4, 5, 6, 7, 8]
dask.bag.zip(*bags)

Partition-wise bag zip

All passed bags must have the same number of partitions.

NOTE: corresponding partitions should have the same length; if they do not, the “extra” elements from the longer partition(s) will be dropped. If you have this case chances are that what you really need is a data alignment mechanism like pandas’s, and not a missing value filler like zip_longest.

Examples

Correct usage:

>>> import dask.bag as db
>>> evens = db.from_sequence(range(0, 10, 2), partition_size=4)
>>> odds = db.from_sequence(range(1, 10, 2), partition_size=4)
>>> pairs = db.zip(evens, odds)
>>> list(pairs)
[(0, 1), (2, 3), (4, 5), (6, 7), (8, 9)]

Incorrect usage:

>>> numbers = db.range(20) 
>>> fizz = numbers.filter(lambda n: n % 3 == 0) 
>>> buzz = numbers.filter(lambda n: n % 5 == 0) 
>>> fizzbuzz = db.zip(fizz, buzz) 
>>> list(fizzbuzz) 
[(0, 0), (3, 5), (6, 10), (9, 15), (12, 20), (15, 25), (18, 30)]

When what you really wanted was more along the lines of the following:

>>> list(fizzbuzz) 
[(0, 0), (3, None), (None, 5), (6, None), (None, 10), (9, None),
(12, None), (15, 15), (18, None), (None, 20), (None, 25), (None, 30)]
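
The dropping behaviour follows from zipping corresponding partitions independently. A sequential sketch with partitions modelled as plain lists (the sample data is hypothetical) shows how unequal partition lengths both lose elements and shift the pairing:

```python
# Two "bags" with the same number of partitions but different partition lengths.
evens_parts = [[0, 2, 4, 6], [8]]
odds_parts = [[1, 3, 5], [7, 9]]

# Zip corresponding partitions; the built-in zip stops at the shorter one.
pairs = [pair
         for left, right in zip(evens_parts, odds_parts)
         for pair in zip(left, right)]

# Elements that never made it into a pair.
paired = {value for pair in pairs for value in pair}
dropped = sorted(set(range(10)) - paired)

print(pairs)
print(dropped)
```

Here 6 and 9 are silently dropped and 8 ends up paired with 7, even though both inputs hold five elements in total.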

Random Sampling

dask.bag.random.choices(population, k=1)

Return a k sized list of elements chosen with replacement.

Parameters
population: Bag

Elements to sample.

k: integer, optional

Number of elements to sample.

Examples

>>> import dask.bag as db
>>> from dask.bag import random
>>> b = db.from_sequence(range(5), npartitions=2)
>>> list(random.choices(b, 3).compute())  # random; output varies between runs
[1, 1, 4]
dask.bag.random.sample(population, k)

Chooses k unique random elements from a bag.

Returns a new bag containing elements from the population while leaving the original population unchanged.

Parameters
population: Bag

Elements to sample.

k: integer

Number of elements to sample.

Examples

>>> import dask.bag as db
>>> from dask.bag import random
>>> b = db.from_sequence(range(5), npartitions=2)
>>> list(random.sample(b, 3).compute())  # random; output varies between runs
[1, 3, 4]