
In ImageNetApp, leave JPEGs compressed so the full dataset fits in memory more easily #71

Open
robertnishihara opened this issue Feb 17, 2016 · 9 comments

Comments

@robertnishihara
Member

See discussion in #63.

@robertnishihara
Member Author

Decompressing the JPEGs may be slow (we should benchmark this). We can potentially get around this by beginning to decompress the next minibatch of JPEGs while we are calling ForwardBackward on the current minibatch. If decompression and ForwardBackward take around the same time, this buys us up to a 2x speedup at the cost of making the code more complex.
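
For what it's worth, here is a rough way such a benchmark could look (just a sketch, not code from the repo; it assumes a local directory of .jpg files and uses javax.imageio directly rather than the Thumbnailator path the loader uses):

// Rough micro-benchmark sketch for timing JPEG decode on a directory of .jpg files.
import java.io.{ByteArrayInputStream, File}
import java.nio.file.Files
import javax.imageio.ImageIO

object DecodeBenchmark {
  def main(args: Array[String]): Unit = {
    val dir = new File(args(0))
    val jpegBytes = dir.listFiles.filter(_.getName.endsWith(".jpg"))
      .map(f => Files.readAllBytes(f.toPath))
    val start = System.nanoTime()
    var pixels = 0L
    for (bytes <- jpegBytes) {
      val img = ImageIO.read(new ByteArrayInputStream(bytes))  // JPEG -> BufferedImage
      pixels += img.getWidth.toLong * img.getHeight
    }
    val elapsedMs = (System.nanoTime() - start) / 1e6
    println(s"decoded ${jpegBytes.length} JPEGs in $elapsedMs ms " +
            s"(${elapsedMs / jpegBytes.length} ms/image)")
  }
}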

Another possibility is to change the degree to which we compress the JPEGs so that decompressing is faster.

@robertnishihara
Member Author

The main change will be to modify ImageNetPreprocessor in SparkNet/src/main/scala/libs/Preprocessor.scala to take a JPEG and decompress it. This will be similar to the way we do the other preprocessing there (like subtracting the mean image and cropping the image).
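
As an illustration of what that might look like (a minimal sketch only; the class name, constructor, and pixel layout here are assumptions, not the actual ImageNetPreprocessor interface):

// Illustrative sketch: the DataFrame keeps the compressed JPEG bytes, and the
// JPEG -> pixels decode happens here, right before the minibatch is fed to Caffe.
import java.io.ByteArrayInputStream
import javax.imageio.ImageIO
import net.coobird.thumbnailator.Thumbnails

class JpegDecodingPreprocessor(height: Int, width: Int, meanImage: Array[Float]) {
  // Returns pixels in channel-major (C, H, W) order with the mean subtracted;
  // the exact layout expected by the net is an assumption here.
  def convert(jpeg: Array[Byte]): Array[Float] = {
    val decoded = ImageIO.read(new ByteArrayInputStream(jpeg))  // JPEG -> BufferedImage
    val resized = Thumbnails.of(decoded).forceSize(width, height).asBufferedImage()
    val out = new Array[Float](3 * height * width)
    var idx = 0
    for (c <- 0 until 3; y <- 0 until height; x <- 0 until width) {
      val rgb = resized.getRGB(x, y)
      val value = (rgb >> (16 - 8 * c)) & 0xff   // extract the R, G, or B byte
      out(idx) = value - meanImage(idx)          // subtract the mean image
      idx += 1
    }
    out
  }
}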

@rahulbhalerao001
Contributor

Hello Robert,

I wanted to confirm my understanding of the app and ask two questions about the proposed solution. I really appreciate your support so far and want to thank you for your prompt responses.
As I understand it, the following are the high-level activities that occur in the ImageNet app. Please correct me if I am wrong:

  1. Preprocessing
          a. Master directs the workers to download the tars from S3
          b. Images are untarred
          c. A DataFrame is created and partitioned among all workers
          d. The mean image is computed
          e. Caffe models are initialized on all workers; images are converted to NDArray and the mean image is subtracted

  2. Training (repeated)
          a. Receive updated parameters from the master.
          b. Choose a valid random index r and take the batch of indices r to r+256 from trainPartition.
          c. Compute the gradient using this batch and update the parameters.
          d. Send the updated parameters back to the master.

Questions:

  1. Gzipping JPEG images does not reduce their size. I picked the first tar (train.00000.tar) from the ImageNet training set and got the following results:
          a. Size of the tar - 18 MB
          b. Size after gzipping - 17 MB
          c. Size of the raw images - 20 MB
    As can be seen, compression does not reduce the size of the images. So I feel that if the uncompressed JPEGs do not fit into memory, the compressed ones might not fit either, and this solution might not solve the problem.

  2. The total size of all the unprocessed training images is 138 GB. I was using a 5-worker g2.8xlarge cluster where each worker has 60 GB of memory. Out of that, 30 GB can easily be allocated to Spark, so a total of 150 GB can be made available to the RDDs. Given this, and since the unprocessed size is 138 GB and resizing reduces it further, I feel the RDDs should not have spilled to disk. Do you feel there might be a problem in some other part of the system/app? To recall, the app ran smoothly over 1/10th of the data but failed when using the entire dataset, as described in #63.

Thanks,
Rahul

@robertnishihara
Member Author

That looks mostly correct. The "decompression" I'm referring to is not "tarred -> untarred" but rather "JPEG -> Array[Byte]", where the values in the Array[Byte] are the pixel values of the image.*** That decompression step substantially increases the size of the data, I think. Concretely, this decompression happens in SparkNet/src/main/scala/loaders/ImageNetLoader.scala in the line

val decompressedResizedImage = ScaleAndConvert.decompressImageAndResize(content, height, width)

Looking at decompressImageAndResize in SparkNet/src/main/scala/preprocessing/ScaleAndConvert.scala, the actual decompression happens in the line

val resizedImage = Thumbnails.of(im).forceSize(width, height).asBufferedImage()

Some points:

  • 1b) In this step, the images are untarred and converted from JPEG -> Array[Byte] (this is a bit confusing because the JPEGs are also stored as Array[Byte], but with a different interpretation).
  • 1e) The code in the Preprocessor class does indeed convert to NDArray, subtract the mean image, and randomly crop the image. However, this code doesn't get run as a preprocessing step; we simply construct the Preprocessor but don't actually call it until the minibatch gets fed into the network (for example, because we may want to crop the images differently on every pass through the dataset). It's a bit convoluted, but the convert method in Preprocessor gets called in SparkNet/src/main/scala/libs/CaffeNet.scala in the transformInto method, which gets called in forward and forwardBackward. I'm proposing taking the step that maps JPEG -> Array[Byte] and putting it in the convert method in Preprocessor. That way, the images are stored as JPEGs in the DataFrame (only 138GB, so it fits in memory), and they only get decompressed right before they are fed into the network. The downside, of course, is that it could make each training iteration take longer (because we have to decompress the JPEGs).
  • 2c) We perform several gradient updates, not just one (the number we perform is syncInterval).

*** Since the images are RGB 256x256, each image is 3x256x256 bytes, which is almost 200KB, therefore the full dataset (200KB x 1.2 million images) is about 235GB. Seems like this should still fit in memory... did I do that calculation correctly?
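
As a quick check of that arithmetic (plain Scala, e.g. in the REPL; decimal GB):

val bytesPerImage = 3L * 256 * 256             // 196,608 bytes, i.e. ~192 KB per image
val totalBytes    = bytesPerImage * 1200000L   // ~1.2 million training images
println(totalBytes / 1e9)                      // prints ~235.9, so roughly 235 GB of raw pixels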

@rahulbhalerao001
Contributor

Thank you for the detailed and thorough explanation.
I plan to do some measurements in the next 2-3 days and support your calculation with measurement data. I am interested in looking at the data size after preprocessing and how much spills to disk.
I also wanted to ask: if we pre-shuffle the data, can we simply pick the batches sequentially at train time? That is, with a good pre-shuffle we can choose images with index numbers 0-255, 256-511, ... as our batches. That way we read the RDD partition sequentially, and even if it spills to disk, we page in and page out, say, 30 GB of data only once, which ensures our working set stays in memory. Such a predictable workload might even allow an intelligent RDD swap algorithm to do the swapping proactively in the background, further reducing the penalty.
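
To illustrate what I mean (a hypothetical sketch; sequentialBatches and trainPartition are illustrative names, not SparkNet code):

object SequentialBatchesSketch {
  // With a one-time pre-shuffle, each epoch walks the partition in order, so a
  // partition spilled to disk is read back at most once per pass.
  def sequentialBatches[T](partition: Seq[T], batchSize: Int = 256): Iterator[Seq[T]] =
    partition.grouped(batchSize).filter(_.length == batchSize)  // drop the ragged tail

  // e.g. for (batch <- sequentialBatches(trainPartition)) { forwardBackward(batch) }
}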

@robertnishihara
Member Author

You're absolutely right that, in principle, spilling to disk shouldn't be a problem at all. After all, Caffe reads data from disk. Deep learning is computationally expensive enough that there is time to prefetch the next minibatch while you're computing on the current one. And since deep learning cycles predictably through the data, you know exactly which data you need next.

The same is true of decompression. We should preemptively decompress the next minibatch while we are computing on the current one. I tried something similar a while ago using Scala's Future feature, and it helped a bit, but Spark was still too slow when reading RDDs from disk.
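
Roughly, the Future-based overlap looked like this (a sketch from memory, not the actual code; decode and forwardBackward stand in for the real calls):

object PrefetchSketch {
  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration.Duration

  // Decode minibatch i+1 on a background thread while forwardBackward runs on minibatch i.
  def trainWithPrefetch(jpegBatches: Iterator[Array[Array[Byte]]],
                        decode: Array[Array[Byte]] => Array[Array[Float]],
                        forwardBackward: Array[Array[Float]] => Unit): Unit = {
    if (!jpegBatches.hasNext) return
    var pending = Future(decode(jpegBatches.next()))       // start decoding minibatch 0
    while (jpegBatches.hasNext) {
      val current = Await.result(pending, Duration.Inf)    // wait for the decoded batch
      pending = Future(decode(jpegBatches.next()))         // kick off the next decode
      forwardBackward(current)                             // overlaps with that decode
    }
    forwardBackward(Await.result(pending, Duration.Inf))   // last minibatch
  }
}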

@rahulbhalerao001
Contributor

Thank you again for your continued help and support.
With these clarifications I will independently try out some ideas, and if I get good results, I will propose a code change.
For documentation purposes, I want to expand on two points that you mentioned.

1. "After all, Caffe reads data from disk"
This is true for standalone Caffe, but in this integration the data is fed from SparkNet in memory, e.g.
val tops = caffeNet.Forward(inputs)
and there are no intermediate disk writes in the calls from SparkNet to Caffe.

2. "but Spark was still too slow when reading RDDs from disk"
This can arise when the compressed data itself does not fit in memory; but if it does fit, then we eliminate disk access from the critical path.

@robertnishihara
Member Author

Thanks for clarifying! Yes, I was referring to standalone Caffe.

@michaelklachko

michaelklachko commented Jan 7, 2017

Has anyone benchmarked decompressing JPEGs? With multiple fast video cards, we might need to prepare several mini-batches at once while a single mini-batch is being forwarded through the network (on each card). Can this become a bottleneck? I guess if each GPU has a dedicated CPU core servicing it, and multiple cores work in parallel, it shouldn't be much worse. Is that the case?
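
One way this could be organized (purely hypothetical sketch; none of these names come from SparkNet) is to give each GPU its own single-threaded decoder, so up to numGpus minibatches are decoded in parallel while the GPUs run forward/backward:

object GpuDecodersSketch {
  import java.util.concurrent.Executors
  import scala.concurrent.ExecutionContext

  // One single-threaded ExecutionContext per GPU: each GPU's next minibatch is
  // decoded on its own dedicated core, independently of the other GPUs.
  def decodersForGpus(numGpus: Int): IndexedSeq[ExecutionContext] =
    (0 until numGpus).map(_ => ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()))

  // usage (hypothetical): GPU g schedules its next decode on decoders(g), e.g.
  //   val decoders = decodersForGpus(4)
  //   val prefetched = scala.concurrent.Future(decodeMinibatch(nextJpegs))(decoders(g))
}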
