Chapter 8: Complex Event Processing with StreamInsight


This chapter is taken from book "Introducing Microsoft SQL Server 2008 R2" by Ross Mistry and Stacia Misner published for Microsoft Press.

Microsoft SQL Server StreamInsight is a complex event processing (CEP) engine. This technology is a new offering in the SQL Server family, making its first appearance in SQL Server 2008 R2. It ships with the Standard, Enterprise, and Datacenter editions of SQL Server 2008 R2. StreamInsight is both an engine built to process high-throughput streams of data with low latency and a Microsoft .NET Framework platform for developers of CEP applications. The goal of a CEP application is to rapidly aggregate high volumes of raw data for analysis as it streams from point to point. You can apply analytical techniques to trigger a response upon crossing a threshold or to find trends or exceptions in the data without first storing it in a data warehouse.

Complex Event Processing

Complex event processing is the task of sifting through streaming data to find meaningful information. It might involve performing calculations on the data to derive information, or the information might be the revelation of significant trends. As a development platform, StreamInsight can support most types of CEP applications that you might need.

Complex Event Processing Applications

There are certain industries that regularly produce high volumes of streaming data. Manufacturing and utilities companies use sensors, meters, and other devices to monitor processes and alert users when the system identifies events that could lead to a potential failure. Financial trading firms must monitor market prices for stocks, commodities, and other financial instruments and rapidly calculate profits or losses based on changing conditions.

Similarly, there are certain types of applications that benefit from the ability to analyze data as close as possible to the time that the applications capture the data. For example, companies selling products online often use clickstream analysis to change the page layout and site navigation and to display targeted advertising while a user remains connected to a site. Credit card companies monitor transactions for exceptions to normal spending activities that could indicate fraud.

The challenge with CEP arises when you need to process and analyze the data before you have time to perform ETL activities to move the data into a more traditional analytical environment, such as a data warehouse. In CEP applications, the value of the information derived from low-latency processing, defined in milliseconds, can be extremely high. This value begins to diminish as the data ages. Adding to the challenge is the rate at which source applications generate data, often tens of thousands of records per second.

StreamInsight Highlights

StreamInsight's CEP server includes a core engine that is built to process high-throughput data. The engine achieves high performance by executing highly parallel queries and using in-memory caches to avoid incurring the overhead of storing data for processing. The engine can handle data that arrives at a steady rate or in intermittent bursts, and can even rearrange data that arrives out of sequence. Queries can also incorporate nonstreaming data sources, such as master reference data or historical data maintained in a data warehouse.

You write your CEP applications using a .NET language, such as Visual Basic or C#, for rapid application development. In your applications, you embed declarative queries using Language Integrated Query (LINQ) expressions to process the data for analysis.

StreamInsight also includes other tools for administration and development support. The CEP server has a management interface and diagnostic views that you can use to develop applications to monitor StreamInsight. For development support, StreamInsight includes an event flow debugger that you can use to troubleshoot queries. An example of a situation that might require troubleshooting is the arrival of a larger number of events than expected.

StreamInsight Architecture

As with any new technology, you will find it helpful to have an understanding of the Stream- Insight architecture before you begin development of your first CEP application. Your application must restructure data streams to a format usable by the processing engine. You use adapters to perform this restructuring before passing the data to queries that run on the CEP server. The way you choose to develop your application also depends on the deployment model you use to implement StreamInsight.

Data Structures

The high-throughput data that StreamInsight requires is known as a stream. More specifically, a stream is a collection of data that changes over time. For example, a Web log contains data about each server hit, including the date, time, page request, and Internet protocol (IP) address of the visitor. If a visitor clicks on several pages in the Web site, the Web log contains multiple lines, or hits, for the same visitor, and each line records a different time. The information in the Web log shows how each user's activity in a Web site changes over time, which is why this type of information is considered a stream. You can query this stream to find the average number of hits or the top five referring sites over time.

StreamInsight splits a stream into individual units called events. An event contains a header and a payload. The event header includes the event kind and one or more timestamps for the event. The event kind is an indicator of a new event or the completeness of events already in the stream. The payload contains the event's data as a .NET data structure.

