Curious how you can get started processing 100 Million data points, using only 150 MiBs of memory, from a native GUI application? Keep reading to figure out how we’ve been working on this problem.
In a previous post, we talked about the design goals that were important to a rainflow cycle counting application that we’re working on. Two of those goals, Efficient and Quick, are somewhat in conflict, but efficient edged out quick on the priority scale. We decided that being able to run on lower spec hardware, instead of a massive datacenter machine with gobs of memory, was more important than being super quick.
Before we can get started on the discussion of our solution, we’ve got to go over what the simple solution looked like. For rainflow cycle counting, we have an array of data points representing the measured PSI at a point in time within the pipeline system. This data then needs to be processed to find reversals, count cycles, and then bin the counts.
The simple solution, then, is to pull the full dataset into an array in memory. This is an array of real numbers, or floating point numbers in software/computer parlance, so we’re going to be taking up a decent amount of memory. In our tests, loading a 100 million point dataset took roughly 3 - 4 GiBs of memory. (For reference, 100 million raw 64-bit floats alone come to roughly 800 MB; the rest of what we measured was likely per-point bookkeeping and runtime overhead.) When computers are being sold with 8 GiBs and some workstations with 16 GiBs (we know this isn’t always the case; we run workstations with 32 GiBs, but we want to be more accessible), a dataset this large could start causing memory pressure and slow down your system overall, including the rainflow data processing.
After our preliminary test, we knew that we would need a better solution.
If you can’t have all of your dataset in memory at one time, then what is the solution?
You need to start streaming. Just like iterating over each item in an array, a stream can emit one item at a time to be processed. As you build your algorithms with streaming in mind, you can more easily reduce your memory footprint.
Now, instead of filling our memory with the dataset, we can use the disk and a method of iterating that storage repository to emit data points into our processing pipeline.
Before we talk about the concrete implementation of this, you need to know a bit about the tech being used. I’ve yet to go into much detail about the tech stack used for this project, not because I want to keep it a secret, but rather because I prefer discussing some of the whats and whys before I get to that point.
By way of basic introduction though, we’re using go for the language and fyne is our GUI toolkit of choice. Most of our projects are written in go, and a lot of them are built with fyne. You might wonder: Python is the more standard language for data science projects, so why use go?
With go, concurrency was built into the language and the runtime from the beginning. It has good language primitives to get up and running with multi-threading quickly and painlessly. There are two things to know about here: the go keyword and channels. With these two things, the world of streaming is at your fingertips.
A quick aside: There are other languages that have multi-threading primitives that also have implementations that work like streams (think yield in python), but we’re using go, so that is what we’re going to discuss here.
Any time you make a function call using the go keyword, the go scheduler will run that call on a goroutine (think of a lightweight thread: not an OS thread but a runtime thread). This allows concurrent work to be started easily.
By using channels you can communicate between these goroutines. You simply specify a data type when making the channel, and you can then pass that channel around your application. With a simple for loop construct you can easily begin consuming from a channel that is being populated with data by a background goroutine.
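As a minimal illustration of these two primitives (a standalone sketch, not code from the application), a producer goroutine can feed a channel that the main goroutine ranges over:

package main

import "fmt"

func main() {
    values := make(chan int)

    // The producer runs on its own goroutine and closes the channel
    // when it has nothing more to send.
    go func() {
        for i := 0; i < 5; i++ {
            values <- i
        }
        close(values)
    }()

    // range blocks until a value arrives and exits once the channel is closed.
    for v := range values {
        fmt.Println(v)
    }
}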
We’re going to go over a bit of code in just a bit, but first, one more thing to cover. We’re storing the data in a sqlite database. This is due to another of our goals, Lossless. We want to be able to easily audit the data that went into the final determination. By flagging data points as being filtered, rather than deleting them, we can build up a progression of the workflow.
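To make that concrete, flagging a point might look something like the sketch below. It reuses the DATUM table and status column from the example later in this post; the exact schema and status values in our application differ, so treat this as illustrative only.

// MarkFiltered is a hypothetical helper that flags a data point instead of
// deleting it, so the original measurement stays available for auditing.
func MarkFiltered(db *sql.DB, id string) error {
    _, err := db.Exec("UPDATE DATUM SET status = 'filtered' WHERE id = ?", id)
    return err
}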
As mentioned above, the disk becomes the store of the data, and we can stream right out of the database to feed the cycle counting pipeline.
Let’s think about an imaginary dataset, one that has an ID, a status, and a value. We can have that stored in a database and stream it out. Check out the code below for a basic example of a data type and a function that queries, creates a channel, and then streams the data out of the database.
type Datum struct {
    ID     string
    Status string
    Value  float64
}

func StreamData(db *sql.DB) (chan Datum, error) {
    // Just an example query, the data may actually need to be constrained or filtered
    rows, err := db.Query("SELECT id, status, value FROM DATUM")
    if err != nil {
        return nil, err
    }

    res := make(chan Datum)
    go func() {
        // Release the database cursor once we have drained the rows.
        defer rows.Close()
        for rows.Next() {
            var datum Datum
            // The streaming magic, as far as databases are concerned, happens here.
            if err := rows.Scan(&datum.ID, &datum.Status, &datum.Value); err == nil {
                res <- datum
            }
        }
        close(res)
    }()
    return res, nil
}
In go, the rows.Next/rows.Scan loop keeps a cursor into the database, so you’re only loading one data point at a time. This is the start of your stream, and it is important for keeping memory usage low. This could be done for other sources as well; for example, if you were reading from a CSV file, it could be read line by line and each row emitted in the same fashion as the database.
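For illustration, a CSV source could be streamed in the same way. This is a hypothetical sketch (the column order and parsing are assumptions, and it leans on the standard library’s encoding/csv, io, and strconv packages), not code from the application:

// StreamCSV emits one Datum per CSV record, assuming columns: id, status, value.
func StreamCSV(r io.Reader) chan Datum {
    res := make(chan Datum)
    go func() {
        defer close(res)
        reader := csv.NewReader(r)
        for {
            record, err := reader.Read()
            if err != nil {
                return // io.EOF, or a parse error, ends the stream
            }
            if len(record) < 3 {
                continue // skip malformed rows
            }
            value, err := strconv.ParseFloat(record[2], 64)
            if err != nil {
                continue // skip rows with an unparsable value
            }
            res <- Datum{ID: record[0], Status: record[1], Value: value}
        }
    }()
    return res
}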
Now we need to go over what it would take to start reading from the stream.
var db *sql.DB

func StartProcessing() {
    source, err := StreamData(db)
    if err != nil {
        log.Fatal(err)
    }
    for d := range source {
        log.Println("ID:", d.ID, "Value:", d.Value)
    }
}
As part of our rainflow cycle counting, we need to do a few things: find the reversals, count the cycles, and bin those counts (more if we’re doing hysteresis filtering).
We have covered goroutines, channels, and consuming those channels. In go, you can easily take the channel that is returned from one function, pass it to another function and start consuming that data, all the while using another goroutine and channel to export the results of the processing. Let’s check out a basic example of this:
type Cycle struct {
    // ... Omitted for brevity
}

func FindReversals(source chan Datum) chan Cycle {
    cycles := make(chan Cycle)
    go func() {
        for d := range source {
            // again, more omissions as this is about streaming not finding reversals
            if isReversal(d) {
                cycles <- Cycle{}
            }
        }
        close(cycles)
    }()
    return cycles
}
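Wired together, the whole thing might look something like the sketch below. The Run function here is hypothetical glue for illustration; the binning stage and most error handling are left out:

func Run(db *sql.DB) error {
    // Stage 1: stream data points out of the database.
    source, err := StreamData(db)
    if err != nil {
        return err
    }

    // Stage 2: find reversals/cycles on another goroutine.
    cycles := FindReversals(source)

    // Final stage: consume the results; binning the cycle counts would happen here.
    for range cycles {
        // ... omitted
    }
    return nil
}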
As this is a discussion of the go implementation, you’ll notice that we’re calling close on our channel. It is important to remember this step. If you don’t, the for loop will block waiting for another data point to come through the channel, which will effectively hang your application.
If you’re working on a project that, like ours, deals with large datasets, consider using a streaming implementation to improve your portability and memory consumption. Streaming might take a bit more effort to reason about, but it can open up opportunities for the different devices you can run on.
We have a post planned to discuss some of the performance issues we ran into using sqlite, and the indexes we worked with to improve performance.