diff --git a/examples/nidaqmx/moniker-streaming/CMakeLists.txt b/examples/nidaqmx/moniker-streaming/CMakeLists.txt new file mode 100644 index 000000000..4ec0af194 --- /dev/null +++ b/examples/nidaqmx/moniker-streaming/CMakeLists.txt @@ -0,0 +1,80 @@ +# cmake build file for C++ MonikerStreamingClient example. +# Assumes protobuf and gRPC have been installed using cmake. + +cmake_minimum_required(VERSION 3.16) + +project(MonikerStreamingClient C CXX) + +find_package(Protobuf CONFIG REQUIRED) +message(STATUS "Using protobuf ${Protobuf_VERSION}") + +set(_PROTOBUF_LIBPROTOBUF protobuf::libprotobuf) +find_program(_PROTOBUF_PROTOC protoc) +find_package(gRPC CONFIG REQUIRED) +message(STATUS "Using gRPC ${gRPC_VERSION}") +set(_GRPC_GRPCPP gRPC::grpc++) +find_program(_GRPC_CPP_PLUGIN_EXECUTABLE grpc_cpp_plugin) + +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /bigobj") + +# Proto files +get_filename_component(daqmx_proto "${CMAKE_SOURCE_DIR}/../../../generated/nidaqmx/nidaqmx.proto" ABSOLUTE) +get_filename_component(daqmx_proto_path "${daqmx_proto}" PATH) +get_filename_component(session_proto "${CMAKE_SOURCE_DIR}/../../../imports/protobuf/session.proto" ABSOLUTE) +get_filename_component(session_proto_path "${session_proto}" PATH) +get_filename_component(data_moniker_proto "${CMAKE_SOURCE_DIR}/../../../imports/protobuf/data_moniker.proto" ABSOLUTE) +get_filename_component(data_moniker_proto_path "${data_moniker_proto}" PATH) + +message(STATUS "daqmx_proto_path: ${daqmx_proto_path}") +#---------------------------------------------------------------------- +# Generate sources from proto files +# Usage: GenerateGrpcSources( ) +#---------------------------------------------------------------------- +function(GenerateGrpcSources) +set(proto_name ${ARGV0}) +set(proto_absolute_path ${ARGV1}) + +set(proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/${proto_name}.pb.cc") +set(proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/${proto_name}.pb.h") +set(grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/${proto_name}.grpc.pb.cc") +set(grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/${proto_name}.grpc.pb.h") + +add_custom_command( + OUTPUT "${proto_srcs}" "${proto_hdrs}" "${grpc_srcs}" "${grpc_hdrs}" + COMMAND ${_PROTOBUF_PROTOC} + ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" + --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" + -I "${daqmx_proto_path}" + -I "${session_proto_path}" + -I "${data_moniker_proto_path}" + --plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}" + ${proto_absolute_path} + DEPENDS ${proto_absolute_path}) +endfunction() + +# Generated sources +GenerateGrpcSources(nidaqmx ${daqmx_proto}) +GenerateGrpcSources(session ${session_proto}) +GenerateGrpcSources(data_moniker ${data_moniker_proto}) + +# Include generated *.pb.h files +include_directories("${CMAKE_CURRENT_BINARY_DIR}") + +add_executable(MonikerStreamingClient + "stream-read-analog-input-using-moniker-streaming.cpp" + "${CMAKE_CURRENT_BINARY_DIR}/nidaqmx.pb.cc" + "${CMAKE_CURRENT_BINARY_DIR}/nidaqmx.pb.h" + "${CMAKE_CURRENT_BINARY_DIR}/nidaqmx.grpc.pb.cc" + "${CMAKE_CURRENT_BINARY_DIR}/nidaqmx.grpc.pb.h" + "${CMAKE_CURRENT_BINARY_DIR}/session.pb.cc" + "${CMAKE_CURRENT_BINARY_DIR}/session.pb.h" + "${CMAKE_CURRENT_BINARY_DIR}/session.grpc.pb.cc" + "${CMAKE_CURRENT_BINARY_DIR}/session.grpc.pb.h" + "${CMAKE_CURRENT_BINARY_DIR}/data_moniker.pb.cc" + "${CMAKE_CURRENT_BINARY_DIR}/data_moniker.pb.h" + "${CMAKE_CURRENT_BINARY_DIR}/data_moniker.grpc.pb.cc" + "${CMAKE_CURRENT_BINARY_DIR}/data_moniker.grpc.pb.h" + ) +target_link_libraries(MonikerStreamingClient + ${_GRPC_GRPCPP} + ${_PROTOBUF_LIBPROTOBUF}) diff --git a/examples/nidaqmx/moniker-streaming/stream-read-analog-input-using-moniker-streaming.cpp b/examples/nidaqmx/moniker-streaming/stream-read-analog-input-using-moniker-streaming.cpp new file mode 100644 index 000000000..577676332 --- /dev/null +++ b/examples/nidaqmx/moniker-streaming/stream-read-analog-input-using-moniker-streaming.cpp @@ -0,0 +1,245 @@ +/********************************************************************* +* Acquire data continuously from an analog input channel using moniker based streaming +* +* The gRPC API is built from the C API. NI-DAQmx documentation is installed with the driver at: +* C:\Program Files (x86)\National Instruments\NI-DAQ\docs\cdaqmx.chm +* +* Getting Started: +* +* To run this example, install "NI-DAQmx Driver" on the server machine: +* https://www.ni.com/en-us/support/downloads/drivers/download.ni-daqmx.html +* +* For instructions on how to use protoc to generate gRPC client interfaces, see our "Creating a gRPC +* Client" wiki page: +* https://github.com/ni/grpc-device/wiki/Creating-a-gRPC-Client +* +* Refer to the NI DAQmx gRPC Wiki for the latest C Function Reference: +* https://github.com/ni/grpc-device/wiki/NI-DAQMX-C-Function-Reference +* +* To run this example without hardware: create a simulated device in NI MAX on the server (Windows +* only). +* +* Build: +* +* > mkdir build +* > cd build +* > cmake .. +* > cmake --build . +* +* Running from command line: +* +* Server machine's IP address, port number, and physical channel name can be passed as separate +* command line arguments. +* > MonikerStreamingClient +* If they are not passed in as command line arguments, then by default the server address will be +* "localhost:31763", with "Dev1/ai0" as the physical channel name.. +*********************************************************************/ + +#include +#include +#include + +#include "nidaqmx.grpc.pb.h" +#include "data_moniker.grpc.pb.h" + +using namespace nidaqmx_grpc; +using StubPtr = std::unique_ptr; + +std::string SERVER_ADDRESS = "localhost"; +std::string SERVER_PORT = "31763"; +std::string PHYSICAL_CHANNEL = "Dev1/ai0"; +int NUM_ITERATIONS = 5; + +class grpc_driver_error : public std::runtime_error { + private: + ::grpc::StatusCode code_; + std::multimap trailers_; + + public: + grpc_driver_error(const std::string& message, ::grpc::StatusCode code, const std::multimap& trailers) + : std::runtime_error(message), code_(code) + { + for (const auto& trailer : trailers) { + trailers_.emplace( + std::string(trailer.first.data(), trailer.first.length()), + std::string(trailer.second.data(), trailer.second.length())); + } + } + + ::grpc::StatusCode StatusCode() const + { + return code_; + } + + const std::multimap& Trailers() const + { + return trailers_; + } +}; + +inline void raise_if_error(const ::grpc::Status& status, const ::grpc::ClientContext& context) +{ + if (!status.ok()) { + throw grpc_driver_error(status.error_message(), status.error_code(), context.GetServerTrailingMetadata()); + } +} + +void print_array(const MonikerReadAnalogF64Response& data) +{ + std::cout << "["; + for (int i = 0; i < data.read_array().size(); i++) { + std::cout << data.read_array().Get(i) << " "; + } + std::cout << "]" << std::endl; +} + +::nidevice_grpc::Session create_and_configure_read_task(NiDAQmx::Stub &client, const std::string &PHYSICAL_CHANNEL) +{ + ::grpc::ClientContext create_task_context; + auto create_task_request = CreateTaskRequest{}; + create_task_request.set_session_name("my task"); + auto create_task_response = CreateTaskResponse{}; + raise_if_error( + client.CreateTask(&create_task_context, create_task_request, &create_task_response), + create_task_context); + auto daqmx_read_task = create_task_response.task(); + + ::grpc::ClientContext create_channel_context; + auto create_channel_request = CreateAIVoltageChanRequest{}; + create_channel_request.mutable_task()->CopyFrom(daqmx_read_task); + create_channel_request.set_physical_channel(PHYSICAL_CHANNEL); + create_channel_request.set_terminal_config(InputTermCfgWithDefault::INPUT_TERM_CFG_WITH_DEFAULT_CFG_DEFAULT); + create_channel_request.set_min_val(-10.0); + create_channel_request.set_max_val(10.0); + create_channel_request.set_units(VoltageUnits2::VOLTAGE_UNITS2_VOLTS); + auto create_channel_response = CreateAIVoltageChanResponse{}; + raise_if_error( + client.CreateAIVoltageChan(&create_channel_context, create_channel_request, &create_channel_response), + create_channel_context); + + ::grpc::ClientContext cfg_clk_context; + auto cfg_clk_request = CfgSampClkTimingRequest{}; + cfg_clk_request.mutable_task()->CopyFrom(daqmx_read_task); + cfg_clk_request.set_rate(100.0); + cfg_clk_request.set_active_edge(Edge1::EDGE1_RISING); + cfg_clk_request.set_sample_mode(AcquisitionType::ACQUISITION_TYPE_HW_TIMED_SINGLE_POINT); + cfg_clk_request.set_samps_per_chan(10); + auto cfg_clk_response = CfgSampClkTimingResponse{}; + raise_if_error( + client.CfgSampClkTiming(&cfg_clk_context, cfg_clk_request, &cfg_clk_response), + cfg_clk_context); + + ::grpc::ClientContext set_read_attribute_context; + auto set_read_attribute_request = SetReadAttributeInt32Request{}; + set_read_attribute_request.mutable_task()->CopyFrom(daqmx_read_task); + set_read_attribute_request.set_attribute(ReadInt32Attribute::READ_ATTRIBUTE_WAIT_MODE); + set_read_attribute_request.set_value(ReadInt32AttributeValues::READ_INT32_WAIT_MODE_POLL); + auto set_read_attribute_response = SetReadAttributeInt32Response{}; + raise_if_error( + client.SetReadAttributeInt32(&set_read_attribute_context, set_read_attribute_request, &set_read_attribute_response), + set_read_attribute_context); + + ::grpc::ClientContext start_task_context; + StartTaskRequest start_task_request; + start_task_request.mutable_task()->CopyFrom(daqmx_read_task); + StartTaskResponse start_task_response; + raise_if_error( + client.StartTask(&start_task_context, start_task_request, &start_task_response), + start_task_context); + + return daqmx_read_task; +} + +int main(int argc, char **argv) +{ + if (argc >= 2) { + SERVER_ADDRESS = argv[1]; + } + if (argc >= 3) { + SERVER_PORT = argv[2]; + } + if (argc >= 4) { + PHYSICAL_CHANNEL = argv[3]; + } + auto target_str = SERVER_ADDRESS + ":" + SERVER_PORT; + auto channel = grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials()); + NiDAQmx::Stub client(channel); + ni::data_monikers::DataMoniker::Stub moniker_service(channel); + + try { + auto daqmx_read_task = create_and_configure_read_task(client, PHYSICAL_CHANNEL); + + // Setup the read stream + ::grpc::ClientContext begin_read_context; + auto begin_read_request = BeginReadAnalogF64Request{}; + begin_read_request.mutable_task()->CopyFrom(daqmx_read_task); + begin_read_request.set_num_samps_per_chan(10); + begin_read_request.set_timeout(10.0); + begin_read_request.set_fill_mode(GroupBy::GROUP_BY_GROUP_BY_CHANNEL); + begin_read_request.set_array_size_in_samps(10); + auto begin_read_response = BeginReadAnalogF64Response{}; + raise_if_error( + client.BeginReadAnalogF64(&begin_read_context, begin_read_request, &begin_read_response), + begin_read_context); + auto daqmx_read_moniker = new ni::data_monikers::Moniker(begin_read_response.moniker()); + + ::grpc::ClientContext stream_read_context; + ni::data_monikers::MonikerList read_request; + read_request.mutable_read_monikers()->AddAllocated(daqmx_read_moniker); + auto read_stream = moniker_service.StreamRead(&stream_read_context, read_request); + + // Read data + for (int i = 0; i < NUM_ITERATIONS; i++) { + ni::data_monikers::MonikerReadResponse read_data_result; + if (read_stream->Read(&read_data_result)) { + google::protobuf::Any read_message = read_data_result.data().values(0); + + MonikerReadAnalogF64Response read_analog_f64_response; + read_message.UnpackTo(&read_analog_f64_response); + std::cout << "Read data..." << std::endl; + print_array(read_analog_f64_response); + } + else { + std::cout << "No data available." << std::endl; + } + } + + stream_read_context.TryCancel(); + + std::cout << "Cleaning up." << std::endl; + + ::grpc::ClientContext stop_task_context; + StopTaskRequest stop_task_request; + stop_task_request.mutable_task()->CopyFrom(daqmx_read_task); + StopTaskResponse stop_task_response; + raise_if_error( + client.StopTask(&stop_task_context, stop_task_request, &stop_task_response), + stop_task_context); + + ::grpc::ClientContext clear_task_context; + ClearTaskRequest clear_task_request; + clear_task_request.mutable_task()->CopyFrom(daqmx_read_task); + ClearTaskResponse clear_task_response; + raise_if_error( + client.ClearTask(&clear_task_context, clear_task_request, &clear_task_response), + clear_task_context); + } + catch (const grpc_driver_error& e) { + std::string error_message = e.what(); + + for (const auto& entry : e.Trailers()) { + if (entry.first == "ni-error") { + error_message += "\nError status: " + entry.second; + } + } + + if (e.StatusCode() == grpc::StatusCode::UNAVAILABLE) { + error_message = "Failed to connect to server on " + SERVER_ADDRESS + ":" + SERVER_PORT; + } + else if (e.StatusCode() == grpc::StatusCode::UNIMPLEMENTED) { + error_message = "The operation is not implemented or is not supported/enabled in this service"; + } + + std::cout << "Exception: " << error_message << std::endl; + } +} \ No newline at end of file