Video Data Modification in a GStreamer Application

Sometimes the requirement is to modify the input data before feeding it to a video output device, for example adding a logo to particular frames, highlighting part of a frame, and more.

This tutorial demonstrates how to use appsink and appsrc by constructing a pipeline that generates video, streams it into the application's code, modifies the data, and then streams it back out to the output element, i.e. filesink.

Table of Contents

Introduction

Applications can interact with the data flowing through a GStreamer pipeline in several ways. The element used to inject application data into a GStreamer pipeline is appsrc, and its counterpart, used to extract GStreamer data back to the application is appsink.

This tutorial shows how to use both to manipulate the data flowing through a pipeline.

This is a very simple application that concentrates only on getting a sample from the source, modifying its data, and pushing it on to the destination. This tutorial does not explain how to build pipelines element by element; instead it uses gst_parse_launch() to create and launch each pipeline.


appsink-snoop.c

/* GStreamer
 *
 * appsink-snoop.c: example for modify data in video pipeline
 * using appsink and appsrc.
 *
 * Based on the appsink-src.c example
 * 
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */

#include <gst/gst.h>

#include <string.h>

#include <gst/app/gstappsrc.h>
#include <gst/app/gstappsink.h>

/* State shared between main() and the signal/bus callbacks. */
typedef struct
{
  GMainLoop *loop;              /* main loop run by main(), quit from the bus callbacks */
  GstElement *source;           /* pipeline that ends in the appsink named "testsink" */
  GstElement *sink;             /* pipeline that starts with the appsrc named "testsource" */
} ProgramData;

/*
 * modify_in_data:
 * @map: writable mapping of the frame to edit, as filled in by
 *       gst_buffer_map() with GST_MAP_WRITE.
 *
 * User hook: change the mapped frame bytes in place. This sample
 * implementation overwrites the first half of the mapped bytes with 0xff.
 * For the packed RGB frames requested by the source pipeline that should
 * show up as a white top half -- TODO confirm against the negotiated caps.
 *
 * Returns: GST_FLOW_OK on success, GST_FLOW_ERROR when there is no mapped
 * data to work on.
 */
static GstFlowReturn
modify_in_data (GstMapInfo * map)
{
  gsize half;

  if (map == NULL || map->data == NULL) {
    return GST_FLOW_ERROR;
  }

  g_print ("%s dataLen = %" G_GSIZE_FORMAT "\n", __func__, map->size);

  /* Exactly size/2 bytes are written, so an empty mapping is left untouched
   * and nothing past the first half is clobbered. */
  half = map->size / 2;
  memset (map->data, 0xff, half);

  return GST_FLOW_OK;
}

/*
 * on_new_sample_from_sink:
 * @elt: the appsink that emitted "new-sample"
 * @data: shared program state; data->sink is the pipeline holding the
 *        appsrc named "testsource"
 *
 * Called when the appsink notifies us that a new sample is ready. Pulls the
 * sample, deep-copies its buffer, lets modify_in_data() edit the copy and
 * pushes the copy into the appsrc of the sink pipeline.
 *
 * Returns: the result of gst_app_src_push_buffer(), GST_FLOW_EOS when the
 * appsink has no more samples because it reached EOS, or GST_FLOW_ERROR when
 * a sample/buffer/mapping/appsrc could not be obtained.
 */
static GstFlowReturn
on_new_sample_from_sink (GstElement * elt, ProgramData * data)
{
  GstSample *sample;
  GstBuffer *app_buffer;
  GstElement *source;
  GstFlowReturn ret;
  GstMapInfo map;

  g_print ("%s\n", __func__);

  /* get the sample from appsink; NULL means EOS or that the sink stopped */
  sample = gst_app_sink_pull_sample (GST_APP_SINK (elt));
  if (sample == NULL) {
    return gst_app_sink_is_eos (GST_APP_SINK (elt)) ?
        GST_FLOW_EOS : GST_FLOW_ERROR;
  }

  /* make a private, writable copy; after that the appsink sample is not
   * needed anymore */
  app_buffer = gst_buffer_copy_deep (gst_sample_get_buffer (sample));
  gst_sample_unref (sample);
  if (app_buffer == NULL) {
    return GST_FLOW_ERROR;
  }

  if (!gst_buffer_map (app_buffer, &map, GST_MAP_WRITE)) {
    gst_buffer_unref (app_buffer);
    return GST_FLOW_ERROR;
  }

  /* Here You modify your buffer data (through the mapping, not the buffer) */
  modify_in_data (&map);

  /* The mapping must be released while we still own the buffer:
   * gst_app_src_push_buffer() takes ownership of it. */
  gst_buffer_unmap (app_buffer, &map);

  /* get source and push new buffer */
  source = gst_bin_get_by_name (GST_BIN (data->sink), "testsource");
  if (source == NULL) {
    gst_buffer_unref (app_buffer);
    return GST_FLOW_ERROR;
  }
  ret = gst_app_src_push_buffer (GST_APP_SRC (source), app_buffer);
  gst_object_unref (source);

  return ret;
}

