Flux is a functional data scripting language designed specifically for time series data exploration and data processing with InfluxDB.

Key concepts

  • pipe-forward operator (|>): chains operations together by passing the tables output by one function to the next
  • tables: Flux structures data in tables and outputs them as annotated CSV
  • table group key: the list of columns for which every row in a table has the same value; it describes the table's contents
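
To make the group key concrete, here is a minimal sketch that builds a small in-memory table and regroups it. The rows and the host column are made-up illustration data, not taken from any real bucket; array.from() and group() are standard Flux functions.

```flux
import "array"

// Made-up rows for illustration only.
array.from(
    rows: [
        {host: "a", _value: 1.0},
        {host: "a", _value: 2.0},
        {host: "b", _value: 3.0},
    ],
)
    // Put "host" in the group key: rows sharing a host end up in the same table.
    |> group(columns: ["host"])
```

Under these assumptions the result should be split into one table per distinct host value, and each |> passes the tables from the previous step on to the next function.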

Example query

  • Queries data from the example-bucket bucket
  • Returns the per-minute average CPU usage (filtered by tags) over the last hour
from(bucket: "example-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r.cpu == "cpu-total"
  )
  |> aggregateWindow(every: 1m, fn: mean)
  |> yield()

Elements of a Flux query

Every Flux query requires at least the following elements:

  • data source: from(bucket:"bucket-name")
  • time range: |> range(start: <start time>, stop: <stop time>), where each bound is either a timestamp or a duration relative to "now" (stop is optional and defaults to now)
    • timestamp: 2018-11-05T23:30:00Z
    • duration preceding now: -15m (always negative)
  • filter:
    • filter(fn: (r) => ...): the required parameter is fn, which expects an anonymous predicate function that takes r and returns true or false
    • r: the record (row) being evaluated
    • combine multiple conditions with and
  • result output: yield() (Flux assumes this automatically at the end of a script; yield() is needed only when a script contains multiple queries and each output table needs a name)
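
As an illustration of the elements above, the following sketch uses absolute timestamps for the time range and named yield() calls to return two results from one script. The bucket name, the time window, and the output names "mean" and "max" are assumptions chosen for the example.

```flux
// Data source, time range, and filter shared by both results.
data = from(bucket: "example-bucket")
    |> range(start: 2018-11-05T23:30:00Z, stop: 2018-11-05T23:45:00Z)
    |> filter(fn: (r) => r._measurement == "cpu" and r.cpu == "cpu-total")

// Each yield() names one output so the two results can be told apart.
data |> mean() |> yield(name: "mean")
data |> max() |> yield(name: "max")
```

With a single query the yield() call can be dropped, since Flux returns the last result on its own.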

Exploring data with InfluxDB UI's Data Explorer

The most straightforward way to explore data in InfluxDB is to use the UI's Data Explorer.

Here we use Flux scripting together with the interactive UI.

In the following Flux query, we do not pass all the parameters manually but let the UI set them dynamically:

  • |> range(start: v.timeRangeStart, stop: v.timeRangeStop): here v.timeRangeStart and v.timeRangeStop are set by the time range selector dropdown or by selecting a range on the graph with the mouse
  • |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false): the window duration comes from v.windowPeriod, which the UI also sets
from(bucket: "trial_bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "boltdb_reads_total")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
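
Outside the Data Explorer the v record is not provided by the UI. One possible workaround, sketched here with assumed values (a one-hour range and a one-minute window), is to define a stand-in v record at the top of the script:

```flux
// Hypothetical stand-in for the record the Data Explorer normally supplies.
v = {timeRangeStart: -1h, timeRangeStop: now(), windowPeriod: 1m}

from(bucket: "trial_bucket")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "boltdb_reads_total")
    |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
```

This keeps the rest of the query unchanged, so it can be pasted back into the Data Explorer after removing the first statement.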