Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: pre-check TiKV disk space before download (#17238) (#17569) #401

Open
wants to merge 2 commits into
base: raftstore-proxy-7.5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion components/error_code/src/sst_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,11 @@ define_error_codes!(
TTL_LEN_NOT_EQUALS_TO_PAIRS => ("TtlLenNotEqualsToPairs", "", ""),
INCOMPATIBLE_API_VERSION => ("IncompatibleApiVersion", "", ""),
INVALID_KEY_MODE => ("InvalidKeyMode", "", ""),
RESOURCE_NOT_ENOUTH => ("ResourceNotEnough", "", "")
RESOURCE_NOT_ENOUTH => ("ResourceNotEnough", "", ""),
SUSPENDED => ("Suspended",
"this request has been suspended.",
"Probably there are some export tools don't support exporting data inserted by `ingest`(say, snapshot backup). Check the user manual and stop them."),
REQUEST_TOO_NEW => ("RequestTooNew", "", ""),
REQUEST_TOO_OLD => ("RequestTooOld", "", ""),
DISK_SPACE_NOT_ENOUGH => ("DiskSpaceNotEnough", "", "")
);
10 changes: 10 additions & 0 deletions components/sst_importer/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ pub enum Error {

#[error("resource is not enough {0}")]
ResourceNotEnough(String),

#[error("imports are suspended for {time_to_lease_expire:?}")]
Suspended { time_to_lease_expire: Duration },

#[error("TiKV disk space is not enough.")]
DiskSpaceNotEnough,
}

impl Error {
Expand Down Expand Up @@ -197,6 +203,10 @@ impl ErrorCodeExt for Error {
Error::IncompatibleApiVersion => error_code::sst_importer::INCOMPATIBLE_API_VERSION,
Error::InvalidKeyMode { .. } => error_code::sst_importer::INVALID_KEY_MODE,
Error::ResourceNotEnough(_) => error_code::sst_importer::RESOURCE_NOT_ENOUTH,
Error::Suspended { .. } => error_code::sst_importer::SUSPENDED,
Error::RequestTooNew(_) => error_code::sst_importer::REQUEST_TOO_NEW,
Error::RequestTooOld(_) => error_code::sst_importer::REQUEST_TOO_OLD,
Error::DiskSpaceNotEnough => error_code::sst_importer::DISK_SPACE_NOT_ENOUGH,
}
}
}
16 changes: 14 additions & 2 deletions src/import/sst_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ use tikv_kv::{
};
use tikv_util::{
config::ReadableSize,
future::create_stream_with_buffer,
sys::thread::ThreadBuildWrapper,
future::{create_stream_with_buffer, paired_future_callback},
sys::{
disk::{get_disk_status, DiskUsage},
thread::ThreadBuildWrapper,
},
time::{Instant, Limiter},
HandyRwLock,
};
Expand Down Expand Up @@ -883,6 +886,10 @@ impl<E: Engine> ImportSst for ImportSstService<E> {
.observe(start.saturating_elapsed().as_secs_f64());

let mut resp = ApplyResponse::default();
if get_disk_status(0) != DiskUsage::Normal {
resp.set_error(Error::DiskSpaceNotEnough.into());
return crate::send_rpc_response!(Ok(resp), sink, label, start);
}

match Self::apply_imp(req, importer, applier, limiter, max_raft_size).await {
Ok(Some(r)) => resp.set_range(r),
Expand Down Expand Up @@ -924,6 +931,11 @@ impl<E: Engine> ImportSst for ImportSstService<E> {
sst_importer::metrics::IMPORTER_DOWNLOAD_DURATION
.with_label_values(&["queue"])
.observe(start.saturating_elapsed().as_secs_f64());
if get_disk_status(0) != DiskUsage::Normal {
let mut resp = DownloadResponse::default();
resp.set_error(Error::DiskSpaceNotEnough.into());
return crate::send_rpc_response!(Ok(resp), sink, label, timer);
}

// FIXME: download() should be an async fn, to allow BR to cancel
// a download task.
Expand Down
43 changes: 40 additions & 3 deletions tests/failpoints/cases/test_import_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ use std::{

use file_system::calc_crc32;
use futures::{executor::block_on, stream, SinkExt};
use grpcio::{Result, WriteFlags};
use kvproto::import_sstpb::*;
use grpcio::{ChannelBuilder, Environment, Result, WriteFlags};
use kvproto::{disk_usage::DiskUsage, import_sstpb::*, tikvpb_grpc::TikvClient};
use tempfile::{Builder, TempDir};
use test_raftstore::Simulator;
use test_sst_importer::*;
use tikv::config::TikvConfig;
use tikv_util::{config::ReadableSize, HandyRwLock};
use tikv_util::{config::ReadableSize, sys::disk, HandyRwLock};

#[allow(dead_code)]
#[path = "../../integrations/import/util.rs"]
Expand Down Expand Up @@ -90,6 +90,43 @@ fn upload_sst(import: &ImportSstClient, meta: &SstMeta, data: &[u8]) -> Result<U
})
}

#[test]
fn test_download_to_full_disk() {
let (_cluster, ctx, _tikv, import) = new_cluster_and_tikv_import_client();
let temp_dir = Builder::new()
.prefix("test_download_sst_blocking_sst_writer")
.tempdir()
.unwrap();

let sst_path = temp_dir.path().join("test.sst");
let sst_range = (0, 100);
let (mut meta, _) = gen_sst_file(sst_path, sst_range);
meta.set_region_id(ctx.get_region_id());
meta.set_region_epoch(ctx.get_region_epoch().clone());

// Now perform a proper download.
let mut download = DownloadRequest::default();
download.set_sst(meta.clone());
download.set_storage_backend(external_storage_export::make_local_backend(temp_dir.path()));
download.set_name("test.sst".to_owned());
download.mut_sst().mut_range().set_start(vec![sst_range.1]);
download
.mut_sst()
.mut_range()
.set_end(vec![sst_range.1 + 1]);
download.mut_sst().mut_range().set_start(Vec::new());
download.mut_sst().mut_range().set_end(Vec::new());
disk::set_disk_status(DiskUsage::AlmostFull);
let result = import.download(&download).unwrap();
assert!(!result.get_is_empty());
assert!(result.has_error());
assert_eq!(
result.get_error().get_message(),
"TiKV disk space is not enough."
);
disk::set_disk_status(DiskUsage::Normal);
}

#[test]
fn test_ingest_reentrant() {
let (cluster, ctx, _tikv, import) = new_cluster_and_tikv_import_client();
Expand Down
101 changes: 101 additions & 0 deletions tests/integrations/import/test_apply_log.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use engine_traits::CF_DEFAULT;
use external_storage_export::LocalStorage;
use kvproto::import_sstpb::ApplyRequest;
use tempfile::TempDir;
use tikv_util::sys::disk::{self, DiskUsage};

use crate::import::util;

#[test]
fn test_basic_apply() {
let (_cluster, ctx, tikv, import) = util::new_cluster_and_tikv_import_client();
let tmp = TempDir::new().unwrap();
let storage = LocalStorage::new(tmp.path()).unwrap();
let default = [
(b"k1", b"v1", 1),
(b"k2", b"v2", 2),
(b"k3", b"v3", 3),
(b"k4", b"v4", 4),
];
let default_rewritten = [(b"r1", b"v1", 1), (b"r2", b"v2", 2), (b"r3", b"v3", 3)];
let mut sst_meta = util::make_plain_file(&storage, "file1.log", default.into_iter());
util::register_range_for(&mut sst_meta, b"k1", b"k3a");
let mut req = ApplyRequest::new();
req.set_context(ctx.clone());
req.set_rewrite_rules(vec![util::rewrite_for(&mut sst_meta, b"k", b"r")].into());
req.set_metas(vec![sst_meta].into());
req.set_storage_backend(util::local_storage(&tmp));
import.apply(&req).unwrap();
util::check_applied_kvs_cf(&tikv, &ctx, CF_DEFAULT, default_rewritten.into_iter());
}

#[test]
fn test_apply_full_disk() {
let (_cluster, ctx, _tikv, import) = util::new_cluster_and_tikv_import_client();
let tmp = TempDir::new().unwrap();
let storage = LocalStorage::new(tmp.path()).unwrap();
let default = [
(b"k1", b"v1", 1),
(b"k2", b"v2", 2),
(b"k3", b"v3", 3),
(b"k4", b"v4", 4),
];
let mut sst_meta = util::make_plain_file(&storage, "file1.log", default.into_iter());
util::register_range_for(&mut sst_meta, b"k1", b"k3a");
let mut req = ApplyRequest::new();
req.set_context(ctx);
req.set_rewrite_rules(vec![util::rewrite_for(&mut sst_meta, b"k", b"r")].into());
req.set_metas(vec![sst_meta].into());
req.set_storage_backend(util::local_storage(&tmp));
disk::set_disk_status(DiskUsage::AlmostFull);
let result = import.apply(&req).unwrap();
assert!(result.has_error());
assert_eq!(
result.get_error().get_message(),
"TiKV disk space is not enough."
);
disk::set_disk_status(DiskUsage::Normal);
}

#[test]
fn test_apply_twice() {
let (_cluster, ctx, tikv, import) = util::new_cluster_and_tikv_import_client();
let tmp = TempDir::new().unwrap();
let storage = LocalStorage::new(tmp.path()).unwrap();
let default = [(
b"k1",
b"In this case, we are going to test write twice, but with different rewrite rule.",
1,
)];
let default_fst = [(
b"r1",
b"In this case, we are going to test write twice, but with different rewrite rule.",
1,
)];
let default_snd = [(
b"z1",
b"In this case, we are going to test write twice, but with different rewrite rule.",
1,
)];

let mut sst_meta = util::make_plain_file(&storage, "file2.log", default.into_iter());
util::register_range_for(&mut sst_meta, b"k1", b"k1a");
let mut req = ApplyRequest::new();
req.set_context(ctx.clone());
req.set_rewrite_rules(vec![util::rewrite_for(&mut sst_meta, b"k", b"r")].into());
req.set_metas(vec![sst_meta.clone()].into());
req.set_storage_backend(util::local_storage(&tmp));
import.apply(&req).unwrap();
util::check_applied_kvs_cf(&tikv, &ctx, CF_DEFAULT, default_fst.into_iter());

util::register_range_for(&mut sst_meta, b"k1", b"k1a");
req.set_rewrite_rules(vec![util::rewrite_for(&mut sst_meta, b"k", b"z")].into());
req.set_metas(vec![sst_meta].into());
import.apply(&req).unwrap();
util::check_applied_kvs_cf(
&tikv,
&ctx,
CF_DEFAULT,
default_fst.into_iter().chain(default_snd.into_iter()),
);
}
Loading