[CB] Split token streaming and generation to different threads for all CB based pipelines #1544

Draft · wants to merge 4 commits into master
54 changes: 42 additions & 12 deletions samples/cpp/text_generation/speculative_decoding_lm.cpp
@@ -6,12 +6,12 @@
#include "openvino/genai/llm_pipeline.hpp"

int main(int argc, char* argv[]) try {
if (4 != argc) {
throw std::runtime_error(std::string{"Usage: "} + argv[0] + " <MODEL_DIR> <DRAFT_MODEL_DIR> '<PROMPT>'");
if (5 != argc) {
throw std::runtime_error(std::string{"Usage: "} + argv[0] + " <MODEL_DIR> <DRAFT_MODEL_DIR> <TYPE> '<PROMPT>'");
}

ov::genai::GenerationConfig config;
config.max_new_tokens = 100;
config.max_new_tokens = 500;
// Speculative decoding generation parameters like `num_assistant_tokens` and `assistant_confidence_threshold` are mutually exclusive
// add a parameter to enable speculative decoding so the draft model generates `num_assistant_tokens` candidates per iteration
config.num_assistant_tokens = 5;
@@ -20,25 +20,55 @@ int main(int argc, char* argv[]) try {

std::string main_model_path = argv[1];
std::string draft_model_path = argv[2];
std::string prompt = argv[3];
std::string type = argv[3];
std::string prompt = argv[4];

// The user can run the main and draft models on different devices.
// Set the device for the main model in the `LLMPipeline` constructor and the device for the draft model in `ov::genai::draft_model`.
std::string main_device = "CPU", draft_device = "CPU";

ov::genai::LLMPipeline pipe(
main_model_path,
main_device,
ov::genai::draft_model(draft_model_path, draft_device));

auto streamer = [](std::string subword) {
std::cout << subword << std::flush;
return false;
};

// Since the streamer is set, the results will
// be printed each time a new token is generated.
pipe.generate(prompt, config, streamer);
if (type == "cb") {
std::cout << "CB" << std::endl;

ov::genai::LLMPipeline pipe(
main_model_path,
main_device,
ov::genai::scheduler_config(ov::genai::SchedulerConfig())
// ov::genai::draft_model(draft_model_path, draft_device)
);

// Since the streamer is set, the results will
// be printed each time a new token is generated.
pipe.generate(prompt, config, streamer);
} else if (type == "sd") {
std::cout << "SD" << std::endl;
ov::genai::LLMPipeline pipe(
main_model_path,
main_device,
ov::genai::draft_model(draft_model_path, draft_device)
);

// Since the streamer is set, the results will
// be printed each time a new token is generated.
pipe.generate(prompt, config, streamer);
} else {
config.max_ngram_size = 3;
std::cout << "PL" << std::endl;
ov::genai::LLMPipeline pipe(
main_model_path,
main_device,
ov::genai::prompt_lookup(true)
);

// Since the streamer is set, the results will
// be printed each time a new token is generated.
pipe.generate(prompt, config, streamer);
}
} catch (const std::exception& error) {
try {
std::cerr << error.what() << '\n';
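Note: the three branches differ only in the properties passed to the `LLMPipeline` constructor; the `generate()` call is identical in each. Below is a minimal sketch of collapsing the selection into one place. The `pipeline_properties` helper is hypothetical, and it assumes the property helpers used in the sample (`ov::genai::scheduler_config`, `ov::genai::draft_model`, `ov::genai::prompt_lookup`) yield key/value pairs that can be inserted into an `ov::AnyMap`.

```cpp
// Illustrative sketch only (not part of this patch): select the pipeline
// properties once, based on the <TYPE> argument, and keep a single
// construction and generate() call. Assumes the property helpers return
// pairs insertable into an ov::AnyMap.
#include <stdexcept>
#include <string>

#include "openvino/genai/llm_pipeline.hpp"

static ov::AnyMap pipeline_properties(const std::string& type,
                                      const std::string& draft_model_path,
                                      const std::string& draft_device) {
    ov::AnyMap properties;
    if (type == "cb") {
        // plain continuous batching
        properties.insert(ov::genai::scheduler_config(ov::genai::SchedulerConfig()));
    } else if (type == "sd") {
        // speculative decoding with a draft model
        properties.insert(ov::genai::draft_model(draft_model_path, draft_device));
    } else if (type == "pl") {
        // prompt lookup decoding
        properties.insert(ov::genai::prompt_lookup(true));
    } else {
        throw std::runtime_error("Unknown <TYPE>: expected 'cb', 'sd' or 'pl'");
    }
    return properties;
}
```

With such a helper the sample body would reduce to constructing `ov::genai::LLMPipeline pipe(main_model_path, main_device, pipeline_properties(type, draft_model_path, draft_device));` and calling `pipe.generate(prompt, config, streamer);` once.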
87 changes: 70 additions & 17 deletions src/cpp/src/continuous_batching_impl.cpp
@@ -1,6 +1,8 @@
// Copyright (C) 2023-2024 Intel Corporation
// SPDX-License-Identifier: Apache-2.0

#include <thread>

#include "text_callback_streamer.hpp"
#include "continuous_batching_impl.hpp"
#include "utils.hpp"
@@ -261,6 +263,10 @@ std::vector<EncodedGenerationResult>
ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<ov::Tensor>& input_ids,
const std::vector<GenerationConfig>& sampling_params,
const StreamerVariant& streamer) {
// todo: remove
ManualTimer generate_timer("generate()");
generate_timer.start();

OPENVINO_ASSERT(!has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. Use ContinuousBatchingPipeline::add_request");
OPENVINO_ASSERT(input_ids.size() == sampling_params.size());

@@ -300,25 +306,16 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
auto all_requests = m_awaiting_requests; // we need to store all requests to get results from them once generation has finished

bool continue_generation = true;
while (has_non_finished_requests() && continue_generation) {
try {
const auto infer_start = std::chrono::steady_clock::now();
step();
if (m_batch_size > 0) {
const auto infer_end = std::chrono::steady_clock::now();
const auto infer_ms = PerfMetrics::get_microsec(std::chrono::steady_clock::now() - infer_start);
raw_perf_counters.m_token_infer_durations.emplace_back(infer_ms);
raw_perf_counters.m_inference_durations[0] += MicroSeconds(infer_ms);
raw_perf_counters.m_new_token_times.emplace_back(infer_end);
raw_perf_counters.m_batch_sizes.emplace_back(m_batch_size);
}
} catch (...) {
drop_requests(); // remove all requests from pipeline state in case of exception
throw;
}
GenerationHandle& generation = generations.at(0);
// todo: remove
float streaming_duration = 0, thread_duration = 0;

GenerationHandle & generation = generations.at(0);
auto stream_generated_tokens = [&generation, &streamer_ptr, &continue_generation, &streaming_duration]() {
if (streamer_ptr && generation->can_read()) {
// todo: remove
ManualTimer streaming_timer("streaming");
streaming_timer.start();

std::unordered_map<uint64_t, GenerationOutput> token = generation->back();
for (const auto& gen_token : token.begin()->second.generated_ids) {
continue_generation = !streamer_ptr->put(gen_token);
@@ -327,6 +324,55 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
break;
}
}

// todo: remove
streaming_timer.end();
streaming_duration += streaming_timer.get_duration();
}
};

std::exception_ptr step_outputs_error;
while (continue_generation) {
// todo: remove
ManualTimer thread_timer("streaming");
thread_timer.start();

std::thread t_step([this, &raw_perf_counters, &step_outputs_error] {
try {
const auto infer_start = std::chrono::steady_clock::now();
step();
if (m_batch_size > 0) {
const auto infer_end = std::chrono::steady_clock::now();
const auto infer_ms = PerfMetrics::get_microsec(infer_end - infer_start);
raw_perf_counters.m_token_infer_durations.emplace_back(infer_ms);
raw_perf_counters.m_inference_durations[0] += MicroSeconds(infer_ms);
raw_perf_counters.m_new_token_times.emplace_back(infer_end);
raw_perf_counters.m_batch_sizes.emplace_back(m_batch_size);
}
} catch (...) {
drop_requests(); // remove all requests from pipeline state in case of exception
step_outputs_error = std::current_exception();
}
});

std::thread t_stream([&stream_generated_tokens] {
stream_generated_tokens();
});

// todo: remove
thread_timer.end();
thread_duration += thread_timer.get_duration();

t_stream.join();
t_step.join();

if (step_outputs_error) {
std::rethrow_exception(step_outputs_error); // a bare `throw;` has no active exception here and would terminate
}

if (!has_non_finished_requests() && continue_generation) {
stream_generated_tokens();
break;
}
}

@@ -378,6 +424,13 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
}

OPENVINO_ASSERT(results.size() == input_ids.size());

// todo: remove
generate_timer.end();
std::cout << std::endl << "STREAMING DURATION: " << streaming_duration << std::endl;
std::cout << "GENERATION DURATION: " << generate_timer.get_duration() << std::endl;
std::cout << "THREAD CREATION DURATION: " << thread_duration << std::endl;

return results;
}
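The loop above replaces the previous step-then-stream sequence with two threads per iteration: one runs `step()`, the other drains already generated tokens into the streamer, and any exception raised during inference is carried back to the calling thread. A condensed, self-contained sketch of the same pattern is shown below; `generate_step`, `stream_tokens` and `has_unfinished_work` are hypothetical stand-ins for `step()`, the streaming lambda and `has_non_finished_requests()`.

```cpp
// Illustrative sketch (not the pipeline code): run one inference step and the
// token streaming on separate threads per iteration, and rethrow any inference
// error on the calling thread afterwards.
#include <exception>
#include <functional>
#include <thread>

void run_generation_loop(const std::function<void()>& generate_step,
                         const std::function<bool()>& stream_tokens,   // returns false to stop generation
                         const std::function<bool()>& has_unfinished_work) {
    std::exception_ptr step_error = nullptr;
    bool continue_generation = true;

    while (continue_generation) {
        // Inference and streaming overlap: the streamer drains tokens produced
        // by earlier steps while the next forward pass runs.
        std::thread t_step([&] {
            try {
                generate_step();
            } catch (...) {
                step_error = std::current_exception();  // stash for the caller
            }
        });
        std::thread t_stream([&] { continue_generation = stream_tokens(); });

        t_stream.join();
        t_step.join();

        // Rethrow on the calling thread. A bare `throw;` here would call
        // std::terminate because no exception is currently being handled.
        if (step_error)
            std::rethrow_exception(step_error);

        if (!has_unfinished_work()) {
            stream_tokens();  // flush tokens produced by the final step
            break;
        }
    }
}
```

The per-iteration `std::thread` construction in this pattern is the overhead that the temporary THREAD CREATION DURATION counter above is measuring.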

86 changes: 69 additions & 17 deletions src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp
@@ -1,6 +1,8 @@
// Copyright (C) 2023-2024 Intel Corporation
// SPDX-License-Identifier: Apache-2.0

#include <thread>

#include "prompt_lookup_impl.hpp"
#include "text_callback_streamer.hpp"

@@ -109,30 +111,75 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector<ov::Ten
}
auto all_requests = m_pipeline->get_awaiting_requests();

// todo: should be removed
float streaming_duration = 0, thread_duration = 0;
ManualTimer streaming_timer("gen");
streaming_timer.start();

bool continue_generation = true;
while (has_non_finished_requests() && continue_generation) {
try {
step();
} catch (...) {
drop_requests(); // remove all requests from pipeline state in case of exception
throw;
}
if (streamer_ptr) {
// not generated tokens like several prompt phase
auto& generation = generations.at(0);
if (!generation->can_read()) {
continue;
}
std::unordered_map<uint64_t, GenerationOutput> token = generation->back();
OPENVINO_ASSERT(1 <= token.size());
OPENVINO_ASSERT(1 <= token.begin()->second.generated_ids.size());
auto& main_generation = generations.at(0);
// lambda to stream generated tokens
auto stream_generated_tokens = [&main_generation, &streamer_ptr, &continue_generation, &streaming_duration]() {
if (streamer_ptr && main_generation->can_read()) {
// todo: remove
ManualTimer streaming_timer("streaming");
streaming_timer.start();

std::unordered_map<uint64_t, GenerationOutput> token = main_generation->back();
for (const auto& gen_token : token.begin()->second.generated_ids) {
continue_generation = !streamer_ptr->put(gen_token);
if (!continue_generation) {
generation->drop();
main_generation->drop();
break;
}
}

// todo: remove
streaming_timer.end();
streaming_duration += streaming_timer.get_duration();
}
};

// stores a potential exception thrown on the inference thread
std::exception_ptr step_outputs_error = nullptr;

while (continue_generation) {
// todo: remove
ManualTimer thread_timer("threading");
thread_timer.start();

// inference thread: runs one step() per iteration
std::thread t_step([this, &step_outputs_error] {
try {
step();
} catch (...) {
// remove all requests from pipeline state in case of exception
drop_requests();
step_outputs_error = std::current_exception();
}
});

// streaming thread: pushes newly generated tokens to the streamer
std::thread t_stream([&stream_generated_tokens] {
stream_generated_tokens();
});

// todo: remove
thread_timer.end();
thread_duration += thread_timer.get_duration();

t_stream.join();
t_step.join();

// rethrow an exception captured on the inference thread, if any
if (step_outputs_error) {
std::rethrow_exception(step_outputs_error);
}

// stream last generated tokens
if (!has_non_finished_requests() && continue_generation) {
stream_generated_tokens();
break;
}
}

@@ -177,6 +224,11 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector<ov::Ten
}

OPENVINO_ASSERT(results.size() == input_ids.size());
generate_timer.end();
// todo: remove
std::cout << std::endl << "STREAMING DURATION: " << streaming_duration << std::endl;
std::cout << "GENERATION DURATION: " << generate_timer.get_duration() << std::endl;
std::cout << "THREAD CREATION DURATION: " << thread_duration << std::endl;
return results;
}
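The prompt-lookup loop mirrors the continuous-batching one, so it pays the same per-iteration thread-creation cost that the temporary THREAD CREATION DURATION counter records. One possible refinement, sketched below purely as an illustration, is a persistent streaming worker fed through a small locked queue, so a single extra thread lives for the whole `generate()` call. The token type and callback here are simplified stand-ins for the real streamer, and the early-stop signal returned by `put()` is omitted.

```cpp
// Illustrative sketch (not part of this PR): a persistent streaming worker fed
// through a queue, as an alternative to creating a new std::thread per iteration.
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

class TokenStreamWorker {
public:
    explicit TokenStreamWorker(std::function<void(int64_t)> on_token)
        : m_on_token(std::move(on_token)),
          m_thread([this] { run(); }) {}

    ~TokenStreamWorker() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
        }
        m_cv.notify_one();
        m_thread.join();  // drains remaining tokens before exiting
    }

    // Called from the generation loop after each step().
    void push(int64_t token) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(token);
        }
        m_cv.notify_one();
    }

private:
    void run() {
        for (;;) {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_cv.wait(lock, [this] { return m_done || !m_queue.empty(); });
            if (m_queue.empty() && m_done)
                return;
            int64_t token = m_queue.front();
            m_queue.pop();
            lock.unlock();
            m_on_token(token);  // e.g. detokenize and print, outside the lock
        }
    }

    std::function<void(int64_t)> m_on_token;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<int64_t> m_queue;
    bool m_done = false;
    std::thread m_thread;  // declared last so run() only sees initialized members
};
```

Whether the queue hand-off is actually cheaper than spawning a thread per step would need to be confirmed with the same timers this draft already adds.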
