diff --git a/components/camera/camera.go b/components/camera/camera.go index 71f9b4e25fc..2ac36c21d1e 100644 --- a/components/camera/camera.go +++ b/components/camera/camera.go @@ -78,12 +78,6 @@ type ImageMetadata struct { } // A Camera is a resource that can capture frames. -type Camera interface { - resource.Resource - VideoSource -} - -// VideoSource represents anything that can capture frames. // For more information, see the [camera component docs]. // // Image example: @@ -102,17 +96,6 @@ type Camera interface { // // images, metadata, err := myCamera.Images(context.Background()) // -// Stream example: -// -// myCamera, err := camera.FromRobot(machine, "my_camera") -// -// // gets the stream from a camera -// stream, err := myCamera.Stream(context.Background()) -// -// // gets an image from the camera stream -// img, release, err := stream.Next(context.Background()) -// defer release() -// // NextPointCloud example: // // myCamera, err := camera.FromRobot(machine, "my_camera") @@ -127,7 +110,8 @@ type Camera interface { // err = myCamera.Close(context.Background()) // // [camera component docs]: https://docs.viam.com/components/camera/ -type VideoSource interface { +type Camera interface { + resource.Resource // Image returns a byte slice representing an image that tries to adhere to the MIME type hint. // Image also may return metadata about the frame. Image(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, ImageMetadata, error) @@ -136,10 +120,6 @@ type VideoSource interface { // along with associated metadata (just timestamp for now). It's not for getting a time series of images from the same imager. Images(ctx context.Context) ([]NamedImage, resource.ResponseMetadata, error) - // Stream returns a stream that makes a best effort to return consecutive images - // that may have a MIME type hint dictated in the context via gostream.WithMIMETypeHint. 
- Stream(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) - // NextPointCloud returns the next immediately available point cloud, not necessarily one // a part of a sequence. In the future, there could be streaming of point clouds. NextPointCloud(ctx context.Context) (pointcloud.PointCloud, error) @@ -147,9 +127,13 @@ type VideoSource interface { // Properties returns properties that are intrinsic to the particular // implementation of a camera. Properties(ctx context.Context) (Properties, error) +} - // Close shuts down the resource and prevents further use. - Close(ctx context.Context) error +// StreamCamera is a camera that has `Stream` embedded to directly integrate with gostream. +// Note that generally, when writing camera components from scratch, embedding `Stream` is an anti-pattern. +type StreamCamera interface { + Camera + Stream(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) } // ReadImage reads an image from the given source that is immediately available. 
diff --git a/components/camera/client_test.go b/components/camera/client_test.go index e1a827d5339..b8d458620b9 100644 --- a/components/camera/client_test.go +++ b/components/camera/client_test.go @@ -178,7 +178,6 @@ func TestClient(t *testing.T) { test.That(t, err, test.ShouldBeNil) camera1Client, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named(testCameraName), logger) test.That(t, err, test.ShouldBeNil) - frame, err := camera.DecodeImageFromCamera(context.Background(), rutils.MimeTypeRawRGBA, nil, camera1Client) test.That(t, err, test.ShouldBeNil) compVal, _, err := rimage.CompareImages(img, frame) @@ -245,10 +244,6 @@ func TestClient(t *testing.T) { client2, err := resourceAPI.RPCClient(context.Background(), conn, "", camera.Named(failCameraName), logger) test.That(t, err, test.ShouldBeNil) - _, _, err = camera.ReadImage(context.Background(), client2) - test.That(t, err, test.ShouldNotBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error()) - _, _, err = client2.Image(context.Background(), "", nil) test.That(t, err, test.ShouldNotBeNil) test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error()) @@ -547,78 +542,6 @@ func TestClientWithInterceptor(t *testing.T) { test.That(t, conn.Close(), test.ShouldBeNil) } -func TestClientStreamAfterClose(t *testing.T) { - // Set up gRPC server - logger := logging.NewTestLogger(t) - listener, err := net.Listen("tcp", "localhost:0") - test.That(t, err, test.ShouldBeNil) - rpcServer, err := rpc.NewServer(logger, rpc.WithUnauthenticated()) - test.That(t, err, test.ShouldBeNil) - - // Set up camera that can stream images - img := image.NewNRGBA(image.Rect(0, 0, 4, 4)) - injectCamera := &inject.Camera{} - injectCamera.PropertiesFunc = func(ctx context.Context) (camera.Properties, error) { - return camera.Properties{}, nil - } - injectCamera.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, 
camera.ImageMetadata, error) { - imgBytes, err := rimage.EncodeImage(ctx, img, mimeType) - test.That(t, err, test.ShouldBeNil) - return imgBytes, camera.ImageMetadata{MimeType: mimeType}, nil - } - - // Register CameraService API in our gRPC server. - resources := map[resource.Name]camera.Camera{ - camera.Named(testCameraName): injectCamera, - } - cameraSvc, err := resource.NewAPIResourceCollection(camera.API, resources) - test.That(t, err, test.ShouldBeNil) - resourceAPI, ok, err := resource.LookupAPIRegistration[camera.Camera](camera.API) - test.That(t, err, test.ShouldBeNil) - test.That(t, ok, test.ShouldBeTrue) - test.That(t, resourceAPI.RegisterRPCService(context.Background(), rpcServer, cameraSvc), test.ShouldBeNil) - - // Start serving requests. - go rpcServer.Serve(listener) - defer rpcServer.Stop() - - // Make client connection - conn, err := viamgrpc.Dial(context.Background(), listener.Addr().String(), logger) - test.That(t, err, test.ShouldBeNil) - client, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named(testCameraName), logger) - test.That(t, err, test.ShouldBeNil) - - // Get a stream - stream, err := client.Stream(context.Background()) - test.That(t, stream, test.ShouldNotBeNil) - test.That(t, err, test.ShouldBeNil) - - // Read from stream - media, _, err := stream.Next(context.Background()) - test.That(t, media, test.ShouldNotBeNil) - test.That(t, err, test.ShouldBeNil) - - // Close client and read from stream - test.That(t, client.Close(context.Background()), test.ShouldBeNil) - media, _, err = stream.Next(context.Background()) - test.That(t, media, test.ShouldBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, "context canceled") - - // Get a new stream - stream, err = client.Stream(context.Background()) - test.That(t, stream, test.ShouldNotBeNil) - test.That(t, err, test.ShouldBeNil) - - // Read from the new stream - media, _, err = stream.Next(context.Background()) - test.That(t, media, test.ShouldNotBeNil) 
- test.That(t, err, test.ShouldBeNil) - - // Close client and connection - test.That(t, client.Close(context.Background()), test.ShouldBeNil) - test.That(t, conn.Close(), test.ShouldBeNil) -} - // See modmanager_test.go for the happy path (aka, when the // client has a webrtc connection). func TestRTPPassthroughWithoutWebRTC(t *testing.T) { diff --git a/components/camera/ffmpeg/ffmpeg.go b/components/camera/ffmpeg/ffmpeg.go index 4dd392b24ec..96c8428f186 100644 --- a/components/camera/ffmpeg/ffmpeg.go +++ b/components/camera/ffmpeg/ffmpeg.go @@ -73,7 +73,6 @@ func init() { } type ffmpegCamera struct { - resource.Named gostream.VideoReader cancelFunc context.CancelFunc activeBackgroundWorkers sync.WaitGroup @@ -92,7 +91,7 @@ func (writer stderrWriter) Write(p []byte) (n int, err error) { } // NewFFMPEGCamera instantiates a new camera which leverages ffmpeg to handle a variety of potential video types. -func NewFFMPEGCamera(ctx context.Context, conf *Config, logger logging.Logger) (camera.VideoSource, error) { +func NewFFMPEGCamera(ctx context.Context, conf *Config, logger logging.Logger) (camera.StreamCamera, error) { // make sure ffmpeg is in the path before doing anything else if _, err := exec.LookPath("ffmpeg"); err != nil { return nil, err diff --git a/components/camera/ffmpeg/ffmpeg_test.go b/components/camera/ffmpeg/ffmpeg_test.go index 4f09511a6f5..fee3f48782d 100644 --- a/components/camera/ffmpeg/ffmpeg_test.go +++ b/components/camera/ffmpeg/ffmpeg_test.go @@ -8,6 +8,7 @@ import ( "go.viam.com/test" "go.viam.com/utils/artifact" + "go.viam.com/rdk/components/camera" "go.viam.com/rdk/logging" "go.viam.com/rdk/utils" ) @@ -20,7 +21,7 @@ func TestFFMPEGCamera(t *testing.T) { test.That(t, err, test.ShouldBeNil) test.That(t, err, test.ShouldBeNil) for i := 0; i < 5; i++ { - _, _, err = cam.Image(ctx, utils.MimeTypeJPEG, nil) + _, err = camera.DecodeImageFromCamera(ctx, utils.MimeTypeJPEG, nil, cam) test.That(t, err, test.ShouldBeNil) } test.That(t, 
cam.Close(context.Background()), test.ShouldBeNil) diff --git a/components/camera/replaypcd/replaypcd_test.go b/components/camera/replaypcd/replaypcd_test.go index 90b397edc21..39a60203010 100644 --- a/components/camera/replaypcd/replaypcd_test.go +++ b/components/camera/replaypcd/replaypcd_test.go @@ -660,11 +660,6 @@ func TestReplayPCDUnimplementedFunctions(t *testing.T) { replayCamera, _, serverClose, err := createNewReplayPCDCamera(ctx, t, replayCamCfg, true) test.That(t, err, test.ShouldBeNil) - t.Run("Stream", func(t *testing.T) { - _, err := replayCamera.Stream(ctx, nil) - test.That(t, err.Error(), test.ShouldEqual, "Stream is unimplemented") - }) - err = replayCamera.Close(ctx) test.That(t, err, test.ShouldBeNil) diff --git a/components/camera/server.go b/components/camera/server.go index a38e68e9f5a..d14d8127d4e 100644 --- a/components/camera/server.go +++ b/components/camera/server.go @@ -33,7 +33,11 @@ type serviceServer struct { func NewRPCServiceServer(coll resource.APIResourceCollection[Camera]) interface{} { logger := logging.NewLogger("camserver") imgTypes := make(map[string]ImageType) - return &serviceServer{coll: coll, logger: logger, imgTypes: imgTypes} + return &serviceServer{ + coll: coll, + logger: logger, + imgTypes: imgTypes, + } } // GetImage returns an image from a camera of the underlying robot. If a specific MIME type diff --git a/components/camera/transformpipeline/classifier.go b/components/camera/transformpipeline/classifier.go index 807c42b360d..8a410420100 100644 --- a/components/camera/transformpipeline/classifier.go +++ b/components/camera/transformpipeline/classifier.go @@ -27,7 +27,7 @@ type classifierConfig struct { // classifierSource takes an image from the camera, and overlays labels from the classifier. 
type classifierSource struct { - src camera.VideoSource + src camera.StreamCamera classifierName string maxClassifications uint32 labelFilter classification.Postprocessor @@ -37,8 +37,8 @@ type classifierSource struct { func newClassificationsTransform( ctx context.Context, - source camera.VideoSource, r robot.Robot, am utils.AttributeMap, -) (camera.VideoSource, camera.ImageType, error) { + source camera.StreamCamera, r robot.Robot, am utils.AttributeMap, +) (camera.StreamCamera, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*classifierConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err diff --git a/components/camera/transformpipeline/classifier_test.go b/components/camera/transformpipeline/classifier_test.go index 7cb212f139a..fd8666e9451 100644 --- a/components/camera/transformpipeline/classifier_test.go +++ b/components/camera/transformpipeline/classifier_test.go @@ -82,7 +82,6 @@ func buildRobotWithClassifier(logger logging.Logger) (robot.Robot, error) { return r, nil } -//nolint:dupl func TestClassifierSource(t *testing.T) { logger := logging.NewTestLogger(t) ctx, cancel := context.WithCancel(context.Background()) @@ -99,7 +98,9 @@ func TestClassifierSource(t *testing.T) { test.That(t, err, test.ShouldBeNil) defer classifier.Close(ctx) - resImg, _, err := camera.ReadImage(ctx, classifier) + streamClassifier, ok := classifier.(camera.StreamCamera) + test.That(t, ok, test.ShouldBeTrue) + resImg, _, err := camera.ReadImage(ctx, streamClassifier) test.That(t, err, test.ShouldBeNil) ovImg := rimage.ConvertImage(resImg) diff --git a/components/camera/transformpipeline/composed.go b/components/camera/transformpipeline/composed.go index 0cb6080e580..fc7c25c7d05 100644 --- a/components/camera/transformpipeline/composed.go +++ b/components/camera/transformpipeline/composed.go @@ -18,28 +18,25 @@ import ( // depthToPretty takes a depth image and turns into a colorful image, with blue being // farther away, and red being 
closest. Actual depth information is lost in the transform. type depthToPretty struct { - src camera.VideoSource + src camera.StreamCamera cameraModel *transform.PinholeCameraModel } -func propsFromVideoSource(ctx context.Context, source camera.VideoSource) (camera.Properties, error) { +func propsFromVideoSource(ctx context.Context, source camera.Camera) (camera.Properties, error) { var camProps camera.Properties - - if cameraSrc, ok := source.(camera.Camera); ok { - props, err := cameraSrc.Properties(ctx) - if err != nil { - return camProps, err - } - camProps = props + props, err := source.Properties(ctx) + if err != nil { + return camProps, err } + camProps = props return camProps, nil } func newDepthToPrettyTransform( ctx context.Context, - source camera.VideoSource, + source camera.StreamCamera, stream camera.ImageType, -) (camera.VideoSource, camera.ImageType, error) { +) (camera.StreamCamera, camera.ImageType, error) { if stream != camera.DepthStream { return nil, camera.UnspecifiedStream, errors.Errorf("source has stream type %s, depth_to_pretty only supports depth stream inputs", stream) @@ -109,16 +106,16 @@ type overlayConfig struct { // overlaySource overlays the depth and color 2D images in order to debug the alignment of the two images. 
type overlaySource struct { - src camera.VideoSource + src camera.StreamCamera cameraModel *transform.PinholeCameraModel } func newOverlayTransform( ctx context.Context, - src camera.VideoSource, + src camera.StreamCamera, stream camera.ImageType, am utils.AttributeMap, -) (camera.VideoSource, camera.ImageType, error) { +) (camera.StreamCamera, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*overlayConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err diff --git a/components/camera/transformpipeline/composed_test.go b/components/camera/transformpipeline/composed_test.go index bf288cf938e..ff24b83fb4b 100644 --- a/components/camera/transformpipeline/composed_test.go +++ b/components/camera/transformpipeline/composed_test.go @@ -63,22 +63,23 @@ func TestComposed(t *testing.T) { test.That(t, err, test.ShouldBeNil) test.That(t, pc.Size(), test.ShouldEqual, 1) - myOverlay, stream, err := newOverlayTransform(context.Background(), cloudSource, camera.ColorStream, am) + streamedCloudSource := streamCameraFromCamera(context.Background(), cloudSource) + myOverlay, stream, err := newOverlayTransform(context.Background(), streamedCloudSource, camera.ColorStream, am) test.That(t, err, test.ShouldBeNil) test.That(t, stream, test.ShouldEqual, camera.ColorStream) pic, _, err := camera.ReadImage(context.Background(), myOverlay) test.That(t, err, test.ShouldBeNil) test.That(t, pic.Bounds(), test.ShouldResemble, image.Rect(0, 0, 1280, 720)) - myPipeline, err := newTransformPipeline(context.Background(), cloudSource, conf, robot, logger) + myPipeline, err := newTransformPipeline(context.Background(), streamedCloudSource, nil, conf, robot, logger) test.That(t, err, test.ShouldBeNil) defer myPipeline.Close(context.Background()) - pic, _, err = camera.ReadImage(context.Background(), myPipeline) + pic, err = camera.DecodeImageFromCamera(context.Background(), utils.MimeTypeJPEG, nil, myPipeline) test.That(t, err, test.ShouldBeNil) test.That(t, 
pic.Bounds(), test.ShouldResemble, image.Rect(0, 0, 1280, 720)) // wrong result with bad config - _, _, err = newOverlayTransform(context.Background(), cloudSource, camera.ColorStream, utils.AttributeMap{}) + _, _, err = newOverlayTransform(context.Background(), streamedCloudSource, camera.ColorStream, utils.AttributeMap{}) test.That(t, err, test.ShouldNotBeNil) test.That(t, err, test.ShouldWrap, transform.ErrNoIntrinsics) // wrong config, still no intrinsics @@ -88,7 +89,7 @@ func TestComposed(t *testing.T) { Height: 720, }, } - _, _, err = newOverlayTransform(context.Background(), cloudSource, camera.ColorStream, am) + _, _, err = newOverlayTransform(context.Background(), streamedCloudSource, camera.ColorStream, am) test.That(t, err, test.ShouldNotBeNil) test.That(t, err, test.ShouldWrap, transform.ErrNoIntrinsics) // wrong config, no attributes @@ -99,7 +100,7 @@ func TestComposed(t *testing.T) { }, }, } - _, err = newTransformPipeline(context.Background(), cloudSource, conf, robot, logger) + _, err = newTransformPipeline(context.Background(), streamedCloudSource, nil, conf, robot, logger) test.That(t, err, test.ShouldNotBeNil) test.That(t, err, test.ShouldWrap, transform.ErrNoIntrinsics) } diff --git a/components/camera/transformpipeline/depth_edges.go b/components/camera/transformpipeline/depth_edges.go index 722a49a8ab5..b067a0e8bbc 100644 --- a/components/camera/transformpipeline/depth_edges.go +++ b/components/camera/transformpipeline/depth_edges.go @@ -22,13 +22,13 @@ type depthEdgesConfig struct { // depthEdgesSource applies a Canny Edge Detector to the depth map. 
type depthEdgesSource struct { - src camera.VideoSource + src camera.StreamCamera detector *rimage.CannyEdgeDetector blurRadius float64 } -func newDepthEdgesTransform(ctx context.Context, source camera.VideoSource, am utils.AttributeMap, -) (camera.VideoSource, camera.ImageType, error) { +func newDepthEdgesTransform(ctx context.Context, source camera.StreamCamera, am utils.AttributeMap, +) (camera.StreamCamera, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*depthEdgesConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err diff --git a/components/camera/transformpipeline/detector.go b/components/camera/transformpipeline/detector.go index b83851307bb..4d156c81b6e 100644 --- a/components/camera/transformpipeline/detector.go +++ b/components/camera/transformpipeline/detector.go @@ -26,7 +26,7 @@ type detectorConfig struct { // detectorSource takes an image from the camera, and overlays the detections from the detector. type detectorSource struct { - src camera.VideoSource + src camera.StreamCamera detectorName string labelFilter objectdetection.Postprocessor // must build from ValidLabels confFilter objectdetection.Postprocessor @@ -35,10 +35,10 @@ type detectorSource struct { func newDetectionsTransform( ctx context.Context, - source camera.VideoSource, + source camera.StreamCamera, r robot.Robot, am utils.AttributeMap, -) (camera.VideoSource, camera.ImageType, error) { +) (camera.StreamCamera, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*detectorConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err diff --git a/components/camera/transformpipeline/detector_test.go b/components/camera/transformpipeline/detector_test.go index 1b24e217aca..334e9a57469 100644 --- a/components/camera/transformpipeline/detector_test.go +++ b/components/camera/transformpipeline/detector_test.go @@ -104,7 +104,6 @@ func buildRobotWithFakeCamera(logger logging.Logger) (robot.Robot, error) { return 
robotimpl.RobotFromConfigPath(context.Background(), newConfFile, logger) } -//nolint:dupl func TestColorDetectionSource(t *testing.T) { logger := logging.NewTestLogger(t) ctx, cancel := context.WithCancel(context.Background()) @@ -121,7 +120,7 @@ func TestColorDetectionSource(t *testing.T) { test.That(t, err, test.ShouldBeNil) defer detector.Close(ctx) - resImg, _, err := camera.ReadImage(ctx, detector) + resImg, err := camera.DecodeImageFromCamera(ctx, rutils.MimeTypePNG, nil, detector) test.That(t, err, test.ShouldBeNil) ovImg := rimage.ConvertImage(resImg) test.That(t, ovImg.GetXY(852, 431), test.ShouldResemble, rimage.Red) @@ -146,7 +145,7 @@ func BenchmarkColorDetectionSource(b *testing.B) { b.ResetTimer() // begin benchmarking for i := 0; i < b.N; i++ { - _, _, _ = camera.ReadImage(ctx, detector) + _, _ = camera.DecodeImageFromCamera(ctx, rutils.MimeTypeJPEG, nil, detector) } test.That(b, detector.Close(context.Background()), test.ShouldBeNil) } diff --git a/components/camera/transformpipeline/mods.go b/components/camera/transformpipeline/mods.go index a81d054a580..573a7566835 100644 --- a/components/camera/transformpipeline/mods.go +++ b/components/camera/transformpipeline/mods.go @@ -26,14 +26,14 @@ type rotateConfig struct { // rotateSource is the source to be rotated and the kind of image type. type rotateSource struct { - src camera.VideoSource + src camera.StreamCamera stream camera.ImageType angle float64 } // newRotateTransform creates a new rotation transform. 
-func newRotateTransform(ctx context.Context, source camera.VideoSource, stream camera.ImageType, am utils.AttributeMap, -) (camera.VideoSource, camera.ImageType, error) { +func newRotateTransform(ctx context.Context, source camera.StreamCamera, stream camera.ImageType, am utils.AttributeMap, +) (camera.StreamCamera, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*rotateConfig](am) if err != nil { return nil, camera.UnspecifiedStream, errors.Wrap(err, "cannot parse rotate attribute map") @@ -96,7 +96,7 @@ type resizeConfig struct { } type resizeSource struct { - src camera.VideoSource + src camera.StreamCamera stream camera.ImageType height int width int @@ -104,8 +104,8 @@ type resizeSource struct { // newResizeTransform creates a new resize transform. func newResizeTransform( - ctx context.Context, source camera.VideoSource, stream camera.ImageType, am utils.AttributeMap, -) (camera.VideoSource, camera.ImageType, error) { + ctx context.Context, source camera.StreamCamera, stream camera.ImageType, am utils.AttributeMap, +) (camera.StreamCamera, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*resizeConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err @@ -165,7 +165,7 @@ type cropConfig struct { } type cropSource struct { - src camera.VideoSource + src camera.StreamCamera imgType camera.ImageType cropWindow image.Rectangle cropRel []float64 @@ -175,8 +175,8 @@ type cropSource struct { // newCropTransform creates a new crop transform. 
func newCropTransform( - ctx context.Context, source camera.VideoSource, stream camera.ImageType, am utils.AttributeMap, -) (camera.VideoSource, camera.ImageType, error) { + ctx context.Context, source camera.StreamCamera, stream camera.ImageType, am utils.AttributeMap, +) (camera.StreamCamera, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*cropConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err diff --git a/components/camera/transformpipeline/mods_test.go b/components/camera/transformpipeline/mods_test.go index f4a347de15a..61b701572a2 100644 --- a/components/camera/transformpipeline/mods_test.go +++ b/components/camera/transformpipeline/mods_test.go @@ -608,7 +608,8 @@ func BenchmarkColorRotate(b *testing.B) { am := utils.AttributeMap{ "angle_degs": 180, } - rs, stream, err := newRotateTransform(context.Background(), src, camera.ColorStream, am) + streamSrc := streamCameraFromCamera(context.Background(), src) + rs, stream, err := newRotateTransform(context.Background(), streamSrc, camera.ColorStream, am) test.That(b, err, test.ShouldBeNil) test.That(b, stream, test.ShouldEqual, camera.ColorStream) @@ -633,7 +634,8 @@ func BenchmarkDepthRotate(b *testing.B) { am := utils.AttributeMap{ "angle_degs": 180, } - rs, stream, err := newRotateTransform(context.Background(), src, camera.DepthStream, am) + streamSrc := streamCameraFromCamera(context.Background(), src) + rs, stream, err := newRotateTransform(context.Background(), streamSrc, camera.DepthStream, am) test.That(b, err, test.ShouldBeNil) test.That(b, stream, test.ShouldEqual, camera.DepthStream) diff --git a/components/camera/transformpipeline/pipeline.go b/components/camera/transformpipeline/pipeline.go index 9ad4007952e..1320f9500ee 100644 --- a/components/camera/transformpipeline/pipeline.go +++ b/components/camera/transformpipeline/pipeline.go @@ -12,12 +12,14 @@ import ( "go.opencensus.io/trace" "go.viam.com/rdk/components/camera" + "go.viam.com/rdk/gostream" 
"go.viam.com/rdk/logging" "go.viam.com/rdk/pointcloud" "go.viam.com/rdk/resource" "go.viam.com/rdk/rimage" "go.viam.com/rdk/rimage/transform" "go.viam.com/rdk/robot" + camerautils "go.viam.com/rdk/robot/web/stream/camera" "go.viam.com/rdk/utils" ) @@ -47,11 +49,12 @@ func init() { if err != nil { return nil, fmt.Errorf("no source camera for transform pipeline (%s): %w", sourceName, err) } - src, err := newTransformPipeline(ctx, source, newConf, actualR, logger) + streamSrc := streamCameraFromCamera(ctx, source) + src, err := newTransformPipeline(ctx, streamSrc, conf.ResourceName().AsNamed(), newConf, actualR, logger) if err != nil { return nil, err } - return camera.FromVideoSource(conf.ResourceName(), src, logger), nil + return src, nil }, }) } @@ -84,13 +87,40 @@ func (cfg *transformConfig) Validate(path string) ([]string, error) { return deps, nil } +type streamCamera struct { + camera.Camera + vs gostream.VideoSource +} + +func (sc *streamCamera) Stream(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { + if sc.vs != nil { + return sc.vs.Stream(ctx, errHandlers...) + } + return camerautils.VideoSourceFromCamera(ctx, sc.Camera).Stream(ctx, errHandlers...) + } + +// streamCameraFromCamera is a hack to allow us to use Stream to pipe frames through the pipeline +// and still implement a camera resource. +// We prefer this methodology over passing Image bytes because each transform desires an image.Image over +// a raw byte slice. To use Image would be to wastefully encode and decode the frame multiple times. 
+func streamCameraFromCamera(ctx context.Context, cam camera.Camera) camera.StreamCamera { + if streamCam, ok := cam.(camera.StreamCamera); ok { + return streamCam + } + return &streamCamera{ + Camera: cam, + vs: camerautils.VideoSourceFromCamera(ctx, cam), + } +} + func newTransformPipeline( ctx context.Context, - source camera.VideoSource, + source camera.StreamCamera, + named resource.Named, cfg *transformConfig, r robot.Robot, logger logging.Logger, -) (camera.VideoSource, error) { +) (camera.StreamCamera, error) { if source == nil { return nil, errors.New("no source camera for transform pipeline") } @@ -98,7 +128,7 @@ func newTransformPipeline( return nil, errors.New("pipeline has no transforms in it") } // check if the source produces a depth image or color image - img, release, err := camera.ReadImage(ctx, source) + img, err := camera.DecodeImageFromCamera(ctx, "", nil, source) var streamType camera.ImageType if err != nil { @@ -110,33 +140,32 @@ func newTransformPipeline( } else { streamType = camera.ColorStream } - if release != nil { - release() - } // loop through the pipeline and create the image flow - pipeline := make([]camera.VideoSource, 0, len(cfg.Pipeline)) - lastSource := source + pipeline := make([]camera.StreamCamera, 0, len(cfg.Pipeline)) + lastSource := streamCameraFromCamera(ctx, source) for _, tr := range cfg.Pipeline { src, newStreamType, err := buildTransform(ctx, r, lastSource, streamType, tr, cfg.Source) if err != nil { return nil, err } - pipeline = append(pipeline, src) - lastSource = src + streamSrc := streamCameraFromCamera(ctx, src) + pipeline = append(pipeline, streamSrc) + lastSource = streamSrc streamType = newStreamType } cameraModel := camera.NewPinholeModelWithBrownConradyDistortion(cfg.CameraParameters, cfg.DistortionParameters) return camera.NewVideoSourceFromReader( ctx, - transformPipeline{pipeline, lastSource, cfg.CameraParameters, logger}, + transformPipeline{named, pipeline, lastSource, cfg.CameraParameters, logger}, 
&cameraModel, streamType, ) } type transformPipeline struct { - pipeline []camera.VideoSource - src camera.VideoSource + resource.Named + pipeline []camera.StreamCamera + src camera.Camera intrinsicParameters *transform.PinholeCameraIntrinsics logger logging.Logger } @@ -144,7 +173,11 @@ type transformPipeline struct { func (tp transformPipeline) Read(ctx context.Context) (image.Image, func(), error) { ctx, span := trace.StartSpan(ctx, "camera::transformpipeline::Read") defer span.End() - return camera.ReadImage(ctx, tp.src) + img, err := camera.DecodeImageFromCamera(ctx, "", nil, tp.src) + if err != nil { + return nil, func() {}, err + } + return img, func() {}, nil } func (tp transformPipeline) NextPointCloud(ctx context.Context) (pointcloud.PointCloud, error) { diff --git a/components/camera/transformpipeline/pipeline_test.go b/components/camera/transformpipeline/pipeline_test.go index eb31cd06e52..7b1b8ae8bb9 100644 --- a/components/camera/transformpipeline/pipeline_test.go +++ b/components/camera/transformpipeline/pipeline_test.go @@ -34,13 +34,14 @@ func TestTransformPipelineColor(t *testing.T) { test.That(t, err, test.ShouldBeNil) source := gostream.NewVideoSource(&fake.StaticSource{ColorImg: img}, prop.Video{}) src, err := camera.WrapVideoSourceWithProjector(context.Background(), source, nil, camera.ColorStream) + streamSrc := streamCameraFromCamera(context.Background(), src) test.That(t, err, test.ShouldBeNil) - inImg, _, err := camera.ReadImage(context.Background(), src) + inImg, err := camera.DecodeImageFromCamera(context.Background(), "", nil, src) test.That(t, err, test.ShouldBeNil) test.That(t, inImg.Bounds().Dx(), test.ShouldEqual, 128) test.That(t, inImg.Bounds().Dy(), test.ShouldEqual, 72) - color, err := newTransformPipeline(context.Background(), src, transformConf, r, logger) + color, err := newTransformPipeline(context.Background(), streamSrc, nil, transformConf, r, logger) test.That(t, err, test.ShouldBeNil) outImg, _, err := 
camera.ReadImage(context.Background(), color) @@ -81,12 +82,13 @@ func TestTransformPipelineDepth(t *testing.T) { source := gostream.NewVideoSource(&fake.StaticSource{DepthImg: dm}, prop.Video{}) src, err := camera.WrapVideoSourceWithProjector(context.Background(), source, nil, camera.DepthStream) test.That(t, err, test.ShouldBeNil) - inImg, _, err := camera.ReadImage(context.Background(), src) + inImg, err := camera.DecodeImageFromCamera(context.Background(), "", nil, src) test.That(t, err, test.ShouldBeNil) test.That(t, inImg.Bounds().Dx(), test.ShouldEqual, 128) test.That(t, inImg.Bounds().Dy(), test.ShouldEqual, 72) - depth, err := newTransformPipeline(context.Background(), src, transformConf, r, logger) + streamSrc := streamCameraFromCamera(context.Background(), src) + depth, err := newTransformPipeline(context.Background(), streamSrc, nil, transformConf, r, logger) test.That(t, err, test.ShouldBeNil) outImg, _, err := camera.ReadImage(context.Background(), depth) @@ -134,7 +136,7 @@ func TestTransformPipelineDepth2(t *testing.T) { test.That(t, err, test.ShouldBeNil) // first depth transform - depth1, err := newTransformPipeline(context.Background(), source, transform1, r, logger) + depth1, err := newTransformPipeline(context.Background(), source, nil, transform1, r, logger) test.That(t, err, test.ShouldBeNil) outImg, _, err := camera.ReadImage(context.Background(), depth1) test.That(t, err, test.ShouldBeNil) @@ -142,7 +144,7 @@ func TestTransformPipelineDepth2(t *testing.T) { test.That(t, outImg.Bounds().Dy(), test.ShouldEqual, 20) test.That(t, depth1.Close(context.Background()), test.ShouldBeNil) // second depth image - depth2, err := newTransformPipeline(context.Background(), source, transform2, r, logger) + depth2, err := newTransformPipeline(context.Background(), source, nil, transform2, r, logger) test.That(t, err, test.ShouldBeNil) outImg, _, err = camera.ReadImage(context.Background(), depth2) test.That(t, err, test.ShouldBeNil) @@ -164,7 +166,7 @@ 
func TestNullPipeline(t *testing.T) { test.That(t, err, test.ShouldBeNil) source, err := camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{ColorImg: img}, nil, camera.UnspecifiedStream) test.That(t, err, test.ShouldBeNil) - _, err = newTransformPipeline(context.Background(), source, transform1, r, logger) + _, err = newTransformPipeline(context.Background(), source, nil, transform1, r, logger) test.That(t, err, test.ShouldNotBeNil) test.That(t, err.Error(), test.ShouldContainSubstring, "pipeline has no transforms") @@ -172,7 +174,7 @@ func TestNullPipeline(t *testing.T) { Source: "source", Pipeline: []Transformation{{Type: "identity", Attributes: nil}}, } - pipe, err := newTransformPipeline(context.Background(), source, transform2, r, logger) + pipe, err := newTransformPipeline(context.Background(), source, nil, transform2, r, logger) test.That(t, err, test.ShouldBeNil) outImg, _, err := camera.ReadImage(context.Background(), pipe) // should not transform anything test.That(t, err, test.ShouldBeNil) @@ -206,7 +208,7 @@ func TestPipeIntoPipe(t *testing.T) { }, } - pipe1, err := newTransformPipeline(context.Background(), source, transform1, r, logger) + pipe1, err := newTransformPipeline(context.Background(), source, nil, transform1, r, logger) test.That(t, err, test.ShouldBeNil) outImg, _, err := camera.ReadImage(context.Background(), pipe1) test.That(t, err, test.ShouldBeNil) @@ -217,7 +219,7 @@ func TestPipeIntoPipe(t *testing.T) { test.That(t, prop.IntrinsicParams.Width, test.ShouldEqual, 128) test.That(t, prop.IntrinsicParams.Height, test.ShouldEqual, 72) // transform pipeline into pipeline - pipe2, err := newTransformPipeline(context.Background(), pipe1, transform2, r, logger) + pipe2, err := newTransformPipeline(context.Background(), pipe1, nil, transform2, r, logger) test.That(t, err, test.ShouldBeNil) outImg, _, err = camera.ReadImage(context.Background(), pipe2) test.That(t, err, test.ShouldBeNil) diff --git 
a/components/camera/transformpipeline/preprocessing.go b/components/camera/transformpipeline/preprocessing.go index 639b099d087..4f8401f0ced 100644 --- a/components/camera/transformpipeline/preprocessing.go +++ b/components/camera/transformpipeline/preprocessing.go @@ -14,11 +14,11 @@ import ( // preprocessDepthTransform applies pre-processing functions to depth maps in order to smooth edges and fill holes. type preprocessDepthTransform struct { - src camera.VideoSource + src camera.StreamCamera } -func newDepthPreprocessTransform(ctx context.Context, source camera.VideoSource, -) (camera.VideoSource, camera.ImageType, error) { +func newDepthPreprocessTransform(ctx context.Context, source camera.StreamCamera, +) (camera.StreamCamera, camera.ImageType, error) { reader := &preprocessDepthTransform{source} props, err := propsFromVideoSource(ctx, source) diff --git a/components/camera/transformpipeline/segmenter.go b/components/camera/transformpipeline/segmenter.go index df67011828d..00a7d6e7b36 100644 --- a/components/camera/transformpipeline/segmenter.go +++ b/components/camera/transformpipeline/segmenter.go @@ -23,7 +23,7 @@ type segmenterConfig struct { // segmenterSource takes a pointcloud from the camera and applies a segmenter to it. 
type segmenterSource struct { - src camera.VideoSource + src camera.StreamCamera cameraName string segmenterName string r robot.Robot @@ -31,11 +31,11 @@ type segmenterSource struct { func newSegmentationsTransform( ctx context.Context, - source camera.VideoSource, + source camera.StreamCamera, r robot.Robot, am utils.AttributeMap, sourceString string, -) (camera.VideoSource, camera.ImageType, error) { +) (camera.StreamCamera, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*segmenterConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err diff --git a/components/camera/transformpipeline/segmenter_test.go b/components/camera/transformpipeline/segmenter_test.go index a00242a3999..f77d0bdfff5 100644 --- a/components/camera/transformpipeline/segmenter_test.go +++ b/components/camera/transformpipeline/segmenter_test.go @@ -63,7 +63,8 @@ func TestTransformSegmenterProps(t *testing.T) { _, err = conf.Validate("path") test.That(t, err, test.ShouldBeNil) - _, err = newTransformPipeline(context.Background(), cam, transformConf, r, logger) + streamCamera := streamCameraFromCamera(context.Background(), cam) + _, err = newTransformPipeline(context.Background(), streamCamera, nil, transformConf, r, logger) test.That(t, err, test.ShouldBeNil) transformConf = &transformConfig{ @@ -151,7 +152,8 @@ func TestTransformSegmenterFunctionality(t *testing.T) { }, } - pipeline, err := newTransformPipeline(context.Background(), cam, transformConf, r, logger) + streamCamera := streamCameraFromCamera(context.Background(), cam) + pipeline, err := newTransformPipeline(context.Background(), streamCamera, nil, transformConf, r, logger) test.That(t, err, test.ShouldBeNil) pc, err := pipeline.NextPointCloud(context.Background()) diff --git a/components/camera/transformpipeline/transform.go b/components/camera/transformpipeline/transform.go index 2b23743bb5a..5ff3c8efb8e 100644 --- a/components/camera/transformpipeline/transform.go +++ 
b/components/camera/transformpipeline/transform.go @@ -136,11 +136,11 @@ func (Transformation) JSONSchema() *jsonschema.Schema { func buildTransform( ctx context.Context, r robot.Robot, - source camera.VideoSource, + source camera.StreamCamera, stream camera.ImageType, tr Transformation, sourceString string, -) (camera.VideoSource, camera.ImageType, error) { +) (camera.StreamCamera, camera.ImageType, error) { switch transformType(tr.Type) { case transformTypeUnspecified, transformTypeIdentity: return source, stream, nil diff --git a/components/camera/transformpipeline/undistort.go b/components/camera/transformpipeline/undistort.go index 507a0bd1363..0ac5059d171 100644 --- a/components/camera/transformpipeline/undistort.go +++ b/components/camera/transformpipeline/undistort.go @@ -22,14 +22,14 @@ type undistortConfig struct { // undistortSource will undistort the original image according to the Distortion parameters // within the intrinsic parameters. type undistortSource struct { - originalSource camera.VideoSource + originalSource camera.StreamCamera stream camera.ImageType cameraParams *transform.PinholeCameraModel } func newUndistortTransform( - ctx context.Context, source camera.VideoSource, stream camera.ImageType, am utils.AttributeMap, -) (camera.VideoSource, camera.ImageType, error) { + ctx context.Context, source camera.StreamCamera, stream camera.ImageType, am utils.AttributeMap, +) (camera.StreamCamera, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*undistortConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err diff --git a/components/camera/videosource/webcam.go b/components/camera/videosource/webcam.go index 18cbc61e2b2..e71adaa288b 100644 --- a/components/camera/videosource/webcam.go +++ b/components/camera/videosource/webcam.go @@ -28,7 +28,9 @@ import ( "go.viam.com/rdk/logging" "go.viam.com/rdk/pointcloud" "go.viam.com/rdk/resource" + "go.viam.com/rdk/rimage" "go.viam.com/rdk/rimage/transform" + 
"go.viam.com/rdk/utils" ) // ModelWebcam is the name of the webcam component. @@ -314,7 +316,7 @@ type webcam struct { underlyingSource gostream.VideoSource exposedSwapper gostream.HotSwappableVideoSource - exposedProjector camera.VideoSource + exposedProjector camera.Camera // this is returned to us as a label in mediadevices but our config // treats it as a video path. @@ -569,7 +571,20 @@ func (c *webcam) Stream(ctx context.Context, errHandlers ...gostream.ErrorHandle } func (c *webcam) Image(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { - return camera.ReadImageBytes(ctx, c.underlyingSource, mimeType) + img, release, err := camera.ReadImage(ctx, c.underlyingSource) + if err != nil { + return nil, camera.ImageMetadata{}, err + } + defer release() + + if mimeType == "" { + mimeType = utils.MimeTypeJPEG + } + imgBytes, err := rimage.EncodeImage(ctx, img, mimeType) + if err != nil { + return nil, camera.ImageMetadata{}, err + } + return imgBytes, camera.ImageMetadata{MimeType: mimeType}, nil } func (c *webcam) NextPointCloud(ctx context.Context) (pointcloud.PointCloud, error) { diff --git a/components/camera/videosourcewrappers.go b/components/camera/videosourcewrappers.go index d1e989e7f79..3ef354fe4cf 100644 --- a/components/camera/videosourcewrappers.go +++ b/components/camera/videosourcewrappers.go @@ -16,33 +16,50 @@ import ( "go.viam.com/rdk/rimage" "go.viam.com/rdk/rimage/depthadapter" "go.viam.com/rdk/rimage/transform" + "go.viam.com/rdk/utils" ) +// FromVideoSource is DEPRECATED!!! Please implement cameras according to the camera.Camera interface. // FromVideoSource creates a Camera resource from a VideoSource. // Note: this strips away Reconfiguration and DoCommand abilities. // If needed, implement the Camera another way. For example, a webcam // implements a Camera manually so that it can atomically reconfigure itself. 
-func FromVideoSource(name resource.Name, src VideoSource, logger logging.Logger) Camera { +func FromVideoSource(name resource.Name, src StreamCamera, logger logging.Logger) StreamCamera { var rtpPassthroughSource rtppassthrough.Source if ps, ok := src.(rtppassthrough.Source); ok { rtpPassthroughSource = ps } return &sourceBasedCamera{ rtpPassthroughSource: rtpPassthroughSource, + StreamCamera: src, Named: name.AsNamed(), - VideoSource: src, Logger: logger, } } type sourceBasedCamera struct { - resource.Named + StreamCamera resource.AlwaysRebuild - VideoSource + resource.Named rtpPassthroughSource rtppassthrough.Source logging.Logger } +// Explicitly define Reconfigure to resolve ambiguity. +func (vs *sourceBasedCamera) Reconfigure(ctx context.Context, deps resource.Dependencies, conf resource.Config) error { + return vs.AlwaysRebuild.Reconfigure(ctx, deps, conf) +} + +// Explicitly define Name to resolve ambiguity. +func (vs *sourceBasedCamera) Name() resource.Name { + return vs.Named.Name() +} + +// Define DoCommand to fulfill Named interface. +func (vs *sourceBasedCamera) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) { + return vs.StreamCamera.DoCommand(ctx, cmd) +} + func (vs *sourceBasedCamera) SubscribeRTP( ctx context.Context, bufferSize int, @@ -79,6 +96,7 @@ func (vs *videoSource) Unsubscribe(ctx context.Context, id rtppassthrough.Subscr return errors.New("Unsubscribe unimplemented") } +// NewPinholeModelWithBrownConradyDistortion is DEPRECATED!!! Please implement cameras according to the camera.Camera interface. // NewPinholeModelWithBrownConradyDistortion creates a transform.PinholeCameraModel from // a *transform.PinholeCameraIntrinsics and a *transform.BrownConrady. // If *transform.BrownConrady is `nil`, transform.PinholeCameraModel.Distortion @@ -95,6 +113,7 @@ func NewPinholeModelWithBrownConradyDistortion(pinholeCameraIntrinsics *transfor return cameraModel } +// NewVideoSourceFromReader is DEPRECATED!!! 
Please implement cameras according to the camera.Camera interface. // NewVideoSourceFromReader creates a VideoSource either with or without a projector. The stream type // argument is for detecting whether or not the resulting camera supports return // of pointcloud data in the absence of an implemented NextPointCloud function. @@ -103,7 +122,7 @@ func NewVideoSourceFromReader( ctx context.Context, reader gostream.VideoReader, syst *transform.PinholeCameraModel, imageType ImageType, -) (VideoSource, error) { +) (StreamCamera, error) { if reader == nil { return nil, errors.New("cannot have a nil reader") } @@ -115,7 +134,7 @@ func NewVideoSourceFromReader( vs := gostream.NewVideoSource(reader, prop.Video{}) actualSystem := syst if actualSystem == nil { - srcCam, ok := reader.(VideoSource) + srcCam, ok := reader.(Camera) if ok { props, err := srcCam.Properties(ctx) if err != nil { @@ -140,6 +159,7 @@ func NewVideoSourceFromReader( }, nil } +// WrapVideoSourceWithProjector is DEPRECATED!!! Please implement cameras according to the camera.Camera interface. // WrapVideoSourceWithProjector creates a Camera either with or without a projector. The stream type // argument is for detecting whether or not the resulting camera supports return // of pointcloud data in the absence of an implemented NextPointCloud function. @@ -148,14 +168,13 @@ func WrapVideoSourceWithProjector( ctx context.Context, source gostream.VideoSource, syst *transform.PinholeCameraModel, imageType ImageType, -) (VideoSource, error) { +) (StreamCamera, error) { if source == nil { return nil, errors.New("cannot have a nil source") } actualSystem := syst if actualSystem == nil { - //nolint:staticcheck srcCam, ok := source.(Camera) if ok { props, err := srcCam.Properties(ctx) @@ -182,6 +201,7 @@ func WrapVideoSourceWithProjector( // videoSource implements a Camera with a gostream.VideoSource. 
type videoSource struct { + resource.AlwaysRebuild rtpPassthroughSource rtppassthrough.Source videoSource gostream.VideoSource actualSource interface{} @@ -193,15 +213,18 @@ func (vs *videoSource) Stream(ctx context.Context, errHandlers ...gostream.Error return vs.videoSource.Stream(ctx, errHandlers...) } -// ReadImageBytes wraps ReadImage given a mimetype to encode the image as bytes data, returning -// supplementary metadata for downstream processing. -// TODO(hexbabe): make function private or remove altogether once the usages are limited to this file. -func ReadImageBytes(ctx context.Context, src gostream.VideoSource, mimeType string) ([]byte, ImageMetadata, error) { - img, release, err := ReadImage(ctx, src) +func (vs *videoSource) Image(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, ImageMetadata, error) { + if sourceCam, ok := vs.actualSource.(Camera); ok { + return sourceCam.Image(ctx, mimeType, extra) + } + img, release, err := ReadImage(ctx, vs.videoSource) if err != nil { return nil, ImageMetadata{}, err } defer release() + if mimeType == "" { + mimeType = utils.MimeTypePNG // default to lossless mimetype such as PNG + } imgBytes, err := rimage.EncodeImage(ctx, img, mimeType) if err != nil { return nil, ImageMetadata{}, err @@ -209,10 +232,6 @@ func ReadImageBytes(ctx context.Context, src gostream.VideoSource, mimeType stri return imgBytes, ImageMetadata{MimeType: mimeType}, nil } -func (vs *videoSource) Image(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, ImageMetadata, error) { - return ReadImageBytes(ctx, vs.videoSource, mimeType) -} - // Images is for getting simultaneous images from different sensors // If the underlying source did not specify an Images function, a default is applied. // The default returns a list of 1 image from ReadImage, and the current time. 
@@ -264,6 +283,13 @@ func (vs *videoSource) DoCommand(ctx context.Context, cmd map[string]interface{} return nil, resource.ErrDoUnimplemented } +func (vs *videoSource) Name() resource.Name { + if namedResource, ok := vs.actualSource.(resource.Named); ok { + return namedResource.Name() + } + return resource.Name{} +} + func (vs *videoSource) Properties(ctx context.Context) (Properties, error) { _, supportsPCD := vs.actualSource.(PointCloudSource) result := Properties{ diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index a54928dcfa3..5adb12abc2c 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -22,7 +22,7 @@ import ( "go.viam.com/rdk/logging" "go.viam.com/rdk/resource" "go.viam.com/rdk/robot" - streamCamera "go.viam.com/rdk/robot/web/stream/camera" + camerautils "go.viam.com/rdk/robot/web/stream/camera" "go.viam.com/rdk/robot/web/stream/state" rutils "go.viam.com/rdk/utils" ) @@ -170,7 +170,7 @@ func (server *Server) AddStream(ctx context.Context, req *streampb.AddStreamRequ } // return error if resource is neither a camera nor audioinput - _, isCamErr := streamCamera.Camera(server.robot, streamStateToAdd.Stream) + _, isCamErr := camerautils.Camera(server.robot, streamStateToAdd.Stream) _, isAudioErr := audioinput.FromRobot(server.robot, resource.SDPTrackNameToShortName(streamStateToAdd.Stream.Name())) if isCamErr != nil && isAudioErr != nil { return nil, errors.Errorf("stream is neither a camera nor audioinput. 
streamName: %v", streamStateToAdd.Stream) @@ -307,7 +307,7 @@ func (server *Server) RemoveStream(ctx context.Context, req *streampb.RemoveStre shortName := resource.SDPTrackNameToShortName(streamToRemove.Stream.Name()) _, isAudioResourceErr := audioinput.FromRobot(server.robot, shortName) - _, isCameraResourceErr := streamCamera.Camera(server.robot, streamToRemove.Stream) + _, isCameraResourceErr := camerautils.Camera(server.robot, streamToRemove.Stream) if isAudioResourceErr != nil && isCameraResourceErr != nil { return &streampb.RemoveStreamResponse{}, nil @@ -392,12 +392,12 @@ func (server *Server) SetStreamOptions( defer server.mu.Unlock() switch cmd { case optionsCommandResize: - err = server.resizeVideoSource(req.Name, int(req.Resolution.Width), int(req.Resolution.Height)) + err = server.resizeVideoSource(ctx, req.Name, int(req.Resolution.Width), int(req.Resolution.Height)) if err != nil { return nil, fmt.Errorf("failed to resize video source for stream %q: %w", req.Name, err) } case optionsCommandReset: - err = server.resetVideoSource(req.Name) + err = server.resetVideoSource(ctx, req.Name) if err != nil { return nil, fmt.Errorf("failed to reset video source for stream %q: %w", req.Name, err) } @@ -426,7 +426,7 @@ func validateSetStreamOptionsRequest(req *streampb.SetStreamOptionsRequest) (int } // resizeVideoSource resizes the video source with the given name. 
-func (server *Server) resizeVideoSource(name string, width, height int) error { +func (server *Server) resizeVideoSource(ctx context.Context, name string, width, height int) error { existing, ok := server.videoSources[name] if !ok { return fmt.Errorf("video source %q not found", name) @@ -440,7 +440,7 @@ func (server *Server) resizeVideoSource(name string, width, height int) error { if !ok { return fmt.Errorf("stream state not found with name %q", name) } - resizer := gostream.NewResizeVideoSource(cam, width, height) + resizer := gostream.NewResizeVideoSource(camerautils.VideoSourceFromCamera(ctx, cam), width, height) server.logger.Debugf( "resizing video source to width %d and height %d", width, height, @@ -454,7 +454,7 @@ func (server *Server) resizeVideoSource(name string, width, height int) error { } // resetVideoSource resets the video source with the given name to the source resolution. -func (server *Server) resetVideoSource(name string) error { +func (server *Server) resetVideoSource(ctx context.Context, name string) error { existing, ok := server.videoSources[name] if !ok { return fmt.Errorf("video source %q not found", name) @@ -468,7 +468,7 @@ func (server *Server) resetVideoSource(name string) error { return fmt.Errorf("stream state not found with name %q", name) } server.logger.Debug("resetting video source") - existing.Swap(cam) + existing.Swap(camerautils.VideoSourceFromCamera(ctx, cam)) err = streamState.Reset() if err != nil { return fmt.Errorf("failed to reset stream %q: %w", name, err) @@ -482,7 +482,7 @@ func (server *Server) resetVideoSource(name string) error { func (server *Server) AddNewStreams(ctx context.Context) error { // Refreshing sources will walk the robot resources for anything implementing the camera and // audioinput APIs and mutate the `svc.videoSources` and `svc.audioSources` maps. 
- server.refreshVideoSources() + server.refreshVideoSources(ctx) server.refreshAudioSources() if server.streamConfig == (gostream.StreamConfig{}) { @@ -643,18 +643,19 @@ func (server *Server) removeMissingStreams() { } // refreshVideoSources checks and initializes every possible video source that could be viewed from the robot. -func (server *Server) refreshVideoSources() { +func (server *Server) refreshVideoSources(ctx context.Context) { for _, name := range camera.NamesFromRobot(server.robot) { cam, err := camera.FromRobot(server.robot, name) if err != nil { continue } existing, ok := server.videoSources[cam.Name().SDPTrackName()] + src := camerautils.VideoSourceFromCamera(ctx, cam) if ok { - existing.Swap(cam) + existing.Swap(src) continue } - newSwapper := gostream.NewHotSwappableVideoSource(cam) + newSwapper := gostream.NewHotSwappableVideoSource(src) server.videoSources[cam.Name().SDPTrackName()] = newSwapper } } @@ -743,29 +744,20 @@ func GenerateResolutions(width, height int32, logger logging.Logger) []Resolutio } // sampleFrameSize takes in a camera.Camera, starts a stream, attempts to -// pull a frame using Stream.Next, and returns the width and height. +// pull a frame, and returns the width and height. func sampleFrameSize(ctx context.Context, cam camera.Camera, logger logging.Logger) (int, int, error) { logger.Debug("sampling frame size") - stream, err := cam.Stream(ctx) - if err != nil { - return 0, 0, err - } - defer func() { - if cerr := stream.Close(ctx); cerr != nil { - logger.Error("failed to close stream:", cerr) - } - }() // Attempt to get a frame from the stream with a maximum of 5 retries. // This is useful if cameras have a warm-up period before they can start streaming. 
var frame image.Image - var release func() + var err error retryLoop: for i := 0; i < 5; i++ { select { case <-ctx.Done(): return 0, 0, ctx.Err() default: - frame, release, err = stream.Next(ctx) + frame, err = camera.DecodeImageFromCamera(ctx, "", nil, cam) if err == nil { break retryLoop // Break out of the for loop, not just the select. } @@ -773,9 +765,6 @@ retryLoop: time.Sleep(retryDelay) } } - if release != nil { - defer release() - } if err != nil { return 0, 0, fmt.Errorf("failed to get frame after 5 attempts: %w", err) } diff --git a/robot/web/stream/state/state.go b/robot/web/stream/state/state.go index de4c0c03cc8..b885907f12a 100644 --- a/robot/web/stream/state/state.go +++ b/robot/web/stream/state/state.go @@ -19,7 +19,7 @@ import ( "go.viam.com/rdk/gostream" "go.viam.com/rdk/logging" "go.viam.com/rdk/robot" - streamCamera "go.viam.com/rdk/robot/web/stream/camera" + camerautils "go.viam.com/rdk/robot/web/stream/camera" ) // ErrClosed indicates that the StreamState is already closed. 
@@ -332,7 +332,7 @@ func (state *StreamState) tick() { } func (state *StreamState) streamH264Passthrough() error { - cam, err := streamCamera.Camera(state.robot, state.Stream) + cam, err := camerautils.Camera(state.robot, state.Stream) if err != nil { return err } @@ -393,7 +393,7 @@ func (state *StreamState) streamH264Passthrough() error { } func (state *StreamState) unsubscribeH264Passthrough(ctx context.Context, id rtppassthrough.SubscriptionID) error { - cam, err := streamCamera.Camera(state.robot, state.Stream) + cam, err := camerautils.Camera(state.robot, state.Stream) if err != nil { return err } diff --git a/services/vision/obstaclesdepth/obstacles_depth.go b/services/vision/obstaclesdepth/obstacles_depth.go index ed450b98d9b..eb76f7f2ace 100644 --- a/services/vision/obstaclesdepth/obstacles_depth.go +++ b/services/vision/obstaclesdepth/obstacles_depth.go @@ -99,8 +99,8 @@ func registerObstaclesDepth( // BuildObsDepth will check for intrinsics and determine how to build based on that. func (o *obsDepth) buildObsDepth(logger logging.Logger) func( - ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) { - return func(ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) { + ctx context.Context, src camera.Camera) ([]*vision.Object, error) { + return func(ctx context.Context, src camera.Camera) ([]*vision.Object, error) { props, err := src.Properties(ctx) if err != nil { logger.CWarnw(ctx, "could not find camera properties. obstacles depth started without camera's intrinsic parameters", "error", err) @@ -116,14 +116,13 @@ func (o *obsDepth) buildObsDepth(logger logging.Logger) func( } // buildObsDepthNoIntrinsics will return the median depth in the depth map as a Geometry point. 
-func (o *obsDepth) obsDepthNoIntrinsics(ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) { - pic, release, err := camera.ReadImage(ctx, src) +func (o *obsDepth) obsDepthNoIntrinsics(ctx context.Context, src camera.Camera) ([]*vision.Object, error) { + img, err := camera.DecodeImageFromCamera(ctx, "", nil, src) if err != nil { return nil, errors.Errorf("could not get image from %s", src) } - defer release() - dm, err := rimage.ConvertImageToDepthMap(ctx, pic) + dm, err := rimage.ConvertImageToDepthMap(ctx, img) if err != nil { return nil, errors.New("could not convert image to depth map") } @@ -144,17 +143,16 @@ func (o *obsDepth) obsDepthNoIntrinsics(ctx context.Context, src camera.VideoSou // buildObsDepthWithIntrinsics will use the methodology in Manduchi et al. to find obstacle points // before clustering and projecting those points into 3D obstacles. -func (o *obsDepth) obsDepthWithIntrinsics(ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) { +func (o *obsDepth) obsDepthWithIntrinsics(ctx context.Context, src camera.Camera) ([]*vision.Object, error) { // Check if we have intrinsics here. 
If not, don't even try if o.intrinsics == nil { return nil, errors.New("tried to build obstacles depth with intrinsics but no instrinsics found") } - pic, release, err := camera.ReadImage(ctx, src) + img, err := camera.DecodeImageFromCamera(ctx, "", nil, src) if err != nil { return nil, errors.Errorf("could not get image from %s", src) } - defer release() - dm, err := rimage.ConvertImageToDepthMap(ctx, pic) + dm, err := rimage.ConvertImageToDepthMap(ctx, img) if err != nil { return nil, errors.New("could not convert image to depth map") } diff --git a/services/vision/obstaclesdistance/obstacles_distance.go b/services/vision/obstaclesdistance/obstacles_distance.go index ef887e48275..2aec6b39fcf 100644 --- a/services/vision/obstaclesdistance/obstacles_distance.go +++ b/services/vision/obstaclesdistance/obstacles_distance.go @@ -75,7 +75,7 @@ func registerObstacleDistanceDetector( return nil, errors.New("config for obstacles_distance cannot be nil") } - segmenter := func(ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) { + segmenter := func(ctx context.Context, src camera.Camera) ([]*vision.Object, error) { clouds := make([]pointcloud.PointCloud, 0, conf.NumQueries) for i := 0; i < conf.NumQueries; i++ { diff --git a/services/vision/vision.go b/services/vision/vision.go index e763c31db23..c4dd3add012 100644 --- a/services/vision/vision.go +++ b/services/vision/vision.go @@ -308,11 +308,10 @@ func (vm *vizModel) DetectionsFromCamera( if err != nil { return nil, errors.Wrapf(err, "could not find camera named %s", cameraName) } - img, release, err := camera.ReadImage(ctx, cam) + img, err := camera.DecodeImageFromCamera(ctx, "", extra, cam) if err != nil { return nil, errors.Wrapf(err, "could not get image from %s", cameraName) } - defer release() return vm.detectorFunc(ctx, img) } @@ -351,11 +350,10 @@ func (vm *vizModel) ClassificationsFromCamera( if err != nil { return nil, errors.Wrapf(err, "could not find camera named %s", cameraName) } - img, 
release, err := camera.ReadImage(ctx, cam) + img, err := camera.DecodeImageFromCamera(ctx, "", extra, cam) if err != nil { return nil, errors.Wrapf(err, "could not get image from %s", cameraName) } - defer release() fullClassifications, err := vm.classifierFunc(ctx, img) if err != nil { return nil, errors.Wrap(err, "could not get classifications from image") @@ -401,11 +399,10 @@ func (vm *vizModel) CaptureAllFromCamera( if err != nil { return viscapture.VisCapture{}, errors.Wrapf(err, "could not find camera named %s", cameraName) } - img, release, err := camera.ReadImage(ctx, cam) + img, err := camera.DecodeImageFromCamera(ctx, "", extra, cam) if err != nil { return viscapture.VisCapture{}, errors.Wrapf(err, "could not get image from %s", cameraName) } - defer release() logger := vm.r.Logger() var detections []objectdetection.Detection if opt.ReturnDetections { diff --git a/testutils/inject/camera.go b/testutils/inject/camera.go index 975bb9b58d0..1671cf07005 100644 --- a/testutils/inject/camera.go +++ b/testutils/inject/camera.go @@ -52,20 +52,6 @@ func (c *Camera) NextPointCloud(ctx context.Context) (pointcloud.PointCloud, err return nil, errors.New("NextPointCloud unimplemented") } -// Stream calls the injected Stream or the real version. -func (c *Camera) Stream( - ctx context.Context, - errHandlers ...gostream.ErrorHandler, -) (gostream.VideoStream, error) { - if c.StreamFunc != nil { - return c.StreamFunc(ctx, errHandlers...) - } - if c.Camera != nil { - return c.Camera.Stream(ctx, errHandlers...) - } - return nil, errors.Wrap(ctx.Err(), "no stream function available") -} - // Image calls the injected Image or the real version. 
func (c *Camera) Image(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { if c.ImageFunc != nil { diff --git a/vision/segmentation/detections_to_objects.go b/vision/segmentation/detections_to_objects.go index 8bce8e4dbbf..10f79ac1cac 100644 --- a/vision/segmentation/detections_to_objects.go +++ b/vision/segmentation/detections_to_objects.go @@ -37,7 +37,7 @@ func (dsc *DetectionSegmenterConfig) ConvertAttributes(am utils.AttributeMap) er func cameraToProjector( ctx context.Context, - source camera.VideoSource, + source camera.Camera, ) (transform.Projector, error) { if source == nil { return nil, errors.New("cannot have a nil source") @@ -76,7 +76,7 @@ func DetectionSegmenter(detector objectdetection.Detector, meanK int, sigma, con } } // return the segmenter - seg := func(ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) { + seg := func(ctx context.Context, src camera.Camera) ([]*vision.Object, error) { proj, err := cameraToProjector(ctx, src) if err != nil { return nil, err diff --git a/vision/segmentation/er_ccl_clustering.go b/vision/segmentation/er_ccl_clustering.go index 987c7ceb792..f5e684524af 100644 --- a/vision/segmentation/er_ccl_clustering.go +++ b/vision/segmentation/er_ccl_clustering.go @@ -129,7 +129,7 @@ func NewERCCLClustering(params utils.AttributeMap) (Segmenter, error) { } // ErCCLAlgorithm applies the connected components clustering algorithm to a VideoSource. 
-func (erCCL *ErCCLConfig) ErCCLAlgorithm(ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) { +func (erCCL *ErCCLConfig) ErCCLAlgorithm(ctx context.Context, src camera.Camera) ([]*vision.Object, error) { // get next point cloud cloud, err := src.NextPointCloud(ctx) if err != nil { diff --git a/vision/segmentation/radius_clustering.go b/vision/segmentation/radius_clustering.go index 588ab8f6596..1d2628a6ddb 100644 --- a/vision/segmentation/radius_clustering.go +++ b/vision/segmentation/radius_clustering.go @@ -87,7 +87,7 @@ func NewRadiusClustering(params utils.AttributeMap) (Segmenter, error) { } // RadiusClustering applies the radius clustering algorithm directly on a given point cloud. -func (rcc *RadiusClusteringConfig) RadiusClustering(ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) { +func (rcc *RadiusClusteringConfig) RadiusClustering(ctx context.Context, src camera.Camera) ([]*vision.Object, error) { // get next point cloud cloud, err := src.NextPointCloud(ctx) if err != nil { diff --git a/vision/segmentation/radius_clustering_voxel.go b/vision/segmentation/radius_clustering_voxel.go index 06966ba1001..939c7437ba5 100644 --- a/vision/segmentation/radius_clustering_voxel.go +++ b/vision/segmentation/radius_clustering_voxel.go @@ -85,7 +85,7 @@ func NewRadiusClusteringFromVoxels(params utils.AttributeMap) (Segmenter, error) } // RadiusClusteringVoxels turns the cloud into a voxel grid and then does radius clustering to segment it. -func (rcc *RadiusClusteringVoxelConfig) RadiusClusteringVoxels(ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) { +func (rcc *RadiusClusteringVoxelConfig) RadiusClusteringVoxels(ctx context.Context, src camera.Camera) ([]*vision.Object, error) { // get next point cloud and convert it to a VoxelGrid // NOTE(bh): Maybe one day cameras will return voxel grids directly. 
cloud, err := src.NextPointCloud(ctx) diff --git a/vision/segmentation/segmenter.go b/vision/segmentation/segmenter.go index b856ea0a693..e418daeaf2f 100644 --- a/vision/segmentation/segmenter.go +++ b/vision/segmentation/segmenter.go @@ -8,4 +8,4 @@ import ( ) // A Segmenter is a function that takes images/pointclouds from an input source and segments them into objects. -type Segmenter func(ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) +type Segmenter func(ctx context.Context, src camera.Camera) ([]*vision.Object, error)