/*
 * Bus callback for the source pipeline.
 *
 * On EOS the appsrc named "testsource" in the sink pipeline is told that the
 * stream has ended; on ERROR the main loop is quit. Every other message is
 * ignored. Always returns TRUE.
 */
static gboolean
on_source_message (GstBus * bus, GstMessage * message, ProgramData * data)
{
  const GstMessageType kind = GST_MESSAGE_TYPE (message);

  g_print ("%s\n", __func__);

  if (kind == GST_MESSAGE_EOS) {
    GstElement *appsrc;

    g_print ("The source got dry\n");
    /* forward the end of stream to the second pipeline */
    appsrc = gst_bin_get_by_name (GST_BIN (data->sink), "testsource");
    gst_app_src_end_of_stream (GST_APP_SRC (appsrc));
    gst_object_unref (appsrc);
  } else if (kind == GST_MESSAGE_ERROR) {
    g_print ("Received error\n");
    g_main_loop_quit (data->loop);
  }

  return TRUE;
}

/*
 * Bus callback for the sink pipeline.
 *
 * Both EOS and ERROR end the application by quitting the main loop; they
 * differ only in the line that is printed. Every other message is ignored.
 * Always returns TRUE.
 */
static gboolean
on_sink_message (GstBus * bus, GstMessage * message, ProgramData * data)
{
  const gchar *note = NULL;

  g_print ("%s\n", __func__);

  if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_EOS) {
    note = "Finished playback\n";
  } else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) {
    note = "Received error\n";
  }

  /* a note is only set for the two terminating message types */
  if (note != NULL) {
    g_print ("%s", note);
    g_main_loop_quit (data->loop);
  }

  return TRUE;
}

int
main (int argc, char *argv[])
{
  ProgramData *data = NULL;
  gchar *string = NULL;
  GstBus *bus = NULL;
  GstElement *testsink = NULL;
  GstElement *testsource = NULL;

  gst_init (&argc, &argv);

  data = g_new0 (ProgramData, 1);

  data->loop = g_main_loop_new (NULL, FALSE);

  /* setting up source pipeline, we read from a file and convert to our desired
   * caps. */
  string =
      g_strdup_printf
      ("videotestsrc num-buffers=5 ! video/x-raw, width=640, height=480, format=RGB ! appsink name=testsink");
  data->source = gst_parse_launch (string, NULL);
  g_free (string);

  if (data->source == NULL) {
    g_print ("Bad source\n");
    g_main_loop_unref (data->loop);
    g_free (data);
    return -1;
  }

  g_print ("Capture bin launched\n");
  /* to be notified of messages from this pipeline, mostly EOS */
  bus = gst_element_get_bus (data->source);
  gst_bus_add_watch (bus, (GstBusFunc) on_source_message, data);
  gst_object_unref (bus);

  /* we use appsink in push mode, it sends us a signal when data is available
   * and we pull out the data in the signal callback. */
  testsink = gst_bin_get_by_name (GST_BIN (data->source), "testsink");
  g_object_set (G_OBJECT (testsink), "emit-signals", TRUE, "sync", FALSE, NULL);
  g_signal_connect (testsink, "new-sample",
      G_CALLBACK (on_new_sample_from_sink), data);
  gst_object_unref (testsink);

  /* setting up sink pipeline, we push video data into this pipeline that will
   * then copy in file using the default filesink. We have no blocking
   * behaviour on the src which means that we will push the entire file into
   * memory. */
  string =
      g_strdup_printf ("appsrc name=testsource ! filesink location=tmp.yuv");
  data->sink = gst_parse_launch (string, NULL);
  g_free (string);

  if (data->sink == NULL) {
    g_print ("Bad sink\n");
    gst_object_unref (data->source);
    g_main_loop_unref (data->loop);
    g_free (data);
    return -1;
  }
  g_print ("Play bin launched\n");

  testsource = gst_bin_get_by_name (GST_BIN (data->sink), "testsource");
  /* configure for time-based format */
  g_object_set (testsource, "format", GST_FORMAT_TIME, NULL);
  /* uncomment the next line to block when appsrc has buffered enough */
  /* g_object_set (testsource, "block", TRUE, NULL); */
  gst_object_unref (testsource);

  bus = gst_element_get_bus (data->sink);
  gst_bus_add_watch (bus, (GstBusFunc) on_sink_message, data);
  gst_object_unref (bus);

  g_print ("Going to set state to play\n");
  /* launching things */
  gst_element_set_state (data->sink, GST_STATE_PLAYING);
  gst_element_set_state (data->source, GST_STATE_PLAYING);

  /* let's run !, this loop will quit when the sink pipeline goes EOS or when an
   * error occurs in the source or sink pipelines. */
  g_print ("Let's run!\n");
  g_main_loop_run (data->loop);
  g_print ("Going out\n");

  gst_element_set_state (data->source, GST_STATE_NULL);
  gst_element_set_state (data->sink, GST_STATE_NULL);

  gst_object_unref (data->source);
  gst_object_unref (data->sink);
  g_main_loop_unref (data->loop);
  g_free (data);

  return 0;
}


