Skip to content

Commit

Permalink
Respect the maximum transfer size when building results
Browse files Browse the repository at this point in the history
With this commit, we now add the student program's result if it does not
exceed 1 MB, then use the remaining space for our own log and then the
student program's log, both gzip-compressed.

Closes: #148 #141
  • Loading branch information
Florian Guggi committed Oct 25, 2024
1 parent 327b143 commit c4a5edf
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 157 deletions.
132 changes: 2 additions & 130 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ strum = { version = "0.26.3", features = ["derive"] }
subprocess = "0.2.9"
thiserror = "1.0.63"
toml = "0.8.19"
zopfli = "0.8.1"

[features]
mock = []
Expand Down
63 changes: 39 additions & 24 deletions scheduler/src/command/execute_program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,16 @@ use crate::{
command::{
check_length, terminate_student_program, Event, ProgramStatus, ResultId, RetryEvent,
},
communication::{CEPPacket, CommunicationHandle},
communication::{self, CEPPacket, CommunicationHandle},
};
use anyhow::anyhow;
use simple_archive::Compression;
use std::{
io::{ErrorKind, Write},
io::ErrorKind,
path::{Path, PathBuf},
time::Duration,
};
use subprocess::Popen;

const MAXIMUM_FILE_SIZE: usize = 1_000_000;
use zopfli::Options;

/// Executes a student's program and starts a watchdog for it. The watchdog also creates entries in the
/// status and result queue found in `context`. The result, including logs, is packed into
Expand Down Expand Up @@ -141,21 +139,30 @@ fn run_until_timeout(
Err(())
}

/// The function uses `tar` to create an uncompressed archive that includes the result file specified, as well as
/// the programs stdout/stderr and the schedulers log file. If any of the files is missing, the archive
/// is created without them.
/// Largest size, in bytes, a student program's result file may have and still be put into the
/// result archive; a larger result file is left out of the archive entirely.
const RESULT_SIZE_LIMIT: usize = 1_000_000;

