How Do I Build A Producer/Consumer Architecture for DAQmx in Python?

Updated Jan 23, 2019

Reported In

Driver

  • NI-DAQmx

Programming Language

  • Python

Issue Details

  • I am developing a DAQmx application in Python using the nidaqmx package. How do I create a producer/consumer architecture so that I can process my data without slowing down acquisition?
  • Is there a Python analog to the common Producer/Consumer LabVIEW architecture?
  • My Python DAQmx application is throwing a buffer overflow error. I read that this happens when the software is unable to read from the DAQ task fast enough and I need to put any data processing in a separate "consumer" loop. How can I do this?

Solution

Python libraries for threading and queues

Unlike LabVIEW, whose compiler synthesizes parallel threads for the user, Python requires you to define your own threads in order to run two blocks of code concurrently. The threading module is part of the Python standard library and contains the classes and functions for defining and using threads. You'll want to import this module in your application.

You'll also want to import the queue module, which is also part of the standard library and provides a thread-safe queue for passing data between threads. This is a data structure you can leverage to pass values from the producer thread to the consumer thread during data acquisition.
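As a minimal sketch of the queue behavior this article relies on, the snippet below puts two items on a queue and takes them off again in a single thread. The sample values are made up for illustration; each item stands in for one batch of samples.

```python
import queue

q = queue.Queue()  # unbounded first-in, first-out queue that is safe to share between threads

# Producer side: each put() adds one item. Here an item is a list of samples.
q.put([0.1, 0.2, 0.3])
q.put([0.4, 0.5])

# Consumer side: get() removes items in the order they were added.
first = q.get()
second = q.get()

# With a timeout, get() raises queue.Empty if nothing arrives in time.
try:
    q.get(block=True, timeout=0.1)
    timed_out = False
except queue.Empty:
    timed_out = True
```

The queue.Empty exception shown at the end is what a consumer sees when the producer has nothing new to offer.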
 

Setting up producer and consumer threads

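import queue
import threading

# producer_loop, consumer_loop, task, and out_file are defined elsewhere in the example
q = queue.Queue()
prod = threading.Thread(target=producer_loop, args=(q, task))
cons = threading.Thread(target=consumer_loop, args=(q, task, out_file))
prod.start()
cons.start()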


The code block above creates a producer thread that will run the code found in producer_loop() and a consumer thread that will run the code found in consumer_loop(). The args argument is used to pass function arguments to these functions. In addition to the threads, a queue object is created. A reference to the queue is passed to each thread so that the producer can put items on the queue and the consumer can remove them.
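To try the thread setup without DAQ hardware, the sketch below uses the same pattern with stand-in loop bodies. The n_items parameter and the results list are assumptions made for this illustration and are not part of the article's example program.

```python
import queue
import threading

def producer_loop(q, n_items):
    # Stand-in for the DAQ read: put n_items small batches on the queue.
    for i in range(n_items):
        q.put([i, i + 0.5])

def consumer_loop(q, n_items, results):
    # Stand-in for data logging: collect every value from n_items batches.
    for _ in range(n_items):
        results.extend(q.get())

q = queue.Queue()
results = []
prod = threading.Thread(target=producer_loop, args=(q, 3))
cons = threading.Thread(target=consumer_loop, args=(q, 3, results))
prod.start()
cons.start()
prod.join()  # wait for both threads before reading the results
cons.join()
print(results)  # prints [0, 0.5, 1, 1.5, 2, 2.5]
```

Both threads receive the same queue object through args, so items put by one are the items retrieved by the other.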
 

The producer thread

In order to allow data to be acquired from the DAQ device as it is ready, the producer thread should contain a call to task.read() and as little extra code as possible. The code block below shows a simple example of this:

def producer_loop(q, task):
    # Read whatever samples are available and hand them to the consumer
    start_time = time.time()
    while time.time() - start_time < ACQ_DURATION:
        q.put_nowait(
            task.read(number_of_samples_per_channel=nidaqmx.constants.READ_ALL_AVAILABLE))
    task.stop()

The acquisition loop runs for ACQ_DURATION seconds. During that time, it places all available samples on the queue each time task.read() is called.
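The timed loop can be exercised without a device by substituting a stand-in for the task. In the sketch below, FakeTask is a hypothetical class written for this illustration (it is not part of nidaqmx), and the ACQ_DURATION value is arbitrary.

```python
import queue
import time

ACQ_DURATION = 0.2  # seconds; illustrative value only

class FakeTask:
    """Hypothetical stand-in for a DAQmx task so the loop runs without hardware."""
    def __init__(self):
        self.stopped = False

    def read(self):
        time.sleep(0.01)        # pretend to wait for samples
        return [0.0, 1.0, 2.0]  # pretend batch of samples

    def stop(self):
        self.stopped = True

def producer_loop(q, task):
    start_time = time.monotonic()
    while time.monotonic() - start_time < ACQ_DURATION:
        q.put_nowait(task.read())  # one queue item per read
    task.stop()

q = queue.Queue()
task = FakeTask()
producer_loop(q, task)
n_batches = q.qsize()  # number of batches produced during ACQ_DURATION
```

This sketch uses time.monotonic() rather than time.time() because it is not affected by system clock adjustments; either works for a short demonstration.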
 

The consumer thread

The consumer thread will take the samples off of the queue and write them to a log file. By performing data logging in this thread, the producer does not have to wait for file.write() to return before it can read new data from the DAQ device.

def consumer_loop(q, task, file):
    while True:
        try:
            temp = q.get(block=True, timeout=2)
        except queue.Empty:
            if task.is_task_done():
                return
            continue  # Producer is stalled; keep waiting for more data
        for val in temp:
            file.write("{}\n".format(val))
        time.sleep(0.5)  # Simulate 0.5 seconds of extra processing per queue item
        q.task_done()


The example above will try to retrieve an item from the queue. Each item is the list of samples returned by one call to task.read(). Once it gets an item, it writes however many values were packaged in that item to a log file and calls q.task_done() to notify the queue that it has processed the item.

If it does not receive an item within the timeout of 2 seconds, q.get() raises a queue.Empty exception. If the queue is empty because the DAQmx task is finished, the thread finishes and returns. Otherwise, something must be stalling the producer, so the consumer tries again and waits for more samples.
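The timeout handling described above can be sketched without hardware as follows. FakeTask is a hypothetical stand-in that only tracks a "done" flag, the log file is an in-memory io.StringIO, and the shortened timeout is chosen so the demonstration runs quickly; none of these come from the article's example program.

```python
import io
import queue
import threading
import time

class FakeTask:
    """Hypothetical stand-in for a DAQmx task; it only tracks a 'done' flag."""
    def __init__(self):
        self._done = threading.Event()

    def stop(self):
        self._done.set()

    def is_task_done(self):
        return self._done.is_set()

def consumer_loop(q, task, file, timeout=0.1):
    while True:
        try:
            batch = q.get(block=True, timeout=timeout)
        except queue.Empty:
            if task.is_task_done():
                return   # queue is drained and acquisition has finished
            continue     # producer is only slow; wait again
        for val in batch:
            file.write("{}\n".format(val))
        q.task_done()

q = queue.Queue()
task = FakeTask()
log = io.StringIO()      # in-memory file for the demonstration
cons = threading.Thread(target=consumer_loop, args=(q, task, log))
cons.start()

time.sleep(0.25)         # producer stalls: the consumer times out and retries
q.put([1.5, 2.5])
q.join()                 # block until the consumer has called task_done()
task.stop()
cons.join()
```

Catching queue.Empty specifically, and continuing when the task is still running, keeps the loop from falling through to the write step with no new item in hand.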
 

Resource clean up


while not task.is_task_done():
    time.sleep(0.1)  # Wait in the parent thread until the task is done
print("Task is done")

while cons.is_alive():
    time.sleep(0.1)  # Wait for the consumer to finish
print("Consumer finished")

# Clean up
out_file.close()
task.close()


Once the threads have been started, the main program should wait until they are finished before cleaning up the system resources they use. You can use task.is_task_done() to check whether the DAQmx task is done and is_alive() to check whether an individual thread is still running (the older isAlive() spelling was removed in Python 3.9). In this case, you know that the threads have finished their work once the consumer thread returns.

Once all processing is finished, the out_file handle and DAQmx task can be closed, which frees up these resources.
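As an alternative to polling, Thread.join() blocks until a thread has finished, and a try/finally block ensures the resources are released even if something fails along the way. The sketch below is one possible arrangement under stated assumptions: FakeTask is a hypothetical stand-in for the DAQmx task, the log file is an in-memory io.StringIO, and the loop bodies are simplified.

```python
import io
import queue
import threading

N_ITEMS = 3  # illustrative number of batches

class FakeTask:
    """Hypothetical stand-in for a DAQmx task that records whether it was closed."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

def producer_loop(q):
    for i in range(N_ITEMS):
        q.put([float(i)])

def consumer_loop(q, file):
    for _ in range(N_ITEMS):
        for val in q.get():
            file.write("{}\n".format(val))

task = FakeTask()
out_file = io.StringIO()
q = queue.Queue()
prod = threading.Thread(target=producer_loop, args=(q,))
cons = threading.Thread(target=consumer_loop, args=(q, out_file))
try:
    prod.start()
    cons.start()
    prod.join()  # blocks without spinning the CPU
    cons.join()
    logged = out_file.getvalue()  # read the log before the file is closed
finally:
    # Clean up whether or not the threads ran to completion
    out_file.close()
    task.close()
```

With real hardware, the same try/finally shape would wrap the nidaqmx task and the real output file in place of the stand-ins.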

Additional Information

This article uses code from an example program that creates and configures a DAQmx task, starts two separate threads for reading and data logging, and then cleans up all resources once the acquisition has finished. This tutorial covers only the portion of the code related to threading and inter-thread communication; the entire source code is attached as Producer_Consumer_Example.py.

The information discussed here is by no means a complete primer on using nidaqmx with multiple threads. If you are interested in learning more or would like additional resources to help you build a more complex multithreaded program, check out the Related Links.