There are three types of event models that StreamInsight uses. The interval event model represents events with a fixed duration, such as a stock bid price that is valid only for a certain period of time. The edge event model is another type of duration model, but it represents an event with a duration that is unknown at the time the event starts, such as a Web user session. The point model represents events that occur at a specific point in time, such as a Web user's click entry in a Web log.

The CEP Server

The CEP server is a run-time engine and a set of adapter instances that receive and send events, as shown in Figure 8-1. You develop these adapters in a .NET language and register the assemblies on the CEP server, which then instantiates the adapters at run time. Input adapters receive data as a continuous stream from event stores, such as sensors on a factory floor, Web servers, data feeds, or databases. The data passes from the input adapter to the CEP engine, which processes and transforms the data by using standing queries, which are query instances that the CEP engine manages. The engine then forwards the query results to output adapters, which connect to event consumers, such as pagers, monitoring devices, dashboards, and databases. The output adapters can also include logic to trigger a response based on the query results.

Figure-8.1.gif

FIGURE 8-1 StreamInsight architecture

Input Adapters

The input adapters translate the incoming events into the event format that the CEP engine requires. You can create a typed adapter if the source produces a single event type only, but you must create an untyped adapter when the payload format differs across events or is unknown in advance. In the case of the typed adapter, the payload format is defined in advance with a static number of fields and data types when you implement the adapter. By contrast, an untyped adapter receives the payload format only when the adapter binds to the query (as part of a configuration specification). In the latter case, the number of fields and data types can vary with each query instantiation.

Output Adapters

The output adapters reverse the operations of the input adapters by translating events into a format that is usable by the target device and then sending the translated data to the device. The development process for an output adapter is very similar to the process you use to develop an input adapter.

Query Instances

Standing queries receive the stream of data from an input adapter, apply business logic to the data (such as an aggregation), and send the results as an event stream to an output adapter. You encapsulate the business logic used by a standing query instance in a query template that you develop using a combination of LINQ and a .NET language. To create the standing query instance in the CEP server, you bind a query template with specific input and output. You can use the same query template with multiple standing queries. After you instantiate a query, you are can start, stop, or manage it.

Deployment Models

You have two options for deploying StreamInsight. You can integrate the CEP server into an application as a hosted assembly, or you can deploy it as a standalone server.

Hosted Assembly

Embedding the CEP server into a host application is a simple deployment approach. You have greater flexibility than you would have with a standalone server because there are no dependencies between applications that you must consider before making changes. Each application and the CEP server run as a single process, which may be easier to manage on your server.

You can use any of the development approaches described later, in the "Application Development" section of this chapter, when hosting the CEP server in your application. However, if you decide later that you want your application to run on a standalone server, you will need to rewrite your application using the explicit server development model.

Standalone Server

You should deploy the CEP server as a standalone server when applications need to share event streams or metadata objects. For example, you can reuse event types, adapter types, and query templates and thereby minimize the impact of changes to any of these metadata objects across applications by maintaining a single copy. You can run the CEP server as an executable, or you can configure it as a Windows service. If you want to run it as a service application, you can use StreamInsightHost.exe as a host process or develop your own host process.

If you choose to deploy CEP as a standalone server, there are some limitations that affect the way you develop applications. First, you can use only the explicit server development model (which is described in the next section of this chapter) when developing CEP applications for a standalone server. Second, you must connect to the CEP server by using the Web service Uniform Resource Identifier (URI) of the CEP server host process.

Application Development

You start the typical development cycle for a new CEP application by sampling the existing data streams and developing functions to process the data. You then test the functions, review the results, and determine the changes necessary to improve the functions. This process continues in an iterative fashion until you complete development.

As part of the development of your CEP application, you create event types, adapters, and query templates. The way you use these objects depends on the development model you choose. When you develop using the explicit server development model, you explicitly create and register all of these objects and can reuse these objects in multiple applications. In the implicit server development model, you concentrate on the development of the query logic and rely on the CEP server to act as an implicit host and to create and register the necessary objects.