Walkthrough

typedef struct
{
  GMainLoop *loop;
  GstElement *source;
  GstElement *sink;
} ProgramData;

We start, as usual, by putting all our variables in a structure so we can pass it around to functions. For this tutorial we need the source and sink pipelines. Also, we are going to react to a signal from the sink when data is available and to messages on the bus, so we need a GLib main loop object.


 gst_init (&argc, &argv);
 
 data = g_new0 (ProgramData, 1);

 data->loop = g_main_loop_new (NULL, FALSE);

 First, GStreamer is initialized through gst_init().

We create a GMainLoop (GLib main loop) and later set it running with g_main_loop_run(). This function blocks and will not return until g_main_loop_quit() is called. In the meantime, it will call the callbacks we have registered whenever a message appears on the bus.



  /* setting up source pipeline, we read from a file and convert to our desired
   * caps. */
  string =
      g_strdup_printf
      ("videotestsrc num-buffers=5 ! video/x-raw, width=640, height=480, format=RGB ! appsink name=testsink");
  data->source = gst_parse_launch (string, NULL);
  g_free (string);


Now we can construct the first half of the pipeline. gst_parse_launch() creates a new pipeline based on the input string. Please note that you might get a return value that is not NULL even though the error is set; in that case there was a recoverable parsing error and you can still try to play the pipeline. Here, the pipeline is composed of a videotestsrc that runs for 5 buffers and generates 640x480 RGB frames, which are delivered to an appsink named testsink.

  testsink = gst_bin_get_by_name (GST_BIN (data->source), "testsink");
  g_object_set (G_OBJECT (testsink), "emit-signals", TRUE, "sync", FALSE, NULL);
  g_signal_connect (testsink, "new-sample",
      G_CALLBACK (on_new_sample_from_sink), data);
  gst_object_unref (testsink);  

We use appsink in push mode: it sends us a signal when data is available, and we pull the data out in the signal callback, on_new_sample_from_sink(). Then we set up the sink pipeline: we push video data into this pipeline, which then writes it to a file using filesink. We have no blocking behaviour on the appsrc, which means that the entire stream may end up queued in memory.

  bus = gst_element_get_bus (data->source);
  gst_bus_add_watch (bus, (GstBusFunc) on_source_message, data); 

  bus = gst_element_get_bus (data->sink);
  gst_bus_add_watch (bus, (GstBusFunc) on_sink_message, data);   

These calls add a bus watch to the default main context. In our application the watches are used to be notified of messages from each pipeline, mostly EOS.


  g_print ("Going to set state to play\n");
  /* launching things */
  gst_element_set_state (data->sink, GST_STATE_PLAYING);
  gst_element_set_state (data->source, GST_STATE_PLAYING);

Finally, the pipelines are started. Once they are running, the first one begins pushing buffers into the appsink element. on_new_sample_from_sink() is called each time the appsink emits its new-sample signal, i.e. whenever a new sample is available.

on_new_sample_from_sink (GstElement * elt, ProgramData * data)
{


  /* get the sample from appsink */
  sample = gst_app_sink_pull_sample (GST_APP_SINK (elt));
  buffer = gst_sample_get_buffer (sample);

  /* make a copy */
  app_buffer = gst_buffer_copy_deep (buffer);

  gst_buffer_map (app_buffer, &map, GST_MAP_WRITE);

  /* Here You modify your buffer data: pass the mapping, which carries the
   * data pointer and size, not the buffer itself */
  modify_in_data (&map);

  /* release the mapping before the buffer is handed over, because
   * gst_app_src_push_buffer() takes ownership of it */
  gst_buffer_unmap (app_buffer, &map);

  /* get source and push new buffer */
  source = gst_bin_get_by_name (GST_BIN (data->sink), "testsource");
  ret = gst_app_src_push_buffer (GST_APP_SRC (source), app_buffer);

}

This is the main function of our application: it gets the sample from appsink, makes a copy of the input buffer, and then modifies the copy in modify_in_data(). After modification, the new buffer is pushed to the appsrc, and the buffer mapping and the appsink sample are released. In modify_in_data() we simply turn half of the input frame white.

Conclusion

This tutorial has shown:

  • How to launch pipeline with simple gst_parse_launch().

  • How to configure emit-signals.

  • How to retrieve a particular data using signal callback.

  • How to modify the data and push back to pipe.

Remember that the above code is derived from the appsink-src.c example; you can get more information from the pages below:

  1. Short-cutting the pipeline
  2. Pipeline manipulation
  3. appsink-src.c

It has been a pleasure having you here, and see you soon!


© Copyright 2019 - 2022 Xilinx Inc. Privacy Policy