diff --git a/samples/cpp/text_generation/speculative_decoding_lm.cpp b/samples/cpp/text_generation/speculative_decoding_lm.cpp
index e10228863f..533bc968f7 100644
--- a/samples/cpp/text_generation/speculative_decoding_lm.cpp
+++ b/samples/cpp/text_generation/speculative_decoding_lm.cpp
@@ -6,12 +6,12 @@
 #include "openvino/genai/llm_pipeline.hpp"
 
 int main(int argc, char* argv[]) try {
-    if (4 != argc) {
-        throw std::runtime_error(std::string{"Usage: "} + argv[0] + " <MODEL_DIR> <DRAFT_MODEL_DIR> '<PROMPT>'");
+    if (5 != argc) {
+        throw std::runtime_error(std::string{"Usage: "} + argv[0] + " <MODEL_DIR> <DRAFT_MODEL_DIR> <TYPE> '<PROMPT>'");
     }
 
     ov::genai::GenerationConfig config;
-    config.max_new_tokens = 100;
+    config.max_new_tokens = 500;
     // Speculative decoding generation parameters like `num_assistant_tokens` and `assistant_confidence_threshold` are mutually excluded
     // add parameter to enable speculative decoding to generate `num_assistant_tokens` candidates by draft_model per iteration
     config.num_assistant_tokens = 5;
@@ -20,25 +20,55 @@ int main(int argc, char* argv[]) try {
 
     std::string main_model_path = argv[1];
     std::string draft_model_path = argv[2];
-    std::string prompt = argv[3];
+    std::string type = argv[3];
+    std::string prompt = argv[4];
 
     // User can run main and draft model on different devices.
     // Please, set device for main model in `LLMPipeline` constructor and in in `ov::genai::draft_model` for draft.
     std::string main_device = "CPU", draft_device = "CPU";
 
-    ov::genai::LLMPipeline pipe(
-        main_model_path,
-        main_device,
-        ov::genai::draft_model(draft_model_path, draft_device));
-
     auto streamer = [](std::string subword) {
         std::cout << subword << std::flush;
        return false;
     };
 
-    // Since the streamer is set, the results will
-    // be printed each time a new token is generated.
-    pipe.generate(prompt, config, streamer);
+    if (type == "cb") {
+        std::cout << "CB" << std::endl;
+
+        ov::genai::LLMPipeline pipe(
+            main_model_path,
+            main_device,
+            ov::genai::scheduler_config(ov::genai::SchedulerConfig())
+            // ov::genai::draft_model(draft_model_path, draft_device)
+        );
+
+        // Since the streamer is set, the results will
+        // be printed each time a new token is generated.
+        pipe.generate(prompt, config, streamer);
+    } else if (type == "sd") {
+        std::cout << "SD" << std::endl;
+        ov::genai::LLMPipeline pipe(
+            main_model_path,
+            main_device,
+            ov::genai::draft_model(draft_model_path, draft_device)
+        );
+
+        // Since the streamer is set, the results will
+        // be printed each time a new token is generated.
+        pipe.generate(prompt, config, streamer);
+    } else {
+        config.max_ngram_size = 3;
+        std::cout << "PL" << std::endl;
+        ov::genai::LLMPipeline pipe(
+            main_model_path,
+            main_device,
+            ov::genai::prompt_lookup(true)
+        );
+
+        // Since the streamer is set, the results will
+        // be printed each time a new token is generated.
+        pipe.generate(prompt, config, streamer);
+    }
 } catch (const std::exception& error) {
     try {
         std::cerr << error.what() << '\n';
diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp
index e778e55b93..3fef68952a 100644
--- a/src/cpp/src/continuous_batching_impl.cpp
+++ b/src/cpp/src/continuous_batching_impl.cpp
@@ -1,6 +1,8 @@
 // Copyright (C) 2023-2024 Intel Corporation
 // SPDX-License-Identifier: Apache-2.0
 
+#include <thread>
+
 #include "text_callback_streamer.hpp"
 #include "continuous_batching_impl.hpp"
 #include "utils.hpp"
@@ -261,6 +263,10 @@ std::vector<EncodedGenerationResult>
 ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<ov::Tensor>& input_ids,
                                                              const std::vector<GenerationConfig>& sampling_params,
                                                              const StreamerVariant& streamer) {
+    // todo: remove
+    ManualTimer generate_timer("generate()");
+    generate_timer.start();
+
     OPENVINO_ASSERT(!has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. Use ContinuousBatchingPipeline::add_request");
     OPENVINO_ASSERT(input_ids.size() == sampling_params.size());
 
@@ -300,25 +306,16 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<
     bool continue_generation = true;
-    while (has_non_finished_requests() && continue_generation) {
-        try {
-            const auto infer_start = std::chrono::steady_clock::now();
-            step();
-            if (m_batch_size > 0) {
-                const auto infer_end = std::chrono::steady_clock::now();
-                const auto infer_ms = PerfMetrics::get_microsec(std::chrono::steady_clock::now() - infer_start);
-                raw_perf_counters.m_token_infer_durations.emplace_back(infer_ms);
-                raw_perf_counters.m_inference_durations[0] += MicroSeconds(infer_ms);
-                raw_perf_counters.m_new_token_times.emplace_back(infer_end);
-                raw_perf_counters.m_batch_sizes.emplace_back(m_batch_size);
-            }
-        } catch (...) {
-            drop_requests(); // remove all requests from pipeline state in case of exception
-            throw;
-        }
+    GenerationHandle& generation = generations.at(0);
+    // todo: remove
+    float streaming_duraton = 0, thread_duration = 0;
 
-        GenerationHandle & generation = generations.at(0);
+    auto stream_generated_tokens = [&generation, &streamer_ptr, &continue_generation, &streaming_duraton]() {
         if (streamer_ptr && generation->can_read()) {
+            // todo: remove
+            ManualTimer streaming_timer("streaming");
+            streaming_timer.start();
+
             std::unordered_map<uint64_t, GenerationOutput> token = generation->back();
             for (const auto& gen_token : token.begin()->second.generated_ids) {
                 continue_generation = !streamer_ptr->put(gen_token);
@@ -327,6 +324,55 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<
                     break;
                 }
             }
+
+            // todo: remove
+            streaming_timer.end();
+            streaming_duraton += streaming_timer.get_duration();
+        }
+    };
+
+    // to store potential exception thrown in step_thread
+    std::exception_ptr step_outputs_error = nullptr;
+    while (continue_generation) {
+        // todo: remove
+        ManualTimer thread_timer("threading");
+        thread_timer.start();
+
+        std::thread t_step([this, &step_outputs_error, &raw_perf_counters] {
+            try {
+                const auto infer_start = std::chrono::steady_clock::now();
+                step();
+                if (m_batch_size > 0) {
+                    const auto infer_end = std::chrono::steady_clock::now();
+                    const auto infer_ms = PerfMetrics::get_microsec(std::chrono::steady_clock::now() - infer_start);
+                    raw_perf_counters.m_token_infer_durations.emplace_back(infer_ms);
+                    raw_perf_counters.m_inference_durations[0] += MicroSeconds(infer_ms);
+                    raw_perf_counters.m_new_token_times.emplace_back(infer_end);
+                    raw_perf_counters.m_batch_sizes.emplace_back(m_batch_size);
+                }
+            } catch (...) {
+                drop_requests(); // remove all requests from pipeline state in case of exception
+                step_outputs_error = std::current_exception();
+            }
+        });
+
+        std::thread t_stream([&stream_generated_tokens] {
+            stream_generated_tokens();
+        });
+
+        // todo: remove
+        thread_timer.end();
+        thread_duration += thread_timer.get_duration();
+
+        t_stream.join();
+        t_step.join();
+
+        if (step_outputs_error) {
+            std::rethrow_exception(step_outputs_error);
+        }
+
+        if (!has_non_finished_requests() && continue_generation) {
+            stream_generated_tokens();
+            break;
         }
     }
 
@@ -378,6 +424,13 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<
     }
     OPENVINO_ASSERT(results.size() == input_ids.size());
+    generate_timer.end();
+
+    // todo: remove
+    std::cout << std::endl << "STREAMING DURATION: " << streaming_duraton << std::endl;
+    std::cout << "GENERATION DURATION: " << generate_timer.get_duration() << std::endl;
+    std::cout << "THREAD CREATION DURATION: " << thread_duration << std::endl;
+
     return results;
 }
diff --git a/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp b/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp
--- a/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp
+++ b/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp
@@ -1,5 +1,7 @@
 // Copyright (C) 2023-2024 Intel Corporation
 // SPDX-License-Identifier: Apache-2.0
 
+#include <thread>
+
 #include "prompt_lookup_impl.hpp"
 #include "text_callback_streamer.hpp"
@@ -109,30 +111,75 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector<
     }
     auto all_requests = m_pipeline->get_awaiting_requests();
 
+    // todo: should be removed
+    float streaming_duration = 0, thread_duration = 0;
+    ManualTimer streaming_timer("gen");
+    streaming_timer.start();
+
     bool continue_generation = true;
-    while (has_non_finished_requests() && continue_generation) {
-        try {
-            step();
-        } catch (...) {
-            drop_requests(); // remove all requests from pipeline state in case of exception
-            throw;
-        }
-        if (streamer_ptr) {
-            // not generated tokens like several prompt phase
-            auto& generation = generations.at(0);
-            if (!generation->can_read()) {
-                continue;
-            }
-            std::unordered_map<uint64_t, GenerationOutput> token = generation->back();
-            OPENVINO_ASSERT(1 <= token.size());
-            OPENVINO_ASSERT(1 <= token.begin()->second.generated_ids.size());
+    auto& main_generation = generations.at(0);
+    // define lambda to stream generated tokens
+    auto stream_generated_tokens = [&main_generation, &streamer_ptr, &continue_generation, &streaming_duration]() {
+        if (streamer_ptr && main_generation->can_read()) {
+            // todo: remove
+            ManualTimer streaming_timer("streaming");
+            streaming_timer.start();
+
+            std::unordered_map<uint64_t, GenerationOutput> token = main_generation->back();
             for (const auto& gen_token : token.begin()->second.generated_ids) {
                 continue_generation = !streamer_ptr->put(gen_token);
                 if (!continue_generation) {
-                    generation->drop();
+                    main_generation->drop();
                     break;
                 }
             }
+
+            // todo: remove
+            streaming_timer.end();
+            streaming_duration += streaming_timer.get_duration();
+        }
+    };
+
+    // to store potential exception thrown in step_thread
+    std::exception_ptr step_outputs_error = nullptr;
+
+    while (continue_generation) {
+        // todo: remove
+        ManualTimer thread_timer("threading");
+        thread_timer.start();
+
+        // to define inference thread
+        std::thread t_step([this, &step_outputs_error] {
+            try {
+                step();
+            } catch (...) {
+                // remove all requests from pipeline state in case of exception
+                drop_requests();
+                step_outputs_error = std::current_exception();
+            }
+        });
+
+        // to define streaming thread
+        std::thread t_stream([&stream_generated_tokens] {
+            stream_generated_tokens();
+        });
+
+        // todo: remove
+        thread_timer.end();
+        thread_duration += thread_timer.get_duration();
+
+        t_stream.join();
+        t_step.join();
+
+        // throw exception in case of inference error
+        if (step_outputs_error) {
+            std::rethrow_exception(step_outputs_error);
+        }
+
+        // stream last generated tokens
+        if (!has_non_finished_requests() && continue_generation) {
+            stream_generated_tokens();
+            break;
         }
     }
 
@@ -177,6 +224,11 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector<
     }
     OPENVINO_ASSERT(results.size() == input_ids.size());
+
+    // todo: remove
+    streaming_timer.end();
+    std::cout << std::endl << "STREAMING DURATION: " << streaming_duration << std::endl;
+    std::cout << "THREAD CREATION DURATION: " << thread_duration << std::endl;
     return results;
 }
diff --git a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp
--- a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp
+++ b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp
@@ -1,6 +1,8 @@
 // Copyright (C) 2023-2024 Intel Corporation
 // SPDX-License-Identifier: Apache-2.0
 
+#include <thread>
+
 #include "text_callback_streamer.hpp"
 #include "speculative_decoding_impl.hpp"
 #include "utils.hpp"
@@ -235,20 +237,20 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector<
     }
     auto all_requests = get_awaiting_requests();
 
+    // todo: remove
+    float streaming_duraton = 0, thread_duration = 0;
+    ManualTimer streaming_timer("gen");
+    streaming_timer.start();
+
     bool continue_generation = true;
-    while (has_non_finished_requests() && continue_generation) {
-        try {
-            step();
-        } catch (...) {
-            drop_requests(); // remove all requests from pipeline state in case of exception
-            throw;
-        }
-        if (streamer_ptr) {
-            auto& main_generation = main_generations.at(0);
-            // not generated tokens like several prompt phase
-            if (!main_generation->can_read()) {
-                continue;
-            }
+    auto& main_generation = main_generations.at(0);
+    // define lambda to stream generated tokens
+    auto stream_generated_tokens = [&main_generation, &streamer_ptr, &continue_generation, &streaming_duraton]() {
+        if (streamer_ptr && main_generation->can_read()) {
+            // todo: remove
+            ManualTimer streaming_timer("streaming");
+            streaming_timer.start();
+
             std::unordered_map<uint64_t, GenerationOutput> token = main_generation->back();
             for (const auto& gen_token : token.begin()->second.generated_ids) {
                 continue_generation = !streamer_ptr->put(gen_token);
@@ -257,6 +259,52 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector<
                     break;
                 }
             }
+
+            // todo: remove
+            streaming_timer.end();
+            streaming_duraton += streaming_timer.get_duration();
+        }
+    };
+
+    // to store potential exception thrown in step_thread
+    std::exception_ptr step_outputs_error = nullptr;
+    while (continue_generation) {
+        // todo: remove
+        ManualTimer thread_timer("threading");
+        thread_timer.start();
+
+        // to define inference thread
+        std::thread t_step([this, &step_outputs_error] {
+            try {
+                step();
+            } catch (...) {
+                // remove all requests from pipeline state in case of exception
+                drop_requests();
+                step_outputs_error = std::current_exception();
+            }
+        });
+
+        // to define streaming thread
+        std::thread t_stream([&stream_generated_tokens] {
+            stream_generated_tokens();
+        });
+
+        // todo: remove
+        thread_timer.end();
+        thread_duration += thread_timer.get_duration();
+
+        t_stream.join();
+        t_step.join();
+
+        // throw exception in case of inference error
+        if (step_outputs_error) {
+            std::rethrow_exception(step_outputs_error);
+        }
+
+        // stream last generated tokens
+        if (!has_non_finished_requests() && continue_generation) {
+            stream_generated_tokens();
+            break;
         }
     }
 
@@ -301,6 +349,12 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector<
     }
     OPENVINO_ASSERT(results.size() == input_ids.size());
+    generate_timer.end();
+
+    // todo: remove
+    std::cout << std::endl << "STREAMING DURATION: " << streaming_duraton << std::endl;
+    std::cout << "GENERATION DURATION: " << generate_timer.get_duration() << std::endl;
+    std::cout << "THREAD CREATION DURATION: " << thread_duration << std::endl;
 
     return results;
 }