fn build_result_archive(res: ResultId) -> Result<(), std::io::Error> {
let out_path = PathBuf::from(&format!("./data/{res}"));
let mut archive = simple_archive::Writer::new(std::fs::File::create(out_path)?);
let mut archive = simple_archive::Writer::new(Vec::new());

let res_path =
PathBuf::from(format!("./archives/{}/results/{}", res.program_id, res.timestamp));
let student_log_path = PathBuf::from(format!("./data/{res}.log"));
let log_path = PathBuf::from("./log");

add_to_archive_if_exists(&mut archive, &res.to_string(), &res_path, Compression::None)?;
add_to_archive_if_exists(&mut archive, "student_log", &student_log_path, Compression::Zopfli)?;
add_to_archive_if_exists(&mut archive, "log", &log_path, Compression::Zopfli)?;
if let Some(d) = open_if_exists(&res_path)?.filter(|d| d.len() <= RESULT_SIZE_LIMIT) {
archive.append_data(&res.to_string(), &d)?;
}

if let Some(d) = open_if_exists(&log_path)? {
compress_into_archive_if_it_fits(&mut archive, "log", &d)?;
}

if let Some(d) = open_if_exists(&student_log_path)? {
compress_into_archive_if_it_fits(&mut archive, "student_log", &d)?;
}

std::fs::write(out_path, archive.into_inner())?;

let _ = std::fs::remove_file(res_path);
let _ = std::fs::remove_file(student_log_path);
Expand All @@ -164,19 +171,27 @@ fn build_result_archive(res: ResultId) -> Result<(), std::io::Error> {
Ok(())
}

fn add_to_archive_if_exists<T: Write>(
archive: &mut simple_archive::Writer<T>,
name: &str,
path: impl AsRef<Path>,
compression: simple_archive::Compression,
) -> std::io::Result<()> {
/// Reads the whole file at `path` into memory.
///
/// Returns `Ok(None)` if the file does not exist (`ErrorKind::NotFound`); any other I/O error
/// is passed on to the caller unchanged.
fn open_if_exists(path: impl AsRef<Path>) -> std::io::Result<Option<Vec<u8>>> {
match std::fs::read(path) {
Ok(mut data) => {
data.truncate(MAXIMUM_FILE_SIZE);
archive.append_data(name, &data, compression)?;
Ok(())
}
Err(ref e) if e.kind() == ErrorKind::NotFound => Ok(()),
Ok(d) => Ok(Some(d)),
Err(ref e) if e.kind() == ErrorKind::NotFound => Ok(None),
Err(e) => Err(e),
}
}

/// Gzip-compresses `data` with zopfli and appends it to `archive` under the name `path`, but
/// only if the archive afterwards still fits into `communication::MAXIMUM_DATA_LENGTH` bytes.
///
/// Data that does not fit is silently left out; the archive is not modified in that case.
///
/// # Errors
/// Returns an error if compressing `data` or appending to the archive fails.
fn compress_into_archive_if_it_fits(
    archive: &mut simple_archive::Writer<Vec<u8>>,
    path: &str,
    data: &[u8],
) -> std::io::Result<()> {
    // Bytes an entry occupies on top of its name and payload.
    // NOTE(review): presumably the fixed per-entry header written by `simple_archive` —
    // confirm against the archive format.
    const ENTRY_OVERHEAD: usize = 5;

    // Checked arithmetic: a plain subtraction underflows once the archive (plus name and
    // overhead) already reaches the limit, which panics in debug builds and wraps around to a
    // huge "free space" value in release builds, letting oversized data through.
    let remaining = match communication::MAXIMUM_DATA_LENGTH
        .checked_sub(archive.inner().len())
        .and_then(|free| free.checked_sub(path.len()))
        .and_then(|free| free.checked_sub(ENTRY_OVERHEAD))
    {
        Some(remaining) => remaining,
        // No room left at all, so skip the (expensive) zopfli compression.
        None => return Ok(()),
    };

    let mut compressed = Vec::new();
    zopfli::compress(Options::default(), zopfli::Format::Gzip, data, &mut compressed)?;

    if compressed.len() <= remaining {
        archive.append_data(path, &compressed)?;
    }

    Ok(())
}
7 changes: 4 additions & 3 deletions scheduler/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ use std::{

pub type ComResult<T> = Result<T, CommunicationError>;

/// Factor by which [`MAXIMUM_DATA_LENGTH`] exceeds the data capacity of a single `CEPPacket`;
/// presumably the largest number of packets one multi-packet transfer is split into — confirm
/// against `send_multi_packet`.
pub const MAXIMUM_MULTI_PACKETS: usize = 100;
/// Upper bound, in bytes, on the payload accepted by `send_multi_packet`; anything longer is
/// rejected with `CommunicationError::TooManyBytes`.
pub const MAXIMUM_DATA_LENGTH: usize = MAXIMUM_MULTI_PACKETS * CEPPacket::MAXIMUM_DATA_LENGTH;

pub trait CommunicationHandle: Read + Write {
const INTEGRITY_ACK_TIMEOUT: Duration;
const UNLIMITED_TIMEOUT: Duration;

const DATA_PACKET_RETRIES: usize = 4;
const MAXIMUM_MULTI_PACKETS: usize = 100;
const MAXIMUM_DATA_LENGTH: usize = Self::MAXIMUM_MULTI_PACKETS * CEPPacket::MAXIMUM_DATA_LENGTH;

fn set_timeout(&mut self, timeout: Duration);

Expand Down Expand Up @@ -45,7 +46,7 @@ pub trait CommunicationHandle: Read + Write {
}

fn send_multi_packet(&mut self, bytes: &[u8]) -> ComResult<()> {
if bytes.len() > Self::MAXIMUM_DATA_LENGTH {
if bytes.len() > MAXIMUM_DATA_LENGTH {
return Err(CommunicationError::TooManyBytes);
}

Expand Down

0 comments on commit c4a5edf

Please sign in to comment.