TIP You can locate and download sample applications by searching for StreamInsight at CodePlex (http://www.codeplex.com).

Event Types

An event type defines events published by the event source or consumed by the event consumer. You use event types with a typed adapter or as objects in LINQ expressions that you use in query templates. You create an event type as a .NET Framework class or structure by using only public fields and properties as the payload fields, like this:

        public class sampleEvent
        {
            public string eventId { get; set; }
            public double eventValue { get; set; }
        }


An event type can have no more than 32 payload fields. Payload fields must be only scalar or elementary CLR types. You can use nullable types, such as int? instead of int. The string and byte[] types are always nullable.

You do not create an event type when your application uses untyped adapters for scenarios that must support multiple event types. For example, an input adapter for tables in a SQL

Server database must adapt to the schema of the table that it queries. Instead, you provide the table schema in a configuration specification when the adapter is bound to the query. Conversely, an untyped output adapter receives the event type description, which contains a list of fields, when the query starts. The untyped output adapter must then map the event type to the schema of the destination data source, typically in a configuration specification.

Adapters

Input and output adapters provide transformation interfaces between event sources, event consumers, and the CEP server. Event sources can push events to event consumers, or event consumers can pull events from event sources. Either way, the CEP application operates between these two points and intercepts the events for processing. The input adapter reads events from the source, transforms them into a format recognizable by the CEP server, and provides the transformed events to a standing query. As the CEP server processes the event stream, the output adapter receives the resulting new events, transforms them for the event consumers, and then delivers the transformed events.

Before you can begin developing an adapter, you must know whether you are building an input or output adapter. You must also know the event type, which in this context means you must understand the structure of the event payload and how the application timestamps affect stream processing. The .NET class or structure of the event type provides you with information about the event payload if you are building a typed adapter. The information necessary for the management of stream processing, known as event metadata, comes from an interface in the adapter API when it creates an event. In addition to knowing the event payload and event metadata, you must also know whether the shape of the event is a point, interval, or edge model. Having this information available allows you to choose the applicable base class. The adapter base classes are listed in Table 8-1.

TABLE 8-1 Adapter base classes

Table-8.1.gif

If you are developing an untyped input adapter, you must ensure that it can use the configuration specification during query bind time to determine the event's field types by inference from the query's SELECT statement. You must also add code to the adapter to populate


the fields one at a time and enqueue the event. The untyped output adapter works similarly, but instead it must be able to use the configuration specification to retrieve query processing results from a dequeued event.

The next step is to develop an AdapterFactory object as a container class for your input and output adapters. You use an AdapterFactory object to share resources between adapter implementations and to pass configuration parameters to adapter constructors. Recall that an untyped adapter relies on the configuration specification to properly handle an event's payload structure. The adapter factory must implement the Create() and Dispose() methods as shown in the following code example, which shows how to create adapters for events in a text file:

        public class TextFileInputFactory : IInputAdapterFactory<TextFileInputConfig>
        {
            public InputAdapterBase Create(TextFileInputConfig configInfo,
            EventShape eventShape, CepEventType cepEventType)
            {
                InputAdapterBase adapter = default(InputAdapterBase);
                if (eventShape == EventShape.Point)
                {
                    adapter = new TextFilePointInput(configInfo, cepEventType);
                }
                else if (eventShape == EventShape.Interval)
                {
                    adapter = new TextFileIntervalInput(configInfo, cepEventType);
                }
                else if (eventShape == EventShape.Edge)
                {
                    adapter = new TextFileEdgeInput(configInfo, cepEventType);
                }
                else
                {
                    throw new ArgumentException(
                    string.Format(CultureInfo.InvariantCulture,
                    "TextFileInputFactory cannot instantiate adapter with event shape {0}",
                    eventShape.ToString()));
                }
                return adapter;
            }
            public void Dispose()
            {
            }
        }


The final step is to create a .NET assembly for the adapter. At minimum, the adapter includes a constructor, a Start() method, a Resume() method, and either a ProduceEvents() or ConsumeEvents() method, depending on whether you are developing an input adapter or an output adapter. You can see the general structure of the adapter class in the following code example:

        public class TextFilePointInput : PointInputAdapter
        {
            public TextFilePointInput(TextFileInputConfig configInfo,
            CepEventType cepEventType)
{ ... }
            public override void Start()
{ ... }
            public override void Resume()
{ ... }
            private void ProduceEvents()
{ ... }
        }

Using the constructor method for an untyped adapter, such as TextFilePointInput as in the example, you can pass the configuration parameters from the adapter factory and the event type object that passes from the query binding. The constructor also includes code to connect to the event source and to map fields to the event payload. After the CEP server instantiates the adapter, it invokes the Start() method, which generally calls the ProduceEvents() or ConsumeEvents()method to begin receiving streams. The Resume() method invokes the ProduceEvents() or ConsumeEvents() method again if the CEP server paused the streaming and confirms that the adapter is ready.

The core transformation and queuing of events occurs in the ProduceEvents() method. This method iterates through either reading the events it is receiving from the source or writing events it is sending to the event consumer. It makes calls as necessary to push or pull events into or from the event stream using calls to Enqueue() or Dequeue(). Calls to Enqueue() and Dequeue() return the state of the adapter. If Enqueue() returns FULL or Dequeue() returns EMPTY, the adapter transitions to a suspended state and can no longer produce or consume events. When the adapter is ready to resume, it calls Ready(), which then causes the server to call Resume(), and the cycle of enqueuing and dequeuing begins again from the point in time at which the adapter was suspended.

Another task the adapter must perform is classification of an event. That is, the adapter must specify the event kind as either INSERT or Current Time Increment (CTI). The adapter adds events with the INSERT event kind to the stream as it receives data from the source. It uses the CTI event kind to ignore any additional INSERT events it receives afterward that have a start time earlier than the timestamp of the CTI event.

Query Templates

Query templates encapsulate the business logic that the CEP server instantiates as a standing query instance to process, filter, and aggregate event streams. To define a query template, you first create an event stream object. In a standalone server environment, you can create and register a query template as an object on the CEP server for reuse.

The Event Stream Object

You can create an event stream object from an unbound stream or a user-defined input adapter factory.

You might want to develop a query template to register on the CEP server without binding it to an adapter. In this case, you can use the Create() method of the EventStream class to obtain an event stream that has a defined shape, but without binding information. To do this, you can adapt the following code:

CepStream<PayloadType> inputStream = CepStream<PayloadType>.Create("inputStream");

If you are using the implicit server development model, you can create an event stream object from an input adapter factory and an input configuration. With this approach, you do not need to implement an adapter, but you must specify the event shape. The following example illustrates the syntax to use:

CEPStream<PayloadType> inputStream =
CepStream<PayloadType>.Create(streamName, typeof(AdapterFactory), myConfig,
EventShape.Point);


The QueryTemplate Object

When you use the explicit server development model for standalone server deployment, you can create a QueryTemplate object that you can reuse in multiple bindings with different input and output adapters. To create a QueryTemplate object, you use code similar to the following example:

QueryTemplate myQueryTemplate = application.CreateQueryTemplate("myQueryTemplate",
outputStream);

Queries

After you create an event stream object, you write a LINQ expression on top of the event  stream object. You use LINQ expressions to define the fields for output events, to filter events before query processing, to group events into subsets, and to perform calculations, aggregations, and ranking. You can even use LINQ expressions to combine events from multiple streams through join or union operations. Think of LINQ expressions as the questions you ask of the streaming data.

Projection

The projection operation, which occurs in the select clause of the LINQ expression, allows you to add more fields to the payload or apply calculations to the input event fields. You then project the results into a new event by using field assignments. You can create a new event type implicitly in the expressions, or you can refer to an existing event type explicitly. Consider an example in which you need to increment the fields x and y from every event in the inputStream stream by one. The following code example shows how to use field assignments to implicitly define a new event type by using projection:

            var outputStream = from e in inputStream
            select new { x = e.x + 1, y = e.y + 1 };
 

To refer to an existing event type, you cannot use the type's constructor; you must use field assignments in an expression. For example, assume you have an existing event type called myEventType. You can change the previous code example as shown here to reference the event type explicitly:

            var outputStream = from e in inputStream
            select new myEventType { x = e.x + 1, y = e.y + 1 };


Filtering

You use a filtering operation on a stream when you want to apply operations to a subset of events and discard all other events. All events for which the expression in the where clause evaluates as true pass to the output stream. In the following example, the query selects events where the value in field x equals 5:

            var outputStream = from e in inputStream
                               where e.x == 5
                               select e;

Event Windows

A window represents a subset of data from an event stream for a period of time. After you create a stream of windows, you can perform aggregation, TopK (a LINQ operation described later in this chapter), or user-defined operations on the events that the windows contain. For example, you can count the number of events in each window.

You might be inclined to think of a window as a way to partition the event stream by time. However, the analogy between a window and a partition is useful only up to a point. When you partition records in a table, a record belongs to one and only one partition, but an event can appear in multiple windows based on its start time and end time. That is, the window that covers the time period that includes an event's start time might not include the event's end time. In that case, the event appears in each subsequent window, with the final window covering the period that includes the event's end time. Therefore, you should instead think of a window as a way to partition time that is useful for performing operations on events occurring between the two points of time that define a window.

In Figure 8-2, each unlabeled box below the input stream represents a window and contains multiple events for the period of time that the window covers. In this example, the input stream contains three events, but the first three windows contain two events and the last window contains only one event. Thus, a count aggregation on each window yields results different from a count aggregation on an input stream.

Figure-8.2.gif

FIGURE 8-2 Event windows in an input stream

As you might guess, the key to working with windows is to have a clear understanding of the time span that each window covers. There are three types of window streams that Stream- Insight supports—hopping windows, snapshot windows, and count windows. In a hopping windows stream, each window spans an equal time period. In a snapshot windows stream, the size of a window depends on the events that it contains. By contrast, the size of a count windows stream is not fixed, but varies according to a specified number of consecutive event start times.

To create a hopping window, you specify both the time span that the window covers (also known as window size) and the time span between the start of one window and the start of the next window (also known as hop size). For example, assume that you need to create windows that cover a period of one hour, and a new window starts every 15 minutes, as shown in Figure 8-3. In this case, the window size is one hour and the hop size is 15 minutes. Here is the code to create a hopping windows stream and count the events in each window:

            var outputStream = from eventWindow in
                                   inputStream.HoppingWindow(TimeSpan.FromHours(1), TimeSpan.FromMinutes(15))
                               select new { count = eventWindow.Count() };


Figure-8.3.gif

FIGURE 8-3 Hopping windows

When there are no gaps and there is no overlap between the windows in the stream, hopping windows are also called tumbling windows. Figure 8-2, shown earlier, provides an example of tumbling windows. The window size and hop size are the same in a tumbling

windows stream. Although you can use the HoppingWindow method to create tumbling windows, there is a TumblingWindow method. The following code illustrates how to count events in tumbling windows that occur every half hour.

            var outputStream = from eventWindow in
                                   inputStream.TumblingWindow(TimeSpan.FromMinutes(30))
                               select new { count = eventWindow.Count() };


Snapshot windows are similar to tumbling windows in that the windows do not overlap, but whereas fixed points in time determine the boundaries of a tumbling window, events define the boundaries of a snapshot window. Consider the example in Figure 8-4. At the start of the first event, a new snapshot window starts. That window ends when the second event starts, and a second snapshot window starts and includes both the first and second event. When the first event ends, the second snapshot also ends, and a third snapshot window starts. Thus, the start and stop of an event triggers the start and stop of a window. Because events determine the size of the window, the Snapshot method takes arguments, as shown in the following code, which counts events in each window:

            var outputStream = from eventWindow in inputStream.Snapshot()
                               select new { count = eventWindow.Count() };


Figure-8.4.gif

FIGURE 8-4 Snapshot windows

Count windows are completely different from the other window types because the size of the windows is variable. When you create windows, you provide a parameter n as a count of events to fulfill within a window. For example, assume n is 2 as shown in Figure 8-5. The first window starts when the first event starts and ends when the second event starts, because a count of 2 events fulfills the specification. The second event also resets the counter to 1 and starts a new window. The third event increments the counter to 2, which ends the second window.

Figure-8.5.gif

FIGURE 8-5 Count windows

Aggregations

You cannot perform aggregation operations on event streams directly; instead you must first create a window to group data into periods of time that you can then aggregate. You then create an aggregation as a method of the window and, for all aggregations except Count, use a lambda expression to assign the result to a field.

StreamInsight supports the following aggregation functions:

  • Avg
  • Sum
  • Min
  • Max
  • Count

Assume you want to apply the Sum and Avg aggregations to field x in an input stream. The following example shows you how to use these aggregations as well as the Count aggregation for each snapshot window:

            var outputStream = from eventWindow in inputStream.Snapshot()
                               select new
                               {
                                   sum = eventWindow.Sum(e => e.x),
                                   avg = eventWindow.Avg(e => e.x),
                                   count = eventWindow.Count()
                               };


TopK

A special type of aggregation is the TopK operation, which you use to rank and filter events in an ordered window stream. To order a window stream, you use the orderby clause. Then you use the Take method to specify the number of events that you want to send to the output stream, discarding all other events. The following code shows how to produce a stream of the top three events:

            var outputStream = (from eventWindow in inputStream.Snapshot()
                                from e in eventWindow
                                orderby e.x ascending, e.y descending
                                select e).Take(3);


When you need to include the rank in the output stream, you use projection to add the rank to each event's payload. This is accessible through the Payload property, as shown in the following code:

            var outputStream = (from eventWindow in inputStream.Snapshot()
                                from e in eventWindow
                                orderby e.x ascending, e.y descending
                                select e).Take(3, e => new { x = e.Payload.x, y = e.Payload.y, rank = e.Rank });


Grouping

When you want to compute operations on event groups separately, you add a group by clause. For example, you might want to produce an output stream that aggregates the input stream by location and compute the average for field x for each location. In the following example, the code illustrates how to create the grouping by location and how to aggregate events over a specified column:

            var outputStream = from e in inputStream
                               group e by e.locationID into eachLocation
                               from eventWindow in eachLocation.Snapshot()
                               select new { avgValue = eventWindow.Avg(e => e.x), locationId = eachGroup.Key };


Joins

You can use a join operation to match events from two streams. The CEP server first matches events only if they have overlapping time intervals, and then applies the conditions that you specify in the join predicate. The output of a join operation is a new event that combines payloads from the two matched events. Here is the code to join events from two input streams, where field x is the same value in each event. This code creates a new event containing fields x and y from the first event and field y from the second event.

            var outputStream = from e1 in inputStream1
                               join e2 in inputStream2
                               on e1.x equals e2.x
                               select new { e1.x, e1.y, e2.y };


Another option is to use a cross join, which combines all events in the first input stream with all events in the second input stream. You specify a cross join by using a from clause for each input stream and then creating a new event that includes fields from the events in each stream. By adding a where clause, you can filter the events in each stream before the CEP server performs the cross join. The following example selects events with a value for field x  greater than 5 from the first stream and selects events with a value for field y less than 20 from the second stream, performs the cross join, and then creates a stream of new events containing field x from the first event and field y from the second event:

            var outputStream = from e1 in inputStream1
                               from e2 in inputStream2
                               where e1.x > 5 && e2.y < 20
                               select new { e1.x, e2.y };


Unions

You can also combine events from multiple streams by performing a union operation. You can work with only two streams at a time, but you can cascade a series of union operations if you need to combine events from three or more streams, as shown in the following code:

            var outputStreamTemp = inputStream1.Union(inputStream2);
            var outputStream = outputStreamTemp.Union(inputStream3);
 

User-defined Functions

When you need to perform an operation that the CEP server does not natively support, you can create user-defined functions (UDFs) by reusing existing .NET functions. You add a UDF to the CEP server in the same way that you add an adapter. You can then call the UDF anywhere in your query where an expression can be used, such as in a filter predicate, a join predicate, or a projection.

Query Template Binding

The method that the CEP server uses to instantiate the query template as a standing query depends on the development model that you use. If you are using the explicit server development model, you create a query binder object, but you create an event stream consumer object if you are using the implicit server development model.

The Query Binder Object

In the explicit server development model, you first create explicit input and output adapter objects. Next you create a query binder object as a wrapper for the query template object on the CEP server, which in turn you bind to the input and output adapters, and then you call the CreateQuery() method to create the standing query, as shown here:

            QueryBinder myQuerybinder = new QueryBinder(myQueryTemplate);
            myQuerybinder.BindProducer("querySource", myInputAdapter, inputConf,
            EventShape.Point);
            myQuerybinder.AddConsumer("queryResult", myOutputAdapter, outputConf,
            EventShape.Point, StreamEventOrder.FullyOrdered);
            Query myQuery = application.CreateQuery("query", myQuerybinder, "query description");


Rather than enqueuing CTIs in the input adapter code, you can define the CTI behavior by using the AdvanceTimeSettings class as an optional parameter in the BindProducer method. For example, to send a CTI after every 10 events, set the CTI's timestamp as the most recent event's timestamp, and drop any event that appears later in the stream but has an end timestamp earlier than the CTI, use the following code:

            var ats = new AdvanceTimeSettings(10, TimeSpan.FromSeconds(0),
            AdvanceTimePolicy.Drop);
            queryBinder.BindProducer("querysource", myInputAdapter, inputConf,
            EventShape.Interval, ats);


The Event Stream Consumer Object

After you define the query logic in an application that uses the implicit server development model, you can use the output adapter factory to create an event stream consumer object. You can pass this object directly to the CepStream.ToQuery() method without binding the query template to the output adapter, as you can see in the following example:

            Query myQuery = outputStream.ToQuery<ResultType>(typeof(MyOutputAdapterFactory),
            outputConf, EventShape.Interval, StreamEventOrder.FullyOrdered);


The Query Object

In both the explicit and implicit development models, you create a query object. With that object instantiated, you can use the Start() and Stop() methods. The Start() method instantiates the adapters using the adapter factories, starts the event processing engine, and calls the Start() methods for each adapter. The Stop() method sends a message to the adapters that the query is stopping and then shuts down the query. Your application must include the following code to start and stop the query object:

            query.Start();
            // wait for signal to complete the query
            query.Stop();

The Management Interface

StreamInsight includes the ManagementService API, which you can use to create diagnostic views for monitoring the CEP server's resources and the queries running on the server. Another option is to use Windows PowerShell to access diagnostic information.

Diagnostic Views

Your diagnostic application can retrieve static information, such as object property values, and statistical information, such as a cumulative event count after a particular point in time or an aggregate count of events from child objects. Objects include the server, input and output adapters, query operators, schedulers, and event streams. You can retrieve the desired information by using the GetDiagnosticView() method and passing the object's URI as a method argument.

If you are monitoring queries, you should understand the transition points at which the server records metrics about events in a stream. The name of a query metric identifies the transition point to which the metric applies. For example, Total Outgoing Event Count provides the total number of events that the output adapter has dequeued from the engine. The following four transition points relate to query metrics:

  • Incoming The event arrival at the input adapter
  • Consumed The point at which the input adapter enqueues the event into the engine
  • Produced The point at which the event leaves the last query operator in the engine
  • Outgoing The event departure from the output adapter

Windows PowerShell Diagnostics

For quick analysis, you can use Windows PowerShell scripts to view diagnostic information rather than writing a complete diagnostic application. Before you can use a Windows PowerShell script, the StreamInsight server must be running a query. If the server is running as a hosted assembly, you must expose the Web service.

You start the diagnostic process by loading the Microsoft.ComplexEventProcessing assembly from the Global Assembly Cache (GAC) into Windows PowerShell by using the following code:

PS C:\>
[System.Reflection.Assembly]::LoadWithPartialName("Microsoft.ComplexEventProcessing")

Then you need to create a connection to the StreamInsight host process by using the code in this example:

PS C:\> $server =
Microsoft.ComplexEventProcessing.Server]::Connect("http://localhost/StreamInsight")

Then you can use the GetDiagnosticView() method to retrieve statistics for an object, such as the Event Manager, as shown in the following code:

PS C:\> $dv = $server.GetDiagnosticView("cep:/Server/EventManager")
PS C:\> $dv

To retrieve information about a query, you must provide the full name, following the StreamInsight hierarchical naming schema. For example, for an application named myApplication with a query named myQuery, you use the following code:

PS C:\> $dv =
$server.GetDiagnosticView("cep:/Server/Application/myApplication/Query/myQuery")
PS C:\> $dv

NOTE For a complete list of metrics and statistics that you can query by using diagnostic views, refer to the SQL Server Books Online topic "Monitoring the CEP Server and Queries" at http://msdn.microsoft.com/en-us/library/ee391166(SQL.105).aspx.

Up Next
    Ebook Download
    View all
    Learn
    View all