Create and Store Dask DataFrames

Dask can create DataFrames from various data storage formats like CSV, HDF, Apache Parquet, and others. For most formats, this data can live on various storage systems including local disk, network file systems (NFS), the Hadoop File System (HDFS), and Amazon's S3 (with the exception of HDF, which is only available on POSIX-like file systems).

See the DataFrame overview page for an in-depth discussion of dask.dataframe scope, use, and limitations.

API

The following functions convert between Dask DataFrames, file formats, and other Dask or Python collections.

File Formats:

read_csv(urlpath[, blocksize, …])
    Read CSV files into a Dask DataFrame
read_parquet(path[, columns, filters, …])
    Read a Parquet file into a Dask DataFrame
read_hdf(pattern, key[, start, stop, …])
    Read HDF files into a Dask DataFrame
read_orc(path[, columns, storage_options])
    Read a Dask DataFrame from ORC file(s)
read_json(url_path[, orient, lines, …])
    Create a Dask DataFrame from a set of JSON files
read_sql_table(table, uri, index_col[, …])
    Create a Dask DataFrame from a SQL table
read_table(urlpath[, blocksize, …])
    Read delimited files into a Dask DataFrame
read_fwf(urlpath[, blocksize, …])
    Read fixed-width files into a Dask DataFrame
from_bcolz(x[, chunksize, categorize, …])
    Read a bcolz ctable into a Dask DataFrame
from_array(x[, chunksize, columns, meta])
    Read any sliceable array into a Dask DataFrame
to_csv(df, filename[, single_file, …])
    Store a Dask DataFrame to CSV files
to_parquet(df, path[, engine, compression, …])
    Store a Dask DataFrame to Parquet files
to_hdf(df, path, key[, mode, append, …])
    Store a Dask DataFrame to Hierarchical Data Format (HDF) files
to_sql(df, name, uri[, schema, index_label, …])
    Store a Dask DataFrame to a SQL table

Dask Collections:

from_delayed(dfs[, meta, divisions, prefix, …])
    Create a Dask DataFrame from many Dask Delayed objects
from_dask_array(x[, columns, index, meta])
    Create a Dask DataFrame from a Dask Array
dask.bag.core.Bag.to_dataframe(self[, meta, …])
    Create a Dask DataFrame from a Dask Bag
DataFrame.to_delayed(self[, optimize_graph])
    Convert into a list of dask.delayed objects, one per partition
to_records(df)
    Create a Dask Array from a Dask DataFrame
to_bag(df[, index])
    Create a Dask Bag from a Dask DataFrame

Pandas:

from_pandas(data[, npartitions, chunksize, …])
    Construct a Dask DataFrame from a pandas DataFrame
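
As a brief illustration of moving between these collections, a small round trip using a few of the functions above might look like the following. This is a minimal sketch: the output directory is illustrative, and writing Parquet requires an engine such as pyarrow or fastparquet to be installed:

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> pdf = pd.DataFrame({'x': [1, 2, 3, 4], 'y': [5.0, 6.0, 7.0, 8.0]})
>>> df = dd.from_pandas(pdf, npartitions=2)   # pandas -> Dask DataFrame
>>> parts = df.to_delayed()                   # one dask.delayed object per partition
>>> df2 = dd.from_delayed(parts)              # Delayed objects -> Dask DataFrame
>>> df.to_parquet('output-directory/')        # write one Parquet file per partition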

Creating

Reading from various locations

For text, CSV, and Apache Parquet formats, data can come from local disk, the Hadoop File System, Amazon S3, or other remote storage systems by prefixing the paths with a protocol:

>>> df = dd.read_csv('my-data-*.csv')
>>> df = dd.read_csv('hdfs:///path/to/my-data-*.csv')
>>> df = dd.read_csv('s3://bucket-name/my-data-*.csv')

For remote systems like HDFS or S3, credentials may be an issue. Usually, these are handled by configuration files on disk (such as a .boto file for S3), but in some cases you may want to pass storage-specific options through to the storage backend. You can do this with the storage_options= keyword:

>>> df = dd.read_csv('s3://bucket-name/my-data-*.csv',
...                  storage_options={'anon': True})

Dask Delayed

For more complex situations not covered by the functions above, you may want to use dask.delayed, which lets you construct Dask DataFrames out of arbitrary Python function calls that load DataFrames. This lets you handle new formats easily, or bake in particular logic around loading data if, for example, your data is stored in a custom format.

See documentation on using dask.delayed with collections or an example notebook showing how to create a Dask DataFrame from a nested directory structure of Feather files (as a stand-in for any custom file format).

Dask delayed is particularly useful when simple map operations aren’t sufficient to capture the complexity of your data layout.
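
As a rough sketch of this pattern, each delayed call below wraps a plain Python loader that returns a pandas DataFrame, and dd.from_delayed stitches the results into a single Dask DataFrame. The file names, loader, and columns are made up for illustration; a real custom-format parser would replace the pandas call:

import pandas as pd
import dask
import dask.dataframe as dd

@dask.delayed
def load(path):
    # Any custom parsing logic can go here; a plain pandas call
    # stands in for a real custom-format reader
    return pd.read_csv(path)

paths = ['data/2000-01-01.custom', 'data/2000-01-02.custom']
partitions = [load(path) for path in paths]

# meta is an empty pandas DataFrame describing the column names and dtypes
meta = pd.DataFrame({'price': pd.Series(dtype='float64'),
                     'name': pd.Series(dtype='object'),
                     'id': pd.Series(dtype='int64')})

df = dd.from_delayed(partitions, meta=meta)

Supplying meta= up front spares Dask from computing a partition just to infer the column names and dtypes.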

From Raw Dask Graphs

This section is mainly for developers wishing to extend dask.dataframe. It discusses an internal API that is not normally needed by users. Everything below can be done just as effectively with dask.delayed, described just above. You should never need to create a DataFrame object by hand.

To construct a DataFrame manually from a dask graph you need the following information:

  1. Dask: a Dask graph with keys like {(name, 0): ..., (name, 1): ...} as well as any other tasks on which those tasks depend. The tasks corresponding to (name, i) should produce pandas.DataFrame objects that correspond to the columns and divisions information discussed below

  2. Name: the special name used above

  3. Columns: a list of column names

  4. Divisions: a list of index values that separate the different partitions. Alternatively, if you don’t know the divisions (this is common), you can provide a list of [None, None, None, ...] with as many partitions as you have plus one. For more information, see the Partitions section in the DataFrame documentation

As an example, we manually build a DataFrame that reads several CSV files with a datetime index partitioned by day. Note that you should never do this in practice; the dd.read_csv function does it for you:

import pandas as pd
from pandas import Timestamp

import dask.dataframe as dd

# One task per partition; each task produces a pandas DataFrame
dsk = {('mydf', 0): (pd.read_csv, 'data/2000-01-01.csv'),
       ('mydf', 1): (pd.read_csv, 'data/2000-01-02.csv'),
       ('mydf', 2): (pd.read_csv, 'data/2000-01-03.csv')}
name = 'mydf'
columns = ['price', 'name', 'id']
divisions = [Timestamp('2000-01-01 00:00:00'),
             Timestamp('2000-01-02 00:00:00'),
             Timestamp('2000-01-03 00:00:00'),
             Timestamp('2000-01-03 23:59:59')]

df = dd.DataFrame(dsk, name, columns, divisions)

Storing

Writing to remote locations

Dask can write to a variety of data stores, including cloud object stores. For example, you can write a dask.dataframe to Azure Blob Storage as follows:

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> d = {'col1': [1, 2, 3, 4], 'col2': [5, 6, 7, 8]}
>>> df = dd.from_pandas(pd.DataFrame(data=d), npartitions=2)
>>> dd.to_parquet(df=df,
...               path='abfs://CONTAINER/FILE.parquet',
...               storage_options={'account_name': 'ACCOUNT_NAME',
...                                'account_key': 'ACCOUNT_KEY'})

See the remote data services documentation for more information.
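
To read the data back, the same storage_options can be passed to read_parquet (the container, file name, and credentials below are placeholders):

>>> df = dd.read_parquet('abfs://CONTAINER/FILE.parquet',
...                      storage_options={'account_name': 'ACCOUNT_NAME',
...                                       'account_key': 'ACCOUNT_KEY'})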